diff --git a/Cargo.lock b/Cargo.lock
index 1ae6aebfb8..892f37c504 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2411,6 +2411,7 @@ dependencies = [
  "async-lock",
  "base64 0.21.2",
  "blake2-rfc",
+ "crossbeam-queue",
  "derive_more",
  "either",
  "env_logger",
diff --git a/lib/src/sync/all.rs b/lib/src/sync/all.rs
index d9780794f9..5e13a45ba7 100644
--- a/lib/src/sync/all.rs
+++ b/lib/src/sync/all.rs
@@ -683,6 +683,7 @@ impl AllSync {
     ///
     /// Panics if the [`SourceId`] is invalid.
     ///
+    // TODO: this function is questionable, because in practice we send requests to sources that are outside the scope of syncing
    pub fn source_num_ongoing_requests(&self, source_id: SourceId) -> usize {
        debug_assert!(self.shared.sources.contains(source_id.0));
diff --git a/light-base/Cargo.toml b/light-base/Cargo.toml
index 14496a671b..d038c2de47 100644
--- a/light-base/Cargo.toml
+++ b/light-base/Cargo.toml
@@ -17,6 +17,7 @@ async-channel = { version = "1.8.0", default-features = false } # TODO: no-std-
 async-lock = { version = "2.7.0", default-features = false } # TODO: no-std-ize; this has been done and is just waiting for a release: https://github.com/smol-rs/event-listener/pull/34
 base64 = { version = "0.21.2", default-features = false, features = ["alloc"] }
 blake2-rfc = { version = "0.2.18", default-features = false }
+crossbeam-queue = { version = "0.3.8", default-features = false, features = ["alloc"] }
 derive_more = "0.99.17"
 either = { version = "1.8.1", default-features = false }
 event-listener = { version = "2.5.3" }
diff --git a/light-base/src/network_service.rs b/light-base/src/network_service.rs
index 01addc92c8..8a3e2ddad5 100644
--- a/light-base/src/network_service.rs
+++ b/light-base/src/network_service.rs
@@ -143,6 +143,13 @@ struct Shared {
     /// if the event is notified while the background task is already awake, the background task
     /// will do an additional loop.
     wake_up_main_background_task: event_listener::Event,
+
+    /// Whenever a request to a peer finishes or is aborted, an element is pushed to this queue.
+    /// The queue is later processed in order to update [`SharedGuarded::peer_requests_locks`].
+    peer_requests_unlocks: crossbeam_queue::SegQueue<PeerId>,
+
+    /// Event notified when an element is pushed onto [`Shared::peer_requests_unlocks`].
+    peer_requests_unlocks_pushed: event_listener::Event,
 }
 
 struct SharedGuarded {
@@ -153,6 +160,9 @@ struct SharedGuarded {
     // TODO: should also detect whenever we fail to open a block announces substream with any of these peers
     important_nodes: HashSet<PeerId>,
 
+    /// List of peers for which a request lock has been grabbed.
+    peer_requests_locks: HashSet<PeerId>,
+
     /// List of peer and chain index tuples for which no outbound slot should be assigned.
     ///
     /// The values are the moment when the ban expires.
@@ -262,6 +272,7 @@ impl NetworkService {
                     util::SipHasherBuild::new(rand::random()),
                 ),
                 important_nodes: HashSet::with_capacity_and_hasher(16, Default::default()),
+                peer_requests_locks: HashSet::with_capacity_and_hasher(16, Default::default()),
                 active_connections: HashMap::with_capacity_and_hasher(32, Default::default()),
                 messages_from_connections_tx,
                 messages_from_connections_rx,
@@ -281,6 +292,8 @@ impl NetworkService {
             identify_agent_version: config.identify_agent_version,
             log_chain_names,
             wake_up_main_background_task: event_listener::Event::new(),
+            peer_requests_unlocks: crossbeam_queue::SegQueue::new(),
+            peer_requests_unlocks_pushed: event_listener::Event::new(),
         });
 
         // Spawn main task that processes the network service.
@@ -355,11 +368,90 @@ impl NetworkService {
         (final_network_service, event_receivers)
     }
 
+    /// Tries to grab a so-called "request lock" associated with the given `PeerId`. While the
+    /// lock is active, no other "request lock" towards the same `PeerId` can be acquired. The
+    /// lock can then be used to actually start a request.
+    ///
+    /// This function does **not** check whether there exists a connection with the given peer.
+    /// In other words, it is possible to successfully grab a lock only for the request to fail
+    /// because we aren't connected to the given peer. The reason for this design is that the
+    /// peer can disconnect between the lock being grabbed and the request starting anyway,
+    /// and as such there's no benefit in performing the verification twice.
+    pub async fn try_lock_peer_for_request(
+        self: Arc<Self>,
+        target: PeerId,
+    ) -> Option<PeerRequestLock> {
+        let mut guarded = self.shared.guarded.lock().await;
+
+        while let Some(unlocked_peer) = self.shared.peer_requests_unlocks.pop() {
+            let _was_in = guarded.peer_requests_locks.remove(&unlocked_peer);
+            debug_assert!(_was_in);
+        }
+
+        if guarded.peer_requests_locks.contains(&target) {
+            return None;
+        }
+
+        guarded.peer_requests_locks.insert(target.clone());
+        drop(guarded);
+
+        Some(PeerRequestLock {
+            service: self,
+            peer_id: target,
+        })
+    }
+
+    /// Grabs a so-called "request lock" associated with any of the `PeerId`s provided. While the
+    /// lock is active, no other "request lock" towards the same `PeerId` can be acquired. The
+    /// lock can then be used to actually start a request.
+    ///
+    /// This function waits until one of the peers can be locked, then locks it. If multiple
+    /// peers are ready, one is chosen at random.
+    ///
+    /// See also [`NetworkService::try_lock_peer_for_request`].
+    // TODO: better API for the list?
+    pub async fn lock_any_for_request(
+        self: Arc<Self>,
+        list: hashbrown::HashSet<PeerId>,
+    ) -> PeerRequestLock {
+        let mut on_pushed = None::<event_listener::EventListener>;
+
+        loop {
+            if let Some(on_pushed) = on_pushed.take() {
+                on_pushed.await;
+            }
+
+            let mut guarded = self.shared.guarded.lock().await;
+
+            on_pushed = Some(self.shared.peer_requests_unlocks_pushed.listen());
+
+            while let Some(unlocked_peer) = self.shared.peer_requests_unlocks.pop() {
+                let _was_in = guarded.peer_requests_locks.remove(&unlocked_peer);
+                debug_assert!(_was_in);
+            }
+
+            if let Some(peer_id) = rand::seq::IteratorRandom::choose(
+                list.difference(&guarded.peer_requests_locks),
+                &mut rand::thread_rng(),
+            )
+            .cloned()
+            {
+                guarded.peer_requests_locks.insert(peer_id.clone());
+                drop(guarded);
+
+                break PeerRequestLock {
+                    service: self,
+                    peer_id,
+                };
+            }
+        }
+    }
+
     /// Sends a blocks request to the given peer.
     // TODO: more docs
     pub async fn blocks_request(
         self: Arc<Self>,
-        target: PeerId, // TODO: takes by value because of future longevity issue
+        request_lock: PeerRequestLock,
         chain_index: usize,
         config: protocol::BlocksRequestConfig,
         timeout: Duration,
@@ -368,7 +460,7 @@ impl NetworkService {
         let mut guarded = self.shared.guarded.lock().await;
 
         // The call to `start_blocks_request` below panics if we have no active connection.
-        if !guarded.network.can_start_requests(&target) {
+        if !guarded.network.can_start_requests(&request_lock.peer_id) {
             return Err(BlocksRequestError::NoConnection);
         }
 
@@ -377,7 +469,7 @@ impl NetworkService {
             log::debug!(
                 target: "network",
                 "Connection({}) <= BlocksRequest(chain={}, start={}, num={}, descending={:?}, header={:?}, body={:?}, justifications={:?})",
-                target, self.shared.log_chain_names[chain_index], HashDisplay(hash),
+                request_lock.peer_id, self.shared.log_chain_names[chain_index], HashDisplay(hash),
                 config.desired_count.get(),
                 matches!(config.direction, protocol::BlocksRequestDirection::Descending),
                 config.fields.header, config.fields.body, config.fields.justifications
@@ -387,7 +479,7 @@ impl NetworkService {
             log::debug!(
                 target: "network",
                 "Connection({}) <= BlocksRequest(chain={}, start=#{}, num={}, descending={:?}, header={:?}, body={:?}, justifications={:?})",
-                target, self.shared.log_chain_names[chain_index], number,
+                request_lock.peer_id, self.shared.log_chain_names[chain_index], number,
                 config.desired_count.get(),
                 matches!(config.direction, protocol::BlocksRequestDirection::Descending),
                 config.fields.header, config.fields.body, config.fields.justifications
@@ -397,7 +489,7 @@ impl NetworkService {
 
         let request_id = guarded.network.start_blocks_request(
             self.shared.platform.now(),
-            &target,
+            &request_lock.peer_id,
             chain_index,
             config,
             timeout,
@@ -417,7 +509,7 @@ impl NetworkService {
                 log::debug!(
                     target: "network",
                     "Connection({}) => BlocksRequest(chain={}, num_blocks={}, block_data_total_size={})",
-                    target,
+                    request_lock.peer_id,
                     self.shared.log_chain_names[chain_index],
                     blocks.len(),
                     BytesDisplay(blocks.iter().fold(0, |sum, block| {
@@ -432,7 +524,7 @@ impl NetworkService {
                 log::debug!(
                     target: "network",
                     "Connection({}) => BlocksRequest(chain={}, error={:?})",
-                    target,
+                    request_lock.peer_id,
                     self.shared.log_chain_names[chain_index],
                     err
                 );
@@ -449,13 +541,15 @@ impl NetworkService {
                     log::warn!(
                         target: "network",
                         "Error in block request with {}. This might indicate an incompatibility. Error: {}",
-                        target,
+                        request_lock.peer_id,
                         err
                     );
                 }
             }
         }
 
+        drop(request_lock);
+
         result.map_err(BlocksRequestError::Request)
     }
 
@@ -463,7 +557,7 @@ impl NetworkService {
     // TODO: more docs
     pub async fn grandpa_warp_sync_request(
         self: Arc<Self>,
-        target: PeerId, // TODO: takes by value because of future longevity issue
+        request_lock: PeerRequestLock,
         chain_index: usize,
         begin_hash: [u8; 32],
         timeout: Duration,
@@ -473,18 +567,18 @@ impl NetworkService {
 
         // The call to `start_grandpa_warp_sync_request` below panics if we have no
         // active connection.
-        if !guarded.network.can_start_requests(&target) {
+        if !guarded.network.can_start_requests(&request_lock.peer_id) {
             return Err(GrandpaWarpSyncRequestError::NoConnection);
         }
 
         log::debug!(
             target: "network", "Connection({}) <= GrandpaWarpSyncRequest(chain={}, start={})",
-            target, self.shared.log_chain_names[chain_index], HashDisplay(&begin_hash)
+            request_lock.peer_id, self.shared.log_chain_names[chain_index], HashDisplay(&begin_hash)
         );
 
         let request_id = guarded.network.start_grandpa_warp_sync_request(
             self.shared.platform.now(),
-            &target,
+            &request_lock.peer_id,
             chain_index,
             begin_hash,
             timeout,
@@ -506,7 +600,7 @@ impl NetworkService {
                 log::debug!(
                     target: "network",
                     "Connection({}) => GrandpaWarpSyncRequest(chain={}, num_fragments={}, finished={:?})",
-                    target,
+                    request_lock.peer_id,
                     self.shared.log_chain_names[chain_index],
                     decoded.fragments.len(),
                     decoded.is_finished,
@@ -516,13 +610,15 @@ impl NetworkService {
                 log::debug!(
                     target: "network",
                     "Connection({}) => GrandpaWarpSyncRequest(chain={}, error={:?})",
-                    target,
+                    request_lock.peer_id,
                     self.shared.log_chain_names[chain_index],
                     err,
                 );
             }
         }
 
+        drop(request_lock);
+
         result.map_err(GrandpaWarpSyncRequestError::Request)
     }
 
@@ -568,7 +664,7 @@ impl NetworkService {
     pub async fn storage_proof_request(
         self: Arc<Self>,
         chain_index: usize,
-        target: PeerId, // TODO: takes by value because of futures longevity issue
+        request_lock: PeerRequestLock,
         config: protocol::StorageProofRequestConfig<impl Iterator<Item = impl AsRef<[u8]> + Clone>>,
         timeout: Duration,
     ) -> Result<EncodedMerkleProof, StorageProofRequestError> {
@@ -577,21 +673,21 @@ impl NetworkService {
 
         // The call to `start_storage_proof_request` below panics if we have no active
         // connection.
-        if !guarded.network.can_start_requests(&target) {
+        if !guarded.network.can_start_requests(&request_lock.peer_id) {
             return Err(StorageProofRequestError::NoConnection);
         }
 
         log::debug!(
             target: "network",
             "Connection({}) <= StorageProofRequest(chain={}, block={})",
-            target,
+            request_lock.peer_id,
             self.shared.log_chain_names[chain_index],
             HashDisplay(&config.block_hash)
         );
 
         let request_id = match guarded.network.start_storage_proof_request(
             self.shared.platform.now(),
-            &target,
+            &request_lock.peer_id,
             chain_index,
             config,
             timeout,
@@ -618,7 +714,7 @@ impl NetworkService {
                 log::debug!(
                     target: "network",
                     "Connection({}) => StorageProofRequest(chain={}, total_size={})",
-                    target,
+                    request_lock.peer_id,
                     self.shared.log_chain_names[chain_index],
                     BytesDisplay(u64::try_from(decoded.len()).unwrap()),
                 );
@@ -627,13 +723,15 @@ impl NetworkService {
                 log::debug!(
                     target: "network",
                     "Connection({}) => StorageProofRequest(chain={}, error={:?})",
-                    target,
+                    request_lock.peer_id,
                     self.shared.log_chain_names[chain_index],
                     err
                 );
             }
         }
 
+        drop(request_lock);
+
         result.map_err(StorageProofRequestError::Request)
     }
 
@@ -644,7 +742,7 @@ impl NetworkService {
     pub async fn call_proof_request(
         self: Arc<Self>,
         chain_index: usize,
-        target: PeerId, // TODO: takes by value because of futures longevity issue
+        request_lock: PeerRequestLock,
         config: protocol::CallProofRequestConfig<'_, impl Iterator<Item = impl AsRef<[u8]>>>,
         timeout: Duration,
     ) -> Result<EncodedMerkleProof, CallProofRequestError> {
@@ -652,14 +750,14 @@ impl NetworkService {
         let mut guarded = self.shared.guarded.lock().await;
 
         // The call to `start_call_proof_request` below panics if we have no active connection.
-        if !guarded.network.can_start_requests(&target) {
+        if !guarded.network.can_start_requests(&request_lock.peer_id) {
             return Err(CallProofRequestError::NoConnection);
         }
 
         log::debug!(
             target: "network",
             "Connection({}) <= CallProofRequest({}, {}, {})",
-            target,
+            request_lock.peer_id,
             self.shared.log_chain_names[chain_index],
             HashDisplay(&config.block_hash),
             config.method
@@ -667,7 +765,7 @@ impl NetworkService {
 
         let request_id = match guarded.network.start_call_proof_request(
             self.shared.platform.now(),
-            &target,
+            &request_lock.peer_id,
             chain_index,
             config,
             timeout,
@@ -693,7 +791,7 @@ impl NetworkService {
                 log::debug!(
                     target: "network",
                     "Connection({}) => CallProofRequest({}, total_size: {})",
-                    target,
+                    request_lock.peer_id,
                     self.shared.log_chain_names[chain_index],
                     BytesDisplay(u64::try_from(decoded.len()).unwrap())
                 );
@@ -702,13 +800,15 @@ impl NetworkService {
                 log::debug!(
                     target: "network",
                     "Connection({}) => CallProofRequest({}, {})",
-                    target,
+                    request_lock.peer_id,
                     self.shared.log_chain_names[chain_index],
                     err
                 );
             }
         }
 
+        drop(request_lock);
+
         result.map_err(CallProofRequestError::Request)
     }
 
@@ -884,6 +984,27 @@ pub enum Event {
     },
 }
 
+/// Active lock preventing other requests towards the same peer from being started.
+///
+/// See [`NetworkService::try_lock_peer_for_request`].
+pub struct PeerRequestLock {
+    service: Arc<NetworkService>,
+    peer_id: PeerId,
+}
+
+impl Drop for PeerRequestLock {
+    fn drop(&mut self) {
+        self.service
+            .shared
+            .peer_requests_unlocks
+            .push(self.peer_id.clone());
+        self.service
+            .shared
+            .peer_requests_unlocks_pushed
+            .notify(usize::max_value());
+    }
+}
+
 /// Error returned by [`NetworkService::blocks_request`].
 #[derive(Debug, derive_more::Display)]
 pub enum BlocksRequestError {
diff --git a/light-base/src/sync_service.rs b/light-base/src/sync_service.rs
index a58d46a954..7f1e19c10e 100644
--- a/light-base/src/sync_service.rs
+++ b/light-base/src/sync_service.rs
@@ -33,7 +33,6 @@ use async_lock::Mutex;
 use core::{fmt, mem, num::NonZeroU32, time::Duration};
 use futures_channel::{mpsc, oneshot};
 use futures_util::{stream, SinkExt as _};
-use rand::seq::IteratorRandom as _;
 use smoldot::{
     chain,
     executor::host,
@@ -302,7 +301,7 @@ impl SyncService {
         &self,
         block_number: u64,
         block_hash: &[u8; 32],
-    ) -> impl Iterator<Item = PeerId> {
+    ) -> Vec<PeerId> {
         let (send_back, rx) = oneshot::channel();
 
         self.to_background
@@ -316,7 +315,7 @@ impl SyncService {
             .await
             .unwrap();
 
-        rx.await.unwrap().into_iter()
+        rx.await.unwrap()
     }
 
     // TODO: doc; explain the guarantees
@@ -325,7 +324,7 @@ impl SyncService {
         block_number: u64,
         hash: [u8; 32],
         fields: protocol::BlocksRequestFields,
-        total_attempts: u32,
+        mut total_attempts: u32,
         timeout_per_request: Duration,
         _max_parallel: NonZeroU32,
     ) -> Result<BlockData, ()> {
@@ -338,17 +337,29 @@ impl SyncService {
         };
 
         // TODO: handle max_parallel
-        // TODO: better peers selection ; don't just take the first 3
-        for target in self
-            .peers_assumed_know_blocks(block_number, &hash)
-            .await
-            .take(usize::try_from(total_attempts).unwrap_or(usize::max_value()))
-        {
+        // TODO: shuffle peers ; don't just take the first
+        // TODO: slowly increase number of peers dynamically if no peer is immediately available
+        loop {
+            if total_attempts == 0 {
+                break;
+            }
+
+            let peers_assumed_know_blocks =
+                self.peers_assumed_know_blocks(block_number, &hash).await;
+
+            let request_lock = self
+                .network_service
+                .clone()
+                .lock_any_for_request(peers_assumed_know_blocks.into_iter().collect())
+                .await;
+
+            total_attempts -= 1;
+
             let mut result = match self
                 .network_service
                 .clone()
                 .blocks_request(
-                    target,
+                    request_lock,
                     self.network_chain_index,
                     request_config.clone(),
                     timeout_per_request,
@@ -370,7 +381,7 @@ impl SyncService {
         self: Arc<Self>,
         hash: [u8; 32],
         fields: protocol::BlocksRequestFields,
-        total_attempts: u32,
+        mut total_attempts: u32,
         timeout_per_request: Duration,
         _max_parallel: NonZeroU32,
     ) -> Result<BlockData, ()> {
@@ -383,18 +394,28 @@ impl SyncService {
         };
 
         // TODO: handle max_parallel
-        // TODO: better peers selection ; don't just take the first
-        for target in self
-            .network_service
-            .peers_list()
-            .await
-            .take(usize::try_from(total_attempts).unwrap_or(usize::max_value()))
-        {
+        // TODO: shuffle peers ; don't just take the first
+        // TODO: slowly increase number of peers dynamically if no peer is immediately available
+        loop {
+            if total_attempts == 0 {
+                break;
+            }
+
+            let peers_list = self.network_service.peers_list().await;
+
+            let request_lock = self
+                .network_service
+                .clone()
+                .lock_any_for_request(peers_list.collect())
+                .await;
+
+            total_attempts -= 1;
+
             let mut result = match self
                 .network_service
                 .clone()
                 .blocks_request(
-                    target,
+                    request_lock,
                     self.network_chain_index,
                     request_config.clone(),
                     timeout_per_request,
@@ -498,17 +519,19 @@ impl SyncService {
         }
 
         // Choose peer to query.
-        // TODO: better peers selection
-        let Some(target) = self
-            .peers_assumed_know_blocks(block_number, block_hash)
-            .await
-            .choose(&mut rand::thread_rng())
-        else {
-            // No peer knows this block. Returning with a failure.
+        let peers_assumed_know_blocks = self
+            .peers_assumed_know_blocks(block_number, block_hash)
+            .await;
+        if peers_assumed_know_blocks.is_empty() {
             return Err(StorageQueryError {
                 errors: outcome_errors,
             });
-        };
+        }
+        let request_lock = self
+            .network_service
+            .clone()
+            .lock_any_for_request(peers_assumed_know_blocks.into_iter().collect())
+            .await;
 
         // Build the list of keys to request.
         let keys_to_request = {
@@ -546,7 +569,7 @@ impl SyncService {
                 .clone()
                 .storage_proof_request(
                     self.network_chain_index,
-                    target,
+                    request_lock,
                     protocol::StorageProofRequestConfig {
                         block_hash: *block_hash,
                         keys: keys_to_request.into_iter(),
@@ -735,26 +758,39 @@ impl SyncService {
             '_,
             impl Iterator<Item = impl AsRef<[u8]>> + Clone,
         >,
-        total_attempts: u32,
+        mut total_attempts: u32,
         timeout_per_request: Duration,
         _max_parallel: NonZeroU32,
     ) -> Result<EncodedMerkleProof, CallProofQueryError> {
         let mut outcome_errors =
             Vec::with_capacity(usize::try_from(total_attempts).unwrap_or(usize::max_value()));
 
-        // TODO: better peers selection ; don't just take the first
         // TODO: handle max_parallel
-        for target in self
-            .peers_assumed_know_blocks(block_number, &config.block_hash)
-            .await
-            .take(usize::try_from(total_attempts).unwrap_or(usize::max_value()))
-        {
+        // TODO: shuffle peers ; don't just take the first
+        // TODO: slowly increase number of peers dynamically if no peer is immediately available
+        loop {
+            if total_attempts == 0 {
+                break;
+            }
+
+            let peers_assumed_know_blocks = self
+                .peers_assumed_know_blocks(block_number, &config.block_hash)
+                .await;
+
+            let request_lock = self
+                .network_service
+                .clone()
+                .lock_any_for_request(peers_assumed_know_blocks.into_iter().collect())
+                .await;
+
+            total_attempts -= 1;
+
             let result = self
                 .network_service
                 .clone()
                 .call_proof_request(
                     self.network_chain_index,
-                    target,
+                    request_lock,
                     config.clone(),
                     timeout_per_request,
                 )
diff --git a/light-base/src/sync_service/standalone.rs b/light-base/src/sync_service/standalone.rs
index 1e6aaa1fd2..eca772543f 100644
--- a/light-base/src/sync_service/standalone.rs
+++ b/light-base/src/sync_service/standalone.rs
@@ -130,7 +130,7 @@ pub(super) async fn start_standalone_chain(
 
         // Start a networking request (block requests, warp sync requests, etc.) that the
         // syncing state machine would like to start.
-        if task.start_next_request() {
+        if task.start_next_request().await {
             queue_empty = false;
         }
 
@@ -401,18 +401,36 @@ impl Task {
     /// Starts one network request if any is necessary.
     ///
     /// Returns `true` if a request has been started.
-    fn start_next_request(&mut self) -> bool {
+    async fn start_next_request(&mut self) -> bool {
         // `desired_requests()` returns, in decreasing order of priority, the requests
         // that should be started in order for the syncing to proceed. The fact that multiple
         // requests are returned could be used to filter out undesired ones. We use this
         // filtering to enforce a maximum of one ongoing request per source.
-        let (source_id, _, mut request_detail) = match self
-            .sync
-            .desired_requests()
-            .find(|(source_id, _, _)| self.sync.source_num_ongoing_requests(*source_id) == 0)
-        {
-            Some(v) => v,
-            None => return false,
+        let (source_id, request_lock, mut request_detail) = {
+            // We need to collect the desired requests, as we can't keep the iterator alive
+            // across an await point.
+            let mut desired_requests_iter = self
+                .sync
+                .desired_requests()
+                .map(|(src_id, _, details)| (src_id, details))
+                .collect::<Vec<_>>()
+                .into_iter();
+
+            loop {
+                let Some((source_id, request_details)) = desired_requests_iter.next()
+                // TODO: if no peer can be locked because they're all busy with something else, the syncing will not wake up once they are
+                else { return false };
+
+                let peer_id = self.sync[source_id].0.clone();
+                if let Some(request_lock) = self
+                    .network_service
+                    .clone()
+                    .try_lock_peer_for_request(peer_id)
+                    .await
+                {
+                    break (source_id, request_lock, request_details);
+                }
+            }
         };
 
         // Before inserting the request back to the syncing state machine, clamp the number
@@ -432,10 +450,8 @@ impl Task {
                 request_bodies,
                 request_justification,
             } => {
-                let peer_id = self.sync[source_id].0.clone(); // TODO: why does this require cloning? weird borrow chk issue
-
                 let block_request = self.network_service.clone().blocks_request(
-                    peer_id,
+                    request_lock,
                     self.network_chain_index,
                     network::protocol::BlocksRequestConfig {
                         start: if let Some(first_block_hash) = first_block_hash {
@@ -475,10 +491,8 @@ impl Task {
             all::DesiredRequest::GrandpaWarpSync {
                 sync_start_block_hash,
             } => {
-                let peer_id = self.sync[source_id].0.clone(); // TODO: why does this require cloning? weird borrow chk issue
-
                 let grandpa_request = self.network_service.clone().grandpa_warp_sync_request(
-                    peer_id,
+                    request_lock,
                     self.network_chain_index,
                     sync_start_block_hash,
                     // The timeout needs to be long enough to potentially download the maximum
@@ -509,11 +523,9 @@ impl Task {
                 ref keys,
                 ..
             } => {
-                let peer_id = self.sync[source_id].0.clone(); // TODO: why does this require cloning? weird borrow chk issue
-
                 let storage_request = self.network_service.clone().storage_proof_request(
                     self.network_chain_index,
-                    peer_id,
+                    request_lock,
                     network::protocol::StorageProofRequestConfig {
                         block_hash,
                         keys: keys.clone().into_iter(),
@@ -551,7 +563,6 @@ impl Task {
                 ref function_name,
                 ref parameter_vectored,
             } => {
-                let peer_id = self.sync[source_id].0.clone(); // TODO: why does this require cloning? weird borrow chk issue
                 let network_service = self.network_service.clone();
                 let network_chain_index = self.network_chain_index;
                 // TODO: all this copying is done because of lifetime requirements in NetworkService::call_proof_request; maybe check if it can be avoided
@@ -561,7 +572,7 @@ impl Task {
                 let call_proof_request = async move {
                     let rq = network_service.call_proof_request(
                         network_chain_index,
-                        peer_id,
+                        request_lock,
                         network::protocol::CallProofRequestConfig {
                             block_hash,
                             method: &function_name,
diff --git a/wasm-node/CHANGELOG.md b/wasm-node/CHANGELOG.md
index c20bca5aff..eafc075af1 100644
--- a/wasm-node/CHANGELOG.md
+++ b/wasm-node/CHANGELOG.md
@@ -2,6 +2,10 @@
 
 ## Unreleased
 
+### Changed
+
+- A limit of one simultaneous request per peer is now enforced in order to limit the load that a light client induces on each full node it is connected to. This limit is theoretically part of the Polkadot networking protocol but in practice isn't properly enforced. Consequently, requests that were previously executed in parallel might now be executed one after the other. ([#779](https://github.com/smol-dot/smoldot/pull/779))
+
 ## 1.0.12 - 2023-07-10
 
 ### Changed
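
---

The key design constraint in `network_service.rs` is that `PeerRequestLock` releases its lock from `Drop::drop`, which is synchronous and therefore cannot acquire the `async_lock::Mutex` protecting the set of locked peers. The PR works around this by having the destructor push the peer onto a lock-free `crossbeam_queue::SegQueue` and notify an `event_listener::Event`; whichever task next acquires the mutex drains the queue before consulting the set. Below is a minimal, self-contained sketch of that pattern using the same three crates; the `Registry` and `Lock` names and the `String` peer identifiers are illustrative stand-ins, not types from the PR.

```rust
use std::{collections::HashSet, sync::Arc};

/// Illustrative stand-in for the fields this PR adds to the network service.
struct Registry {
    /// Peers that currently have a request in flight.
    locked: async_lock::Mutex<HashSet<String>>,
    /// Unlocks deferred from `Drop`, applied by the next task taking `locked`.
    unlocks: crossbeam_queue::SegQueue<String>,
    /// Notified whenever an element is pushed onto `unlocks`.
    unlocked_event: event_listener::Event,
}

/// Illustrative stand-in for `PeerRequestLock`.
struct Lock {
    registry: Arc<Registry>,
    id: String,
}

impl Drop for Lock {
    fn drop(&mut self) {
        // `drop` cannot `.await` the mutex, so defer the removal instead.
        self.registry.unlocks.push(self.id.clone());
        self.registry.unlocked_event.notify(usize::MAX);
    }
}

impl Registry {
    /// Mirrors `try_lock_peer_for_request`: non-blocking, `None` if busy.
    async fn try_lock(self: Arc<Self>, id: &str) -> Option<Lock> {
        let mut locked = self.locked.lock().await;
        // Apply the unlocks that `Drop` implementations have queued up.
        while let Some(unlocked) = self.unlocks.pop() {
            locked.remove(&unlocked);
        }
        if !locked.insert(id.to_owned()) {
            return None; // a request towards this peer is already in flight
        }
        drop(locked);
        Some(Lock {
            registry: self,
            id: id.to_owned(),
        })
    }
}
```

Usage mirrors the PR: `registry.clone().try_lock("peer-a").await` returns `None` while another `Lock` for `"peer-a"` is alive, and dropping that `Lock` queues the unlock that the next `try_lock` call applies.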
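The blocking variant, `lock_any_for_request`, has a second subtlety: the `EventListener` is created while the mutex is held and *before* the locked set is consulted, so an unlock that races with the check still wakes the waiting task instead of being lost. A sketch of that loop, continuing the illustrative `Registry`/`Lock` types above (the PR additionally picks a random free candidate via `rand::seq::IteratorRandom::choose`; this sketch simply takes the first):

```rust
impl Registry {
    /// Mirrors `lock_any_for_request`: waits until a candidate is free.
    async fn lock_any(self: Arc<Self>, candidates: HashSet<String>) -> Lock {
        let mut wait = None::<event_listener::EventListener>;
        loop {
            // Only sleep after a first unsuccessful pass over the set.
            if let Some(listener) = wait.take() {
                listener.await;
            }

            let mut locked = self.locked.lock().await;

            // Register for wake-ups *before* inspecting the set, while the
            // mutex is held: an unlock racing with the check below will then
            // notify this listener rather than being missed.
            wait = Some(self.unlocked_event.listen());

            while let Some(unlocked) = self.unlocks.pop() {
                locked.remove(&unlocked);
            }

            if let Some(id) = candidates.difference(&locked).next().cloned() {
                locked.insert(id.clone());
                drop(locked);
                return Lock { registry: self, id };
            }
        }
    }
}
```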