Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Considerably clean up the Yamux implementation #383

Merged
merged 75 commits into from
Apr 5, 2023
Merged
Changes from 1 commit
Commits
Show all changes
75 commits
Select commit Hold shift + click to select a range
4911334
Small comment fix
tomaka Mar 31, 2023
a65dd8f
Wrap all fields in a `Box` to reduce movements
tomaka Mar 31, 2023
916f0ea
Rename and comment field
tomaka Mar 31, 2023
681a40c
Cache the list of dead substreams
tomaka Mar 31, 2023
18a3a3a
Fix missing `Box` import
tomaka Mar 31, 2023
7eb874d
Remove TODO
tomaka Mar 31, 2023
04d6666
Use a `VecDeque` for `write_buffers`
tomaka Mar 31, 2023
ad92626
Add TODO
tomaka Mar 31, 2023
415c0aa
Remove automatic window size increase on incoming data
tomaka Mar 31, 2023
8832e62
Remove SubstreamRef and SubstreamMut in favor of functions on Yamux
tomaka Mar 31, 2023
abfeb59
Fix faulty debug_assert
tomaka Mar 31, 2023
e285c54
Some docfixes
tomaka Mar 31, 2023
bc49408
Fix insertion in dead_substreams at the wrong time
tomaka Apr 1, 2023
b685ee8
Remove the `ExtractOut` intermediary object
tomaka Apr 1, 2023
ba2b99c
Fix faulty debug_assert
tomaka Apr 1, 2023
5adc96e
Turn an if into a debug_assert!
tomaka Apr 1, 2023
09429ce
Tweaks to the `incoming_data` function
tomaka Apr 1, 2023
146ccc8
Use a HashSet for outgoing pings
tomaka Apr 1, 2023
cb66004
Add an `encode` function
tomaka Apr 2, 2023
f51bd6f
Store the decoded header in Outgoing
tomaka Apr 2, 2023
3f6b9ec
Remove is_goaway flag
tomaka Apr 2, 2023
76cd2bc
Remove the header queing private functions
tomaka Apr 2, 2023
aa3d25b
`decode_yamux_header` now accepts a precise length
tomaka Apr 2, 2023
39ba2cf
Remove old comment
tomaka Apr 2, 2023
cb67914
Add TODO
tomaka Apr 2, 2023
33164ed
Merge branch 'main' into yamux-cleanups
tomaka Apr 3, 2023
0576892
Add yamux header encoding tests
tomaka Apr 3, 2023
326361e
Macro name
tomaka Apr 4, 2023
e676553
Docfix
tomaka Apr 4, 2023
1de223c
Rustfmt
tomaka Apr 4, 2023
1c3a3c4
Move the write queue to a separate module
tomaka Apr 4, 2023
78ae8ca
Doclinks
tomaka Apr 4, 2023
9f80f10
Add `InvalidInboundStreamId`
tomaka Apr 4, 2023
93abd83
Some tests
tomaka Apr 4, 2023
29a54ba
Fixes and more tests
tomaka Apr 4, 2023
be99ed9
Fix test
tomaka Apr 4, 2023
77284f5
Fix test
tomaka Apr 4, 2023
ca3be46
More tests
tomaka Apr 5, 2023
6b0fced
Remove `reserve_window` function
tomaka Apr 5, 2023
a42223d
More testing
tomaka Apr 5, 2023
c37ab7d
Bugfix and more test
tomaka Apr 5, 2023
4f91826
More tests
tomaka Apr 5, 2023
3112fb4
More tests and bugfix
tomaka Apr 5, 2023
1959613
More tests
tomaka Apr 5, 2023
0274e35
Add `max_simultaneous_rst_substreams` config option
tomaka Apr 5, 2023
15e9153
Remove debugging thing
tomaka Apr 5, 2023
c65f09c
More tests
tomaka Apr 5, 2023
bd29aaa
Check the ACK flag
tomaka Apr 5, 2023
0d578d5
Fix test
tomaka Apr 5, 2023
2540767
Error on multiple GoAways
tomaka Apr 5, 2023
1c605e5
Fix sending a RST after a GoAway
tomaka Apr 5, 2023
dd369b1
Fix debug_assert! and more tests
tomaka Apr 5, 2023
fa72d45
Fix code
tomaka Apr 5, 2023
dc84f62
Fix test
tomaka Apr 5, 2023
b8f9427
Add a limit to the number of pongs
tomaka Apr 5, 2023
c82c694
Clarify data with RST
tomaka Apr 5, 2023
3c0d246
Test can't open after goaway
tomaka Apr 5, 2023
ded9dfc
Simplify pings handling
tomaka Apr 5, 2023
3aa99ae
CHANGELOG entry
tomaka Apr 5, 2023
5fef223
Clarify add_remote_window behaviour
tomaka Apr 5, 2023
40a617d
Add `max_out_data_frame_size`
tomaka Apr 5, 2023
8b604eb
Fix tests
tomaka Apr 5, 2023
7460e59
More proper handling of RST with data
tomaka Apr 5, 2023
194184f
Choose substreams to send on semi-randomly
tomaka Apr 5, 2023
3070e75
Clarify a bit doc
tomaka Apr 5, 2023
ff43b5f
Add a cache for substreams to write out
tomaka Apr 5, 2023
22bfb86
Remove some TODOs
tomaka Apr 5, 2023
0d402d9
Spellcheck
tomaka Apr 5, 2023
f68652d
PR links
tomaka Apr 5, 2023
63a7a14
Fix no checking if substream still healthy
tomaka Apr 5, 2023
58f7704
No longer panic anywhere in the public API, except invalid substreams
tomaka Apr 5, 2023
4a3aad0
Doc and TODO fix
tomaka Apr 5, 2023
35d6e50
Oops, substream IDs must increment by 2
tomaka Apr 5, 2023
64205f7
Fix clippy warning
tomaka Apr 5, 2023
3ea28b9
Docfix
tomaka Apr 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Tweaks to the incoming_data function
tomaka committed Apr 1, 2023

Verified

This commit was signed with the committer’s verified signature.
tomaka Pierre Krieger
commit 09429ce2208b44ba8bcaf5724627c883e906945b
207 changes: 119 additions & 88 deletions lib/src/libp2p/connection/yamux.rs
Original file line number Diff line number Diff line change
@@ -893,9 +893,16 @@ impl<T> Yamux<T> {
remaining_bytes: 0,
fin: true,
} => {
// End of the data frame. Proceed to receive new header at the next iteration.
self.inner.incoming = Incoming::Header(arrayvec::ArrayVec::new());

if let Some(Substream {
// Note that it is possible that we are receiving data corresponding to a
// substream for which a RST has been sent out by the local node. Since the
// local state machine doesn't keep track of RST'ted substreams, any
// frame concerning a substream that has been RST or doesn't exist is
// discarded and doesn't result in an error, under the presumption that we
// are in this situation.
let Some(Substream {
state:
SubstreamState::Healthy {
remote_write_closed: remote_write_closed @ false,
@@ -905,33 +912,33 @@ impl<T> Yamux<T> {
..
},
..
}) = self.inner.substreams.get_mut(&substream_id.0)
{
*remote_write_closed = true;
}) = self.inner.substreams.get_mut(&substream_id.0) else { continue; };

if matches!(*local_write_close, SubstreamStateLocalWrite::FinQueued)
&& (write_buffers.is_empty() // TODO: cumbersome
|| (write_buffers.len() == 1
&& write_buffers[0].len()
<= *first_write_buffer_offset))
{
let _was_inserted = self.inner.dead_substreams.insert(substream_id.0);
debug_assert!(_was_inserted);
}
*remote_write_closed = true;

return Ok(IncomingDataOutcome {
yamux: self,
bytes_read: total_read,
detail: Some(IncomingDataDetail::StreamClosed { substream_id }),
});
if matches!(*local_write_close, SubstreamStateLocalWrite::FinQueued)
&& (write_buffers.is_empty() // TODO: cumbersome
|| (write_buffers.len() == 1
&& write_buffers[0].len()
<= *first_write_buffer_offset))
{
let _was_inserted = self.inner.dead_substreams.insert(substream_id.0);
debug_assert!(_was_inserted);
}

return Ok(IncomingDataOutcome {
yamux: self,
bytes_read: total_read,
detail: Some(IncomingDataDetail::StreamClosed { substream_id }),
});
}

Incoming::DataFrame {
remaining_bytes: 0,
fin: false,
..
} => {
// End of the data frame. Proceed to receive new header at the next iteration.
self.inner.incoming = Incoming::Header(arrayvec::ArrayVec::new());
}

@@ -945,20 +952,22 @@ impl<T> Yamux<T> {

debug_assert_ne!(*remaining_bytes, 0);

// Extract the data and update the local states.
let pulled_data = cmp::min(
*remaining_bytes,
u32::try_from(data.len()).unwrap_or(u32::max_value()),
);

let pulled_data_usize = usize::try_from(pulled_data).unwrap();
*remaining_bytes -= pulled_data;

let start_offset = total_read;
total_read += pulled_data_usize;
data = &data[pulled_data_usize..];

// Note that it is possible that we are receiving data corresponding to a
// substream for which a RST has been sent out by the local node. Since the
// If the substream still exists, report the event to the API user.
// If the substream doesn't exist anymore, just continue iterating.
//
// It is possible that we are receiving data corresponding to a substream for
// which a RST has been sent out by the local node. Since the
// local state machine doesn't keep track of RST'ted substreams, any
// frame concerning a substream that has been RST or doesn't exist is
// discarded and doesn't result in an error, under the presumption that we
@@ -983,9 +992,10 @@ impl<T> Yamux<T> {
});
}

// Also note that we don't switch back `self.inner.incoming` to `Header`.
// Instead, the next iteration will pick up `DataFrame` again and transition
// again. This is necessary to handle the `fin` flag elegantly.
// We don't switch back `self.inner.incoming` to `Header` even if there's no
// bytes remaining in the data frame. Instead, the next iteration will pick up
// `DataFrame` again and transition again. This is necessary to handle the
// `fin` flag elegantly.
}

Incoming::DataFrame {
@@ -1024,6 +1034,7 @@ impl<T> Yamux<T> {
// be empty. If it is not the case, we simply leave the ping header
// there and prevent any further data from being read.
if !matches!(self.inner.outgoing, Outgoing::Idle) {
// TODO: this could trigger a deadlock if the send buffer is very small
break;
}

@@ -1057,6 +1068,7 @@ impl<T> Yamux<T> {
self.inner.incoming = Incoming::Header(arrayvec::ArrayVec::new());
}
header::DecodedYamuxHeader::PingResponse { opaque_value } => {
// TODO: this is `O(n)`
let pos = match self
.inner
.pings_waiting_reply
@@ -1156,7 +1168,10 @@ impl<T> Yamux<T> {
length,
..
} => {
// Handle `RST` flag separately.
// Frame with the `RST` flag set. Destroy the substream.

// It is invalid to have the `RST` flag set and data at the same time.
// TODO: why is it invalid?
if matches!(decoded_header, header::DecodedYamuxHeader::Data { .. })
&& length != 0
{
@@ -1169,56 +1184,53 @@ impl<T> Yamux<T> {
// which we have sent a RST frame earlier. Considering that we don't
// always keep traces of old substreams, we have no way to know whether
// this is the case or not.
if let Some(s) = self.inner.substreams.get_mut(&stream_id) {
let _was_inserted = self.inner.dead_substreams.insert(stream_id);
debug_assert!(_was_inserted);
let Some(s) = self.inner.substreams.get_mut(&stream_id) else { continue };

// We might be currently writing a frame of data of the substream
// being reset. If that happens, we need to update some internal
// state regarding this frame of data.
match (
&mut self.inner.outgoing,
mem::replace(&mut s.state, SubstreamState::Reset),
) {
(
Outgoing::Header {
substream_data_frame:
Some((data @ OutgoingSubstreamData::Healthy(_), _)),
..
}
| Outgoing::SubstreamData {
data: data @ OutgoingSubstreamData::Healthy(_),
..
},
SubstreamState::Healthy {
write_buffers,
first_write_buffer_offset,
..
},
) if *data
== OutgoingSubstreamData::Healthy(SubstreamId(
stream_id,
)) =>
{
*data = OutgoingSubstreamData::Obsolete {
write_buffers,
first_write_buffer_offset,
};
let _was_inserted = self.inner.dead_substreams.insert(stream_id);
debug_assert!(_was_inserted);

// We might be currently writing a frame of data of the substream
// being reset. If that happens, we need to update some internal
// state regarding this frame of data.
match (
&mut self.inner.outgoing,
mem::replace(&mut s.state, SubstreamState::Reset),
) {
(
Outgoing::Header {
substream_data_frame:
Some((data @ OutgoingSubstreamData::Healthy(_), _)),
..
}
_ => {}
| Outgoing::SubstreamData {
data: data @ OutgoingSubstreamData::Healthy(_),
..
},
SubstreamState::Healthy {
write_buffers,
first_write_buffer_offset,
..
},
) if *data
== OutgoingSubstreamData::Healthy(SubstreamId(stream_id)) =>
{
*data = OutgoingSubstreamData::Obsolete {
write_buffers,
first_write_buffer_offset,
};
}

return Ok(IncomingDataOutcome {
yamux: self,
bytes_read: total_read,
detail: Some(IncomingDataDetail::StreamReset {
substream_id: SubstreamId(stream_id),
}),
});
_ => {}
}

return Ok(IncomingDataOutcome {
yamux: self,
bytes_read: total_read,
detail: Some(IncomingDataDetail::StreamReset {
substream_id: SubstreamId(stream_id),
}),
});
}

// Remote has sent a SYN flag. A new substream is to be opened.
header::DecodedYamuxHeader::Data {
syn: true,
fin,
@@ -1235,6 +1247,7 @@ impl<T> Yamux<T> {
length,
..
} => {
// Remote has sent a SYN flag. A new substream is to be opened.
match self.inner.substreams.get(&stream_id) {
Some(Substream {
state: SubstreamState::Healthy { .. },
@@ -1250,18 +1263,46 @@ impl<T> Yamux<T> {
// Because we don't immediately destroy substreams, the remote
// might decide to re-use a substream ID that is still
// allocated locally. If that happens, we block the reading.
// It will be unblocked when the API user destroys the old
// substream.
break;
}
None => {}
}

// When receiving a new substream, the outgoing state must always be
// `Outgoing::Idle`, in order to potentially queue the substream
// rejection message later.
// If it is not the case, we simply leave the header there and prevent
// any further data from being read.
if !matches!(self.inner.outgoing, Outgoing::Idle) {
break;
}

let is_data =
matches!(decoded_header, header::DecodedYamuxHeader::Data { .. });

// If we have queued or sent a GoAway frame, then the substream is
// automatically rejected.
if !matches!(self.inner.outgoing_goaway, OutgoingGoAway::NotRequired) {
self.inner.incoming = if !is_data || length == 0 {
// Send the `RST` frame.
let mut header = arrayvec::ArrayVec::new();
header.push(0);
header.push(1);
header.try_extend_from_slice(&0x8u16.to_be_bytes()).unwrap();
header
.try_extend_from_slice(&stream_id.get().to_be_bytes())
.unwrap();
header.try_extend_from_slice(&0u32.to_be_bytes()).unwrap();
debug_assert_eq!(header.len(), 12);

self.inner.outgoing = Outgoing::Header {
header,
substream_data_frame: None,
is_goaway: false,
};

self.inner.incoming = if !is_data {
Incoming::Header(arrayvec::ArrayVec::new())
} else {
Incoming::DataFrame {
@@ -1270,16 +1311,8 @@ impl<T> Yamux<T> {
fin,
}
};
continue;
}

// As documented, when in the `Incoming::PendingIncomingSubstream`
// state, the outgoing state must always be `Outgoing::Idle`, in
// order to potentially queue the substream rejection message later.
// If it is not the case, we simply leave the header there and prevent
// any further data from being read.
if !matches!(self.inner.outgoing, Outgoing::Idle) {
break;
continue;
}

self.inner.incoming = Incoming::PendingIncomingSubstream {
@@ -1332,6 +1365,8 @@ impl<T> Yamux<T> {
.ok_or(Error::CreditsExceeded)?;
}

// Switch to the `DataFrame` state in order to process the frame, even
// if the substream no longer exists.
self.inner.incoming = Incoming::DataFrame {
substream_id: SubstreamId(stream_id),
remaining_bytes: length,
@@ -1743,14 +1778,10 @@ impl<T> Yamux<T> {
fin,
..
} => {
self.inner.incoming = if data_frame_size == 0 {
Incoming::Header(arrayvec::ArrayVec::new())
} else {
Incoming::DataFrame {
substream_id,
remaining_bytes: data_frame_size,
fin,
}
self.inner.incoming = Incoming::DataFrame {
substream_id,
remaining_bytes: data_frame_size,
fin,
};

let mut header = arrayvec::ArrayVec::new();