Skip to content

Commit

Permalink
Fix panic with unpinning pruned blocks (#2386)
Browse files Browse the repository at this point in the history
* Fix panic with unpinning pruned blocks

* CHANGELOG

* Some docs updates

* Clarify non-canonical

* Rustfmt
  • Loading branch information
tomaka authored Jun 14, 2022
1 parent 1f22c58 commit 87d47d2
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 30 deletions.
74 changes: 44 additions & 30 deletions bin/light-base/src/runtime_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,11 +182,12 @@ impl<TPlat: Platform> RuntimeService<TPlat> {
/// Only up to `buffer_size` block notifications are buffered in the channel. If the channel
/// is full when a new notification is attempted to be pushed, the channel gets closed.
///
/// A maximum number of finalized pinned blocks must be passed, indicating the maximum number
/// of finalized blocks that the runtime service will pin at the same time for this
/// subscription. If this maximum is reached, the channel will get closed. In situations where
/// the subscriber is guaranteed to always properly unpin blocks, a value of
/// `usize::max_value()` can be passed in order to ignore this maximum.
/// A maximum number of finalized or non-canonical (i.e. not part of the finalized chain)
/// pinned blocks must be passed, indicating the maximum number of blocks that are finalized
/// or non-canonical that the runtime service will pin at the same time for this subscription.
/// If this maximum is reached, the channel will get closed. In situations where the subscriber
/// is guaranteed to always properly unpin blocks, a value of `usize::max_value()` can be
/// passed in order to ignore this maximum.
///
/// The channel also gets closed if a gap in the finality happens, such as after a Grandpa
/// warp syncing.
Expand All @@ -195,7 +196,7 @@ impl<TPlat: Platform> RuntimeService<TPlat> {
pub async fn subscribe_all(
&self,
buffer_size: usize,
max_finalized_pinned_blocks: NonZeroUsize,
max_pinned_blocks: NonZeroUsize,
) -> SubscribeAll<TPlat> {
// First, lock `guarded` and wait for the tree to be in `FinalizedBlockRuntimeKnown` mode.
// This can take a long time.
Expand Down Expand Up @@ -243,6 +244,7 @@ impl<TPlat: Platform> RuntimeService<TPlat> {
tree.finalized_async_user_data().clone(),
*decoded_finalized_block.state_root,
decoded_finalized_block.number,
false,
),
);

Expand Down Expand Up @@ -278,6 +280,7 @@ impl<TPlat: Platform> RuntimeService<TPlat> {
runtime.clone(),
*decoded_header.state_root,
decoded_header.number,
true,
),
);

Expand Down Expand Up @@ -307,8 +310,7 @@ impl<TPlat: Platform> RuntimeService<TPlat> {
0 | 1
));

all_blocks_subscriptions
.insert(subscription_id, (tx, max_finalized_pinned_blocks.get() - 1));
all_blocks_subscriptions.insert(subscription_id, (tx, max_pinned_blocks.get() - 1));

SubscribeAll {
finalized_block_scale_encoded_header: finalized_block.scale_encoded_header.clone(),
Expand Down Expand Up @@ -353,13 +355,12 @@ impl<TPlat: Platform> RuntimeService<TPlat> {
if let GuardedInner::FinalizedBlockRuntimeKnown {
all_blocks_subscriptions,
pinned_blocks,
finalized_block,
..
} = &mut guarded_lock.tree
{
let unpinned_block_height =
let block_counts_towards_limit =
match pinned_blocks.remove(&(subscription_id.0, *block_hash)) {
Some((_, _, h)) => h,
Some((_, _, _, to_remove)) => !to_remove,
None => {
// Cold path.
if all_blocks_subscriptions.contains_key(&subscription_id.0) {
Expand All @@ -372,12 +373,7 @@ impl<TPlat: Platform> RuntimeService<TPlat> {

guarded_lock.runtimes.retain(|_, rt| rt.strong_count() > 0);

let block_is_finalized = unpinned_block_height
<= header::decode(&finalized_block.scale_encoded_header)
.unwrap()
.number;

if block_is_finalized {
if block_counts_towards_limit {
let (_, finalized_pinned_remaining) = all_blocks_subscriptions
.get_mut(&subscription_id.0)
.unwrap();
Expand Down Expand Up @@ -406,7 +402,7 @@ impl<TPlat: Platform> RuntimeService<TPlat> {
let mut guarded = self.guarded.lock().await;
let guarded = &mut *guarded;

let (runtime, block_state_root_hash, block_number) = {
let (runtime, block_state_root_hash, block_number, _) = {
if let GuardedInner::FinalizedBlockRuntimeKnown { pinned_blocks, .. } =
&mut guarded.tree
{
Expand Down Expand Up @@ -958,7 +954,7 @@ enum GuardedInner<TPlat: Platform> {

/// List of senders that get notified when new blocks arrive.
/// See [`RuntimeService::subscribe_all`]. Alongside with each sender, the number of pinned
/// blocks remaining for this subscription.
/// finalized or non-canonical blocks remaining for this subscription.
///
/// Keys are assigned from [`Guarded::next_subscription_id`].
all_blocks_subscriptions:
Expand All @@ -972,9 +968,10 @@ enum GuardedInner<TPlat: Platform> {
/// blocks from this container that are no longer relevant.
///
/// Keys are `(subscription_id, block_hash)`. Values are indices within
/// [`Guarded::runtimes`], state trie root hashes, and block numbers.
/// [`Guarded::runtimes`], state trie root hashes, block numbers, and whether the block
/// is non-finalized and part of the canonical chain.
// TODO: use structs instead of tuples
pinned_blocks: BTreeMap<(u64, [u8; 32]), (Arc<Runtime>, [u8; 32], u64)>,
pinned_blocks: BTreeMap<(u64, [u8; 32]), (Arc<Runtime>, [u8; 32], u64, bool)>,
},
FinalizedBlockRuntimeUnknown {
/// Tree of blocks. Holds the state of the download of everything. Always `Some` when the
Expand Down Expand Up @@ -1489,25 +1486,37 @@ impl<TPlat: Platform> Background<TPlat> {
let all_blocks_notif = Notification::Finalized {
best_block_hash,
hash: finalized_block.hash,
pruned_blocks: pruned_blocks
.into_iter()
.map(|(_, b, _)| b.hash)
.collect(),
pruned_blocks: pruned_blocks.iter().map(|(_, b, _)| b.hash).collect(),
};

let mut to_remove = Vec::new();
for (subscription_id, (sender, finalized_pinned_remaining)) in
all_blocks_subscriptions.iter_mut()
{
if *finalized_pinned_remaining == 0 {
let count_limit = pruned_blocks.len() + 1;

if *finalized_pinned_remaining < count_limit {
to_remove.push(*subscription_id);
continue;
}

if sender.try_send(all_blocks_notif.clone()).is_ok() {
*finalized_pinned_remaining -= 1;
} else {
if sender.try_send(all_blocks_notif.clone()).is_err() {
to_remove.push(*subscription_id);
continue;
}

*finalized_pinned_remaining -= count_limit;

// Mark the finalized and pruned blocks as finalized or non-canonical.
for block in iter::once(&finalized_block.hash)
.chain(pruned_blocks.iter().map(|(_, b, _)| &b.hash))
{
if let Some((_, _, _, non_finalized_canonical)) =
pinned_blocks.get_mut(&(*subscription_id, *block))
{
debug_assert!(*non_finalized_canonical);
*non_finalized_canonical = false;
}
}
}
for to_remove in to_remove {
Expand Down Expand Up @@ -1570,7 +1579,12 @@ impl<TPlat: Platform> Background<TPlat> {
if sender.try_send(notif.clone()).is_ok() {
pinned_blocks.insert(
(*subscription_id, block_hash),
(block_runtime.clone(), block_state_root_hash, block_number),
(
block_runtime.clone(),
block_state_root_hash,
block_number,
true,
),
);
} else {
to_remove.push(*subscription_id);
Expand Down
4 changes: 4 additions & 0 deletions bin/wasm-node/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

### Fixed

- Fix panic introduced in v0.6.18 in case of a fork in the chain related to tracking the number of blocks kept alive in the node's memory. ([#2386](https://github.com/paritytech/smoldot/pull/2386))

## 0.6.18 - 2022-06-14

### Added
Expand Down

0 comments on commit 87d47d2

Please sign in to comment.