Skip to content

Commit

Permalink
Fix chain_subscribeAllHeads not recreating the channel if it dies (#2465
Browse files Browse the repository at this point in the history
)

* Fix chain_subscribeAllHeads not recreating the channel if it dies

* CHANGELOG

* Bump buffer size to 64

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
tomaka and mergify[bot] authored Jul 5, 2022
1 parent 8ccad65 commit c0bf03d
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 64 deletions.
136 changes: 72 additions & 64 deletions bin/light-base/src/json_rpc_service/state_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -397,82 +397,90 @@ impl<TPlat: Platform> Background<TPlat> {
)
.await;

let mut new_blocks = {
// The buffer size should be large enough so that, if the CPU is busy, it doesn't
// become full before the execution of the runtime service resumes.
// The maximum number of pinned block is ignored, as this maximum is a way to avoid
// malicious behaviors. This code is by definition not considered malicious.
let subscribe_all = self
.runtime_service
.subscribe_all(32, NonZeroUsize::new(usize::max_value()).unwrap())
.await;

// The finalized and already-known blocks aren't reported to the user, but we need
// unpin them on to the runtime service.
subscribe_all
.new_blocks
.unpin_block(&header::hash_from_scale_encoded_header(
&subscribe_all.finalized_block_scale_encoded_header,
))
.await;
for block in subscribe_all.non_finalized_blocks_ancestry_order {
subscribe_all
.new_blocks
.unpin_block(&header::hash_from_scale_encoded_header(
&block.scale_encoded_header,
))
.await;
}

subscribe_all.new_blocks
};

// Spawn a separate task for the subscription.
let task = {
let me = self.clone();
async move {
loop {
match new_blocks.next().await {
Some(runtime_service::Notification::Block(block)) => {
new_blocks
let mut new_blocks = {
// The buffer size should be large enough so that, if the CPU is busy, it
// doesn't become full before the execution of the runtime service resumes.
// The maximum number of pinned block is ignored, as this maximum is a way
// to avoid malicious behaviors. This code is by definition not considered
// malicious.
let subscribe_all = me
.runtime_service
.subscribe_all(64, NonZeroUsize::new(usize::max_value()).unwrap())
.await;

// The existing finalized and already-known blocks aren't reported to the
// user, but we need to unpin them on to the runtime service.
subscribe_all
.new_blocks
.unpin_block(&header::hash_from_scale_encoded_header(
&subscribe_all.finalized_block_scale_encoded_header,
))
.await;
for block in subscribe_all.non_finalized_blocks_ancestry_order {
subscribe_all
.new_blocks
.unpin_block(&header::hash_from_scale_encoded_header(
&block.scale_encoded_header,
))
.await;
}

let header = match methods::Header::from_scale_encoded_header(
&block.scale_encoded_header,
) {
Ok(h) => h,
Err(error) => {
log::warn!(
target: &me.log_target,
"`chain_subscribeAllHeads` subscription has skipped \
block due to undecodable header. Hash: {}. Error: {}",
HashDisplay(&header::hash_from_scale_encoded_header(&block.scale_encoded_header)),
error,
);
continue;
}
};
subscribe_all.new_blocks
};

let _ = me
.requests_subscriptions
.try_push_notification(
&state_machine_subscription,
methods::ServerToClient::chain_newHead {
subscription: (&subscription_id).into(),
result: header,
loop {
match new_blocks.next().await {
Some(runtime_service::Notification::Block(block)) => {
new_blocks
.unpin_block(&header::hash_from_scale_encoded_header(
&block.scale_encoded_header,
))
.await;

let header = match methods::Header::from_scale_encoded_header(
&block.scale_encoded_header,
) {
Ok(h) => h,
Err(error) => {
log::warn!(
target: &me.log_target,
"`chain_subscribeAllHeads` subscription has skipped \
block due to undecodable header. Hash: {}. Error: {}",
HashDisplay(&header::hash_from_scale_encoded_header(&block.scale_encoded_header)),
error,
);
continue;
}
.to_json_call_object_parameters(None),
)
.await;
}
Some(runtime_service::Notification::BestBlockChanged { .. })
| Some(runtime_service::Notification::Finalized { .. }) => {}
None => {
// TODO: must recreate the channel
return;
};

// This function call will fail if the queue of notifications to
// the user has too many elements in it. This JSON-RPC function
// unfortunately doesn't provide any mechanism to deal with this
// situation, and we handle it by simply not sending the
// notification.
let _ = me
.requests_subscriptions
.try_push_notification(
&state_machine_subscription,
methods::ServerToClient::chain_newHead {
subscription: (&subscription_id).into(),
result: header,
}
.to_json_call_object_parameters(None),
)
.await;
}
Some(runtime_service::Notification::BestBlockChanged { .. })
| Some(runtime_service::Notification::Finalized { .. }) => {}
None => {
// Break from the inner loop in order to recreate the channel.
break;
}
}
}
}
Expand Down
1 change: 1 addition & 0 deletions bin/wasm-node/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
### Fixed

- Changes in the current best block of a parachain are now taken into account if the new best block had already been reported in the past. ([#2457](https://github.com/paritytech/smoldot/pull/2457))
- Fix active `chain_subscribeAllHeads` subscriptions silently freezing when the number of non-finalized blocks gets above a certain threshold, which typically happens if Internet connectivity is lost for a long time. ([#2465](https://github.com/paritytech/smoldot/pull/2465))

## 0.6.21 - 2022-06-30

Expand Down

0 comments on commit c0bf03d

Please sign in to comment.