Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement WebRTC messages framing #2896

Merged
merged 23 commits into from
Oct 19, 2022
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bin/light-base/src/json_rpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1289,7 +1289,7 @@ impl<TPlat: Platform> Background<TPlat> {

async fn storage_query(
&self,
keys: impl Iterator<Item = impl AsRef<[u8]>> + Clone,
keys: impl Iterator<Item = impl AsRef<[u8]> + Clone> + Clone,
hash: &[u8; 32],
total_attempts: u32,
timeout_per_request: Duration,
Expand Down
2 changes: 1 addition & 1 deletion bin/light-base/src/network_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,7 @@ impl<TPlat: Platform> NetworkService<TPlat> {
self: Arc<Self>,
chain_index: usize,
target: PeerId, // TODO: takes by value because of futures longevity issue
config: protocol::StorageProofRequestConfig<impl Iterator<Item = impl AsRef<[u8]>>>,
config: protocol::StorageProofRequestConfig<impl Iterator<Item = impl AsRef<[u8]> + Clone>>,
timeout: Duration,
) -> Result<service::EncodedMerkleProof, StorageProofRequestError> {
let rx = {
Expand Down
91 changes: 41 additions & 50 deletions bin/light-base/src/network_service/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,12 +382,7 @@ async fn multi_stream_connection_task<TPlat: Platform>(
// from this slice the data to send. Consequently, the write buffer is held locally. This is
// suboptimal compared to writing to a write buffer provided by the platform, but it is easier
// to implement it this way.
let mut write_buffer = vec![0; 4096];

// When reading/writing substreams, the substream can ask to be woken up after a certain time.
// This variable stores the earliest time when we should be waking up.
// TODO: this is wrong; this code assumes that substreams will be found in `ready_substreams` while it is not the case now; however it seems more appropriate to modify `ready_substreams` rather than accomodate this limitation here
let mut wake_up_after = None;
let mut write_buffer = vec![0; 16384]; // TODO: the write buffer must not exceed 16kiB due to the libp2p WebRTC spec; this should ideally be enforced through the connection task API

loop {
// Start opening new outbound substreams, if needed.
Expand Down Expand Up @@ -424,56 +419,52 @@ async fn multi_stream_connection_task<TPlat: Platform>(

let now = TPlat::now();

// Clear `wake_up_after` if necessary, otherwise it will always stay at a constant value.
// TODO: nit: can use `Option::is_some_and` after it's stable; https://github.com/rust-lang/rust/issues/93050
if wake_up_after
.as_ref()
.map(|time| *time <= now)
.unwrap_or(false)
{
wake_up_after = None;
}
// When reading/writing substreams, the substream can ask to be woken up after a certain
// time. This variable stores the earliest time when we should be waking up.
let mut wake_up_after = None;

// Perform a read-write on all substreams.
// TODO: trying to read/write every single substream every single time is suboptimal, but making this not suboptimal is very complicated
for substream_id in open_substreams.iter().map(|(id, _)| id).collect::<Vec<_>>() {
let substream = &mut open_substreams[substream_id];

let mut read_write = ReadWrite {
now: now.clone(),
incoming_buffer: TPlat::read_buffer(substream),
outgoing_buffer: Some((&mut write_buffer, &mut [])), // TODO: this should be None if a previous read_write() produced None
read_bytes: 0,
written_bytes: 0,
wake_up_after: None,
};

let kill_substream =
connection_task.substream_read_write(&substream_id, &mut read_write);

// Because the `read_write` object borrows the stream, we need to drop it before we
// can modify the connection. Before dropping the `read_write`, clone some important
// information from it.
let read_bytes = read_write.read_bytes;
let written_bytes = read_write.written_bytes;
match (&mut wake_up_after, &read_write.wake_up_after) {
(_, None) => {}
(val @ None, Some(t)) => *val = Some(t.clone()),
(Some(curr), Some(upd)) if *upd < *curr => *curr = upd.clone(),
(Some(_), Some(_)) => {}
}
drop(read_write);
loop {
let substream = &mut open_substreams[substream_id];

let mut read_write = ReadWrite {
now: now.clone(),
incoming_buffer: TPlat::read_buffer(substream),
outgoing_buffer: Some((&mut write_buffer, &mut [])), // TODO: this should be None if a previous read_write() produced None
read_bytes: 0,
written_bytes: 0,
wake_up_after,
};

let kill_substream =
connection_task.substream_read_write(&substream_id, &mut read_write);

// Because the `read_write` object borrows the stream, we need to drop it before we
// can modify the connection. Before dropping the `read_write`, clone some important
// information from it.
let read_bytes = read_write.read_bytes;
let written_bytes = read_write.written_bytes;
wake_up_after = read_write.wake_up_after.take();
drop(read_write);

// Now update the connection.
if written_bytes != 0 {
TPlat::send(substream, &write_buffer[..written_bytes]);
}
TPlat::advance_read_cursor(substream, read_bytes);

// Now update the connection.
if written_bytes != 0 {
TPlat::send(substream, &write_buffer[..written_bytes]);
}
TPlat::advance_read_cursor(substream, read_bytes);
// If the `connection_task` requires this substream to be killed, we drop the `Stream`
// object.
if kill_substream {
open_substreams.remove(substream_id);
break;
}

// If the `connection_task` requires this substream to be killed, we drop the `Stream`
// object.
if kill_substream {
open_substreams.remove(substream_id);
if read_bytes == 0 && written_bytes == 0 {
break;
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion bin/light-base/src/sync_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ impl<TPlat: Platform> SyncService<TPlat> {
block_number: u64,
block_hash: &[u8; 32],
storage_trie_root: &[u8; 32],
requested_keys: impl Iterator<Item = impl AsRef<[u8]>> + Clone,
requested_keys: impl Iterator<Item = impl AsRef<[u8]> + Clone> + Clone,
total_attempts: u32,
timeout_per_request: Duration,
_max_parallel: NonZeroU32,
Expand Down
4 changes: 4 additions & 0 deletions bin/wasm-node/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

### Changed

- The WebRTC protocol implementation is now up to date with the specification. While the specification hasn't been finalized yet and could still evolve, the current version is believed to be likely to be final. ([#2896](https://github.com/paritytech/smoldot/pull/2896))

### Fixed

- Fix timeout not being checked when opening a notifications substream. ([#2323](https://github.com/paritytech/smoldot/pull/2323))
Expand Down
18 changes: 10 additions & 8 deletions bin/wasm-node/javascript/src/index-browser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -256,29 +256,29 @@ export function start(options?: ClientOptions): Client {
"v=0" + "\n" +
// Identifies the creator of the SDP document. We are allowed to use dummy values
// (`-` and `0.0.0.0`) to remain anonymous, which we do. Note that "IN" means
// "Internet". (RFC8866)
// "Internet" (and not "input"). (RFC8866)
"o=- 0 0 IN IP" + ipVersion + " " + targetIp + "\n" +
// Name for the session. We are allowed to pass a dummy `-`. (RFC8866)
"s=-" + "\n" +
// Start and end of the validity of the session. `0 0` means that the session never
// expires. (RFC8866)
"t=0 0" + "\n" +
// A lite implementation is only appropriate for devices that will
// *always* be connected to the public Internet and have a public
// A non-lite implementation is only appropriate for devices that will
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

??? This was a copy from the specification. Did something change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

? The specification doesn't contain this text. It's me who wrote it. It has a mistake in it, it says that lite implementations are only for publicly-accessible devices. But no, it's non-lite implementations that are for public devices. We're not a public device, so we use the lite implementation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Certain ICE agents will always be connected to the public Internet
and have a public IP address at which it can receive packets from any
correspondent. To make it easier for these devices to support ICE,
ICE defines a special type of implementation called "lite" (in
contrast to the normal full implementation). Lite agents only use
host candidates and do not generate connectivity checks or run state
machines, though they need to be able to respond to connectivity
checks." https://datatracker.ietf.org/doc/rfc8445/

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By specifying ice-lite in server's SDP (that we're pretending we've received through some channel, but instead construct manually), we're saying that server is publicly accessible.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be honest this text doesn't make sense to me no matter whether it says "lite" or "not lite", so I've just reverted it 🤷

// always be connected to the public Internet and have a public
// IP address at which it can receive packets from any
// correspondent. ICE will not function when a lite implementation
// correspondent. ICE will not function when a non-lite implementation
// is placed behind a NAT (RFC8445).
"a=ice-lite" + "\n" +
// A `m=` line describes a request to establish a certain protocol.
// The protocol in this line (i.e. `TCP/DTLS/SCTP` or `UDP/DTLS/SCTP`) must always be
// the same as the one in the offer. We know that this is true because we tweak the
// offer to match the protocol.
// The `<fmt>` component must always be `pc-datachannel` for WebRTC.
// The `<fmt>` component must always be `webrtc-datachannel` for WebRTC.
// The rest of the SDP payload adds attributes to this specific media stream.
// RFCs: 8839, 8866, 8841
"m=application " + targetPort + " " + "UDP/DTLS/SCTP webrtc-datachannel" + "\n" +
// Indicates the IP address of the remote.
// Note that "IN" means "Internet".
// Note that "IN" means "Internet" (and not "input").
"c=IN IP" + ipVersion + " " + targetIp + "\n" +
// Media ID - uniquely identifies this media stream (RFC9143).
"a=mid:0" + "\n" +
Expand All @@ -287,6 +287,7 @@ export function start(options?: ClientOptions): Client {
// ICE username and password, which are used for establishing and
// maintaining the ICE connection. (RFC8839)
// MUST match ones used by the answerer (server).
// These values are set according to the libp2p WebRTC specification.
"a=ice-ufrag:" + remoteCertMultibase + "\n" +
"a=ice-pwd:" + remoteCertMultibase + "\n" +
// Fingerprint of the certificate that the server will use during the TLS
Expand All @@ -303,8 +304,9 @@ export function start(options?: ClientOptions): Client {
// (UDP or TCP)
"a=sctp-port:5000" + "\n" +
// The maximum SCTP user message size (in bytes) (RFC8841)
"a=max-message-size:100000" + "\n" +
// A transport address for a candidate that can be used for connectivity checks (RFC8839).
"a=max-message-size:16384" + "\n" + // TODO: should this be part of the spec?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

subject to a change, but yes!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// A transport address for a candidate that can be used for connectivity
// checks (RFC8839).
"a=candidate:1 1 UDP 1 " + targetIp + " " + targetPort + " typ host" + "\n";

await pc!.setRemoteDescription({ type: "answer", sdp: remoteSdp });
Expand Down
Loading