diff --git a/bin/light-base/src/sync_service/standalone.rs b/bin/light-base/src/sync_service/standalone.rs
index a895b71a9e..21333eaae9 100644
--- a/bin/light-base/src/sync_service/standalone.rs
+++ b/bin/light-base/src/sync_service/standalone.rs
@@ -94,16 +94,47 @@ pub(super) async fn start_standalone_chain(
     let mut from_network_service = from_network_service.fuse();
 
     // Main loop of the syncing logic.
+    //
+    // This loop contains some CPU-heavy operations (e.g. verifying justifications and warp sync
+    // proofs) but also responding to messages sent by the foreground sync service. In order to
+    // avoid long delays in responding to foreground messages, the CPU-heavy operations are split
+    // into small chunks, and each iteration of the loop processes at most one of these chunks and
+    // processes one foreground message.
     loop {
-        // Start all networking requests (block requests, warp sync requests, etc.) that the
-        // syncing state machine would like to start.
-        task.start_requests();
+        // Try to perform some CPU-heavy operations.
+        // If any CPU-heavy verification was performed, then `queue_empty` will be `false`, in
+        // which case we will loop again as soon as possible.
+        let queue_empty = {
+            let mut queue_empty = true;
+
+            // Start a networking request (block requests, warp sync requests, etc.) that the
+            // syncing state machine would like to start.
+            if task.start_next_request() {
+                queue_empty = false;
+            }
+
+            // TODO: handle obsolete requests
+
+            // The sync state machine can be in a few various states. At the time of writing:
+            // idle, verifying header, verifying block, verifying grandpa warp sync proof,
+            // verifying storage proof.
+            // If the state is one of the "verifying" states, perform the actual verification
+            // and set `queue_empty` to `false`.
+            let (task_update, has_done_verif) = task.process_one_verification_queue();
+            task = task_update;
 
-        // TODO: handle obsolete requests
+            if has_done_verif {
+                queue_empty = false;
 
-        // The syncing state machine holds a queue of things (blocks, justifications, warp sync
-        // fragments, etc.) to verify. Process this queue now.
-        task = task.process_verification_queue().await;
+                // Since JavaScript/Wasm is single-threaded, executing many CPU-heavy operations
+                // in a row would prevent all the other tasks in the background from running.
+                // In order to provide a better granularity, we force a yield after each
+                // verification.
+                crate::util::yield_once().await;
+            }
+
+            queue_empty
+        };
 
         // Processing the queue might have updated the best block of the syncing state machine.
         if !task.network_up_to_date_best {
@@ -255,6 +286,19 @@ pub(super) async fn start_standalone_chain(
                     future::Either::Left(TPlat::sleep(Duration::from_secs(15))).fuse();
                 continue;
             },
+
+            // If the list of CPU-heavy operations to perform is potentially non-empty, then we
+            // wait for a future that is always instantly ready, in order to loop again and
+            // perform the next CPU-heavy operation.
+            // Note that if any of the other futures in that `select!` block is ready, then that
+            // other ready future might take precedence (or not, it is pseudo-random). This
+            // guarantees proper interleaving between CPU-heavy operations and responding to
+            // other kind of events.
+            () = if queue_empty { future::Either::Left(future::pending()) }
+                 else { future::Either::Right(future::ready(())) } =>
+            {
+                continue;
+            }
         };
 
         // `response_outcome` represents the way the state machine has changed as a
@@ -389,307 +433,290 @@ struct Task {
 }
 
 impl Task {
-    fn start_requests(&mut self) {
-        loop {
-            // `desired_requests()` returns, in decreasing order of priority, the requests
-            // that should be started in order for the syncing to proceed. The fact that multiple
-            // requests are returned could be used to filter out undesired one. We use this
-            // filtering to enforce a maximum of one ongoing request per source.
-            let (source_id, _, mut request_detail) =
-                match self.sync.desired_requests().find(|(source_id, _, _)| {
-                    self.sync.source_num_ongoing_requests(*source_id) == 0
-                }) {
-                    Some(v) => v,
-                    None => break,
-                };
+    /// Starts one network request if any is necessary.
+    ///
+    /// Returns `true` if a request has been started.
+    fn start_next_request(&mut self) -> bool {
+        // `desired_requests()` returns, in decreasing order of priority, the requests
+        // that should be started in order for the syncing to proceed. The fact that multiple
+        // requests are returned could be used to filter out undesired one. We use this
+        // filtering to enforce a maximum of one ongoing request per source.
+        let (source_id, _, mut request_detail) = match self
+            .sync
+            .desired_requests()
+            .find(|(source_id, _, _)| self.sync.source_num_ongoing_requests(*source_id) == 0)
+        {
+            Some(v) => v,
+            None => return false,
+        };
 
-            // Before inserting the request back to the syncing state machine, clamp the number
-            // of blocks to the number of blocks we expect to receive.
-            // This constant corresponds to the maximum number of blocks that nodes will answer
-            // in one request. If this constant happens to be inaccurate, everything will still
-            // work but less efficiently.
-            request_detail.num_blocks_clamp(NonZeroU64::new(64).unwrap());
-
-            match request_detail {
-                all::RequestDetail::BlocksRequest {
-                    first_block_hash,
-                    first_block_height,
-                    ascending,
-                    num_blocks,
-                    request_headers,
-                    request_bodies,
-                    request_justification,
-                } => {
-                    let peer_id = self.sync[source_id].0.clone(); // TODO: why does this require cloning? weird borrow chk issue
-
-                    let block_request = self.network_service.clone().blocks_request(
-                        peer_id,
-                        self.network_chain_index,
-                        network::protocol::BlocksRequestConfig {
-                            start: if let Some(first_block_hash) = first_block_hash {
-                                network::protocol::BlocksRequestConfigStart::Hash(first_block_hash)
-                            } else {
-                                network::protocol::BlocksRequestConfigStart::Number(
-                                    first_block_height,
-                                )
-                            },
-                            desired_count: NonZeroU32::new(
-                                u32::try_from(num_blocks.get()).unwrap_or(u32::max_value()),
-                            )
-                            .unwrap(),
-                            direction: if ascending {
-                                network::protocol::BlocksRequestDirection::Ascending
-                            } else {
-                                network::protocol::BlocksRequestDirection::Descending
-                            },
-                            fields: network::protocol::BlocksRequestFields {
-                                header: request_headers,
-                                body: request_bodies,
-                                justifications: request_justification,
-                            },
+        // Before inserting the request back to the syncing state machine, clamp the number
+        // of blocks to the number of blocks we expect to receive.
+        // This constant corresponds to the maximum number of blocks that nodes will answer
+        // in one request. If this constant happens to be inaccurate, everything will still
+        // work but less efficiently.
+        request_detail.num_blocks_clamp(NonZeroU64::new(64).unwrap());
+
+        match request_detail {
+            all::RequestDetail::BlocksRequest {
+                first_block_hash,
+                first_block_height,
+                ascending,
+                num_blocks,
+                request_headers,
+                request_bodies,
+                request_justification,
+            } => {
+                let peer_id = self.sync[source_id].0.clone(); // TODO: why does this require cloning? weird borrow chk issue
+
+                let block_request = self.network_service.clone().blocks_request(
+                    peer_id,
+                    self.network_chain_index,
+                    network::protocol::BlocksRequestConfig {
+                        start: if let Some(first_block_hash) = first_block_hash {
+                            network::protocol::BlocksRequestConfigStart::Hash(first_block_hash)
+                        } else {
+                            network::protocol::BlocksRequestConfigStart::Number(first_block_height)
                         },
-                        Duration::from_secs(10),
-                    );
+                        desired_count: NonZeroU32::new(
+                            u32::try_from(num_blocks.get()).unwrap_or(u32::max_value()),
+                        )
+                        .unwrap(),
+                        direction: if ascending {
+                            network::protocol::BlocksRequestDirection::Ascending
+                        } else {
+                            network::protocol::BlocksRequestDirection::Descending
+                        },
+                        fields: network::protocol::BlocksRequestFields {
+                            header: request_headers,
+                            body: request_bodies,
+                            justifications: request_justification,
+                        },
+                    },
+                    Duration::from_secs(10),
+                );
 
-                    let (block_request, abort) = future::abortable(block_request);
-                    let request_id = self.sync.add_request(source_id, request_detail, abort);
+                let (block_request, abort) = future::abortable(block_request);
+                let request_id = self.sync.add_request(source_id, request_detail, abort);
 
-                    self.pending_block_requests
-                        .push(async move { (request_id, block_request.await) }.boxed());
-                }
+                self.pending_block_requests
+                    .push(async move { (request_id, block_request.await) }.boxed());
+            }
+
+            all::RequestDetail::GrandpaWarpSync {
+                sync_start_block_hash,
+            } => {
+                let peer_id = self.sync[source_id].0.clone(); // TODO: why does this require cloning? weird borrow chk issue
 
-                all::RequestDetail::GrandpaWarpSync {
+                let grandpa_request = self.network_service.clone().grandpa_warp_sync_request(
+                    peer_id,
+                    self.network_chain_index,
                     sync_start_block_hash,
-                } => {
-                    let peer_id = self.sync[source_id].0.clone(); // TODO: why does this require cloning? weird borrow chk issue
-
-                    let grandpa_request = self.network_service.clone().grandpa_warp_sync_request(
-                        peer_id,
-                        self.network_chain_index,
-                        sync_start_block_hash,
-                        // The timeout needs to be long enough to potentially download the maximum
-                        // response size of 16 MiB. Assuming a 128 kiB/sec connection, that's
-                        // 128 seconds. Unfortunately, 128 seconds is way too large, and for
-                        // pragmatic reasons we use a lower value.
-                        Duration::from_secs(24),
-                    );
+                    // The timeout needs to be long enough to potentially download the maximum
+                    // response size of 16 MiB. Assuming a 128 kiB/sec connection, that's
+                    // 128 seconds. Unfortunately, 128 seconds is way too large, and for
+                    // pragmatic reasons we use a lower value.
+                    Duration::from_secs(24),
+                );
 
-                    let (grandpa_request, abort) = future::abortable(grandpa_request);
-                    let request_id = self.sync.add_request(source_id, request_detail, abort);
+                let (grandpa_request, abort) = future::abortable(grandpa_request);
+                let request_id = self.sync.add_request(source_id, request_detail, abort);
 
-                    self.pending_grandpa_requests
-                        .push(async move { (request_id, grandpa_request.await) }.boxed());
-                }
+                self.pending_grandpa_requests
+                    .push(async move { (request_id, grandpa_request.await) }.boxed());
+            }
 
-                all::RequestDetail::StorageGet {
-                    block_hash,
-                    state_trie_root,
-                    ref keys,
-                } => {
-                    let peer_id = self.sync[source_id].0.clone(); // TODO: why does this require cloning? weird borrow chk issue
-
-                    let storage_request = self.network_service.clone().storage_proof_request(
-                        self.network_chain_index,
-                        peer_id,
-                        network::protocol::StorageProofRequestConfig {
-                            block_hash,
-                            keys: keys.clone().into_iter(),
-                        },
-                        Duration::from_secs(16),
-                    );
+            all::RequestDetail::StorageGet {
+                block_hash,
+                state_trie_root,
+                ref keys,
+            } => {
+                let peer_id = self.sync[source_id].0.clone(); // TODO: why does this require cloning? weird borrow chk issue
+
+                let storage_request = self.network_service.clone().storage_proof_request(
+                    self.network_chain_index,
+                    peer_id,
+                    network::protocol::StorageProofRequestConfig {
+                        block_hash,
+                        keys: keys.clone().into_iter(),
+                    },
+                    Duration::from_secs(16),
+                );
 
-                    let keys = keys.clone();
-                    let storage_request = async move {
-                        if let Ok(outcome) = storage_request.await {
-                            // TODO: lots of copying around
-                            // TODO: log what happens
-                            keys.iter()
-                                .map(|key| {
-                                    proof_verify::verify_proof(proof_verify::VerifyProofConfig {
-                                        proof: outcome.iter().map(|nv| &nv[..]),
-                                        requested_key: key.as_ref(),
-                                        trie_root_hash: &state_trie_root,
-                                    })
-                                    .map_err(|_| ())
-                                    .map(|v| v.map(|v| v.to_vec()))
+                let keys = keys.clone();
+                let storage_request = async move {
+                    if let Ok(outcome) = storage_request.await {
+                        // TODO: lots of copying around
+                        // TODO: log what happens
+                        keys.iter()
+                            .map(|key| {
+                                proof_verify::verify_proof(proof_verify::VerifyProofConfig {
+                                    proof: outcome.iter().map(|nv| &nv[..]),
+                                    requested_key: key.as_ref(),
+                                    trie_root_hash: &state_trie_root,
                                 })
-                                .collect::<Result<Vec<_>, ()>>()
-                        } else {
-                            Err(())
-                        }
-                    };
+                                .map_err(|_| ())
+                                .map(|v| v.map(|v| v.to_vec()))
+                            })
+                            .collect::<Result<Vec<_>, ()>>()
+                    } else {
+                        Err(())
+                    }
+                };
 
-                    let (storage_request, abort) = future::abortable(storage_request);
-                    let request_id = self.sync.add_request(source_id, request_detail, abort);
+                let (storage_request, abort) = future::abortable(storage_request);
+                let request_id = self.sync.add_request(source_id, request_detail, abort);
 
-                    self.pending_storage_requests
-                        .push(async move { (request_id, storage_request.await) }.boxed());
-                }
+                self.pending_storage_requests
+                    .push(async move { (request_id, storage_request.await) }.boxed());
             }
         }
+
+        true
     }
 
-    /// Verifies all the blocks, justifications, warp sync fragments, etc. that are queued for
+    /// Verifies one block, or justification, or warp sync fragment, etc. that is queued for
     /// verification.
     ///
-    /// Returns `self`.
-    async fn process_verification_queue(mut self) -> Self {
-        // The sync state machine can be in a few various states. At the time of writing:
-        // idle, verifying header, verifying block, verifying grandpa warp sync proof,
-        // verifying storage proof.
-        // If the state is one of the "verifying" states, perform the actual verification and
-        // loop again until the sync is in an idle state.
-        loop {
-            // Since this task is verifying blocks or warp sync fragments, which are heavy CPU-only
-            // operation, it is very much possible for it to take a long time before having to wait
-            // for some event. Since JavaScript/Wasm is single-threaded, this would prevent all
-            // the other tasks in the background from running.
-            // In order to provide a better granularity, we force a yield after each verification.
-            crate::util::yield_once().await;
-
-            // Note that `process_one` moves out of `sync` and provides the value back in its
-            // return value.
-            match self.sync.process_one() {
-                all::ProcessOne::AllSync(sync) => {
-                    // Nothing to do. Queue is empty.
-                    self.sync = sync;
-                    return self;
+    /// Returns `self` and a boolean indicating whether something has been processed.
+    fn process_one_verification_queue(mut self) -> (Self, bool) {
+        // Note that `process_one` moves out of `sync` and provides the value back in its
+        // return value.
+        match self.sync.process_one() {
+            all::ProcessOne::AllSync(sync) => {
+                // Nothing to do. Queue is empty.
+                self.sync = sync;
+                return (self, false);
+            }
+
+            all::ProcessOne::VerifyWarpSyncFragment(verify) => {
+                // Grandpa warp sync fragment to verify.
+                let sender_peer_id = verify.proof_sender().1 .0.clone(); // TODO: unnecessary cloning most of the time
+
+                let (sync, result) = verify.perform();
+                self.sync = sync;
+
+                if let Err(err) = result {
+                    log::warn!(
+                        target: &self.log_target,
+                        "Failed to verify warp sync fragment from {}: {}",
+                        sender_peer_id,
+                        err
+                    );
                 }
+            }
+
+            all::ProcessOne::VerifyHeader(verify) => {
+                // Header to verify.
+                let verified_hash = verify.hash();
+                match verify.perform(TPlat::now_from_unix_epoch(), ()) {
+                    all::HeaderVerifyOutcome::Success {
+                        sync, is_new_best, ..
+                    } => {
+                        self.sync = sync;
+
+                        log::debug!(
+                            target: &self.log_target,
+                            "Sync => HeaderVerified(hash={}, new_best={})",
+                            HashDisplay(&verified_hash),
+                            if is_new_best { "yes" } else { "no" }
+                        );
+
+                        if is_new_best {
+                            self.network_up_to_date_best = false;
+                        }
+
+                        // Notify of the new block.
+                        self.dispatch_all_subscribers({
+                            // TODO: the code below is `O(n)` complexity
+                            let header = self
+                                .sync
+                                .non_finalized_blocks_unordered()
+                                .find(|h| h.hash() == verified_hash)
+                                .unwrap();
+                            Notification::Block(BlockNotification {
+                                is_new_best,
+                                scale_encoded_header: header.scale_encoding_vec(),
+                                parent_hash: *header.parent_hash,
+                            })
+                        });
+                    }
 
-                all::ProcessOne::VerifyWarpSyncFragment(verify) => {
-                    // Grandpa warp sync fragment to verify.
-                    let sender_peer_id = verify.proof_sender().1 .0.clone(); // TODO: unnecessary cloning most of the time
+                    all::HeaderVerifyOutcome::Error { sync, error, .. } => {
+                        self.sync = sync;
 
-                    let (sync, result) = verify.perform();
-                    self.sync = sync;
+                        // TODO: print which peer sent the header
+                        log::debug!(
+                            target: &self.log_target,
+                            "Sync => HeaderVerifyError(hash={}, error={:?})",
+                            HashDisplay(&verified_hash),
+                            error
+                        );
 
-                    if let Err(err) = result {
                         log::warn!(
                             target: &self.log_target,
-                            "Failed to verify warp sync fragment from {}: {}",
-                            sender_peer_id,
-                            err
+                            "Error while verifying header {}: {}",
+                            HashDisplay(&verified_hash),
+                            error
                         );
                     }
                 }
+            }
 
-                all::ProcessOne::VerifyHeader(verify) => {
-                    // Header to verify.
-                    let verified_hash = verify.hash();
-                    match verify.perform(TPlat::now_from_unix_epoch(), ()) {
-                        all::HeaderVerifyOutcome::Success {
-                            sync, is_new_best, ..
-                        } => {
-                            self.sync = sync;
-
-                            log::debug!(
-                                target: &self.log_target,
-                                "Sync => HeaderVerified(hash={}, new_best={})",
-                                HashDisplay(&verified_hash),
-                                if is_new_best { "yes" } else { "no" }
-                            );
-
-                            if is_new_best {
-                                self.network_up_to_date_best = false;
-                            }
-
-                            // Notify of the new block.
-                            self.dispatch_all_subscribers({
-                                // TODO: the code below is `O(n)` complexity
-                                let header = self
-                                    .sync
-                                    .non_finalized_blocks_unordered()
-                                    .find(|h| h.hash() == verified_hash)
-                                    .unwrap();
-                                Notification::Block(BlockNotification {
-                                    is_new_best,
-                                    scale_encoded_header: header.scale_encoding_vec(),
-                                    parent_hash: *header.parent_hash,
-                                })
-                            });
+            all::ProcessOne::VerifyJustification(verify) => {
+                // Justification to verify.
+                match verify.perform() {
+                    (
+                        sync,
+                        all::JustificationVerifyOutcome::NewFinalized {
+                            updates_best_block,
+                            finalized_blocks,
+                            ..
+                        },
+                    ) => {
+                        self.sync = sync;
 
-                            continue;
-                        }
+                        log::debug!(
+                            target: &self.log_target,
+                            "Sync => JustificationVerified(finalized_blocks={})",
+                            finalized_blocks.len(),
+                        );
 
-                        all::HeaderVerifyOutcome::Error { sync, error, .. } => {
-                            self.sync = sync;
-
-                            // TODO: print which peer sent the header
-                            log::debug!(
-                                target: &self.log_target,
-                                "Sync => HeaderVerifyError(hash={}, error={:?})",
-                                HashDisplay(&verified_hash),
-                                error
-                            );
-
-                            log::warn!(
-                                target: &self.log_target,
-                                "Error while verifying header {}: {}",
-                                HashDisplay(&verified_hash),
-                                error
-                            );
-
-                            continue;
+                        if updates_best_block {
+                            self.network_up_to_date_best = false;
                         }
+                        self.network_up_to_date_finalized = false;
+                        self.known_finalized_runtime = None; // TODO: only do if there was no RuntimeUpdated log item
+                        self.dispatch_all_subscribers(Notification::Finalized {
+                            hash: self.sync.finalized_block_header().hash(),
+                            best_block_hash: self.sync.best_block_hash(),
+                        });
                     }
-                }
-
-                all::ProcessOne::VerifyJustification(verify) => {
-                    // Justification to verify.
-                    match verify.perform() {
-                        (
-                            sync,
-                            all::JustificationVerifyOutcome::NewFinalized {
-                                updates_best_block,
-                                finalized_blocks,
-                                ..
-                            },
-                        ) => {
-                            self.sync = sync;
-
-                            log::debug!(
-                                target: &self.log_target,
-                                "Sync => JustificationVerified(finalized_blocks={})",
-                                finalized_blocks.len(),
-                            );
-
-                            if updates_best_block {
-                                self.network_up_to_date_best = false;
-                            }
-                            self.network_up_to_date_finalized = false;
-                            self.known_finalized_runtime = None; // TODO: only do if there was no RuntimeUpdated log item
-                            self.dispatch_all_subscribers(Notification::Finalized {
-                                hash: self.sync.finalized_block_header().hash(),
-                                best_block_hash: self.sync.best_block_hash(),
-                            });
-                            continue;
-                        }
-                        (sync, all::JustificationVerifyOutcome::Error(error)) => {
-                            self.sync = sync;
+                    (sync, all::JustificationVerifyOutcome::Error(error)) => {
+                        self.sync = sync;
 
-                            // TODO: print which peer sent the justification
-                            log::debug!(
-                                target: &self.log_target,
-                                "Sync => JustificationVerificationError(error={:?})",
-                                error,
-                            );
-
-                            log::warn!(
-                                target: &self.log_target,
-                                "Error while verifying justification: {}",
-                                error
-                            );
+                        // TODO: print which peer sent the justification
+                        log::debug!(
+                            target: &self.log_target,
+                            "Sync => JustificationVerificationError(error={:?})",
+                            error,
+                        );
 
-                            continue;
-                        }
+                        log::warn!(
+                            target: &self.log_target,
+                            "Error while verifying justification: {}",
+                            error
+                        );
                     }
                 }
-
-                // Can't verify header and body in non-full mode.
-                all::ProcessOne::VerifyBodyHeader(_) => unreachable!(),
             }
+
+            // Can't verify header and body in non-full mode.
+            all::ProcessOne::VerifyBodyHeader(_) => unreachable!(),
         }
+
+        (self, true)
     }
 
     /// Process a request coming from the foreground service.
diff --git a/bin/wasm-node/CHANGELOG.md b/bin/wasm-node/CHANGELOG.md
index 1ec671bb2b..1c3ad2c029 100644
--- a/bin/wasm-node/CHANGELOG.md
+++ b/bin/wasm-node/CHANGELOG.md
@@ -5,6 +5,7 @@
 ### Fixed
 
 - No longer panic if passed a chain specification containing an invalid bootnode address. Because the specification of the format of a multiaddress is flexible, invalid bootnode addresses do not trigger a hard error but instead are ignored and a warning is printed. ([#2207](https://github.com/paritytech/smoldot/pull/2207))
+- Make sure that the tasks of the nodes that have a lot of CPU-heavy operations to perform periodically yield to other tasks, ensuring that the less busy tasks still make progress. This fixes a variety of issues such as chains taking a long time to initialize, or simple JSON-RPC requests taking a long time to be answered. ([#2213](https://github.com/paritytech/smoldot/pull/2213))
 - Fix several potential infinite loops when finality lags behind too much ([#2215](https://github.com/paritytech/smoldot/pull/2215)).
 
 ## 0.6.13 - 2022-04-05
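
Editorial note, not part of the patch: the new comment on the main loop of start_standalone_chain describes splitting CPU-heavy work into small chunks and yielding between chunks so that other tasks on a single-threaded (Wasm) executor keep making progress. Below is a minimal, self-contained sketch of that pattern. It only assumes the `futures` crate for the executor; `yield_once`, `drain_queue`, and `expensive_verification` are illustrative names, not smoldot's internals (the real helper in the patch is `crate::util::yield_once`).

use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

// Future that returns `Pending` exactly once, waking itself immediately, then `Ready`.
// Awaiting it gives the executor a chance to poll other tasks between two chunks of work.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

fn yield_once() -> YieldOnce {
    YieldOnce(false)
}

// Processes the queue one item per loop iteration, yielding after each item so that,
// on a single-threaded executor, the other tasks are not starved.
async fn drain_queue(mut work_queue: VecDeque<u64>) -> u64 {
    let mut total = 0;
    loop {
        match work_queue.pop_front() {
            Some(item) => {
                total += expensive_verification(item); // one small CPU-heavy chunk
                yield_once().await; // let other tasks run before the next chunk
            }
            None => break total,
        }
    }
}

fn expensive_verification(item: u64) -> u64 {
    // Stand-in for verifying a justification or warp sync fragment.
    (0..10_000u64).fold(item, |acc, x| acc.wrapping_add(x))
}

fn main() {
    let queue: VecDeque<u64> = (0..32).collect();
    let total = futures::executor::block_on(drain_queue(queue));
    println!("total = {total}");
}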
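A second editorial note, also not part of the patch: the new `select!` arm added in start_standalone_chain relies on an "instantly ready or forever pending" future to interleave CPU-heavy chunks with message handling. The sketch below illustrates that idiom in isolation, assuming the `futures` 0.3 crate; `foreground_rx`, `pending_work`, and `run` are illustrative stand-ins rather than smoldot's actual types.

use futures::{channel::mpsc, future, StreamExt as _};

async fn run(mut foreground_rx: mpsc::Receiver<String>) {
    let mut pending_work: u32 = 5; // pretend there are 5 CPU-heavy chunks queued

    loop {
        let queue_empty = pending_work == 0;

        futures::select! {
            // Foreground messages are always handled, even while CPU-heavy work remains.
            msg = foreground_rx.next() => {
                match msg {
                    Some(msg) => println!("foreground message: {msg}"),
                    None => break, // all senders dropped; shut down
                }
            },

            // When work remains, this arm is instantly ready, so the loop keeps spinning
            // (interleaved pseudo-randomly with the arm above); once the queue is empty,
            // it becomes `pending()` and the task sleeps until a message arrives.
            () = if queue_empty {
                future::Either::Left(future::pending())
            } else {
                future::Either::Right(future::ready(()))
            } => {
                pending_work -= 1;
                println!("performed one CPU-heavy chunk, {pending_work} left");
            },
        }
    }
}

fn main() {
    let (mut tx, rx) = mpsc::channel(4);
    tx.try_send("hello".to_owned()).unwrap();
    drop(tx); // close the channel so `run` eventually exits
    futures::executor::block_on(run(rx));
}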