Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement WebRTC messages framing #2896

Merged
merged 23 commits into from
Oct 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bin/light-base/src/json_rpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1289,7 +1289,7 @@ impl<TPlat: Platform> Background<TPlat> {

async fn storage_query(
&self,
keys: impl Iterator<Item = impl AsRef<[u8]>> + Clone,
keys: impl Iterator<Item = impl AsRef<[u8]> + Clone> + Clone,
hash: &[u8; 32],
total_attempts: u32,
timeout_per_request: Duration,
Expand Down
2 changes: 1 addition & 1 deletion bin/light-base/src/network_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,7 @@ impl<TPlat: Platform> NetworkService<TPlat> {
self: Arc<Self>,
chain_index: usize,
target: PeerId, // TODO: takes by value because of futures longevity issue
config: protocol::StorageProofRequestConfig<impl Iterator<Item = impl AsRef<[u8]>>>,
config: protocol::StorageProofRequestConfig<impl Iterator<Item = impl AsRef<[u8]> + Clone>>,
timeout: Duration,
) -> Result<service::EncodedMerkleProof, StorageProofRequestError> {
let rx = {
Expand Down
91 changes: 41 additions & 50 deletions bin/light-base/src/network_service/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,12 +382,7 @@ async fn multi_stream_connection_task<TPlat: Platform>(
// from this slice the data to send. Consequently, the write buffer is held locally. This is
// suboptimal compared to writing to a write buffer provided by the platform, but it is easier
// to implement it this way.
let mut write_buffer = vec![0; 4096];

// When reading/writing substreams, the substream can ask to be woken up after a certain time.
// This variable stores the earliest time when we should be waking up.
// TODO: this is wrong; this code assumes that substreams will be found in `ready_substreams` while it is not the case now; however it seems more appropriate to modify `ready_substreams` rather than accomodate this limitation here
let mut wake_up_after = None;
let mut write_buffer = vec![0; 16384]; // TODO: the write buffer must not exceed 16kiB due to the libp2p WebRTC spec; this should ideally be enforced through the connection task API

loop {
// Start opening new outbound substreams, if needed.
Expand Down Expand Up @@ -424,56 +419,52 @@ async fn multi_stream_connection_task<TPlat: Platform>(

let now = TPlat::now();

// Clear `wake_up_after` if necessary, otherwise it will always stay at a constant value.
// TODO: nit: can use `Option::is_some_and` after it's stable; https://github.com/rust-lang/rust/issues/93050
if wake_up_after
.as_ref()
.map(|time| *time <= now)
.unwrap_or(false)
{
wake_up_after = None;
}
// When reading/writing substreams, the substream can ask to be woken up after a certain
// time. This variable stores the earliest time when we should be waking up.
let mut wake_up_after = None;

// Perform a read-write on all substreams.
// TODO: trying to read/write every single substream every single time is suboptimal, but making this not suboptimal is very complicated
for substream_id in open_substreams.iter().map(|(id, _)| id).collect::<Vec<_>>() {
let substream = &mut open_substreams[substream_id];

let mut read_write = ReadWrite {
now: now.clone(),
incoming_buffer: TPlat::read_buffer(substream),
outgoing_buffer: Some((&mut write_buffer, &mut [])), // TODO: this should be None if a previous read_write() produced None
read_bytes: 0,
written_bytes: 0,
wake_up_after: None,
};

let kill_substream =
connection_task.substream_read_write(&substream_id, &mut read_write);

// Because the `read_write` object borrows the stream, we need to drop it before we
// can modify the connection. Before dropping the `read_write`, clone some important
// information from it.
let read_bytes = read_write.read_bytes;
let written_bytes = read_write.written_bytes;
match (&mut wake_up_after, &read_write.wake_up_after) {
(_, None) => {}
(val @ None, Some(t)) => *val = Some(t.clone()),
(Some(curr), Some(upd)) if *upd < *curr => *curr = upd.clone(),
(Some(_), Some(_)) => {}
}
drop(read_write);
loop {
let substream = &mut open_substreams[substream_id];

let mut read_write = ReadWrite {
now: now.clone(),
incoming_buffer: TPlat::read_buffer(substream),
outgoing_buffer: Some((&mut write_buffer, &mut [])), // TODO: this should be None if a previous read_write() produced None
read_bytes: 0,
written_bytes: 0,
wake_up_after,
};

let kill_substream =
connection_task.substream_read_write(&substream_id, &mut read_write);

// Because the `read_write` object borrows the stream, we need to drop it before we
// can modify the connection. Before dropping the `read_write`, clone some important
// information from it.
let read_bytes = read_write.read_bytes;
let written_bytes = read_write.written_bytes;
wake_up_after = read_write.wake_up_after.take();
drop(read_write);

// Now update the connection.
if written_bytes != 0 {
TPlat::send(substream, &write_buffer[..written_bytes]);
}
TPlat::advance_read_cursor(substream, read_bytes);

// Now update the connection.
if written_bytes != 0 {
TPlat::send(substream, &write_buffer[..written_bytes]);
}
TPlat::advance_read_cursor(substream, read_bytes);
// If the `connection_task` requires this substream to be killed, we drop the `Stream`
// object.
if kill_substream {
open_substreams.remove(substream_id);
break;
}

// If the `connection_task` requires this substream to be killed, we drop the `Stream`
// object.
if kill_substream {
open_substreams.remove(substream_id);
if read_bytes == 0 && written_bytes == 0 {
break;
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion bin/light-base/src/sync_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ impl<TPlat: Platform> SyncService<TPlat> {
block_number: u64,
block_hash: &[u8; 32],
storage_trie_root: &[u8; 32],
requested_keys: impl Iterator<Item = impl AsRef<[u8]>> + Clone,
requested_keys: impl Iterator<Item = impl AsRef<[u8]> + Clone> + Clone,
total_attempts: u32,
timeout_per_request: Duration,
_max_parallel: NonZeroU32,
Expand Down
4 changes: 4 additions & 0 deletions bin/wasm-node/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

### Changed

- The WebRTC protocol implementation is now up to date with the specification. While the specification hasn't been finalized yet and could still evolve, the current version is believed to be likely to be final. ([#2896](https://github.com/paritytech/smoldot/pull/2896))

### Fixed

- Fix timeout not being checked when opening a notifications substream. ([#2323](https://github.com/paritytech/smoldot/pull/2323))
Expand Down
14 changes: 8 additions & 6 deletions bin/wasm-node/javascript/src/index-browser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -256,15 +256,15 @@ export function start(options?: ClientOptions): Client {
"v=0" + "\n" +
// Identifies the creator of the SDP document. We are allowed to use dummy values
// (`-` and `0.0.0.0`) to remain anonymous, which we do. Note that "IN" means
// "Internet". (RFC8866)
// "Internet" (and not "input"). (RFC8866)
"o=- 0 0 IN IP" + ipVersion + " " + targetIp + "\n" +
// Name for the session. We are allowed to pass a dummy `-`. (RFC8866)
"s=-" + "\n" +
// Start and end of the validity of the session. `0 0` means that the session never
// expires. (RFC8866)
"t=0 0" + "\n" +
// A lite implementation is only appropriate for devices that will
// *always* be connected to the public Internet and have a public
// always be connected to the public Internet and have a public
// IP address at which it can receive packets from any
// correspondent. ICE will not function when a lite implementation
// is placed behind a NAT (RFC8445).
Expand All @@ -273,12 +273,12 @@ export function start(options?: ClientOptions): Client {
// The protocol in this line (i.e. `TCP/DTLS/SCTP` or `UDP/DTLS/SCTP`) must always be
// the same as the one in the offer. We know that this is true because we tweak the
// offer to match the protocol.
// The `<fmt>` component must always be `pc-datachannel` for WebRTC.
// The `<fmt>` component must always be `webrtc-datachannel` for WebRTC.
// The rest of the SDP payload adds attributes to this specific media stream.
// RFCs: 8839, 8866, 8841
"m=application " + targetPort + " " + "UDP/DTLS/SCTP webrtc-datachannel" + "\n" +
// Indicates the IP address of the remote.
// Note that "IN" means "Internet".
// Note that "IN" means "Internet" (and not "input").
"c=IN IP" + ipVersion + " " + targetIp + "\n" +
// Media ID - uniquely identifies this media stream (RFC9143).
"a=mid:0" + "\n" +
Expand All @@ -287,6 +287,7 @@ export function start(options?: ClientOptions): Client {
// ICE username and password, which are used for establishing and
// maintaining the ICE connection. (RFC8839)
// MUST match ones used by the answerer (server).
// These values are set according to the libp2p WebRTC specification.
"a=ice-ufrag:" + remoteCertMultibase + "\n" +
"a=ice-pwd:" + remoteCertMultibase + "\n" +
// Fingerprint of the certificate that the server will use during the TLS
Expand All @@ -303,8 +304,9 @@ export function start(options?: ClientOptions): Client {
// (UDP or TCP)
"a=sctp-port:5000" + "\n" +
// The maximum SCTP user message size (in bytes) (RFC8841)
"a=max-message-size:100000" + "\n" +
// A transport address for a candidate that can be used for connectivity checks (RFC8839).
"a=max-message-size:16384" + "\n" + // TODO: should this be part of the spec?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

subject to a change, but yes!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// A transport address for a candidate that can be used for connectivity
// checks (RFC8839).
"a=candidate:1 1 UDP 1 " + targetIp + " " + targetPort + " typ host" + "\n";

await pc!.setRemoteDescription({ type: "answer", sdp: remoteSdp });
Expand Down
Loading