Skip to content

Commit

Permalink
Fix panic in Yamux state machine when a remote closes a substream wit…
Browse files Browse the repository at this point in the history
…h an active timeout (#1122)

* Fix Yamux state mismatch

* Review usage of `wake_up_after`

* CHANGELOG

* Fix PR link
  • Loading branch information
tomaka authored Sep 8, 2023
1 parent 752d699 commit 85f6cad
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 13 deletions.
7 changes: 3 additions & 4 deletions lib/src/libp2p/collection/single_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -788,10 +788,6 @@ where
return;
}

// `read_write()` should be called again as soon as possible after `timeout` in
// order for the check above to work.
read_write.wake_up_after(&timeout);

loop {
let (read_before, written_before) =
(read_write.read_bytes, read_write.write_bytes_queued);
Expand All @@ -818,6 +814,9 @@ where
if (read_before, written_before)
== (read_write.read_bytes, read_write.write_bytes_queued) =>
{
// `read_write()` should be called again as soon as possible
// after `timeout`.
read_write.wake_up_after(&timeout);
self.connection = SingleStreamConnectionTaskInner::Handshake {
handshake: updated_handshake,
randomness_seed,
Expand Down
9 changes: 8 additions & 1 deletion lib/src/libp2p/connection/established/single_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,14 @@ where
return Ok((self, Some(Event::PingOutFailed)));
}
}
read_write.wake_up_after(&self.inner.next_ping);

// Only wake up if the connection can be closed.
// TODO: review w.r.t. https://github.com/smol-dot/smoldot/issues/1121
if read_write.expected_incoming_bytes.is_some()
&& read_write.write_bytes_queueable.is_some()
{
read_write.wake_up_after(&self.inner.next_ping);
}

// If we have both sent and received a GoAway frame, that means that no new substream
// can be opened. If in addition to this there is no substream in the connection,
Expand Down
8 changes: 4 additions & 4 deletions lib/src/libp2p/connection/established/substream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,8 +476,6 @@ where
);
}

read_write.wake_up_after(&timeout);

if let Some(extracted_negotiation) = negotiation.take() {
match extracted_negotiation.read_write(read_write) {
Ok(multistream_select::Negotiation::InProgress(nego)) => {
Expand Down Expand Up @@ -590,6 +588,8 @@ where
}
}

read_write.wake_up_after(&timeout);

(
Some(SubstreamInner::NotificationsOutHandshakeRecv {
timeout,
Expand Down Expand Up @@ -667,7 +667,6 @@ where
}),
);
}
read_write.wake_up_after(&timeout);

if let Some(extracted_nego) = negotiation.take() {
match extracted_nego.read_write(read_write) {
Expand Down Expand Up @@ -754,6 +753,8 @@ where
}
}

read_write.wake_up_after(&timeout);

(
Some(SubstreamInner::RequestOut {
timeout,
Expand Down Expand Up @@ -1066,7 +1067,6 @@ where
return (Some(SubstreamInner::PingOutFailed { queued_pings }), None);
}
if queued_pings.remove(0).is_some() {
read_write.wake_up_asap();
return (
Some(SubstreamInner::PingOut {
negotiation,
Expand Down
28 changes: 24 additions & 4 deletions lib/src/libp2p/connection/yamux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1187,6 +1187,10 @@ where
if fin {
*remote_write_closed = true;

// No need to send window frames anymore if the remote has
// sent a FIN.
self.inner.window_frames_to_send.remove(&stream_id);

// Wake up the substream.
match substreams_wake_up_key {
Some(Some(when)) if *when <= outer_read_write.now => {}
Expand Down Expand Up @@ -1960,7 +1964,7 @@ where

// When to wake up the substream for reading again.
debug_assert!(matches!(substreams_wake_up_key, None));
let will_wake_up_again_asap_read = match (
let will_wake_up_read_again = match (
self.inner_read_write.read_bytes,
&self.inner_read_write.wake_up_after,
) {
Expand All @@ -1976,7 +1980,7 @@ where
.substreams_wake_up
.insert((Some(when.clone()), self.substream_id));
*substreams_wake_up_key = Some(Some(when.clone()));
false
true
}
_ => {
// Non-zero bytes written or `when <= now`.
Expand Down Expand Up @@ -2089,10 +2093,10 @@ where
// Substream has nothing to write.
}

// TODO: review w.r.t. wake ups
// Mark the substream as dead if it won't ever wake up again.
if matches!(local_write_close, SubstreamStateLocalWrite::FinQueued)
&& *remote_write_closed
&& !will_wake_up_again_asap_read
&& !will_wake_up_read_again
&& !self
.yamux
.inner
Expand All @@ -2106,6 +2110,22 @@ where
{
let _was_inserted = self.yamux.inner.dead_substreams.insert(self.substream_id);
debug_assert!(_was_inserted);
debug_assert!(!self
.yamux
.inner
.substreams_wake_up
.iter()
.any(|(_, s)| *s == self.substream_id));
debug_assert!(!self
.yamux
.inner
.substreams_write_ready
.contains(&self.substream_id));
debug_assert!(!self
.yamux
.inner
.window_frames_to_send
.contains_key(&self.substream_id));
}
}

Expand Down
4 changes: 4 additions & 0 deletions wasm-node/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

### Fixed

- Fix panic in Yamux state machine when a remote closes a substream with an active timeout. ([#1122](https://github.com/smol-dot/smoldot/pull/1122))

## 2.0.0 - 2023-09-07

### Remove
Expand Down

0 comments on commit 85f6cad

Please sign in to comment.