Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Process incoming foreground messages while subscribing #2705

Merged
merged 3 commits into from
Aug 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 101 additions & 20 deletions bin/light-base/src/sync_service/parachain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,25 +76,112 @@ pub(super) async fn start_parachain<TPlat: Platform>(
// a gap in its blocks, or if the node is overloaded and can't process blocks in time, then
// we break out of the inner loop in order to reset everything.
loop {
// List of senders that get notified when the tree of blocks is modified.
// Note that this list is created in the inner loop, as to be cleared if the relay chain
// blocks stream has a gap.
let mut all_subscriptions = Vec::<mpsc::Sender<_>>::new();
log::debug!(target: &log_target, "Subscriptions <= Reset");

// Stream of blocks of the relay chain this parachain is registered on.
// The buffer size should be large enough so that, if the CPU is busy, it doesn't
// become full before the execution of the sync service resumes.
// The maximum number of pinned block is ignored, as this maximum is a way to avoid
// malicious behaviors. This code is by definition not considered malicious.
let mut relay_chain_subscribe_all = relay_chain_sync
.subscribe_all(
"parachain-sync",
32,
NonZeroUsize::new(usize::max_value()).unwrap(),
)
.await;
log::debug!(
target: &log_target,
"RelayChain => NewSubscription(finalized_hash={})",
HashDisplay(&header::hash_from_scale_encoded_header(
&relay_chain_subscribe_all.finalized_block_scale_encoded_header
))
);
let mut relay_chain_subscribe_all = loop {
// Subscribing to the runtime service might take a long time, as it waits for the
// runtime of the finalized block to be downloaded.
// For this reason, we start the future (without awaiting on it yet), and below
// process messages from the foreground at the same time as this subscription is
// performed.
let subscription = relay_chain_sync
.subscribe_all(
"parachain-sync",
32,
NonZeroUsize::new(usize::max_value()).unwrap(),
)
.fuse();
futures::pin_mut!(subscription);

// While we wait for the `subscription` future to be ready, we still need to process
// messages coming from the public API of the syncing service.
futures::select! {
subscription = subscription => {
// Subscription finished.
log::debug!(
target: &log_target,
"RelayChain => NewSubscription(finalized_hash={})",
HashDisplay(&header::hash_from_scale_encoded_header(
&subscription.finalized_block_scale_encoded_header
))
);

break subscription
},

foreground_message = from_foreground.next().fuse() => {
// Message from the public API of the syncing service.

// Terminating the parachain sync task if the foreground has closed.
let foreground_message = match foreground_message {
Some(m) => m,
None => return,
};

match foreground_message {
ToBackground::IsNearHeadOfChainHeuristic { send_back } => {
// If no finalized parahead is known yet, we might be very close
// to the head but also maybe very very far away. We lean on the
// cautious side and always return `false`.
let _ = send_back.send(false);
},
ToBackground::SubscribeAll { send_back, buffer_size, .. } => {
let (tx, new_blocks) = mpsc::channel(buffer_size.saturating_sub(1));

// No known finalized parahead.
let _ = send_back.send(super::SubscribeAll {
finalized_block_scale_encoded_header: obsolete_finalized_parahead.clone(),
finalized_block_runtime: None,
non_finalized_blocks_ancestry_order: Vec::new(),
new_blocks,
});

all_subscriptions.push(tx);
}
ToBackground::PeersAssumedKnowBlock { send_back, block_number, block_hash } => {
// If `block_number` is over the finalized block, then which source
// knows which block is precisely tracked. Otherwise, it is assumed
// that all sources are on the finalized chain and thus that all
// sources whose best block is superior to `block_number` have it.
let list = if block_number > sync_sources.finalized_block_height() {
sync_sources.knows_non_finalized_block(block_number, &block_hash)
.map(|local_id| sync_sources[local_id].0.clone())
.collect()
} else {
sync_sources
.keys()
.filter(|local_id| {
sync_sources.best_block(*local_id).0 >= block_number
})
.map(|local_id| sync_sources[local_id].0.clone())
.collect()
};

let _ = send_back.send(list);
}
ToBackground::SyncingPeers { send_back } => {
let _ = send_back.send(sync_sources.keys().map(|local_id| {
let (height, hash) = sync_sources.best_block(local_id);
let (peer_id, role) = sync_sources[local_id].clone();
(peer_id, role, height, *hash)
}).collect());
}
ToBackground::SerializeChainInformation { send_back } => {
let _ = send_back.send(None);
}
}
},
}
};

// Hash of the best parachain that has been reported to the subscriptions.
// `None` if and only if no finalized parahead is known yet.
Expand Down Expand Up @@ -143,12 +230,6 @@ pub(super) async fn start_parachain<TPlat: Platform>(
async_tree
};

// List of senders that get notified when the tree of blocks is modified.
// Note that this list is created in the inner loop, as to be cleared if the relay chain
// blocks stream has a gap.
let mut all_subscriptions = Vec::<mpsc::Sender<_>>::new();
log::debug!(target: &log_target, "Subscriptions <= Reset");

// List of in-progress parahead fetching operations.
//
// The operations require some blocks to be pinned within the relay chain runtime service,
Expand Down
4 changes: 4 additions & 0 deletions bin/wasm-node/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

### Fixed

- Fix parachain initialization unnecessarily waiting for its corresponding relay chain initialization to be finished. ([#2705](https://github.com/paritytech/smoldot/pull/2705))

## 0.6.31 - 2022-08-30

### Changed
Expand Down