Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve the networking abstraction of the Platform trait #315

Merged
merged 24 commits into from
Mar 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 92 additions & 72 deletions light-base/src/network_service/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use super::Shared;
use crate::platform::{Platform, PlatformConnection, PlatformSubstreamDirection, ReadBuffer};

use alloc::{string::ToString as _, sync::Arc, vec, vec::Vec};
use core::{iter, pin::Pin};
use core::{cmp, iter, pin::Pin};
use futures::{channel::mpsc, prelude::*};
use smoldot::{
libp2p::{collection::SubstreamFate, read_write::ReadWrite},
Expand Down Expand Up @@ -211,7 +211,8 @@ async fn single_stream_connection_task<TPlat: Platform>(
// from this slice the data to send. Consequently, the write buffer is held locally. This is
// suboptimal compared to writing to a write buffer provided by the platform, but it is easier
// to implement it this way.
let mut write_buffer = vec![0; 4096];
// Switched to `None` after the connection closes its writing side.
let mut write_buffer = Some(vec![0; 4096]);

// The main loop is as follows:
// - Update the state machine.
Expand All @@ -229,55 +230,67 @@ async fn single_stream_connection_task<TPlat: Platform>(

let now = TPlat::now();

let (read_bytes, read_buffer_has_data, read_buffer_closed, written_bytes, wake_up_after) =
if !connection_task.is_reset_called() {
let incoming_buffer = match TPlat::read_buffer(&mut connection) {
ReadBuffer::Reset => {
connection_task.reset();
continue;
}
ReadBuffer::Open(b) => Some(b),
ReadBuffer::Closed => None,
};

// Perform a read-write. This updates the internal state of the connection task.
let mut read_write = ReadWrite {
now: now.clone(),
incoming_buffer,
outgoing_buffer: Some((&mut write_buffer, &mut [])), // TODO: this should be None if a previous read_write() produced None
read_bytes: 0,
written_bytes: 0,
wake_up_after: None,
};
connection_task.read_write(&mut read_write);

// Because the `read_write` object borrows the connection, we need to drop it before we
// can modify the connection. Before dropping the `read_write`, clone some important
// information from it.
let read_buffer_has_data =
read_write.incoming_buffer.map_or(false, |b| !b.is_empty());
let read_buffer_closed = read_write.incoming_buffer.is_none();
let read_bytes = read_write.read_bytes;
let written_bytes = read_write.written_bytes;
let wake_up_after = read_write.wake_up_after.clone();
drop(read_write);
let (read_bytes, written_bytes, wake_up_after) = if !connection_task.is_reset_called() {
let write_side_was_open = write_buffer.is_some();
let writable_bytes = cmp::min(
TPlat::writable_bytes(&mut connection),
write_buffer.as_ref().map_or(0, |b| b.len()),
);

// Now update the connection.
if written_bytes != 0 {
TPlat::send(&mut connection, &write_buffer[..written_bytes]);
let incoming_buffer = match TPlat::read_buffer(&mut connection) {
ReadBuffer::Reset => {
connection_task.reset();
continue;
}
TPlat::advance_read_cursor(&mut connection, read_bytes);
ReadBuffer::Open(b) => Some(b),
ReadBuffer::Closed => None,
};

(
read_bytes,
read_buffer_has_data,
read_buffer_closed,
written_bytes,
wake_up_after,
)
} else {
(0, false, true, 0, None)
// Perform a read-write. This updates the internal state of the connection task.
let mut read_write = ReadWrite {
now: now.clone(),
incoming_buffer,
outgoing_buffer: if let Some(write_buffer) = write_buffer.as_mut() {
Some((&mut write_buffer[..writable_bytes], &mut []))
} else {
None
},
read_bytes: 0,
written_bytes: 0,
wake_up_after: None,
};
connection_task.read_write(&mut read_write);

// Because the `read_write` object borrows the connection, we need to drop it before we
// can modify the connection. Before dropping the `read_write`, clone some important
// information from it.
let read_bytes = read_write.read_bytes;
let write_size_closed = write_side_was_open && read_write.outgoing_buffer.is_none();
let written_bytes = read_write.written_bytes;
debug_assert!(written_bytes <= writable_bytes);
let wake_up_after = read_write.wake_up_after.clone();
drop(read_write);

// Now update the connection.
if written_bytes != 0 {
// `written_bytes`non-zero when the writing side has been closed before
// doesn't make sense and would indicate a bug in the networking code
TPlat::send(
&mut connection,
&write_buffer.as_mut().unwrap()[..written_bytes],
);
}
if write_size_closed {
TPlat::close_send(&mut connection);
debug_assert!(write_buffer.is_some());
write_buffer = None;
}
TPlat::advance_read_cursor(&mut connection, read_bytes);

(read_bytes, written_bytes, wake_up_after)
} else {
(0, 0, None)
};

// Try pull message to send to the coordinator.

Expand Down Expand Up @@ -334,6 +347,10 @@ async fn single_stream_connection_task<TPlat: Platform>(
if let Some(task_update) = task_update {
connection_task = task_update;
} else {
// As documented in `update_stream`, we call this function one last time in order to
// give the possibility to the implementation to process closing the writing side
// before the connection is dropped.
TPlat::update_stream(&mut connection).await;
return;
}

Expand All @@ -358,23 +375,15 @@ async fn single_stream_connection_task<TPlat: Platform>(
future::Either::Right(future::pending())
}
.fuse();

// Future that is woken up when new data is ready on the socket.
let read_buffer_ready = if !(read_buffer_has_data && read_bytes == 0) && !read_buffer_closed
{
future::Either::Left(TPlat::wait_more_data(&mut connection))
} else {
future::Either::Right(future::pending())
};

// Future that is woken up when new data is ready on the socket or more data is writable.
let stream_update = TPlat::update_stream(&mut connection);
// Future that is woken up when a new message is coming from the coordinator.
let message_from_coordinator = Pin::new(&mut coordinator_to_connection).peek();

// Wait until either some data is ready on the socket, or the connection state machine
// has requested to be polled again, or a message is coming from the coordinator.
futures::pin_mut!(read_buffer_ready);
// Combines the three futures above into one.
futures::pin_mut!(stream_update);
future::select(
future::select(read_buffer_ready, message_from_coordinator),
future::select(stream_update, message_from_coordinator),
poll_after,
)
.await;
Expand Down Expand Up @@ -408,10 +417,11 @@ async fn webrtc_multi_stream_connection_task<TPlat: Platform>(
// Newly-open substream that has just been yielded by the connection.
let mut newly_open_substream = None;
// `true` if the remote has force-closed our connection.
let mut has_reset = false;
let mut remote_has_reset = false;
// List of all currently open substreams. The index (as a `usize`) corresponds to the id
// of this substream within the `connection_task` state machine.
let mut open_substreams = slab::Slab::<TPlat::Stream>::with_capacity(16);
// For each stream, a boolean indicates whether the local writing side is closed.
let mut open_substreams = slab::Slab::<(TPlat::Stream, bool)>::with_capacity(16);

// In order to write data on a stream, we simply pass a slice, and the platform will copy
// from this slice the data to send. Consequently, the write buffer is held locally. This is
Expand All @@ -438,7 +448,7 @@ async fn webrtc_multi_stream_connection_task<TPlat: Platform>(
PlatformSubstreamDirection::Outbound => true,
PlatformSubstreamDirection::Inbound => false,
};
let id = open_substreams.insert(stream);
let id = open_substreams.insert((stream, true));
connection_task.add_substream(id, outbound);
if outbound {
pending_opening_out_substreams -= 1;
Expand All @@ -464,7 +474,9 @@ async fn webrtc_multi_stream_connection_task<TPlat: Platform>(
// TODO: trying to read/write every single substream every single time is suboptimal, but making this not suboptimal is very complicated
for substream_id in open_substreams.iter().map(|(id, _)| id).collect::<Vec<_>>() {
loop {
let substream = &mut open_substreams[substream_id];
let (substream, write_side_was_open) = &mut open_substreams[substream_id];

let writable_bytes = cmp::min(TPlat::writable_bytes(substream), write_buffer.len());

let incoming_buffer = match TPlat::read_buffer(substream) {
ReadBuffer::Open(buf) => buf,
Expand All @@ -480,7 +492,11 @@ async fn webrtc_multi_stream_connection_task<TPlat: Platform>(
let mut read_write = ReadWrite {
now: now.clone(),
incoming_buffer: Some(incoming_buffer),
outgoing_buffer: Some((&mut write_buffer, &mut [])),
outgoing_buffer: if *write_side_was_open {
Some((&mut write_buffer[..writable_bytes], &mut []))
} else {
None
},
read_bytes: 0,
written_bytes: 0,
wake_up_after,
Expand All @@ -496,13 +512,19 @@ async fn webrtc_multi_stream_connection_task<TPlat: Platform>(
// information from it.
let read_bytes = read_write.read_bytes;
let written_bytes = read_write.written_bytes;
let must_close_writing_side =
*write_side_was_open && read_write.outgoing_buffer.is_none();
wake_up_after = read_write.wake_up_after.take();
drop(read_write);

// Now update the connection.
if written_bytes != 0 {
TPlat::send(substream, &write_buffer[..written_bytes]);
}
if must_close_writing_side {
TPlat::close_send(substream);
*write_side_was_open = false;
}
TPlat::advance_read_cursor(substream, read_bytes);

// If the `connection_task` requires this substream to be killed, we drop the
Expand Down Expand Up @@ -598,13 +620,11 @@ async fn webrtc_multi_stream_connection_task<TPlat: Platform>(
.fuse();

// Future that is woken up when new data is ready on any of the streams.
// TODO: very suboptimal
// TODO: will loop infinitely if the remote closes its writing side because `wait_more_data` is immediately ready when that is the case
let data_ready = iter::once(future::Either::Right(future::pending()))
let streams_updated = iter::once(future::Either::Right(future::pending()))
.chain(
open_substreams
.iter_mut()
.map(|(_, stream)| future::Either::Left(TPlat::wait_more_data(stream))),
.map(|(_, (stream, _))| future::Either::Left(TPlat::update_stream(stream))),
)
.collect::<future::SelectAll<_>>();

Expand All @@ -615,18 +635,18 @@ async fn webrtc_multi_stream_connection_task<TPlat: Platform>(
debug_assert!(newly_open_substream.is_none());
futures::select! {
_ = message_from_coordinator => {}
substream = if has_reset { either::Right(future::pending()) } else { either::Left(TPlat::next_substream(&mut connection)) }.fuse() => {
substream = if remote_has_reset { either::Right(future::pending()) } else { either::Left(TPlat::next_substream(&mut connection)) }.fuse() => {
match substream {
Some(s) => newly_open_substream = Some(s),
None => {
// `None` is returned if the remote has force-closed the connection.
connection_task.reset();
has_reset = true;
remote_has_reset = true;
}
}
}
_ = poll_after => {}
_ = data_ready.fuse() => {}
_ = streams_updated.fuse() => {}
}
}
}
82 changes: 67 additions & 15 deletions light-base/src/platform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,18 @@ pub trait Platform: Send + 'static {
/// the `Connection` and all its associated substream objects ([`Platform::Stream`]) have
/// been dropped.
type Connection: Send + Sync + 'static;
/// Opaque object representing either a single-stream connection or a substream in a
/// multi-stream connection.
///
/// If this object is abruptly dropped, the underlying single stream connection or substream
/// should be abruptly dropped (i.e. RST) as well, unless its reading and writing sides
/// have been gracefully closed in the past.
type Stream: Send + 'static;
type ConnectFuture: Future<Output = Result<PlatformConnection<Self::Stream, Self::Connection>, ConnectError>>
+ Unpin
+ Send
+ 'static;
type StreamDataFuture<'a>: Future<Output = ()> + Unpin + Send + 'a;
type StreamUpdateFuture<'a>: Future<Output = ()> + Unpin + Send + 'a;
type NextSubstreamFuture<'a>: Future<Output = Option<(Self::Stream, PlatformSubstreamDirection)>>
+ Unpin
+ Send
Expand Down Expand Up @@ -106,35 +112,81 @@ pub trait Platform: Send + 'static {
/// all its associated `Stream`s as soon as possible.
fn next_substream(connection: &'_ mut Self::Connection) -> Self::NextSubstreamFuture<'_>;

/// Returns a future that becomes ready when either the read buffer of the given stream
/// contains data, or the remote has closed their sending side.
/// Synchronizes the stream with the "actual" stream.
///
/// Returns a future that becomes ready when "something" in the state has changed. In other
/// words, when data has been added to the read buffer of the given stream , or the remote
/// closes their sending side, or the number of writable bytes (see
/// [`Platform::writable_bytes`]) increases.
///
/// This function might not add data to the read buffer nor process the remote closing its
/// writing side, unless the read buffer has been emptied beforehand using
/// [`Platform::advance_read_cursor`].
///
/// In the specific situation where the reading side is closed and the writing side has been
/// closed using [`Platform::close_send`], the API user must call this function before dropping
/// the `Stream` object. This makes it possible for the implementation to finish cleaning up
/// everything gracefully before the object is dropped.
///
/// This function should also flush any outgoing data if necessary.
///
/// The future is immediately ready if data is already available or the remote has already
/// closed their sending side.
fn wait_more_data(stream: &'_ mut Self::Stream) -> Self::StreamDataFuture<'_>;
/// In order to avoid race conditions, the state of the read buffer and the writable bytes
/// shouldn't be updated unless this function is called.
/// In other words, calling this function switches the stream from a state to another, and
/// this state transition should only happen when this function is called and not otherwise.
fn update_stream(stream: &'_ mut Self::Stream) -> Self::StreamUpdateFuture<'_>;

/// Gives access to the content of the read buffer of the given stream.
fn read_buffer(stream: &mut Self::Stream) -> ReadBuffer;

/// Discards the first `bytes` bytes of the read buffer of this stream. This makes it
/// possible for the remote to send more data.
/// Discards the first `bytes` bytes of the read buffer of this stream.
///
/// This makes it possible for more data to be received when [`Platform::update_stream`] is
/// called.
///
/// # Panic
///
/// Panics if there aren't enough bytes to discard in the buffer.
///
fn advance_read_cursor(stream: &mut Self::Stream, bytes: usize);

/// Queues the given bytes to be sent out on the given connection.
/// Returns the maximum size of the buffer that can be passed to [`Platform::send`].
///
/// Must return 0 if [`Platform::close_send`] has previously been called, or if the stream
/// has been reset by the remote.
///
/// If [`Platform::send`] is called, the number of writable bytes must decrease by exactly
/// the size of the buffer that was provided.
/// The number of writable bytes should never change unless [`Platform::update_stream`] is
/// called.
fn writable_bytes(stream: &mut Self::Stream) -> usize;

/// Queues the given bytes to be sent out on the given stream.
///
/// > **Note**: In the case of [`PlatformConnection::MultiStreamWebRtc`], be aware that there
/// > exists a limit to the amount of data to send at once. The `data` parameter
/// > is guaranteed to fit within that limit. Due to the existence of this limit,
/// > the implementation of this function shouldn't attempt to save function calls
/// > by performing internal buffering and batching multiple calls into one.
// TODO: back-pressure
// TODO: allow closing sending side
/// > exists a limit to the amount of data to send in a single packet. The `data`
/// > parameter is guaranteed to fit within that limit. Due to the existence of this
/// > limit, the implementation of this function shouldn't attempt to save function
/// > calls by performing internal buffering and batching multiple calls into one.
///
/// # Panic
///
/// Panics if `data.is_empty()`.
/// Panics if `data.len()` is superior to the value returned by [`Platform::writable_bytes`].
/// Panics if [`Platform::close_send`] has been called before on this stream.
///
fn send(stream: &mut Self::Stream, data: &[u8]);

/// Closes the sending side of the given stream.
///
/// > **Note**: In situations where this isn't possible, such as with the WebSocket protocol,
/// > this is a no-op.
///
/// # Panic
///
/// Panics if [`Platform::close_send`] has already been called on this stream.
///
fn close_send(stream: &mut Self::Stream);
}

/// Type of opened connection. See [`Platform::connect`].
Expand Down
Loading