Skip to content

Commit

Permalink
Add an event when a pending negotiation substream is reset (#521)
Browse files Browse the repository at this point in the history
* Add an event when a pending negotiation substream is reset

* PR link
  • Loading branch information
tomaka authored May 4, 2023
1 parent 3406d8d commit a9b5a5d
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 13 deletions.
59 changes: 53 additions & 6 deletions lib/src/libp2p/collection/multi_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,16 @@ enum MultiStreamConnectionTaskInner<TNow, TSubId> {
/// entry is added to this list. If the coordinator accepts or refuses a substream in this
/// list, the acceptance/refusal is dismissed.
notifications_in_open_cancel_acknowledgments: VecDeque<established::SubstreamId>,

/// After a `NotificationsInOpenCancel` is emitted by the connection, an
/// entry is added to this list. If the coordinator accepts or refuses a substream in this
/// list, the acceptance/refusal is dismissed.
// TODO: this works only because SubstreamIds aren't reused
inbound_negotiated_cancel_acknowledgments:
hashbrown::HashSet<established::SubstreamId, fnv::FnvBuildHasher>,

/// Messages about inbound accept cancellations to send back.
inbound_accept_cancel_events: VecDeque<established::SubstreamId>,
},

/// Connection has finished its shutdown. A [`ConnectionToCoordinatorInner::ShutdownFinished`]
Expand Down Expand Up @@ -205,6 +215,8 @@ where
outbound_substreams_map,
handshake_finished_message_to_send,
notifications_in_open_cancel_acknowledgments,
inbound_negotiated_cancel_acknowledgments,
inbound_accept_cancel_events,
..
} => {
if let Some(remote_peer_id) = handshake_finished_message_to_send.take() {
Expand All @@ -216,6 +228,17 @@ where
);
}

if let Some(substream_id) = inbound_accept_cancel_events.pop_front() {
return (
Some(self),
Some(ConnectionToCoordinator {
inner: ConnectionToCoordinatorInner::InboundAcceptedCancel {
_id: substream_id,
},
}),
);
}

let event = match established.pull_event() {
Some(established::Event::NewOutboundSubstreamsForbidden) => {
// TODO: handle properly
Expand All @@ -232,6 +255,10 @@ where
Some(established::Event::InboundNegotiated { id, protocol_name }) => {
Some(ConnectionToCoordinatorInner::InboundNegotiated { id, protocol_name })
}
Some(established::Event::InboundNegotiatedCancel { id, .. }) => {
inbound_negotiated_cancel_acknowledgments.insert(id);
None
}
Some(established::Event::InboundAcceptedCancel { id, .. }) => {
Some(ConnectionToCoordinatorInner::InboundAcceptedCancel { _id: id })
}
Expand Down Expand Up @@ -362,7 +389,12 @@ where
substream_id,
inbound_ty,
},
MultiStreamConnectionTaskInner::Established { established, .. },
MultiStreamConnectionTaskInner::Established {
established,
inbound_negotiated_cancel_acknowledgments,
inbound_accept_cancel_events,
..
},
) => {
let (inbound_ty, protocol_index) = match inbound_ty {
InboundTy::Notifications {
Expand All @@ -382,15 +414,27 @@ where
InboundTy::Ping => (established::InboundTy::Ping, 0),
};

// TODO: /!\ will panic if substream is obsolete, instead just ignore the response
established.accept_inbound(substream_id, inbound_ty, either::Right(protocol_index));
if !inbound_negotiated_cancel_acknowledgments.remove(&substream_id) {
established.accept_inbound(
substream_id,
inbound_ty,
either::Right(protocol_index),
);
} else {
inbound_accept_cancel_events.push_back(substream_id)
}
}
(
CoordinatorToConnectionInner::RejectInbound { substream_id },
MultiStreamConnectionTaskInner::Established { established, .. },
MultiStreamConnectionTaskInner::Established {
established,
inbound_negotiated_cancel_acknowledgments,
..
},
) => {
// TODO: /!\ will panic if substream is obsolete, instead just ignore the response
established.reject_inbound(substream_id);
if !inbound_negotiated_cancel_acknowledgments.remove(&substream_id) {
established.reject_inbound(substream_id);
}
}
(
CoordinatorToConnectionInner::StartRequest {
Expand Down Expand Up @@ -985,6 +1029,9 @@ where
notifications_in_open_cancel_acknowledgments: VecDeque::with_capacity(
4,
),
inbound_negotiated_cancel_acknowledgments:
hashbrown::HashSet::with_capacity_and_hasher(2, Default::default()),
inbound_accept_cancel_events: VecDeque::with_capacity(2),
};

if handshake_substream_still_open {
Expand Down
47 changes: 41 additions & 6 deletions lib/src/libp2p/collection/single_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,13 @@ enum SingleStreamConnectionTaskInner<TNow> {
/// entry is added to this list. If the coordinator accepts or refuses a substream in this
/// list, the acceptance/refusal is dismissed.
notifications_in_open_cancel_acknowledgments: VecDeque<established::SubstreamId>,

/// After a `NotificationsInOpenCancel` is emitted by the connection, an
/// entry is added to this list. If the coordinator accepts or refuses a substream in this
/// list, the acceptance/refusal is dismissed.
// TODO: this works only because SubstreamIds aren't reused
inbound_negotiated_cancel_acknowledgments:
hashbrown::HashSet<established::SubstreamId, fnv::FnvBuildHasher>,
},

/// Connection has finished its shutdown. A [`ConnectionToCoordinatorInner::ShutdownFinished`]
Expand Down Expand Up @@ -216,7 +223,11 @@ where
substream_id,
inbound_ty,
},
SingleStreamConnectionTaskInner::Established { established, .. },
SingleStreamConnectionTaskInner::Established {
established,
inbound_negotiated_cancel_acknowledgments,
..
},
) => {
let (inbound_ty, protocol_index) = match inbound_ty {
InboundTy::Notifications {
Expand All @@ -236,15 +247,29 @@ where
InboundTy::Ping => (established::InboundTy::Ping, 0),
};

// TODO: /!\ will panic if substream is obsolete, instead just ignore the response
established.accept_inbound(substream_id, inbound_ty, either::Right(protocol_index));
if !inbound_negotiated_cancel_acknowledgments.remove(&substream_id) {
established.accept_inbound(
substream_id,
inbound_ty,
either::Right(protocol_index),
);
} else {
self.pending_messages.push_back(
ConnectionToCoordinatorInner::InboundAcceptedCancel { _id: substream_id },
)
}
}
(
CoordinatorToConnectionInner::RejectInbound { substream_id },
SingleStreamConnectionTaskInner::Established { established, .. },
SingleStreamConnectionTaskInner::Established {
established,
inbound_negotiated_cancel_acknowledgments,
..
},
) => {
// TODO: /!\ will panic if substream is obsolete, instead just ignore the response
established.reject_inbound(substream_id);
if !inbound_negotiated_cancel_acknowledgments.remove(&substream_id) {
established.reject_inbound(substream_id);
}
}
(
CoordinatorToConnectionInner::StartRequest {
Expand Down Expand Up @@ -546,6 +571,7 @@ where
established,
mut outbound_substreams_map,
mut notifications_in_open_cancel_acknowledgments,
mut inbound_negotiated_cancel_acknowledgments,
} => match established.read_write(read_write) {
Ok((connection, event)) => {
if read_write.is_dead() && event.is_none() {
Expand Down Expand Up @@ -589,6 +615,9 @@ where
},
);
}
Some(established::Event::InboundNegotiatedCancel { id, .. }) => {
inbound_negotiated_cancel_acknowledgments.insert(id);
}
Some(established::Event::InboundAcceptedCancel { id, .. }) => {
self.pending_messages.push_back(
ConnectionToCoordinatorInner::InboundAcceptedCancel { _id: id },
Expand Down Expand Up @@ -696,6 +725,7 @@ where
established: connection,
outbound_substreams_map,
notifications_in_open_cancel_acknowledgments,
inbound_negotiated_cancel_acknowledgments,
};
}
Err(err) => {
Expand Down Expand Up @@ -814,6 +844,11 @@ where
), // TODO: capacity?
notifications_in_open_cancel_acknowledgments:
VecDeque::with_capacity(4),
inbound_negotiated_cancel_acknowledgments:
hashbrown::HashSet::with_capacity_and_hasher(
2,
Default::default(),
),
};
break;
}
Expand Down
8 changes: 8 additions & 0 deletions lib/src/libp2p/connection/established.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,14 @@ pub enum Event<TSubUd> {
protocol_name: String,
},

/// An inbound substream that is waiting for a call to [`SingleStream::accept_inbound`],
/// [`SingleStream::reject_inbound`], [`MultiStream::accept_inbound`], or
/// [`MultiStream::reject_inbound`] has been abruptly closed.
InboundNegotiatedCancel {
/// Identifier of the substream.
id: SubstreamId,
},

/// An inbound substream that was previously accepted using [`SingleStream::accept_inbound`]
/// or [`MultiStream::accept_inbound`] was closed by the remote or has generated an error.
InboundAcceptedCancel {
Expand Down
4 changes: 4 additions & 0 deletions lib/src/libp2p/connection/established/multi_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,10 @@ where
id: SubstreamId(SubstreamIdInner::MultiStream(substream_id)),
protocol_name,
},
substream::Event::InboundNegotiatedCancel => Event::InboundAcceptedCancel {
id: SubstreamId(SubstreamIdInner::MultiStream(substream_id)),
user_data: substream_user_data.take().unwrap(),
},
substream::Event::RequestIn { request } => Event::RequestIn {
id: SubstreamId(SubstreamIdInner::MultiStream(substream_id)),
request,
Expand Down
3 changes: 3 additions & 0 deletions lib/src/libp2p/connection/established/single_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,9 @@ where
id: SubstreamId(SubstreamIdInner::SingleStream(substream_id)),
protocol_name,
},
substream::Event::InboundNegotiatedCancel => Event::InboundNegotiatedCancel {
id: SubstreamId(SubstreamIdInner::SingleStream(substream_id)),
},
substream::Event::RequestIn { request } => Event::RequestIn {
id: SubstreamId(SubstreamIdInner::SingleStream(substream_id)),
request,
Expand Down
7 changes: 6 additions & 1 deletion lib/src/libp2p/connection/established/substream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1059,7 +1059,7 @@ where
match self.inner {
SubstreamInner::InboundNegotiating(_, _) => None,
SubstreamInner::InboundNegotiatingAccept(_, _) => None,
SubstreamInner::InboundNegotiatingApiWait(_) => None,
SubstreamInner::InboundNegotiatingApiWait(_) => Some(Event::InboundNegotiatedCancel),
SubstreamInner::InboundFailed => None,
SubstreamInner::RequestOut { .. } => Some(Event::Response {
response: Err(RequestError::SubstreamReset),
Expand Down Expand Up @@ -1355,6 +1355,11 @@ pub enum Event {
/// [`Substream::accept_inbound`] or [`Substream::reject_inbound`] in order to resume.
InboundNegotiated(String),

/// An inbound substream that had successfully negotiated a protocol got abruptly closed
/// while waiting for the call to [`Substream::accept_inbound`] or
/// [`Substream::reject_inbound`].
InboundNegotiatedCancel,

/// Received a request in the context of a request-response protocol.
RequestIn {
/// Bytes of the request. Its interpretation is out of scope of this module.
Expand Down
4 changes: 4 additions & 0 deletions wasm-node/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

### Fixed

- Fix panic when a remote opens a substream then immediately resets it before smoldot has been able to determine asynchronously whether to accept it or not. ([#521](https://github.com/smol-dot/smoldot/pull/521))

## 1.0.4 - 2023-05-03

### Added
Expand Down

0 comments on commit a9b5a5d

Please sign in to comment.