Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clarify platform reading API #2925

Merged
merged 10 commits into from
Oct 27, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 55 additions & 34 deletions bin/light-base/src/network_service/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use super::Shared;
use crate::platform::{Platform, PlatformConnection, PlatformSubstreamDirection};
use crate::platform::{Platform, PlatformConnection, PlatformSubstreamDirection, ReadBuffer};

use alloc::{string::ToString as _, sync::Arc, vec, vec::Vec};
use core::{iter, pin::Pin};
Expand Down Expand Up @@ -221,35 +221,57 @@ async fn single_stream_connection_task<TPlat: Platform>(
connection_task.inject_coordinator_message(message);
}

// TODO: `TPlat::read_buffer` can return `None` if the connection has been abruptly closed, should we handle this and stop reading/writing? cc https://github.com/paritytech/smoldot/issues/2782

// Perform a read-write. This updates the internal state of the connection task.
let now = TPlat::now();
let mut read_write = ReadWrite {
now: now.clone(),
incoming_buffer: TPlat::read_buffer(&mut connection),
outgoing_buffer: Some((&mut write_buffer, &mut [])), // TODO: this should be None if a previous read_write() produced None
read_bytes: 0,
written_bytes: 0,
wake_up_after: None,
};
connection_task.read_write(&mut read_write);

// Because the `read_write` object borrows the connection, we need to drop it before we
// can modify the connection. Before dropping the `read_write`, clone some important
// information from it.
let read_buffer_has_data = read_write.incoming_buffer.map_or(false, |b| !b.is_empty());
let read_buffer_closed = read_write.incoming_buffer.is_none();
let read_bytes = read_write.read_bytes;
let written_bytes = read_write.written_bytes;
let wake_up_after = read_write.wake_up_after.clone();
drop(read_write);

// Now update the connection.
if written_bytes != 0 {
TPlat::send(&mut connection, &write_buffer[..written_bytes]);
}
TPlat::advance_read_cursor(&mut connection, read_bytes);

let (read_bytes, read_buffer_has_data, read_buffer_closed, written_bytes, wake_up_after) =
if !connection_task.is_reset_called() {
let incoming_buffer = match TPlat::read_buffer(&mut connection) {
ReadBuffer::Reset => {
connection_task.reset();
continue;
}
ReadBuffer::Open(b) => Some(b),
ReadBuffer::Closed => None,
};

// Perform a read-write. This updates the internal state of the connection task.
let mut read_write = ReadWrite {
now: now.clone(),
incoming_buffer,
outgoing_buffer: Some((&mut write_buffer, &mut [])), // TODO: this should be None if a previous read_write() produced None
read_bytes: 0,
written_bytes: 0,
wake_up_after: None,
};
connection_task.read_write(&mut read_write);

// Because the `read_write` object borrows the connection, we need to drop it before we
// can modify the connection. Before dropping the `read_write`, clone some important
// information from it.
let read_buffer_has_data =
read_write.incoming_buffer.map_or(false, |b| !b.is_empty());
let read_buffer_closed = read_write.incoming_buffer.is_none();
let read_bytes = read_write.read_bytes;
let written_bytes = read_write.written_bytes;
let wake_up_after = read_write.wake_up_after.clone();
drop(read_write);

// Now update the connection.
if written_bytes != 0 {
TPlat::send(&mut connection, &write_buffer[..written_bytes]);
}
TPlat::advance_read_cursor(&mut connection, read_bytes);

(
read_bytes,
read_buffer_has_data,
read_buffer_closed,
written_bytes,
wake_up_after,
)
} else {
(0, false, true, 0, None)
};

// Try pull message to send to the coordinator.

Expand Down Expand Up @@ -439,11 +461,10 @@ async fn webrtc_multi_stream_connection_task<TPlat: Platform>(
let substream = &mut open_substreams[substream_id];

let incoming_buffer = match TPlat::read_buffer(substream) {
Some(buf) => buf,
None => {
// In the case of WebRTC, `read_buffer` returns `None` only if the
// substream has been reset by the remote. We inform the connection task,
// and the substream is now considered dead.
ReadBuffer::Open(buf) => buf,
ReadBuffer::Closed => panic!(), // Forbidden for WebRTC.
ReadBuffer::Reset => {
// Inform the connection task. The substream is now considered dead.
connection_task.reset_substream(&substream_id);
open_substreams.remove(substream_id);
break;
Expand Down
22 changes: 17 additions & 5 deletions bin/light-base/src/platform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,7 @@ pub trait Platform: Send + 'static {
fn wait_more_data(stream: &mut Self::Stream) -> Self::StreamDataFuture;

/// Gives access to the content of the read buffer of the given stream.
///
/// Returns `None` if the remote has closed their sending side or if the stream has been
/// reset. In the case of [`PlatformConnection::MultiStreamWebRtc`], only the stream having
/// been reset applies.
fn read_buffer(stream: &mut Self::Stream) -> Option<&[u8]>;
fn read_buffer(stream: &mut Self::Stream) -> ReadBuffer;

/// Discards the first `bytes` bytes of the read buffer of this stream. This makes it
/// possible for the remote to send more data.
Expand Down Expand Up @@ -170,3 +166,19 @@ pub struct ConnectError {
/// `true` if the error is caused by the address to connect to being forbidden or unsupported.
pub is_bad_addr: bool,
}

/// State of the read buffer, as returned by [`Platform::read_buffer`].
#[derive(Debug)]
pub enum ReadBuffer<'a> {
/// Reading side of the stream is fully open. Contains the data waiting to be processed.
Open(&'a [u8]),

/// The reading side of the stream has been closed by the remote.
///
/// Note that this is forbidden for connections of
/// type [`PlatformConnection::MultiStreamWebRtc`].
Closed,

/// The stream has been abprutly closed by the remote.
Reset,
}
11 changes: 6 additions & 5 deletions bin/light-base/src/platform/async_std.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#![cfg(feature = "std")]
#![cfg_attr(docsrs, doc(cfg(feature = "std")))]

use super::{ConnectError, Platform, PlatformConnection, PlatformSubstreamDirection};
use super::{ConnectError, Platform, PlatformConnection, PlatformSubstreamDirection, ReadBuffer};

use alloc::{collections::VecDeque, sync::Arc};
use core::{pin::Pin, str, task::Poll, time::Duration};
Expand Down Expand Up @@ -286,9 +286,10 @@ impl Platform for AsyncStdTcpWebSocket {
}))
}

fn read_buffer(stream: &mut Self::Stream) -> Option<&[u8]> {
fn read_buffer(stream: &mut Self::Stream) -> ReadBuffer {
if stream.read_buffer.is_none() {
return None;
// TODO: the implementation doesn't let us differentiate between Closed and Reset
return ReadBuffer::Reset;
}

let mut lock = stream.read_data_rx.lock();
Expand All @@ -297,12 +298,12 @@ impl Platform for AsyncStdTcpWebSocket {
Some(b) => stream.read_buffer.as_mut().unwrap().extend(b),
None => {
stream.read_buffer = None;
return None;
return ReadBuffer::Reset;
}
}
}

Some(stream.read_buffer.as_ref().unwrap())
ReadBuffer::Open(stream.read_buffer.as_ref().unwrap())
}

fn advance_read_cursor(stream: &mut Self::Stream, bytes: usize) {
Expand Down
4 changes: 4 additions & 0 deletions bin/wasm-node/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
- The `payment_queryInfo` JSON-RPC function now works with runtimes that have defined the type of `Balance` to be less than 16 bytes. ([#2914](https://github.com/paritytech/smoldot/pull/2914))
- The parameter of `chainHead_unstable_finalizedDatabase` has been renamed from `max_size_bytes` to `maxSizeBytes`. ([#2923](https://github.com/paritytech/smoldot/pull/2923))

### Fixed

- Fix errors showing in the browser's console about WebSockets being already in the CLOSED or CLOSED state. ([#2925](https://github.com/paritytech/smoldot/pull/2925))
tomaka marked this conversation as resolved.
Show resolved Hide resolved

## 0.7.3 - 2022-10-19

### Changed
Expand Down
14 changes: 7 additions & 7 deletions bin/wasm-node/javascript/src/index-browser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,14 +108,14 @@ export function start(options?: ClientOptions): Client {
};
connection.onclose = (event) => {
const message = "Error code " + event.code + (!!event.reason ? (": " + event.reason) : "");
config.onConnectionClose(message);
config.onConnectionReset(message);
};
connection.onmessage = (msg) => {
config.onMessage(new Uint8Array(msg.data as ArrayBuffer));
};

return {
close: (): void => {
reset: (): void => {
connection.onopen = null;
connection.onclose = null;
connection.onmessage = null;
Expand Down Expand Up @@ -167,11 +167,11 @@ export function start(options?: ClientOptions): Client {
};

dataChannel.onerror = (_error) => {
config.onStreamClose(dataChannelId);
config.onStreamReset(dataChannelId);
};

dataChannel.onclose = () => {
config.onStreamClose(dataChannelId);
config.onStreamReset(dataChannelId);
};

dataChannel.onmessage = (m) => {
Expand Down Expand Up @@ -226,7 +226,7 @@ export function start(options?: ClientOptions): Client {
if (localTlsCertificateHex === undefined) {
// Because we've already returned from the `connect` function at this point, we pretend
// that the connection has failed to open.
config.onConnectionClose('Failed to obtain the browser certificate fingerprint');
config.onConnectionReset('Failed to obtain the browser certificate fingerprint');
return;
}
const localTlsCertificateMultihash = new Uint8Array(34);
Expand All @@ -240,7 +240,7 @@ export function start(options?: ClientOptions): Client {
// open.
pc.onconnectionstatechange = (_event) => {
if (pc!.connectionState == "closed" || pc!.connectionState == "disconnected" || pc!.connectionState == "failed") {
config.onConnectionClose("WebRTC state transitioned to " + pc!.connectionState);
config.onConnectionReset("WebRTC state transitioned to " + pc!.connectionState);

pc!.onconnectionstatechange = null;
pc!.onnegotiationneeded = null;
Expand Down Expand Up @@ -361,7 +361,7 @@ export function start(options?: ClientOptions): Client {
});

return {
close: (streamId: number | undefined): void => {
reset: (streamId: number | undefined): void => {
// If `streamId` is undefined, then the whole connection must be destroyed.
if (streamId === undefined) {
// The `RTCPeerConnection` is created at the same time as we report the connection as
Expand Down
8 changes: 4 additions & 4 deletions bin/wasm-node/javascript/src/index-deno.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ function connect(config: ConnectionConfig, forbidTcp: boolean, forbidWs: boolean
};
connection.socket.onclose = (event) => {
const message = "Error code " + event.code + (!!event.reason ? (": " + event.reason) : "");
config.onConnectionClose(message);
config.onConnectionReset(message);
};
connection.socket.onmessage = (msg) => {
config.onMessage(new Uint8Array(msg.data as ArrayBuffer));
Expand Down Expand Up @@ -191,7 +191,7 @@ function connect(config: ConnectionConfig, forbidTcp: boolean, forbidWs: boolean
// The socket is reported closed, but `socket.destroyed` is still `false` (see
// check above). As such, we must inform the inner layers.
socket.destroyed = true;
config.onConnectionClose(outcome === null ? "EOF when reading socket" : outcome);
config.onConnectionReset(outcome === null ? "EOF when reading socket" : outcome);
return;
}
console.assert(outcome !== 0); // `read` guarantees to return a non-zero value.
Expand All @@ -208,7 +208,7 @@ function connect(config: ConnectionConfig, forbidTcp: boolean, forbidWs: boolean
}

return {
close: (): void => {
reset: (): void => {
if (connection.ty == 'websocket') {
// WebSocket
// We can't set these fields to null because the TypeScript definitions don't
Expand Down Expand Up @@ -249,7 +249,7 @@ function connect(config: ConnectionConfig, forbidTcp: boolean, forbidWs: boolean
// The socket is reported closed, but `socket.destroyed` is still
// `false` (see check above). As such, we must inform the inner layers.
socket.destroyed = true;
config.onConnectionClose(outcome);
config.onConnectionReset(outcome);
return c;
}
// Note that, contrary to `read`, it is possible for `outcome` to be 0.
Expand Down
8 changes: 4 additions & 4 deletions bin/wasm-node/javascript/src/index-nodejs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,14 +108,14 @@ function connect(config: ConnectionConfig, forbidTcp: boolean, forbidWs: boolean
};
socket.onclose = (event) => {
const message = "Error code " + event.code + (!!event.reason ? (": " + event.reason) : "");
config.onConnectionClose(message);
config.onConnectionReset(message);
socket.onopen = () => { };
socket.onclose = () => { };
socket.onmessage = () => { };
socket.onerror = () => { };
};
socket.onerror = (event) => {
config.onConnectionClose(event.message);
config.onConnectionReset(event.message);
socket.onopen = () => { };
socket.onclose = () => { };
socket.onmessage = () => { };
Expand Down Expand Up @@ -150,7 +150,7 @@ function connect(config: ConnectionConfig, forbidTcp: boolean, forbidWs: boolean
// NodeJS doesn't provide a reason why the closing happened, but only
// whether it was caused by an error.
const message = hasError ? "Error" : "Closed gracefully";
config.onConnectionClose(message);
config.onConnectionReset(message);
});
connection.socket.on('error', () => { });
connection.socket.on('data', (message) => {
Expand All @@ -163,7 +163,7 @@ function connect(config: ConnectionConfig, forbidTcp: boolean, forbidWs: boolean
}

return {
close: (): void => {
reset: (): void => {
if (connection.ty == 'websocket') {
// WebSocket
// We can't set these fields to null because the TypeScript definitions don't
Expand Down
Loading