diff --git a/lib/src/network/service.rs b/lib/src/network/service.rs index 59bebdd3f5..6056285fc5 100644 --- a/lib/src/network/service.rs +++ b/lib/src/network/service.rs @@ -84,7 +84,7 @@ use alloc::{borrow::ToOwned as _, collections::BTreeSet, string::String, vec::Ve use core::{ fmt, hash::Hash, - iter, mem, + iter, ops::{self, Add, Sub}, time::Duration, }; @@ -171,37 +171,10 @@ pub struct ChainNetwork { /// Underlying data structure. inner: collection::Network, TNow>, - /// List of all known network identities. The `usize` in [`PeerIndex`] refers to an index - /// within this list. - // TODO: shrink to fit from time to time - peers: slab::Slab, - - /// Same entries as [`ChainNetwork::peers`], by indexed differently. - // TODO: shrink to fit from time to time - peers_by_peer_id: hashbrown::HashMap, - /// List of all chains that have been added. // TODO: shrink to fit from time to time chains: slab::Slab>, - /// All the substreams of [`ChainNetwork::inner`], with info attached to them. - // TODO: add a substream user data to `collection::Network` instead - // TODO: shrink to fit from time to time - substreams: hashbrown::HashMap, - - /// Connections indexed by the value in [`ConnectionInfo::peer_index`]. - connections_by_peer_id: BTreeSet<(PeerIndex, collection::ConnectionId)>, - - /// All the outbound notification substreams, indexed by protocol, `PeerId`, and state. - // TODO: unclear whether PeerId should come before or after the state, same for direction/state - notification_substreams_by_peer_id: BTreeSet<( - NotificationsProtocol, - PeerIndex, - SubstreamDirection, - NotificationsSubstreamState, - collection::SubstreamId, - )>, - /// Chains indexed by genesis hash and fork ID. /// /// Contains the same number of entries as [`ChainNetwork::chains`]. The values are `usize`s @@ -210,6 +183,18 @@ pub struct ChainNetwork { chains_by_protocol_info: hashbrown::HashMap<([u8; 32], Option), usize, fnv::FnvBuildHasher>, + /// List of all known network identities. 
The `usize` in [`PeerIndex`] refers to an index + /// within this list. + // TODO: shrink to fit from time to time + peers: slab::Slab, + + /// Same entries as [`ChainNetwork::peers`], by indexed differently. + // TODO: shrink to fit from time to time + peers_by_peer_id: hashbrown::HashMap, + + /// Connections indexed by the value in [`ConnectionInfo::peer_index`]. + connections_by_peer_id: BTreeSet<(PeerIndex, collection::ConnectionId)>, + /// List of peers that have been marked as desired. Can include peers not connected to the /// local node yet. gossip_desired_peers_by_chain: BTreeSet<(usize, GossipKind, PeerIndex)>, @@ -233,6 +218,21 @@ pub struct ChainNetwork { // TODO: shrink to fit from time to time opened_gossip_undesired: hashbrown::HashSet<(ChainId, PeerIndex, GossipKind), fnv::FnvBuildHasher>, + + /// All the substreams of [`ChainNetwork::inner`], with info attached to them. + // TODO: add a substream user data to `collection::Network` instead + // TODO: shrink to fit from time to time + substreams: hashbrown::HashMap, + + /// All the outbound notification substreams, indexed by protocol, `PeerId`, and state. 
+ // TODO: unclear whether PeerId should come before or after the state, same for direction/state + notification_substreams_by_peer_id: BTreeSet<( + NotificationsProtocol, + PeerIndex, + SubstreamDirection, + NotificationsSubstreamState, + collection::SubstreamId, + )>, } #[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] @@ -296,9 +296,7 @@ struct SubstreamInfo { enum Protocol { Identify, Ping, - BlockAnnounces { chain_index: usize }, - Transactions { chain_index: usize }, - Grandpa { chain_index: usize }, + Notifications(NotificationsProtocol), Sync { chain_index: usize }, LightUnknown { chain_index: usize }, LightStorage { chain_index: usize }, @@ -315,41 +313,26 @@ enum NotificationsProtocol { Grandpa { chain_index: usize }, } -impl TryFrom for NotificationsProtocol { - type Error = (); - - fn try_from(value: Protocol) -> Result { - match value { - Protocol::BlockAnnounces { chain_index } => { - Ok(NotificationsProtocol::BlockAnnounces { chain_index }) - } - Protocol::Transactions { chain_index } => { - Ok(NotificationsProtocol::Transactions { chain_index }) - } - Protocol::Grandpa { chain_index } => Ok(NotificationsProtocol::Grandpa { chain_index }), - Protocol::Identify => Err(()), - Protocol::Ping => Err(()), - Protocol::Sync { .. } => Err(()), - Protocol::LightUnknown { .. } => Err(()), - Protocol::LightStorage { .. } => Err(()), - Protocol::LightCall { .. } => Err(()), - Protocol::Kad { .. } => Err(()), - Protocol::SyncWarp { .. } => Err(()), - Protocol::State { .. 
} => Err(()), - } - } -} - #[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] enum SubstreamDirection { In, Out, } +impl SubstreamDirection { + fn min_value() -> Self { + SubstreamDirection::In + } + + fn max_value() -> Self { + SubstreamDirection::Out + } +} + #[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] enum NotificationsSubstreamState { Pending, - Open, + Open { asked_to_leave: bool }, } impl NotificationsSubstreamState { @@ -358,7 +341,21 @@ impl NotificationsSubstreamState { } fn max_value() -> Self { - NotificationsSubstreamState::Open + NotificationsSubstreamState::Open { + asked_to_leave: true, + } + } + + fn open_min_value() -> Self { + NotificationsSubstreamState::Open { + asked_to_leave: false, + } + } + + fn open_max_value() -> Self { + NotificationsSubstreamState::Open { + asked_to_leave: true, + } } } @@ -490,20 +487,20 @@ where protocol, PeerIndex(usize::min_value()), SubstreamDirection::Out, - NotificationsSubstreamState::Pending, + NotificationsSubstreamState::open_min_value(), SubstreamId::min_value(), ) ..=( protocol, PeerIndex(usize::max_value()), SubstreamDirection::Out, - NotificationsSubstreamState::Open, + NotificationsSubstreamState::open_max_value(), SubstreamId::max_value(), ), ) .find(|(_, _, direction, state, _)| { matches!(*direction, SubstreamDirection::Out) - && matches!(*state, NotificationsSubstreamState::Open) + && matches!(*state, NotificationsSubstreamState::Open { .. }) }) .is_some() { @@ -526,11 +523,10 @@ where PeerIndex(usize::max_value()), ), ) - // TODO: optimize to not Clone? is that possible? - .map(|(_, _, peer_index)| self.peers[peer_index.0].clone()) + .map(|(_, _, peer_index)| *peer_index) .collect::>(); for desired in desired { - self.gossip_remove_desired(chain_id, &desired, GossipKind::ConsensusTransactions); + self.gossip_remove_desired_inner(chain_id, desired, GossipKind::ConsensusTransactions); } // Close any notifications substream of the chain. 
@@ -551,15 +547,15 @@ where ( protocol, PeerIndex(usize::min_value()), - SubstreamDirection::In, - NotificationsSubstreamState::Pending, + SubstreamDirection::min_value(), + NotificationsSubstreamState::min_value(), SubstreamId::min_value(), ) ..=( protocol, PeerIndex(usize::max_value()), - SubstreamDirection::Out, - NotificationsSubstreamState::Open, + SubstreamDirection::max_value(), + NotificationsSubstreamState::max_value(), SubstreamId::max_value(), ), ) @@ -580,7 +576,20 @@ where let _was_in = self.substreams.remove(&substream_id); debug_assert!(_was_in.is_some()); } - (SubstreamDirection::In, NotificationsSubstreamState::Open) => { + ( + SubstreamDirection::In, + NotificationsSubstreamState::Open { + asked_to_leave: true, + }, + ) => { + self.substreams.get_mut(&substream_id).unwrap().protocol = None; + } + ( + SubstreamDirection::In, + NotificationsSubstreamState::Open { + asked_to_leave: false, + }, + ) => { self.inner .start_close_in_notifications(substream_id, Duration::from_secs(5)); // TODO: arbitrary timeout ^ @@ -599,9 +608,13 @@ where // TODO: how to do request-responses in non-O(n) time? 
for (_, substream) in &mut self.substreams { match substream.protocol { - Some(Protocol::BlockAnnounces { chain_index }) - | Some(Protocol::Transactions { chain_index }) - | Some(Protocol::Grandpa { chain_index }) + Some(Protocol::Notifications(NotificationsProtocol::BlockAnnounces { + chain_index, + })) + | Some(Protocol::Notifications(NotificationsProtocol::Transactions { + chain_index, + })) + | Some(Protocol::Notifications(NotificationsProtocol::Grandpa { chain_index })) | Some(Protocol::Sync { chain_index }) | Some(Protocol::LightUnknown { chain_index }) | Some(Protocol::LightStorage { chain_index }) @@ -791,6 +804,15 @@ where return false; }; + self.gossip_remove_desired_inner(chain_id, peer_index, kind) + } + + fn gossip_remove_desired_inner( + &mut self, + chain_id: ChainId, + peer_index: PeerIndex, + kind: GossipKind, + ) -> bool { if !self .gossip_desired_peers_by_chain .remove(&(chain_id.0, kind, peer_index)) @@ -890,18 +912,33 @@ where self.try_clean_up_peer(peer_index); } - /// Returns the number of gossip-desired peers for the given chain. + /// Returns the list of gossip-desired peers for the given chain, in no specific order. /// /// # Panic /// /// Panics if the given [`ChainId`] is invalid. /// - pub fn gossip_desired_num(&mut self, chain_id: ChainId, kind: GossipKind) -> usize { - // TODO: O(n), optimize + pub fn gossip_desired_iter( + &self, + chain_id: ChainId, + kind: GossipKind, + ) -> impl Iterator + '_ { self.gossip_desired_peers_by_chain - .iter() - .filter(|(c, k, _)| *c == chain_id.0 && *k == kind) - .count() + .range( + (chain_id.0, kind, PeerIndex(usize::min_value())) + ..=(chain_id.0, kind, PeerIndex(usize::max_value())), + ) + .map(|(_, _, peer_index)| &self.peers[peer_index.0]) + } + + /// Returns the number of gossip-desired peers for the given chain. + /// + /// # Panic + /// + /// Panics if the given [`ChainId`] is invalid. 
+ /// + pub fn gossip_desired_num(&self, chain_id: ChainId, kind: GossipKind) -> usize { + self.gossip_desired_iter(chain_id, kind).count() } /// Returns the list of [`PeerId`]s that are desired (for any chain) but for which no @@ -1132,27 +1169,35 @@ where id, peer_id: actual_peer_id, } => { - // Store the actual `PeerId` into the connection, making sure to update `self`. + // A handshaking connection has finished its handshake. We must update `self` + // and return an event. + // What makes this a bit complicated is the possibility that the actual PeerId + // might not be the same as the one that was expected. + let actual_peer_index = self.peer_index_or_insert(actual_peer_id); - let connection_info = &mut self.inner[id]; - let expected_peer_id = connection_info - .peer_index - .map(|idx| self.peers[idx.0].clone()); - match &mut connection_info.peer_index { - Some(expected_peer_index) if *expected_peer_index == actual_peer_index => {} - peer_index_refmut @ None => { + let expected_peer_index = self.inner[id].peer_index; + + match expected_peer_index { + None => { + // The connection didn't have any expected PeerId. + self.inner[id].peer_index = Some(actual_peer_index); self.unconnected_desired.remove(&actual_peer_index); - *peer_index_refmut = Some(actual_peer_index); } - Some(peer_index_refmut) => { + Some(expected_peer_index) if expected_peer_index != actual_peer_index => { // The actual PeerId doesn't match the expected PeerId. - let expected_peer_index = - mem::replace(peer_index_refmut, actual_peer_index); + // Update the connection owner. + self.inner[id].peer_index = Some(actual_peer_index); let _was_removed = self .connections_by_peer_id .remove(&(expected_peer_index, id)); debug_assert!(_was_removed); + let _was_inserted = + self.connections_by_peer_id.insert((actual_peer_index, id)); + debug_assert!(_was_inserted); + + // Update `unconnected_desired`. 
+ self.unconnected_desired.remove(&actual_peer_index); if self .gossip_desired_peers .range( @@ -1184,19 +1229,17 @@ where self.unconnected_desired.insert(expected_peer_index); debug_assert!(_was_inserted); } - let _was_inserted = - self.connections_by_peer_id.insert((actual_peer_index, id)); - debug_assert!(_was_inserted); - self.unconnected_desired.remove(&actual_peer_index); - - self.try_clean_up_peer(expected_peer_index); + } + _ => { + // Expected matches actual. + debug_assert_eq!(expected_peer_index, Some(actual_peer_index)); } } - debug_assert!(!self.unconnected_desired.contains(&actual_peer_index)); - // TODO: limit the number of connections per peer? + // Insert the new connection in `self.connected_unopened_gossip_desired` + // if relevant. for (_, _, chain_id) in self.gossip_desired_peers.range( ( actual_peer_index, @@ -1242,6 +1285,18 @@ where } } + // Try to clean up the expected peer index. + // This is done at the very end so that `self` is in a coherent state. + let expected_peer_id = expected_peer_index.map(|idx| self.peers[idx.0].clone()); + if let Some(expected_peer_index) = expected_peer_index { + if expected_peer_index != actual_peer_index { + self.try_clean_up_peer(expected_peer_index); + } + } + + // Small sanity check. + debug_assert!(!self.unconnected_desired.contains(&actual_peer_index)); + return Some(Event::HandshakeFinished { id, expected_peer_id, @@ -1251,21 +1306,68 @@ where collection::Event::PingOutFailed { id } | collection::Event::StartShutdown { id, .. } => { + // A connection has started its shutdown, or must start its shutdown. + + // We handle a ping failure the same way as a shutdown start, as we react to + // a ping failure by starting the shutdown of the connection. Thus, the clean + // up phase is the same in both cases. if let collection::Event::PingOutFailed { .. 
} = inner_event { self.inner.start_shutdown(id); } // TODO: IMPORTANT this event should be turned into `NewOutboundSubstreamsForbidden` and the `reason` removed; see - let connection_info = &self.inner[id]; + // Nothing more to do if the connection is inbound and handshaking. + let Some(peer_index) = self.inner[id].peer_index else { + debug_assert!(!self.inner.connection_state(id).established); + continue; + }; // If peer is desired, and we have no connection or only shutting down // connections, add peer to `unconnected_desired` and remove it from // `connected_unopened_gossip_desired`. - if let Some(peer_index) = connection_info.peer_index { - if self - .gossip_desired_peers + if self + .gossip_desired_peers + .range( + ( + peer_index, + GossipKind::ConsensusTransactions, + usize::min_value(), + ) + ..=( + peer_index, + GossipKind::ConsensusTransactions, + usize::max_value(), + ), + ) + .count() + != 0 + { + if !self + .connections_by_peer_id + .range( + (peer_index, ConnectionId::min_value()) + ..=(peer_index, ConnectionId::max_value()), + ) + .any(|(_, connection_id)| { + let state = self.inner.connection_state(*connection_id); + !state.shutting_down + }) + { + self.unconnected_desired.insert(peer_index); + } + if !self + .connections_by_peer_id .range( + (peer_index, ConnectionId::min_value()) + ..=(peer_index, ConnectionId::max_value()), + ) + .any(|(_, connection_id)| { + let state = self.inner.connection_state(*connection_id); + state.established && !state.shutting_down + }) + { + for (_, _, chain_index) in self.gossip_desired_peers.range( ( peer_index, GossipKind::ConsensusTransactions, @@ -1276,52 +1378,12 @@ where GossipKind::ConsensusTransactions, usize::max_value(), ), - ) - .count() - != 0 - { - if !self - .connections_by_peer_id - .range( - (peer_index, ConnectionId::min_value()) - ..=(peer_index, ConnectionId::max_value()), - ) - .any(|(_, connection_id)| { - let state = self.inner.connection_state(*connection_id); - !state.shutting_down - }) - { - 
self.unconnected_desired.insert(peer_index); - } - if !self - .connections_by_peer_id - .range( - (peer_index, ConnectionId::min_value()) - ..=(peer_index, ConnectionId::max_value()), - ) - .any(|(_, connection_id)| { - let state = self.inner.connection_state(*connection_id); - state.established && !state.shutting_down - }) - { - for (_, _, chain_index) in self.gossip_desired_peers.range( - ( - peer_index, - GossipKind::ConsensusTransactions, - usize::min_value(), - ) - ..=( - peer_index, - GossipKind::ConsensusTransactions, - usize::max_value(), - ), - ) { - self.connected_unopened_gossip_desired.remove(&( - peer_index, - ChainId(*chain_index), - GossipKind::ConsensusTransactions, - )); - } + ) { + self.connected_unopened_gossip_desired.remove(&( + peer_index, + ChainId(*chain_index), + GossipKind::ConsensusTransactions, + )); } } } @@ -1332,9 +1394,9 @@ where was_established, user_data: connection_info, } => { - // A connection has been closed. - // Note that the underlying state machine guarantees that all the substreams - // have been closed beforehand through other events. + // A connection has been completely closed. + // The underlying state machine guarantees that all the substreams have been + // closed beforehand through other events. debug_assert!(connection_info.peer_index.is_some() || !was_established); @@ -1369,7 +1431,6 @@ where collection::Event::InboundError { .. } => { // TODO: report the error for diagnostic purposes, but revisit the concept of "InboundError" - continue; } collection::Event::InboundNegotiated { @@ -1377,82 +1438,66 @@ where substream_id, protocol_name, } => { - // An inbound substream has negotiated a protocol. We must decide whether to - // accept this protocol or instead reject the substream. + // An inbound substream opened by the remote would like to negotiate the given + // protocol. We must decide whether to accept this protocol or instead reject + // the substream. 
// If accepted, we must also save the protocol somewhere in `self` in order to // load it later once things happen on this substream. - match self.recognize_protocol(&protocol_name) { - Ok(protocol) => { - let inbound_type = match protocol { - Protocol::Identify => collection::InboundTy::Request { - request_max_size: None, - }, - Protocol::Ping => collection::InboundTy::Ping, - Protocol::BlockAnnounces { .. } => { - collection::InboundTy::Notifications { - max_handshake_size: 1024 * 1024, // TODO: arbitrary - } - } - Protocol::Transactions { .. } => { - collection::InboundTy::Notifications { - max_handshake_size: 4, - } - } - Protocol::Grandpa { chain_index } - if self.chains[chain_index] - .grandpa_protocol_config - .is_some() => - { - collection::InboundTy::Notifications { - max_handshake_size: 4, - } - } - Protocol::Grandpa { .. } => { - self.inner.reject_inbound(substream_id); - continue; - } - Protocol::Sync { chain_index } - if self.chains[chain_index].allow_inbound_block_requests => - { - collection::InboundTy::Request { - request_max_size: Some(1024), - } - } - Protocol::Sync { .. } => { - self.inner.reject_inbound(substream_id); - continue; - } - - // TODO: protocols that are not supported - Protocol::LightUnknown { .. } - | Protocol::Kad { .. } - | Protocol::SyncWarp { .. } - | Protocol::State { .. } => { - self.inner.reject_inbound(substream_id); - continue; - } - - Protocol::LightStorage { .. } | Protocol::LightCall { .. 
} => { - unreachable!() - } - }; - - self.inner.accept_inbound(substream_id, inbound_type); + let Ok(protocol) = self.recognize_protocol(&protocol_name) else { + self.inner.reject_inbound(substream_id); + continue; + }; - let _prev_value = self.substreams.insert( - substream_id, - SubstreamInfo { - connection_id: id, - protocol: Some(protocol), - }, - ); - debug_assert!(_prev_value.is_none()); + let inbound_type = match protocol { + Protocol::Identify => collection::InboundTy::Request { + request_max_size: None, + }, + Protocol::Ping => collection::InboundTy::Ping, + Protocol::Notifications(NotificationsProtocol::Grandpa { chain_index }) + if self.chains[chain_index].grandpa_protocol_config.is_none() => + { + self.inner.reject_inbound(substream_id); + continue; + } + Protocol::Notifications(p) => collection::InboundTy::Notifications { + max_handshake_size: self.notifications_protocol_max_handshake_size(p), + }, + Protocol::Sync { chain_index } + if self.chains[chain_index].allow_inbound_block_requests => + { + collection::InboundTy::Request { + request_max_size: Some(1024), + } } - Err(()) => { + Protocol::Sync { .. } => { self.inner.reject_inbound(substream_id); + continue; } - } - continue; + + // TODO: the protocols below are not supported yet + Protocol::LightUnknown { .. } + | Protocol::Kad { .. } + | Protocol::SyncWarp { .. } + | Protocol::State { .. } => { + self.inner.reject_inbound(substream_id); + continue; + } + + Protocol::LightStorage { .. } | Protocol::LightCall { .. } => { + unreachable!() + } + }; + + self.inner.accept_inbound(substream_id, inbound_type); + + let _prev_value = self.substreams.insert( + substream_id, + SubstreamInfo { + connection_id: id, + protocol: Some(protocol), + }, + ); + debug_assert!(_prev_value.is_none()); } collection::Event::InboundNegotiatedCancel { .. } => { @@ -1464,10 +1509,9 @@ where collection::Event::InboundAcceptedCancel { substream_id } => { // An inbound substream has been aborted after having been accepted. 
// Since we don't report any event to the API user when a substream is - // accepted, we have nothing to do but clean up our state. + // accepted, we have nothing to do but clean up our local state. let _was_in = self.substreams.remove(&substream_id); debug_assert!(_was_in.is_some()); - continue; } collection::Event::Response { @@ -1570,10 +1614,7 @@ where ), // The protocols below aren't request-response protocols. - Some(Protocol::Ping) - | Some(Protocol::BlockAnnounces { .. }) - | Some(Protocol::Transactions { .. }) - | Some(Protocol::Grandpa { .. }) => unreachable!(), + Some(Protocol::Ping) | Some(Protocol::Notifications(_)) => unreachable!(), }; return Some(Event::RequestResult { @@ -1586,15 +1627,12 @@ where substream_id, request_payload, } => { - // Received a request on a connection. + // Received a request on a request-response protocol. let substream_info = self .substreams .get(&substream_id) .unwrap_or_else(|| unreachable!()); - let connection_info = &self.inner[substream_info.connection_id]; - // Requests can only happen on connections after their handshake phase is - // finished, therefore their `PeerId` is known. - let peer_id = self.peers[connection_info + let peer_id = self.peers[self.inner[substream_info.connection_id] .peer_index .as_ref() .unwrap_or_else(|| unreachable!()) @@ -1652,16 +1690,20 @@ where } collection::Event::RequestInCancel { substream_id } => { + // The remote has cancelled a previously-emitted request. Propagate this event + // to the API user. let _was_in = self.substreams.remove(&substream_id); debug_assert!(_was_in.is_some()); return Some(Event::RequestInCancel { substream_id }); } + // TODO: this whole block of code is too complex collection::Event::NotificationsOutResult { substream_id, result, } => { // Outgoing notifications substream has finished opening. 
+ // TODO: this if is pretty hacky let substream_info = if result.is_ok() { self.substreams .get(&substream_id) @@ -1670,21 +1712,15 @@ where } else { self.substreams.remove(&substream_id).unwrap() }; - let connection_id = substream_info.connection_id; - let connection_info = &self.inner[connection_id]; - // Notification substreams can only happen on connections after their - // handshake phase is finished, therefore their `PeerId` is known. - let peer_index = *connection_info + let peer_index = *self.inner[connection_id] .peer_index .as_ref() .unwrap_or_else(|| unreachable!()); - - // All outgoing substream attempts are cancelled when a chain is removed, as - // such `protocol` can't be `None`. - let Some(Ok(substream_protocol)) = - substream_info.protocol.map(NotificationsProtocol::try_from) + let Some(Protocol::Notifications(substream_protocol)) = substream_info.protocol else { + // All outgoing substream attempts are cancelled when a chain is removed, + // as such `protocol` can't be `None`. unreachable!(); }; @@ -1700,6 +1736,7 @@ where // The behaviour is very specific to the protocol. match substream_protocol { NotificationsProtocol::BlockAnnounces { chain_index } => { + // Parse the handshake to check whether it's correct. 
let result = match &result { Ok(handshake) => { match codec::decode_block_announces_handshake( @@ -1732,84 +1769,38 @@ where NotificationsProtocol::BlockAnnounces { chain_index }, peer_index, SubstreamDirection::Out, - NotificationsSubstreamState::Open, + NotificationsSubstreamState::Open { + asked_to_leave: false, + }, substream_id, )); debug_assert!(_was_inserted); - if self - .notification_substreams_by_peer_id - .range( - ( - NotificationsProtocol::Transactions { chain_index }, - peer_index, - SubstreamDirection::Out, - NotificationsSubstreamState::min_value(), - SubstreamId::min_value(), - ) - ..=( - NotificationsProtocol::Transactions { - chain_index, - }, - peer_index, - SubstreamDirection::Out, - NotificationsSubstreamState::max_value(), - SubstreamId::max_value(), - ), - ) - .next() - .is_none() - { - let new_substream_id = self.inner.open_out_notifications( - connection_id, - codec::encode_protocol_name_string( - codec::ProtocolName::Transactions { - genesis_hash: self.chains[chain_index] - .genesis_hash, - fork_id: self.chains[chain_index] - .fork_id - .as_deref(), - }, - ), - Duration::from_secs(10), // TODO: arbitrary - Vec::new(), - 128, // TODO: arbitrary - ); - - self.substreams.insert( - new_substream_id, - SubstreamInfo { - connection_id, - protocol: Some(Protocol::Transactions { + for other_protocol in + iter::once(NotificationsProtocol::Transactions { + chain_index, + }) + .chain( + self.chains[chain_index] + .grandpa_protocol_config + .is_some() + .then(|| NotificationsProtocol::Grandpa { chain_index, }), - }, - ); - - self.notification_substreams_by_peer_id.insert(( - NotificationsProtocol::Transactions { chain_index }, - peer_index, - SubstreamDirection::Out, - NotificationsSubstreamState::Pending, - new_substream_id, - )); - } - - if self.chains[chain_index].grandpa_protocol_config.is_some() - && self + ) + { + if self .notification_substreams_by_peer_id .range( ( - NotificationsProtocol::Grandpa { chain_index }, + other_protocol, 
peer_index, SubstreamDirection::Out, NotificationsSubstreamState::min_value(), SubstreamId::min_value(), ) ..=( - NotificationsProtocol::Grandpa { - chain_index, - }, + other_protocol, peer_index, SubstreamDirection::Out, NotificationsSubstreamState::max_value(), @@ -1817,34 +1808,57 @@ where ), ) .next() - .is_none() - { + .is_some() + { + continue; + } + let new_substream_id = self.inner.open_out_notifications( connection_id, codec::encode_protocol_name_string( - codec::ProtocolName::Grandpa { - genesis_hash: self.chains[chain_index] - .genesis_hash, - fork_id: self.chains[chain_index] - .fork_id - .as_deref(), + match other_protocol { + NotificationsProtocol::Transactions { + chain_index, + } => codec::ProtocolName::Transactions { + genesis_hash: self.chains[chain_index] + .genesis_hash, + fork_id: self.chains[chain_index] + .fork_id + .as_deref(), + }, + NotificationsProtocol::Grandpa { + chain_index, + } => codec::ProtocolName::Grandpa { + genesis_hash: self.chains[chain_index] + .genesis_hash, + fork_id: self.chains[chain_index] + .fork_id + .as_deref(), + }, + _ => unreachable!(), }, ), - Duration::from_secs(10), // TODO: arbitrary - self.chains[chain_index].role.scale_encoding().to_vec(), - 1024 * 1024, // TODO: arbitrary + self.notifications_protocol_handshake_timeout( + other_protocol, + ), + self.notifications_protocol_handshake(other_protocol), + self.notifications_protocol_max_handshake_size( + other_protocol, + ), ); self.substreams.insert( new_substream_id, SubstreamInfo { connection_id, - protocol: Some(Protocol::Grandpa { chain_index }), + protocol: Some(Protocol::Notifications( + other_protocol, + )), }, ); self.notification_substreams_by_peer_id.insert(( - NotificationsProtocol::Grandpa { chain_index }, + other_protocol, peer_index, SubstreamDirection::Out, NotificationsSubstreamState::Pending, @@ -1887,7 +1901,7 @@ where }, peer_index, SubstreamDirection::Out, - NotificationsSubstreamState::Open, + NotificationsSubstreamState::open_min_value(), 
SubstreamId::min_value(), ) ..=( @@ -1896,7 +1910,8 @@ where }, peer_index, SubstreamDirection::Out, - NotificationsSubstreamState::Open, + NotificationsSubstreamState::open_max_value( + ), SubstreamId::max_value(), ), ) @@ -1916,6 +1931,7 @@ where GossipKind::ConsensusTransactions, )); + // TODO: pretty hacky if let GossipConnectError::HandshakeDecode(_) | GossipConnectError::GenesisMismatch { .. } = error { @@ -1923,58 +1939,121 @@ where self.substreams.remove(&substream_id).unwrap(); } - for substream_id in self - .notification_substreams_by_peer_id - .range( - ( - NotificationsProtocol::Transactions { chain_index }, - peer_index, - SubstreamDirection::Out, - NotificationsSubstreamState::min_value(), - SubstreamId::min_value(), - ) - ..=( - NotificationsProtocol::Transactions { - chain_index, - }, + // Close all the notification substreams of that chain. + for other_protocol in [ + NotificationsProtocol::BlockAnnounces { chain_index }, + NotificationsProtocol::Transactions { chain_index }, + NotificationsProtocol::Grandpa { chain_index }, + ] { + for (substream_id, direction, state) in self + .notification_substreams_by_peer_id + .range( + ( + other_protocol, peer_index, - SubstreamDirection::Out, - NotificationsSubstreamState::max_value(), - SubstreamId::max_value(), - ), - ) - .map(|(_, _, _, _, s)| *s) - .collect::>() - { - self.inner.close_out_notifications(substream_id); - } - - for substream_id in self - .notification_substreams_by_peer_id - .range( - ( - NotificationsProtocol::Grandpa { chain_index }, - peer_index, - SubstreamDirection::Out, - NotificationsSubstreamState::min_value(), - SubstreamId::min_value(), + SubstreamDirection::min_value(), + NotificationsSubstreamState::min_value(), + SubstreamId::min_value(), + ) + ..=( + other_protocol, + peer_index, + SubstreamDirection::max_value(), + NotificationsSubstreamState::max_value(), + SubstreamId::max_value(), + ), ) - ..=( - NotificationsProtocol::Grandpa { chain_index }, - peer_index, - 
SubstreamDirection::Out, - NotificationsSubstreamState::max_value(), - SubstreamId::max_value(), - ), - ) - .map(|(_, _, _, _, s)| *s) - .collect::>() - { - self.inner.close_out_notifications(substream_id); + .map(|(_, _, dir, state, s)| (*s, *dir, *state)) + .collect::>() + { + match (direction, state) { + (SubstreamDirection::Out, _) => { + self.inner + .close_out_notifications(substream_id); + + let _was_in = self + .notification_substreams_by_peer_id + .remove(&( + other_protocol, + peer_index, + direction, + state, + substream_id, + )); + debug_assert!(_was_in); + let _was_in = + self.substreams.remove(&substream_id); + debug_assert!(_was_in.is_some()); + } + ( + SubstreamDirection::In, + NotificationsSubstreamState::Pending, + ) => { + self.inner + .reject_in_notifications(substream_id); + + let _was_in = self + .notification_substreams_by_peer_id + .remove(&( + other_protocol, + peer_index, + direction, + state, + substream_id, + )); + debug_assert!(_was_in); + let _was_in = + self.substreams.remove(&substream_id); + debug_assert!(_was_in.is_some()); + } + ( + SubstreamDirection::In, + NotificationsSubstreamState::Open { + asked_to_leave: false, + }, + ) => { + self.inner.start_close_in_notifications( + substream_id, + Duration::from_secs(5), + ); // TODO: arbitrary constant + + let _was_removed = self + .notification_substreams_by_peer_id + .remove(&( + other_protocol, + peer_index, + SubstreamDirection::In, + NotificationsSubstreamState::Open { + asked_to_leave: false, + }, + substream_id, + )); + debug_assert!(_was_removed); + let _was_inserted = self + .notification_substreams_by_peer_id + .insert(( + other_protocol, + peer_index, + SubstreamDirection::In, + NotificationsSubstreamState::Open { + asked_to_leave: true, + }, + substream_id, + )); + debug_assert!(_was_inserted); + } + ( + SubstreamDirection::In, + NotificationsSubstreamState::Open { + asked_to_leave: true, + }, + ) => { + // Nothing to do. 
+ } + } + } } - // TODO: also close the ingoing ba+tx+gp substreams - return Some(Event::GossipOpenFailed { peer_id: self.peers[peer_index.0].clone(), chain_id: ChainId(chain_index), @@ -1987,6 +2066,8 @@ where NotificationsProtocol::Transactions { chain_index } | NotificationsProtocol::Grandpa { chain_index } => { + // TODO: doesn't check the handshakes + // This can only happen if we have a block announces substream with // that peer, otherwise the substream opening attempt should have // been cancelled. @@ -1997,14 +2078,14 @@ where NotificationsProtocol::BlockAnnounces { chain_index }, peer_index, SubstreamDirection::Out, - NotificationsSubstreamState::Open, + NotificationsSubstreamState::open_min_value(), SubstreamId::min_value() ) ..=( NotificationsProtocol::BlockAnnounces { chain_index }, peer_index, SubstreamDirection::Out, - NotificationsSubstreamState::Open, + NotificationsSubstreamState::open_max_value(), SubstreamId::max_value() ) ) @@ -2012,12 +2093,17 @@ where .is_some()); // If the substream failed to open, we simply try again. - // Trying agains means that we might be hammering the remote with + // Trying again means that we might be hammering the remote with // substream requests, however as of the writing of this text this is // necessary in order to bypass an issue in Substrate. - if result.is_err() - && !self.inner.connection_state(connection_id).shutting_down - { + // Note that in the situation where the connection is shutting down, + // we don't re-open the substream on a different connection, but + // that's ok as the block announces substream should be closed soon. 
+ if result.is_err() { + if self.inner.connection_state(connection_id).shutting_down { + continue; + } + let new_substream_id = self.inner.open_out_notifications( connection_id, codec::encode_protocol_name_string(match substream_protocol { @@ -2039,17 +2125,14 @@ where } _ => unreachable!(), }), - Duration::from_secs(10), // TODO: arbitrary - match substream_protocol { - NotificationsProtocol::Transactions { .. } => Vec::new(), - NotificationsProtocol::Grandpa { .. } => { - self.chains[chain_index].role.scale_encoding().to_vec() - } - _ => unreachable!(), - }, - 1024 * 1024, // TODO: arbitrary + self.notifications_protocol_handshake_timeout( + substream_protocol, + ), + self.notifications_protocol_handshake(substream_protocol), + self.notifications_protocol_max_handshake_size( + substream_protocol, + ), ); - let _was_inserted = self.notification_substreams_by_peer_id.insert(( substream_protocol, @@ -2059,7 +2142,6 @@ where new_substream_id, )); debug_assert!(_was_inserted); - let _prev_value = self.substreams.insert( new_substream_id, SubstreamInfo { @@ -2068,7 +2150,6 @@ where }, ); debug_assert!(_prev_value.is_none()); - continue; } @@ -2076,7 +2157,9 @@ where substream_protocol, peer_index, SubstreamDirection::Out, - NotificationsSubstreamState::Open, + NotificationsSubstreamState::Open { + asked_to_leave: false, + }, substream_id, )); debug_assert!(_was_inserted); @@ -2115,6 +2198,9 @@ where collection::Event::NotificationsOutCloseDemanded { substream_id } | collection::Event::NotificationsOutReset { substream_id } => { // Outgoing notifications substream has been closed or must be closed. + // These two situations are handled together, as we immediately react to a + // demanded closing by performing the closing. The rest of the code is thus + // the same for both situations. // If the request demands the closing, we immediately comply. 
if matches!( @@ -2124,23 +2210,20 @@ where self.inner.close_out_notifications(substream_id); } + // Load various information about the substream and connection. let substream_info = self .substreams .remove(&substream_id) .unwrap_or_else(|| unreachable!()); let connection_id = substream_info.connection_id; - let connection_info = &self.inner[connection_id]; - // Notification substreams can only happen on connections after their - // handshake phase is finished, therefore their `PeerId` is known. - let peer_index = *connection_info + let peer_index = *self.inner[connection_id] .peer_index .as_ref() .unwrap_or_else(|| unreachable!()); // All outgoing substream attempts are cancelled when a chain is removed, as // such `protocol` can't be `None`. - let Some(Ok(substream_protocol)) = - substream_info.protocol.map(NotificationsProtocol::try_from) + let Some(Protocol::Notifications(substream_protocol)) = substream_info.protocol else { unreachable!(); }; @@ -2150,12 +2233,14 @@ where substream_protocol, peer_index, SubstreamDirection::Out, - NotificationsSubstreamState::Open, + NotificationsSubstreamState::Open { + asked_to_leave: false, + }, substream_id, )); debug_assert!(_was_in); - // Some substreams are tied to the state of the block announces substream. + // Rest of the code depends on the protocol. 
match substream_protocol { NotificationsProtocol::BlockAnnounces { chain_index } => { self.opened_gossip_undesired.remove(&( @@ -2187,7 +2272,7 @@ where NotificationsProtocol::BlockAnnounces { chain_index }, peer_index, SubstreamDirection::Out, - NotificationsSubstreamState::Pending, + NotificationsSubstreamState::min_value(), SubstreamId::min_value(), ) ..=( @@ -2196,7 +2281,7 @@ where }, peer_index, SubstreamDirection::Out, - NotificationsSubstreamState::Open, + NotificationsSubstreamState::max_value(), SubstreamId::max_value(), ), ) @@ -2212,102 +2297,152 @@ where debug_assert!(_was_inserted); } + // The transactions and Grandpa protocols are tied to the block + // announces substream. As such, we also close any transactions or + // grandpa substream, either pending or fully opened. for proto in [ NotificationsProtocol::Transactions { chain_index }, NotificationsProtocol::Grandpa { chain_index }, ] { - for (substream_state, substream_id) in self + for (substream_direction, substream_state, substream_id) in self .notification_substreams_by_peer_id .range( ( proto, peer_index, - SubstreamDirection::Out, + SubstreamDirection::min_value(), NotificationsSubstreamState::min_value(), SubstreamId::min_value(), ) ..=( proto, peer_index, - SubstreamDirection::Out, + SubstreamDirection::max_value(), NotificationsSubstreamState::max_value(), SubstreamId::max_value(), ), ) - .map(|(_, _, _, state, substream_id)| (*state, *substream_id)) + .map(|(_, _, direction, state, substream_id)| { + (*direction, *state, *substream_id) + }) .collect::<Vec<_>>() { - self.inner.close_out_notifications(substream_id); - self.substreams.remove(&substream_id); - self.notification_substreams_by_peer_id.remove(&( - proto, - peer_index, - SubstreamDirection::Out, - substream_state, - substream_id, - )); + match (substream_direction, substream_state) { + (SubstreamDirection::Out, _) => { + self.inner.close_out_notifications(substream_id); + let _was_removed = +
self.notification_substreams_by_peer_id.remove(&( + proto, + peer_index, + SubstreamDirection::Out, + substream_state, + substream_id, + )); + debug_assert!(_was_removed); + self.substreams.remove(&substream_id); + } + ( + SubstreamDirection::In, + NotificationsSubstreamState::Pending, + ) => { + // Inbound notification substreams are always accepted + // or rejected immediately when a gossip link is open. + unreachable!() + } + ( + SubstreamDirection::In, + NotificationsSubstreamState::Open { + asked_to_leave: true, + }, + ) => { + // Nothing to do. + } + ( + SubstreamDirection::In, + NotificationsSubstreamState::Open { + asked_to_leave: false, + }, + ) => { + self.inner.start_close_in_notifications( + substream_id, + Duration::from_secs(5), // TODO: arbitrary constant + ); + let _was_removed = + self.notification_substreams_by_peer_id.remove(&( + proto, + peer_index, + SubstreamDirection::In, + NotificationsSubstreamState::Open { + asked_to_leave: false, + }, + substream_id, + )); + debug_assert!(_was_removed); + let _was_inserted = + self.notification_substreams_by_peer_id.insert(( + proto, + peer_index, + SubstreamDirection::In, + NotificationsSubstreamState::Open { + asked_to_leave: true, + }, + substream_id, + )); + debug_assert!(_was_inserted); + } + } } } - // TODO: also close inbound substreams? - return Some(Event::GossipDisconnected { peer_id: self.peers[peer_index.0].clone(), chain_id: ChainId(chain_index), kind: GossipKind::ConsensusTransactions, }); } - // The transactions and Grandpa protocols are tied to the block announces + + // The transactions and grandpa protocols are tied to the block announces // substream. If there is a block announce substream with the peer, we try // to reopen these two substreams. 
- NotificationsProtocol::Transactions { chain_index } => { - let new_substream_id = self.inner.open_out_notifications( - connection_id, - codec::encode_protocol_name_string( - codec::ProtocolName::Transactions { - genesis_hash: self.chains[chain_index].genesis_hash, - fork_id: self.chains[chain_index].fork_id.as_deref(), - }, - ), - Duration::from_secs(10), // TODO: arbitrary - Vec::new(), - 1024 * 1024, // TODO: arbitrary - ); - self.substreams.insert( - new_substream_id, - SubstreamInfo { - connection_id, - protocol: Some(Protocol::Transactions { chain_index }), - }, - ); - self.notification_substreams_by_peer_id.insert(( - NotificationsProtocol::Transactions { chain_index }, - peer_index, - SubstreamDirection::Out, - NotificationsSubstreamState::Pending, - new_substream_id, - )); - } - NotificationsProtocol::Grandpa { chain_index } => { + NotificationsProtocol::Transactions { .. } + | NotificationsProtocol::Grandpa { .. } => { + // Don't actually try to reopen if the connection is shutting down. + // Note that we don't try to reopen on a different connection, as the + // block announces substream will very soon be closed too anyway. 
+ if self.inner.connection_state(connection_id).shutting_down { + continue; + } + let new_substream_id = self.inner.open_out_notifications( connection_id, - codec::encode_protocol_name_string(codec::ProtocolName::Grandpa { - genesis_hash: self.chains[chain_index].genesis_hash, - fork_id: self.chains[chain_index].fork_id.as_deref(), + codec::encode_protocol_name_string(match substream_protocol { + NotificationsProtocol::Transactions { chain_index } => { + codec::ProtocolName::Transactions { + genesis_hash: self.chains[chain_index].genesis_hash, + fork_id: self.chains[chain_index].fork_id.as_deref(), + } + } + NotificationsProtocol::Grandpa { chain_index } => { + codec::ProtocolName::Grandpa { + genesis_hash: self.chains[chain_index].genesis_hash, + fork_id: self.chains[chain_index].fork_id.as_deref(), + } + } + _ => unreachable!(), }), - Duration::from_secs(10), // TODO: arbitrary - self.chains[chain_index].role.scale_encoding().to_vec(), - 1024 * 1024, // TODO: arbitrary + self.notifications_protocol_handshake_timeout(substream_protocol), + self.notifications_protocol_handshake(substream_protocol), + self.notifications_protocol_max_handshake_size(substream_protocol), ); self.substreams.insert( new_substream_id, SubstreamInfo { connection_id, - protocol: Some(Protocol::Grandpa { chain_index }), + protocol: Some(Protocol::Notifications(substream_protocol)), }, ); self.notification_substreams_by_peer_id.insert(( - NotificationsProtocol::Grandpa { chain_index }, + substream_protocol, peer_index, SubstreamDirection::Out, NotificationsSubstreamState::Pending, @@ -2333,62 +2468,63 @@ where // - Generate an event to ask the API user whether to accept the demand. This // happens specifically for block announce substreams. + // Extract various bits of information about the substream. 
+ // Instantly reject the substream if it concerns a chain that has since then + // been removed from `self`, which can happen if the protocol name was already + // negotiated when the chain was removed. let substream_info = self .substreams .get(&substream_id) .unwrap_or_else(|| unreachable!()); - let connection_info = &self.inner[substream_info.connection_id]; - // Notification substreams can only happen on connections after their - // handshake phase is finished, therefore their `PeerId` is known. - let peer_index = *connection_info + let peer_index = *self.inner[substream_info.connection_id] .peer_index .as_ref() .unwrap_or_else(|| unreachable!()); - - // Check if the substream concerns a chain that has since then been removed. let Some(substream_protocol) = substream_info.protocol else { self.inner.reject_in_notifications(substream_id); self.substreams.remove(&substream_id); continue; }; + let Protocol::Notifications(substream_protocol) = substream_protocol else { + unreachable!() + }; + let (NotificationsProtocol::BlockAnnounces { chain_index } + | NotificationsProtocol::Transactions { chain_index } + | NotificationsProtocol::Grandpa { chain_index }) = substream_protocol; // Check whether a substream with the same protocol already exists with that // peer, and if so deny the request. + // Note that substreams with `asked_to_leave` equal to `true` are ignored when + // searching, as in this case it's not a protocol violation. 
if self .notification_substreams_by_peer_id .range( ( - substream_protocol.try_into().unwrap(), + substream_protocol, peer_index, SubstreamDirection::In, NotificationsSubstreamState::min_value(), SubstreamId::min_value(), ) - ..=( - substream_protocol.try_into().unwrap(), + ..( + substream_protocol, peer_index, SubstreamDirection::In, - NotificationsSubstreamState::max_value(), - SubstreamId::max_value(), + NotificationsSubstreamState::Open { + asked_to_leave: true, + }, + SubstreamId::min_value(), ), ) .next() .is_some() { self.inner.reject_in_notifications(substream_id); - self.substreams.remove(&substream_id); + let _was_removed = self.substreams.remove(&substream_id); + debug_assert!(_was_removed.is_some()); continue; } - // Find the `chain_index`. - let (Protocol::BlockAnnounces { chain_index } - | Protocol::Transactions { chain_index } - | Protocol::Grandpa { chain_index }) = substream_protocol - else { - // Any other protocol isn't a notifications protocol. - unreachable!() - }; - // If an outgoing block announces notifications protocol (either pending or // fully open) exists, accept the substream immediately. if self @@ -2412,54 +2548,43 @@ where .next() .is_some() { - self.notification_substreams_by_peer_id.insert(( - substream_protocol.try_into().unwrap(), + let _was_inserted = self.notification_substreams_by_peer_id.insert(( + substream_protocol, peer_index, SubstreamDirection::In, - NotificationsSubstreamState::Open, + NotificationsSubstreamState::Open { + asked_to_leave: false, + }, substream_id, )); - let handshake = match substream_protocol { - Protocol::BlockAnnounces { .. 
} => { - codec::encode_block_announces_handshake( - codec::BlockAnnouncesHandshakeRef { - best_hash: &self.chains[chain_index].best_hash, - best_number: self.chains[chain_index].best_number, - role: self.chains[chain_index].role, - genesis_hash: &self.chains[chain_index].genesis_hash, - }, - self.chains[chain_index].block_number_bytes, - ) - .fold(Vec::new(), |mut a, b| { - a.extend_from_slice(b.as_ref()); - a - }) - } - Protocol::Grandpa { .. } => { - self.chains[chain_index].role.scale_encoding().to_vec() - } - Protocol::Transactions { .. } => Vec::new(), - _ => unreachable!(), - }; + debug_assert!(_was_inserted); self.inner.accept_in_notifications( substream_id, - handshake, - 1024 * 1024, // TODO: ?! + self.notifications_protocol_handshake(substream_protocol), + self.notifications_protocol_max_notification_size(substream_protocol), ); continue; } // It is forbidden to cold-open a substream other than the block announces // substream. - if !matches!(substream_protocol, Protocol::BlockAnnounces { .. }) { + if !matches!( + substream_protocol, + NotificationsProtocol::BlockAnnounces { .. } + ) { self.inner.reject_in_notifications(substream_id); - self.substreams.remove(&substream_id); + let _was_removed = self.substreams.remove(&substream_id); + debug_assert!(_was_removed.is_some()); continue; } // Update the local state and return the event. + debug_assert!(matches!( + substream_protocol, + NotificationsProtocol::BlockAnnounces { .. 
} + )); self.notification_substreams_by_peer_id.insert(( - NotificationsProtocol::BlockAnnounces { chain_index }, + substream_protocol, peer_index, SubstreamDirection::In, NotificationsSubstreamState::Pending, @@ -2477,12 +2602,9 @@ where let substream_info = self .substreams - .get(&substream_id) + .remove(&substream_id) .unwrap_or_else(|| unreachable!()); - let connection_info = &self.inner[substream_info.connection_id]; - // Notification substreams can only happen on connections after their - // handshake phase is finished, therefore their `PeerId` is known. - let peer_index = *connection_info + let peer_index = *self.inner[substream_info.connection_id] .peer_index .as_ref() .unwrap_or_else(|| unreachable!()); @@ -2491,7 +2613,9 @@ where // except for block announce substreams. Additionally, when a chain is removed, // all its pending block announce substreams are rejected. Therefore, this // event can only happen for block announce substreams. - let Some(Protocol::BlockAnnounces { chain_index }) = substream_info.protocol + let Some(Protocol::Notifications(NotificationsProtocol::BlockAnnounces { + chain_index, + })) = substream_info.protocol else { unreachable!() }; @@ -2499,9 +2623,9 @@ where // Clean up the local state. let _was_in = self.notification_substreams_by_peer_id.remove(&( NotificationsProtocol::BlockAnnounces { chain_index }, - peer_index, // TODO: cloning overhead :-/ + peer_index, SubstreamDirection::In, - NotificationsSubstreamState::Open, + NotificationsSubstreamState::Pending, substream_id, )); debug_assert!(_was_in); @@ -2523,32 +2647,19 @@ where .substreams .get(&substream_id) .unwrap_or_else(|| unreachable!()); - let chain_index = match substream_info.protocol { + let substream_protocol = match substream_info.protocol { None => { // Substream concerns a chain that has been removed. // Ignore the notification. 
continue; } - Some(Protocol::BlockAnnounces { chain_index }) => chain_index, - Some(Protocol::Transactions { chain_index }) => chain_index, - Some(Protocol::Grandpa { chain_index }) => chain_index, - // Other protocols are not notification protocols. - Some( - Protocol::Identify - | Protocol::Ping - | Protocol::Sync { .. } - | Protocol::LightUnknown { .. } - | Protocol::LightStorage { .. } - | Protocol::LightCall { .. } - | Protocol::Kad { .. } - | Protocol::SyncWarp { .. } - | Protocol::State { .. }, - ) => unreachable!(), + Some(Protocol::Notifications(p)) => p, + Some(_) => unreachable!(), }; - let connection_info = &self.inner[substream_info.connection_id]; - // Notification substreams can only happen on connections after their - // handshake phase is finished, therefore their `PeerId` is known. - let peer_index = *connection_info + let (NotificationsProtocol::BlockAnnounces { chain_index } + | NotificationsProtocol::Transactions { chain_index } + | NotificationsProtocol::Grandpa { chain_index }) = substream_protocol; + let peer_index = *self.inner[substream_info.connection_id] .peer_index .as_ref() .unwrap_or_else(|| unreachable!()); @@ -2556,7 +2667,6 @@ where // Check whether there is an open outgoing block announces substream, as this // means that we are "gossip-connected". If not, then the notification is // silently discarded. - // TODO: cloning of the peer_id if self .notification_substreams_by_peer_id .range( @@ -2564,14 +2674,14 @@ where NotificationsProtocol::BlockAnnounces { chain_index }, peer_index, SubstreamDirection::Out, - NotificationsSubstreamState::Open, + NotificationsSubstreamState::open_min_value(), collection::SubstreamId::min_value(), ) ..=( NotificationsProtocol::BlockAnnounces { chain_index }, peer_index, SubstreamDirection::Out, - NotificationsSubstreamState::Open, + NotificationsSubstreamState::open_max_value(), collection::SubstreamId::max_value(), ), ) @@ -2582,8 +2692,8 @@ where } // Decode the notification and return an event. 
- match substream_info.protocol { - Some(Protocol::BlockAnnounces { .. }) => { + match substream_protocol { + NotificationsProtocol::BlockAnnounces { .. } => { if let Err(err) = codec::decode_block_announce( &notification, self.chains[chain_index].block_number_bytes, @@ -2603,10 +2713,10 @@ where }, }); } - Some(Protocol::Transactions { .. }) => { + NotificationsProtocol::Transactions { .. } => { // TODO: not implemented } - Some(Protocol::Grandpa { .. }) => { + NotificationsProtocol::Grandpa { .. } => { let decoded_notif = match codec::decode_grandpa_notification( &notification, self.chains[chain_index].block_number_bytes, @@ -2649,28 +2759,48 @@ where } } } - - // Other protocols are not notification protocols. - None - | Some( - Protocol::Identify - | Protocol::Ping - | Protocol::Sync { .. } - | Protocol::LightUnknown { .. } - | Protocol::LightStorage { .. } - | Protocol::LightCall { .. } - | Protocol::Kad { .. } - | Protocol::SyncWarp { .. } - | Protocol::State { .. }, - ) => unreachable!(), } } collection::Event::NotificationsInClose { substream_id, .. } => { // An incoming notifications substream has been closed. // Nothing to do except clean up the local state. - let _was_in = self.substreams.remove(&substream_id); - debug_assert!(_was_in.is_some()); + let Some(substream_info) = self.substreams.remove(&substream_id) else { + unreachable!() + }; + let peer_index = *self.inner[substream_info.connection_id] + .peer_index + .as_ref() + .unwrap_or_else(|| unreachable!()); + let Some(protocol) = substream_info.protocol else { + // Substream concerns a chain that has since then been removed. + continue; + }; + let Protocol::Notifications(protocol) = protocol else { + unreachable!() + }; + + // Clean up with both `asked_to_leave` equal to `true` or `false`, as we don't + // know in which of the two the substream is.
+ let _was_in1 = self.notification_substreams_by_peer_id.remove(&( + protocol, + peer_index, + SubstreamDirection::In, + NotificationsSubstreamState::Open { + asked_to_leave: false, + }, + substream_id, + )); + let _was_in2 = self.notification_substreams_by_peer_id.remove(&( + protocol, + peer_index, + SubstreamDirection::In, + NotificationsSubstreamState::Open { + asked_to_leave: true, + }, + substream_id, + )); + debug_assert!(_was_in1 || _was_in2); } collection::Event::PingOutSuccess { .. } => { @@ -2936,21 +3066,21 @@ where let protocol_name = match protocol { Protocol::Identify => codec::ProtocolName::Identify, Protocol::Ping => codec::ProtocolName::Ping, - Protocol::BlockAnnounces { chain_index } => { + Protocol::Notifications(NotificationsProtocol::BlockAnnounces { chain_index }) => { let chain_info = &self.chains[chain_index]; codec::ProtocolName::BlockAnnounces { genesis_hash: chain_info.genesis_hash, fork_id: chain_info.fork_id.as_deref(), } } - Protocol::Transactions { chain_index } => { + Protocol::Notifications(NotificationsProtocol::Transactions { chain_index }) => { let chain_info = &self.chains[chain_index]; codec::ProtocolName::Transactions { genesis_hash: chain_info.genesis_hash, fork_id: chain_info.fork_id.as_deref(), } } - Protocol::Grandpa { chain_index } => { + Protocol::Notifications(NotificationsProtocol::Grandpa { chain_index }) => { let chain_info = &self.chains[chain_index]; codec::ProtocolName::Grandpa { genesis_hash: chain_info.genesis_hash, @@ -3175,7 +3305,7 @@ where }, PeerIndex(usize::min_value()), SubstreamDirection::Out, - NotificationsSubstreamState::Open, + NotificationsSubstreamState::min_value(), SubstreamId::min_value(), ) ..=( @@ -3184,12 +3314,13 @@ where }, PeerIndex(usize::max_value()), SubstreamDirection::Out, - NotificationsSubstreamState::Open, + NotificationsSubstreamState::max_value(), SubstreamId::max_value(), ), ) .filter(move |(_, _, d, s, _)| { - *d == SubstreamDirection::Out && *s == 
NotificationsSubstreamState::Open + *d == SubstreamDirection::Out + && matches!(*s, NotificationsSubstreamState::Open { .. }) }) .map(|(_, peer_index, _, _, _)| &self.peers[peer_index.0]) } @@ -3203,18 +3334,17 @@ where /// /// Panics if the [`ChainId`] is invalid. /// - // TODO: proper error pub fn gossip_open( &mut self, chain_id: ChainId, target: &PeerId, kind: GossipKind, - ) -> Result<(), ()> { + ) -> Result<(), OpenGossipError> { let GossipKind::ConsensusTransactions = kind; let Some(&peer_index) = self.peers_by_peer_id.get(target) else { // If the `PeerId` is unknown, then we also don't have any connection to it. - return Err(()); + return Err(OpenGossipError::NoConnection); }; let chain_info = &self.chains[chain_id.0]; @@ -3246,15 +3376,12 @@ where .next() .is_some() { - return Err(()); + return Err(OpenGossipError::AlreadyOpened); } - let protocol_name = - codec::encode_protocol_name_string(codec::ProtocolName::BlockAnnounces { - genesis_hash: chain_info.genesis_hash, - fork_id: chain_info.fork_id.as_deref(), - }); - + // Choose the connection on which to open the substream. + // This is done ahead of time, as we don't want to do anything before potentially + // returning an error. // TODO: this is O(n) but is it really a problem? you're only supposed to have max 1 or 2 connections per PeerId let connection_id = self .connections_by_peer_id @@ -3267,22 +3394,9 @@ where let state = self.inner.connection_state(*connection_id); state.established && !state.shutting_down }) - .ok_or(())?; - - let handshake = codec::encode_block_announces_handshake( - codec::BlockAnnouncesHandshakeRef { - best_hash: &chain_info.best_hash, - best_number: chain_info.best_number, - role: chain_info.role, - genesis_hash: &chain_info.genesis_hash, - }, - self.chains[chain_id.0].block_number_bytes, - ) - .fold(Vec::new(), |mut a, b| { - a.extend_from_slice(b.as_ref()); - a - }); + .ok_or(OpenGossipError::NoConnection)?; + // Accept inbound substreams. 
for (protocol, in_substream_id) in [ NotificationsProtocol::BlockAnnounces { chain_index: chain_id.0, @@ -3330,35 +3444,49 @@ where protocol, peer_index, SubstreamDirection::In, - NotificationsSubstreamState::Open, + NotificationsSubstreamState::Open { + asked_to_leave: false, + }, in_substream_id, )); debug_assert!(_was_inserted); - self.inner - .accept_in_notifications(in_substream_id, handshake.clone(), 1024 * 1024) - // TODO: constant + self.inner.accept_in_notifications( + in_substream_id, + self.notifications_protocol_handshake(protocol), + self.notifications_protocol_max_notification_size(protocol), + ) } + // Open the block announces substream. let substream_id = self.inner.open_out_notifications( connection_id, - protocol_name, - Duration::from_secs(10), // TODO: arbitrary - handshake, - 1024 * 1024, // TODO: arbitrary + codec::encode_protocol_name_string(codec::ProtocolName::BlockAnnounces { + genesis_hash: chain_info.genesis_hash, + fork_id: chain_info.fork_id.as_deref(), + }), + self.notifications_protocol_handshake_timeout(NotificationsProtocol::BlockAnnounces { + chain_index: chain_id.0, + }), + self.notifications_protocol_handshake(NotificationsProtocol::BlockAnnounces { + chain_index: chain_id.0, + }), + self.notifications_protocol_max_handshake_size(NotificationsProtocol::BlockAnnounces { + chain_index: chain_id.0, + }), ); - let _prev_value = self.substreams.insert( substream_id, SubstreamInfo { connection_id, - protocol: Some(Protocol::BlockAnnounces { - chain_index: chain_id.0, - }), + protocol: Some(Protocol::Notifications( + NotificationsProtocol::BlockAnnounces { + chain_index: chain_id.0, + }, + )), }, ); debug_assert!(_prev_value.is_none()); - let _was_inserted = self.notification_substreams_by_peer_id.insert(( NotificationsProtocol::BlockAnnounces { chain_index: chain_id.0, @@ -3370,6 +3498,7 @@ where )); debug_assert!(_was_inserted); + // Update the desired peers tracking. 
if !self .gossip_desired_peers .contains(&(peer_index, kind, chain_id.0)) @@ -3381,7 +3510,6 @@ where )); debug_assert!(_was_inserted); } - self.connected_unopened_gossip_desired .remove(&(peer_index, chain_id, kind)); @@ -3407,71 +3535,23 @@ where chain_id: ChainId, peer_id: &PeerId, kind: GossipKind, - ) -> Result<(), ()> { - // TODO: proper return value + ) -> Result<(), CloseGossipError> { let GossipKind::ConsensusTransactions = kind; let Some(&peer_index) = self.peers_by_peer_id.get(peer_id) else { // If the `PeerId` is unknown, then we also don't have any gossip link to it. - return Err(()); + return Err(CloseGossipError::NotOpen); }; // An `assert!` is necessary because we don't actually access the chain information // anywhere, but still want to panic if the chain is invalid. assert!(self.chains.contains(chain_id.0)); - // Reject inbound requests, if any. - if let Some(substream_id) = self - .notification_substreams_by_peer_id - .range( - ( - NotificationsProtocol::BlockAnnounces { - chain_index: chain_id.0, - }, - peer_index, - SubstreamDirection::In, - NotificationsSubstreamState::Pending, - SubstreamId::min_value(), - ) - ..=( - NotificationsProtocol::BlockAnnounces { - chain_index: chain_id.0, - }, - peer_index, - SubstreamDirection::In, - NotificationsSubstreamState::Pending, - SubstreamId::max_value(), - ), - ) - .next() - .map(|(_, _, _, _, substream_id)| *substream_id) - { - self.inner.reject_in_notifications(substream_id); - - let _was_in = self.notification_substreams_by_peer_id.remove(&( - NotificationsProtocol::BlockAnnounces { - chain_index: chain_id.0, - }, - peer_index, - SubstreamDirection::In, - NotificationsSubstreamState::Pending, - substream_id, - )); - debug_assert!(_was_in); - - let _was_in = self.substreams.remove(&substream_id); - debug_assert!(_was_in.is_some()); - - self.opened_gossip_undesired.remove(&( - chain_id, - peer_index, - GossipKind::ConsensusTransactions, - )); + // Track whether this function closed anything in order to 
know whether to return an + // error at the end. + let mut has_closed_something = false; - // TODO: debug_assert that there's no inbound tx/gp substream? - } - - // Close outbound substreams, if any. + // Close all substreams, pending or open. for protocol in [ NotificationsProtocol::BlockAnnounces { chain_index: chain_id.0, @@ -3483,13 +3563,13 @@ where chain_index: chain_id.0, }, ] { - if let Some((substream_id, state)) = self + for (substream_id, direction, state) in self .notification_substreams_by_peer_id .range( ( protocol, peer_index, - SubstreamDirection::Out, + SubstreamDirection::In, NotificationsSubstreamState::min_value(), SubstreamId::min_value(), ) @@ -3501,29 +3581,94 @@ where SubstreamId::max_value(), ), ) - .next() - .map(|(_, _, _, state, substream_id)| (*substream_id, *state)) + .map(|(_, _, dir, state, sub_id)| (*sub_id, *dir, *state)) + .collect::<Vec<_>>() { - self.inner.close_out_notifications(substream_id); + has_closed_something = true; - let _was_in = self.notification_substreams_by_peer_id.remove(&( - protocol, - peer_index, - SubstreamDirection::Out, - state, - substream_id, - )); - debug_assert!(_was_in); + match (direction, state) { + (SubstreamDirection::Out, _) => { + self.inner.close_out_notifications(substream_id); + + let _was_in = self.notification_substreams_by_peer_id.remove(&( + protocol, + peer_index, + direction, + state, + substream_id, + )); + debug_assert!(_was_in); + let _was_in = self.substreams.remove(&substream_id); + debug_assert!(_was_in.is_some()); + } + (SubstreamDirection::In, NotificationsSubstreamState::Pending) => { + self.inner.reject_in_notifications(substream_id); - let _was_in = self.substreams.remove(&substream_id); - debug_assert!(_was_in.is_some()); + let _was_in = self.notification_substreams_by_peer_id.remove(&( + protocol, + peer_index, + direction, + state, + substream_id, + )); + debug_assert!(_was_in); + let _was_in = self.substreams.remove(&substream_id); + debug_assert!(_was_in.is_some()); + } + ( +
SubstreamDirection::In, + NotificationsSubstreamState::Open { + asked_to_leave: false, + }, + ) => { + self.inner + .start_close_in_notifications(substream_id, Duration::from_secs(5)); // TODO: arbitrary constant - // TODO: close tx and gp as well - // TODO: doesn't close inbound substreams + let _was_removed = self.notification_substreams_by_peer_id.remove(&( + protocol, + peer_index, + SubstreamDirection::In, + NotificationsSubstreamState::Open { + asked_to_leave: false, + }, + substream_id, + )); + debug_assert!(_was_removed); + let _was_inserted = self.notification_substreams_by_peer_id.insert(( + protocol, + peer_index, + SubstreamDirection::In, + NotificationsSubstreamState::Open { + asked_to_leave: true, + }, + substream_id, + )); + debug_assert!(_was_inserted); + } + ( + SubstreamDirection::In, + NotificationsSubstreamState::Open { + asked_to_leave: true, + }, + ) => { + // Nothing to do. + } + } } } - Ok(()) + // Desired peers tracking update. + self.opened_gossip_undesired.remove(&( + chain_id, + peer_index, + GossipKind::ConsensusTransactions, + )); + + if has_closed_something { + Ok(()) + } else { + Err(CloseGossipError::NotOpen) + } } /// Update the state of the local node with regards to GrandPa rounds. @@ -3574,7 +3719,7 @@ where *p == NotificationsProtocol::Grandpa { chain_index: chain_id.0, } && *d == SubstreamDirection::Out - && *s == NotificationsSubstreamState::Open + && matches!(*s, NotificationsSubstreamState::Open { .. 
}) }) { match self.inner.queue_notification(*substream_id, packet.clone()) { @@ -3694,14 +3839,14 @@ where NotificationsProtocol::BlockAnnounces { chain_index }, peer_index, SubstreamDirection::Out, - NotificationsSubstreamState::Open, + NotificationsSubstreamState::open_min_value(), SubstreamId::min_value(), ) ..=( NotificationsProtocol::BlockAnnounces { chain_index }, peer_index, SubstreamDirection::Out, - NotificationsSubstreamState::Open, + NotificationsSubstreamState::open_max_value(), SubstreamId::max_value(), ), ) @@ -3720,14 +3865,14 @@ where protocol, peer_index, SubstreamDirection::Out, - NotificationsSubstreamState::Open, + NotificationsSubstreamState::open_min_value(), SubstreamId::min_value(), ) ..=( protocol, peer_index, SubstreamDirection::Out, - NotificationsSubstreamState::Open, + NotificationsSubstreamState::open_max_value(), SubstreamId::max_value(), ), ) @@ -3755,30 +3900,30 @@ where codec::ProtocolName::BlockAnnounces { genesis_hash, fork_id, - } => Protocol::BlockAnnounces { + } => Protocol::Notifications(NotificationsProtocol::BlockAnnounces { chain_index: *self .chains_by_protocol_info .get(&(genesis_hash, fork_id.map(|fork_id| fork_id.to_owned()))) .ok_or(())?, - }, + }), codec::ProtocolName::Transactions { genesis_hash, fork_id, - } => Protocol::Transactions { + } => Protocol::Notifications(NotificationsProtocol::Transactions { chain_index: *self .chains_by_protocol_info .get(&(genesis_hash, fork_id.map(|fork_id| fork_id.to_owned()))) .ok_or(())?, - }, + }), codec::ProtocolName::Grandpa { genesis_hash, fork_id, - } => Protocol::Grandpa { + } => Protocol::Notifications(NotificationsProtocol::Grandpa { chain_index: *self .chains_by_protocol_info .get(&(genesis_hash, fork_id.map(|fork_id| fork_id.to_owned()))) .ok_or(())?, - }, + }), codec::ProtocolName::Sync { genesis_hash, fork_id, @@ -3879,6 +4024,62 @@ where let _was_in = self.peers_by_peer_id.remove(&peer_id); debug_assert_eq!(_was_in, Some(peer_index)); } + + /// Returns the maximum 
allowed size (in bytes) of the handshake of the given protocol. + fn notifications_protocol_max_handshake_size(&self, protocol: NotificationsProtocol) -> usize { + // TODO: these numbers are arbitrary, must be made to match Substrate + match protocol { + NotificationsProtocol::BlockAnnounces { .. } => 64 * 1024, + NotificationsProtocol::Transactions { .. } => 4, + NotificationsProtocol::Grandpa { .. } => 32, + } + } + + /// Returns the maximum allowed size (in bytes) of a notification of the given protocol. + fn notifications_protocol_max_notification_size( + &self, + _protocol: NotificationsProtocol, + ) -> usize { + // TODO: this number is arbitrary, must be made to match Substrate + 1024 * 1024 // TODO: + } + + /// Returns the timeout allowed for the remote to send back the handshake of the given + /// protocol. + fn notifications_protocol_handshake_timeout( + &self, + _protocol: NotificationsProtocol, + ) -> Duration { + Duration::from_secs(10) + } + + /// Builds the handshake to send to the remote when opening a notifications protocol. + fn notifications_protocol_handshake(&self, protocol: NotificationsProtocol) -> Vec { + let handshake = match protocol { + NotificationsProtocol::BlockAnnounces { chain_index } => { + codec::encode_block_announces_handshake( + codec::BlockAnnouncesHandshakeRef { + best_hash: &self.chains[chain_index].best_hash, + best_number: self.chains[chain_index].best_number, + role: self.chains[chain_index].role, + genesis_hash: &self.chains[chain_index].genesis_hash, + }, + self.chains[chain_index].block_number_bytes, + ) + .fold(Vec::new(), |mut a, b| { + a.extend_from_slice(b.as_ref()); + a + }) + } + NotificationsProtocol::Transactions { .. 
} => Vec::new(), + NotificationsProtocol::Grandpa { chain_index } => { + self.chains[chain_index].role.scale_encoding().to_vec() + } + }; + + debug_assert!(handshake.len() <= self.notifications_protocol_max_handshake_size(protocol)); + handshake + } } impl ops::Index for ChainNetwork { @@ -4164,6 +4365,22 @@ pub enum ProtocolError { BadBlocksRequest(codec::DecodeBlockRequestError), } +/// Error potentially returned by [`ChainNetwork::gossip_open`]. +#[derive(Debug, Clone, derive_more::Display)] +pub enum OpenGossipError { + /// No healthy established connection is available to open the link. + NoConnection, + /// There already is a pending or fully opened gossip link with the given peer. + AlreadyOpened, +} + +/// Error potentially returned by [`ChainNetwork::gossip_close`]. +#[derive(Debug, Clone, derive_more::Display)] +pub enum CloseGossipError { + /// There exists no outgoing nor ingoing attempt at a gossip link. + NotOpen, +} + /// Error potentially returned when starting a request. #[derive(Debug, Clone, derive_more::Display)] pub enum StartRequestError { diff --git a/wasm-node/CHANGELOG.md b/wasm-node/CHANGELOG.md index 9723cee324..523075a50d 100644 --- a/wasm-node/CHANGELOG.md +++ b/wasm-node/CHANGELOG.md @@ -15,6 +15,10 @@ - Fix panic when `chainHead_unstable_follow` is called too many times. ([#1392](https://github.com/smol-dot/smoldot/pull/1392)) - Fix panic when opening a gossiping link to a peer that we were previously connected to. ([#1395](https://github.com/smol-dot/smoldot/pull/1395)) - Fix panic when the discovery system finds same address attributed to two different peers. ([#1412](https://github.com/smol-dot/smoldot/pull/1412)) +- Fix sending a block announce handshake when accepting an inbound transactions or grandpa substream in some rare situations. 
([#1417](https://github.com/smol-dot/smoldot/pull/1417)) +- Fix automatically refusing inbound notification substreams if a different inbound substream of the same protocol existed on the same connection, even when that other substream has been closed. ([#1417](https://github.com/smol-dot/smoldot/pull/1417)) +- Inbound notification substreams opened by the remote and that are no longer wanted are now forcefully closed if the remote doesn't close them gracefully. ([#1417](https://github.com/smol-dot/smoldot/pull/1417)) +- Fix panic when a connection is shutting down after the notification substreams of that connection were opened in an unconventional order. ([#1417](https://github.com/smol-dot/smoldot/pull/1417)) ## 2.0.10 - 2023-11-17