Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #2591 #2602

Merged
merged 15 commits into from
Aug 9, 2022
177 changes: 123 additions & 54 deletions bin/light-base/src/sync_service/parachain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,22 +52,19 @@ pub(super) async fn start_parachain<TPlat: Platform>(
// Necessary for the `select!` loop below.
let mut from_network_service = from_network_service.fuse();

// Last good known parachain header of a relay chain finalized block.
// Last-known finalized parachain header. Can be very old and obsolete.
// Updated after we successfully fetch the parahead of a relay chain finalized block, and left
// untouched if the fetch fails.
// Initialized to the parachain genesis block header.
let mut finalized_parahead = chain_information
let mut obsolete_finalized_parahead = chain_information
.as_ref()
.finalized_block_header
.scale_encoding_vec(block_number_bytes);

// Hash of the best parachain that has been reported to the output.
let mut best_parahead_hash = header::hash_from_scale_encoded_header(&finalized_parahead);

// State machine that tracks the list of parachain network sources and their known blocks.
let mut sync_sources = sources::AllForksSources::<(PeerId, protocol::Role)>::new(
40,
header::decode(&finalized_parahead, block_number_bytes)
header::decode(&obsolete_finalized_parahead, block_number_bytes)
.unwrap()
.number,
);
Expand All @@ -94,17 +91,29 @@ pub(super) async fn start_parachain<TPlat: Platform>(
))
);

// Hash of the best parachain that has been reported to the subscriptions.
// `None` if and only if no finalized parahead is known yet.
let mut reported_best_parahead_hash = None;

// Tree of relay chain blocks. Blocks are inserted when received from the relay chain
// sync service. Once inside, their corresponding parahead is fetched. Once the parahead
// is fetched, this parahead is reported to our subscriptions.
//
// Each block in the tree has an associated parahead behind an `Option`. This `Option`
// always contains `Some`, unless the relay chain finalized block hasn't had its parahead
// fetched yet.
// The root of the tree is a "virtual" block. It can be thought as the parent of the relay
// chain finalized block, but is there even if the relay chain finalized block is block 0.
//
// All block in the tree has an associated parahead behind an `Option`. This `Option`
// always contains `Some`, except for the "virtual" root block for which it is `None`.
//
// If the output finalized block has a parahead equal to `None`, it therefore means that
// no finalized parahead is known yet.
// Note that, when it is the case, `SubscribeAll` messages from the frontend are still
// answered with a single finalized block set to `obsolete_finalized_parahead`. Once a
// finalized parahead is known, it is important to reset all subscriptions.
//
// The set of blocks in this tree whose parahead hasn't been fetched yet is the same as
// the set of blocks that is maintained pinned on the runtime service. Blocks are unpinned
// when their parahead fetching succeeds.
// when their parahead fetching succeeds or when they are removed from the tree.
let mut async_tree = {
let mut async_tree =
async_tree::AsyncTree::<TPlat::Instant, [u8; 32], _>::new(async_tree::Config {
Expand Down Expand Up @@ -148,6 +157,12 @@ pub(super) async fn start_parachain<TPlat: Platform>(
let mut wakeup_deadline = future::Either::Right(future::pending());

loop {
// Internal state check.
debug_assert_eq!(
reported_best_parahead_hash.is_some(),
async_tree.finalized_async_user_data().is_some()
);

// Start fetching paraheads of new blocks whose parahead needs to be fetched.
while in_progress_paraheads.len() < 4 {
match async_tree.next_necessary_async_op(&TPlat::now()) {
Expand Down Expand Up @@ -194,18 +209,30 @@ pub(super) async fn start_parachain<TPlat: Platform>(
while let Some(update) = async_tree.try_advance_output() {
match update {
async_tree::OutputUpdate::Finalized {
async_op_user_data: new_parahead,
former_finalized_async_op_user_data: former_parahead,
async_op_user_data: new_finalized_parahead,
former_finalized_async_op_user_data: former_finalized_parahead,
pruned_blocks,
..
} if *new_parahead != former_parahead => {
debug_assert!(new_parahead.is_some());
let hash =
header::hash_from_scale_encoded_header(new_parahead.as_ref().unwrap());
} if *new_finalized_parahead != former_finalized_parahead => {
debug_assert!(new_finalized_parahead.is_some());

// If this is the first time (in this loop) a finalized parahead is known,
// any `SubscribeAll` message that has been answered beforehand was
// answered in a dummy way with a potentially obsolete finalized header.
// For this reason, we reset all subscriptions to force all subscribers to
// re-subscribe.
if former_finalized_parahead.is_none() {
all_subscriptions.clear();
}

finalized_parahead = new_parahead.clone().unwrap();
let hash = header::hash_from_scale_encoded_header(
new_finalized_parahead.as_ref().unwrap(),
);

if let Ok(header) = header::decode(&finalized_parahead, block_number_bytes)
obsolete_finalized_parahead = new_finalized_parahead.clone().unwrap();

if let Ok(header) =
header::decode(&obsolete_finalized_parahead, block_number_bytes)
{
sync_sources.set_finalized_block_height(header.number);
// TODO: what about an `else`? does sync_sources leak if the block can't be decoded?
Expand Down Expand Up @@ -233,7 +260,7 @@ pub(super) async fn start_parachain<TPlat: Platform>(
header::hash_from_scale_encoded_header(parahead.as_ref().unwrap())
})
.unwrap_or(hash);
best_parahead_hash = best_block_hash;
reported_best_parahead_hash = Some(best_block_hash);

// Elements in `all_subscriptions` are removed one by one and
// inserted back if the channel is still open.
Expand All @@ -250,6 +277,13 @@ pub(super) async fn start_parachain<TPlat: Platform>(
}
async_tree::OutputUpdate::Finalized { .. }
| async_tree::OutputUpdate::BestBlockChanged { .. } => {
// Do not report anything to subscriptions if no finalized parahead is
// known yet.
let finalized_parahead = match async_tree.finalized_async_user_data() {
Some(p) => p,
None => continue,
};

// Calculate hash of the parablock corresponding to the new best relay
// chain block.
let parahash = header::hash_from_scale_encoded_header(
Expand All @@ -259,8 +293,8 @@ pub(super) async fn start_parachain<TPlat: Platform>(
.unwrap_or(&finalized_parahead),
);

if parahash != best_parahead_hash {
best_parahead_hash = parahash;
if reported_best_parahead_hash.as_ref() != Some(&parahash) {
reported_best_parahead_hash = Some(parahash);

log::debug!(
target: &log_target,
Expand Down Expand Up @@ -290,11 +324,18 @@ pub(super) async fn start_parachain<TPlat: Platform>(
header::hash_from_scale_encoded_header(&scale_encoded_header);
let block_index = block.index;

// Do not report anything to subscriptions if no finalized parahead is
// known yet.
let finalized_parahead = match async_tree.finalized_async_user_data() {
Some(p) => p,
None => continue,
};

// Do not report the new block if it has already been reported in the
// past. This covers situations where the parahead is identical to the
// relay chain's parent's parahead, but also situations where multiple
// sibling relay chain blocks have the same parahead.
if finalized_parahead == scale_encoded_header
if *finalized_parahead == scale_encoded_header
|| async_tree
.input_iter_unordered()
.filter(|item| item.id != block_index)
Expand All @@ -304,8 +345,10 @@ pub(super) async fn start_parachain<TPlat: Platform>(
// While the parablock has already been reported, it is possible that
// it becomes the new best block while it wasn't before, in which
// case we should send a notification.
if is_new_best && parahash != best_parahead_hash {
best_parahead_hash = parahash;
if is_new_best
&& reported_best_parahead_hash.as_ref() != Some(&parahash)
{
reported_best_parahead_hash = Some(parahash);

log::debug!(
target: &log_target,
Expand Down Expand Up @@ -335,7 +378,7 @@ pub(super) async fn start_parachain<TPlat: Platform>(
);

if is_new_best {
best_parahead_hash = parahash;
reported_best_parahead_hash = Some(parahash);
}

let parent_hash = header::hash_from_scale_encoded_header(
Expand All @@ -348,7 +391,6 @@ pub(super) async fn start_parachain<TPlat: Platform>(
.as_ref()
.unwrap()
})
.or_else(|| async_tree.finalized_async_user_data().as_ref())
.unwrap_or(&finalized_parahead),
);

Expand Down Expand Up @@ -479,37 +521,64 @@ pub(super) async fn start_parachain<TPlat: Platform>(

match foreground_message {
ToBackground::IsNearHeadOfChainHeuristic { send_back } => {
// Since there is a mapping between relay chain blocks and parachain
// blocks, whether a parachain is at the head of the chain is the
// same thing as whether its relay chain is at the head of the chain.
// Note that there is no ordering guarantee of any kind w.r.t.
// block subscriptions notifications.
let val = relay_chain_sync.is_near_head_of_chain_heuristic().await;
let _ = send_back.send(val);
if async_tree.finalized_async_user_data().is_some() {
// Since there is a mapping between relay chain blocks and
// parachain blocks, whether a parachain is at the head of the
// chain is the same thing as whether its relay chain is at the
// head of the chain.
// Note that there is no ordering guarantee of any kind w.r.t.
// block subscriptions notifications.
let val = relay_chain_sync.is_near_head_of_chain_heuristic().await;
let _ = send_back.send(val);
} else {
// If no finalized parahead is known yet, we might be very close
// to the head but also maybe very very far away. We lean on the
// cautious side and always return `false`.
let _ = send_back.send(false);
}
},
ToBackground::SubscribeAll { send_back, buffer_size, .. } => {
let (tx, new_blocks) = mpsc::channel(buffer_size.saturating_sub(1));
let _ = send_back.send(super::SubscribeAll {
finalized_block_scale_encoded_header: finalized_parahead.clone(),
finalized_block_runtime: None,
non_finalized_blocks_ancestry_order: async_tree.input_iter_unordered().filter_map(|block| {
// `async_op_user_data` is `Some` only if this block has
// already been reported on the output. In order to maintain
// consistency, only these blocks should be reported.
let parahead = block.async_op_user_data?.as_ref().unwrap();
let parent_hash = async_tree.parent(block.id)
.map(|idx| header::hash_from_scale_encoded_header(&async_tree.block_async_user_data(idx).unwrap().as_ref().unwrap()))
.or_else(|| async_tree.finalized_async_user_data().as_ref().map(header::hash_from_scale_encoded_header))
.unwrap_or(header::hash_from_scale_encoded_header(&finalized_parahead));

Some(super::BlockNotification {
is_new_best: block.is_output_best,
scale_encoded_header: parahead.clone(),
parent_hash,
})
}).collect(),
new_blocks,
});

// There are two possibilities here: either we know of any recent
// finalized parahead, or we don't. In case where we don't know of
// any finalized parahead yet, we report a single obsolete finalized
// parahead, which is `obsolete_finalized_parahead`. The rest of this
// module makes sure that no other block is reported to subscriptions
// as long as this is the case, and that subscriptions are reset once
// the first known finalized parahead is known.
if let Some(finalized_parahead) = async_tree.finalized_async_user_data() {
// Finalized parahead is known.
let _ = send_back.send(super::SubscribeAll {
finalized_block_scale_encoded_header: finalized_parahead.clone(),
finalized_block_runtime: None,
non_finalized_blocks_ancestry_order: async_tree.input_iter_unordered().filter_map(|block| {
// `async_op_user_data` is `Some` only if this block has
// already been reported on the output. In order to
// maintain consistency, only these blocks should be
// reported.
let parahead = block.async_op_user_data?.as_ref().unwrap();
let parent_hash = async_tree.parent(block.id)
.map(|idx| header::hash_from_scale_encoded_header(&async_tree.block_async_user_data(idx).unwrap().as_ref().unwrap()))
.unwrap_or_else(|| header::hash_from_scale_encoded_header(&finalized_parahead));

Some(super::BlockNotification {
is_new_best: block.is_output_best,
scale_encoded_header: parahead.clone(),
parent_hash,
})
}).collect(),
new_blocks,
});
} else {
// No known finalized parahead.
let _ = send_back.send(super::SubscribeAll {
finalized_block_scale_encoded_header: obsolete_finalized_parahead.clone(),
finalized_block_runtime: None,
non_finalized_blocks_ancestry_order: Vec::new(),
new_blocks,
});
}

all_subscriptions.push(tx);
}
Expand Down
5 changes: 5 additions & 0 deletions bin/wasm-node/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

## Unreleased

### Fixed

- Fix sometimes erroneously reporting a very old `parent_hash` (usually the genesis block hash) in `chainHead_unstable_follow` when following a parachain. ([#2602](https://github.com/paritytech/smoldot/pull/2602))
- After smoldot has downloaded the runtime of an old parachain block, it would sometimes erroneously consider that this runtime hasn't changed since then. This would lead to issues such as `state_getRuntimeVersion` and `state_subscribeRuntimeVersion` returning information about an old runtime, or `state_getMetadata` or `state_call` using an old runtime. ([#2602](https://github.com/paritytech/smoldot/pull/2602))

## 0.6.28 - 2022-08-08

### Changed
Expand Down