Skip to content

Commit

Permalink
Clarify platform reading API (#2925)
Browse files Browse the repository at this point in the history
At the moment, the `read_buffer` function of the `Platform` trait
(representing how the light client binds to the environment) returns
either `Some` in normal situations or `None` if the connection has been
either closed or reset.

This PR turns this `Option` into an enum with three variants: `Open`,
`Closed`, and `Reset`.
This lets the code properly differentiate between these two latter
situations.

Additionally, I've renamed "closed" to "reset" in the entire Wasm<->JS
code (all the functions and variables), for clarity. While the
`Platform` trait lets you report that the reading side is closed but the
writing side still open, the JS code can't represent that state.
Instead, it's either "connection open" or "connection reset".

This PR fixes #2782. The
issue was that we were treating `read_buffer()` returning `None` as
meaning "reading side closed", but actually the entire connection was
already reset. We would then call `send` on the already-reset
connection, leading to the error. This kind of confusion is exactly why
I've performed the rename.

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
tomaka and mergify[bot] authored Oct 27, 2022
1 parent a413d18 commit 3c407df
Show file tree
Hide file tree
Showing 12 changed files with 183 additions and 129 deletions.
89 changes: 55 additions & 34 deletions bin/light-base/src/network_service/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use super::Shared;
use crate::platform::{Platform, PlatformConnection, PlatformSubstreamDirection};
use crate::platform::{Platform, PlatformConnection, PlatformSubstreamDirection, ReadBuffer};

use alloc::{string::ToString as _, sync::Arc, vec, vec::Vec};
use core::{iter, pin::Pin};
Expand Down Expand Up @@ -221,35 +221,57 @@ async fn single_stream_connection_task<TPlat: Platform>(
connection_task.inject_coordinator_message(message);
}

// TODO: `TPlat::read_buffer` can return `None` if the connection has been abruptly closed, should we handle this and stop reading/writing? cc https://github.com/paritytech/smoldot/issues/2782

// Perform a read-write. This updates the internal state of the connection task.
let now = TPlat::now();
let mut read_write = ReadWrite {
now: now.clone(),
incoming_buffer: TPlat::read_buffer(&mut connection),
outgoing_buffer: Some((&mut write_buffer, &mut [])), // TODO: this should be None if a previous read_write() produced None
read_bytes: 0,
written_bytes: 0,
wake_up_after: None,
};
connection_task.read_write(&mut read_write);

// Because the `read_write` object borrows the connection, we need to drop it before we
// can modify the connection. Before dropping the `read_write`, clone some important
// information from it.
let read_buffer_has_data = read_write.incoming_buffer.map_or(false, |b| !b.is_empty());
let read_buffer_closed = read_write.incoming_buffer.is_none();
let read_bytes = read_write.read_bytes;
let written_bytes = read_write.written_bytes;
let wake_up_after = read_write.wake_up_after.clone();
drop(read_write);

// Now update the connection.
if written_bytes != 0 {
TPlat::send(&mut connection, &write_buffer[..written_bytes]);
}
TPlat::advance_read_cursor(&mut connection, read_bytes);

let (read_bytes, read_buffer_has_data, read_buffer_closed, written_bytes, wake_up_after) =
if !connection_task.is_reset_called() {
let incoming_buffer = match TPlat::read_buffer(&mut connection) {
ReadBuffer::Reset => {
connection_task.reset();
continue;
}
ReadBuffer::Open(b) => Some(b),
ReadBuffer::Closed => None,
};

// Perform a read-write. This updates the internal state of the connection task.
let mut read_write = ReadWrite {
now: now.clone(),
incoming_buffer,
outgoing_buffer: Some((&mut write_buffer, &mut [])), // TODO: this should be None if a previous read_write() produced None
read_bytes: 0,
written_bytes: 0,
wake_up_after: None,
};
connection_task.read_write(&mut read_write);

// Because the `read_write` object borrows the connection, we need to drop it before we
// can modify the connection. Before dropping the `read_write`, clone some important
// information from it.
let read_buffer_has_data =
read_write.incoming_buffer.map_or(false, |b| !b.is_empty());
let read_buffer_closed = read_write.incoming_buffer.is_none();
let read_bytes = read_write.read_bytes;
let written_bytes = read_write.written_bytes;
let wake_up_after = read_write.wake_up_after.clone();
drop(read_write);

// Now update the connection.
if written_bytes != 0 {
TPlat::send(&mut connection, &write_buffer[..written_bytes]);
}
TPlat::advance_read_cursor(&mut connection, read_bytes);

(
read_bytes,
read_buffer_has_data,
read_buffer_closed,
written_bytes,
wake_up_after,
)
} else {
(0, false, true, 0, None)
};

// Try pull message to send to the coordinator.

Expand Down Expand Up @@ -439,11 +461,10 @@ async fn webrtc_multi_stream_connection_task<TPlat: Platform>(
let substream = &mut open_substreams[substream_id];

let incoming_buffer = match TPlat::read_buffer(substream) {
Some(buf) => buf,
None => {
// In the case of WebRTC, `read_buffer` returns `None` only if the
// substream has been reset by the remote. We inform the connection task,
// and the substream is now considered dead.
ReadBuffer::Open(buf) => buf,
ReadBuffer::Closed => panic!(), // Forbidden for WebRTC.
ReadBuffer::Reset => {
// Inform the connection task. The substream is now considered dead.
connection_task.reset_substream(&substream_id);
open_substreams.remove(substream_id);
break;
Expand Down
22 changes: 17 additions & 5 deletions bin/light-base/src/platform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,7 @@ pub trait Platform: Send + 'static {
fn wait_more_data(stream: &mut Self::Stream) -> Self::StreamDataFuture;

/// Gives access to the content of the read buffer of the given stream.
///
/// Returns `None` if the remote has closed their sending side or if the stream has been
/// reset. In the case of [`PlatformConnection::MultiStreamWebRtc`], only the stream having
/// been reset applies.
fn read_buffer(stream: &mut Self::Stream) -> Option<&[u8]>;
fn read_buffer(stream: &mut Self::Stream) -> ReadBuffer;

/// Discards the first `bytes` bytes of the read buffer of this stream. This makes it
/// possible for the remote to send more data.
Expand Down Expand Up @@ -170,3 +166,19 @@ pub struct ConnectError {
/// `true` if the error is caused by the address to connect to being forbidden or unsupported.
pub is_bad_addr: bool,
}

/// State of the read buffer, as returned by [`Platform::read_buffer`].
#[derive(Debug)]
pub enum ReadBuffer<'a> {
/// Reading side of the stream is fully open. Contains the data waiting to be processed.
Open(&'a [u8]),

/// The reading side of the stream has been closed by the remote.
///
/// Note that this is forbidden for connections of
/// type [`PlatformConnection::MultiStreamWebRtc`].
Closed,

/// The stream has been abruptly closed by the remote.
Reset,
}
11 changes: 6 additions & 5 deletions bin/light-base/src/platform/async_std.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#![cfg(feature = "std")]
#![cfg_attr(docsrs, doc(cfg(feature = "std")))]

use super::{ConnectError, Platform, PlatformConnection, PlatformSubstreamDirection};
use super::{ConnectError, Platform, PlatformConnection, PlatformSubstreamDirection, ReadBuffer};

use alloc::{collections::VecDeque, sync::Arc};
use core::{pin::Pin, str, task::Poll, time::Duration};
Expand Down Expand Up @@ -286,9 +286,10 @@ impl Platform for AsyncStdTcpWebSocket {
}))
}

fn read_buffer(stream: &mut Self::Stream) -> Option<&[u8]> {
fn read_buffer(stream: &mut Self::Stream) -> ReadBuffer {
if stream.read_buffer.is_none() {
return None;
// TODO: the implementation doesn't let us differentiate between Closed and Reset
return ReadBuffer::Reset;
}

let mut lock = stream.read_data_rx.lock();
Expand All @@ -297,12 +298,12 @@ impl Platform for AsyncStdTcpWebSocket {
Some(b) => stream.read_buffer.as_mut().unwrap().extend(b),
None => {
stream.read_buffer = None;
return None;
return ReadBuffer::Reset;
}
}
}

Some(stream.read_buffer.as_ref().unwrap())
ReadBuffer::Open(stream.read_buffer.as_ref().unwrap())
}

fn advance_read_cursor(stream: &mut Self::Stream, bytes: usize) {
Expand Down
4 changes: 4 additions & 0 deletions bin/wasm-node/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
- The parameter of `chainHead_unstable_finalizedDatabase` has been renamed from `max_size_bytes` to `maxSizeBytes`. ([#2923](https://github.com/paritytech/smoldot/pull/2923))
- The database now contains the hash of the genesis block header. This hash is verified when the database is loaded, and the database is ignored if there is a mismatch. This prevents accidents where the wrong database is provided, which would lead to the chain not working and would be hard to debug. ([#2928](https://github.com/paritytech/smoldot/pull/2928))

### Fixed

- Fix errors showing in the browser's console about WebSockets being already in the CLOSING or CLOSED state. ([#2925](https://github.com/paritytech/smoldot/pull/2925))

## 0.7.3 - 2022-10-19

### Changed
Expand Down
14 changes: 7 additions & 7 deletions bin/wasm-node/javascript/src/index-browser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,14 +108,14 @@ export function start(options?: ClientOptions): Client {
};
connection.onclose = (event) => {
const message = "Error code " + event.code + (!!event.reason ? (": " + event.reason) : "");
config.onConnectionClose(message);
config.onConnectionReset(message);
};
connection.onmessage = (msg) => {
config.onMessage(new Uint8Array(msg.data as ArrayBuffer));
};

return {
close: (): void => {
reset: (): void => {
connection.onopen = null;
connection.onclose = null;
connection.onmessage = null;
Expand Down Expand Up @@ -167,11 +167,11 @@ export function start(options?: ClientOptions): Client {
};

dataChannel.onerror = (_error) => {
config.onStreamClose(dataChannelId);
config.onStreamReset(dataChannelId);
};

dataChannel.onclose = () => {
config.onStreamClose(dataChannelId);
config.onStreamReset(dataChannelId);
};

dataChannel.onmessage = (m) => {
Expand Down Expand Up @@ -226,7 +226,7 @@ export function start(options?: ClientOptions): Client {
if (localTlsCertificateHex === undefined) {
// Because we've already returned from the `connect` function at this point, we pretend
// that the connection has failed to open.
config.onConnectionClose('Failed to obtain the browser certificate fingerprint');
config.onConnectionReset('Failed to obtain the browser certificate fingerprint');
return;
}
const localTlsCertificateMultihash = new Uint8Array(34);
Expand All @@ -240,7 +240,7 @@ export function start(options?: ClientOptions): Client {
// open.
pc.onconnectionstatechange = (_event) => {
if (pc!.connectionState == "closed" || pc!.connectionState == "disconnected" || pc!.connectionState == "failed") {
config.onConnectionClose("WebRTC state transitioned to " + pc!.connectionState);
config.onConnectionReset("WebRTC state transitioned to " + pc!.connectionState);

pc!.onconnectionstatechange = null;
pc!.onnegotiationneeded = null;
Expand Down Expand Up @@ -361,7 +361,7 @@ export function start(options?: ClientOptions): Client {
});

return {
close: (streamId: number | undefined): void => {
reset: (streamId: number | undefined): void => {
// If `streamId` is undefined, then the whole connection must be destroyed.
if (streamId === undefined) {
// The `RTCPeerConnection` is created at the same time as we report the connection as
Expand Down
8 changes: 4 additions & 4 deletions bin/wasm-node/javascript/src/index-deno.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ function connect(config: ConnectionConfig, forbidTcp: boolean, forbidWs: boolean
};
connection.socket.onclose = (event) => {
const message = "Error code " + event.code + (!!event.reason ? (": " + event.reason) : "");
config.onConnectionClose(message);
config.onConnectionReset(message);
};
connection.socket.onmessage = (msg) => {
config.onMessage(new Uint8Array(msg.data as ArrayBuffer));
Expand Down Expand Up @@ -191,7 +191,7 @@ function connect(config: ConnectionConfig, forbidTcp: boolean, forbidWs: boolean
// The socket is reported closed, but `socket.destroyed` is still `false` (see
// check above). As such, we must inform the inner layers.
socket.destroyed = true;
config.onConnectionClose(outcome === null ? "EOF when reading socket" : outcome);
config.onConnectionReset(outcome === null ? "EOF when reading socket" : outcome);
return;
}
console.assert(outcome !== 0); // `read` guarantees to return a non-zero value.
Expand All @@ -208,7 +208,7 @@ function connect(config: ConnectionConfig, forbidTcp: boolean, forbidWs: boolean
}

return {
close: (): void => {
reset: (): void => {
if (connection.ty == 'websocket') {
// WebSocket
// We can't set these fields to null because the TypeScript definitions don't
Expand Down Expand Up @@ -249,7 +249,7 @@ function connect(config: ConnectionConfig, forbidTcp: boolean, forbidWs: boolean
// The socket is reported closed, but `socket.destroyed` is still
// `false` (see check above). As such, we must inform the inner layers.
socket.destroyed = true;
config.onConnectionClose(outcome);
config.onConnectionReset(outcome);
return c;
}
// Note that, contrary to `read`, it is possible for `outcome` to be 0.
Expand Down
8 changes: 4 additions & 4 deletions bin/wasm-node/javascript/src/index-nodejs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,14 +108,14 @@ function connect(config: ConnectionConfig, forbidTcp: boolean, forbidWs: boolean
};
socket.onclose = (event) => {
const message = "Error code " + event.code + (!!event.reason ? (": " + event.reason) : "");
config.onConnectionClose(message);
config.onConnectionReset(message);
socket.onopen = () => { };
socket.onclose = () => { };
socket.onmessage = () => { };
socket.onerror = () => { };
};
socket.onerror = (event) => {
config.onConnectionClose(event.message);
config.onConnectionReset(event.message);
socket.onopen = () => { };
socket.onclose = () => { };
socket.onmessage = () => { };
Expand Down Expand Up @@ -150,7 +150,7 @@ function connect(config: ConnectionConfig, forbidTcp: boolean, forbidWs: boolean
// NodeJS doesn't provide a reason why the closing happened, but only
// whether it was caused by an error.
const message = hasError ? "Error" : "Closed gracefully";
config.onConnectionClose(message);
config.onConnectionReset(message);
});
connection.socket.on('error', () => { });
connection.socket.on('data', (message) => {
Expand All @@ -163,7 +163,7 @@ function connect(config: ConnectionConfig, forbidTcp: boolean, forbidWs: boolean
}

return {
close: (): void => {
reset: (): void => {
if (connection.ty == 'websocket') {
// WebSocket
// We can't set these fields to null because the TypeScript definitions don't
Expand Down
Loading

0 comments on commit 3c407df

Please sign in to comment.