From 342fdf6d51c29186b725ee5be32bddf7fd6c9464 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Sun, 17 Dec 2023 17:20:01 +0100 Subject: [PATCH 01/14] Add `ban_and_disconnect` to full node networking --- full-node/src/consensus_service.rs | 15 ++++++ full-node/src/network_service.rs | 79 ++++++++++++++++++++++++++++++ 2 files changed, 94 insertions(+) diff --git a/full-node/src/consensus_service.rs b/full-node/src/consensus_service.rs index bd68cb3a16..d082b9d6a6 100644 --- a/full-node/src/consensus_service.rs +++ b/full-node/src/consensus_service.rs @@ -1060,6 +1060,19 @@ impl SyncBackground { } WakeUpReason::RequestFinished(request_id, source_id, result) => { + if result.is_err() { + // Note that we perform the ban even if the source is now disconnected. + let peer_id = self.sync[source_id].as_ref().unwrap().peer_id.clone(); + self.network_service + .ban_and_disconnect( + peer_id, + self.network_chain_id, + network_service::BanSeverity::Low, + "blocks-request-error", + ) + .await; + } + // TODO: clarify this piece of code let result = result.map_err(|_| ()); let (_, response_outcome) = self.sync.blocks_request_response( @@ -1616,6 +1629,7 @@ impl SyncBackground { | all::ProcessOne::WarpSyncBuildChainInformation(_) | all::ProcessOne::WarpSyncFinished { .. 
} => unreachable!(), all::ProcessOne::VerifyBlock(verify) => { + // TODO: ban peer in case of verification failure let when_verification_started = Instant::now(); let mut database_accesses_duration = Duration::new(0, 0); let mut runtime_build_duration = Duration::new(0, 0); @@ -2053,6 +2067,7 @@ impl SyncBackground { } all::ProcessOne::VerifyFinalityProof(verify) => { + // TODO: ban peer in case of verification failure match verify.perform(rand::random()) { ( sync_out, diff --git a/full-node/src/network_service.rs b/full-node/src/network_service.rs index fac810fa0a..9ad6edf0a0 100644 --- a/full-node/src/network_service.rs +++ b/full-node/src/network_service.rs @@ -173,6 +173,12 @@ pub struct NetworkService { } enum ToBackground { + ForegroundDisconnectAndBan { + peer_id: PeerId, + chain_id: ChainId, + severity: BanSeverity, + reason: &'static str, + }, ForegroundAnnounceBlock { target: PeerId, chain_id: ChainId, @@ -298,6 +304,12 @@ struct Chain { max_in_peers: usize, } +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum BanSeverity { + Low, + High, +} + impl NetworkService { /// Initializes the network service with the given configuration. pub async fn new( @@ -562,6 +574,37 @@ impl NetworkService { .await; } + /// Starts asynchronously disconnect the given peer. A [`Event::Disconnected`] will later be + /// generated. Prevents a new gossip link with the same peer from being reopened for a + /// little while. + /// + /// `reason` is a human-readable string printed in the logs. + /// + /// Due to race conditions, it is possible to reconnect to the peer soon after, in case the + /// reconnection was already happening as the call to this function is still being processed. + /// If that happens another [`Event::Disconnected`] will be delivered afterwards. In other + /// words, this function guarantees that we will be disconnected in the future rather than + /// guarantees that we will disconnect. 
+ pub async fn ban_and_disconnect( + &self, + peer_id: PeerId, + chain_id: ChainId, + severity: BanSeverity, + reason: &'static str, + ) { + let _ = self + .to_background_tx + .lock() + .await + .send(ToBackground::ForegroundDisconnectAndBan { + peer_id, + chain_id, + severity, + reason, + }) + .await; + } + pub async fn send_block_announce( self: Arc, target: PeerId, @@ -982,6 +1025,42 @@ async fn background_task(mut inner: Inner) { return; } + WakeUpReason::Message(ToBackground::ForegroundDisconnectAndBan { + peer_id, + chain_id, + severity, + reason, + }) => { + // Note that peer doesn't necessarily have an out slot. + inner.peering_strategy.unassign_slot_and_ban( + &chain_id, + &peer_id, + Instant::now() + + Duration::from_secs(match severity { + BanSeverity::High => 45, + BanSeverity::Low => 10, + }), + ); + let _ = inner.network.gossip_close( + chain_id, + &peer_id, + service::GossipKind::ConsensusTransactions, + ); + if inner.network.gossip_remove_desired( + chain_id, + &peer_id, + service::GossipKind::ConsensusTransactions, + ) { + inner.log_callback.log( + LogLevel::Debug, + format!( + "slot-unassigned; peer_id={}; chain={}; reason=user-ban; user-reason={}", + peer_id, inner.network[chain_id].log_name, reason + ), + ); + } + } + WakeUpReason::Message(ToBackground::ForegroundAnnounceBlock { target, chain_id, From 2a04b9a9bd04d1375d96c85a56cb6d77bf74f11d Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Sun, 17 Dec 2023 17:31:42 +0100 Subject: [PATCH 02/14] Add `ban_and_disconnect` to light client network service --- full-node/src/network_service.rs | 5 +- light-base/src/network_service.rs | 83 +++++++++++++++++++++++++++++++ 2 files changed, 85 insertions(+), 3 deletions(-) diff --git a/full-node/src/network_service.rs b/full-node/src/network_service.rs index 9ad6edf0a0..a08f99b3d9 100644 --- a/full-node/src/network_service.rs +++ b/full-node/src/network_service.rs @@ -304,10 +304,10 @@ struct Chain { max_in_peers: usize, } +/// Severity of a ban. 
See [`NetworkService::ban_and_disconnect`]. #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum BanSeverity { Low, - High, } impl NetworkService { @@ -574,7 +574,7 @@ impl NetworkService { .await; } - /// Starts asynchronously disconnect the given peer. A [`Event::Disconnected`] will later be + /// Starts asynchronously disconnecting the given peer. A [`Event::Disconnected`] will later be /// generated. Prevents a new gossip link with the same peer from being reopened for a /// little while. /// @@ -1037,7 +1037,6 @@ async fn background_task(mut inner: Inner) { &peer_id, Instant::now() + Duration::from_secs(match severity { - BanSeverity::High => 45, BanSeverity::Low => 10, }), ); diff --git a/light-base/src/network_service.rs b/light-base/src/network_service.rs index ad0713e3d9..7103ee6efd 100644 --- a/light-base/src/network_service.rs +++ b/light-base/src/network_service.rs @@ -285,6 +285,12 @@ pub struct NetworkServiceChain { marker: core::marker::PhantomData, } +/// Severity of a ban. See [`NetworkService::ban_and_disconnect`]. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum BanSeverity { + Low, +} + impl NetworkServiceChain { /// Subscribes to the networking events that happen on the given chain. /// @@ -318,6 +324,33 @@ impl NetworkServiceChain { rx } + /// Starts asynchronously disconnecting the given peer. A [`Event::Disconnected`] will later be + /// generated. Prevents a new gossip link with the same peer from being reopened for a + /// little while. + /// + /// `reason` is a human-readable string printed in the logs. + /// + /// Due to race conditions, it is possible to reconnect to the peer soon after, in case the + /// reconnection was already happening as the call to this function is still being processed. + /// If that happens another [`Event::Disconnected`] will be delivered afterwards. In other + /// words, this function guarantees that we will be disconnected in the future rather than + /// guarantees that we will disconnect. 
+ pub async fn ban_and_disconnect( + &self, + peer_id: PeerId, + severity: BanSeverity, + reason: &'static str, + ) { + let _ = self + .messages_tx + .send(ToBackgroundChain::DisconnectAndBan { + peer_id, + severity, + reason, + }) + .await; + } + /// Sends a blocks request to the given peer. // TODO: more docs pub async fn blocks_request( @@ -763,6 +796,11 @@ enum ToBackgroundChain { Subscribe { sender: async_channel::Sender, }, + DisconnectAndBan { + peer_id: PeerId, + severity: BanSeverity, + reason: &'static str, + }, // TODO: serialize the request before sending over channel StartBlocksRequest { target: PeerId, // TODO: takes by value because of future longevity issue @@ -1264,6 +1302,51 @@ async fn background_task(mut task: BackgroundTask) { WakeUpReason::MessageForChain(chain_id, ToBackgroundChain::Subscribe { sender }) => { task.pending_new_subscriptions.push((chain_id, sender)); } + WakeUpReason::MessageForChain( + chain_id, + ToBackgroundChain::DisconnectAndBan { + peer_id, + severity, + reason, + }, + ) => { + let ban_duration = Duration::from_secs(match severity { + BanSeverity::Low => 10, + }); + + let had_slot = matches!( + task.peering_strategy.unassign_slot_and_ban( + &chain_id, + &peer_id, + task.platform.now() + ban_duration, + ), + basic_peering_strategy::UnassignSlotAndBan::Banned { had_slot: true } + ); + + let _ = task.network.gossip_close( + chain_id, + &peer_id, + service::GossipKind::ConsensusTransactions, + ); + + if had_slot { + log::debug!( + target: "network", + "Slots({}) ∌ {} (reason=user-ban, ban-duration={:?}, user-reason={})", + &task.network[chain_id].log_name, + peer_id, + ban_duration, + reason + ); + task.network.gossip_remove_desired( + chain_id, + &peer_id, + service::GossipKind::ConsensusTransactions, + ); + } + + task.open_gossip_links.remove(&(chain_id, peer_id)); + } WakeUpReason::MessageForChain( chain_id, ToBackgroundChain::StartBlocksRequest { From 8f77c2c36b07b7c586b2264fc86d3e90457aea6e Mon Sep 17 00:00:00 2001 From: 
Pierre Krieger Date: Sun, 17 Dec 2023 17:31:48 +0100 Subject: [PATCH 03/14] CHANGELOG --- wasm-node/CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/wasm-node/CHANGELOG.md b/wasm-node/CHANGELOG.md index 8b2d6b11e5..250e176030 100644 --- a/wasm-node/CHANGELOG.md +++ b/wasm-node/CHANGELOG.md @@ -2,6 +2,10 @@ ## Unreleased +### Changed + +- When a network request fails, smoldot now closes the gossip link with the peer it tried to send the request to, and will not re-open a new gossip link for a little while. + ## 2.0.14 - 2023-12-11 ### Fixed From afd00f8a6ee017155522f469a954836b1fb8eefc Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 19 Dec 2023 21:36:22 +0100 Subject: [PATCH 04/14] Ban peers in full node --- full-node/src/consensus_service.rs | 91 +++++++++++++++++++++++++++--- full-node/src/network_service.rs | 2 + lib/src/sync/all.rs | 14 +++++ lib/src/sync/all_forks.rs | 5 ++ lib/src/sync/optimistic.rs | 12 ++++ 5 files changed, 116 insertions(+), 8 deletions(-) diff --git a/full-node/src/consensus_service.rs b/full-node/src/consensus_service.rs index 34ebe5f7f4..0635900a21 100644 --- a/full-node/src/consensus_service.rs +++ b/full-node/src/consensus_service.rs @@ -1178,6 +1178,17 @@ impl SyncBackground { source_id, result: Err(_), }) => { + // Note that we perform the ban even if the source is now disconnected. + let peer_id = self.sync[source_id].as_ref().unwrap().peer_id.clone(); + self.network_service + .ban_and_disconnect( + peer_id, + self.network_chain_id, + network_service::BanSeverity::Low, + "warp-sync-request-error", + ) + .await; + let _ = self.sync.grandpa_warp_sync_response_err(request_id); // If the source was actually disconnected and has no other request in @@ -1203,7 +1214,19 @@ impl SyncBackground { source_id, result, }) => { - // Storage proof request. + if result.is_err() { + // Note that we perform the ban even if the source is now disconnected. 
+ let peer_id = self.sync[source_id].as_ref().unwrap().peer_id.clone(); + self.network_service + .ban_and_disconnect( + peer_id, + self.network_chain_id, + network_service::BanSeverity::Low, + "storage-proof-request-error", + ) + .await; + } + let _ = self .sync .storage_get_response(request_id, result.map(|r| r.decode().to_owned())); @@ -1231,7 +1254,19 @@ impl SyncBackground { source_id, result, }) => { - // Successful call proof request. + if result.is_err() { + // Note that we perform the ban even if the source is now disconnected. + let peer_id = self.sync[source_id].as_ref().unwrap().peer_id.clone(); + self.network_service + .ban_and_disconnect( + peer_id, + self.network_chain_id, + network_service::BanSeverity::Low, + "call-proof-request-error", + ) + .await; + } + self.sync .call_proof_response(request_id, result.map(|r| r.decode().to_owned())); // TODO: need help from networking service to avoid this to_owned @@ -1866,6 +1901,17 @@ impl SyncBackground { ); } Err(err) => { + if let Some(sender) = &sender { + self.network_service + .ban_and_disconnect( + sender.clone(), + self.network_chain_id, + network_service::BanSeverity::High, + "bad-warp-sync-fragment", + ) + .await; + } + self.log_callback.log( LogLevel::Warn, format!( @@ -2345,7 +2391,13 @@ impl SyncBackground { } all::ProcessOne::VerifyFinalityProof(verify) => { - // TODO: ban peer in case of verification failure + let sender = verify + .sender() + .1 + .as_ref() + .map(|s| s.peer_id.clone()) + .unwrap(); + match verify.perform(rand::random()) { ( sync_out, @@ -2365,7 +2417,7 @@ impl SyncBackground { self.log_callback.log( LogLevel::Debug, format!( - "finality-proof-verification; outcome=success; new-finalized={}", + "finality-proof-verification; outcome=success, sender={sender}, new-finalized={}", HashDisplay(&new_finalized_hash) ), ); @@ -2420,7 +2472,8 @@ impl SyncBackground { (sync_out, all::FinalityProofVerifyOutcome::GrandpaCommitPending) => { self.log_callback.log( LogLevel::Debug, - 
"finality-proof-verification; outcome=pending".to_string(), + "finality-proof-verification; outcome=pending, sender={sender}" + .to_string(), ); self.sync = sync_out; (self, true) @@ -2428,23 +2481,45 @@ impl SyncBackground { (sync_out, all::FinalityProofVerifyOutcome::AlreadyFinalized) => { self.log_callback.log( LogLevel::Debug, - "finality-proof-verification; outcome=already-finalized".to_string(), + "finality-proof-verification; outcome=already-finalized, sender={sender}".to_string(), ); self.sync = sync_out; (self, true) } (sync_out, all::FinalityProofVerifyOutcome::GrandpaCommitError(error)) => { + self.network_service + .ban_and_disconnect( + sender.clone(), + self.network_chain_id, + network_service::BanSeverity::High, + "bad-warp-sync-fragment", + ) + .await; self.log_callback.log( LogLevel::Warn, - format!("finality-proof-verification-failure; error={}", error), + format!( + "finality-proof-verification-failure; sender={sender}, error={}", + error + ), ); self.sync = sync_out; (self, true) } (sync_out, all::FinalityProofVerifyOutcome::JustificationError(error)) => { + self.network_service + .ban_and_disconnect( + sender.clone(), + self.network_chain_id, + network_service::BanSeverity::High, + "bad-warp-sync-fragment", + ) + .await; self.log_callback.log( LogLevel::Warn, - format!("finality-proof-verification-failure; error={}", error), + format!( + "finality-proof-verification-failure; sender={sender}, error={}", + error + ), ); self.sync = sync_out; (self, true) diff --git a/full-node/src/network_service.rs b/full-node/src/network_service.rs index 5ee00b5fbd..0fec7cbed3 100644 --- a/full-node/src/network_service.rs +++ b/full-node/src/network_service.rs @@ -347,6 +347,7 @@ struct Chain { #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum BanSeverity { Low, + High, } impl NetworkService { @@ -1117,6 +1118,7 @@ async fn background_task(mut inner: Inner) { Instant::now() + Duration::from_secs(match severity { BanSeverity::Low => 10, + BanSeverity::High 
=> 40, }), ); let _ = inner.network.gossip_close( diff --git a/lib/src/sync/all.rs b/lib/src/sync/all.rs index 3d8ab1d2cd..8b5025b595 100644 --- a/lib/src/sync/all.rs +++ b/lib/src/sync/all.rs @@ -2476,6 +2476,20 @@ enum FinalityProofVerifyInner { } impl FinalityProofVerify { + /// Returns the source the justification was obtained from. + pub fn sender(&self) -> (SourceId, &TSrc) { + match &self.inner { + FinalityProofVerifyInner::AllForks(inner) => { + let sender = inner.sender().1; + (sender.outer_source_id, &sender.user_data) + } + FinalityProofVerifyInner::Optimistic(inner) => { + let sender = inner.sender().1; + (sender.outer_source_id, &sender.user_data) + } + } + } + /// Perform the verification. /// /// A randomness seed must be provided and will be used during the verification. Note that the diff --git a/lib/src/sync/all_forks.rs b/lib/src/sync/all_forks.rs index 0b71f5c4e9..f899ae813b 100644 --- a/lib/src/sync/all_forks.rs +++ b/lib/src/sync/all_forks.rs @@ -2180,6 +2180,11 @@ pub struct FinalityProofVerify { } impl FinalityProofVerify { + /// Returns the source the justification was obtained from. + pub fn sender(&self) -> (SourceId, &TSrc) { + (self.source_id, &self.parent[self.source_id]) + } + /// Perform the verification. /// /// A randomness seed must be provided and will be used during the verification. Note that the diff --git a/lib/src/sync/optimistic.rs b/lib/src/sync/optimistic.rs index 5ff54cd8c3..e73d0b6152 100644 --- a/lib/src/sync/optimistic.rs +++ b/lib/src/sync/optimistic.rs @@ -159,6 +159,7 @@ struct OptimisticSyncInner { verification_queue::VerificationQueue<(RequestId, TRq), RequestSuccessBlock>, /// Justifications, if any, of the block that has just been verified. + // TODO: clean up when a source is removed pending_encoded_justifications: vec::IntoIter<([u8; 4], Vec, SourceId)>, /// Identifier to assign to the next request. 
@@ -1041,6 +1042,17 @@ pub struct JustificationVerify { } impl JustificationVerify { + /// Returns the source the justification was obtained from. + pub fn sender(&self) -> (SourceId, &TSrc) { + let (_, _, source_id) = self + .inner + .pending_encoded_justifications + .as_slice() + .first() + .unwrap(); + (*source_id, &self.inner.sources[source_id].user_data) + } + /// Verify the justification. /// /// A randomness seed must be provided and will be used during the verification. Note that the From 378d47d9e487d5c8b9b762d6880e192fb0e004f3 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 19 Dec 2023 21:37:57 +0100 Subject: [PATCH 05/14] PR link --- wasm-node/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wasm-node/CHANGELOG.md b/wasm-node/CHANGELOG.md index 250e176030..00270d3b12 100644 --- a/wasm-node/CHANGELOG.md +++ b/wasm-node/CHANGELOG.md @@ -4,7 +4,7 @@ ### Changed -- When a network request fails, smoldot now closes the gossip link with the peer it tried to send the request to, and will not re-open a new gossip link for a little while. +- When a network request fails, smoldot now closes the gossip link with the peer it tried to send the request to, and will not re-open a new gossip link for a little while. 
([#1482](https://github.com/smol-dot/smoldot/pull/1482)) ## 2.0.14 - 2023-12-11 From 695a34ee231715efbc7258dd1a3332598258de77 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 19 Dec 2023 22:08:47 +0100 Subject: [PATCH 06/14] Prepare ban locations in light client --- full-node/src/consensus_service.rs | 2 +- light-base/src/network_service.rs | 2 + light-base/src/sync_service.rs | 58 ++++++++++++++-- light-base/src/sync_service/standalone.rs | 85 +++++++++++++++++++---- 4 files changed, 125 insertions(+), 22 deletions(-) diff --git a/full-node/src/consensus_service.rs b/full-node/src/consensus_service.rs index 0635900a21..3a6136af64 100644 --- a/full-node/src/consensus_service.rs +++ b/full-node/src/consensus_service.rs @@ -1038,7 +1038,7 @@ impl SyncBackground { all::BlockAnnounceOutcome::NotFinalizedChain => {} all::BlockAnnounceOutcome::Discarded => {} all::BlockAnnounceOutcome::StoredForLater {} => {} - all::BlockAnnounceOutcome::InvalidHeader(_) => unreachable!(), + all::BlockAnnounceOutcome::InvalidHeader(_) => unreachable!(), // TODO: ?!?! why unreachable? 
also, ban the peer } } WakeUpReason::NetworkEvent(network_service::Event::GrandpaNeighborPacket { diff --git a/light-base/src/network_service.rs b/light-base/src/network_service.rs index 7103ee6efd..9788df66a7 100644 --- a/light-base/src/network_service.rs +++ b/light-base/src/network_service.rs @@ -289,6 +289,7 @@ pub struct NetworkServiceChain { #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum BanSeverity { Low, + High, } impl NetworkServiceChain { @@ -1312,6 +1313,7 @@ async fn background_task(mut task: BackgroundTask) { ) => { let ban_duration = Duration::from_secs(match severity { BanSeverity::Low => 10, + BanSeverity::High => 40, }); let had_slot = matches!( diff --git a/light-base/src/sync_service.rs b/light-base/src/sync_service.rs index 3c5e9dfdff..0b478e2d7d 100644 --- a/light-base/src/sync_service.rs +++ b/light-base/src/sync_service.rs @@ -334,11 +334,20 @@ impl SyncService { let mut result = match self .network_service .clone() - .blocks_request(target, request_config.clone(), timeout_per_request) + .blocks_request(target.clone(), request_config.clone(), timeout_per_request) .await { Ok(b) if !b.is_empty() => b, - Ok(_) | Err(_) => continue, + Ok(_) | Err(_) => { + self.network_service + .ban_and_disconnect( + target, + network_service::BanSeverity::Low, + "blocks-request-failed", + ) + .await; + continue; + } }; return Ok(result.remove(0)); @@ -379,7 +388,12 @@ impl SyncService { .await { Ok(b) if !b.is_empty() => b, - Ok(_) | Err(_) => continue, + Ok(_) | Err(_) => { + // Because we have no idea whether the block is canonical, it might be + // totally legitimate for the peer to refuse the request. For this reason, + // we don't ban it. 
+ continue; + } }; return Ok(result.remove(0)); @@ -553,7 +567,7 @@ impl SyncService { .network_service .clone() .storage_proof_request( - target, + target.clone(), codec::StorageProofRequestConfig { block_hash: *block_hash, keys: keys_to_request.into_iter(), @@ -580,6 +594,13 @@ impl SyncService { network_service::StorageProofRequestError::RequestTooLarge ) || response_nodes_cap == 1 { + self.network_service + .ban_and_disconnect( + target, + network_service::BanSeverity::Low, + "storage-request-failed", + ) + .await; outcome_errors.push(StorageQueryErrorDetail::Network(err)); } @@ -596,6 +617,13 @@ impl SyncService { }) { Ok(d) => d, Err(err) => { + self.network_service + .ban_and_disconnect( + target, + network_service::BanSeverity::High, + "bad-merkle-proof", + ) + .await; outcome_errors.push(StorageQueryErrorDetail::ProofVerification(err)); continue; } @@ -796,20 +824,36 @@ impl SyncService { let result = self .network_service .clone() - .call_proof_request(target, config.clone(), timeout_per_request) + .call_proof_request(target.clone(), config.clone(), timeout_per_request) .await; match result { Ok(value) if !value.decode().is_empty() => return Ok(value), // TODO: this check of emptiness is a bit of a hack; it is necessary because Substrate responds to requests about blocks it doesn't know with an empty proof - Ok(_) => outcome_errors.push(network_service::CallProofRequestError::Request( + Ok(_) => { + self.network_service + .ban_and_disconnect( + target, + network_service::BanSeverity::Low, + "call-proof-request-failed", + ) + .await; + outcome_errors.push(network_service::CallProofRequestError::Request( service::CallProofRequestError::Request( smoldot::network::service::RequestError::Substream( smoldot::libp2p::connection::established::RequestError::SubstreamClosed, ), ), - )), + )) + } Err(err) => { + self.network_service + .ban_and_disconnect( + target, + network_service::BanSeverity::Low, + "call-proof-request-failed", + ) + .await; 
outcome_errors.push(err); } } diff --git a/light-base/src/sync_service/standalone.rs b/light-base/src/sync_service/standalone.rs index bf31becb0e..c110201b88 100644 --- a/light-base/src/sync_service/standalone.rs +++ b/light-base/src/sync_service/standalone.rs @@ -316,7 +316,7 @@ pub(super) async fn start_standalone_chain( target: &task.log_target, "Failed to decode header in block announce received from {}. Error: {}", peer_id, error, - ) + ); } } @@ -359,6 +359,14 @@ pub(super) async fn start_standalone_chain( announce_block_height, peer_id ); + + task.network_service + .ban_and_disconnect( + peer_id, + network_service::BanSeverity::Low, + "announced-block-below-known-finalized", + ) + .await; } all::BlockAnnounceOutcome::NotFinalizedChain => { log::debug!( @@ -374,6 +382,13 @@ pub(super) async fn start_standalone_chain( } all::BlockAnnounceOutcome::InvalidHeader(_) => { // Log messages are already printed above. + task.network_service + .ban_and_disconnect( + peer_id, + network_service::BanSeverity::High, + "bad-block-announce", + ) + .await; } } } @@ -605,7 +620,13 @@ pub(super) async fn start_standalone_chain( WakeUpReason::RequestFinished(request_id, Ok(RequestOutcome::Block(Err(_)))) => { // Failed block request. - // TODO: should disconnect peer + task.network_service + .ban_and_disconnect( + todo!(), + network_service::BanSeverity::Low, + "failed-blocks-request", + ) + .await; task.sync .blocks_request_response(request_id, Err::, _>(())); } @@ -627,12 +648,27 @@ pub(super) async fn start_standalone_chain( WakeUpReason::RequestFinished(request_id, Ok(RequestOutcome::WarpSync(Err(_)))) => { // Failed warp sync request. - // TODO: should disconnect peer + task.network_service + .ban_and_disconnect( + todo!(), + network_service::BanSeverity::Low, + "failed-warp-sync-request", + ) + .await; task.sync.grandpa_warp_sync_response_err(request_id); } WakeUpReason::RequestFinished(request_id, Ok(RequestOutcome::Storage(r))) => { // Storage proof request. 
+ if r.is_err() { + task.network_service + .ban_and_disconnect( + todo!(), + network_service::BanSeverity::Low, + "failed-storage-request", + ) + .await; + } task.sync.storage_get_response(request_id, r); } @@ -645,6 +681,13 @@ pub(super) async fn start_standalone_chain( WakeUpReason::RequestFinished(request_id, Ok(RequestOutcome::CallProof(Err(err)))) => { // Failed call proof request. + task.network_service + .ban_and_disconnect( + todo!(), + network_service::BanSeverity::Low, + "failed-call-proof-request", + ) + .await; task.sync.call_proof_response(request_id, Err(err)); } @@ -971,7 +1014,6 @@ impl Task { ); } Err(err) => { - // TODO: should disconnect peer log::debug!(target: &self.log_target, "Sync => WarpSyncRuntimeBuild(error={})", err); if !matches!(err, all::WarpSyncBuildRuntimeError::SourceMisbehavior(_)) { log::warn!(target: &self.log_target, "Failed to compile runtime during warp syncing process: {}", err); @@ -988,7 +1030,6 @@ impl Task { log::debug!(target: &self.log_target, "Sync => WarpSyncBuildChainInformation(success=true)") } Err(err) => { - // TODO: should disconnect peer log::debug!(target: &self.log_target, "Sync => WarpSyncBuildChainInformation(error={})", err); if !matches!( err, @@ -1040,10 +1081,9 @@ impl Task { all::ProcessOne::VerifyWarpSyncFragment(verify) => { // Grandpa warp sync fragment to verify. 
- let sender_peer_id = verify + let sender_if_still_connected = verify .proof_sender() - .map(|(_, (peer_id, _))| Cow::Owned(peer_id.to_string())) // TODO: unnecessary cloning most of the time - .unwrap_or(Cow::Borrowed("")); + .map(|(_, (peer_id, _))| peer_id.clone()); let (sync, result) = verify.perform({ let mut seed = [0; 32]; @@ -1058,25 +1098,31 @@ impl Task { log::debug!( target: &self.log_target, "Sync => WarpSyncFragmentVerified(sender={}, verified_hash={}, verified_height={fragment_number})", - sender_peer_id, + sender_if_still_connected.as_ref().map(|p| Cow::Owned(p.to_base58())).unwrap_or(Cow::Borrowed("")), HashDisplay(&fragment_hash) ); } Err(err) => { - // TODO: should disconnect peer - let maybe_forced_change = - matches!(err, all::VerifyFragmentError::JustificationVerify(_)); log::warn!( target: &self.log_target, "Failed to verify warp sync fragment from {}: {}{}", - sender_peer_id, + sender_if_still_connected.as_ref().map(|p| Cow::Owned(p.to_base58())).unwrap_or(Cow::Borrowed("")), err, - if maybe_forced_change { + if matches!(err, all::VerifyFragmentError::JustificationVerify(_)) { ". This might be caused by a forced GrandPa authorities change having \ been enacted on the chain. If this is the case, please update the \ chain specification with a checkpoint past this forced change." 
} else { "" } ); + if let Some(sender_if_still_connected) = sender_if_still_connected { + self.network_service + .ban_and_disconnect( + sender_if_still_connected, + network_service::BanSeverity::High, + "bad-warp-sync-fragment", + ) + .await; + } } } } @@ -1223,6 +1269,17 @@ impl Task { HashDisplay(&verified_hash), error ); + + // TODO: ban peers that have announced the block + /*for peer_id in self.sync.knows_non_finalized_block(height, hash) { + self.network_service + .ban_and_disconnect( + peer_id, + network_service::BanSeverity::High, + "bad-block", + ) + .await; + }*/ } } } From 0f4808917d667752a75f79a130d6be7140104275 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 19 Dec 2023 22:33:25 +0100 Subject: [PATCH 07/14] Properly ban in case of request failure --- lib/src/sync/all.rs | 23 ++++++++++++++++++- lib/src/sync/all_forks.rs | 10 ++++++++ lib/src/sync/all_forks/pending_blocks.rs | 2 +- lib/src/sync/optimistic.rs | 19 +++++++++++++++ lib/src/sync/optimistic/verification_queue.rs | 11 +++++++++ lib/src/sync/warp_sync.rs | 10 ++++++++ light-base/src/sync_service/standalone.rs | 15 ++++++++---- 7 files changed, 83 insertions(+), 7 deletions(-) diff --git a/lib/src/sync/all.rs b/lib/src/sync/all.rs index 8b5025b595..aafa9ddbda 100644 --- a/lib/src/sync/all.rs +++ b/lib/src/sync/all.rs @@ -492,7 +492,7 @@ impl AllSync { } /// Removes a source from the state machine. Returns the user data of this source, and all - /// the requests that this source were expected to perform. + /// the requests that this source was expected to perform. /// /// # Panic /// @@ -1143,6 +1143,27 @@ impl AllSync { } } + /// Returns the [`SourceId`] that is expected to fulfill the given request. + /// + /// # Panic + /// + /// Panics if the [`RequestId`] is invalid. 
+ /// + pub fn request_source_id(&self, request_id: RequestId) -> SourceId { + match (&self.inner, &self.shared.requests[request_id.0]) { + (AllSyncInner::AllForks(inner), RequestMapping::AllForks(rq)) => { + inner[inner.request_source_id(*rq)].outer_source_id + } + (AllSyncInner::Optimistic { inner }, RequestMapping::Optimistic(rq)) => { + inner[inner.request_source_id(*rq)].outer_source_id + } + (AllSyncInner::WarpSync { inner, .. }, RequestMapping::WarpSync(rq)) => { + inner[inner.request_source_id(*rq)].outer_source_id + } + _ => unreachable!(), + } + } + /// Process the next block in the queue of verification. /// /// This method takes ownership of the [`AllSync`] and starts a verification process. The diff --git a/lib/src/sync/all_forks.rs b/lib/src/sync/all_forks.rs index f899ae813b..cd6e9bf450 100644 --- a/lib/src/sync/all_forks.rs +++ b/lib/src/sync/all_forks.rs @@ -787,6 +787,16 @@ impl AllForksSync { self.inner.blocks.obsolete_requests() } + /// Returns the [`SourceId`] that is expected to fulfill the given request. + /// + /// # Panic + /// + /// Panics if the [`RequestId`] is invalid. + /// + pub fn request_source_id(&self, request_id: RequestId) -> SourceId { + self.inner.blocks.request_source_id(request_id) + } + /// Call in response to a blocks request being successful. /// /// This method takes ownership of the [`AllForksSync`] and puts it in a mode where the blocks diff --git a/lib/src/sync/all_forks/pending_blocks.rs b/lib/src/sync/all_forks/pending_blocks.rs index 24d73579e5..6075b58590 100644 --- a/lib/src/sync/all_forks/pending_blocks.rs +++ b/lib/src/sync/all_forks/pending_blocks.rs @@ -873,7 +873,7 @@ impl PendingBlocks { /// Panics if the [`RequestId`] is invalid. 
/// #[track_caller] - pub fn request_source(&self, request_id: RequestId) -> SourceId { + pub fn request_source_id(&self, request_id: RequestId) -> SourceId { self.requests.get(request_id.0).unwrap().source_id } diff --git a/lib/src/sync/optimistic.rs b/lib/src/sync/optimistic.rs index e73d0b6152..7833dc1b5e 100644 --- a/lib/src/sync/optimistic.rs +++ b/lib/src/sync/optimistic.rs @@ -670,6 +670,25 @@ impl OptimisticSync { user_data } + /// Returns the [`SourceId`] that is expected to fulfill the given request. + /// + /// # Panic + /// + /// Panics if the [`RequestId`] is invalid. + /// + pub fn request_source_id(&self, request_id: RequestId) -> SourceId { + if let Some((src, _)) = self.inner.obsolete_requests.get(&request_id) { + *src + } else { + self.inner + .verification_queue + .requests() + .find(|(rq, _)| rq.0 == request_id) + .unwrap() + .1 + } + } + /// Process the next block in the queue of verification. /// /// This method takes ownership of the [`OptimisticSync`]. The [`OptimisticSync`] is yielded diff --git a/lib/src/sync/optimistic/verification_queue.rs b/lib/src/sync/optimistic/verification_queue.rs index 4295319b61..025bf1e050 100644 --- a/lib/src/sync/optimistic/verification_queue.rs +++ b/lib/src/sync/optimistic/verification_queue.rs @@ -432,6 +432,17 @@ impl VerificationQueue { ) } + /// Returns an iterator to all the requests that are inside of the queue. + pub fn requests(&'_ self) -> impl Iterator + '_ { + self.verification_queue.iter().filter_map(|queue_elem| { + if let VerificationQueueEntryTy::Requested { user_data, source } = &queue_elem.ty { + Some((user_data, *source)) + } else { + None + } + }) + } + /// Consumes the queue and returns an iterator to all the requests that were inside of it. 
pub fn into_requests(self) -> impl Iterator { self.verification_queue diff --git a/lib/src/sync/warp_sync.rs b/lib/src/sync/warp_sync.rs index 1227d449aa..7242403ee3 100644 --- a/lib/src/sync/warp_sync.rs +++ b/lib/src/sync/warp_sync.rs @@ -1042,6 +1042,16 @@ impl WarpSync { user_data } + /// Returns the [`SourceId`] that is expected to fulfill the given request. + /// + /// # Panic + /// + /// Panics if the [`RequestId`] is invalid. + /// + pub fn request_source_id(&self, request_id: RequestId) -> SourceId { + self.in_progress_requests[request_id.0].0 + } + /// Injects a successful Merkle proof and removes the given request from the state machine. /// Returns the user data that was associated to it. /// diff --git a/light-base/src/sync_service/standalone.rs b/light-base/src/sync_service/standalone.rs index c110201b88..127a1080f1 100644 --- a/light-base/src/sync_service/standalone.rs +++ b/light-base/src/sync_service/standalone.rs @@ -24,7 +24,7 @@ use crate::{network_service, platform::PlatformRef, util}; use alloc::{ borrow::{Cow, ToOwned as _}, boxed::Box, - string::{String, ToString as _}, + string::String, sync::Arc, vec::Vec, }; @@ -620,9 +620,10 @@ pub(super) async fn start_standalone_chain( WakeUpReason::RequestFinished(request_id, Ok(RequestOutcome::Block(Err(_)))) => { // Failed block request. + let source_peer_id = task.sync[task.sync.request_source_id(request_id)].0.clone(); task.network_service .ban_and_disconnect( - todo!(), + source_peer_id, network_service::BanSeverity::Low, "failed-blocks-request", ) @@ -648,9 +649,10 @@ pub(super) async fn start_standalone_chain( WakeUpReason::RequestFinished(request_id, Ok(RequestOutcome::WarpSync(Err(_)))) => { // Failed warp sync request. 
+ let source_peer_id = task.sync[task.sync.request_source_id(request_id)].0.clone(); task.network_service .ban_and_disconnect( - todo!(), + source_peer_id, network_service::BanSeverity::Low, "failed-warp-sync-request", ) @@ -661,9 +663,11 @@ pub(super) async fn start_standalone_chain( WakeUpReason::RequestFinished(request_id, Ok(RequestOutcome::Storage(r))) => { // Storage proof request. if r.is_err() { + let source_peer_id = + task.sync[task.sync.request_source_id(request_id)].0.clone(); task.network_service .ban_and_disconnect( - todo!(), + source_peer_id, network_service::BanSeverity::Low, "failed-storage-request", ) @@ -681,9 +685,10 @@ pub(super) async fn start_standalone_chain( WakeUpReason::RequestFinished(request_id, Ok(RequestOutcome::CallProof(Err(err)))) => { // Failed call proof request. + let source_peer_id = task.sync[task.sync.request_source_id(request_id)].0.clone(); task.network_service .ban_and_disconnect( - todo!(), + source_peer_id, network_service::BanSeverity::Low, "failed-call-proof-request", ) From f5ca41e4151d5dcbe80278e9bc7e5ef4a195fbaa Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 19 Dec 2023 22:37:31 +0100 Subject: [PATCH 08/14] Log the gossip closing --- full-node/src/network_service.rs | 22 +++++++++++++++++----- light-base/src/network_service.rs | 21 ++++++++++++++++----- 2 files changed, 33 insertions(+), 10 deletions(-) diff --git a/full-node/src/network_service.rs b/full-node/src/network_service.rs index 0fec7cbed3..d63d09ba63 100644 --- a/full-node/src/network_service.rs +++ b/full-node/src/network_service.rs @@ -1121,11 +1121,23 @@ async fn background_task(mut inner: Inner) { BanSeverity::High => 40, }), ); - let _ = inner.network.gossip_close( - chain_id, - &peer_id, - service::GossipKind::ConsensusTransactions, - ); + if inner + .network + .gossip_close( + chain_id, + &peer_id, + service::GossipKind::ConsensusTransactions, + ) + .is_ok() + { + inner.log_callback.log( + LogLevel::Debug, + format!( + 
"chain-disconnected; peer_id={}; chain={}", + peer_id, inner.network[chain_id].log_name + ), + ); + } if inner.network.gossip_remove_desired( chain_id, &peer_id, diff --git a/light-base/src/network_service.rs b/light-base/src/network_service.rs index 9788df66a7..d8aed62a5b 100644 --- a/light-base/src/network_service.rs +++ b/light-base/src/network_service.rs @@ -1325,11 +1325,22 @@ async fn background_task(mut task: BackgroundTask) { basic_peering_strategy::UnassignSlotAndBan::Banned { had_slot: true } ); - let _ = task.network.gossip_close( - chain_id, - &peer_id, - service::GossipKind::ConsensusTransactions, - ); + if task + .network + .gossip_close( + chain_id, + &peer_id, + service::GossipKind::ConsensusTransactions, + ) + .is_ok() + { + log::debug!( + target: "network", + "Gossip({}, {}) => Closed", + &task.network[chain_id].log_name, + peer_id, + ); + } if had_slot { log::debug!( From a75ab74a06d69ed67c4d706d80047c7cfa1bd745 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 19 Dec 2023 22:40:19 +0100 Subject: [PATCH 09/14] Properly send event --- full-node/src/network_service.rs | 3 +++ light-base/src/network_service.rs | 5 ++++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/full-node/src/network_service.rs b/full-node/src/network_service.rs index d63d09ba63..3a71a5f887 100644 --- a/full-node/src/network_service.rs +++ b/full-node/src/network_service.rs @@ -1151,6 +1151,9 @@ async fn background_task(mut inner: Inner) { ), ); } + + debug_assert!(inner.event_pending_send.is_none()); + inner.event_pending_send = Some(Event::Disconnected { chain_id, peer_id }); } WakeUpReason::Message(ToBackground::ForegroundAnnounceBlock { diff --git a/light-base/src/network_service.rs b/light-base/src/network_service.rs index d8aed62a5b..7e5c79c0a5 100644 --- a/light-base/src/network_service.rs +++ b/light-base/src/network_service.rs @@ -1340,6 +1340,8 @@ async fn background_task(mut task: BackgroundTask) { &task.network[chain_id].log_name, peer_id, ); + 
let _was_in = task.open_gossip_links.remove(&(chain_id, peer_id.clone())); + debug_assert!(_was_in.is_some()); } if had_slot { @@ -1358,7 +1360,8 @@ async fn background_task(mut task: BackgroundTask) { ); } - task.open_gossip_links.remove(&(chain_id, peer_id)); + debug_assert!(task.event_pending_send.is_none()); + task.event_pending_send = Some((chain_id, Event::Disconnected { peer_id })); } WakeUpReason::MessageForChain( chain_id, From 5b8557a6677536ae9e5039f87fdcb8a970ca2928 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 19 Dec 2023 22:52:49 +0100 Subject: [PATCH 10/14] Also ban for justification verification error --- light-base/src/sync_service/standalone.rs | 32 ++++++++++++++++------- 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/light-base/src/sync_service/standalone.rs b/light-base/src/sync_service/standalone.rs index 127a1080f1..4526efe4f5 100644 --- a/light-base/src/sync_service/standalone.rs +++ b/light-base/src/sync_service/standalone.rs @@ -1291,6 +1291,7 @@ impl Task { all::ProcessOne::VerifyFinalityProof(verify) => { // Finality proof to verify. + let sender = verify.sender().1 .0.clone(); match verify.perform({ let mut seed = [0; 32]; self.platform.fill_random_bytes(&mut seed); @@ -1308,7 +1309,7 @@ impl Task { log::debug!( target: &self.log_target, - "Sync => FinalityProofVerified(finalized_blocks={})", + "Sync => FinalityProofVerified(finalized_blocks={}, sender={sender})", finalized_blocks_newest_to_oldest.len(), ); @@ -1344,28 +1345,33 @@ impl Task { (sync, all::FinalityProofVerifyOutcome::JustificationError(error)) => { self.sync = sync; - // TODO: print which peer sent the proof log::debug!( target: &self.log_target, - "Sync => JustificationVerificationError(error={:?})", - error, + "Sync => JustificationVerificationError(error={error:?}, sender={sender})", ); + // TODO: don't print for consensusenginemismatch? 
log::warn!( target: &self.log_target, - "Error while verifying justification: {}", - error + "Error while verifying justification: {error}" ); + + // TODO: don't ban for consensusenginemismatch? + self.network_service + .ban_and_disconnect( + sender, + network_service::BanSeverity::High, + "bad-justification", + ) + .await; } (sync, all::FinalityProofVerifyOutcome::GrandpaCommitError(error)) => { self.sync = sync; - // TODO: print which peer sent the proof log::debug!( target: &self.log_target, - "Sync => GrandpaCommitVerificationError(error={:?})", - error, + "Sync => GrandpaCommitVerificationError(error={error:?}, sender={sender})", ); log::warn!( @@ -1373,6 +1379,14 @@ impl Task { "Error while verifying GrandPa commit: {}", error ); + + self.network_service + .ban_and_disconnect( + sender, + network_service::BanSeverity::High, + "bad-grandpa-commit", + ) + .await; } } } From 48a303c6405db65f69a92f92aed47e4537dc09ba Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 19 Dec 2023 22:53:31 +0100 Subject: [PATCH 11/14] Update CHANGELOG --- wasm-node/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wasm-node/CHANGELOG.md b/wasm-node/CHANGELOG.md index 00270d3b12..3353f58677 100644 --- a/wasm-node/CHANGELOG.md +++ b/wasm-node/CHANGELOG.md @@ -4,7 +4,7 @@ ### Changed -- When a network request fails, smoldot now closes the gossip link with the peer it tried to send the request to, and will not re-open a new gossip link for a little while. ([#1482](https://github.com/smol-dot/smoldot/pull/1482)) +- When a network request fails, or when a block or justification fails to verify, smoldot now closes the gossip link with the peer it tried to send the request to or obtained the block or justification from, and will not re-open a new gossip link for a little while. 
([#1482](https://github.com/smol-dot/smoldot/pull/1482)) ## 2.0.14 - 2023-12-11 From a180ef6dbe174c953b21ec8248b93f41370008c3 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 19 Dec 2023 22:58:23 +0100 Subject: [PATCH 12/14] Docfix --- light-base/src/network_service.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/light-base/src/network_service.rs b/light-base/src/network_service.rs index 7e5c79c0a5..42fc2e413d 100644 --- a/light-base/src/network_service.rs +++ b/light-base/src/network_service.rs @@ -285,7 +285,7 @@ pub struct NetworkServiceChain { marker: core::marker::PhantomData, } -/// Severity of a ban. See [`NetworkService::ban_and_disconnect`]. +/// Severity of a ban. See [`NetworkServiceChain::ban_and_disconnect`]. #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum BanSeverity { Low, From 9d5282a1811afc1da1ba26c0088ce29425782dfa Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Wed, 20 Dec 2023 08:51:45 +0100 Subject: [PATCH 13/14] Bugfix: close returns ok when inbound too --- full-node/src/network_service.rs | 38 ++++++++++++++----------- lib/src/network/service.rs | 47 +++++++++++++++++++++++++++++++ light-base/src/network_service.rs | 47 +++++++++++++++++-------------- 3 files changed, 94 insertions(+), 38 deletions(-) diff --git a/full-node/src/network_service.rs b/full-node/src/network_service.rs index 3a71a5f887..6eb87e5a8d 100644 --- a/full-node/src/network_service.rs +++ b/full-node/src/network_service.rs @@ -1121,39 +1121,43 @@ async fn background_task(mut inner: Inner) { BanSeverity::High => 40, }), ); - if inner - .network - .gossip_close( - chain_id, - &peer_id, - service::GossipKind::ConsensusTransactions, - ) - .is_ok() - { + if inner.network.gossip_remove_desired( + chain_id, + &peer_id, + service::GossipKind::ConsensusTransactions, + ) { inner.log_callback.log( LogLevel::Debug, format!( - "chain-disconnected; peer_id={}; chain={}", - peer_id, inner.network[chain_id].log_name + "slot-unassigned; peer_id={}; 
chain={}; reason=user-ban; user-reason={}", + peer_id, inner.network[chain_id].log_name, reason ), ); } - if inner.network.gossip_remove_desired( + + if inner.network.gossip_is_connected( chain_id, &peer_id, service::GossipKind::ConsensusTransactions, ) { + let _close_result = inner.network.gossip_close( + chain_id, + &peer_id, + service::GossipKind::ConsensusTransactions, + ); + debug_assert!(_close_result.is_ok()); + inner.log_callback.log( LogLevel::Debug, format!( - "slot-unassigned; peer_id={}; chain={}; reason=user-ban; user-reason={}", - peer_id, inner.network[chain_id].log_name, reason + "chain-disconnected; peer_id={}; chain={}", + peer_id, inner.network[chain_id].log_name ), ); - } - debug_assert!(inner.event_pending_send.is_none()); - inner.event_pending_send = Some(Event::Disconnected { chain_id, peer_id }); + debug_assert!(inner.event_pending_send.is_none()); + inner.event_pending_send = Some(Event::Disconnected { chain_id, peer_id }); + } } WakeUpReason::Message(ToBackground::ForegroundAnnounceBlock { diff --git a/lib/src/network/service.rs b/lib/src/network/service.rs index f9d609ceea..c39a7831f6 100644 --- a/lib/src/network/service.rs +++ b/lib/src/network/service.rs @@ -3333,6 +3333,53 @@ where .map(|(_, peer_index, _, _, _)| &self.peers[peer_index.0]) } + /// Returns `true` if an outbound gossip substream of the given kind is currently open with + /// the given peer on the given chain. + /// If so, it is possible to send gossip notifications to this peer. + /// + /// # Panic + /// + /// Panics if the [`ChainId`] is invalid. + /// + pub fn gossip_is_connected( + &'_ self, + chain_id: ChainId, + target: &PeerId, + kind: GossipKind, + ) -> bool { + assert!(self.chains.contains(chain_id.0)); + let GossipKind::ConsensusTransactions = kind; + + let Some(&peer_index) = self.peers_by_peer_id.get(target) else { + // If the `PeerId` is unknown, then we also don't have any gossip link to it. 
+ return false; + }; + + self.notification_substreams_by_peer_id + .range( + ( + NotificationsProtocol::BlockAnnounces { + chain_index: chain_id.0, + }, + peer_index, + SubstreamDirection::Out, + NotificationsSubstreamState::open_min_value(), + SubstreamId::min_value(), + ) + ..=( + NotificationsProtocol::BlockAnnounces { + chain_index: chain_id.0, + }, + peer_index, + SubstreamDirection::Out, + NotificationsSubstreamState::open_max_value(), + SubstreamId::max_value(), + ), + ) + .next() + .is_some() + } + /// Open a gossiping substream with the given peer on the given chain. /// /// Either a [`Event::GossipConnected`] or [`Event::GossipOpenFailed`] is guaranteed to later diff --git a/light-base/src/network_service.rs b/light-base/src/network_service.rs index 42fc2e413d..40cccc203b 100644 --- a/light-base/src/network_service.rs +++ b/light-base/src/network_service.rs @@ -1325,25 +1325,6 @@ async fn background_task(mut task: BackgroundTask) { basic_peering_strategy::UnassignSlotAndBan::Banned { had_slot: true } ); - if task - .network - .gossip_close( - chain_id, - &peer_id, - service::GossipKind::ConsensusTransactions, - ) - .is_ok() - { - log::debug!( - target: "network", - "Gossip({}, {}) => Closed", - &task.network[chain_id].log_name, - peer_id, - ); - let _was_in = task.open_gossip_links.remove(&(chain_id, peer_id.clone())); - debug_assert!(_was_in.is_some()); - } - if had_slot { log::debug!( target: "network", @@ -1360,8 +1341,32 @@ async fn background_task(mut task: BackgroundTask) { ); } - debug_assert!(task.event_pending_send.is_none()); - task.event_pending_send = Some((chain_id, Event::Disconnected { peer_id })); + if task.network.gossip_is_connected( + chain_id, + &peer_id, + service::GossipKind::ConsensusTransactions, + ) { + let _closed_result = task.network.gossip_close( + chain_id, + &peer_id, + service::GossipKind::ConsensusTransactions, + ); + debug_assert!(_closed_result.is_ok()); + + log::debug!( + target: "network", + "Gossip({}, {}) => Closed", 
+ &task.network[chain_id].log_name, + peer_id, + ); + + let _was_in = task.open_gossip_links.remove(&(chain_id, peer_id.clone())); + debug_assert!(_was_in.is_some()); + + debug_assert!(task.event_pending_send.is_none()); + task.event_pending_send = Some((chain_id, Event::Disconnected { peer_id })); + } + } WakeUpReason::MessageForChain( chain_id, From 0a25b4acfb231fbaa000ae1eb25b46ce3f7aebd6 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Wed, 20 Dec 2023 10:48:42 +0100 Subject: [PATCH 14/14] Rustfmt --- light-base/src/network_service.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/light-base/src/network_service.rs b/light-base/src/network_service.rs index 7f370f7d9a..13cb2d4256 100644 --- a/light-base/src/network_service.rs +++ b/light-base/src/network_service.rs @@ -1234,7 +1234,6 @@ async fn background_task(mut task: BackgroundTask) { debug_assert!(task.event_pending_send.is_none()); task.event_pending_send = Some((chain_id, Event::Disconnected { peer_id })); } - } WakeUpReason::MessageForChain( chain_id,