From 84815cfc763aa2f80d7a6057229e68a38fb277a7 Mon Sep 17 00:00:00 2001
From: Pierre Krieger
Date: Fri, 12 Aug 2022 12:44:19 +0200
Subject: [PATCH 1/3] Make sure connected addresses are inserted when discovering

---
 src/libp2p/peers.rs              | 68 ++++++++++++++++++++++++++++++--
 src/network/service.rs           | 45 +++++++++++++++++----
 src/network/service/addresses.rs | 13 ++++++
 3 files changed, 115 insertions(+), 11 deletions(-)

diff --git a/src/libp2p/peers.rs b/src/libp2p/peers.rs
index a742667fe3..fe3b8afd1b 100644
--- a/src/libp2p/peers.rs
+++ b/src/libp2p/peers.rs
@@ -55,6 +55,7 @@ use alloc::{
 };
 use core::{
     hash::Hash,
+    iter,
     num::NonZeroU32,
     ops::{self, Add, Sub},
     time::Duration,
@@ -62,10 +63,10 @@ use core::{
 use rand::{Rng as _, SeedableRng as _};
 
 pub use collection::{
-    ConfigRequestResponse, ConfigRequestResponseIn, ConnectionId, ConnectionToCoordinator,
-    CoordinatorToConnection, InboundError, MultiStreamConnectionTask, NotificationProtocolConfig,
-    NotificationsInClosedErr, NotificationsOutErr, ReadWrite, RequestError, ShutdownCause,
-    SingleStreamConnectionTask, SubstreamId,
+    ConfigRequestResponse, ConfigRequestResponseIn, ConnectionId, ConnectionState,
+    ConnectionToCoordinator, CoordinatorToConnection, InboundError, MultiStreamConnectionTask,
+    NotificationProtocolConfig, NotificationsInClosedErr, NotificationsOutErr, ReadWrite,
+    RequestError, ShutdownCause, SingleStreamConnectionTask, SubstreamId,
 };
 
 /// Configuration for a [`Peers`].
@@ -852,6 +853,65 @@ where
         (connection_id, connection_task)
     }
 
+    /// Returns all the non-handshaking connections that are connected to the given peer. The list
+    /// also includes connections that are shutting down.
+    pub fn established_peer_connections(
+        &'_ self,
+        peer_id: &PeerId,
+    ) -> impl Iterator<Item = ConnectionId> + '_ {
+        let peer_index = match self.peer_indices.get(peer_id) {
+            Some(idx) => *idx,
+            None => return either::Right(iter::empty()),
+        };
+
+        either::Left(
+            self.connections_by_peer
+                .range(
+                    (peer_index, ConnectionId::min_value())
+                        ..=(peer_index, ConnectionId::max_value()),
+                )
+                .map(|(_, connection_id)| *connection_id)
+                .filter(move |connection_id| {
+                    self.inner.connection_state(*connection_id).established
+                }),
+        )
+    }
+
+    /// Returns all the handshaking connections that are expected to reach the given peer. The
+    /// list also includes connections that are shutting down.
+    pub fn handshaking_peer_connections(
+        &'_ self,
+        peer_id: &PeerId,
+    ) -> impl Iterator<Item = ConnectionId> + '_ {
+        let peer_index = match self.peer_indices.get(peer_id) {
+            Some(idx) => *idx,
+            None => return either::Right(iter::empty()),
+        };
+
+        either::Left(
+            self.connections_by_peer
+                .range(
+                    (peer_index, ConnectionId::min_value())
+                        ..=(peer_index, ConnectionId::max_value()),
+                )
+                .map(|(_, connection_id)| *connection_id)
+                .filter(move |connection_id| {
+                    !self.inner.connection_state(*connection_id).established
+                }),
+        )
+    }
+
+    /// Returns the state of the given connection.
+    ///
+    /// # Panic
+    ///
+    /// Panics if the identifier is invalid or corresponds to a connection that has already
+    /// entirely shut down.
+    ///
+    pub fn connection_state(&self, connection_id: ConnectionId) -> ConnectionState {
+        self.inner.connection_state(connection_id)
+    }
+
     /// Returns the list of [`PeerId`]s that have been marked as desired, but that don't have any
     /// associated connection. An associated connection is either a fully established connection
     /// with that peer, or an outgoing connection that is still handshaking but expects to reach
diff --git a/src/network/service.rs b/src/network/service.rs
index 60cfd5046f..365ceb94b8 100644
--- a/src/network/service.rs
+++ b/src/network/service.rs
@@ -847,6 +847,7 @@ where
             kademlia::kbuckets::Entry::LocalKey => return, // TODO: return some diagnostic?
             kademlia::kbuckets::Entry::Vacant(entry) => {
                 match entry.insert((), now, kademlia::kbuckets::PeerState::Disconnected) {
+                    Err(kademlia::kbuckets::InsertError::Full) => return, // TODO: return some diagnostic?
                     Ok((_, removed_entry)) => {
                         // `removed_entry` is the peer that was removed from the k-buckets as the
                         // result of the new insertion. Purge it from `self.kbuckets_peers`
@@ -873,16 +874,46 @@ where
                                     NonZeroUsize::new(e.num_references.get() + 1).unwrap();
                                 e
                             }
-                            // TODO: is it possible that we're already connected to this peer? since we call set_disconnected() when we disconnect, we would get a panic
-                            hashbrown::hash_map::Entry::Vacant(e) => e.insert(KBucketsPeer {
-                                num_references: NonZeroUsize::new(1).unwrap(),
-                                addresses: addresses::Addresses::with_capacity(
-                                    self.max_addresses_per_peer.get(),
-                                ),
-                            }),
+                            hashbrown::hash_map::Entry::Vacant(e) => {
+                                // The peer was not in the k-buckets, but it is possible that
+                                // we already have existing connections to it.
+                                let mut addresses = addresses::Addresses::with_capacity(
+                                    self.max_addresses_per_peer.get(),
+                                );
+
+                                for connection_id in
+                                    self.inner.established_peer_connections(&e.key())
+                                {
+                                    let state = self.inner.connection_state(connection_id);
+                                    debug_assert!(state.established);
+                                    if state.shutting_down {
+                                        continue;
+                                    }
+                                    addresses
+                                        .insert_discovered(self.inner[connection_id].clone());
+                                    addresses.set_connected(&self.inner[connection_id]);
+                                }
+
+                                for connection_id in
+                                    self.inner.handshaking_peer_connections(&e.key())
+                                {
+                                    let state = self.inner.connection_state(connection_id);
+                                    debug_assert!(!state.established);
+                                    if state.shutting_down {
+                                        continue;
+                                    }
+                                    addresses
+                                        .insert_discovered(self.inner[connection_id].clone());
+                                    addresses.set_pending(&self.inner[connection_id]);
+                                }
+
+                                e.insert(KBucketsPeer {
+                                    num_references: NonZeroUsize::new(1).unwrap(),
+                                    addresses,
+                                })
+                            }
                         }
                     }
-                    Err(kademlia::kbuckets::InsertError::Full) => return, // TODO: return some diagnostic?
                 }
             }
             kademlia::kbuckets::Entry::Occupied(_) => {
diff --git a/src/network/service/addresses.rs b/src/network/service/addresses.rs
index d2ccda46f5..04416b0d51 100644
--- a/src/network/service/addresses.rs
+++ b/src/network/service/addresses.rs
@@ -93,6 +93,19 @@ impl Addresses {
         }
     }
 
+    /// If the given address is in the list, sets its state to "pending".
+    ///
+    /// # Panic
+    ///
+    /// Panics if the state of this address was already pending.
+    ///
+    pub(super) fn set_pending(&mut self, addr: &multiaddr::Multiaddr) {
+        if let Some(index) = self.list.iter().position(|(a, _)| a == addr) {
+            assert!(!matches!(self.list[index].1, State::PendingConnect));
+            self.list[index].1 = State::PendingConnect;
+        }
+    }
+
     /// If the given address is in the list, sets its state to "disconnected".
     ///
     /// # Panic

From 66a05672bf11ab84d157c45e79d1acd76fd69c39 Mon Sep 17 00:00:00 2001
From: Pierre Krieger
Date: Fri, 12 Aug 2022 12:45:45 +0200
Subject: [PATCH 2/3] CHANGELOG

---
 bin/wasm-node/CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/bin/wasm-node/CHANGELOG.md b/bin/wasm-node/CHANGELOG.md
index 789b4d50fa..9ddb3ca5e1 100644
--- a/bin/wasm-node/CHANGELOG.md
+++ b/bin/wasm-node/CHANGELOG.md
@@ -4,6 +4,7 @@
 
 ### Fixed
 
+- Fix panic that occurred when connecting to a peer, then discovering it through the background discovery process, then disconnecting from it. ([#2616](https://github.com/paritytech/smoldot/pull/2616))
 - Fix circular dependency between JavaScript modules. ([#2614](https://github.com/paritytech/smoldot/pull/2614))
 
 ## 0.6.29 - 2022-08-09

From 11075f9643dcbad601bc2ad8a2ec4ff6c0d8b173 Mon Sep 17 00:00:00 2001
From: Pierre Krieger
Date: Fri, 12 Aug 2022 12:46:36 +0200
Subject: [PATCH 3/3] Add comments

---
 src/network/service.rs | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/src/network/service.rs b/src/network/service.rs
index 365ceb94b8..d7c33f9b97 100644
--- a/src/network/service.rs
+++ b/src/network/service.rs
@@ -886,6 +886,9 @@ where
                                 {
                                     let state = self.inner.connection_state(connection_id);
                                     debug_assert!(state.established);
+                                    // Because we mark addresses as disconnected when the
+                                    // shutdown process starts, we ignore connections that
+                                    // are shutting down.
                                     if state.shutting_down {
                                         continue;
                                     }
@@ -899,6 +902,9 @@ where
                                 {
                                     let state = self.inner.connection_state(connection_id);
                                     debug_assert!(!state.established);
+                                    // Because we mark addresses as disconnected when the
+                                    // shutdown process starts, we ignore connections that
+                                    // are shutting down.
                                     if state.shutting_down {
                                         continue;
                                     }
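
A note on the data structure behind the new `established_peer_connections` and `handshaking_peer_connections` accessors in patch 1: `connections_by_peer` is an ordered collection keyed by `(peer_index, connection_id)` pairs, so all connections of a single peer occupy one contiguous range. Below is a minimal, self-contained sketch of that range-query pattern using a plain `BTreeSet`, with `u64` standing in for `ConnectionId`; the simplified types are assumptions for illustration, not the actual smoldot definitions.

use std::collections::BTreeSet;

fn main() {
    // (peer_index, connection_id) pairs, ordered lexicographically.
    let mut connections_by_peer: BTreeSet<(usize, u64)> = BTreeSet::new();
    connections_by_peer.insert((0, 7));
    connections_by_peer.insert((1, 3));
    connections_by_peer.insert((1, 9));
    connections_by_peer.insert((2, 4));

    // All connections of peer 1, without scanning the whole set: because the
    // pairs sort by peer index first, they form one contiguous range.
    let peer_index = 1;
    let conns: Vec<u64> = connections_by_peer
        .range((peer_index, u64::MIN)..=(peer_index, u64::MAX))
        .map(|&(_, connection_id)| connection_id)
        .collect();
    assert_eq!(conns, [3, 9]);
}

This is why the real methods can bound the range with `ConnectionId::min_value()` and `ConnectionId::max_value()` and then merely filter on the connection state, instead of maintaining a separate per-peer list.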
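And a sketch of why the pre-seeding in patch 1 removes the panic mentioned in the CHANGELOG: the address list tracks a per-address state, and (as the removed TODO hints) disconnecting calls `set_disconnected()`, which expects the address to already be in the list in a connected or pending state. If discovery created an empty list for a peer we were already connected to, the later disconnection could hit an address the list never knew about. The types below are simplified, hypothetical stand-ins for `src/network/service/addresses.rs` (`String` in place of `multiaddr::Multiaddr`, a reduced `State` enum); only the method names mirror the patch.

#[derive(Debug, Clone, Copy, PartialEq)]
enum State {
    Connected,
    PendingConnect,
    Disconnected,
}

struct Addresses {
    list: Vec<(String, State)>,
}

impl Addresses {
    fn with_capacity(cap: usize) -> Self {
        Addresses { list: Vec::with_capacity(cap) }
    }

    /// Adds an address in the "disconnected" state, unless it is already known.
    fn insert_discovered(&mut self, addr: String) {
        if !self.list.iter().any(|(a, _)| *a == addr) {
            self.list.push((addr, State::Disconnected));
        }
    }

    /// Like the new `set_pending` in the patch: marks a known address as having
    /// a handshaking connection, asserting that it wasn't already pending.
    fn set_pending(&mut self, addr: &str) {
        if let Some(entry) = self.list.iter_mut().find(|(a, _)| a.as_str() == addr) {
            assert!(entry.1 != State::PendingConnect);
            entry.1 = State::PendingConnect;
        }
    }

    /// Marks a known address as having an established connection.
    fn set_connected(&mut self, addr: &str) {
        if let Some(entry) = self.list.iter_mut().find(|(a, _)| a.as_str() == addr) {
            entry.1 = State::Connected;
        }
    }
}

fn main() {
    // Discovering a peer we are already connected to: the patch seeds the
    // fresh `Addresses` list from the live connections instead of leaving it
    // empty, so the address is present and in the expected "connected" state
    // when the peer later disconnects.
    let mut addresses = Addresses::with_capacity(4);
    addresses.insert_discovered("/ip4/1.2.3.4/tcp/30333".to_owned());
    addresses.set_connected("/ip4/1.2.3.4/tcp/30333");
    assert_eq!(addresses.list[0].1, State::Connected);
}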