Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a WebRtcFraming utility for WebRTC messages #1350

Merged
merged 1 commit into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 24 additions & 151 deletions lib/src/libp2p/collection/multi_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,19 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use crate::util::{leb128, protobuf};

use super::{
super::{
connection::{established, noise},
connection::{established, noise, webrtc_framing},
read_write::ReadWrite,
},
ConnectionToCoordinator, ConnectionToCoordinatorInner, CoordinatorToConnection,
CoordinatorToConnectionInner, NotificationsOutErr, PeerId, ShutdownCause, SubstreamFate,
SubstreamId,
};

use alloc::{collections::VecDeque, string::ToString as _, sync::Arc, vec::Vec};
use alloc::{collections::VecDeque, string::ToString as _, sync::Arc};
use core::{
cmp,
hash::Hash,
mem,
ops::{Add, Sub},
time::Duration,
};
Expand All @@ -44,15 +40,11 @@ enum MultiStreamConnectionTaskInner<TNow, TSubId> {
/// Connection is still in its handshake phase.
Handshake {
/// Substream that has been opened to perform the handshake, if any.
opened_substream: Option<TSubId>,
opened_substream: Option<(TSubId, webrtc_framing::WebRtcFraming)>,

/// Noise handshake in progress. Always `Some`, except to be temporarily extracted.
handshake: Option<noise::HandshakeInProgress>,

/// All incoming data for the handshake substream is first transferred to this buffer.
// TODO: this is very suboptimal code, instead the parsing should be done in a streaming way
handshake_read_buffer: Vec<u8>,

/// Other substreams, besides [`MultiStreamConnectionTaskInner::Handshake::opened_substream`],
/// that have been opened. For each substream, contains a boolean indicating whether the
/// substream is outbound (`true`) or inbound (`false`).
Expand Down Expand Up @@ -151,7 +143,6 @@ where
// TODO: the handshake doesn't have a timeout
handshake: Some(handshake),
opened_substream: None,
handshake_read_buffer: Vec::new(),
extra_open_substreams: hashbrown::HashMap::with_capacity_and_hasher(
0,
Default::default(),
Expand Down Expand Up @@ -683,14 +674,16 @@ where
opened_substream: ref mut opened_substream @ None,
..
} if outbound => {
*opened_substream = Some(id);
*opened_substream = Some((id, webrtc_framing::WebRtcFraming::new()));
}
MultiStreamConnectionTaskInner::Handshake {
opened_substream,
extra_open_substreams,
..
} => {
assert!(opened_substream.as_ref().map_or(true, |open| *open != id));
assert!(opened_substream
.as_ref()
.map_or(true, |(open, _)| *open != id));
// TODO: add a limit to the number allowed?
let _was_in = extra_open_substreams.insert(id, outbound);
assert!(_was_in.is_none());
Expand Down Expand Up @@ -791,12 +784,10 @@ where
established.reset_substream(substream_id)
}
MultiStreamConnectionTaskInner::Handshake {
opened_substream: Some(opened_substream),
handshake_read_buffer,
opened_substream: Some((opened_substream, _)),
..
} if opened_substream == substream_id => {
// TODO: the handshake has failed, kill the connection?
handshake_read_buffer.clear();
}
MultiStreamConnectionTaskInner::Handshake {
extra_open_substreams,
Expand Down Expand Up @@ -843,149 +834,38 @@ where
match &mut self.connection {
MultiStreamConnectionTaskInner::Handshake {
handshake,
opened_substream,
handshake_read_buffer,
opened_substream: Some((opened_handshake_substream, handshake_webrtc_framing)),
established,
extra_open_substreams,
} if opened_substream
.as_ref()
.map_or(false, |s| s == substream_id) =>
{
} if opened_handshake_substream == substream_id => {
// TODO: check the handshake timeout

// The Noise data is not directly the data of the substream. Instead, everything
// is wrapped within a Protobuf frame. For this reason, we first transfer the data
// to a buffer.
//
// According to the libp2p WebRTC spec, a frame and its length prefix must not be
// larger than 16kiB, meaning that the read buffer never has to exceed this size.
//
// Try to add data to `handshake_read_buffer`.
// TODO: this is very suboptimal; improve
// TODO: this doesn't properly back-pressure, because we read unconditionally
let protobuf_frame_size = {
let mut parser =
nom::combinator::map_parser::<_, _, _, nom::error::Error<&[u8]>, _, _>(
nom::multi::length_data(crate::util::leb128::nom_leb128_usize),
protobuf::message_decode! {
#[optional] flags = 1 => protobuf::enum_tag_decode,
#[optional] message = 2 => protobuf::bytes_tag_decode,
},
);

match parser(&read_write.incoming_buffer) {
Ok((rest, framed_message)) => {
if let Some(message) = framed_message.message {
handshake_read_buffer.extend_from_slice(message);
}

let protobuf_frame_size = handshake_read_buffer.len() - rest.len();
// If the remote has sent a `FIN` or `RESET_STREAM` flag, mark the
// remote writing side as closed.
if framed_message.flags.map_or(false, |f| f == 0 || f == 2) {
// TODO: no, handshake error
return SubstreamFate::Reset;
}
protobuf_frame_size
}
Err(nom::Err::Incomplete(needed)) => {
read_write.expected_incoming_bytes = Some(
handshake_read_buffer.len()
+ match needed {
nom::Needed::Size(s) => s.get(),
nom::Needed::Unknown => 1,
},
);
0
}
Err(_) => {
// Message decoding error.
// TODO: no, handshake error
// Progress the Noise handshake.
let handshake_outcome = {
// The Noise data is not directly the data of the substream. Instead,
// everything is wrapped within a Protobuf frame.
let mut with_framing = match handshake_webrtc_framing.read_write(read_write) {
Ok(f) => f,
Err(_err) => {
// TODO: not great for diagnostic to just ignore the error; also, the connection should just reset entirely
return SubstreamFate::Reset;
}
}
};
handshake.take().unwrap().read_write(&mut with_framing)
};
let _ = read_write.incoming_bytes_take(protobuf_frame_size);

let mut sub_read_write = ReadWrite {
now: read_write.now.clone(),
incoming_buffer: mem::take(handshake_read_buffer),
read_bytes: 0,
expected_incoming_bytes: Some(0),
write_buffers: Vec::new(),
write_bytes_queued: read_write.write_bytes_queued,
// Don't write out more than one frame.
// TODO: this `10` is here for the length and protobuf frame size and is a bit hacky
write_bytes_queueable: Some(
cmp::min(read_write.write_bytes_queueable.unwrap(), 16384)
.saturating_sub(10),
),
wake_up_after: None,
};

let handshake_outcome = handshake.take().unwrap().read_write(&mut sub_read_write);
*handshake_read_buffer = sub_read_write.incoming_buffer;
if let Some(wake_up_after) = &sub_read_write.wake_up_after {
read_write.wake_up_after(wake_up_after)
}

// Send out the message that the Noise handshake has written
// into `intermediary_write_buffer`.
if sub_read_write.write_bytes_queued != read_write.write_bytes_queued {
let written_bytes =
sub_read_write.write_bytes_queued - read_write.write_bytes_queued;
debug_assert_eq!(
written_bytes,
sub_read_write
.write_buffers
.iter()
.fold(0, |s, b| s + b.len())
);

// TODO: don't do the encoding manually but use the protobuf module?
let tag = protobuf::tag_encode(2, 2).collect::<Vec<_>>();
let data_len = leb128::encode_usize(written_bytes).collect::<Vec<_>>();
let libp2p_prefix =
leb128::encode_usize(tag.len() + data_len.len() + written_bytes)
.collect::<Vec<_>>();

// The spec mentions that a frame plus its length prefix shouldn't exceed
// 16kiB. This is normally ensured by forbidding the substream from writing
// more data than would fit in 16kiB.
debug_assert!(
libp2p_prefix.len() + tag.len() + data_len.len() + written_bytes <= 16384
);

read_write.write_out(libp2p_prefix);
read_write.write_out(tag);
read_write.write_out(data_len);
for buffer in sub_read_write.write_buffers {
read_write.write_out(buffer);
}
}

match handshake_outcome {
Ok(noise::NoiseHandshake::InProgress(handshake_update)) => {
*handshake = Some(handshake_update);
SubstreamFate::Continue
}
Err(_err) => todo!("{:?}", _err), // TODO: /!\
Err(_err) => return SubstreamFate::Reset, // TODO: /!\
Ok(noise::NoiseHandshake::Success {
cipher: _,
remote_peer_id,
}) => {
// The handshake has succeeded and we will transition into "established"
// mode.
// However the rest of the body of this function still needs to deal with
// the substream used for the handshake.
// We close the writing side. If the reading side is closed, we indicate
// that the substream is dead. If the reading side is still open, we
// indicate that it's not dead and store it in the state machine while
// waiting for it to be closed by the remote.
read_write.close_write();
let handshake_substream_still_open =
read_write.expected_incoming_bytes.is_some();

let mut established = established.take().unwrap();
for (substream_id, outbound) in extra_open_substreams.drain() {
established.add_substream(substream_id, outbound);
Expand All @@ -994,11 +874,7 @@ where
self.connection = MultiStreamConnectionTaskInner::Established {
established,
handshake_finished_message_to_send: Some(remote_peer_id),
handshake_substream: if handshake_substream_still_open {
Some(opened_substream.take().unwrap())
} else {
None
},
handshake_substream: None, // TODO: do properly
outbound_substreams_map: hashbrown::HashMap::with_capacity_and_hasher(
0,
Default::default(),
Expand All @@ -1008,11 +884,8 @@ where
inbound_accept_cancel_events: VecDeque::with_capacity(2),
};

if handshake_substream_still_open {
SubstreamFate::Continue
} else {
SubstreamFate::Reset
}
// TODO: hacky
SubstreamFate::Reset
}
}
}
Expand Down
1 change: 1 addition & 0 deletions lib/src/libp2p/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,5 @@ pub mod established;
pub mod multistream_select;
pub mod noise;
pub mod single_stream_handshake;
pub mod webrtc_framing;
pub mod yamux;
Loading
Loading