Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix panic in Yamux state machine when a remote closes a substream with an active timeout #1122

Merged
merged 4 commits into from
Sep 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions lib/src/libp2p/collection/single_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -788,10 +788,6 @@ where
return;
}

// `read_write()` should be called again as soon as possible after `timeout` in
// order for the check above to work.
read_write.wake_up_after(&timeout);

loop {
let (read_before, written_before) =
(read_write.read_bytes, read_write.write_bytes_queued);
Expand All @@ -818,6 +814,9 @@ where
if (read_before, written_before)
== (read_write.read_bytes, read_write.write_bytes_queued) =>
{
// `read_write()` should be called again as soon as possible
// after `timeout`.
read_write.wake_up_after(&timeout);
self.connection = SingleStreamConnectionTaskInner::Handshake {
handshake: updated_handshake,
randomness_seed,
Expand Down
9 changes: 8 additions & 1 deletion lib/src/libp2p/connection/established/single_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,14 @@ where
return Ok((self, Some(Event::PingOutFailed)));
}
}
read_write.wake_up_after(&self.inner.next_ping);

// Only wake up if the connection can be closed.
// TODO: review w.r.t. https://github.com/smol-dot/smoldot/issues/1121
if read_write.expected_incoming_bytes.is_some()
&& read_write.write_bytes_queueable.is_some()
{
read_write.wake_up_after(&self.inner.next_ping);
}

// If we have both sent and received a GoAway frame, that means that no new substream
// can be opened. If in addition to this there is no substream in the connection,
Expand Down
8 changes: 4 additions & 4 deletions lib/src/libp2p/connection/established/substream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,8 +476,6 @@ where
);
}

read_write.wake_up_after(&timeout);

if let Some(extracted_negotiation) = negotiation.take() {
match extracted_negotiation.read_write(read_write) {
Ok(multistream_select::Negotiation::InProgress(nego)) => {
Expand Down Expand Up @@ -590,6 +588,8 @@ where
}
}

read_write.wake_up_after(&timeout);

(
Some(SubstreamInner::NotificationsOutHandshakeRecv {
timeout,
Expand Down Expand Up @@ -667,7 +667,6 @@ where
}),
);
}
read_write.wake_up_after(&timeout);

if let Some(extracted_nego) = negotiation.take() {
match extracted_nego.read_write(read_write) {
Expand Down Expand Up @@ -754,6 +753,8 @@ where
}
}

read_write.wake_up_after(&timeout);

(
Some(SubstreamInner::RequestOut {
timeout,
Expand Down Expand Up @@ -1066,7 +1067,6 @@ where
return (Some(SubstreamInner::PingOutFailed { queued_pings }), None);
}
if queued_pings.remove(0).is_some() {
read_write.wake_up_asap();
return (
Some(SubstreamInner::PingOut {
negotiation,
Expand Down
28 changes: 24 additions & 4 deletions lib/src/libp2p/connection/yamux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1187,6 +1187,10 @@ where
if fin {
*remote_write_closed = true;

// No need to send window frames anymore if the remote has
// sent a FIN.
self.inner.window_frames_to_send.remove(&stream_id);

// Wake up the substream.
match substreams_wake_up_key {
Some(Some(when)) if *when <= outer_read_write.now => {}
Expand Down Expand Up @@ -1960,7 +1964,7 @@ where

// When to wake up the substream for reading again.
debug_assert!(matches!(substreams_wake_up_key, None));
let will_wake_up_again_asap_read = match (
let will_wake_up_read_again = match (
self.inner_read_write.read_bytes,
&self.inner_read_write.wake_up_after,
) {
Expand All @@ -1976,7 +1980,7 @@ where
.substreams_wake_up
.insert((Some(when.clone()), self.substream_id));
*substreams_wake_up_key = Some(Some(when.clone()));
false
true
}
_ => {
// Non-zero bytes written or `when <= now`.
Expand Down Expand Up @@ -2089,10 +2093,10 @@ where
// Substream has nothing to write.
}

// TODO: review w.r.t. wake ups
// Mark the substream as dead if it won't ever wake up again.
if matches!(local_write_close, SubstreamStateLocalWrite::FinQueued)
&& *remote_write_closed
&& !will_wake_up_again_asap_read
&& !will_wake_up_read_again
&& !self
.yamux
.inner
Expand All @@ -2106,6 +2110,22 @@ where
{
let _was_inserted = self.yamux.inner.dead_substreams.insert(self.substream_id);
debug_assert!(_was_inserted);
debug_assert!(!self
.yamux
.inner
.substreams_wake_up
.iter()
.any(|(_, s)| *s == self.substream_id));
debug_assert!(!self
.yamux
.inner
.substreams_write_ready
.contains(&self.substream_id));
debug_assert!(!self
.yamux
.inner
.window_frames_to_send
.contains_key(&self.substream_id));
}
}

Expand Down
4 changes: 4 additions & 0 deletions wasm-node/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

### Fixed

- Fix panic in Yamux state machine when a remote closes a substream with an active timeout. ([#1122](https://github.com/smol-dot/smoldot/pull/1122))

## 2.0.0 - 2023-09-07

### Remove
Expand Down
Loading