Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tests to established connections #2323

Merged
merged 17 commits into from
Oct 13, 2022
6 changes: 6 additions & 0 deletions bin/wasm-node/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

## Unreleased

### Fixed

- Fix timeout not being checked when opening a notifications substream. ([#2323](https://github.com/paritytech/smoldot/pull/2323))
- Fix inbound notifications substreams close requests being ignored. ([#2323](https://github.com/paritytech/smoldot/pull/2323))
- Fix closed inbound notifications substreams still being considered as open when closed gracefully by the remote. ([#2323](https://github.com/paritytech/smoldot/pull/2323))

## 0.7.2 - 2022-10-12

### Changed
Expand Down
2 changes: 1 addition & 1 deletion src/libp2p/collection/single_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,9 @@ where
},
) => {
let inner_substream_id = established.open_notifications_substream(
now,
overlay_network_index,
handshake,
now + Duration::from_secs(20), // TODO: make configurable
(),
);

Expand Down
1 change: 1 addition & 0 deletions src/libp2p/connection/established.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
mod multi_stream;
mod single_stream;
pub mod substream;
mod tests;

use super::yamux;
use alloc::{string::String, vec::Vec};
Expand Down
35 changes: 20 additions & 15 deletions src/libp2p/connection/established/single_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,10 +441,15 @@ where
let state_machine_extracted = match state_machine_refmut.take() {
Some(s) => s,
None => {
// We can only happen if substream state machine has been reset,
// in which case it can't be in the "closed gracefully" state.
// Reaching this would indicate a bug in yamux.
unreachable!()
// Substream has already been removed from the Yamux state machine
// previously. We know that it can't yield any more event.
self.inner.yamux.remove_dead_substream(dead_substream_id);

// Removing a dead substream might lead to Yamux being able to
// process more incoming data. As such, we loop again.
must_continue_looping = true;

continue;
}
};

Expand Down Expand Up @@ -603,19 +608,21 @@ where
substream.write(inner.intermediary_buffer[..written_bytes].to_vec());
}
if !write_is_closed && closed_after {
// TODO: use return value
// TODO: substream.close();
debug_assert_eq!(written_bytes, 0);
substream.close();
}

match substream_update {
Some(s) => *substream.user_data_mut() = Some(s),
None => {
// TODO: only reset if not already closed
inner
.yamux
.substream_by_id_mut(substream_id)
.unwrap()
.reset();
if !closed_after || !read_is_closed {
// TODO: what we do here is definitely correct, but the docs of `reset()` seem sketchy, investigate
inner
.yamux
.substream_by_id_mut(substream_id)
.unwrap()
.reset();
}
}
};

Expand Down Expand Up @@ -867,9 +874,9 @@ where
///
pub fn open_notifications_substream(
&mut self,
now: TNow,
protocol_index: usize,
handshake: Vec<u8>,
timeout: TNow,
user_data: TNotifUd,
) -> SubstreamId {
let max_handshake_size =
Expand All @@ -878,8 +885,6 @@ where
// TODO: turn this assert into something that can't panic?
assert!(handshake.len() <= max_handshake_size);

let timeout = now + Duration::from_secs(20); // TODO:

let substream =
self.inner
.yamux
Expand Down
43 changes: 43 additions & 0 deletions src/libp2p/connection/established/substream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ enum SubstreamInner<TNow, TRqUd, TNotifUd> {
/// A notifications protocol has been negotiated on a substream. Either a successful handshake
/// or an abrupt closing is now expected.
NotificationsOutHandshakeRecv {
/// When the opening will time out in the absence of response.
timeout: TNow,
/// Buffer for the incoming handshake.
handshake_in: leb128::FramedInProgress,
/// Handshake payload to write out.
Expand All @@ -84,6 +86,9 @@ enum SubstreamInner<TNow, TRqUd, TNotifUd> {
notifications: VecDeque<u8>,
/// Data passed by the user to [`Substream::notifications_out`].
user_data: TNotifUd,
/// If `true`, we have reported a [`Event::NotificationsOutCloseDemanded`] event in the
/// past and shouldn't report one again.
close_demanded_by_remote: bool,
},
/// A notifications protocol has been closed. Waiting for the remote to close it as well.
NotificationsOutClosed,
Expand Down Expand Up @@ -462,6 +467,7 @@ where

(
Some(SubstreamInner::NotificationsOutHandshakeRecv {
timeout,
handshake_in: leb128::FramedInProgress::new(max_handshake_size),
handshake_out,
user_data,
Expand Down Expand Up @@ -498,10 +504,22 @@ where
)
}
SubstreamInner::NotificationsOutHandshakeRecv {
timeout,
handshake_in,
mut handshake_out,
user_data,
} => {
if timeout < read_write.now {
return (
Some(SubstreamInner::NotificationsOutNegotiationFailed),
Some(Event::NotificationsOutResult {
result: Err((NotificationsOutErr::Timeout, user_data)),
}),
);
}

read_write.wake_up_after(&timeout);

read_write.write_from_vec_deque(&mut handshake_out);

let incoming_buffer = match read_write.incoming_buffer {
Expand All @@ -521,6 +539,7 @@ where
if !handshake_out.is_empty() {
return (
Some(SubstreamInner::NotificationsOutHandshakeRecv {
timeout,
handshake_in,
handshake_out,
user_data,
Expand All @@ -537,6 +556,7 @@ where
Some(SubstreamInner::NotificationsOut {
notifications: VecDeque::new(),
user_data,
close_demanded_by_remote: false,
}),
Some(Event::NotificationsOutResult {
result: Ok(remote_handshake),
Expand All @@ -547,6 +567,7 @@ where
read_write.advance_read(num_read);
(
Some(SubstreamInner::NotificationsOutHandshakeRecv {
timeout,
handshake_in,
handshake_out,
user_data,
Expand All @@ -565,14 +586,32 @@ where
SubstreamInner::NotificationsOut {
mut notifications,
user_data,
close_demanded_by_remote,
} => {
// Receiving data on an outgoing substream is forbidden by the protocol.
read_write.discard_all_incoming();
read_write.write_from_vec_deque(&mut notifications);

// If this debug assertion fails, it means that `incoming_buffer` was `None` in
// the past then became `Some` again.
debug_assert!(!close_demanded_by_remote || read_write.incoming_buffer.is_none());

if !close_demanded_by_remote && read_write.incoming_buffer.is_none() {
return (
Some(SubstreamInner::NotificationsOut {
notifications,
user_data,
close_demanded_by_remote: true,
}),
Some(Event::NotificationsOutCloseDemanded),
);
}

(
Some(SubstreamInner::NotificationsOut {
notifications,
user_data,
close_demanded_by_remote,
}),
None,
)
Expand Down Expand Up @@ -879,6 +918,10 @@ where
} => {
read_write.write_from_vec_deque(&mut handshake);

if close_desired && handshake.is_empty() {
read_write.close_write_if_empty();
}

let incoming_buffer = match read_write.incoming_buffer {
Some(buf) => buf,
None => {
Expand Down
Loading