Skip to content

Commit

Permalink
Add a WebRtcFraming utility for WebRTC messages (#1350)
Browse files Browse the repository at this point in the history
  • Loading branch information
tomaka authored Nov 17, 2023
1 parent 5b89582 commit 584aa61
Show file tree
Hide file tree
Showing 4 changed files with 421 additions and 365 deletions.
175 changes: 24 additions & 151 deletions lib/src/libp2p/collection/multi_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,19 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use crate::util::{leb128, protobuf};

use super::{
super::{
connection::{established, noise},
connection::{established, noise, webrtc_framing},
read_write::ReadWrite,
},
ConnectionToCoordinator, ConnectionToCoordinatorInner, CoordinatorToConnection,
CoordinatorToConnectionInner, NotificationsOutErr, PeerId, ShutdownCause, SubstreamFate,
SubstreamId,
};

use alloc::{collections::VecDeque, string::ToString as _, sync::Arc, vec::Vec};
use alloc::{collections::VecDeque, string::ToString as _, sync::Arc};
use core::{
cmp,
hash::Hash,
mem,
ops::{Add, Sub},
time::Duration,
};
Expand All @@ -44,15 +40,11 @@ enum MultiStreamConnectionTaskInner<TNow, TSubId> {
/// Connection is still in its handshake phase.
Handshake {
/// Substream that has been opened to perform the handshake, if any.
opened_substream: Option<TSubId>,
opened_substream: Option<(TSubId, webrtc_framing::WebRtcFraming)>,

/// Noise handshake in progress. Always `Some`, except to be temporarily extracted.
handshake: Option<noise::HandshakeInProgress>,

/// All incoming data for the handshake substream is first transferred to this buffer.
// TODO: this is very suboptimal code, instead the parsing should be done in a streaming way
handshake_read_buffer: Vec<u8>,

/// Other substreams, besides [`MultiStreamConnectionTaskInner::Handshake::opened_substream`],
/// that have been opened. For each substream, contains a boolean indicating whether the
/// substream is outbound (`true`) or inbound (`false`).
Expand Down Expand Up @@ -151,7 +143,6 @@ where
// TODO: the handshake doesn't have a timeout
handshake: Some(handshake),
opened_substream: None,
handshake_read_buffer: Vec::new(),
extra_open_substreams: hashbrown::HashMap::with_capacity_and_hasher(
0,
Default::default(),
Expand Down Expand Up @@ -683,14 +674,16 @@ where
opened_substream: ref mut opened_substream @ None,
..
} if outbound => {
*opened_substream = Some(id);
*opened_substream = Some((id, webrtc_framing::WebRtcFraming::new()));
}
MultiStreamConnectionTaskInner::Handshake {
opened_substream,
extra_open_substreams,
..
} => {
assert!(opened_substream.as_ref().map_or(true, |open| *open != id));
assert!(opened_substream
.as_ref()
.map_or(true, |(open, _)| *open != id));
// TODO: add a limit to the number allowed?
let _was_in = extra_open_substreams.insert(id, outbound);
assert!(_was_in.is_none());
Expand Down Expand Up @@ -791,12 +784,10 @@ where
established.reset_substream(substream_id)
}
MultiStreamConnectionTaskInner::Handshake {
opened_substream: Some(opened_substream),
handshake_read_buffer,
opened_substream: Some((opened_substream, _)),
..
} if opened_substream == substream_id => {
// TODO: the handshake has failed, kill the connection?
handshake_read_buffer.clear();
}
MultiStreamConnectionTaskInner::Handshake {
extra_open_substreams,
Expand Down Expand Up @@ -843,149 +834,38 @@ where
match &mut self.connection {
MultiStreamConnectionTaskInner::Handshake {
handshake,
opened_substream,
handshake_read_buffer,
opened_substream: Some((opened_handshake_substream, handshake_webrtc_framing)),
established,
extra_open_substreams,
} if opened_substream
.as_ref()
.map_or(false, |s| s == substream_id) =>
{
} if opened_handshake_substream == substream_id => {
// TODO: check the handshake timeout

// The Noise data is not directly the data of the substream. Instead, everything
// is wrapped within a Protobuf frame. For this reason, we first transfer the data
// to a buffer.
//
// According to the libp2p WebRTC spec, a frame and its length prefix must not be
// larger than 16kiB, meaning that the read buffer never has to exceed this size.
//
// Try to add data to `handshake_read_buffer`.
// TODO: this is very suboptimal; improve
// TODO: this doesn't properly back-pressure, because we read unconditionally
let protobuf_frame_size = {
let mut parser =
nom::combinator::map_parser::<_, _, _, nom::error::Error<&[u8]>, _, _>(
nom::multi::length_data(crate::util::leb128::nom_leb128_usize),
protobuf::message_decode! {
#[optional] flags = 1 => protobuf::enum_tag_decode,
#[optional] message = 2 => protobuf::bytes_tag_decode,
},
);

match parser(&read_write.incoming_buffer) {
Ok((rest, framed_message)) => {
if let Some(message) = framed_message.message {
handshake_read_buffer.extend_from_slice(message);
}

let protobuf_frame_size = handshake_read_buffer.len() - rest.len();
// If the remote has sent a `FIN` or `RESET_STREAM` flag, mark the
// remote writing side as closed.
if framed_message.flags.map_or(false, |f| f == 0 || f == 2) {
// TODO: no, handshake error
return SubstreamFate::Reset;
}
protobuf_frame_size
}
Err(nom::Err::Incomplete(needed)) => {
read_write.expected_incoming_bytes = Some(
handshake_read_buffer.len()
+ match needed {
nom::Needed::Size(s) => s.get(),
nom::Needed::Unknown => 1,
},
);
0
}
Err(_) => {
// Message decoding error.
// TODO: no, handshake error
// Progress the Noise handshake.
let handshake_outcome = {
// The Noise data is not directly the data of the substream. Instead,
// everything is wrapped within a Protobuf frame.
let mut with_framing = match handshake_webrtc_framing.read_write(read_write) {
Ok(f) => f,
Err(_err) => {
// TODO: not great for diagnostic to just ignore the error; also, the connection should just reset entirely
return SubstreamFate::Reset;
}
}
};
handshake.take().unwrap().read_write(&mut with_framing)
};
let _ = read_write.incoming_bytes_take(protobuf_frame_size);

let mut sub_read_write = ReadWrite {
now: read_write.now.clone(),
incoming_buffer: mem::take(handshake_read_buffer),
read_bytes: 0,
expected_incoming_bytes: Some(0),
write_buffers: Vec::new(),
write_bytes_queued: read_write.write_bytes_queued,
// Don't write out more than one frame.
// TODO: this `10` is here for the length and protobuf frame size and is a bit hacky
write_bytes_queueable: Some(
cmp::min(read_write.write_bytes_queueable.unwrap(), 16384)
.saturating_sub(10),
),
wake_up_after: None,
};

let handshake_outcome = handshake.take().unwrap().read_write(&mut sub_read_write);
*handshake_read_buffer = sub_read_write.incoming_buffer;
if let Some(wake_up_after) = &sub_read_write.wake_up_after {
read_write.wake_up_after(wake_up_after)
}

// Send out the message that the Noise handshake has written
// into `intermediary_write_buffer`.
if sub_read_write.write_bytes_queued != read_write.write_bytes_queued {
let written_bytes =
sub_read_write.write_bytes_queued - read_write.write_bytes_queued;
debug_assert_eq!(
written_bytes,
sub_read_write
.write_buffers
.iter()
.fold(0, |s, b| s + b.len())
);

// TODO: don't do the encoding manually but use the protobuf module?
let tag = protobuf::tag_encode(2, 2).collect::<Vec<_>>();
let data_len = leb128::encode_usize(written_bytes).collect::<Vec<_>>();
let libp2p_prefix =
leb128::encode_usize(tag.len() + data_len.len() + written_bytes)
.collect::<Vec<_>>();

// The spec mentions that a frame plus its length prefix shouldn't exceed
// 16kiB. This is normally ensured by forbidding the substream from writing
// more data than would fit in 16kiB.
debug_assert!(
libp2p_prefix.len() + tag.len() + data_len.len() + written_bytes <= 16384
);

read_write.write_out(libp2p_prefix);
read_write.write_out(tag);
read_write.write_out(data_len);
for buffer in sub_read_write.write_buffers {
read_write.write_out(buffer);
}
}

match handshake_outcome {
Ok(noise::NoiseHandshake::InProgress(handshake_update)) => {
*handshake = Some(handshake_update);
SubstreamFate::Continue
}
Err(_err) => todo!("{:?}", _err), // TODO: /!\
Err(_err) => return SubstreamFate::Reset, // TODO: /!\
Ok(noise::NoiseHandshake::Success {
cipher: _,
remote_peer_id,
}) => {
// The handshake has succeeded and we will transition into "established"
// mode.
// However the rest of the body of this function still needs to deal with
// the substream used for the handshake.
// We close the writing side. If the reading side is closed, we indicate
// that the substream is dead. If the reading side is still open, we
// indicate that it's not dead and store it in the state machine while
// waiting for it to be closed by the remote.
read_write.close_write();
let handshake_substream_still_open =
read_write.expected_incoming_bytes.is_some();

let mut established = established.take().unwrap();
for (substream_id, outbound) in extra_open_substreams.drain() {
established.add_substream(substream_id, outbound);
Expand All @@ -994,11 +874,7 @@ where
self.connection = MultiStreamConnectionTaskInner::Established {
established,
handshake_finished_message_to_send: Some(remote_peer_id),
handshake_substream: if handshake_substream_still_open {
Some(opened_substream.take().unwrap())
} else {
None
},
handshake_substream: None, // TODO: do properly
outbound_substreams_map: hashbrown::HashMap::with_capacity_and_hasher(
0,
Default::default(),
Expand All @@ -1008,11 +884,8 @@ where
inbound_accept_cancel_events: VecDeque::with_capacity(2),
};

if handshake_substream_still_open {
SubstreamFate::Continue
} else {
SubstreamFate::Reset
}
// TODO: hacky
SubstreamFate::Reset
}
}
}
Expand Down
1 change: 1 addition & 0 deletions lib/src/libp2p/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,5 @@ pub mod established;
pub mod multistream_select;
pub mod noise;
pub mod single_stream_handshake;
pub mod webrtc_framing;
pub mod yamux;
Loading

0 comments on commit 584aa61

Please sign in to comment.