Skip to content

Commit

Permalink
Process incoming foreground messages while subscribing (#2705)
Browse files Browse the repository at this point in the history
Calling `relay_chain_sync.subscribe_all(...).await` might take a long time (as it waits for the runtime of the chain to have been downloaded), during which messages coming from the public API of the sync service just pile up and aren't answered.
This PR fixes this by answering messages while we wait for the subscription to happen.

The consequence is that a parachain initialization now finishes quickly, while before it was waiting for its relay chain initialization to finish.
It also means that we now properly clean up parachains if we remove them, even if their relay chain never downloads its runtime. Before, the clean up didn't happen because the clean up never happens before initialization is complete.

I'm not really happy with the code duplication here, and would perform a refactoring that moves local variables into a struct, and adds methods to that struct, similar to `standalone.rs`.

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
tomaka and mergify[bot] authored Aug 31, 2022
1 parent a8736e3 commit ce720cc
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 20 deletions.
121 changes: 101 additions & 20 deletions bin/light-base/src/sync_service/parachain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,25 +76,112 @@ pub(super) async fn start_parachain<TPlat: Platform>(
// a gap in its blocks, or if the node is overloaded and can't process blocks in time, then
// we break out of the inner loop in order to reset everything.
loop {
// List of senders that get notified when the tree of blocks is modified.
// Note that this list is created in the inner loop, as to be cleared if the relay chain
// blocks stream has a gap.
let mut all_subscriptions = Vec::<mpsc::Sender<_>>::new();
log::debug!(target: &log_target, "Subscriptions <= Reset");

// Stream of blocks of the relay chain this parachain is registered on.
// The buffer size should be large enough so that, if the CPU is busy, it doesn't
// become full before the execution of the sync service resumes.
// The maximum number of pinned block is ignored, as this maximum is a way to avoid
// malicious behaviors. This code is by definition not considered malicious.
let mut relay_chain_subscribe_all = relay_chain_sync
.subscribe_all(
"parachain-sync",
32,
NonZeroUsize::new(usize::max_value()).unwrap(),
)
.await;
log::debug!(
target: &log_target,
"RelayChain => NewSubscription(finalized_hash={})",
HashDisplay(&header::hash_from_scale_encoded_header(
&relay_chain_subscribe_all.finalized_block_scale_encoded_header
))
);
let mut relay_chain_subscribe_all = loop {
// Subscribing to the runtime service might take a long time, as it waits for the
// runtime of the finalized block to be downloaded.
// For this reason, we start the future (without awaiting on it yet), and below
// process messages from the foreground at the same time as this subscription is
// performed.
let subscription = relay_chain_sync
.subscribe_all(
"parachain-sync",
32,
NonZeroUsize::new(usize::max_value()).unwrap(),
)
.fuse();
futures::pin_mut!(subscription);

// While we wait for the `subscription` future to be ready, we still need to process
// messages coming from the public API of the syncing service.
futures::select! {
subscription = subscription => {
// Subscription finished.
log::debug!(
target: &log_target,
"RelayChain => NewSubscription(finalized_hash={})",
HashDisplay(&header::hash_from_scale_encoded_header(
&subscription.finalized_block_scale_encoded_header
))
);

break subscription
},

foreground_message = from_foreground.next().fuse() => {
// Message from the public API of the syncing service.

// Terminating the parachain sync task if the foreground has closed.
let foreground_message = match foreground_message {
Some(m) => m,
None => return,
};

match foreground_message {
ToBackground::IsNearHeadOfChainHeuristic { send_back } => {
// If no finalized parahead is known yet, we might be very close
// to the head but also maybe very very far away. We lean on the
// cautious side and always return `false`.
let _ = send_back.send(false);
},
ToBackground::SubscribeAll { send_back, buffer_size, .. } => {
let (tx, new_blocks) = mpsc::channel(buffer_size.saturating_sub(1));

// No known finalized parahead.
let _ = send_back.send(super::SubscribeAll {
finalized_block_scale_encoded_header: obsolete_finalized_parahead.clone(),
finalized_block_runtime: None,
non_finalized_blocks_ancestry_order: Vec::new(),
new_blocks,
});

all_subscriptions.push(tx);
}
ToBackground::PeersAssumedKnowBlock { send_back, block_number, block_hash } => {
// If `block_number` is over the finalized block, then which source
// knows which block is precisely tracked. Otherwise, it is assumed
// that all sources are on the finalized chain and thus that all
// sources whose best block is superior to `block_number` have it.
let list = if block_number > sync_sources.finalized_block_height() {
sync_sources.knows_non_finalized_block(block_number, &block_hash)
.map(|local_id| sync_sources[local_id].0.clone())
.collect()
} else {
sync_sources
.keys()
.filter(|local_id| {
sync_sources.best_block(*local_id).0 >= block_number
})
.map(|local_id| sync_sources[local_id].0.clone())
.collect()
};

let _ = send_back.send(list);
}
ToBackground::SyncingPeers { send_back } => {
let _ = send_back.send(sync_sources.keys().map(|local_id| {
let (height, hash) = sync_sources.best_block(local_id);
let (peer_id, role) = sync_sources[local_id].clone();
(peer_id, role, height, *hash)
}).collect());
}
ToBackground::SerializeChainInformation { send_back } => {
let _ = send_back.send(None);
}
}
},
}
};

// Hash of the best parachain that has been reported to the subscriptions.
// `None` if and only if no finalized parahead is known yet.
Expand Down Expand Up @@ -143,12 +230,6 @@ pub(super) async fn start_parachain<TPlat: Platform>(
async_tree
};

// List of senders that get notified when the tree of blocks is modified.
// Note that this list is created in the inner loop, as to be cleared if the relay chain
// blocks stream has a gap.
let mut all_subscriptions = Vec::<mpsc::Sender<_>>::new();
log::debug!(target: &log_target, "Subscriptions <= Reset");

// List of in-progress parahead fetching operations.
//
// The operations require some blocks to be pinned within the relay chain runtime service,
Expand Down
4 changes: 4 additions & 0 deletions bin/wasm-node/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

### Fixed

- Fix parachain initialization unnecessarily waiting for its corresponding relay chain initialization to be finished. ([#2705](https://github.com/paritytech/smoldot/pull/2705))

## 0.6.31 - 2022-08-30

### Changed
Expand Down

0 comments on commit ce720cc

Please sign in to comment.