diff --git a/bin/light-base/src/runtime_service.rs b/bin/light-base/src/runtime_service.rs index 7ddbc54c9e..da9203b317 100644 --- a/bin/light-base/src/runtime_service.rs +++ b/bin/light-base/src/runtime_service.rs @@ -182,11 +182,12 @@ impl RuntimeService { /// Only up to `buffer_size` block notifications are buffered in the channel. If the channel /// is full when a new notification is attempted to be pushed, the channel gets closed. /// - /// A maximum number of finalized pinned blocks must be passed, indicating the maximum number - /// of finalized blocks that the runtime service will pin at the same time for this - /// subscription. If this maximum is reached, the channel will get closed. In situations where - /// the subscriber is guaranteed to always properly unpin blocks, a value of - /// `usize::max_value()` can be passed in order to ignore this maximum. + /// A maximum number of finalized or non-canonical (i.e. not part of the finalized chain) + /// pinned blocks must be passed, indicating the maximum number of blocks that are finalized + /// or non-canonical that the runtime service will pin at the same time for this subscription. + /// If this maximum is reached, the channel will get closed. In situations where the subscriber + /// is guaranteed to always properly unpin blocks, a value of `usize::max_value()` can be + /// passed in order to ignore this maximum. /// /// The channel also gets closed if a gap in the finality happens, such as after a Grandpa /// warp syncing. @@ -195,7 +196,7 @@ impl RuntimeService { pub async fn subscribe_all( &self, buffer_size: usize, - max_finalized_pinned_blocks: NonZeroUsize, + max_pinned_blocks: NonZeroUsize, ) -> SubscribeAll { // First, lock `guarded` and wait for the tree to be in `FinalizedBlockRuntimeKnown` mode. // This can take a long time. @@ -243,6 +244,7 @@ impl RuntimeService { tree.finalized_async_user_data().clone(), *decoded_finalized_block.state_root, decoded_finalized_block.number, + false, ), ); @@ -278,6 +280,7 @@ impl RuntimeService { runtime.clone(), *decoded_header.state_root, decoded_header.number, + true, ), ); @@ -307,8 +310,7 @@ impl RuntimeService { 0 | 1 )); - all_blocks_subscriptions - .insert(subscription_id, (tx, max_finalized_pinned_blocks.get() - 1)); + all_blocks_subscriptions.insert(subscription_id, (tx, max_pinned_blocks.get() - 1)); SubscribeAll { finalized_block_scale_encoded_header: finalized_block.scale_encoded_header.clone(), @@ -353,13 +355,12 @@ impl RuntimeService { if let GuardedInner::FinalizedBlockRuntimeKnown { all_blocks_subscriptions, pinned_blocks, - finalized_block, .. } = &mut guarded_lock.tree { - let unpinned_block_height = + let block_counts_towards_limit = match pinned_blocks.remove(&(subscription_id.0, *block_hash)) { - Some((_, _, h)) => h, + Some((_, _, _, to_remove)) => !to_remove, None => { // Cold path. if all_blocks_subscriptions.contains_key(&subscription_id.0) { @@ -372,12 +373,7 @@ impl RuntimeService { guarded_lock.runtimes.retain(|_, rt| rt.strong_count() > 0); - let block_is_finalized = unpinned_block_height - <= header::decode(&finalized_block.scale_encoded_header) - .unwrap() - .number; - - if block_is_finalized { + if block_counts_towards_limit { let (_, finalized_pinned_remaining) = all_blocks_subscriptions .get_mut(&subscription_id.0) .unwrap(); @@ -406,7 +402,7 @@ impl RuntimeService { let mut guarded = self.guarded.lock().await; let guarded = &mut *guarded; - let (runtime, block_state_root_hash, block_number) = { + let (runtime, block_state_root_hash, block_number, _) = { if let GuardedInner::FinalizedBlockRuntimeKnown { pinned_blocks, .. } = &mut guarded.tree { @@ -958,7 +954,7 @@ enum GuardedInner { /// List of senders that get notified when new blocks arrive. /// See [`RuntimeService::subscribe_all`]. Alongside with each sender, the number of pinned - /// blocks remaining for this subscription. + /// finalized or non-canonical blocks remaining for this subscription. /// /// Keys are assigned from [`Guarded::next_subscription_id`]. all_blocks_subscriptions: @@ -972,9 +968,10 @@ enum GuardedInner { /// blocks from this container that are no longer relevant. /// /// Keys are `(subscription_id, block_hash)`. Values are indices within - /// [`Guarded::runtimes`], state trie root hashes, and block numbers. + /// [`Guarded::runtimes`], state trie root hashes, block numbers, and whether the block + /// is non-finalized and part of the canonical chain. // TODO: use structs instead of tuples - pinned_blocks: BTreeMap<(u64, [u8; 32]), (Arc, [u8; 32], u64)>, + pinned_blocks: BTreeMap<(u64, [u8; 32]), (Arc, [u8; 32], u64, bool)>, }, FinalizedBlockRuntimeUnknown { /// Tree of blocks. Holds the state of the download of everything. Always `Some` when the @@ -1489,25 +1486,37 @@ impl Background { let all_blocks_notif = Notification::Finalized { best_block_hash, hash: finalized_block.hash, - pruned_blocks: pruned_blocks - .into_iter() - .map(|(_, b, _)| b.hash) - .collect(), + pruned_blocks: pruned_blocks.iter().map(|(_, b, _)| b.hash).collect(), }; let mut to_remove = Vec::new(); for (subscription_id, (sender, finalized_pinned_remaining)) in all_blocks_subscriptions.iter_mut() { - if *finalized_pinned_remaining == 0 { + let count_limit = pruned_blocks.len() + 1; + + if *finalized_pinned_remaining < count_limit { to_remove.push(*subscription_id); continue; } - if sender.try_send(all_blocks_notif.clone()).is_ok() { - *finalized_pinned_remaining -= 1; - } else { + if sender.try_send(all_blocks_notif.clone()).is_err() { to_remove.push(*subscription_id); + continue; + } + + *finalized_pinned_remaining -= count_limit; + + // Mark the finalized and pruned blocks as finalized or non-canonical. + for block in iter::once(&finalized_block.hash) + .chain(pruned_blocks.iter().map(|(_, b, _)| &b.hash)) + { + if let Some((_, _, _, non_finalized_canonical)) = + pinned_blocks.get_mut(&(*subscription_id, *block)) + { + debug_assert!(*non_finalized_canonical); + *non_finalized_canonical = false; + } } } for to_remove in to_remove { @@ -1570,7 +1579,12 @@ impl Background { if sender.try_send(notif.clone()).is_ok() { pinned_blocks.insert( (*subscription_id, block_hash), - (block_runtime.clone(), block_state_root_hash, block_number), + ( + block_runtime.clone(), + block_state_root_hash, + block_number, + true, + ), ); } else { to_remove.push(*subscription_id); diff --git a/bin/wasm-node/CHANGELOG.md b/bin/wasm-node/CHANGELOG.md index e4460b3d4e..82ae336b92 100644 --- a/bin/wasm-node/CHANGELOG.md +++ b/bin/wasm-node/CHANGELOG.md @@ -2,6 +2,10 @@ ## Unreleased +### Fixed + +- Fix panic introduced in v0.6.18 in case of a fork in the chain related to tracking the number of blocks kept alive in the node's memory. ([#2386](https://github.com/paritytech/smoldot/pull/2386)) + ## 0.6.18 - 2022-06-14 ### Added