Skip to content

Commit

Permalink
Add Network::start_close_in_notifications (#1328)
Browse files Browse the repository at this point in the history
* Add infrastructure to close ingoing substreams

* Properly handle timeout in substream.rs

* Docfix
  • Loading branch information
tomaka authored Nov 13, 2023
1 parent 902cec4 commit eced0fa
Show file tree
Hide file tree
Showing 7 changed files with 281 additions and 95 deletions.
100 changes: 88 additions & 12 deletions lib/src/libp2p/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,8 @@ enum SubstreamState {
/// Substream hasn't been accepted or refused yet.
Pending,
Open,
/// Substream is in the process of being closed. Only relevant for inbound substreams.
RequestedClosing,
}

impl<TConn, TNow> Network<TConn, TNow>
Expand Down Expand Up @@ -949,6 +951,43 @@ where
}
}

/// Start the closing of an inbound notifications substream that was previously accepted with
/// [`Network::accept_in_notifications`]
///
/// Calling this function will later generate a [`Event::NotificationsInClose`] event once the
/// substream is effectively closed.
/// This function gracefully asks the remote to close the substream. The remote has the
/// duration indicated with `timeout` to effectively close the substream. In the meanwhile,
/// notifications can still be received.
///
/// This function generates a message destined to the connection. Use
/// [`Network::pull_message_to_connection`] to process these messages after it has returned.
///
/// # Panic
///
/// Panics if the [`SubstreamId`] doesn't correspond to an accepted inbound notifications
/// substream.
///
#[track_caller]
pub fn start_close_in_notifications(&mut self, substream_id: SubstreamId, timeout: Duration) {
let (connection_id, state, inner_substream_id) =
match self.ingoing_notification_substreams.get_mut(&substream_id) {
Some(s) => s,
None => panic!(),
};
assert!(matches!(state, SubstreamState::Open));

self.messages_to_connections.push_back((
*connection_id,
CoordinatorToConnectionInner::CloseInNotifications {
substream_id: *inner_substream_id,
timeout,
},
));

*state = SubstreamState::RequestedClosing;
}

/// Responds to an incoming request. Must be called in response to a [`Event::RequestIn`].
///
/// If the substream was in the meanwhile yielded in an [`Event::RequestInCancel`], then this
Expand Down Expand Up @@ -1084,6 +1123,7 @@ where
substream_id,
result: Err(NotificationsOutErr::ConnectionShutdown),
},
SubstreamState::RequestedClosing => unreachable!(), // Never set for outgoing notification substreams.
});
}

Expand All @@ -1103,16 +1143,24 @@ where
.map(|(k, v)| (*k, *v))
.next()
{
self.ingoing_notification_substreams
let (_, state, _) = self
.ingoing_notification_substreams
.remove(&substream_id)
.unwrap();
self.ingoing_notification_substreams_by_connection
.remove(&key)
.unwrap();

return Some(Event::NotificationsInClose {
substream_id,
outcome: Err(NotificationsInClosedErr::ConnectionShutdown),
return Some(match state {
SubstreamState::Open | SubstreamState::RequestedClosing => {
Event::NotificationsInClose {
substream_id,
outcome: Err(NotificationsInClosedErr::ConnectionShutdown),
}
}
SubstreamState::Pending => {
Event::NotificationsInOpenCancel { substream_id }
}
});
}

Expand Down Expand Up @@ -1474,12 +1522,14 @@ where
.remove(&substream_id)
.unwrap();
match state {
SubstreamState::Open => Event::NotificationsInClose {
substream_id,
outcome: Err(NotificationsInClosedErr::Substream(
established::NotificationsInClosedErr::SubstreamReset,
)),
},
SubstreamState::Open | SubstreamState::RequestedClosing => {
Event::NotificationsInClose {
substream_id,
outcome: Err(NotificationsInClosedErr::Substream(
established::NotificationsInClosedErr::SubstreamReset,
)),
}
}
SubstreamState::Pending => {
Event::NotificationsInOpenCancel { substream_id }
}
Expand Down Expand Up @@ -1535,8 +1585,26 @@ where
.ingoing_notification_substreams_by_connection
.remove(&(connection_id, inner_substream_id))
.unwrap();
let _was_in = self.ingoing_notification_substreams.remove(&substream_id);
debug_assert!(_was_in.is_some());
let (_, state, _) = self
.ingoing_notification_substreams
.remove(&substream_id)
.unwrap();
debug_assert!(matches!(
state,
SubstreamState::Open | SubstreamState::RequestedClosing
));

if let SubstreamState::Open = state {
// As documented, we must confirm the reception of the event by sending
// back a rejection, provided that no such event has been sent beforehand.
self.messages_to_connections.push_back((
connection_id,
CoordinatorToConnectionInner::CloseInNotifications {
substream_id: inner_substream_id,
timeout: Duration::new(0, 0),
},
));
}

Event::NotificationsInClose {
substream_id,
Expand Down Expand Up @@ -1756,6 +1824,10 @@ enum ConnectionToCoordinatorInner {
notification: Vec<u8>,
},
/// See the corresponding event in [`established::Event`].
///
/// In order to avoid race conditions, this must always be acknowledged by sending back a
/// [`CoordinatorToConnectionInner::CloseInNotifications`] message if no such message was
/// sent in the past.
NotificationsInClose {
id: established::SubstreamId,
outcome: Result<(), established::NotificationsInClosedErr>,
Expand Down Expand Up @@ -1856,6 +1928,10 @@ enum CoordinatorToConnectionInner {
RejectInNotifications {
substream_id: established::SubstreamId,
},
CloseInNotifications {
substream_id: established::SubstreamId,
timeout: Duration,
},

/// Answer an incoming request.
///
Expand Down
71 changes: 36 additions & 35 deletions lib/src/libp2p/collection/multi_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,12 @@ enum MultiStreamConnectionTaskInner<TNow, TSubId> {
outbound_substreams_map:
hashbrown::HashMap<SubstreamId, established::SubstreamId, fnv::FnvBuildHasher>,

/// After a [`ConnectionToCoordinatorInner::NotificationsInOpenCancel`] is emitted, an
/// After a [`ConnectionToCoordinatorInner::NotificationsInOpenCancel`] or a
/// [`ConnectionToCoordinatorInner::NotificationsInClose`] is emitted, an
/// entry is added to this list. If the coordinator accepts or refuses a substream in this
/// list, the acceptance/refusal is dismissed.
notifications_in_open_cancel_acknowledgments: VecDeque<established::SubstreamId>,

/// After a `NotificationsInOpenCancel` is emitted by the connection, an
/// entry is added to this list. If the coordinator accepts or refuses a substream in this
/// list, the acceptance/refusal is dismissed.
/// list, or closes a substream in this list, the acceptance/refusal/closing is dismissed.
// TODO: this works only because SubstreamIds aren't reused
inbound_negotiated_cancel_acknowledgments:
notifications_in_close_acknowledgments:
hashbrown::HashSet<established::SubstreamId, fnv::FnvBuildHasher>,

/// Messages about inbound accept cancellations to send back.
Expand Down Expand Up @@ -212,8 +208,7 @@ where
established,
outbound_substreams_map,
handshake_finished_message_to_send,
notifications_in_open_cancel_acknowledgments,
inbound_negotiated_cancel_acknowledgments,
notifications_in_close_acknowledgments,
inbound_accept_cancel_events,
..
} => {
Expand Down Expand Up @@ -254,7 +249,7 @@ where
Some(ConnectionToCoordinatorInner::InboundNegotiated { id, protocol_name })
}
Some(established::Event::InboundNegotiatedCancel { id, .. }) => {
inbound_negotiated_cancel_acknowledgments.insert(id);
notifications_in_close_acknowledgments.insert(id);
None
}
Some(established::Event::InboundAcceptedCancel { id, .. }) => {
Expand All @@ -281,13 +276,14 @@ where
Some(ConnectionToCoordinatorInner::NotificationsInOpen { id, handshake })
}
Some(established::Event::NotificationsInOpenCancel { id, .. }) => {
notifications_in_open_cancel_acknowledgments.push_back(id);
notifications_in_close_acknowledgments.insert(id);
Some(ConnectionToCoordinatorInner::NotificationsInOpenCancel { id })
}
Some(established::Event::NotificationIn { id, notification }) => {
Some(ConnectionToCoordinatorInner::NotificationIn { id, notification })
}
Some(established::Event::NotificationsInClose { id, outcome, .. }) => {
notifications_in_close_acknowledgments.insert(id);
Some(ConnectionToCoordinatorInner::NotificationsInClose { id, outcome })
}
Some(established::Event::NotificationsOutResult { id, result }) => {
Expand Down Expand Up @@ -389,12 +385,12 @@ where
},
MultiStreamConnectionTaskInner::Established {
established,
inbound_negotiated_cancel_acknowledgments,
notifications_in_close_acknowledgments,
inbound_accept_cancel_events,
..
},
) => {
if !inbound_negotiated_cancel_acknowledgments.remove(&substream_id) {
if !notifications_in_close_acknowledgments.remove(&substream_id) {
established.accept_inbound(substream_id, inbound_ty, None);
} else {
inbound_accept_cancel_events.push_back(substream_id)
Expand All @@ -404,11 +400,11 @@ where
CoordinatorToConnectionInner::RejectInbound { substream_id },
MultiStreamConnectionTaskInner::Established {
established,
inbound_negotiated_cancel_acknowledgments,
notifications_in_close_acknowledgments,
..
},
) => {
if !inbound_negotiated_cancel_acknowledgments.remove(&substream_id) {
if !notifications_in_close_acknowledgments.remove(&substream_id) {
established.reject_inbound(substream_id);
}
}
Expand Down Expand Up @@ -475,7 +471,7 @@ where
// user close the substream before the message about the substream being closed
// was delivered to the coordinator.
if let Some(inner_substream_id) = outbound_substreams_map.remove(&substream_id) {
established.close_notifications_substream(inner_substream_id);
established.close_out_notifications_substream(inner_substream_id);
}
}
(
Expand Down Expand Up @@ -520,16 +516,11 @@ where
},
MultiStreamConnectionTaskInner::Established {
established,
notifications_in_open_cancel_acknowledgments,
notifications_in_close_acknowledgments,
..
},
) => {
if let Some(idx) = notifications_in_open_cancel_acknowledgments
.iter()
.position(|s| *s == substream_id)
{
notifications_in_open_cancel_acknowledgments.remove(idx);
} else {
if !notifications_in_close_acknowledgments.remove(&substream_id) {
established.accept_in_notifications_substream(
substream_id,
handshake,
Expand All @@ -541,19 +532,30 @@ where
CoordinatorToConnectionInner::RejectInNotifications { substream_id },
MultiStreamConnectionTaskInner::Established {
established,
notifications_in_open_cancel_acknowledgments,
notifications_in_close_acknowledgments,
..
},
) => {
if let Some(idx) = notifications_in_open_cancel_acknowledgments
.iter()
.position(|s| *s == substream_id)
{
notifications_in_open_cancel_acknowledgments.remove(idx);
} else {
if !notifications_in_close_acknowledgments.remove(&substream_id) {
established.reject_in_notifications_substream(substream_id);
}
}
(
CoordinatorToConnectionInner::CloseInNotifications {
substream_id,
timeout,
},
MultiStreamConnectionTaskInner::Established {
established,
notifications_in_close_acknowledgments,
..
},
) => {
if !notifications_in_close_acknowledgments.remove(&substream_id) {
established
.close_in_notifications_substream(substream_id, now.clone() + timeout);
}
}
(
CoordinatorToConnectionInner::StartShutdown { .. },
MultiStreamConnectionTaskInner::Handshake { .. }
Expand All @@ -571,6 +573,7 @@ where
| CoordinatorToConnectionInner::RejectInbound { .. }
| CoordinatorToConnectionInner::AcceptInNotifications { .. }
| CoordinatorToConnectionInner::RejectInNotifications { .. }
| CoordinatorToConnectionInner::CloseInNotifications { .. }
| CoordinatorToConnectionInner::StartRequest { .. }
| CoordinatorToConnectionInner::AnswerRequest { .. }
| CoordinatorToConnectionInner::OpenOutNotifications { .. }
Expand All @@ -584,6 +587,7 @@ where
| CoordinatorToConnectionInner::RejectInbound { .. }
| CoordinatorToConnectionInner::AcceptInNotifications { .. }
| CoordinatorToConnectionInner::RejectInNotifications { .. }
| CoordinatorToConnectionInner::CloseInNotifications { .. }
| CoordinatorToConnectionInner::StartRequest { .. }
| CoordinatorToConnectionInner::AnswerRequest { .. }
| CoordinatorToConnectionInner::OpenOutNotifications { .. }
Expand Down Expand Up @@ -989,10 +993,7 @@ where
0,
Default::default(),
),
notifications_in_open_cancel_acknowledgments: VecDeque::with_capacity(
4,
),
inbound_negotiated_cancel_acknowledgments:
notifications_in_close_acknowledgments:
hashbrown::HashSet::with_capacity_and_hasher(2, Default::default()),
inbound_accept_cancel_events: VecDeque::with_capacity(2),
};
Expand Down
Loading

0 comments on commit eced0fa

Please sign in to comment.