From 3c407dff249fb9cf97ad229814f6a10e84fc7a32 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 27 Oct 2022 12:34:06 +0200 Subject: [PATCH] Clarify platform reading API (#2925) At the moment, the `read_buffer` function of the `Platform` trait (representing how the light client binds to the environment) returns either `Some` in normal situations or `None` if the connection has been either closed or reset. This PR turns this `Option` into an enum with three variants: `Open`, `Closed`, and `Reset`. This lets the code properly differentiate between these two latter situations. Additionally, I've renamed "closed" to "reset" in the entire Wasm<->JS code (all the functions and variables), for clarity. While the `Platform` trait lets you report that the reading side is closed but the writing side still open, the JS code can't represent that state. Instead, it's either "connection open" or "connection reset". This PR fixes https://github.com/paritytech/smoldot/issues/2782. The issue was that we were treating `read_buffer()` returning `None` as meaning "reading side closed", but actually the entire connection was already reset. We would then call `send` on the already-reset connection, leading to the error. This kind of confusion is exactly why I've performed the rename. Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> --- bin/light-base/src/network_service/tasks.rs | 89 ++++++++++++------- bin/light-base/src/platform.rs | 22 +++-- bin/light-base/src/platform/async_std.rs | 11 +-- bin/wasm-node/CHANGELOG.md | 4 + bin/wasm-node/javascript/src/index-browser.ts | 14 +-- bin/wasm-node/javascript/src/index-deno.ts | 8 +- bin/wasm-node/javascript/src/index-nodejs.ts | 8 +- .../src/instance/bindings-smoldot-light.ts | 44 ++++----- .../javascript/src/instance/bindings.ts | 4 +- bin/wasm-node/rust/src/bindings.rs | 30 +++---- bin/wasm-node/rust/src/platform.rs | 66 +++++++------- src/libp2p/collection/single_stream.rs | 12 +++ 12 files changed, 183 insertions(+), 129 deletions(-) diff --git a/bin/light-base/src/network_service/tasks.rs b/bin/light-base/src/network_service/tasks.rs index f314e13686..ed7338a697 100644 --- a/bin/light-base/src/network_service/tasks.rs +++ b/bin/light-base/src/network_service/tasks.rs @@ -16,7 +16,7 @@ // along with this program. If not, see . use super::Shared; -use crate::platform::{Platform, PlatformConnection, PlatformSubstreamDirection}; +use crate::platform::{Platform, PlatformConnection, PlatformSubstreamDirection, ReadBuffer}; use alloc::{string::ToString as _, sync::Arc, vec, vec::Vec}; use core::{iter, pin::Pin}; @@ -221,35 +221,57 @@ async fn single_stream_connection_task( connection_task.inject_coordinator_message(message); } - // TODO: `TPlat::read_buffer` can return `None` if the connection has been abruptly closed, should we handle this and stop reading/writing? cc https://github.com/paritytech/smoldot/issues/2782 - - // Perform a read-write. This updates the internal state of the connection task. let now = TPlat::now(); - let mut read_write = ReadWrite { - now: now.clone(), - incoming_buffer: TPlat::read_buffer(&mut connection), - outgoing_buffer: Some((&mut write_buffer, &mut [])), // TODO: this should be None if a previous read_write() produced None - read_bytes: 0, - written_bytes: 0, - wake_up_after: None, - }; - connection_task.read_write(&mut read_write); - - // Because the `read_write` object borrows the connection, we need to drop it before we - // can modify the connection. Before dropping the `read_write`, clone some important - // information from it. - let read_buffer_has_data = read_write.incoming_buffer.map_or(false, |b| !b.is_empty()); - let read_buffer_closed = read_write.incoming_buffer.is_none(); - let read_bytes = read_write.read_bytes; - let written_bytes = read_write.written_bytes; - let wake_up_after = read_write.wake_up_after.clone(); - drop(read_write); - - // Now update the connection. - if written_bytes != 0 { - TPlat::send(&mut connection, &write_buffer[..written_bytes]); - } - TPlat::advance_read_cursor(&mut connection, read_bytes); + + let (read_bytes, read_buffer_has_data, read_buffer_closed, written_bytes, wake_up_after) = + if !connection_task.is_reset_called() { + let incoming_buffer = match TPlat::read_buffer(&mut connection) { + ReadBuffer::Reset => { + connection_task.reset(); + continue; + } + ReadBuffer::Open(b) => Some(b), + ReadBuffer::Closed => None, + }; + + // Perform a read-write. This updates the internal state of the connection task. + let mut read_write = ReadWrite { + now: now.clone(), + incoming_buffer, + outgoing_buffer: Some((&mut write_buffer, &mut [])), // TODO: this should be None if a previous read_write() produced None + read_bytes: 0, + written_bytes: 0, + wake_up_after: None, + }; + connection_task.read_write(&mut read_write); + + // Because the `read_write` object borrows the connection, we need to drop it before we + // can modify the connection. Before dropping the `read_write`, clone some important + // information from it. + let read_buffer_has_data = + read_write.incoming_buffer.map_or(false, |b| !b.is_empty()); + let read_buffer_closed = read_write.incoming_buffer.is_none(); + let read_bytes = read_write.read_bytes; + let written_bytes = read_write.written_bytes; + let wake_up_after = read_write.wake_up_after.clone(); + drop(read_write); + + // Now update the connection. + if written_bytes != 0 { + TPlat::send(&mut connection, &write_buffer[..written_bytes]); + } + TPlat::advance_read_cursor(&mut connection, read_bytes); + + ( + read_bytes, + read_buffer_has_data, + read_buffer_closed, + written_bytes, + wake_up_after, + ) + } else { + (0, false, true, 0, None) + }; // Try pull message to send to the coordinator. @@ -439,11 +461,10 @@ async fn webrtc_multi_stream_connection_task( let substream = &mut open_substreams[substream_id]; let incoming_buffer = match TPlat::read_buffer(substream) { - Some(buf) => buf, - None => { - // In the case of WebRTC, `read_buffer` returns `None` only if the - // substream has been reset by the remote. We inform the connection task, - // and the substream is now considered dead. + ReadBuffer::Open(buf) => buf, + ReadBuffer::Closed => panic!(), // Forbidden for WebRTC. + ReadBuffer::Reset => { + // Inform the connection task. The substream is now considered dead. connection_task.reset_substream(&substream_id); open_substreams.remove(substream_id); break; diff --git a/bin/light-base/src/platform.rs b/bin/light-base/src/platform.rs index 2b43da63b2..4c4433e29f 100644 --- a/bin/light-base/src/platform.rs +++ b/bin/light-base/src/platform.rs @@ -107,11 +107,7 @@ pub trait Platform: Send + 'static { fn wait_more_data(stream: &mut Self::Stream) -> Self::StreamDataFuture; /// Gives access to the content of the read buffer of the given stream. - /// - /// Returns `None` if the remote has closed their sending side or if the stream has been - /// reset. In the case of [`PlatformConnection::MultiStreamWebRtc`], only the stream having - /// been reset applies. - fn read_buffer(stream: &mut Self::Stream) -> Option<&[u8]>; + fn read_buffer(stream: &mut Self::Stream) -> ReadBuffer; /// Discards the first `bytes` bytes of the read buffer of this stream. This makes it /// possible for the remote to send more data. @@ -170,3 +166,19 @@ pub struct ConnectError { /// `true` if the error is caused by the address to connect to being forbidden or unsupported. pub is_bad_addr: bool, } + +/// State of the read buffer, as returned by [`Platform::read_buffer`]. +#[derive(Debug)] +pub enum ReadBuffer<'a> { + /// Reading side of the stream is fully open. Contains the data waiting to be processed. + Open(&'a [u8]), + + /// The reading side of the stream has been closed by the remote. + /// + /// Note that this is forbidden for connections of + /// type [`PlatformConnection::MultiStreamWebRtc`]. + Closed, + + /// The stream has been abruptly closed by the remote. + Reset, +} diff --git a/bin/light-base/src/platform/async_std.rs b/bin/light-base/src/platform/async_std.rs index 398fc5aa61..ce530b1b26 100644 --- a/bin/light-base/src/platform/async_std.rs +++ b/bin/light-base/src/platform/async_std.rs @@ -18,7 +18,7 @@ #![cfg(feature = "std")] #![cfg_attr(docsrs, doc(cfg(feature = "std")))] -use super::{ConnectError, Platform, PlatformConnection, PlatformSubstreamDirection}; +use super::{ConnectError, Platform, PlatformConnection, PlatformSubstreamDirection, ReadBuffer}; use alloc::{collections::VecDeque, sync::Arc}; use core::{pin::Pin, str, task::Poll, time::Duration}; @@ -286,9 +286,10 @@ impl Platform for AsyncStdTcpWebSocket { })) } - fn read_buffer(stream: &mut Self::Stream) -> Option<&[u8]> { + fn read_buffer(stream: &mut Self::Stream) -> ReadBuffer { if stream.read_buffer.is_none() { - return None; + // TODO: the implementation doesn't let us differentiate between Closed and Reset + return ReadBuffer::Reset; } let mut lock = stream.read_data_rx.lock(); @@ -297,12 +298,12 @@ impl Platform for AsyncStdTcpWebSocket { Some(b) => stream.read_buffer.as_mut().unwrap().extend(b), None => { stream.read_buffer = None; - return None; + return ReadBuffer::Reset; } } } - Some(stream.read_buffer.as_ref().unwrap()) + ReadBuffer::Open(stream.read_buffer.as_ref().unwrap()) } fn advance_read_cursor(stream: &mut Self::Stream, bytes: usize) { diff --git a/bin/wasm-node/CHANGELOG.md b/bin/wasm-node/CHANGELOG.md index c06c69dbe6..4d2e2ab288 100644 --- a/bin/wasm-node/CHANGELOG.md +++ b/bin/wasm-node/CHANGELOG.md @@ -8,6 +8,10 @@ - The parameter of `chainHead_unstable_finalizedDatabase` has been renamed from `max_size_bytes` to `maxSizeBytes`. ([#2923](https://github.com/paritytech/smoldot/pull/2923)) - The database now contains the hash of the genesis block header. This hash is verified when the database is loaded, and the database is ignored if there is a mismatch. This prevents accidents where the wrong database is provided, which would lead to the chain not working and would be hard to debug. ([#2928](https://github.com/paritytech/smoldot/pull/2928)) +### Fixed + +- Fix errors showing in the browser's console about WebSockets being already in the CLOSING or CLOSED state. ([#2925](https://github.com/paritytech/smoldot/pull/2925)) + ## 0.7.3 - 2022-10-19 ### Changed diff --git a/bin/wasm-node/javascript/src/index-browser.ts b/bin/wasm-node/javascript/src/index-browser.ts index b5657b7ea6..7aaebdeb04 100644 --- a/bin/wasm-node/javascript/src/index-browser.ts +++ b/bin/wasm-node/javascript/src/index-browser.ts @@ -108,14 +108,14 @@ export function start(options?: ClientOptions): Client { }; connection.onclose = (event) => { const message = "Error code " + event.code + (!!event.reason ? (": " + event.reason) : ""); - config.onConnectionClose(message); + config.onConnectionReset(message); }; connection.onmessage = (msg) => { config.onMessage(new Uint8Array(msg.data as ArrayBuffer)); }; return { - close: (): void => { + reset: (): void => { connection.onopen = null; connection.onclose = null; connection.onmessage = null; @@ -167,11 +167,11 @@ export function start(options?: ClientOptions): Client { }; dataChannel.onerror = (_error) => { - config.onStreamClose(dataChannelId); + config.onStreamReset(dataChannelId); }; dataChannel.onclose = () => { - config.onStreamClose(dataChannelId); + config.onStreamReset(dataChannelId); }; dataChannel.onmessage = (m) => { @@ -226,7 +226,7 @@ export function start(options?: ClientOptions): Client { if (localTlsCertificateHex === undefined) { // Because we've already returned from the `connect` function at this point, we pretend // that the connection has failed to open. - config.onConnectionClose('Failed to obtain the browser certificate fingerprint'); + config.onConnectionReset('Failed to obtain the browser certificate fingerprint'); return; } const localTlsCertificateMultihash = new Uint8Array(34); @@ -240,7 +240,7 @@ export function start(options?: ClientOptions): Client { // open. pc.onconnectionstatechange = (_event) => { if (pc!.connectionState == "closed" || pc!.connectionState == "disconnected" || pc!.connectionState == "failed") { - config.onConnectionClose("WebRTC state transitioned to " + pc!.connectionState); + config.onConnectionReset("WebRTC state transitioned to " + pc!.connectionState); pc!.onconnectionstatechange = null; pc!.onnegotiationneeded = null; @@ -361,7 +361,7 @@ export function start(options?: ClientOptions): Client { }); return { - close: (streamId: number | undefined): void => { + reset: (streamId: number | undefined): void => { // If `streamId` is undefined, then the whole connection must be destroyed. if (streamId === undefined) { // The `RTCPeerConnection` is created at the same time as we report the connection as diff --git a/bin/wasm-node/javascript/src/index-deno.ts b/bin/wasm-node/javascript/src/index-deno.ts index 5f36a19f6c..ea97df4789 100644 --- a/bin/wasm-node/javascript/src/index-deno.ts +++ b/bin/wasm-node/javascript/src/index-deno.ts @@ -141,7 +141,7 @@ function connect(config: ConnectionConfig, forbidTcp: boolean, forbidWs: boolean }; connection.socket.onclose = (event) => { const message = "Error code " + event.code + (!!event.reason ? (": " + event.reason) : ""); - config.onConnectionClose(message); + config.onConnectionReset(message); }; connection.socket.onmessage = (msg) => { config.onMessage(new Uint8Array(msg.data as ArrayBuffer)); @@ -191,7 +191,7 @@ function connect(config: ConnectionConfig, forbidTcp: boolean, forbidWs: boolean // The socket is reported closed, but `socket.destroyed` is still `false` (see // check above). As such, we must inform the inner layers. socket.destroyed = true; - config.onConnectionClose(outcome === null ? "EOF when reading socket" : outcome); + config.onConnectionReset(outcome === null ? "EOF when reading socket" : outcome); return; } console.assert(outcome !== 0); // `read` guarantees to return a non-zero value. @@ -208,7 +208,7 @@ function connect(config: ConnectionConfig, forbidTcp: boolean, forbidWs: boolean } return { - close: (): void => { + reset: (): void => { if (connection.ty == 'websocket') { // WebSocket // We can't set these fields to null because the TypeScript definitions don't @@ -249,7 +249,7 @@ function connect(config: ConnectionConfig, forbidTcp: boolean, forbidWs: boolean // The socket is reported closed, but `socket.destroyed` is still // `false` (see check above). As such, we must inform the inner layers. socket.destroyed = true; - config.onConnectionClose(outcome); + config.onConnectionReset(outcome); return c; } // Note that, contrary to `read`, it is possible for `outcome` to be 0. diff --git a/bin/wasm-node/javascript/src/index-nodejs.ts b/bin/wasm-node/javascript/src/index-nodejs.ts index d5c332cf1f..ea61df0981 100644 --- a/bin/wasm-node/javascript/src/index-nodejs.ts +++ b/bin/wasm-node/javascript/src/index-nodejs.ts @@ -108,14 +108,14 @@ function connect(config: ConnectionConfig, forbidTcp: boolean, forbidWs: boolean }; socket.onclose = (event) => { const message = "Error code " + event.code + (!!event.reason ? (": " + event.reason) : ""); - config.onConnectionClose(message); + config.onConnectionReset(message); socket.onopen = () => { }; socket.onclose = () => { }; socket.onmessage = () => { }; socket.onerror = () => { }; }; socket.onerror = (event) => { - config.onConnectionClose(event.message); + config.onConnectionReset(event.message); socket.onopen = () => { }; socket.onclose = () => { }; socket.onmessage = () => { }; @@ -150,7 +150,7 @@ function connect(config: ConnectionConfig, forbidTcp: boolean, forbidWs: boolean // NodeJS doesn't provide a reason why the closing happened, but only // whether it was caused by an error. const message = hasError ? "Error" : "Closed gracefully"; - config.onConnectionClose(message); + config.onConnectionReset(message); }); connection.socket.on('error', () => { }); connection.socket.on('data', (message) => { @@ -163,7 +163,7 @@ function connect(config: ConnectionConfig, forbidTcp: boolean, forbidWs: boolean } return { - close: (): void => { + reset: (): void => { if (connection.ty == 'websocket') { // WebSocket // We can't set these fields to null because the TypeScript definitions don't diff --git a/bin/wasm-node/javascript/src/instance/bindings-smoldot-light.ts b/bin/wasm-node/javascript/src/instance/bindings-smoldot-light.ts index 66207327cd..8e87460fb1 100644 --- a/bin/wasm-node/javascript/src/instance/bindings-smoldot-light.ts +++ b/bin/wasm-node/javascript/src/instance/bindings-smoldot-light.ts @@ -58,11 +58,11 @@ export interface Config { * * - `Opening` (initial state) * - `Open` - * - `Closed` + * - `Reset` * - * When in the `Opening` or `Open` state, the connection can transition to the `Closed` state + * When in the `Opening` or `Open` state, the connection can transition to the `Reset` state * if the remote closes the connection or refuses the connection altogether. When that - * happens, `config.onClosed` is called. Once in the `Closed` state, the connection cannot + * happens, `config.onReset` is called. Once in the `Reset` state, the connection cannot * transition back to another state. * * Initially in the `Opening` state, the connection can transition to the `Open` state if the @@ -75,20 +75,20 @@ export interface Config { */ export interface Connection { /** - * Transitions the connection or one of its substreams to the `Closed` state. + * Transitions the connection or one of its substreams to the `Reset` state. * * If the connection is of type "single-stream", the whole connection must be shut down. * If the connection is of type "multi-stream", a `streamId` can be provided, in which case * only the given substream is shut down. * - * The `config.onClose` or `config.onStreamClose` callbacks are **not** called. + * The `config.onReset` or `config.onStreamReset` callbacks are **not** called. * * The transition is performed in the background. * If the whole connection is to be shut down, none of the callbacks passed to the `Config` - * must be called again. If only a substream is shut down, the `onStreamClose` and `onMessage` + * must be called again. If only a substream is shut down, the `onStreamReset` and `onMessage` * callbacks must not be called again with that substream. */ - close(streamId?: number): void; + reset(streamId?: number): void; /** * Queues data to be sent on the given connection. @@ -139,11 +139,11 @@ export interface ConnectionConfig { ) => void; /** - * Callback called when the connection transitions to the `Closed` state. + * Callback called when the connection transitions to the `Reset` state. * - * It it **not** called if `Connection.close` is manually called by the API user. + * It it **not** called if `Connection.reset` is manually called by the API user. */ - onConnectionClose: (message: string) => void; + onConnectionReset: (message: string) => void; /** * Callback called when a new substream has been opened. @@ -153,13 +153,13 @@ export interface ConnectionConfig { onStreamOpened: (streamId: number, direction: 'inbound' | 'outbound') => void; /** - * Callback called when a stream transitions to the `Closed` state. + * Callback called when a stream transitions to the `Reset` state. * - * It it **not** called if `Connection.closeStream` is manually called by the API user. + * It it **not** called if `Connection.resetStream` is manually called by the API user. * * This function must only be called for connections of type "multi-stream". */ - onStreamClose: (streamId: number) => void; + onStreamReset: (streamId: number) => void; /** * Callback called when a message sent by the remote has been received. @@ -196,7 +196,7 @@ export default function (config: Config): { imports: WebAssembly.ModuleImports, killedTracked.killed = true; // TODO: kill timers as well? for (const connection in connections) { - connections[connection]!.close() + connections[connection]!.reset() delete connections[connection] } }; @@ -322,13 +322,13 @@ export default function (config: Config): { imports: WebAssembly.ModuleImports, } } catch(_error) {} }, - onConnectionClose: (message: string) => { + onConnectionReset: (message: string) => { if (killedTracked.killed) return; try { const encoded = new TextEncoder().encode(message) const ptr = instance.exports.alloc(encoded.length) >>> 0; new Uint8Array(instance.exports.memory.buffer).set(encoded, ptr); - instance.exports.connection_closed(connectionId, ptr, encoded.length); + instance.exports.connection_reset(connectionId, ptr, encoded.length); } catch(_error) {} }, onMessage: (message: Uint8Array, streamId?: number) => { @@ -349,10 +349,10 @@ export default function (config: Config): { imports: WebAssembly.ModuleImports, ); } catch(_error) {} }, - onStreamClose: (streamId: number) => { + onStreamReset: (streamId: number) => { if (killedTracked.killed) return; try { - instance.exports.stream_closed(connectionId, streamId); + instance.exports.stream_reset(connectionId, streamId); } catch(_error) {} } @@ -379,10 +379,10 @@ export default function (config: Config): { imports: WebAssembly.ModuleImports, }, // Must close and destroy the connection object. - connection_close: (connectionId: number) => { + reset_connection: (connectionId: number) => { if (killedTracked.killed) return; const connection = connections[connectionId]!; - connection.close(); + connection.reset(); delete connections[connectionId]; }, @@ -393,9 +393,9 @@ export default function (config: Config): { imports: WebAssembly.ModuleImports, }, // Closes a substream on a multi-stream connection. - connection_stream_close: (connectionId: number, streamId: number) => { + connection_stream_reset: (connectionId: number, streamId: number) => { const connection = connections[connectionId]!; - connection.close(streamId) + connection.reset(streamId) }, // Must queue the data found in the WebAssembly memory at the given pointer. It is assumed diff --git a/bin/wasm-node/javascript/src/instance/bindings.ts b/bin/wasm-node/javascript/src/instance/bindings.ts index 26762116d6..620acdfbeb 100644 --- a/bin/wasm-node/javascript/src/instance/bindings.ts +++ b/bin/wasm-node/javascript/src/instance/bindings.ts @@ -39,8 +39,8 @@ export interface SmoldotWasmExports extends WebAssembly.Exports { connection_open_multi_stream: (connectionId: number, handshakeTyPtr: number, handshakeTyLen: number) => void, stream_message: (connectionId: number, streamId: number, ptr: number, len: number) => void, connection_stream_opened: (connectionId: number, streamId: number, outbound: number) => void, - connection_closed: (connectionId: number, ptr: number, len: number) => void, - stream_closed: (connectionId: number, streamId: number) => void, + connection_reset: (connectionId: number, ptr: number, len: number) => void, + stream_reset: (connectionId: number, streamId: number) => void, } export interface SmoldotWasmInstance extends WebAssembly.Instance { diff --git a/bin/wasm-node/rust/src/bindings.rs b/bin/wasm-node/rust/src/bindings.rs index 2927d1c32d..164fe4ad7e 100644 --- a/bin/wasm-node/rust/src/bindings.rs +++ b/bin/wasm-node/rust/src/bindings.rs @@ -173,11 +173,11 @@ extern "C" { /// /// - `Opening` (initial state) /// - `Open` - /// - `Closed` + /// - `Reset` /// - /// When in the `Opening` or `Open` state, the connection can transition to the `Closed` state + /// When in the `Opening` or `Open` state, the connection can transition to the `Reset` state /// if the remote closes the connection or refuses the connection altogether. When that - /// happens, [`connection_closed`] must be called. Once in the `Closed` state, the connection + /// happens, [`connection_reset`] must be called. Once in the `Reset` state, the connection /// cannot transition back to another state. /// /// Initially in the `Opening` state, the connection can transition to the `Open` state if the @@ -187,11 +187,11 @@ extern "C" { /// There exists two kind of connections: single-stream and multi-stream. Single-stream /// connections are assumed to have a single stream open at all time and the encryption and /// multiplexing are handled internally by smoldot. Multi-stream connections open and close - /// streams over time using [`connection_stream_opened`] and [`stream_closed`], and the + /// streams over time using [`connection_stream_opened`] and [`stream_reset`], and the /// encryption and multiplexing are handled by the user of these bindings. pub fn connection_new(id: u32, addr_ptr: u32, addr_len: u32, error_ptr_ptr: u32) -> u32; - /// Close a connection previously initialized with [`connection_new`]. + /// Abruptly close a connection previously initialized with [`connection_new`]. /// /// This destroys the identifier passed as parameter. This identifier must never be passed /// through the FFI boundary, unless the same identifier is later allocated again with @@ -202,7 +202,7 @@ extern "C" { /// /// > **Note**: In JavaScript, remember to unregister event handlers before calling for /// > example `WebSocket.close()`. - pub fn connection_close(id: u32); + pub fn reset_connection(id: u32); /// Queues a new outbound substream opening. The [`connection_stream_opened`] function must /// later be called when the substream has been successfully opened. @@ -211,12 +211,12 @@ extern "C" { /// currently be in the `Open` state. See the documentation of [`connection_new`] for details. pub fn connection_stream_open(connection_id: u32); - /// Closes an existing substream of a multi-stream connection. The substream must currently - /// be in the `Open` state. + /// Abruptly closes an existing substream of a multi-stream connection. The substream must + /// currently be in the `Open` state. /// /// This function will only be called for multi-stream connections. The connection must /// currently be in the `Open` state. See the documentation of [`connection_new`] for details. - pub fn connection_stream_close(connection_id: u32, stream_id: u32); + pub fn connection_stream_reset(connection_id: u32, stream_id: u32); /// Queues data on the given stream. The data is found in the memory of the WebAssembly /// virtual machine, at the given pointer. The data must be sent as a binary frame. @@ -535,7 +535,7 @@ pub extern "C" fn connection_stream_opened(connection_id: u32, stream_id: u32, o crate::platform::connection_stream_opened(connection_id, stream_id, outbound) } -/// Can be called at any point by the JavaScript code if the connection switches to the `Closed` +/// Can be called at any point by the JavaScript code if the connection switches to the `Reset` /// state. /// /// Must only be called once per connection object. @@ -545,11 +545,11 @@ pub extern "C" fn connection_stream_opened(connection_id: u32, stream_id: u32, o /// /// See also [`connection_new`]. #[no_mangle] -pub extern "C" fn connection_closed(connection_id: u32, ptr: u32, len: u32) { - crate::platform::connection_closed(connection_id, ptr, len) +pub extern "C" fn connection_reset(connection_id: u32, ptr: u32, len: u32) { + crate::platform::connection_reset(connection_id, ptr, len) } -/// Can be called at any point by the JavaScript code if the stream switches to the `Closed` +/// Can be called at any point by the JavaScript code if the stream switches to the `Reset` /// state. /// /// Must only be called once per stream. @@ -560,6 +560,6 @@ pub extern "C" fn connection_closed(connection_id: u32, ptr: u32, len: u32) { /// /// See also [`connection_open_multi_stream`]. #[no_mangle] -pub extern "C" fn stream_closed(connection_id: u32, stream_id: u32) { - crate::platform::stream_closed(connection_id, stream_id) +pub extern "C" fn stream_reset(connection_id: u32, stream_id: u32) { + crate::platform::stream_reset(connection_id, stream_id) } diff --git a/bin/wasm-node/rust/src/platform.rs b/bin/wasm-node/rust/src/platform.rs index 628192de3b..bdf93c49b2 100644 --- a/bin/wasm-node/rust/src/platform.rs +++ b/bin/wasm-node/rust/src/platform.rs @@ -166,7 +166,7 @@ impl smoldot_light::platform::Platform for Platform { remote_tls_certificate_multihash: remote_tls_certificate_multihash.clone(), }) } - ConnectionInner::Closed { + ConnectionInner::Reset { message, connection_handles_alive, } => { @@ -195,7 +195,7 @@ impl smoldot_light::platform::Platform for Platform { let connection = lock.connections.get_mut(&connection_id).unwrap(); match &mut connection.inner { - ConnectionInner::Closed { .. } => return None, + ConnectionInner::Reset { .. } => return None, ConnectionInner::MultiStreamWebRtc { opened_substreams_to_pick_up, connection_handles_alive, @@ -260,7 +260,7 @@ impl smoldot_light::platform::Platform for Platform { let mut lock = STATE.try_lock().unwrap(); let stream = lock.streams.get_mut(stream_id).unwrap(); - if !stream.messages_queue.is_empty() || stream.closed { + if !stream.messages_queue.is_empty() || stream.reset { return future::ready(()).boxed(); } @@ -270,25 +270,29 @@ impl smoldot_light::platform::Platform for Platform { something_happened.boxed() } - fn read_buffer(StreamWrapper(stream_id, read_buffer): &mut Self::Stream) -> Option<&[u8]> { + fn read_buffer( + StreamWrapper(stream_id, read_buffer): &mut Self::Stream, + ) -> smoldot_light::platform::ReadBuffer { let mut lock = STATE.try_lock().unwrap(); let stream = lock.streams.get_mut(stream_id).unwrap(); - if stream.closed { - return None; + if stream.reset { + return smoldot_light::platform::ReadBuffer::Reset; } if read_buffer.buffer_first_offset < read_buffer.buffer.len() { - return Some(&read_buffer.buffer[read_buffer.buffer_first_offset..]); + return smoldot_light::platform::ReadBuffer::Open( + &read_buffer.buffer[read_buffer.buffer_first_offset..], + ); } // Move the next buffer from `STATE` into `read_buffer`. if let Some(msg) = stream.messages_queue.pop_front() { read_buffer.buffer = msg; read_buffer.buffer_first_offset = 0; - Some(&read_buffer.buffer[..]) + smoldot_light::platform::ReadBuffer::Open(&read_buffer.buffer[..]) } else { - Some(&[]) + smoldot_light::platform::ReadBuffer::Open(&[]) } } @@ -329,7 +333,7 @@ impl smoldot_light::platform::Platform for Platform { let mut lock = STATE.try_lock().unwrap(); let stream = lock.streams.get_mut(&(*connection_id, *stream_id)).unwrap(); - if stream.closed { + if stream.reset { return; } @@ -357,7 +361,7 @@ impl Drop for StreamWrapper { ConnectionInner::NotOpen => unreachable!(), ConnectionInner::SingleStreamMsNoiseYamux => { unsafe { - bindings::connection_close(self.0 .0); + bindings::reset_connection(self.0 .0); } debug_assert_eq!(self.0 .1, 0); @@ -367,17 +371,17 @@ impl Drop for StreamWrapper { connection_handles_alive, .. } => { - unsafe { bindings::connection_stream_close(self.0 .0, self.0 .1) } + unsafe { bindings::connection_stream_reset(self.0 .0, self.0 .1) } *connection_handles_alive -= 1; let remove_connection = *connection_handles_alive == 0; if remove_connection { unsafe { - bindings::connection_close(self.0 .0); + bindings::reset_connection(self.0 .0); } } remove_connection } - ConnectionInner::Closed { + ConnectionInner::Reset { connection_handles_alive, .. } => { @@ -385,7 +389,7 @@ impl Drop for StreamWrapper { let remove_connection = *connection_handles_alive == 0; if remove_connection { unsafe { - bindings::connection_close(self.0 .0); + bindings::reset_connection(self.0 .0); } } remove_connection @@ -413,7 +417,7 @@ impl Drop for ConnectionWrapper { connection_handles_alive, .. } - | ConnectionInner::Closed { + | ConnectionInner::Reset { connection_handles_alive, .. } => { @@ -426,7 +430,7 @@ impl Drop for ConnectionWrapper { lock.connections.remove(&self.0).unwrap(); if remove_connection { unsafe { - bindings::connection_close(self.0); + bindings::reset_connection(self.0); } } } @@ -475,8 +479,8 @@ enum ConnectionInner { /// Multihash encoding of the TLS certificate used by the remote node at the DTLS layer. remote_tls_certificate_multihash: Vec, }, - /// [`bindings::connection_closed`] has been called - Closed { + /// [`bindings::connection_reset`] has been called + Reset { /// Message given by the bindings to justify the closure. message: String, /// Number of objects (connections and streams) in the [`Platform`] API that reference @@ -486,8 +490,8 @@ enum ConnectionInner { } struct Stream { - /// `true` if the sending and receiving sides of the stream have been closed. - closed: bool, + /// `true` if the sending and receiving sides of the stream have been reset. + reset: bool, /// List of messages received through [`bindings::stream_message`]. Must never contain /// empty messages. messages_queue: VecDeque>, @@ -519,7 +523,7 @@ pub(crate) fn connection_open_single_stream(connection_id: u32, handshake_ty: u3 let _prev_value = lock.streams.insert( (connection_id, 0), Stream { - closed: false, + reset: false, messages_queue: VecDeque::with_capacity(8), something_happened: event_listener::Event::new(), }, @@ -597,14 +601,14 @@ pub(crate) fn stream_message(connection_id: u32, stream_id: u32, ptr: u32, len: let actual_stream_id = match connection.inner { ConnectionInner::MultiStreamWebRtc { .. } => stream_id, ConnectionInner::SingleStreamMsNoiseYamux => 0, - ConnectionInner::Closed { .. } | ConnectionInner::NotOpen => unreachable!(), + ConnectionInner::Reset { .. } | ConnectionInner::NotOpen => unreachable!(), }; let stream = lock .streams .get_mut(&(connection_id, actual_stream_id)) .unwrap(); - debug_assert!(!stream.closed); + debug_assert!(!stream.reset); let ptr = usize::try_from(ptr).unwrap(); let len = usize::try_from(len).unwrap(); @@ -637,7 +641,7 @@ pub(crate) fn connection_stream_opened(connection_id: u32, stream_id: u32, outbo let _prev_value = lock.streams.insert( (connection_id, stream_id), Stream { - closed: false, + reset: false, messages_queue: VecDeque::with_capacity(8), something_happened: event_listener::Event::new(), }, @@ -662,7 +666,7 @@ pub(crate) fn connection_stream_opened(connection_id: u32, stream_id: u32, outbo } } -pub(crate) fn connection_closed(connection_id: u32, ptr: u32, len: u32) { +pub(crate) fn connection_reset(connection_id: u32, ptr: u32, len: u32) { let mut lock = STATE.try_lock().unwrap(); let connection = lock.connections.get_mut(&connection_id).unwrap(); @@ -673,10 +677,10 @@ pub(crate) fn connection_closed(connection_id: u32, ptr: u32, len: u32) { connection_handles_alive, .. } => *connection_handles_alive, - ConnectionInner::Closed { .. } => unreachable!(), + ConnectionInner::Reset { .. } => unreachable!(), }; - connection.inner = ConnectionInner::Closed { + connection.inner = ConnectionInner::Reset { connection_handles_alive, message: { let ptr = usize::try_from(ptr).unwrap(); @@ -693,16 +697,16 @@ pub(crate) fn connection_closed(connection_id: u32, ptr: u32, len: u32) { .streams .range_mut((connection_id, u32::min_value())..=(connection_id, u32::max_value())) { - stream.closed = true; + stream.reset = true; stream.something_happened.notify(usize::max_value()); } } -pub(crate) fn stream_closed(connection_id: u32, stream_id: u32) { +pub(crate) fn stream_reset(connection_id: u32, stream_id: u32) { // Note that, as documented, it is illegal to call this function on single-stream substreams. // We can thus assume that the `stream_id` is valid. let mut lock = STATE.try_lock().unwrap(); let stream = lock.streams.get_mut(&(connection_id, stream_id)).unwrap(); - stream.closed = true; + stream.reset = true; stream.something_happened.notify(usize::max_value()); } diff --git a/src/libp2p/collection/single_stream.rs b/src/libp2p/collection/single_stream.rs index 9695c5e04d..3f50c776e9 100644 --- a/src/libp2p/collection/single_stream.rs +++ b/src/libp2p/collection/single_stream.rs @@ -482,6 +482,18 @@ where } } + /// Returns `true` if [`SingleStreamConnectionTask::reset`] has been called in the past. + pub fn is_reset_called(&self) -> bool { + matches!( + self.connection, + SingleStreamConnectionTaskInner::ShutdownWaitingAck { + initiator: ShutdownInitiator::Api, + } | SingleStreamConnectionTaskInner::ShutdownAcked { + initiator: ShutdownInitiator::Api, + } + ) + } + /// Reads data coming from the connection, updates the internal state machine, and writes data /// destined to the connection through the [`ReadWrite`]. ///