Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

General clean-up of the single stream connections code #2708

Merged
merged 25 commits into from
Sep 6, 2022
Merged
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
bec0190
Add some code comments
tomaka Aug 30, 2022
1fa5cb1
Remove the update_all method and inline it
tomaka Aug 30, 2022
85c864d
Typo: peer -> ping
tomaka Aug 30, 2022
aacce8b
Decode data frames in two steps
tomaka Aug 30, 2022
b11121d
Remove pending_events field
tomaka Aug 30, 2022
4b7a308
Call substream.reserve_window
tomaka Aug 30, 2022
1e3e419
Add a NewOutboundSubstreamsForbidden event
tomaka Aug 30, 2022
33f0182
Forbid new substreams after a GoAway frame
tomaka Aug 30, 2022
bc38ae9
Remove num_ping_failed_events field
tomaka Aug 30, 2022
7c21a9c
Merge branch 'main' into single-stream-improve
tomaka Aug 31, 2022
d2e3a42
Merge branch 'main' into single-stream-improve
tomaka Aug 31, 2022
480475e
More all yamux operations within a single loop
tomaka Sep 1, 2022
5b213ec
Add Yamux::is_empty
tomaka Sep 1, 2022
9104584
Add Yamux::send_goaway
tomaka Sep 1, 2022
aafb99b
Properly handle shutdown phase
tomaka Sep 1, 2022
753c1eb
Add SingleStream::deny_new_incoming_substreams
tomaka Sep 1, 2022
7e6ded7
Remove TODO about GoAway frames
tomaka Sep 1, 2022
b8735fe
Remove obsolete TODO in yamux
tomaka Sep 1, 2022
9792928
Add CHANGELOG entry
tomaka Sep 1, 2022
8f13baf
Fix missing continue
tomaka Sep 1, 2022
471dce0
Fix docs and spellcheck
tomaka Sep 1, 2022
49d1760
Only close connection if GoAway has been sent
tomaka Sep 2, 2022
38bf91f
Fix potential infinite loop in Yamux
tomaka Sep 2, 2022
1ef9fef
Merge branch 'main' into single-stream-improve
tomaka Sep 6, 2022
a6f037a
Merge branch 'main' into single-stream-improve
tomaka Sep 6, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 85 additions & 66 deletions src/libp2p/connection/established/single_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,15 @@ where
}
read_write.wake_up_after(&self.inner.next_ping);

// Decoding the incoming data.
// Processing incoming data might be blocked on emitting data or on removing dead
// substreams, and processing incoming data might lead to more data to emit. The easiest
// way to implement this is a single loop that does everything.
loop {
// Any meaningful activity within this loop can set this value to `true`. If this
// value is still `false` at the end of the loop, we return from the function due to
// having nothing more to do.
let mut must_continue_looping = false;

// If `self.inner.current_data_frame` is `Some`, that means that yamux has already
// processed some of the data in the buffer of decrypted data and has determined that
// it was data belonging to a certain substream. We now pass over this data again,
Expand Down Expand Up @@ -245,6 +252,7 @@ where
.map_err(Error::Noise)?;
read_write.advance_read(num_read);
} else {
// TODO: hack-ish
read_write.close_write();
return Ok((self, None));
}
Expand All @@ -258,12 +266,17 @@ where
.map_err(Error::Yamux)?;
self.inner.yamux = yamux_decode.yamux;

// TODO: it is possible that the yamux reading is blocked on writing
// If bytes_read is 0 and detail is None, then Yamux can't do anything more. On the
melekes marked this conversation as resolved.
Show resolved Hide resolved
// other hand, if bytes_read is != 0 or detail is Some, then Yamux might have more
// things to do, and we must loop again.
if !(yamux_decode.bytes_read == 0 && yamux_decode.detail.is_none()) {
must_continue_looping = true;
}

// Analyze how Yamux has parsed the data.
// This still contains references to the data in `self.encryption`.
match yamux_decode.detail {
None if yamux_decode.bytes_read == 0 => break,
None if yamux_decode.bytes_read == 0 => {}
None => {
self.encryption
.consume_inbound_data(yamux_decode.bytes_read);
Expand Down Expand Up @@ -334,86 +347,92 @@ where
unreachable!()
}
};
}

// Substreams that have been closed or reset aren't immediately removed the yamux state
// machine. They must be removed manually, which is what is done here.
while let Some((dead_substream_id, death_ty, state_machine)) =
self.inner.yamux.next_dead_substream()
{
let state_machine = match state_machine {
Some(s) => s,
None => continue,
};

match death_ty {
yamux::DeadSubstreamTy::Reset => {
if let Some(event) = state_machine.reset() {
return Ok((
self,
Some(Self::pass_through_substream_event(dead_substream_id, event)),
));
// Substreams that have been closed or reset aren't immediately removed the yamux state
// machine. They must be removed manually, which is what is done here.
while let Some((dead_substream_id, death_ty, state_machine)) =
self.inner.yamux.next_dead_substream()
{
// Removing a dead substream might lead to Yamux being able to process more
// incoming data. As such, we loop again.
must_continue_looping = true;

let state_machine = match state_machine {
Some(s) => s,
None => continue,
};

match death_ty {
yamux::DeadSubstreamTy::Reset => {
if let Some(event) = state_machine.reset() {
return Ok((
self,
Some(Self::pass_through_substream_event(dead_substream_id, event)),
));
}
}
}
yamux::DeadSubstreamTy::ClosedGracefully => {
let mut substream_read_write = ReadWrite {
now: read_write.now.clone(),
incoming_buffer: None,
outgoing_buffer: None,
read_bytes: 0,
written_bytes: 0,
wake_up_after: None,
};

let (_, _event) = state_machine.read_write(&mut substream_read_write);

if let Some(wake_up_after) = substream_read_write.wake_up_after {
read_write.wake_up_after(&wake_up_after);
yamux::DeadSubstreamTy::ClosedGracefully => {
let mut substream_read_write = ReadWrite {
now: read_write.now.clone(),
incoming_buffer: None,
outgoing_buffer: None,
read_bytes: 0,
written_bytes: 0,
wake_up_after: None,
};

let (_, _event) = state_machine.read_write(&mut substream_read_write);

if let Some(wake_up_after) = substream_read_write.wake_up_after {
read_write.wake_up_after(&wake_up_after);
}

// TODO: finish here
}

// TODO: finish here
}
}
}

// The yamux state machine contains the data that needs to be written out.
// Try to flush it.
loop {
// The yamux state machine contains the data that needs to be written out.
// Try to flush it.

// Calculate number of bytes that we can extract from yamux. This is similar but not
// exactly the same as the size of the outgoing buffer, as noise adds some headers to
// the data.
let unencrypted_bytes_to_extract = self
.encryption
.encrypt_size_conv(read_write.outgoing_buffer_available());
if unencrypted_bytes_to_extract == 0 {
break;
}

// Extract outgoing data that is buffered within yamux.
// TODO: don't allocate an intermediary buffer, but instead pass them directly to the encryption
let mut buffers = Vec::with_capacity(32);
let mut extract_out = self.inner.yamux.extract_out(unencrypted_bytes_to_extract);
while let Some(buffer) = extract_out.next() {
buffers.push(buffer.as_ref().to_vec()); // TODO: copy
}
if unencrypted_bytes_to_extract != 0 {
// Extract outgoing data that is buffered within yamux.
// TODO: don't allocate an intermediary buffer, but instead pass them directly to the encryption
let mut buffers = Vec::with_capacity(32);
let mut extract_out = self.inner.yamux.extract_out(unencrypted_bytes_to_extract);
while let Some(buffer) = extract_out.next() {
buffers.push(buffer.as_ref().to_vec()); // TODO: copy
}

if buffers.is_empty() {
break;
if !buffers.is_empty() {
must_continue_looping = true;

// Pass the data to the encryption layer.
let (_read, written) = self.encryption.encrypt(
buffers.into_iter(),
match read_write.outgoing_buffer.as_mut() {
Some((a, b)) => (a, b),
None => (&mut [], &mut []),
},
);
debug_assert!(_read <= unencrypted_bytes_to_extract);
read_write.advance_write(written);
}
}

// Pass the data to the encryption layer.
let (_read, written) = self.encryption.encrypt(
buffers.into_iter(),
match read_write.outgoing_buffer.as_mut() {
Some((a, b)) => (a, b),
None => (&mut [], &mut []),
},
);
debug_assert!(_read <= unencrypted_bytes_to_extract);
read_write.advance_write(written);
// If `must_continue_looping` is still false, then we didn't do anything meaningful
// during this iteration. Return due to idleness.
if !must_continue_looping {
return Ok((self, None));
}
}

Ok((self, None))
}

/// Advances a single substream.
Expand Down