From af5e81134aca212595af58e713d7342636c01747 Mon Sep 17 00:00:00 2001
From: Pierre Krieger
Date: Mon, 15 Aug 2022 14:51:54 +0200
Subject: [PATCH] Peers are now banned for 20 seconds after a protocol error
 or substream refusal (#2633)

* Split slot assignments in two phases
* Make unassign_slot public
* Implement banning nodes
* CHANGELOG
* Spellcheck
---
 bin/full-node/src/run/network_service.rs | 78 +++++++++++++++++++++--
 bin/light-base/src/network_service.rs    | 60 +++++++++++++++++-
 bin/wasm-node/CHANGELOG.md               |  4 ++
 src/network/service.rs                   | 80 +++++++++++++-----------
 4 files changed, 176 insertions(+), 46 deletions(-)

diff --git a/bin/full-node/src/run/network_service.rs b/bin/full-node/src/run/network_service.rs
index 095f7a4b67..6d9dd35d0e 100644
--- a/bin/full-node/src/run/network_service.rs
+++ b/bin/full-node/src/run/network_service.rs
@@ -182,6 +182,12 @@ struct Guarded {
         fnv::FnvBuildHasher,
     >,
 
+    /// List of peer and chain index tuples for which no outbound slot should be assigned.
+    ///
+    /// The values are the moment when the ban expires.
+    // TODO: use SipHasher
+    slots_assign_backoff: HashMap<(PeerId, usize), Instant, fnv::FnvBuildHasher>,
+
     messages_from_connections_tx:
         mpsc::Sender<(service::ConnectionId, service::ConnectionToCoordinator)>,
 
@@ -279,6 +285,10 @@ impl NetworkService {
                messages_from_connections_rx,
                conn_tasks_tx: conn_tasks_tx.clone(),
                network,
+               slots_assign_backoff: hashbrown::HashMap::with_capacity_and_hasher(
+                   50, // TODO: ?
+                   Default::default(),
+               ),
                active_connections: hashbrown::HashMap::with_capacity_and_hasher(
                    100, // TODO: ?
                    Default::default(),
@@ -792,6 +802,9 @@ async fn update_round(
                    tracing::debug!(%peer_id, "chain-disconnected");
+
+                   guarded.unassign_slot_and_ban(chain_index, peer_id.clone());
+                   inner.wake_up_main_background_task.notify(1);
+
                    break Event::Disconnected {
                        chain_index,
                        peer_id,
                    };
                }
-               service::Event::ChainConnectAttemptFailed { peer_id, error, .. } => {
+               service::Event::ChainConnectAttemptFailed {
+                   chain_index,
+                   peer_id,
+                   error,
+                   ..
+               } => {
                    tracing::debug!(
                        %peer_id,
                        %error,
                        "chain-connect-attempt-failed"
                    );
+
+                   guarded.unassign_slot_and_ban(chain_index, peer_id);
+                   inner.wake_up_main_background_task.notify(1);
                }
                service::Event::InboundSlotAssigned { .. } => {
                    // TODO: log this
@@ -939,12 +964,16 @@ async fn update_round(
-                   // TODO: handle properly?
                    tracing::warn!(
                        %peer_id,
                        %error,
                        "protocol-error"
                    );
+
+                   for chain_index in 0..guarded.network.num_chains() {
+                       guarded.unassign_slot_and_ban(chain_index, peer_id.clone());
+                   }
+                   inner.wake_up_main_background_task.notify(1);
                }
            }
        };
@@ -974,11 +1003,31 @@ async fn update_round(
+       let now = Instant::now();
+
+       // Clean up the content of `slots_assign_backoff`.
+       // TODO: the background task should be woken up when the ban expires
+       // TODO: O(n)
+       guarded
+           .slots_assign_backoff
+           .retain(|_, expiration| *expiration > now);
+
+       // Assign outgoing slots.
        loop {
-           let assigned_peer = guarded.network.assign_slots(chain_index);
-           if let Some(assigned_peer) = assigned_peer {
-               // TODO: log slot de-assignments
-               tracing::debug!(peer_id = %assigned_peer, %chain_index, "slot-assigned");
+           let peer_to_assign = guarded
+               .network
+               .slots_to_assign(chain_index)
+               .filter(|peer_id| {
+                   !guarded
+                       .slots_assign_backoff
+                       .contains_key(&((**peer_id).clone(), chain_index)) // TODO: spurious cloning
+               })
+               .next()
+               .cloned();
+
+           if let Some(peer_to_assign) = peer_to_assign {
+               tracing::debug!(peer_id = %peer_to_assign, %chain_index, "slot-assigned");
+               guarded.network.assign_out_slot(chain_index, peer_to_assign);
            } else {
                break;
            }
@@ -1033,6 +1082,23 @@ async fn update_round(
    }
 }
 
+impl Guarded {
+    fn unassign_slot_and_ban(&mut self, chain_index: usize, peer_id: PeerId) {
+        self.network.unassign_slot(chain_index, &peer_id);
+
+        let new_expiration = Instant::now() + Duration::from_secs(20); // TODO: arbitrary constant
+        match self.slots_assign_backoff.entry((peer_id, chain_index)) {
+            hashbrown::hash_map::Entry::Occupied(e) if *e.get() < new_expiration => {
+                *e.into_mut() = new_expiration;
+            }
+            hashbrown::hash_map::Entry::Occupied(_) => {}
+            hashbrown::hash_map::Entry::Vacant(e) => {
+                e.insert(new_expiration);
+            }
+        }
+    }
+}
+
 /// Asynchronous task managing a specific connection, including the dialing process.
 #[tracing::instrument(level = "trace", skip(start_connect, inner, connection_to_coordinator))]
 async fn opening_connection_task(
diff --git a/bin/light-base/src/network_service.rs b/bin/light-base/src/network_service.rs
index d3a937da6a..bf2d35cf58 100644
--- a/bin/light-base/src/network_service.rs
+++ b/bin/light-base/src/network_service.rs
@@ -52,7 +52,7 @@ use smoldot::{
     network::{protocol, service},
 };
 use std::{
-    collections::{HashMap, HashSet},
+    collections::{hash_map, HashMap, HashSet},
     pin::Pin,
     sync::Arc,
 };
@@ -143,6 +143,12 @@ struct SharedGuarded<TPlat: Platform> {
     // TODO: should also detect whenever we fail to open a block announces substream with any of these peers
     important_nodes: HashSet<PeerId, fnv::FnvBuildHasher>,
 
+    /// List of peer and chain index tuples for which no outbound slot should be assigned.
+    ///
+    /// The values are the moment when the ban expires.
+    // TODO: use SipHasher
+    slots_assign_backoff: HashMap<(PeerId, usize), TPlat::Instant, fnv::FnvBuildHasher>,
+
     messages_from_connections_tx:
         mpsc::Sender<(service::ConnectionId, service::ConnectionToCoordinator)>,
 
@@ -246,6 +252,7 @@ impl<TPlat: Platform> NetworkService<TPlat> {
                 handshake_timeout: Duration::from_secs(8),
                 randomness_seed: rand::random(),
             }),
+            slots_assign_backoff: HashMap::with_capacity_and_hasher(32, Default::default()),
             important_nodes: HashSet::with_capacity_and_hasher(16, Default::default()),
             active_connections: HashMap::with_capacity_and_hasher(32, Default::default()),
             messages_from_connections_tx,
@@ -1064,6 +1071,8 @@ async fn update_round(
                        &shared.log_chain_names[chain_index],
                        peer_id
                    );
+                   guarded.unassign_slot_and_ban(chain_index, peer_id);
+                   shared.wake_up_main_background_task.notify(1);
                }
                service::Event::ChainDisconnected {
                    peer_id,
@@ -1086,6 +1095,8 @@ async fn update_round(
                        &shared.log_chain_names[chain_index],
                        peer_id
                    );
+                   guarded.unassign_slot_and_ban(chain_index, peer_id.clone());
+                   shared.wake_up_main_background_task.notify(1);
                    break Event::Disconnected {
                        peer_id,
                        chain_index,
@@ -1242,6 +1253,11 @@ async fn update_round(
                        peer_id,
                        error,
                    );
+
+                   for chain_index in 0..guarded.network.num_chains() {
+                       guarded.unassign_slot_and_ban(chain_index, peer_id.clone());
+                   }
+                   shared.wake_up_main_background_task.notify(1);
                }
            }
        };
@@ -1271,15 +1287,36 @@ async fn update_round(
    // TODO: doc
    for chain_index in 0..shared.log_chain_names.len() {
+       let now = TPlat::now();
+
+       // Clean up the content of `slots_assign_backoff`.
+       // TODO: the background task should be woken up when the ban expires
+       // TODO: O(n)
+       guarded
+           .slots_assign_backoff
+           .retain(|_, expiration| *expiration > now);
+
        loop {
-           let peer = guarded.network.assign_slots(chain_index);
-           if let Some(peer_id) = peer {
+           let peer_id = guarded
+               .network
+               .slots_to_assign(chain_index)
+               .filter(|peer_id| {
+                   !guarded
+                       .slots_assign_backoff
+                       .contains_key(&((**peer_id).clone(), chain_index)) // TODO: spurious cloning
+               })
+               .next()
+               .cloned();
+
+           if let Some(peer_id) = peer_id {
                log::debug!(
                    target: "connections",
                    "OutSlots({}) ∋ {}",
                    &shared.log_chain_names[chain_index],
                    peer_id
                );
+
+               guarded.network.assign_out_slot(chain_index, peer_id);
            } else {
                break;
            }
@@ -1337,6 +1374,23 @@ async fn update_round(
    }
 }
 
+impl<TPlat: Platform> SharedGuarded<TPlat> {
+    fn unassign_slot_and_ban(&mut self, chain_index: usize, peer_id: PeerId) {
+        self.network.unassign_slot(chain_index, &peer_id);
+
+        let new_expiration = TPlat::now() + Duration::from_secs(20); // TODO: arbitrary constant
+        match self.slots_assign_backoff.entry((peer_id, chain_index)) {
+            hash_map::Entry::Occupied(e) if *e.get() < new_expiration => {
+                *e.into_mut() = new_expiration;
+            }
+            hash_map::Entry::Occupied(_) => {}
+            hash_map::Entry::Vacant(e) => {
+                e.insert(new_expiration);
+            }
+        }
+    }
+}
+
 /// Asynchronous task managing a specific connection, including the connection process and the
 /// processing of the connection after it's been open.
 async fn connection_task(
diff --git a/bin/wasm-node/CHANGELOG.md b/bin/wasm-node/CHANGELOG.md
index a0ac7db3a6..97ecedd487 100644
--- a/bin/wasm-node/CHANGELOG.md
+++ b/bin/wasm-node/CHANGELOG.md
@@ -2,6 +2,10 @@
 
 ## Unreleased
 
+### Changed
+
+- In case of protocol error, or if a peer refuses a block announces substream, no new substream with the same peer will be attempted for 20 seconds. This avoids loops where the same peer is tried over and over again.
+
 ## 0.6.30 - 2022-08-12
 
 ### Fixed
diff --git a/src/network/service.rs b/src/network/service.rs
index 85ea5d3647..791b52fa5a 100644
--- a/src/network/service.rs
+++ b/src/network/service.rs
@@ -38,7 +38,7 @@ use core::{
     ops::{Add, Sub},
     time::Duration,
 };
-use rand::{seq::SliceRandom as _, Rng as _, SeedableRng as _};
+use rand::{Rng as _, SeedableRng as _};
 
 pub use crate::libp2p::{
     collection::ReadWrite,
@@ -2211,48 +2211,54 @@ where
        self.inner.peers_list()
    }
 
-   ///
-   ///
-   /// Returns the [`PeerId`] that now has an outbound slot.
-   // TODO: docs
-   // TODO: when to call this?
-   pub fn assign_slots(&mut self, chain_index: usize) -> Option<PeerId> {
-       let chain = &mut self.chains[chain_index];
-
-       let list = {
-           let mut list = chain.kbuckets.iter_ordered().collect::<Vec<_>>();
-           list.shuffle(&mut self.randomness);
-           list
-       };
+   // TODO: docs and appropriate naming
+   pub fn slots_to_assign(&'_ self, chain_index: usize) -> impl Iterator<Item = &'_ PeerId> + '_ {
+       let chain = &self.chains[chain_index];
 
-       for (peer_id, _) in list {
-           // Check if maximum number of slots is reached.
-           if chain.out_peers.len()
-               >= usize::try_from(chain.chain_config.out_slots).unwrap_or(usize::max_value())
-           {
-               break;
-           }
+       // Check if maximum number of slots is reached.
+       if chain.out_peers.len()
+           >= usize::try_from(chain.chain_config.out_slots).unwrap_or(usize::max_value())
+       {
+           return either::Right(iter::empty());
+       }
 
-           // Don't assign slots to peers that already have a slot.
-           if chain.out_peers.contains(peer_id) || chain.in_peers.contains(peer_id) {
-               continue;
-           }
+       // TODO: return in some specific order?
+       either::Left(
+           chain
+               .kbuckets
+               .iter_ordered()
+               .map(|(peer_id, _)| peer_id)
+               .filter(|peer_id| {
+                   // Don't assign slots to peers that already have a slot.
+                   !chain.out_peers.contains(peer_id) && !chain.in_peers.contains(peer_id)
+               }),
+       )
+   }
 
-           // It is now guaranteed that this peer will be assigned an outbound slot.
+   // TODO: docs
+   // TODO: when to call this?
+   pub fn assign_out_slot(&mut self, chain_index: usize, peer_id: PeerId) {
+       let chain = &mut self.chains[chain_index];
 
-           // The peer is marked as desired before inserting it in `out_peers`, to handle
-           // potential future cancellation issues.
-           self.inner.set_peer_notifications_out_desired(
-               peer_id,
-               chain_index * NOTIFICATIONS_PROTOCOLS_PER_CHAIN,
-               peers::DesiredState::DesiredReset, // TODO: ?
-           );
-           chain.out_peers.insert(peer_id.clone());
+       // Check if maximum number of slots is reached.
+       if chain.out_peers.len()
+           >= usize::try_from(chain.chain_config.out_slots).unwrap_or(usize::max_value())
+       {
+           return; // TODO: return error?
+       }
 
-           return Some(peer_id.clone());
+       // Don't assign slots to peers that already have a slot.
+       if chain.out_peers.contains(&peer_id) || chain.in_peers.contains(&peer_id) {
+           return; // TODO: return error?
        }
 
-       None
+       self.inner.set_peer_notifications_out_desired(
+           &peer_id,
+           chain_index * NOTIFICATIONS_PROTOCOLS_PER_CHAIN,
+           peers::DesiredState::DesiredReset, // TODO: ?
+       );
+
+       chain.out_peers.insert(peer_id);
    }
 
    ///
@@ -2324,7 +2330,7 @@ where
    }
 
    /// Removes the slot assignment of the given peer, if any.
-   fn unassign_slot(&mut self, chain_index: usize, peer_id: &PeerId) -> Option<SlotTy> {
+   pub fn unassign_slot(&mut self, chain_index: usize, peer_id: &PeerId) -> Option<SlotTy> {
        self.inner.set_peer_notifications_out_desired(
            peer_id,
            chain_index * NOTIFICATIONS_PROTOCOLS_PER_CHAIN,
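
The sketch below is not part of the patch. It restates the mechanism the diff introduces, reduced to plain std types so it compiles on its own: PeerId is a stand-in string for the real libp2p identity, SlotAssigner and its methods are hypothetical names, and std::collections::HashMap replaces the hashbrown/FNV map used in the tree. What it does preserve is the patch's two-phase design: banning records a (peer, chain index) pair with an expiration and keeps the later of two expirations, and slot assignment first drops expired bans, then skips banned candidates.

use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Stand-in for smoldot's real `PeerId`; a plain string keeps the sketch self-contained.
type PeerId = String;

/// Mirrors the 20-second constant from the patch (`Duration::from_secs(20)`).
const BAN_DURATION: Duration = Duration::from_secs(20);

/// Hypothetical container mirroring the `slots_assign_backoff` field added by the patch.
struct SlotAssigner {
    /// (peer, chain index) pairs that must not receive an outbound slot,
    /// mapped to the instant at which the ban expires.
    slots_assign_backoff: HashMap<(PeerId, usize), Instant>,
}

impl SlotAssigner {
    /// Bans a peer on one chain, mirroring `unassign_slot_and_ban`: if the
    /// peer is already banned, whichever expiration is later wins.
    fn ban(&mut self, chain_index: usize, peer_id: PeerId) {
        let new_expiration = Instant::now() + BAN_DURATION;
        self.slots_assign_backoff
            .entry((peer_id, chain_index))
            .and_modify(|expiration| {
                if *expiration < new_expiration {
                    *expiration = new_expiration;
                }
            })
            .or_insert(new_expiration);
    }

    /// Mirrors the slot-assignment loop: drop expired bans, then pick the
    /// first candidate that is not currently banned on this chain.
    fn pick_peer<'a>(
        &mut self,
        chain_index: usize,
        mut candidates: impl Iterator<Item = &'a PeerId>,
    ) -> Option<PeerId> {
        let now = Instant::now();
        // O(n) clean-up over current bans, the same trade-off the patch marks with a TODO.
        self.slots_assign_backoff
            .retain(|_, expiration| *expiration > now);
        candidates
            .find(|peer_id| {
                !self
                    .slots_assign_backoff
                    .contains_key(&((*peer_id).clone(), chain_index))
            })
            .cloned()
    }
}

fn main() {
    let mut assigner = SlotAssigner {
        slots_assign_backoff: HashMap::new(),
    };
    let candidates = ["alice".to_string(), "bob".to_string()];

    // "alice" misbehaves on chain 0 and is banned for 20 seconds...
    assigner.ban(0, "alice".to_string());

    // ...so the next slot on chain 0 goes to "bob" instead.
    assert_eq!(assigner.pick_peer(0, candidates.iter()), Some("bob".to_string()));

    // The ban key includes the chain index: "alice" is still eligible on chain 1.
    assert_eq!(assigner.pick_peer(1, candidates.iter()), Some("alice".to_string()));
}

Keeping the later expiration means a peer that misbehaves again while already banned has its ban extended, never shortened, and because the key is (peer, chain index), a ban on one chain leaves the peer usable on every other chain; only a protocol error, which concerns the connection as a whole, loops over every chain index.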