diff --git a/full-node/src/consensus_service.rs b/full-node/src/consensus_service.rs
index b855fff3e1..3a6136af64 100644
--- a/full-node/src/consensus_service.rs
+++ b/full-node/src/consensus_service.rs
@@ -1038,7 +1038,7 @@ impl SyncBackground {
                     all::BlockAnnounceOutcome::NotFinalizedChain => {}
                     all::BlockAnnounceOutcome::Discarded => {}
                     all::BlockAnnounceOutcome::StoredForLater {} => {}
-                    all::BlockAnnounceOutcome::InvalidHeader(_) => unreachable!(),
+                    all::BlockAnnounceOutcome::InvalidHeader(_) => unreachable!(), // TODO: ?!?! why unreachable? also, ban the peer
                }
            }
            WakeUpReason::NetworkEvent(network_service::Event::GrandpaNeighborPacket {
@@ -1102,6 +1102,17 @@ impl SyncBackground {
                source_id,
                result: Err(_),
            }) => {
+                // Note that we perform the ban even if the source is now disconnected.
+                let peer_id = self.sync[source_id].as_ref().unwrap().peer_id.clone();
+                self.network_service
+                    .ban_and_disconnect(
+                        peer_id,
+                        self.network_chain_id,
+                        network_service::BanSeverity::Low,
+                        "blocks-request-error",
+                    )
+                    .await;
+
                let _ = self
                    .sync
                    .blocks_request_response(request_id, Err::<iter::Empty<_>, _>(()));
@@ -1167,6 +1178,17 @@ impl SyncBackground {
                source_id,
                result: Err(_),
            }) => {
+                // Note that we perform the ban even if the source is now disconnected.
+                let peer_id = self.sync[source_id].as_ref().unwrap().peer_id.clone();
+                self.network_service
+                    .ban_and_disconnect(
+                        peer_id,
+                        self.network_chain_id,
+                        network_service::BanSeverity::Low,
+                        "warp-sync-request-error",
+                    )
+                    .await;
+
                let _ = self.sync.grandpa_warp_sync_response_err(request_id);

                // If the source was actually disconnected and has no other request in
@@ -1192,7 +1214,19 @@ impl SyncBackground {
                source_id,
                result,
            }) => {
-                // Storage proof request.
+                if result.is_err() {
+                    // Note that we perform the ban even if the source is now disconnected.
+                    let peer_id = self.sync[source_id].as_ref().unwrap().peer_id.clone();
+                    self.network_service
+                        .ban_and_disconnect(
+                            peer_id,
+                            self.network_chain_id,
+                            network_service::BanSeverity::Low,
+                            "storage-proof-request-error",
+                        )
+                        .await;
+                }
+
                let _ = self
                    .sync
                    .storage_get_response(request_id, result.map(|r| r.decode().to_owned()));
@@ -1220,7 +1254,19 @@ impl SyncBackground {
                source_id,
                result,
            }) => {
-                // Successful call proof request.
+                if result.is_err() {
+                    // Note that we perform the ban even if the source is now disconnected.
+                    let peer_id = self.sync[source_id].as_ref().unwrap().peer_id.clone();
+                    self.network_service
+                        .ban_and_disconnect(
+                            peer_id,
+                            self.network_chain_id,
+                            network_service::BanSeverity::Low,
+                            "call-proof-request-error",
+                        )
+                        .await;
+                }
+
                self.sync
                    .call_proof_response(request_id, result.map(|r| r.decode().to_owned()));
                // TODO: need help from networking service to avoid this to_owned
@@ -1855,6 +1901,17 @@ impl SyncBackground {
                    );
                }
                Err(err) => {
+                    if let Some(sender) = &sender {
+                        self.network_service
+                            .ban_and_disconnect(
+                                sender.clone(),
+                                self.network_chain_id,
+                                network_service::BanSeverity::High,
+                                "bad-warp-sync-fragment",
+                            )
+                            .await;
+                    }
+
                    self.log_callback.log(
                        LogLevel::Warn,
                        format!(
@@ -1896,6 +1953,7 @@ impl SyncBackground {
                unimplemented!()
            }
            all::ProcessOne::VerifyBlock(verify) => {
+                // TODO: ban peer in case of verification failure
                let when_verification_started = Instant::now();
                let mut database_accesses_duration = Duration::new(0, 0);
                let mut runtime_build_duration = Duration::new(0, 0);
@@ -2333,6 +2391,13 @@ impl SyncBackground {
            }

            all::ProcessOne::VerifyFinalityProof(verify) => {
+                let sender = verify
+                    .sender()
+                    .1
+                    .as_ref()
+                    .map(|s| s.peer_id.clone())
+                    .unwrap();
+
                match verify.perform(rand::random()) {
                    (
                        sync_out,
@@ -2352,7 +2417,7 @@ impl SyncBackground {
                        self.log_callback.log(
                            LogLevel::Debug,
                            format!(
-                                "finality-proof-verification; outcome=success; new-finalized={}",
+                                "finality-proof-verification; outcome=success; sender={sender}; new-finalized={}",
                                HashDisplay(&new_finalized_hash)
                            ),
                        );
@@ -2407,7 +2472,8 @@ impl SyncBackground {
                    (sync_out, all::FinalityProofVerifyOutcome::GrandpaCommitPending) => {
                        self.log_callback.log(
                            LogLevel::Debug,
-                            "finality-proof-verification; outcome=pending".to_string(),
+                            format!("finality-proof-verification; outcome=pending; sender={sender}"),
                        );
                        self.sync = sync_out;
                        (self, true)
@@ -2415,23 +2481,45 @@ impl SyncBackground {
                    (sync_out, all::FinalityProofVerifyOutcome::AlreadyFinalized) => {
                        self.log_callback.log(
                            LogLevel::Debug,
-                            "finality-proof-verification; outcome=already-finalized".to_string(),
+                            format!("finality-proof-verification; outcome=already-finalized; sender={sender}"),
                        );
                        self.sync = sync_out;
                        (self, true)
                    }
                    (sync_out, all::FinalityProofVerifyOutcome::GrandpaCommitError(error)) => {
+                        self.network_service
+                            .ban_and_disconnect(
+                                sender.clone(),
+                                self.network_chain_id,
+                                network_service::BanSeverity::High,
+                                "bad-grandpa-commit",
+                            )
+                            .await;
                        self.log_callback.log(
                            LogLevel::Warn,
-                            format!("finality-proof-verification-failure; error={}", error),
+                            format!(
+                                "finality-proof-verification-failure; sender={sender}; error={}",
+                                error
+                            ),
                        );
                        self.sync = sync_out;
                        (self, true)
                    }
                    (sync_out, all::FinalityProofVerifyOutcome::JustificationError(error)) => {
+                        self.network_service
+                            .ban_and_disconnect(
+                                sender.clone(),
+                                self.network_chain_id,
+                                network_service::BanSeverity::High,
+                                "bad-justification",
+                            )
+                            .await;
                        self.log_callback.log(
                            LogLevel::Warn,
-                            format!("finality-proof-verification-failure; error={}", error),
+                            format!(
+                                "finality-proof-verification-failure; sender={sender}; error={}",
+                                error
+                            ),
                        );
                        self.sync = sync_out;
                        (self, true)
                    }
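The hunks above repeat one pattern: on a failed request, look up the peer behind the sync `SourceId`, ban it, and only then feed the failure back into the sync state machine, so that the request slot is freed regardless of whether the source has meanwhile disconnected. A condensed sketch of that ordering, using hypothetical stub types rather than smoldot's real service APIs (runnable with the `futures` crate):

    // Stub stand-ins for the real consensus-service and network-service types.
    #[derive(Clone, Debug)]
    struct PeerId(String);
    enum BanSeverity {
        Low,
        High,
    }

    struct NetworkService;
    impl NetworkService {
        async fn ban_and_disconnect(&self, _peer: PeerId, _severity: BanSeverity, _reason: &'static str) {
            // In the real code this enqueues a message for the background networking task.
        }
    }

    struct Sync {
        // Per-source user data, mirroring the `Option` shape seen in the diff.
        peer_of_source: Vec<Option<PeerId>>,
    }
    impl Sync {
        fn report_request_failure(&mut self, _request_id: usize) {
            // Frees the request slot inside the sync state machine.
        }
    }

    async fn on_request_error(net: &NetworkService, sync: &mut Sync, source_id: usize, request_id: usize) {
        // The ban is performed even if the source is now disconnected.
        let peer_id = sync.peer_of_source[source_id].as_ref().unwrap().clone();
        net.ban_and_disconnect(peer_id, BanSeverity::Low, "blocks-request-error").await;
        // The state machine must still be told about the failure afterwards.
        sync.report_request_failure(request_id);
    }

    fn main() {
        let net = NetworkService;
        let mut sync = Sync { peer_of_source: vec![Some(PeerId("alice".to_owned()))] };
        futures::executor::block_on(on_request_error(&net, &mut sync, 0, 0));
    }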
diff --git a/full-node/src/network_service.rs b/full-node/src/network_service.rs
index 6403eaed6e..17dc76913c 100644
--- a/full-node/src/network_service.rs
+++ b/full-node/src/network_service.rs
@@ -172,6 +172,12 @@ pub struct NetworkService {
 }

 enum ToBackground {
+    ForegroundDisconnectAndBan {
+        peer_id: PeerId,
+        chain_id: ChainId,
+        severity: BanSeverity,
+        reason: &'static str,
+    },
     ForegroundAnnounceBlock {
         target: PeerId,
         chain_id: ChainId,
@@ -334,6 +340,13 @@ struct Chain {
     max_in_peers: usize,
 }

+/// Severity of a ban. See [`NetworkService::ban_and_disconnect`].
+#[derive(Debug, Copy, Clone, PartialEq, Eq)]
+pub enum BanSeverity {
+    Low,
+    High,
+}
+
 impl NetworkService {
     /// Initializes the network service with the given configuration.
     pub async fn new(
@@ -605,6 +618,37 @@ impl NetworkService {
             .await;
     }

+    /// Starts asynchronously disconnecting the given peer. An [`Event::Disconnected`] will
+    /// later be generated. Prevents a new gossip link with the same peer from being reopened
+    /// for a little while.
+    ///
+    /// `reason` is a human-readable string printed in the logs.
+    ///
+    /// Due to race conditions, it is possible to reconnect to the peer soon after, in case the
+    /// reconnection was already happening while the call to this function was being processed.
+    /// If that happens, another [`Event::Disconnected`] will be delivered afterwards. In other
+    /// words, this function guarantees that a disconnection will happen in the future, not that
+    /// the peer is already disconnected when it returns.
+    pub async fn ban_and_disconnect(
+        &self,
+        peer_id: PeerId,
+        chain_id: ChainId,
+        severity: BanSeverity,
+        reason: &'static str,
+    ) {
+        let _ = self
+            .to_background_tx
+            .lock()
+            .await
+            .send(ToBackground::ForegroundDisconnectAndBan {
+                peer_id,
+                chain_id,
+                severity,
+                reason,
+            })
+            .await;
+    }
+
     pub async fn send_block_announce(
         self: Arc<Self>,
         target: PeerId,
@@ -1049,6 +1093,61 @@ async fn background_task(mut inner: Inner) {
                 return;
             }

+            WakeUpReason::Message(ToBackground::ForegroundDisconnectAndBan {
+                peer_id,
+                chain_id,
+                severity,
+                reason,
+            }) => {
+                // Note that the peer doesn't necessarily have an out slot.
+                inner.peering_strategy.unassign_slot_and_ban(
+                    &chain_id,
+                    &peer_id,
+                    Instant::now()
+                        + Duration::from_secs(match severity {
+                            BanSeverity::Low => 10,
+                            BanSeverity::High => 40,
+                        }),
+                );
+                if inner.network.gossip_remove_desired(
+                    chain_id,
+                    &peer_id,
+                    service::GossipKind::ConsensusTransactions,
+                ) {
+                    inner.log_callback.log(
+                        LogLevel::Debug,
+                        format!(
+                            "slot-unassigned; peer_id={}; chain={}; reason=user-ban; user-reason={}",
+                            peer_id, inner.network[chain_id].log_name, reason
+                        ),
+                    );
+                }
+
+                if inner.network.gossip_is_connected(
+                    chain_id,
+                    &peer_id,
+                    service::GossipKind::ConsensusTransactions,
+                ) {
+                    let _close_result = inner.network.gossip_close(
+                        chain_id,
+                        &peer_id,
+                        service::GossipKind::ConsensusTransactions,
+                    );
+                    debug_assert!(_close_result.is_ok());
+
+                    inner.log_callback.log(
+                        LogLevel::Debug,
+                        format!(
+                            "chain-disconnected; peer_id={}; chain={}",
+                            peer_id, inner.network[chain_id].log_name
+                        ),
+                    );
+
+                    debug_assert!(inner.event_pending_send.is_none());
+                    inner.event_pending_send = Some(Event::Disconnected { chain_id, peer_id });
+                }
+            }
+
             WakeUpReason::Message(ToBackground::ForegroundAnnounceBlock {
                 target,
                 chain_id,
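`ban_and_disconnect` only promises a disconnection *at some point in the future* because the public method merely queues a `ToBackground` message; the ban takes effect when the background task processes it. The handoff in miniature (names are illustrative, and the channel here is `futures`' mpsc rather than smoldot's own):

    use futures::{channel::mpsc, executor::block_on, SinkExt as _, StreamExt as _};

    enum ToBackground {
        DisconnectAndBan { peer: String, reason: &'static str },
    }

    fn main() {
        let (mut tx, mut rx) = mpsc::channel::<ToBackground>(16);

        // Foreground: returns as soon as the message is queued. The peer is not
        // disconnected yet, and a reconnection already in flight can still win the race.
        block_on(tx.send(ToBackground::DisconnectAndBan {
            peer: "alice".to_owned(),
            reason: "blocks-request-error",
        }))
        .unwrap();
        drop(tx);

        // Background: the slot unassignment, gossip close, and `Disconnected` event
        // all happen here, when the message is finally processed.
        block_on(async {
            while let Some(ToBackground::DisconnectAndBan { peer, reason }) = rx.next().await {
                println!("banning {peer}: {reason}");
            }
        });
    }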
diff --git a/lib/src/network/service.rs b/lib/src/network/service.rs
index 2e827e9143..461cb76949 100644
--- a/lib/src/network/service.rs
+++ b/lib/src/network/service.rs
@@ -3357,6 +3357,53 @@ where
             .map(|(_, peer_index, _, _, _)| &self.peers[peer_index.0])
     }

+    /// Returns `true` if an [`Event::GossipConnected`] event of the given kind has been emitted
+    /// for the given peer and the gossip link is still open, in other words if it is possible
+    /// to send gossip notifications to this peer.
+    ///
+    /// # Panic
+    ///
+    /// Panics if the [`ChainId`] is invalid.
+    ///
+    pub fn gossip_is_connected(
+        &'_ self,
+        chain_id: ChainId,
+        target: &PeerId,
+        kind: GossipKind,
+    ) -> bool {
+        assert!(self.chains.contains(chain_id.0));
+        let GossipKind::ConsensusTransactions = kind;
+
+        let Some(&peer_index) = self.peers_by_peer_id.get(target) else {
+            // If the `PeerId` is unknown, then we also don't have any gossip link to it.
+            return false;
+        };
+
+        self.notification_substreams_by_peer_id
+            .range(
+                (
+                    NotificationsProtocol::BlockAnnounces {
+                        chain_index: chain_id.0,
+                    },
+                    peer_index,
+                    SubstreamDirection::Out,
+                    NotificationsSubstreamState::open_min_value(),
+                    SubstreamId::min_value(),
+                )
+                    ..=(
+                        NotificationsProtocol::BlockAnnounces {
+                            chain_index: chain_id.0,
+                        },
+                        peer_index,
+                        SubstreamDirection::Out,
+                        NotificationsSubstreamState::open_max_value(),
+                        SubstreamId::max_value(),
+                    ),
+            )
+            .next()
+            .is_some()
+    }
+
     /// Open a gossiping substream with the given peer on the given chain.
     ///
     /// Either a [`Event::GossipConnected`] or [`Event::GossipOpenFailed`] is guaranteed to later
diff --git a/lib/src/sync/all.rs b/lib/src/sync/all.rs
index 3d8ab1d2cd..aafa9ddbda 100644
--- a/lib/src/sync/all.rs
+++ b/lib/src/sync/all.rs
@@ -492,7 +492,7 @@ impl<TRq, TSrc, TBl> AllSync<TRq, TSrc, TBl> {
     }

     /// Removes a source from the state machine. Returns the user data of this source, and all
-    /// the requests that this source were expected to perform.
+    /// the requests that this source was expected to perform.
     ///
     /// # Panic
     ///
@@ -1143,6 +1143,27 @@ impl<TRq, TSrc, TBl> AllSync<TRq, TSrc, TBl> {
         }
     }

+    /// Returns the [`SourceId`] that is expected to fulfill the given request.
+    ///
+    /// # Panic
+    ///
+    /// Panics if the [`RequestId`] is invalid.
+    ///
+    pub fn request_source_id(&self, request_id: RequestId) -> SourceId {
+        match (&self.inner, &self.shared.requests[request_id.0]) {
+            (AllSyncInner::AllForks(inner), RequestMapping::AllForks(rq)) => {
+                inner[inner.request_source_id(*rq)].outer_source_id
+            }
+            (AllSyncInner::Optimistic { inner }, RequestMapping::Optimistic(rq)) => {
+                inner[inner.request_source_id(*rq)].outer_source_id
+            }
+            (AllSyncInner::WarpSync { inner, .. }, RequestMapping::WarpSync(rq)) => {
+                inner[inner.request_source_id(*rq)].outer_source_id
+            }
+            _ => unreachable!(),
+        }
+    }
+
     /// Process the next block in the queue of verification.
     ///
     /// This method takes ownership of the [`AllSync`] and starts a verification process. The
@@ -2476,6 +2497,20 @@ enum FinalityProofVerifyInner<TRq, TSrc, TBl> {
 }

 impl<TRq, TSrc, TBl> FinalityProofVerify<TRq, TSrc, TBl> {
+    /// Returns the source the justification was obtained from.
+    pub fn sender(&self) -> (SourceId, &TSrc) {
+        match &self.inner {
+            FinalityProofVerifyInner::AllForks(inner) => {
+                let sender = inner.sender().1;
+                (sender.outer_source_id, &sender.user_data)
+            }
+            FinalityProofVerifyInner::Optimistic(inner) => {
+                let sender = inner.sender().1;
+                (sender.outer_source_id, &sender.user_data)
+            }
+        }
+    }
+
     /// Perform the verification.
     ///
     /// A randomness seed must be provided and will be used during the verification. Note that the
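`gossip_is_connected` doesn't maintain a separate "connected" set; it probes the ordered substream collection with a range query over a composite key, like an indexed lookup. The idea in miniature, with plain std types standing in for smoldot's (the key layout and state values here are illustrative):

    use std::collections::BTreeSet;

    // (chain_index, peer_index, direction_out, state) stands in for smoldot's
    // (NotificationsProtocol, peer_index, SubstreamDirection, state, SubstreamId) key.
    // Hypothetical states: 0 = pending, 1 = requested, 2 = open, 3 = open-and-desired.
    type Key = (usize, usize, bool, u8);

    fn gossip_is_connected(substreams: &BTreeSet<Key>, chain: usize, peer: usize) -> bool {
        // Range query covering only outbound substreams in an "open" state,
        // mirroring open_min_value()..=open_max_value() in the real code.
        substreams
            .range((chain, peer, true, 2)..=(chain, peer, true, 3))
            .next()
            .is_some()
    }

    fn main() {
        let mut substreams = BTreeSet::new();
        substreams.insert((0, 7, true, 2)); // open outbound gossip substream: chain 0, peer 7
        substreams.insert((0, 8, true, 0)); // still pending: does not count
        assert!(gossip_is_connected(&substreams, 0, 7));
        assert!(!gossip_is_connected(&substreams, 0, 8));
    }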
diff --git a/lib/src/sync/all_forks.rs b/lib/src/sync/all_forks.rs
index 0b71f5c4e9..cd6e9bf450 100644
--- a/lib/src/sync/all_forks.rs
+++ b/lib/src/sync/all_forks.rs
@@ -787,6 +787,16 @@ impl<TBl, TRq, TSrc> AllForksSync<TBl, TRq, TSrc> {
         self.inner.blocks.obsolete_requests()
     }

+    /// Returns the [`SourceId`] that is expected to fulfill the given request.
+    ///
+    /// # Panic
+    ///
+    /// Panics if the [`RequestId`] is invalid.
+    ///
+    pub fn request_source_id(&self, request_id: RequestId) -> SourceId {
+        self.inner.blocks.request_source_id(request_id)
+    }
+
     /// Call in response to a blocks request being successful.
     ///
     /// This method takes ownership of the [`AllForksSync`] and puts it in a mode where the blocks
@@ -2180,6 +2190,11 @@ pub struct FinalityProofVerify<TBl, TRq, TSrc> {
 }

 impl<TBl, TRq, TSrc> FinalityProofVerify<TBl, TRq, TSrc> {
+    /// Returns the source the justification was obtained from.
+    pub fn sender(&self) -> (SourceId, &TSrc) {
+        (self.source_id, &self.parent[self.source_id])
+    }
+
     /// Perform the verification.
     ///
     /// A randomness seed must be provided and will be used during the verification. Note that the
diff --git a/lib/src/sync/all_forks/pending_blocks.rs b/lib/src/sync/all_forks/pending_blocks.rs
index 24d73579e5..6075b58590 100644
--- a/lib/src/sync/all_forks/pending_blocks.rs
+++ b/lib/src/sync/all_forks/pending_blocks.rs
@@ -873,7 +873,7 @@ impl<TBl, TRq, TSrc> PendingBlocks<TBl, TRq, TSrc> {
     /// Panics if the [`RequestId`] is invalid.
     ///
     #[track_caller]
-    pub fn request_source(&self, request_id: RequestId) -> SourceId {
+    pub fn request_source_id(&self, request_id: RequestId) -> SourceId {
         self.requests.get(request_id.0).unwrap().source_id
     }

diff --git a/lib/src/sync/optimistic.rs b/lib/src/sync/optimistic.rs
index 5ff54cd8c3..7833dc1b5e 100644
--- a/lib/src/sync/optimistic.rs
+++ b/lib/src/sync/optimistic.rs
@@ -159,6 +159,7 @@ struct OptimisticSyncInner<TRq, TSrc, TBl> {
         verification_queue::VerificationQueue<(RequestId, TRq), RequestSuccessBlock<TBl>>,

     /// Justifications, if any, of the block that has just been verified.
+    // TODO: clean up when a source is removed
     pending_encoded_justifications: vec::IntoIter<([u8; 4], Vec<u8>, SourceId)>,

     /// Identifier to assign to the next request.
@@ -669,6 +670,25 @@ impl<TRq, TSrc, TBl> OptimisticSync<TRq, TSrc, TBl> {
         user_data
     }

+    /// Returns the [`SourceId`] that is expected to fulfill the given request.
+    ///
+    /// # Panic
+    ///
+    /// Panics if the [`RequestId`] is invalid.
+    ///
+    pub fn request_source_id(&self, request_id: RequestId) -> SourceId {
+        if let Some((src, _)) = self.inner.obsolete_requests.get(&request_id) {
+            *src
+        } else {
+            self.inner
+                .verification_queue
+                .requests()
+                .find(|(rq, _)| rq.0 == request_id)
+                .unwrap()
+                .1
+        }
+    }
+
     /// Process the next block in the queue of verification.
     ///
     /// This method takes ownership of the [`OptimisticSync`]. The [`OptimisticSync`] is yielded
@@ -1041,6 +1061,17 @@ pub struct JustificationVerify<TRq, TSrc, TBl> {
 }

 impl<TRq, TSrc, TBl> JustificationVerify<TRq, TSrc, TBl> {
+    /// Returns the source the justification was obtained from.
+    pub fn sender(&self) -> (SourceId, &TSrc) {
+        let (_, _, source_id) = self
+            .inner
+            .pending_encoded_justifications
+            .as_slice()
+            .first()
+            .unwrap();
+        (*source_id, &self.inner.sources[source_id].user_data)
+    }
+
     /// Verify the justification.
     ///
     /// A randomness seed must be provided and will be used during the verification. Note that the
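Across these files, `request_source_id` is the same query surfaced at each layer: `AllSync` dispatches on whichever inner state machine is currently running and translates the inner `SourceId` into the outer one stored in the source's user data. A toy version of that two-level ID translation, with hypothetical types:

    #[derive(Clone, Copy, PartialEq, Eq, Debug)]
    struct OuterSourceId(usize);

    struct InnerSync {
        // request index -> inner source index
        request_sources: Vec<usize>,
        // inner source index -> user data carrying the outer ID
        sources: Vec<OuterSourceId>,
    }

    impl InnerSync {
        fn request_source_id(&self, request: usize) -> usize {
            self.request_sources[request]
        }
    }

    enum AllSyncInner {
        AllForks(InnerSync),
        Optimistic(InnerSync),
    }

    struct AllSync {
        inner: AllSyncInner,
    }

    impl AllSync {
        fn request_source_id(&self, request: usize) -> OuterSourceId {
            // Same shape as the real method: dispatch, then map inner -> outer.
            match &self.inner {
                AllSyncInner::AllForks(inner) | AllSyncInner::Optimistic(inner) => {
                    inner.sources[inner.request_source_id(request)]
                }
            }
        }
    }

    fn main() {
        let sync = AllSync {
            inner: AllSyncInner::AllForks(InnerSync {
                request_sources: vec![1],
                sources: vec![OuterSourceId(10), OuterSourceId(11)],
            }),
        };
        assert_eq!(sync.request_source_id(0), OuterSourceId(11));
    }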
diff --git a/lib/src/sync/optimistic/verification_queue.rs b/lib/src/sync/optimistic/verification_queue.rs
index 4295319b61..025bf1e050 100644
--- a/lib/src/sync/optimistic/verification_queue.rs
+++ b/lib/src/sync/optimistic/verification_queue.rs
@@ -432,6 +432,17 @@ impl<TRq, TBl> VerificationQueue<TRq, TBl> {
         )
     }

+    /// Returns an iterator to all the requests that are inside of the queue.
+    pub fn requests(&'_ self) -> impl Iterator<Item = (&'_ TRq, SourceId)> + '_ {
+        self.verification_queue.iter().filter_map(|queue_elem| {
+            if let VerificationQueueEntryTy::Requested { user_data, source } = &queue_elem.ty {
+                Some((user_data, *source))
+            } else {
+                None
+            }
+        })
+    }
+
     /// Consumes the queue and returns an iterator to all the requests that were inside of it.
     pub fn into_requests(self) -> impl Iterator<Item = (TRq, SourceId)> {
         self.verification_queue
diff --git a/lib/src/sync/warp_sync.rs b/lib/src/sync/warp_sync.rs
index 1227d449aa..7242403ee3 100644
--- a/lib/src/sync/warp_sync.rs
+++ b/lib/src/sync/warp_sync.rs
@@ -1042,6 +1042,16 @@ impl<TSrc, TRq> WarpSync<TSrc, TRq> {
         user_data
     }

+    /// Returns the [`SourceId`] that is expected to fulfill the given request.
+    ///
+    /// # Panic
+    ///
+    /// Panics if the [`RequestId`] is invalid.
+    ///
+    pub fn request_source_id(&self, request_id: RequestId) -> SourceId {
+        self.in_progress_requests[request_id.0].0
+    }
+
     /// Injects a successful Merkle proof and removes the given request from the state machine.
     /// Returns the user data that was associated to it.
     ///
diff --git a/light-base/src/network_service.rs b/light-base/src/network_service.rs
index cae9adf57e..13cb2d4256 100644
--- a/light-base/src/network_service.rs
+++ b/light-base/src/network_service.rs
@@ -278,6 +278,13 @@ pub struct NetworkServiceChain<TPlat: PlatformRef> {
     marker: core::marker::PhantomData<TPlat>,
 }

+/// Severity of a ban. See [`NetworkServiceChain::ban_and_disconnect`].
+#[derive(Debug, Copy, Clone, PartialEq, Eq)]
+pub enum BanSeverity {
+    Low,
+    High,
+}
+
 impl<TPlat: PlatformRef> NetworkServiceChain<TPlat> {
     /// Subscribes to the networking events that happen on the given chain.
     ///
@@ -311,6 +318,33 @@ impl<TPlat: PlatformRef> NetworkServiceChain<TPlat> {
         rx
     }

+    /// Starts asynchronously disconnecting the given peer. An [`Event::Disconnected`] will
+    /// later be generated. Prevents a new gossip link with the same peer from being reopened
+    /// for a little while.
+    ///
+    /// `reason` is a human-readable string printed in the logs.
+    ///
+    /// Due to race conditions, it is possible to reconnect to the peer soon after, in case the
+    /// reconnection was already happening while the call to this function was being processed.
+    /// If that happens, another [`Event::Disconnected`] will be delivered afterwards. In other
+    /// words, this function guarantees that a disconnection will happen in the future, not that
+    /// the peer is already disconnected when it returns.
+    pub async fn ban_and_disconnect(
+        &self,
+        peer_id: PeerId,
+        severity: BanSeverity,
+        reason: &'static str,
+    ) {
+        let _ = self
+            .messages_tx
+            .send(ToBackgroundChain::DisconnectAndBan {
+                peer_id,
+                severity,
+                reason,
+            })
+            .await;
+    }
+
     /// Sends a blocks request to the given peer.
     // TODO: more docs
     pub async fn blocks_request(
@@ -637,6 +671,11 @@ enum ToBackgroundChain {
     Subscribe {
         sender: async_channel::Sender<Event>,
     },
+    DisconnectAndBan {
+        peer_id: PeerId,
+        severity: BanSeverity,
+        reason: &'static str,
+    },
     // TODO: serialize the request before sending over channel
     StartBlocksRequest {
         target: PeerId, // TODO: takes by value because of future longevity issue
@@ -1132,6 +1171,70 @@ async fn background_task<TPlat: PlatformRef>(mut task: BackgroundTask<TPlat>) {
         WakeUpReason::MessageForChain(chain_id, ToBackgroundChain::Subscribe { sender }) => {
             task.pending_new_subscriptions.push((chain_id, sender));
         }
+        WakeUpReason::MessageForChain(
+            chain_id,
+            ToBackgroundChain::DisconnectAndBan {
+                peer_id,
+                severity,
+                reason,
+            },
+        ) => {
+            let ban_duration = Duration::from_secs(match severity {
+                BanSeverity::Low => 10,
+                BanSeverity::High => 40,
+            });
+
+            let had_slot = matches!(
+                task.peering_strategy.unassign_slot_and_ban(
+                    &chain_id,
+                    &peer_id,
+                    task.platform.now() + ban_duration,
+                ),
+                basic_peering_strategy::UnassignSlotAndBan::Banned { had_slot: true }
+            );

+            if had_slot {
+                log::debug!(
+                    target: "network",
+                    "Slots({}) ∌ {} (reason=user-ban, ban-duration={:?}, user-reason={})",
+                    &task.network[chain_id].log_name,
+                    peer_id,
+                    ban_duration,
+                    reason
+                );
+                task.network.gossip_remove_desired(
+                    chain_id,
+                    &peer_id,
+                    service::GossipKind::ConsensusTransactions,
+                );
+            }
+
+            if task.network.gossip_is_connected(
+                chain_id,
+                &peer_id,
+                service::GossipKind::ConsensusTransactions,
+            ) {
+                let _closed_result = task.network.gossip_close(
+                    chain_id,
+                    &peer_id,
+                    service::GossipKind::ConsensusTransactions,
+                );
+                debug_assert!(_closed_result.is_ok());
+
+                log::debug!(
+                    target: "network",
+                    "Gossip({}, {}) => Closed",
+                    &task.network[chain_id].log_name,
+                    peer_id,
+                );
+
+                let _was_in = task.open_gossip_links.remove(&(chain_id, peer_id.clone()));
+                debug_assert!(_was_in.is_some());
+
+                debug_assert!(task.event_pending_send.is_none());
+                task.event_pending_send = Some((chain_id, Event::Disconnected { peer_id }));
+            }
+        }
         WakeUpReason::MessageForChain(
             chain_id,
             ToBackgroundChain::StartBlocksRequest {
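Both the full-node and light-base services map the severity to a fixed ban duration before calling `unassign_slot_and_ban`; the peer is then refused a new slot until the deadline passes. A minimal sketch of that policy (the 10/40-second durations are the ones in the diff; everything else is illustrative):

    use std::time::{Duration, Instant};

    #[derive(Debug, Copy, Clone, PartialEq, Eq)]
    pub enum BanSeverity {
        Low,
        High,
    }

    fn ban_duration(severity: BanSeverity) -> Duration {
        Duration::from_secs(match severity {
            BanSeverity::Low => 10,  // failed or refused request
            BanSeverity::High => 40, // provably invalid data
        })
    }

    fn main() {
        let now = Instant::now();
        let banned_until = now + ban_duration(BanSeverity::High);
        // A peering strategy would refuse to reassign a slot to the peer
        // until `banned_until` is reached.
        assert!(banned_until > now + ban_duration(BanSeverity::Low));
    }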
diff --git a/light-base/src/sync_service.rs b/light-base/src/sync_service.rs
index 3c5e9dfdff..0b478e2d7d 100644
--- a/light-base/src/sync_service.rs
+++ b/light-base/src/sync_service.rs
@@ -334,11 +334,20 @@ impl<TPlat: PlatformRef> SyncService<TPlat> {
             let mut result = match self
                 .network_service
                 .clone()
-                .blocks_request(target, request_config.clone(), timeout_per_request)
+                .blocks_request(target.clone(), request_config.clone(), timeout_per_request)
                 .await
             {
                 Ok(b) if !b.is_empty() => b,
-                Ok(_) | Err(_) => continue,
+                Ok(_) | Err(_) => {
+                    self.network_service
+                        .ban_and_disconnect(
+                            target,
+                            network_service::BanSeverity::Low,
+                            "blocks-request-failed",
+                        )
+                        .await;
+                    continue;
+                }
             };

             return Ok(result.remove(0));
@@ -379,7 +388,12 @@ impl<TPlat: PlatformRef> SyncService<TPlat> {
                 .await
             {
                 Ok(b) if !b.is_empty() => b,
-                Ok(_) | Err(_) => continue,
+                Ok(_) | Err(_) => {
+                    // Because we have no idea whether the block is canonical, it might be
+                    // totally legitimate for the peer to refuse the request. For this reason,
+                    // we don't ban it.
+                    continue;
+                }
             };

             return Ok(result.remove(0));
@@ -553,7 +567,7 @@ impl<TPlat: PlatformRef> SyncService<TPlat> {
                 .network_service
                 .clone()
                 .storage_proof_request(
-                    target,
+                    target.clone(),
                     codec::StorageProofRequestConfig {
                         block_hash: *block_hash,
                         keys: keys_to_request.into_iter(),
@@ -580,6 +594,13 @@ impl<TPlat: PlatformRef> SyncService<TPlat> {
                     network_service::StorageProofRequestError::RequestTooLarge
                 ) || response_nodes_cap == 1
                 {
+                    self.network_service
+                        .ban_and_disconnect(
+                            target,
+                            network_service::BanSeverity::Low,
+                            "storage-request-failed",
+                        )
+                        .await;
                     outcome_errors.push(StorageQueryErrorDetail::Network(err));
                 }

@@ -596,6 +617,13 @@ impl<TPlat: PlatformRef> SyncService<TPlat> {
             }) {
                 Ok(d) => d,
                 Err(err) => {
+                    self.network_service
+                        .ban_and_disconnect(
+                            target,
+                            network_service::BanSeverity::High,
+                            "bad-merkle-proof",
+                        )
+                        .await;
                     outcome_errors.push(StorageQueryErrorDetail::ProofVerification(err));
                     continue;
                 }
@@ -796,20 +824,36 @@ impl<TPlat: PlatformRef> SyncService<TPlat> {
             let result = self
                 .network_service
                 .clone()
-                .call_proof_request(target, config.clone(), timeout_per_request)
+                .call_proof_request(target.clone(), config.clone(), timeout_per_request)
                 .await;

             match result {
                 Ok(value) if !value.decode().is_empty() => return Ok(value),
                 // TODO: this check of emptiness is a bit of a hack; it is necessary because Substrate responds to requests about blocks it doesn't know with an empty proof
-                Ok(_) => outcome_errors.push(network_service::CallProofRequestError::Request(
+                Ok(_) => {
+                    self.network_service
+                        .ban_and_disconnect(
+                            target,
+                            network_service::BanSeverity::Low,
+                            "call-proof-request-failed",
+                        )
+                        .await;
+                    outcome_errors.push(network_service::CallProofRequestError::Request(
                     service::CallProofRequestError::Request(
                         smoldot::network::service::RequestError::Substream(
                             smoldot::libp2p::connection::established::RequestError::SubstreamClosed,
                         ),
                     ),
-                )),
+                    ))
+                }
                 Err(err) => {
+                    self.network_service
+                        .ban_and_disconnect(
+                            target,
+                            network_service::BanSeverity::Low,
+                            "call-proof-request-failed",
+                        )
+                        .await;
                     outcome_errors.push(err);
                 }
             }
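The sync_service changes follow one rule: a transport-level failure earns a Low ban and a retry with another peer, a proof that fails verification earns a High ban (it is evidence of misbehavior rather than flakiness), and a refusal that might be legitimate — the block may simply be unknown to the peer — earns no ban at all. Schematically, with hypothetical helper types:

    #[derive(Debug)]
    enum QueryOutcome {
        NetworkError,
        BadProof,
        LegitimateRefusal,
        Success,
    }

    #[derive(Debug, PartialEq)]
    enum Action {
        BanLow,
        BanHigh,
        NoBan,
        Done,
    }

    fn action_for(outcome: QueryOutcome) -> Action {
        match outcome {
            QueryOutcome::NetworkError => Action::BanLow,     // e.g. "storage-request-failed"
            QueryOutcome::BadProof => Action::BanHigh,        // e.g. "bad-merkle-proof"
            QueryOutcome::LegitimateRefusal => Action::NoBan, // non-canonical block requests
            QueryOutcome::Success => Action::Done,
        }
    }

    fn main() {
        assert_eq!(action_for(QueryOutcome::BadProof), Action::BanHigh);
    }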
diff --git a/light-base/src/sync_service/standalone.rs b/light-base/src/sync_service/standalone.rs
index bf31becb0e..4526efe4f5 100644
--- a/light-base/src/sync_service/standalone.rs
+++ b/light-base/src/sync_service/standalone.rs
@@ -24,7 +24,7 @@ use crate::{network_service, platform::PlatformRef, util};
 use alloc::{
     borrow::{Cow, ToOwned as _},
     boxed::Box,
-    string::{String, ToString as _},
+    string::String,
     sync::Arc,
     vec::Vec,
 };
@@ -316,7 +316,7 @@ pub(super) async fn start_standalone_chain<TPlat: PlatformRef>(
                         target: &task.log_target,
                         "Failed to decode header in block announce received from {}. Error: {}",
                         peer_id, error,
-                    )
+                    );
                 }
             }
@@ -359,6 +359,14 @@ pub(super) async fn start_standalone_chain<TPlat: PlatformRef>(
                         announce_block_height,
                         peer_id
                     );
+
+                    task.network_service
+                        .ban_and_disconnect(
+                            peer_id,
+                            network_service::BanSeverity::Low,
+                            "announced-block-below-known-finalized",
+                        )
+                        .await;
                 }
                 all::BlockAnnounceOutcome::NotFinalizedChain => {
                     log::debug!(
@@ -374,6 +382,13 @@ pub(super) async fn start_standalone_chain<TPlat: PlatformRef>(
                 }
                 all::BlockAnnounceOutcome::InvalidHeader(_) => {
                     // Log messages are already printed above.
+                    task.network_service
+                        .ban_and_disconnect(
+                            peer_id,
+                            network_service::BanSeverity::High,
+                            "bad-block-announce",
+                        )
+                        .await;
                 }
             }
         }
@@ -605,7 +620,14 @@ pub(super) async fn start_standalone_chain<TPlat: PlatformRef>(
         WakeUpReason::RequestFinished(request_id, Ok(RequestOutcome::Block(Err(_)))) => {
             // Failed block request.
-            // TODO: should disconnect peer
+            let source_peer_id = task.sync[task.sync.request_source_id(request_id)].0.clone();
+            task.network_service
+                .ban_and_disconnect(
+                    source_peer_id,
+                    network_service::BanSeverity::Low,
+                    "failed-blocks-request",
+                )
+                .await;
             task.sync
                 .blocks_request_response(request_id, Err::<iter::Empty<_>, _>(()));
         }
@@ -627,12 +649,30 @@ pub(super) async fn start_standalone_chain<TPlat: PlatformRef>(
         WakeUpReason::RequestFinished(request_id, Ok(RequestOutcome::WarpSync(Err(_)))) => {
             // Failed warp sync request.
-            // TODO: should disconnect peer
+            let source_peer_id = task.sync[task.sync.request_source_id(request_id)].0.clone();
+            task.network_service
+                .ban_and_disconnect(
+                    source_peer_id,
+                    network_service::BanSeverity::Low,
+                    "failed-warp-sync-request",
+                )
+                .await;
             task.sync.grandpa_warp_sync_response_err(request_id);
         }

         WakeUpReason::RequestFinished(request_id, Ok(RequestOutcome::Storage(r))) => {
             // Storage proof request.
+            if r.is_err() {
+                let source_peer_id =
+                    task.sync[task.sync.request_source_id(request_id)].0.clone();
+                task.network_service
+                    .ban_and_disconnect(
+                        source_peer_id,
+                        network_service::BanSeverity::Low,
+                        "failed-storage-request",
+                    )
+                    .await;
+            }
             task.sync.storage_get_response(request_id, r);
         }
@@ -645,6 +685,14 @@ pub(super) async fn start_standalone_chain<TPlat: PlatformRef>(
         WakeUpReason::RequestFinished(request_id, Ok(RequestOutcome::CallProof(Err(err)))) => {
             // Failed call proof request.
+            let source_peer_id = task.sync[task.sync.request_source_id(request_id)].0.clone();
+            task.network_service
+                .ban_and_disconnect(
+                    source_peer_id,
+                    network_service::BanSeverity::Low,
+                    "failed-call-proof-request",
+                )
+                .await;
             task.sync.call_proof_response(request_id, Err(err));
         }
@@ -971,7 +1019,6 @@ impl<TPlat: PlatformRef> Task<TPlat> {
                 );
             }
             Err(err) => {
-                // TODO: should disconnect peer
                 log::debug!(target: &self.log_target, "Sync => WarpSyncRuntimeBuild(error={})", err);
                 if !matches!(err, all::WarpSyncBuildRuntimeError::SourceMisbehavior(_)) {
                     log::warn!(target: &self.log_target, "Failed to compile runtime during warp syncing process: {}", err);
@@ -988,7 +1035,6 @@ impl<TPlat: PlatformRef> Task<TPlat> {
                 log::debug!(target: &self.log_target, "Sync => WarpSyncBuildChainInformation(success=true)")
             }
             Err(err) => {
-                // TODO: should disconnect peer
                 log::debug!(target: &self.log_target, "Sync => WarpSyncBuildChainInformation(error={})", err);
                 if !matches!(
                     err,
@@ -1040,10 +1086,9 @@ impl<TPlat: PlatformRef> Task<TPlat> {
             all::ProcessOne::VerifyWarpSyncFragment(verify) => {
                 // Grandpa warp sync fragment to verify.
-                let sender_peer_id = verify
+                let sender_if_still_connected = verify
                     .proof_sender()
-                    .map(|(_, (peer_id, _))| Cow::Owned(peer_id.to_string())) // TODO: unnecessary cloning most of the time
-                    .unwrap_or(Cow::Borrowed("<unknown>"));
+                    .map(|(_, (peer_id, _))| peer_id.clone());

                 let (sync, result) = verify.perform({
                     let mut seed = [0; 32];
@@ -1058,25 +1103,31 @@ impl<TPlat: PlatformRef> Task<TPlat> {
                         log::debug!(
                             target: &self.log_target,
                             "Sync => WarpSyncFragmentVerified(sender={}, verified_hash={}, verified_height={fragment_number})",
-                            sender_peer_id,
+                            sender_if_still_connected.as_ref().map(|p| Cow::Owned(p.to_base58())).unwrap_or(Cow::Borrowed("<unknown>")),
                             HashDisplay(&fragment_hash)
                         );
                     }
                     Err(err) => {
-                        // TODO: should disconnect peer
-                        let maybe_forced_change =
-                            matches!(err, all::VerifyFragmentError::JustificationVerify(_));
                         log::warn!(
                             target: &self.log_target,
                             "Failed to verify warp sync fragment from {}: {}{}",
-                            sender_peer_id,
+                            sender_if_still_connected.as_ref().map(|p| Cow::Owned(p.to_base58())).unwrap_or(Cow::Borrowed("<unknown>")),
                             err,
-                            if maybe_forced_change {
+                            if matches!(err, all::VerifyFragmentError::JustificationVerify(_)) {
                                 ". This might be caused by a forced GrandPa authorities change having \
                                 been enacted on the chain. If this is the case, please update the \
                                 chain specification with a checkpoint past this forced change."
                             } else { "" }
                         );
+                        if let Some(sender_if_still_connected) = sender_if_still_connected {
+                            self.network_service
+                                .ban_and_disconnect(
+                                    sender_if_still_connected,
+                                    network_service::BanSeverity::High,
+                                    "bad-warp-sync-fragment",
+                                )
+                                .await;
+                        }
                     }
                 }
             }
@@ -1223,12 +1274,24 @@ impl<TPlat: PlatformRef> Task<TPlat> {
                             HashDisplay(&verified_hash),
                             error
                         );
+
+                        // TODO: ban peers that have announced the block
+                        /*for peer_id in self.sync.knows_non_finalized_block(height, hash) {
+                            self.network_service
+                                .ban_and_disconnect(
+                                    peer_id,
+                                    network_service::BanSeverity::High,
+                                    "bad-block",
+                                )
+                                .await;
+                        }*/
                     }
                 }
             }

             all::ProcessOne::VerifyFinalityProof(verify) => {
                 // Finality proof to verify.
+                let sender = verify.sender().1 .0.clone();
                 match verify.perform({
                     let mut seed = [0; 32];
                     self.platform.fill_random_bytes(&mut seed);
@@ -1246,7 +1309,7 @@ impl<TPlat: PlatformRef> Task<TPlat> {

                         log::debug!(
                             target: &self.log_target,
-                            "Sync => FinalityProofVerified(finalized_blocks={})",
+                            "Sync => FinalityProofVerified(finalized_blocks={}, sender={sender})",
                             finalized_blocks_newest_to_oldest.len(),
                         );
@@ -1282,28 +1345,33 @@ impl<TPlat: PlatformRef> Task<TPlat> {
                     (sync, all::FinalityProofVerifyOutcome::JustificationError(error)) => {
                         self.sync = sync;

-                        // TODO: print which peer sent the proof
                         log::debug!(
                             target: &self.log_target,
-                            "Sync => JustificationVerificationError(error={:?})",
-                            error,
+                            "Sync => JustificationVerificationError(error={error:?}, sender={sender})",
                         );

+                        // TODO: don't print for consensusenginemismatch?
                         log::warn!(
                             target: &self.log_target,
-                            "Error while verifying justification: {}",
-                            error
+                            "Error while verifying justification: {error}"
                         );
+
+                        // TODO: don't ban for consensusenginemismatch?
+                        self.network_service
+                            .ban_and_disconnect(
+                                sender,
+                                network_service::BanSeverity::High,
+                                "bad-justification",
+                            )
+                            .await;
                     }

                     (sync, all::FinalityProofVerifyOutcome::GrandpaCommitError(error)) => {
                         self.sync = sync;

-                        // TODO: print which peer sent the proof
                         log::debug!(
                             target: &self.log_target,
-                            "Sync => GrandpaCommitVerificationError(error={:?})",
-                            error,
+                            "Sync => GrandpaCommitVerificationError(error={error:?}, sender={sender})",
                         );

                         log::warn!(
                             target: &self.log_target,
                             "Error while verifying GrandPa commit: {}",
                             error
                         );
+
+                        self.network_service
+                            .ban_and_disconnect(
+                                sender,
+                                network_service::BanSeverity::High,
+                                "bad-grandpa-commit",
+                            )
+                            .await;
                     }
                 }
             }
diff --git a/wasm-node/CHANGELOG.md b/wasm-node/CHANGELOG.md
index 8b2d6b11e5..3353f58677 100644
--- a/wasm-node/CHANGELOG.md
+++ b/wasm-node/CHANGELOG.md
@@ -2,6 +2,10 @@

 ## Unreleased

+### Changed
+
+- When a network request fails, or when a block or justification fails to verify, smoldot now closes the gossip link with the peer that the request was sent to or that the block or justification was obtained from, and doesn't re-open a new gossip link with that peer for a little while. ([#1482](https://github.com/smol-dot/smoldot/pull/1482))
+
 ## 2.0.14 - 2023-12-11

 ### Fixed