Skip to content

Commit

Permalink
Add support for "multi-stream connections" (#2352)
Browse files Browse the repository at this point in the history
* ConnectionTask -> SingleStreamConnectionTask

* Add draft API to collection.rs

* Propagate functions to outer API layers

* Move established.rs -> single_stream.rs

* Restore compilation after previous commit

* Move some types out of single_stream

* Move out reexports from substream module

* Precise reexports

* Add draft for multi_stream connection

* Move SubstreamId to parent module

* Make multi_stream compile

* Add TODO in single_stream

* Update Platform trait

* Update network_service

* Propagate events in multi_stream

* Rename Established -> SingleStream

* Start integration in collection.rs

* Start proper events handling

* Now compiles

* Implement ready_substreams()

* Add MultiStreamConnectionTask::reset()

* Fix warnings

* Fix fuzzer

* Update JS bindings

* Fix warnings

* Typo and docs

* Rustfmt

* Renames

* Explain num expected substreams

* Fix many doclink errors

* More doclinks fixes

* More doclinks

* Hopefully last round of doclink fixes

* I-swear-everything-is-fixed-now

* Hopefully now it's fixed

* Fix doclinks in wasm-node

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
tomaka and mergify[bot] authored Jun 14, 2022
1 parent ac091ea commit f1e2a38
Show file tree
Hide file tree
Showing 16 changed files with 3,234 additions and 1,214 deletions.
14 changes: 9 additions & 5 deletions bin/full-node/src/run/network_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,9 +403,11 @@ impl NetworkService {

let task = {
let mut guarded = inner.guarded.lock().await;
let (connection_id, connection_task) = guarded
.network
.add_incoming_connection(Instant::now(), multiaddr.clone());
let (connection_id, connection_task) =
guarded.network.add_single_stream_incoming_connection(
Instant::now(),
multiaddr.clone(),
);

let (tx, rx) = mpsc::channel(16); // TODO: ?!
guarded.active_connections.insert(connection_id, tx);
Expand Down Expand Up @@ -1091,7 +1093,9 @@ async fn opening_connection_task(
// has succeeded.
let mut guarded = inner.guarded.lock().await;
guarded.num_pending_out_attempts -= 1;
let (connection_id, connection_task) = guarded.network.pending_outcome_ok(start_connect.id);
let (connection_id, connection_task) = guarded
.network
.pending_outcome_ok_single_stream(start_connect.id);
inner.wake_up_main_background_task.notify(1);

let (tx, rx) = mpsc::channel(16); // TODO: ?!
Expand Down Expand Up @@ -1125,7 +1129,7 @@ async fn established_connection_task(
tcp_socket: async_std::net::TcpStream,
inner: Arc<Inner>,
connection_id: service::ConnectionId,
mut connection_task: service::ConnectionTask<Instant>,
mut connection_task: service::SingleStreamConnectionTask<Instant>,
coordinator_to_connection: mpsc::Receiver<service::CoordinatorToConnection<Instant>>,
mut connection_to_coordinator: mpsc::Sender<(
service::ConnectionId,
Expand Down
2 changes: 1 addition & 1 deletion bin/fuzz/fuzz_targets/network-connection-raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ libfuzzer_sys::fuzz_target!(|data: &[u8]| {
is_initiator
};

let (_id, mut task) = collection.insert(Duration::new(0, 0), is_initiator, ());
let (_id, mut task) = collection.insert_single_stream(Duration::new(0, 0), is_initiator, ());

let mut out_buffer = vec![0; 4096];

Expand Down
72 changes: 60 additions & 12 deletions bin/light-base/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,23 @@ pub trait Platform: Send + 'static {
+ Send
+ Sync
+ 'static;

/// A multi-stream connection.
///
/// This object is merely a handle. The underlying connection should be dropped only after
/// the `Connection` and all its associated substream objects ([`Platform::Stream`]) have
/// been dropped.
type Connection: Send + Sync + 'static;
type ConnectFuture: Future<Output = Result<Self::Connection, ConnectError>>
type Stream: Send + Sync + 'static;
type ConnectFuture: Future<Output = Result<PlatformConnection<Self::Stream, Self::Connection>, ConnectError>>
+ Unpin
+ Send
+ 'static;
type StreamDataFuture: Future<Output = ()> + Unpin + Send + 'static;
type NextSubstreamFuture: Future<Output = Option<(Self::Stream, PlatformSubstreamDirection)>>
+ Unpin
+ Send
+ 'static;
type ConnectionDataFuture: Future<Output = ()> + Unpin + Send + 'static;

/// Returns the time elapsed since [the Unix Epoch](https://en.wikipedia.org/wiki/Unix_time)
/// (i.e. 00:00:00 UTC on 1 January 1970), ignoring leap seconds.
Expand All @@ -130,37 +141,74 @@ pub trait Platform: Send + 'static {
/// returned where [`ConnectError::is_bad_addr`] is `true`.
fn connect(url: &str) -> Self::ConnectFuture;

/// Returns a future that becomes ready when either the read buffer of the given connection
/// Queues the opening of an additional outbound substream.
///
/// The substream, once opened, must be yielded by [`Platform::next_substream`].
fn open_out_substream(connection: &mut Self::Connection);

/// Waits until a new incoming substream arrives on the connection.
///
/// This returns both inbound and outbound substreams. Outbound substreams should only be
/// yielded once for every call to [`Platform::open_out_substream`].
///
/// The future can also return `None` if the connection has been killed by the remote. If
/// the future returns `None`, the user of the `Platform` should drop the `Connection` and
/// all its associated `Stream`s as soon as possible.
fn next_substream(connection: &mut Self::Connection) -> Self::NextSubstreamFuture;

/// Returns a future that becomes ready when either the read buffer of the given stream
/// contains data, or the remote has closed their sending side.
///
/// The future is immediately ready if data is already available or the remote has already
/// closed their sending side.
///
/// This function can be called multiple times with the same connection, in which case all
/// This function can be called multiple times with the same stream, in which case all
/// the futures must be notified. The user of this function, however, is encouraged to
/// maintain only one active future.
///
/// If the future is polled after the connection object has been dropped, the behavior is
/// If the future is polled after the stream object has been dropped, the behavior is
/// not specified. The polling might panic, or return `Ready`, or return `Pending`.
fn wait_more_data(connection: &mut Self::Connection) -> Self::ConnectionDataFuture;
fn wait_more_data(stream: &mut Self::Stream) -> Self::StreamDataFuture;

/// Gives access to the content of the read buffer of the given connection.
/// Gives access to the content of the read buffer of the given stream.
///
/// Returns `None` if the remote has closed their sending side.
fn read_buffer(connection: &mut Self::Connection) -> Option<&[u8]>;
/// Returns `None` if the remote has closed their sending side or if the stream has been
/// reset.
fn read_buffer(stream: &mut Self::Stream) -> Option<&[u8]>;

/// Discards the first `bytes` bytes of the read buffer of this connection. This makes it
/// Discards the first `bytes` bytes of the read buffer of this stream. This makes it
/// possible for the remote to send more data.
///
/// # Panic
///
/// Panics if there aren't enough bytes to discard in the buffer.
///
fn advance_read_cursor(connection: &mut Self::Connection, bytes: usize);
fn advance_read_cursor(stream: &mut Self::Stream, bytes: usize);

/// Queues the given bytes to be sent out on the given connection.
// TODO: back-pressure
fn send(connection: &mut Self::Connection, data: &[u8]);
// TODO: allow closing sending side
fn send(stream: &mut Self::Stream, data: &[u8]);
}

/// Type of opened connection. See [`Platform::connect`].
#[derive(Debug)]
pub enum PlatformConnection<TStream, TConnection> {
/// The connection is a single stream on top of which encryption and multiplexing should be
/// negotiatied. The division in multiple substreams is handled internally.
SingleStream(TStream),
/// The connection is made of multiple substreams. The encryption and multiplexing are handled
/// externally.
MultiStream(TConnection, peer_id::PeerId),
}

/// Direction in which a substream has been opened. See [`Platform::next_substream`].
#[derive(Debug)]
pub enum PlatformSubstreamDirection {
/// Substream has been opened by the remote.
Inbound,
/// Substream has been opened locally in response to [`Platform::open_out_substream`].
Outbound,
}

/// Error potentially returned by [`Platform::connect`].
Expand Down
Loading

0 comments on commit f1e2a38

Please sign in to comment.