Peers are now banned for 20 seconds after a protocol error or substream refusal (#2633)

* Split slot assignments in two phases

* Make unassign_slot public

* Implement banning nodes

* CHANGELOG

* Spellcheck
tomaka authored Aug 15, 2022
1 parent a63ac12 commit af5e811
Showing 4 changed files with 176 additions and 46 deletions.
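
Before the file diffs, a minimal, self-contained sketch of the bookkeeping this commit introduces: a map from (peer, chain index) to the instant at which the ban expires, pruned lazily and consulted before handing out outbound slots. The type and method names (`SlotBackoff`, `ban`, `may_assign`) and the `String` stand-in for `PeerId` are illustrative assumptions, not smoldot's actual API:

use std::collections::HashMap;
use std::time::{Duration, Instant};

// Illustrative stand-in for libp2p's `PeerId`.
type PeerId = String;

// Maps (peer, chain index) to the moment when the ban expires.
struct SlotBackoff {
    bans: HashMap<(PeerId, usize), Instant>,
}

impl SlotBackoff {
    fn new() -> Self {
        SlotBackoff { bans: HashMap::new() }
    }

    // Ban a peer on one chain for 20 seconds, keeping whichever
    // expiration is later if a ban is already in place.
    fn ban(&mut self, peer_id: PeerId, chain_index: usize) {
        let new_expiration = Instant::now() + Duration::from_secs(20);
        let entry = self.bans.entry((peer_id, chain_index)).or_insert(new_expiration);
        if *entry < new_expiration {
            *entry = new_expiration;
        }
    }

    // Prune expired bans, then report whether this peer may be offered
    // an outbound slot on this chain.
    fn may_assign(&mut self, peer_id: &str, chain_index: usize) -> bool {
        let now = Instant::now();
        self.bans.retain(|_, expiration| *expiration > now);
        !self.bans.contains_key(&(peer_id.to_owned(), chain_index))
    }
}

fn main() {
    let mut backoff = SlotBackoff::new();
    backoff.ban("peer-a".to_owned(), 0);
    assert!(!backoff.may_assign("peer-a", 0)); // banned on chain 0
    assert!(backoff.may_assign("peer-a", 1)); // bans are per-chain
}
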
78 changes: 72 additions & 6 deletions bin/full-node/src/run/network_service.rs
@@ -182,6 +182,12 @@ struct Guarded {
fnv::FnvBuildHasher,
>,

/// List of peer and chain index tuples for which no outbound slot should be assigned.
///
/// The values are the moment when the ban expires.
// TODO: use SipHasher
slots_assign_backoff: HashMap<(PeerId, usize), Instant, fnv::FnvBuildHasher>,

messages_from_connections_tx:
mpsc::Sender<(service::ConnectionId, service::ConnectionToCoordinator)>,

@@ -279,6 +285,10 @@ impl NetworkService {
messages_from_connections_rx,
conn_tasks_tx: conn_tasks_tx.clone(),
network,
slots_assign_backoff: hashbrown::HashMap::with_capacity_and_hasher(
50, // TODO: ?
Default::default(),
),
active_connections: hashbrown::HashMap::with_capacity_and_hasher(
100, // TODO: ?
Default::default(),
@@ -792,6 +802,9 @@ async fn update_round(inner: &Arc<Inner>, event_senders: &mut [mpsc::Sender<Even
%error,
"block-announce-bad-header"
);

guarded.unassign_slot_and_ban(chain_index, peer_id);
inner.wake_up_main_background_task.notify(1);
}
}
}
@@ -821,16 +834,28 @@ async fn update_round(inner: &Arc<Inner>, event_senders: &mut [mpsc::Sender<Even
..
} => {
tracing::debug!(%peer_id, "chain-disconnected");

guarded.unassign_slot_and_ban(chain_index, peer_id.clone());
inner.wake_up_main_background_task.notify(1);

break Event::Disconnected {
chain_index,
peer_id,
};
}
service::Event::ChainConnectAttemptFailed { peer_id, error, .. } => {
service::Event::ChainConnectAttemptFailed {
chain_index,
peer_id,
error,
..
} => {
tracing::debug!(
%peer_id, %error,
"chain-connect-attempt-failed"
);

guarded.unassign_slot_and_ban(chain_index, peer_id);
inner.wake_up_main_background_task.notify(1);
}
service::Event::InboundSlotAssigned { .. } => {
// TODO: log this
@@ -939,12 +964,16 @@ async fn update_round(inner: &Arc<Inner>, event_senders: &mut [mpsc::Sender<Even
);
}
service::Event::ProtocolError { peer_id, error } => {
// TODO: handle properly?
tracing::warn!(
%peer_id,
%error,
"protocol-error"
);

for chain_index in 0..guarded.network.num_chains() {
guarded.unassign_slot_and_ban(chain_index, peer_id.clone());
}
inner.wake_up_main_background_task.notify(1);
}
}
};
@@ -974,11 +1003,31 @@ async fn update_round(inner: &Arc<Inner>, event_senders: &mut [mpsc::Sender<Even

// TODO: doc
for chain_index in 0..guarded.network.num_chains() {
let now = Instant::now();

// Clean up the content of `slots_assign_backoff`.
// TODO: the background task should be woken up when the ban expires
// TODO: O(n)
guarded
.slots_assign_backoff
.retain(|_, expiration| *expiration > now);

// Assign outgoing slots.
loop {
let assigned_peer = guarded.network.assign_slots(chain_index);
if let Some(assigned_peer) = assigned_peer {
// TODO: log slot de-assignments
tracing::debug!(peer_id = %assigned_peer, %chain_index, "slot-assigned");
let peer_to_assign = guarded
.network
.slots_to_assign(chain_index)
.filter(|peer_id| {
!guarded
.slots_assign_backoff
.contains_key(&((**peer_id).clone(), chain_index)) // TODO: spurious cloning
})
.next()
.cloned();

if let Some(peer_to_assign) = peer_to_assign {
tracing::debug!(peer_id = %peer_to_assign, %chain_index, "slot-assigned");
guarded.network.assign_out_slot(chain_index, peer_to_assign);
} else {
break;
}
@@ -1033,6 +1082,23 @@ async fn update_round(inner: &Arc<Inner>, event_senders: &mut [mpsc::Sender<Even
}
}

impl Guarded {
fn unassign_slot_and_ban(&mut self, chain_index: usize, peer_id: PeerId) {
self.network.unassign_slot(chain_index, &peer_id);

let new_expiration = Instant::now() + Duration::from_secs(20); // TODO: arbitrary constant
match self.slots_assign_backoff.entry((peer_id, chain_index)) {
hashbrown::hash_map::Entry::Occupied(e) if *e.get() < new_expiration => {
*e.into_mut() = new_expiration;
}
hashbrown::hash_map::Entry::Occupied(_) => {}
hashbrown::hash_map::Entry::Vacant(e) => {
e.insert(new_expiration);
}
}
}
}

/// Asynchronous task managing a specific connection, including the dialing process.
#[tracing::instrument(level = "trace", skip(start_connect, inner, connection_to_coordinator))]
async fn opening_connection_task(
60 changes: 57 additions & 3 deletions bin/light-base/src/network_service.rs
@@ -52,7 +52,7 @@ use smoldot::{
network::{protocol, service},
};
use std::{
collections::{HashMap, HashSet},
collections::{hash_map, HashMap, HashSet},
pin::Pin,
sync::Arc,
};
@@ -143,6 +143,12 @@ struct SharedGuarded<TPlat: Platform> {
// TODO: should also detect whenever we fail to open a block announces substream with any of these peers
important_nodes: HashSet<PeerId, fnv::FnvBuildHasher>,

/// List of peer and chain index tuples for which no outbound slot should be assigned.
///
/// The values are the moment when the ban expires.
// TODO: use SipHasher
slots_assign_backoff: HashMap<(PeerId, usize), TPlat::Instant, fnv::FnvBuildHasher>,

messages_from_connections_tx:
mpsc::Sender<(service::ConnectionId, service::ConnectionToCoordinator)>,

@@ -246,6 +252,7 @@ impl<TPlat: Platform> NetworkService<TPlat> {
handshake_timeout: Duration::from_secs(8),
randomness_seed: rand::random(),
}),
slots_assign_backoff: HashMap::with_capacity_and_hasher(32, Default::default()),
important_nodes: HashSet::with_capacity_and_hasher(16, Default::default()),
active_connections: HashMap::with_capacity_and_hasher(32, Default::default()),
messages_from_connections_tx,
@@ -1064,6 +1071,8 @@ async fn update_round<TPlat: Platform>(
&shared.log_chain_names[chain_index],
peer_id
);
guarded.unassign_slot_and_ban(chain_index, peer_id);
shared.wake_up_main_background_task.notify(1);
}
service::Event::ChainDisconnected {
peer_id,
@@ -1086,6 +1095,8 @@
&shared.log_chain_names[chain_index],
peer_id
);
guarded.unassign_slot_and_ban(chain_index, peer_id.clone());
shared.wake_up_main_background_task.notify(1);
break Event::Disconnected {
peer_id,
chain_index,
@@ -1242,6 +1253,11 @@ async fn update_round<TPlat: Platform>(
peer_id,
error,
);

for chain_index in 0..guarded.network.num_chains() {
guarded.unassign_slot_and_ban(chain_index, peer_id.clone());
}
shared.wake_up_main_background_task.notify(1);
}
}
};
@@ -1271,15 +1287,36 @@ async fn update_round<TPlat: Platform>(

// TODO: doc
for chain_index in 0..shared.log_chain_names.len() {
let now = TPlat::now();

// Clean up the content of `slots_assign_backoff`.
// TODO: the background task should be woken up when the ban expires
// TODO: O(n)
guarded
.slots_assign_backoff
.retain(|_, expiration| *expiration > now);

loop {
let peer = guarded.network.assign_slots(chain_index);
if let Some(peer_id) = peer {
let peer_id = guarded
.network
.slots_to_assign(chain_index)
.filter(|peer_id| {
!guarded
.slots_assign_backoff
.contains_key(&((**peer_id).clone(), chain_index)) // TODO: spurious cloning
})
.next()
.cloned();

if let Some(peer_id) = peer_id {
log::debug!(
target: "connections",
"OutSlots({}) ∋ {}",
&shared.log_chain_names[chain_index],
peer_id
);

guarded.network.assign_out_slot(chain_index, peer_id);
} else {
break;
}
@@ -1337,6 +1374,23 @@ async fn update_round<TPlat: Platform>(
}
}

impl<TPlat: Platform> SharedGuarded<TPlat> {
fn unassign_slot_and_ban(&mut self, chain_index: usize, peer_id: PeerId) {
self.network.unassign_slot(chain_index, &peer_id);

let new_expiration = TPlat::now() + Duration::from_secs(20); // TODO: arbitrary constant
match self.slots_assign_backoff.entry((peer_id, chain_index)) {
hash_map::Entry::Occupied(e) if *e.get() < new_expiration => {
*e.into_mut() = new_expiration;
}
hash_map::Entry::Occupied(_) => {}
hash_map::Entry::Vacant(e) => {
e.insert(new_expiration);
}
}
}
}

/// Asynchronous task managing a specific connection, including the connection process and the
/// processing of the connection after it's been open.
async fn connection_task<TPlat: Platform>(
4 changes: 4 additions & 0 deletions bin/wasm-node/CHANGELOG.md
@@ -2,6 +2,10 @@

## Unreleased

### Changed

- In case of protocol error, or if a peer refuses a block announces substream, no new substream with the same peer will be attempted for 20 seconds. This avoids loops where the same peer is tried over and over again.

## 0.6.30 - 2022-08-12

### Fixed
80 changes: 43 additions & 37 deletions src/network/service.rs
@@ -38,7 +38,7 @@ use core::{
ops::{Add, Sub},
time::Duration,
};
use rand::{seq::SliceRandom as _, Rng as _, SeedableRng as _};
use rand::{Rng as _, SeedableRng as _};

pub use crate::libp2p::{
collection::ReadWrite,
@@ -2211,48 +2211,54 @@ where
self.inner.peers_list()
}

///
///
/// Returns the [`PeerId`] that now has an outbound slot.
// TODO: docs
// TODO: when to call this?
pub fn assign_slots(&mut self, chain_index: usize) -> Option<PeerId> {
let chain = &mut self.chains[chain_index];

let list = {
let mut list = chain.kbuckets.iter_ordered().collect::<Vec<_>>();
list.shuffle(&mut self.randomness);
list
};
// TODO: docs and appropriate naming
pub fn slots_to_assign(&'_ self, chain_index: usize) -> impl Iterator<Item = &'_ PeerId> + '_ {
let chain = &self.chains[chain_index];

for (peer_id, _) in list {
// Check if maximum number of slots is reached.
if chain.out_peers.len()
>= usize::try_from(chain.chain_config.out_slots).unwrap_or(usize::max_value())
{
break;
}
// Check if maximum number of slots is reached.
if chain.out_peers.len()
>= usize::try_from(chain.chain_config.out_slots).unwrap_or(usize::max_value())
{
return either::Right(iter::empty());
}

// Don't assign slots to peers that already have a slot.
if chain.out_peers.contains(peer_id) || chain.in_peers.contains(peer_id) {
continue;
}
// TODO: return in some specific order?
either::Left(
chain
.kbuckets
.iter_ordered()
.map(|(peer_id, _)| peer_id)
.filter(|peer_id| {
// Don't assign slots to peers that already have a slot.
!chain.out_peers.contains(peer_id) && !chain.in_peers.contains(peer_id)
}),
)
}

// It is now guaranteed that this peer will be assigned an outbound slot.
// TODO: docs
// TODO: when to call this?
pub fn assign_out_slot(&mut self, chain_index: usize, peer_id: PeerId) {
let chain = &mut self.chains[chain_index];

// The peer is marked as desired before inserting it in `out_peers`, to handle
// potential future cancellation issues.
self.inner.set_peer_notifications_out_desired(
peer_id,
chain_index * NOTIFICATIONS_PROTOCOLS_PER_CHAIN,
peers::DesiredState::DesiredReset, // TODO: ?
);
chain.out_peers.insert(peer_id.clone());
// Check if maximum number of slots is reached.
if chain.out_peers.len()
>= usize::try_from(chain.chain_config.out_slots).unwrap_or(usize::max_value())
{
return; // TODO: return error?
}

return Some(peer_id.clone());
// Don't assign slots to peers that already have a slot.
if chain.out_peers.contains(&peer_id) || chain.in_peers.contains(&peer_id) {
return; // TODO: return error?
}

None
self.inner.set_peer_notifications_out_desired(
&peer_id,
chain_index * NOTIFICATIONS_PROTOCOLS_PER_CHAIN,
peers::DesiredState::DesiredReset, // TODO: ?
);

chain.out_peers.insert(peer_id);
}

///
@@ -2324,7 +2330,7 @@
}

/// Removes the slot assignment of the given peer, if any.
fn unassign_slot(&mut self, chain_index: usize, peer_id: &PeerId) -> Option<SlotTy> {
pub fn unassign_slot(&mut self, chain_index: usize, peer_id: &PeerId) -> Option<SlotTy> {
self.inner.set_peer_notifications_out_desired(
peer_id,
chain_index * NOTIFICATIONS_PROTOCOLS_PER_CHAIN,
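
To make the API split above concrete, here is a self-contained stub of the two-phase pattern: `slots_to_assign` only lists candidates, and `assign_out_slot` commits the choice, leaving the caller free to apply its own policy (such as the ban list) in between. The `Chain` struct, `known_peers` field, and `main` below are illustrative stand-ins, not the crate's real types:

use std::collections::HashSet;

// Illustrative stand-in for libp2p's `PeerId`.
type PeerId = String;

struct Chain {
    known_peers: Vec<PeerId>, // stand-in for the k-buckets
    out_peers: HashSet<PeerId>,
    in_peers: HashSet<PeerId>,
    out_slots: usize,
}

impl Chain {
    // Phase 1: iterate over peers that could receive an outbound slot.
    fn slots_to_assign(&self) -> Box<dyn Iterator<Item = &PeerId> + '_> {
        // No candidates if the maximum number of slots is reached.
        if self.out_peers.len() >= self.out_slots {
            return Box::new(std::iter::empty());
        }
        // Skip peers that already have a slot, as the real method does.
        Box::new(self.known_peers.iter().filter(move |p| {
            !self.out_peers.contains(*p) && !self.in_peers.contains(*p)
        }))
    }

    // Phase 2: commit the assignment the caller has decided on.
    fn assign_out_slot(&mut self, peer_id: PeerId) {
        if self.out_peers.len() >= self.out_slots {
            return;
        }
        self.out_peers.insert(peer_id);
    }
}

fn main() {
    let mut chain = Chain {
        known_peers: vec!["peer-a".into(), "peer-b".into()],
        out_peers: HashSet::new(),
        in_peers: HashSet::new(),
        out_slots: 1,
    };
    // The caller filters phase-1 candidates (here: a ban on "peer-a")
    // before committing in phase 2; "peer-b" receives the slot.
    let banned: HashSet<PeerId> = ["peer-a".to_owned()].into_iter().collect();
    let pick = chain
        .slots_to_assign()
        .find(|p| !banned.contains(*p))
        .cloned();
    if let Some(peer_id) = pick {
        chain.assign_out_slot(peer_id);
    }
}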
