Skip to content

Commit

Permalink
Improve the networking abstraction of the Platform trait (#315)
Browse files Browse the repository at this point in the history
* Improve the networking abstraction of the Platform trait

* Update AsyncStdTcpWebSocket

* Remove TODO

* Propagate changes to Rust <-> JS bindings

* Update NodeJS TCP

* Update index-deno.ts

* Update browser websocket

* Update NodeJS WebSocket

* Increase initial bufferable size

* Update browser WebRTC

* Fix issues in AsyncStdTcpWebSocket

* Add closeSend function

* Some obvious comments

* Add write_closable flag for single-stream connections

* Update wasm-node/platform

* CHANGELOG

* Spellcheck

* Rustfmt

* Docfix

* More CHANGELOG

* Add debug_assert

* Fix order of calls w.r.t. WebSocket

* Also fix NodeJS TCP
  • Loading branch information
tomaka authored Mar 24, 2023
1 parent 6e8fd53 commit 9de929e
Show file tree
Hide file tree
Showing 11 changed files with 903 additions and 369 deletions.
164 changes: 92 additions & 72 deletions light-base/src/network_service/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use super::Shared;
use crate::platform::{Platform, PlatformConnection, PlatformSubstreamDirection, ReadBuffer};

use alloc::{string::ToString as _, sync::Arc, vec, vec::Vec};
use core::{iter, pin::Pin};
use core::{cmp, iter, pin::Pin};
use futures::{channel::mpsc, prelude::*};
use smoldot::{
libp2p::{collection::SubstreamFate, read_write::ReadWrite},
Expand Down Expand Up @@ -211,7 +211,8 @@ async fn single_stream_connection_task<TPlat: Platform>(
// from this slice the data to send. Consequently, the write buffer is held locally. This is
// suboptimal compared to writing to a write buffer provided by the platform, but it is easier
// to implement it this way.
let mut write_buffer = vec![0; 4096];
// Switched to `None` after the connection closes its writing side.
let mut write_buffer = Some(vec![0; 4096]);

// The main loop is as follows:
// - Update the state machine.
Expand All @@ -229,55 +230,67 @@ async fn single_stream_connection_task<TPlat: Platform>(

let now = TPlat::now();

let (read_bytes, read_buffer_has_data, read_buffer_closed, written_bytes, wake_up_after) =
if !connection_task.is_reset_called() {
let incoming_buffer = match TPlat::read_buffer(&mut connection) {
ReadBuffer::Reset => {
connection_task.reset();
continue;
}
ReadBuffer::Open(b) => Some(b),
ReadBuffer::Closed => None,
};

// Perform a read-write. This updates the internal state of the connection task.
let mut read_write = ReadWrite {
now: now.clone(),
incoming_buffer,
outgoing_buffer: Some((&mut write_buffer, &mut [])), // TODO: this should be None if a previous read_write() produced None
read_bytes: 0,
written_bytes: 0,
wake_up_after: None,
};
connection_task.read_write(&mut read_write);

// Because the `read_write` object borrows the connection, we need to drop it before we
// can modify the connection. Before dropping the `read_write`, clone some important
// information from it.
let read_buffer_has_data =
read_write.incoming_buffer.map_or(false, |b| !b.is_empty());
let read_buffer_closed = read_write.incoming_buffer.is_none();
let read_bytes = read_write.read_bytes;
let written_bytes = read_write.written_bytes;
let wake_up_after = read_write.wake_up_after.clone();
drop(read_write);
let (read_bytes, written_bytes, wake_up_after) = if !connection_task.is_reset_called() {
let write_side_was_open = write_buffer.is_some();
let writable_bytes = cmp::min(
TPlat::writable_bytes(&mut connection),
write_buffer.as_ref().map_or(0, |b| b.len()),
);

// Now update the connection.
if written_bytes != 0 {
TPlat::send(&mut connection, &write_buffer[..written_bytes]);
let incoming_buffer = match TPlat::read_buffer(&mut connection) {
ReadBuffer::Reset => {
connection_task.reset();
continue;
}
TPlat::advance_read_cursor(&mut connection, read_bytes);
ReadBuffer::Open(b) => Some(b),
ReadBuffer::Closed => None,
};

(
read_bytes,
read_buffer_has_data,
read_buffer_closed,
written_bytes,
wake_up_after,
)
} else {
(0, false, true, 0, None)
// Perform a read-write. This updates the internal state of the connection task.
let mut read_write = ReadWrite {
now: now.clone(),
incoming_buffer,
outgoing_buffer: if let Some(write_buffer) = write_buffer.as_mut() {
Some((&mut write_buffer[..writable_bytes], &mut []))
} else {
None
},
read_bytes: 0,
written_bytes: 0,
wake_up_after: None,
};
connection_task.read_write(&mut read_write);

// Because the `read_write` object borrows the connection, we need to drop it before we
// can modify the connection. Before dropping the `read_write`, clone some important
// information from it.
let read_bytes = read_write.read_bytes;
let write_size_closed = write_side_was_open && read_write.outgoing_buffer.is_none();
let written_bytes = read_write.written_bytes;
debug_assert!(written_bytes <= writable_bytes);
let wake_up_after = read_write.wake_up_after.clone();
drop(read_write);

// Now update the connection.
if written_bytes != 0 {
// `written_bytes`non-zero when the writing side has been closed before
// doesn't make sense and would indicate a bug in the networking code
TPlat::send(
&mut connection,
&write_buffer.as_mut().unwrap()[..written_bytes],
);
}
if write_size_closed {
TPlat::close_send(&mut connection);
debug_assert!(write_buffer.is_some());
write_buffer = None;
}
TPlat::advance_read_cursor(&mut connection, read_bytes);

(read_bytes, written_bytes, wake_up_after)
} else {
(0, 0, None)
};

// Try pull message to send to the coordinator.

Expand Down Expand Up @@ -334,6 +347,10 @@ async fn single_stream_connection_task<TPlat: Platform>(
if let Some(task_update) = task_update {
connection_task = task_update;
} else {
// As documented in `update_stream`, we call this function one last time in order to
// give the possibility to the implementation to process closing the writing side
// before the connection is dropped.
TPlat::update_stream(&mut connection).await;
return;
}

Expand All @@ -358,23 +375,15 @@ async fn single_stream_connection_task<TPlat: Platform>(
future::Either::Right(future::pending())
}
.fuse();

// Future that is woken up when new data is ready on the socket.
let read_buffer_ready = if !(read_buffer_has_data && read_bytes == 0) && !read_buffer_closed
{
future::Either::Left(TPlat::wait_more_data(&mut connection))
} else {
future::Either::Right(future::pending())
};

// Future that is woken up when new data is ready on the socket or more data is writable.
let stream_update = TPlat::update_stream(&mut connection);
// Future that is woken up when a new message is coming from the coordinator.
let message_from_coordinator = Pin::new(&mut coordinator_to_connection).peek();

// Wait until either some data is ready on the socket, or the connection state machine
// has requested to be polled again, or a message is coming from the coordinator.
futures::pin_mut!(read_buffer_ready);
// Combines the three futures above into one.
futures::pin_mut!(stream_update);
future::select(
future::select(read_buffer_ready, message_from_coordinator),
future::select(stream_update, message_from_coordinator),
poll_after,
)
.await;
Expand Down Expand Up @@ -408,10 +417,11 @@ async fn webrtc_multi_stream_connection_task<TPlat: Platform>(
// Newly-open substream that has just been yielded by the connection.
let mut newly_open_substream = None;
// `true` if the remote has force-closed our connection.
let mut has_reset = false;
let mut remote_has_reset = false;
// List of all currently open substreams. The index (as a `usize`) corresponds to the id
// of this substream within the `connection_task` state machine.
let mut open_substreams = slab::Slab::<TPlat::Stream>::with_capacity(16);
// For each stream, a boolean indicates whether the local writing side is closed.
let mut open_substreams = slab::Slab::<(TPlat::Stream, bool)>::with_capacity(16);

// In order to write data on a stream, we simply pass a slice, and the platform will copy
// from this slice the data to send. Consequently, the write buffer is held locally. This is
Expand All @@ -438,7 +448,7 @@ async fn webrtc_multi_stream_connection_task<TPlat: Platform>(
PlatformSubstreamDirection::Outbound => true,
PlatformSubstreamDirection::Inbound => false,
};
let id = open_substreams.insert(stream);
let id = open_substreams.insert((stream, true));
connection_task.add_substream(id, outbound);
if outbound {
pending_opening_out_substreams -= 1;
Expand All @@ -464,7 +474,9 @@ async fn webrtc_multi_stream_connection_task<TPlat: Platform>(
// TODO: trying to read/write every single substream every single time is suboptimal, but making this not suboptimal is very complicated
for substream_id in open_substreams.iter().map(|(id, _)| id).collect::<Vec<_>>() {
loop {
let substream = &mut open_substreams[substream_id];
let (substream, write_side_was_open) = &mut open_substreams[substream_id];

let writable_bytes = cmp::min(TPlat::writable_bytes(substream), write_buffer.len());

let incoming_buffer = match TPlat::read_buffer(substream) {
ReadBuffer::Open(buf) => buf,
Expand All @@ -480,7 +492,11 @@ async fn webrtc_multi_stream_connection_task<TPlat: Platform>(
let mut read_write = ReadWrite {
now: now.clone(),
incoming_buffer: Some(incoming_buffer),
outgoing_buffer: Some((&mut write_buffer, &mut [])),
outgoing_buffer: if *write_side_was_open {
Some((&mut write_buffer[..writable_bytes], &mut []))
} else {
None
},
read_bytes: 0,
written_bytes: 0,
wake_up_after,
Expand All @@ -496,13 +512,19 @@ async fn webrtc_multi_stream_connection_task<TPlat: Platform>(
// information from it.
let read_bytes = read_write.read_bytes;
let written_bytes = read_write.written_bytes;
let must_close_writing_side =
*write_side_was_open && read_write.outgoing_buffer.is_none();
wake_up_after = read_write.wake_up_after.take();
drop(read_write);

// Now update the connection.
if written_bytes != 0 {
TPlat::send(substream, &write_buffer[..written_bytes]);
}
if must_close_writing_side {
TPlat::close_send(substream);
*write_side_was_open = false;
}
TPlat::advance_read_cursor(substream, read_bytes);

// If the `connection_task` requires this substream to be killed, we drop the
Expand Down Expand Up @@ -598,13 +620,11 @@ async fn webrtc_multi_stream_connection_task<TPlat: Platform>(
.fuse();

// Future that is woken up when new data is ready on any of the streams.
// TODO: very suboptimal
// TODO: will loop infinitely if the remote closes its writing side because `wait_more_data` is immediately ready when that is the case
let data_ready = iter::once(future::Either::Right(future::pending()))
let streams_updated = iter::once(future::Either::Right(future::pending()))
.chain(
open_substreams
.iter_mut()
.map(|(_, stream)| future::Either::Left(TPlat::wait_more_data(stream))),
.map(|(_, (stream, _))| future::Either::Left(TPlat::update_stream(stream))),
)
.collect::<future::SelectAll<_>>();

Expand All @@ -615,18 +635,18 @@ async fn webrtc_multi_stream_connection_task<TPlat: Platform>(
debug_assert!(newly_open_substream.is_none());
futures::select! {
_ = message_from_coordinator => {}
substream = if has_reset { either::Right(future::pending()) } else { either::Left(TPlat::next_substream(&mut connection)) }.fuse() => {
substream = if remote_has_reset { either::Right(future::pending()) } else { either::Left(TPlat::next_substream(&mut connection)) }.fuse() => {
match substream {
Some(s) => newly_open_substream = Some(s),
None => {
// `None` is returned if the remote has force-closed the connection.
connection_task.reset();
has_reset = true;
remote_has_reset = true;
}
}
}
_ = poll_after => {}
_ = data_ready.fuse() => {}
_ = streams_updated.fuse() => {}
}
}
}
82 changes: 67 additions & 15 deletions light-base/src/platform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,18 @@ pub trait Platform: Send + 'static {
/// the `Connection` and all its associated substream objects ([`Platform::Stream`]) have
/// been dropped.
type Connection: Send + Sync + 'static;
/// Opaque object representing either a single-stream connection or a substream in a
/// multi-stream connection.
///
/// If this object is abruptly dropped, the underlying single stream connection or substream
/// should be abruptly dropped (i.e. RST) as well, unless its reading and writing sides
/// have been gracefully closed in the past.
type Stream: Send + 'static;
type ConnectFuture: Future<Output = Result<PlatformConnection<Self::Stream, Self::Connection>, ConnectError>>
+ Unpin
+ Send
+ 'static;
type StreamDataFuture<'a>: Future<Output = ()> + Unpin + Send + 'a;
type StreamUpdateFuture<'a>: Future<Output = ()> + Unpin + Send + 'a;
type NextSubstreamFuture<'a>: Future<Output = Option<(Self::Stream, PlatformSubstreamDirection)>>
+ Unpin
+ Send
Expand Down Expand Up @@ -106,35 +112,81 @@ pub trait Platform: Send + 'static {
/// all its associated `Stream`s as soon as possible.
fn next_substream(connection: &'_ mut Self::Connection) -> Self::NextSubstreamFuture<'_>;

/// Returns a future that becomes ready when either the read buffer of the given stream
/// contains data, or the remote has closed their sending side.
/// Synchronizes the stream with the "actual" stream.
///
/// Returns a future that becomes ready when "something" in the state has changed. In other
/// words, when data has been added to the read buffer of the given stream , or the remote
/// closes their sending side, or the number of writable bytes (see
/// [`Platform::writable_bytes`]) increases.
///
/// This function might not add data to the read buffer nor process the remote closing its
/// writing side, unless the read buffer has been emptied beforehand using
/// [`Platform::advance_read_cursor`].
///
/// In the specific situation where the reading side is closed and the writing side has been
/// closed using [`Platform::close_send`], the API user must call this function before dropping
/// the `Stream` object. This makes it possible for the implementation to finish cleaning up
/// everything gracefully before the object is dropped.
///
/// This function should also flush any outgoing data if necessary.
///
/// The future is immediately ready if data is already available or the remote has already
/// closed their sending side.
fn wait_more_data(stream: &'_ mut Self::Stream) -> Self::StreamDataFuture<'_>;
/// In order to avoid race conditions, the state of the read buffer and the writable bytes
/// shouldn't be updated unless this function is called.
/// In other words, calling this function switches the stream from a state to another, and
/// this state transition should only happen when this function is called and not otherwise.
fn update_stream(stream: &'_ mut Self::Stream) -> Self::StreamUpdateFuture<'_>;

/// Gives access to the content of the read buffer of the given stream.
fn read_buffer(stream: &mut Self::Stream) -> ReadBuffer;

/// Discards the first `bytes` bytes of the read buffer of this stream. This makes it
/// possible for the remote to send more data.
/// Discards the first `bytes` bytes of the read buffer of this stream.
///
/// This makes it possible for more data to be received when [`Platform::update_stream`] is
/// called.
///
/// # Panic
///
/// Panics if there aren't enough bytes to discard in the buffer.
///
fn advance_read_cursor(stream: &mut Self::Stream, bytes: usize);

/// Queues the given bytes to be sent out on the given connection.
/// Returns the maximum size of the buffer that can be passed to [`Platform::send`].
///
/// Must return 0 if [`Platform::close_send`] has previously been called, or if the stream
/// has been reset by the remote.
///
/// If [`Platform::send`] is called, the number of writable bytes must decrease by exactly
/// the size of the buffer that was provided.
/// The number of writable bytes should never change unless [`Platform::update_stream`] is
/// called.
fn writable_bytes(stream: &mut Self::Stream) -> usize;

/// Queues the given bytes to be sent out on the given stream.
///
/// > **Note**: In the case of [`PlatformConnection::MultiStreamWebRtc`], be aware that there
/// > exists a limit to the amount of data to send at once. The `data` parameter
/// > is guaranteed to fit within that limit. Due to the existence of this limit,
/// > the implementation of this function shouldn't attempt to save function calls
/// > by performing internal buffering and batching multiple calls into one.
// TODO: back-pressure
// TODO: allow closing sending side
/// > exists a limit to the amount of data to send in a single packet. The `data`
/// > parameter is guaranteed to fit within that limit. Due to the existence of this
/// > limit, the implementation of this function shouldn't attempt to save function
/// > calls by performing internal buffering and batching multiple calls into one.
///
/// # Panic
///
/// Panics if `data.is_empty()`.
/// Panics if `data.len()` is superior to the value returned by [`Platform::writable_bytes`].
/// Panics if [`Platform::close_send`] has been called before on this stream.
///
fn send(stream: &mut Self::Stream, data: &[u8]);

/// Closes the sending side of the given stream.
///
/// > **Note**: In situations where this isn't possible, such as with the WebSocket protocol,
/// > this is a no-op.
///
/// # Panic
///
/// Panics if [`Platform::close_send`] has already been called on this stream.
///
fn close_send(stream: &mut Self::Stream);
}

/// Type of opened connection. See [`Platform::connect`].
Expand Down
Loading

0 comments on commit 9de929e

Please sign in to comment.