Skip to content

Commit

Permalink
Add fake out requests system in peers.rs (#2369)
Browse files Browse the repository at this point in the history
* Add fake out requests system in peers.rs

* CHANGELOG
  • Loading branch information
tomaka authored Jun 14, 2022
1 parent 6a9b573 commit 4472dc1
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 38 deletions.
1 change: 1 addition & 0 deletions bin/wasm-node/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
- Fix another panic in case of a carefully-crafted LEB128 length. ([#2337](https://github.com/paritytech/smoldot/pull/2337))
- Fix a panic when decoding a block header containing a large number of Aura authorities. ([#2338](https://github.com/paritytech/smoldot/pull/2338))
- Fix multiple panics when decoding network messages in case where these messages were truncated. ([#2340](https://github.com/paritytech/smoldot/pull/2340), [#2355](https://github.com/paritytech/smoldot/pull/2355))
- Fix panic when the Kademlia random discovery process initiates a request on a connection that has just started shutting down. ([#2369](https://github.com/paritytech/smoldot/pull/2369))
- Fix subscriptions to `chainHead_unstable_follow` being immediately shut down if the gap between the finalized block and the best block is above a certain threshold. This could lead to loops where the JSON-RPC client tries to re-open a subscription, only for it to be immediately shut down again.

## 0.6.17 - 2022-05-31
Expand Down
154 changes: 116 additions & 38 deletions src/libp2p/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,13 @@ pub struct Peers<TConn, TNow> {
/// function.
// TODO: explain why it can't grow unbounded
pending_desired_out_notifs: VecDeque<(DesiredOutNotificationId, usize, usize)>,

/// List of so-called "fake outgoing requests" for which an event about their failure should
/// be reported as soon as possible.
fake_out_requests_to_report: VecDeque<u64>,

/// Identifier to assign to the next fake outgoing requests. Increased one by one.
next_fake_out_request_id: u64,
}

/// See [`Peers::peers_notifications_out`].
Expand Down Expand Up @@ -183,6 +190,14 @@ struct Connection<TConn> {
/// - If the handshake is in progress and the connection is inbound, contains `None`.
peer_index: Option<usize>,

/// List of so-called fake outgoing requests that [`Peers::start_request`] has created
/// regarding this connection. Always empty unless the connection is currently shutting down.
/// The API user thinks that the connection is currently executing this request, but in
/// reality it is not. When the connection finishes its shutdown, these fake requests are
/// pushed to [`Peers::fake_out_requests_to_report`] in order to be reported as having failed.
fake_out_requests: Vec<u64>,

/// Opaque data decided by the API user.
user_data: TConn,
}

Expand Down Expand Up @@ -220,6 +235,8 @@ where
pending_desired_out_notifs: VecDeque::new(), // TODO: capacity?
desired_in_notifications: slab::Slab::new(), // TODO: capacity?
desired_out_notifications: slab::Slab::new(), // TODO: capacity?
fake_out_requests_to_report: VecDeque::with_capacity(8),
next_fake_out_request_id: 0,
}
}

Expand Down Expand Up @@ -281,6 +298,16 @@ where
/// Returns the next event produced by the service.
pub fn next_event(&mut self) -> Option<Event<TConn>> {
loop {
if let Some(fake_out_request_to_report) = self.fake_out_requests_to_report.pop_front() {
if self.fake_out_requests_to_report.is_empty() {
self.fake_out_requests_to_report.shrink_to(8)
};
return Some(Event::Response {
request_id: OutRequestId(OutRequestIdInner::Fake(fake_out_request_to_report)),
response: Err(RequestError::ConnectionShutdown),
});
}

let event = match self.inner.next_event() {
Some(ev) => ev,
None => return None,
Expand Down Expand Up @@ -393,6 +420,7 @@ where
user_data:
Connection {
peer_index: Some(expected_peer_index),
fake_out_requests,
user_data,
},
} => {
Expand Down Expand Up @@ -425,6 +453,8 @@ where

self.try_clean_up_peer(expected_peer_index);

self.fake_out_requests_to_report.extend(fake_out_requests);

// Only produce a `Disconnected` event if connection wasn't handshaking.
if was_established {
return Some(Event::Disconnected {
Expand All @@ -438,12 +468,15 @@ where
collection::Event::Shutdown {
user_data:
Connection {
peer_index: None, ..
peer_index: None,
fake_out_requests,
..
},
..
} => {
// Connection was incoming but its handshake wasn't finished yet.
// The shutdown isn't reported.
debug_assert!(fake_out_requests.is_empty());
}

collection::Event::InboundError {
Expand All @@ -467,7 +500,7 @@ where
response,
} => {
return Some(Event::Response {
request_id: OutRequestId(substream_id),
request_id: OutRequestId(OutRequestIdInner::Real(substream_id)),
response,
});
}
Expand Down Expand Up @@ -723,6 +756,7 @@ where
false,
Connection {
peer_index: None,
fake_out_requests: Vec::new(),
user_data,
},
)
Expand Down Expand Up @@ -750,6 +784,7 @@ where
true,
Connection {
peer_index: Some(peer_index),
fake_out_requests: Vec::new(),
user_data,
},
);
Expand Down Expand Up @@ -856,7 +891,9 @@ where

// If substream is closed, try to open it.
if matches!(current_state.open, NotificationsOutOpenState::Closed) {
if let Some(connection_id) = self.connection_id_for_peer(peer_id) {
if let ConnectionIdForPeer::Connected(connection_id) =
self.connection_id_for_peer(peer_id)
{
let id =
DesiredOutNotificationId(self.desired_out_notifications.insert(Some((
peer_index,
Expand Down Expand Up @@ -1168,20 +1205,38 @@ where
/// Panics if `protocol_index` isn't a valid index in [`Config::request_response_protocols`].
/// Panics if there is no open connection with the target.
///
#[track_caller]
pub fn start_request(
&mut self,
target: &PeerId,
protocol_index: usize,
request_data: Vec<u8>,
timeout: TNow,
) -> OutRequestId {
let target_connection_id = self.connection_id_for_peer(target).unwrap();
OutRequestId(self.inner.start_request(
let target_connection_id = match self.connection_id_for_peer(target) {
ConnectionIdForPeer::Connected(id) => id,
ConnectionIdForPeer::NotConnected => panic!(), // As documented.
ConnectionIdForPeer::ConnectedButShuttingDown(id) => {
// The situation where we are connected to the peer but only through a connection
// that is shutting down is a bit complicated. We can't start a request, as that
// would be invalid, so we generate a so-called "fake request" and when we
// connection later shuts down we report about the request failure.
// Note that we don't immediately report the failure, as that could cause an upper
// layer to immediately try the request again before it has processed the relevant
// `Disconnect` event.
let fake_id = self.next_fake_out_request_id;
self.next_fake_out_request_id += 1;
self.inner[id].fake_out_requests.push(fake_id);
return OutRequestId(OutRequestIdInner::Fake(fake_id));
}
};

OutRequestId(OutRequestIdInner::Real(self.inner.start_request(
target_connection_id,
protocol_index,
request_data,
timeout,
))
)))
}

/// Responds to a previously-emitted [`Event::RequestIn`].
Expand All @@ -1197,21 +1252,13 @@ where

/// Returns `true` if there exists an established connection with the given peer.
pub fn has_established_connection(&self, peer_id: &PeerId) -> bool {
let peer_index = match self.peer_indices.get(peer_id) {
Some(idx) => *idx,
None => return false,
};

self.connections_by_peer
.range(
(peer_index, ConnectionId::min_value())..=(peer_index, ConnectionId::max_value()),
)
.any(|(_, connection_id)| {
// Note that connections that are shutting down are still counted,
// as we report the disconnected event only at the end of the
// shutdown.
self.inner.connection_state(*connection_id).established
})
// Connections that are shutting down are still counted, as we report the disconnected
// event only at the end of the shutdown.
match self.connection_id_for_peer(peer_id) {
ConnectionIdForPeer::Connected(_)
| ConnectionIdForPeer::ConnectedButShuttingDown(_) => true,
ConnectionIdForPeer::NotConnected => false,
}
}

/// Returns an iterator to the list of [`PeerId`]s that we have an established connection
Expand Down Expand Up @@ -1248,25 +1295,41 @@ where
}

/// Picks the connection to use to send requests or notifications to the given peer.
fn connection_id_for_peer(&self, target: &PeerId) -> Option<collection::ConnectionId> {
let peer_index = *self.peer_indices.get(target)?;
///
/// This function tries to find a connection that is established and not shutting down. If it
/// can't find any, it instead tries to find a connection that is established but shutting
/// down. While it is technically not possible to send requests or notifications to connections
/// that are shutting down, the API user is still allowed to do so, and thus shouldn't lead to
/// a panic or error.
fn connection_id_for_peer(&self, target: &PeerId) -> ConnectionIdForPeer {
let peer_index = match self.peer_indices.get(target) {
Some(i) => *i,
None => return ConnectionIdForPeer::NotConnected,
};

if let Some((_, connection_id)) = self
.connections_by_peer
.range(
(peer_index, collection::ConnectionId::min_value())
..=(peer_index, collection::ConnectionId::max_value()),
)
.find(|(_, connection_id)| {
let state = self.inner.connection_state(*connection_id);
// TODO: this is wrong because shutting down connections do not immediately lead to a Disconnect event in the public API, meaning that the user can get a panic despite using the API correctly
state.established && !state.shutting_down
})
{
return Some(*connection_id);
let mut found_shutting_down = None;

for (_, connection_id) in self.connections_by_peer.range(
(peer_index, collection::ConnectionId::min_value())
..=(peer_index, collection::ConnectionId::max_value()),
) {
let state = self.inner.connection_state(*connection_id);
if !state.established {
continue;
}

if !state.shutting_down {
return ConnectionIdForPeer::Connected(*connection_id);
} else {
found_shutting_down = Some(*connection_id);
}
}

None
if let Some(found_shutting_down) = found_shutting_down {
ConnectionIdForPeer::ConnectedButShuttingDown(found_shutting_down)
} else {
ConnectionIdForPeer::NotConnected
}
}

fn peer_index_or_insert(&mut self, peer_id: &PeerId) -> usize {
Expand Down Expand Up @@ -1335,6 +1398,12 @@ impl<TConn, TNow> ops::IndexMut<ConnectionId> for Peers<TConn, TNow> {
}
}

enum ConnectionIdForPeer {
Connected(ConnectionId),
ConnectedButShuttingDown(ConnectionId),
NotConnected,
}

/// See [`Event::DesiredInNotification`].
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct DesiredInNotificationId(usize);
Expand All @@ -1349,7 +1418,16 @@ pub struct InRequestId(collection::SubstreamId);

/// See [`Peers::start_request`].
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct OutRequestId(collection::SubstreamId);
pub struct OutRequestId(OutRequestIdInner);

/// There are two kinds of outgoing requests: "real" ones, and "fake" ones. Fake requests exist for
/// API consistency purposes: when a connection is shutting down and the user starts an outgoing
/// request, we create a fake request then report its failure.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
enum OutRequestIdInner {
Real(collection::SubstreamId),
Fake(u64),
}

/// Event happening over the network. See [`Peers::next_event`].
// TODO: in principle we could return `&PeerId` instead of `PeerId` most of the time, but this causes many borrow checker issues in the upper layer and I'm not motivated enough to deal with that
Expand Down

0 comments on commit 4472dc1

Please sign in to comment.