diff --git a/bin/light-base/src/network_service/tasks.rs b/bin/light-base/src/network_service/tasks.rs index f314e13686..ed7338a697 100644 --- a/bin/light-base/src/network_service/tasks.rs +++ b/bin/light-base/src/network_service/tasks.rs @@ -16,7 +16,7 @@ // along with this program. If not, see . use super::Shared; -use crate::platform::{Platform, PlatformConnection, PlatformSubstreamDirection}; +use crate::platform::{Platform, PlatformConnection, PlatformSubstreamDirection, ReadBuffer}; use alloc::{string::ToString as _, sync::Arc, vec, vec::Vec}; use core::{iter, pin::Pin}; @@ -221,35 +221,57 @@ async fn single_stream_connection_task( connection_task.inject_coordinator_message(message); } - // TODO: `TPlat::read_buffer` can return `None` if the connection has been abruptly closed, should we handle this and stop reading/writing? cc https://github.com/paritytech/smoldot/issues/2782 - - // Perform a read-write. This updates the internal state of the connection task. let now = TPlat::now(); - let mut read_write = ReadWrite { - now: now.clone(), - incoming_buffer: TPlat::read_buffer(&mut connection), - outgoing_buffer: Some((&mut write_buffer, &mut [])), // TODO: this should be None if a previous read_write() produced None - read_bytes: 0, - written_bytes: 0, - wake_up_after: None, - }; - connection_task.read_write(&mut read_write); - - // Because the `read_write` object borrows the connection, we need to drop it before we - // can modify the connection. Before dropping the `read_write`, clone some important - // information from it. - let read_buffer_has_data = read_write.incoming_buffer.map_or(false, |b| !b.is_empty()); - let read_buffer_closed = read_write.incoming_buffer.is_none(); - let read_bytes = read_write.read_bytes; - let written_bytes = read_write.written_bytes; - let wake_up_after = read_write.wake_up_after.clone(); - drop(read_write); - - // Now update the connection. - if written_bytes != 0 { - TPlat::send(&mut connection, &write_buffer[..written_bytes]); - } - TPlat::advance_read_cursor(&mut connection, read_bytes); + + let (read_bytes, read_buffer_has_data, read_buffer_closed, written_bytes, wake_up_after) = + if !connection_task.is_reset_called() { + let incoming_buffer = match TPlat::read_buffer(&mut connection) { + ReadBuffer::Reset => { + connection_task.reset(); + continue; + } + ReadBuffer::Open(b) => Some(b), + ReadBuffer::Closed => None, + }; + + // Perform a read-write. This updates the internal state of the connection task. + let mut read_write = ReadWrite { + now: now.clone(), + incoming_buffer, + outgoing_buffer: Some((&mut write_buffer, &mut [])), // TODO: this should be None if a previous read_write() produced None + read_bytes: 0, + written_bytes: 0, + wake_up_after: None, + }; + connection_task.read_write(&mut read_write); + + // Because the `read_write` object borrows the connection, we need to drop it before we + // can modify the connection. Before dropping the `read_write`, clone some important + // information from it. + let read_buffer_has_data = + read_write.incoming_buffer.map_or(false, |b| !b.is_empty()); + let read_buffer_closed = read_write.incoming_buffer.is_none(); + let read_bytes = read_write.read_bytes; + let written_bytes = read_write.written_bytes; + let wake_up_after = read_write.wake_up_after.clone(); + drop(read_write); + + // Now update the connection. + if written_bytes != 0 { + TPlat::send(&mut connection, &write_buffer[..written_bytes]); + } + TPlat::advance_read_cursor(&mut connection, read_bytes); + + ( + read_bytes, + read_buffer_has_data, + read_buffer_closed, + written_bytes, + wake_up_after, + ) + } else { + (0, false, true, 0, None) + }; // Try pull message to send to the coordinator. @@ -439,11 +461,10 @@ async fn webrtc_multi_stream_connection_task( let substream = &mut open_substreams[substream_id]; let incoming_buffer = match TPlat::read_buffer(substream) { - Some(buf) => buf, - None => { - // In the case of WebRTC, `read_buffer` returns `None` only if the - // substream has been reset by the remote. We inform the connection task, - // and the substream is now considered dead. + ReadBuffer::Open(buf) => buf, + ReadBuffer::Closed => panic!(), // Forbidden for WebRTC. + ReadBuffer::Reset => { + // Inform the connection task. The substream is now considered dead. connection_task.reset_substream(&substream_id); open_substreams.remove(substream_id); break; diff --git a/bin/light-base/src/platform.rs b/bin/light-base/src/platform.rs index 2b43da63b2..4c4433e29f 100644 --- a/bin/light-base/src/platform.rs +++ b/bin/light-base/src/platform.rs @@ -107,11 +107,7 @@ pub trait Platform: Send + 'static { fn wait_more_data(stream: &mut Self::Stream) -> Self::StreamDataFuture; /// Gives access to the content of the read buffer of the given stream. - /// - /// Returns `None` if the remote has closed their sending side or if the stream has been - /// reset. In the case of [`PlatformConnection::MultiStreamWebRtc`], only the stream having - /// been reset applies. - fn read_buffer(stream: &mut Self::Stream) -> Option<&[u8]>; + fn read_buffer(stream: &mut Self::Stream) -> ReadBuffer; /// Discards the first `bytes` bytes of the read buffer of this stream. This makes it /// possible for the remote to send more data. @@ -170,3 +166,19 @@ pub struct ConnectError { /// `true` if the error is caused by the address to connect to being forbidden or unsupported. pub is_bad_addr: bool, } + +/// State of the read buffer, as returned by [`Platform::read_buffer`]. +#[derive(Debug)] +pub enum ReadBuffer<'a> { + /// Reading side of the stream is fully open. Contains the data waiting to be processed. + Open(&'a [u8]), + + /// The reading side of the stream has been closed by the remote. + /// + /// Note that this is forbidden for connections of + /// type [`PlatformConnection::MultiStreamWebRtc`]. + Closed, + + /// The stream has been abruptly closed by the remote. + Reset, +} diff --git a/bin/light-base/src/platform/async_std.rs b/bin/light-base/src/platform/async_std.rs index 398fc5aa61..ce530b1b26 100644 --- a/bin/light-base/src/platform/async_std.rs +++ b/bin/light-base/src/platform/async_std.rs @@ -18,7 +18,7 @@ #![cfg(feature = "std")] #![cfg_attr(docsrs, doc(cfg(feature = "std")))] -use super::{ConnectError, Platform, PlatformConnection, PlatformSubstreamDirection}; +use super::{ConnectError, Platform, PlatformConnection, PlatformSubstreamDirection, ReadBuffer}; use alloc::{collections::VecDeque, sync::Arc}; use core::{pin::Pin, str, task::Poll, time::Duration}; @@ -286,9 +286,10 @@ impl Platform for AsyncStdTcpWebSocket { })) } - fn read_buffer(stream: &mut Self::Stream) -> Option<&[u8]> { + fn read_buffer(stream: &mut Self::Stream) -> ReadBuffer { if stream.read_buffer.is_none() { - return None; + // TODO: the implementation doesn't let us differentiate between Closed and Reset + return ReadBuffer::Reset; } let mut lock = stream.read_data_rx.lock(); @@ -297,12 +298,12 @@ impl Platform for AsyncStdTcpWebSocket { Some(b) => stream.read_buffer.as_mut().unwrap().extend(b), None => { stream.read_buffer = None; - return None; + return ReadBuffer::Reset; } } } - Some(stream.read_buffer.as_ref().unwrap()) + ReadBuffer::Open(stream.read_buffer.as_ref().unwrap()) } fn advance_read_cursor(stream: &mut Self::Stream, bytes: usize) { diff --git a/bin/wasm-node/CHANGELOG.md b/bin/wasm-node/CHANGELOG.md index c06c69dbe6..4d2e2ab288 100644 --- a/bin/wasm-node/CHANGELOG.md +++ b/bin/wasm-node/CHANGELOG.md @@ -8,6 +8,10 @@ - The parameter of `chainHead_unstable_finalizedDatabase` has been renamed from `max_size_bytes` to `maxSizeBytes`. ([#2923](https://github.com/paritytech/smoldot/pull/2923)) - The database now contains the hash of the genesis block header. This hash is verified when the database is loaded, and the database is ignored if there is a mismatch. This prevents accidents where the wrong database is provided, which would lead to the chain not working and would be hard to debug. ([#2928](https://github.com/paritytech/smoldot/pull/2928)) +### Fixed + +- Fix errors showing in the browser's console about WebSockets being already in the CLOSING or CLOSED state. ([#2925](https://github.com/paritytech/smoldot/pull/2925)) + ## 0.7.3 - 2022-10-19 ### Changed diff --git a/bin/wasm-node/javascript/src/index-browser.ts b/bin/wasm-node/javascript/src/index-browser.ts index b5657b7ea6..7aaebdeb04 100644 --- a/bin/wasm-node/javascript/src/index-browser.ts +++ b/bin/wasm-node/javascript/src/index-browser.ts @@ -108,14 +108,14 @@ export function start(options?: ClientOptions): Client { }; connection.onclose = (event) => { const message = "Error code " + event.code + (!!event.reason ? (": " + event.reason) : ""); - config.onConnectionClose(message); + config.onConnectionReset(message); }; connection.onmessage = (msg) => { config.onMessage(new Uint8Array(msg.data as ArrayBuffer)); }; return { - close: (): void => { + reset: (): void => { connection.onopen = null; connection.onclose = null; connection.onmessage = null; @@ -167,11 +167,11 @@ export function start(options?: ClientOptions): Client { }; dataChannel.onerror = (_error) => { - config.onStreamClose(dataChannelId); + config.onStreamReset(dataChannelId); }; dataChannel.onclose = () => { - config.onStreamClose(dataChannelId); + config.onStreamReset(dataChannelId); }; dataChannel.onmessage = (m) => { @@ -226,7 +226,7 @@ export function start(options?: ClientOptions): Client { if (localTlsCertificateHex === undefined) { // Because we've already returned from the `connect` function at this point, we pretend // that the connection has failed to open. - config.onConnectionClose('Failed to obtain the browser certificate fingerprint'); + config.onConnectionReset('Failed to obtain the browser certificate fingerprint'); return; } const localTlsCertificateMultihash = new Uint8Array(34); @@ -240,7 +240,7 @@ export function start(options?: ClientOptions): Client { // open. pc.onconnectionstatechange = (_event) => { if (pc!.connectionState == "closed" || pc!.connectionState == "disconnected" || pc!.connectionState == "failed") { - config.onConnectionClose("WebRTC state transitioned to " + pc!.connectionState); + config.onConnectionReset("WebRTC state transitioned to " + pc!.connectionState); pc!.onconnectionstatechange = null; pc!.onnegotiationneeded = null; @@ -361,7 +361,7 @@ export function start(options?: ClientOptions): Client { }); return { - close: (streamId: number | undefined): void => { + reset: (streamId: number | undefined): void => { // If `streamId` is undefined, then the whole connection must be destroyed. if (streamId === undefined) { // The `RTCPeerConnection` is created at the same time as we report the connection as diff --git a/bin/wasm-node/javascript/src/index-deno.ts b/bin/wasm-node/javascript/src/index-deno.ts index 5f36a19f6c..ea97df4789 100644 --- a/bin/wasm-node/javascript/src/index-deno.ts +++ b/bin/wasm-node/javascript/src/index-deno.ts @@ -141,7 +141,7 @@ function connect(config: ConnectionConfig, forbidTcp: boolean, forbidWs: boolean }; connection.socket.onclose = (event) => { const message = "Error code " + event.code + (!!event.reason ? (": " + event.reason) : ""); - config.onConnectionClose(message); + config.onConnectionReset(message); }; connection.socket.onmessage = (msg) => { config.onMessage(new Uint8Array(msg.data as ArrayBuffer)); @@ -191,7 +191,7 @@ function connect(config: ConnectionConfig, forbidTcp: boolean, forbidWs: boolean // The socket is reported closed, but `socket.destroyed` is still `false` (see // check above). As such, we must inform the inner layers. socket.destroyed = true; - config.onConnectionClose(outcome === null ? "EOF when reading socket" : outcome); + config.onConnectionReset(outcome === null ? "EOF when reading socket" : outcome); return; } console.assert(outcome !== 0); // `read` guarantees to return a non-zero value. @@ -208,7 +208,7 @@ function connect(config: ConnectionConfig, forbidTcp: boolean, forbidWs: boolean } return { - close: (): void => { + reset: (): void => { if (connection.ty == 'websocket') { // WebSocket // We can't set these fields to null because the TypeScript definitions don't @@ -249,7 +249,7 @@ function connect(config: ConnectionConfig, forbidTcp: boolean, forbidWs: boolean // The socket is reported closed, but `socket.destroyed` is still // `false` (see check above). As such, we must inform the inner layers. socket.destroyed = true; - config.onConnectionClose(outcome); + config.onConnectionReset(outcome); return c; } // Note that, contrary to `read`, it is possible for `outcome` to be 0. diff --git a/bin/wasm-node/javascript/src/index-nodejs.ts b/bin/wasm-node/javascript/src/index-nodejs.ts index d5c332cf1f..ea61df0981 100644 --- a/bin/wasm-node/javascript/src/index-nodejs.ts +++ b/bin/wasm-node/javascript/src/index-nodejs.ts @@ -108,14 +108,14 @@ function connect(config: ConnectionConfig, forbidTcp: boolean, forbidWs: boolean }; socket.onclose = (event) => { const message = "Error code " + event.code + (!!event.reason ? (": " + event.reason) : ""); - config.onConnectionClose(message); + config.onConnectionReset(message); socket.onopen = () => { }; socket.onclose = () => { }; socket.onmessage = () => { }; socket.onerror = () => { }; }; socket.onerror = (event) => { - config.onConnectionClose(event.message); + config.onConnectionReset(event.message); socket.onopen = () => { }; socket.onclose = () => { }; socket.onmessage = () => { }; @@ -150,7 +150,7 @@ function connect(config: ConnectionConfig, forbidTcp: boolean, forbidWs: boolean // NodeJS doesn't provide a reason why the closing happened, but only // whether it was caused by an error. const message = hasError ? "Error" : "Closed gracefully"; - config.onConnectionClose(message); + config.onConnectionReset(message); }); connection.socket.on('error', () => { }); connection.socket.on('data', (message) => { @@ -163,7 +163,7 @@ function connect(config: ConnectionConfig, forbidTcp: boolean, forbidWs: boolean } return { - close: (): void => { + reset: (): void => { if (connection.ty == 'websocket') { // WebSocket // We can't set these fields to null because the TypeScript definitions don't diff --git a/bin/wasm-node/javascript/src/instance/bindings-smoldot-light.ts b/bin/wasm-node/javascript/src/instance/bindings-smoldot-light.ts index 66207327cd..8e87460fb1 100644 --- a/bin/wasm-node/javascript/src/instance/bindings-smoldot-light.ts +++ b/bin/wasm-node/javascript/src/instance/bindings-smoldot-light.ts @@ -58,11 +58,11 @@ export interface Config { * * - `Opening` (initial state) * - `Open` - * - `Closed` + * - `Reset` * - * When in the `Opening` or `Open` state, the connection can transition to the `Closed` state + * When in the `Opening` or `Open` state, the connection can transition to the `Reset` state * if the remote closes the connection or refuses the connection altogether. When that - * happens, `config.onClosed` is called. Once in the `Closed` state, the connection cannot + * happens, `config.onReset` is called. Once in the `Reset` state, the connection cannot * transition back to another state. * * Initially in the `Opening` state, the connection can transition to the `Open` state if the @@ -75,20 +75,20 @@ export interface Config { */ export interface Connection { /** - * Transitions the connection or one of its substreams to the `Closed` state. + * Transitions the connection or one of its substreams to the `Reset` state. * * If the connection is of type "single-stream", the whole connection must be shut down. * If the connection is of type "multi-stream", a `streamId` can be provided, in which case * only the given substream is shut down. * - * The `config.onClose` or `config.onStreamClose` callbacks are **not** called. + * The `config.onReset` or `config.onStreamReset` callbacks are **not** called. * * The transition is performed in the background. * If the whole connection is to be shut down, none of the callbacks passed to the `Config` - * must be called again. If only a substream is shut down, the `onStreamClose` and `onMessage` + * must be called again. If only a substream is shut down, the `onStreamReset` and `onMessage` * callbacks must not be called again with that substream. */ - close(streamId?: number): void; + reset(streamId?: number): void; /** * Queues data to be sent on the given connection. @@ -139,11 +139,11 @@ export interface ConnectionConfig { ) => void; /** - * Callback called when the connection transitions to the `Closed` state. + * Callback called when the connection transitions to the `Reset` state. * - * It it **not** called if `Connection.close` is manually called by the API user. + * It it **not** called if `Connection.reset` is manually called by the API user. */ - onConnectionClose: (message: string) => void; + onConnectionReset: (message: string) => void; /** * Callback called when a new substream has been opened. @@ -153,13 +153,13 @@ export interface ConnectionConfig { onStreamOpened: (streamId: number, direction: 'inbound' | 'outbound') => void; /** - * Callback called when a stream transitions to the `Closed` state. + * Callback called when a stream transitions to the `Reset` state. * - * It it **not** called if `Connection.closeStream` is manually called by the API user. + * It it **not** called if `Connection.resetStream` is manually called by the API user. * * This function must only be called for connections of type "multi-stream". */ - onStreamClose: (streamId: number) => void; + onStreamReset: (streamId: number) => void; /** * Callback called when a message sent by the remote has been received. @@ -196,7 +196,7 @@ export default function (config: Config): { imports: WebAssembly.ModuleImports, killedTracked.killed = true; // TODO: kill timers as well? for (const connection in connections) { - connections[connection]!.close() + connections[connection]!.reset() delete connections[connection] } }; @@ -322,13 +322,13 @@ export default function (config: Config): { imports: WebAssembly.ModuleImports, } } catch(_error) {} }, - onConnectionClose: (message: string) => { + onConnectionReset: (message: string) => { if (killedTracked.killed) return; try { const encoded = new TextEncoder().encode(message) const ptr = instance.exports.alloc(encoded.length) >>> 0; new Uint8Array(instance.exports.memory.buffer).set(encoded, ptr); - instance.exports.connection_closed(connectionId, ptr, encoded.length); + instance.exports.connection_reset(connectionId, ptr, encoded.length); } catch(_error) {} }, onMessage: (message: Uint8Array, streamId?: number) => { @@ -349,10 +349,10 @@ export default function (config: Config): { imports: WebAssembly.ModuleImports, ); } catch(_error) {} }, - onStreamClose: (streamId: number) => { + onStreamReset: (streamId: number) => { if (killedTracked.killed) return; try { - instance.exports.stream_closed(connectionId, streamId); + instance.exports.stream_reset(connectionId, streamId); } catch(_error) {} } @@ -379,10 +379,10 @@ export default function (config: Config): { imports: WebAssembly.ModuleImports, }, // Must close and destroy the connection object. - connection_close: (connectionId: number) => { + reset_connection: (connectionId: number) => { if (killedTracked.killed) return; const connection = connections[connectionId]!; - connection.close(); + connection.reset(); delete connections[connectionId]; }, @@ -393,9 +393,9 @@ export default function (config: Config): { imports: WebAssembly.ModuleImports, }, // Closes a substream on a multi-stream connection. - connection_stream_close: (connectionId: number, streamId: number) => { + connection_stream_reset: (connectionId: number, streamId: number) => { const connection = connections[connectionId]!; - connection.close(streamId) + connection.reset(streamId) }, // Must queue the data found in the WebAssembly memory at the given pointer. It is assumed diff --git a/bin/wasm-node/javascript/src/instance/bindings.ts b/bin/wasm-node/javascript/src/instance/bindings.ts index 26762116d6..620acdfbeb 100644 --- a/bin/wasm-node/javascript/src/instance/bindings.ts +++ b/bin/wasm-node/javascript/src/instance/bindings.ts @@ -39,8 +39,8 @@ export interface SmoldotWasmExports extends WebAssembly.Exports { connection_open_multi_stream: (connectionId: number, handshakeTyPtr: number, handshakeTyLen: number) => void, stream_message: (connectionId: number, streamId: number, ptr: number, len: number) => void, connection_stream_opened: (connectionId: number, streamId: number, outbound: number) => void, - connection_closed: (connectionId: number, ptr: number, len: number) => void, - stream_closed: (connectionId: number, streamId: number) => void, + connection_reset: (connectionId: number, ptr: number, len: number) => void, + stream_reset: (connectionId: number, streamId: number) => void, } export interface SmoldotWasmInstance extends WebAssembly.Instance { diff --git a/bin/wasm-node/rust/src/bindings.rs b/bin/wasm-node/rust/src/bindings.rs index 2927d1c32d..164fe4ad7e 100644 --- a/bin/wasm-node/rust/src/bindings.rs +++ b/bin/wasm-node/rust/src/bindings.rs @@ -173,11 +173,11 @@ extern "C" { /// /// - `Opening` (initial state) /// - `Open` - /// - `Closed` + /// - `Reset` /// - /// When in the `Opening` or `Open` state, the connection can transition to the `Closed` state + /// When in the `Opening` or `Open` state, the connection can transition to the `Reset` state /// if the remote closes the connection or refuses the connection altogether. When that - /// happens, [`connection_closed`] must be called. Once in the `Closed` state, the connection + /// happens, [`connection_reset`] must be called. Once in the `Reset` state, the connection /// cannot transition back to another state. /// /// Initially in the `Opening` state, the connection can transition to the `Open` state if the @@ -187,11 +187,11 @@ extern "C" { /// There exists two kind of connections: single-stream and multi-stream. Single-stream /// connections are assumed to have a single stream open at all time and the encryption and /// multiplexing are handled internally by smoldot. Multi-stream connections open and close - /// streams over time using [`connection_stream_opened`] and [`stream_closed`], and the + /// streams over time using [`connection_stream_opened`] and [`stream_reset`], and the /// encryption and multiplexing are handled by the user of these bindings. pub fn connection_new(id: u32, addr_ptr: u32, addr_len: u32, error_ptr_ptr: u32) -> u32; - /// Close a connection previously initialized with [`connection_new`]. + /// Abruptly close a connection previously initialized with [`connection_new`]. /// /// This destroys the identifier passed as parameter. This identifier must never be passed /// through the FFI boundary, unless the same identifier is later allocated again with @@ -202,7 +202,7 @@ extern "C" { /// /// > **Note**: In JavaScript, remember to unregister event handlers before calling for /// > example `WebSocket.close()`. - pub fn connection_close(id: u32); + pub fn reset_connection(id: u32); /// Queues a new outbound substream opening. The [`connection_stream_opened`] function must /// later be called when the substream has been successfully opened. @@ -211,12 +211,12 @@ extern "C" { /// currently be in the `Open` state. See the documentation of [`connection_new`] for details. pub fn connection_stream_open(connection_id: u32); - /// Closes an existing substream of a multi-stream connection. The substream must currently - /// be in the `Open` state. + /// Abruptly closes an existing substream of a multi-stream connection. The substream must + /// currently be in the `Open` state. /// /// This function will only be called for multi-stream connections. The connection must /// currently be in the `Open` state. See the documentation of [`connection_new`] for details. - pub fn connection_stream_close(connection_id: u32, stream_id: u32); + pub fn connection_stream_reset(connection_id: u32, stream_id: u32); /// Queues data on the given stream. The data is found in the memory of the WebAssembly /// virtual machine, at the given pointer. The data must be sent as a binary frame. @@ -535,7 +535,7 @@ pub extern "C" fn connection_stream_opened(connection_id: u32, stream_id: u32, o crate::platform::connection_stream_opened(connection_id, stream_id, outbound) } -/// Can be called at any point by the JavaScript code if the connection switches to the `Closed` +/// Can be called at any point by the JavaScript code if the connection switches to the `Reset` /// state. /// /// Must only be called once per connection object. @@ -545,11 +545,11 @@ pub extern "C" fn connection_stream_opened(connection_id: u32, stream_id: u32, o /// /// See also [`connection_new`]. #[no_mangle] -pub extern "C" fn connection_closed(connection_id: u32, ptr: u32, len: u32) { - crate::platform::connection_closed(connection_id, ptr, len) +pub extern "C" fn connection_reset(connection_id: u32, ptr: u32, len: u32) { + crate::platform::connection_reset(connection_id, ptr, len) } -/// Can be called at any point by the JavaScript code if the stream switches to the `Closed` +/// Can be called at any point by the JavaScript code if the stream switches to the `Reset` /// state. /// /// Must only be called once per stream. @@ -560,6 +560,6 @@ pub extern "C" fn connection_closed(connection_id: u32, ptr: u32, len: u32) { /// /// See also [`connection_open_multi_stream`]. #[no_mangle] -pub extern "C" fn stream_closed(connection_id: u32, stream_id: u32) { - crate::platform::stream_closed(connection_id, stream_id) +pub extern "C" fn stream_reset(connection_id: u32, stream_id: u32) { + crate::platform::stream_reset(connection_id, stream_id) } diff --git a/bin/wasm-node/rust/src/platform.rs b/bin/wasm-node/rust/src/platform.rs index 628192de3b..bdf93c49b2 100644 --- a/bin/wasm-node/rust/src/platform.rs +++ b/bin/wasm-node/rust/src/platform.rs @@ -166,7 +166,7 @@ impl smoldot_light::platform::Platform for Platform { remote_tls_certificate_multihash: remote_tls_certificate_multihash.clone(), }) } - ConnectionInner::Closed { + ConnectionInner::Reset { message, connection_handles_alive, } => { @@ -195,7 +195,7 @@ impl smoldot_light::platform::Platform for Platform { let connection = lock.connections.get_mut(&connection_id).unwrap(); match &mut connection.inner { - ConnectionInner::Closed { .. } => return None, + ConnectionInner::Reset { .. } => return None, ConnectionInner::MultiStreamWebRtc { opened_substreams_to_pick_up, connection_handles_alive, @@ -260,7 +260,7 @@ impl smoldot_light::platform::Platform for Platform { let mut lock = STATE.try_lock().unwrap(); let stream = lock.streams.get_mut(stream_id).unwrap(); - if !stream.messages_queue.is_empty() || stream.closed { + if !stream.messages_queue.is_empty() || stream.reset { return future::ready(()).boxed(); } @@ -270,25 +270,29 @@ impl smoldot_light::platform::Platform for Platform { something_happened.boxed() } - fn read_buffer(StreamWrapper(stream_id, read_buffer): &mut Self::Stream) -> Option<&[u8]> { + fn read_buffer( + StreamWrapper(stream_id, read_buffer): &mut Self::Stream, + ) -> smoldot_light::platform::ReadBuffer { let mut lock = STATE.try_lock().unwrap(); let stream = lock.streams.get_mut(stream_id).unwrap(); - if stream.closed { - return None; + if stream.reset { + return smoldot_light::platform::ReadBuffer::Reset; } if read_buffer.buffer_first_offset < read_buffer.buffer.len() { - return Some(&read_buffer.buffer[read_buffer.buffer_first_offset..]); + return smoldot_light::platform::ReadBuffer::Open( + &read_buffer.buffer[read_buffer.buffer_first_offset..], + ); } // Move the next buffer from `STATE` into `read_buffer`. if let Some(msg) = stream.messages_queue.pop_front() { read_buffer.buffer = msg; read_buffer.buffer_first_offset = 0; - Some(&read_buffer.buffer[..]) + smoldot_light::platform::ReadBuffer::Open(&read_buffer.buffer[..]) } else { - Some(&[]) + smoldot_light::platform::ReadBuffer::Open(&[]) } } @@ -329,7 +333,7 @@ impl smoldot_light::platform::Platform for Platform { let mut lock = STATE.try_lock().unwrap(); let stream = lock.streams.get_mut(&(*connection_id, *stream_id)).unwrap(); - if stream.closed { + if stream.reset { return; } @@ -357,7 +361,7 @@ impl Drop for StreamWrapper { ConnectionInner::NotOpen => unreachable!(), ConnectionInner::SingleStreamMsNoiseYamux => { unsafe { - bindings::connection_close(self.0 .0); + bindings::reset_connection(self.0 .0); } debug_assert_eq!(self.0 .1, 0); @@ -367,17 +371,17 @@ impl Drop for StreamWrapper { connection_handles_alive, .. } => { - unsafe { bindings::connection_stream_close(self.0 .0, self.0 .1) } + unsafe { bindings::connection_stream_reset(self.0 .0, self.0 .1) } *connection_handles_alive -= 1; let remove_connection = *connection_handles_alive == 0; if remove_connection { unsafe { - bindings::connection_close(self.0 .0); + bindings::reset_connection(self.0 .0); } } remove_connection } - ConnectionInner::Closed { + ConnectionInner::Reset { connection_handles_alive, .. } => { @@ -385,7 +389,7 @@ impl Drop for StreamWrapper { let remove_connection = *connection_handles_alive == 0; if remove_connection { unsafe { - bindings::connection_close(self.0 .0); + bindings::reset_connection(self.0 .0); } } remove_connection @@ -413,7 +417,7 @@ impl Drop for ConnectionWrapper { connection_handles_alive, .. } - | ConnectionInner::Closed { + | ConnectionInner::Reset { connection_handles_alive, .. } => { @@ -426,7 +430,7 @@ impl Drop for ConnectionWrapper { lock.connections.remove(&self.0).unwrap(); if remove_connection { unsafe { - bindings::connection_close(self.0); + bindings::reset_connection(self.0); } } } @@ -475,8 +479,8 @@ enum ConnectionInner { /// Multihash encoding of the TLS certificate used by the remote node at the DTLS layer. remote_tls_certificate_multihash: Vec, }, - /// [`bindings::connection_closed`] has been called - Closed { + /// [`bindings::connection_reset`] has been called + Reset { /// Message given by the bindings to justify the closure. message: String, /// Number of objects (connections and streams) in the [`Platform`] API that reference @@ -486,8 +490,8 @@ enum ConnectionInner { } struct Stream { - /// `true` if the sending and receiving sides of the stream have been closed. - closed: bool, + /// `true` if the sending and receiving sides of the stream have been reset. + reset: bool, /// List of messages received through [`bindings::stream_message`]. Must never contain /// empty messages. messages_queue: VecDeque>, @@ -519,7 +523,7 @@ pub(crate) fn connection_open_single_stream(connection_id: u32, handshake_ty: u3 let _prev_value = lock.streams.insert( (connection_id, 0), Stream { - closed: false, + reset: false, messages_queue: VecDeque::with_capacity(8), something_happened: event_listener::Event::new(), }, @@ -597,14 +601,14 @@ pub(crate) fn stream_message(connection_id: u32, stream_id: u32, ptr: u32, len: let actual_stream_id = match connection.inner { ConnectionInner::MultiStreamWebRtc { .. } => stream_id, ConnectionInner::SingleStreamMsNoiseYamux => 0, - ConnectionInner::Closed { .. } | ConnectionInner::NotOpen => unreachable!(), + ConnectionInner::Reset { .. } | ConnectionInner::NotOpen => unreachable!(), }; let stream = lock .streams .get_mut(&(connection_id, actual_stream_id)) .unwrap(); - debug_assert!(!stream.closed); + debug_assert!(!stream.reset); let ptr = usize::try_from(ptr).unwrap(); let len = usize::try_from(len).unwrap(); @@ -637,7 +641,7 @@ pub(crate) fn connection_stream_opened(connection_id: u32, stream_id: u32, outbo let _prev_value = lock.streams.insert( (connection_id, stream_id), Stream { - closed: false, + reset: false, messages_queue: VecDeque::with_capacity(8), something_happened: event_listener::Event::new(), }, @@ -662,7 +666,7 @@ pub(crate) fn connection_stream_opened(connection_id: u32, stream_id: u32, outbo } } -pub(crate) fn connection_closed(connection_id: u32, ptr: u32, len: u32) { +pub(crate) fn connection_reset(connection_id: u32, ptr: u32, len: u32) { let mut lock = STATE.try_lock().unwrap(); let connection = lock.connections.get_mut(&connection_id).unwrap(); @@ -673,10 +677,10 @@ pub(crate) fn connection_closed(connection_id: u32, ptr: u32, len: u32) { connection_handles_alive, .. } => *connection_handles_alive, - ConnectionInner::Closed { .. } => unreachable!(), + ConnectionInner::Reset { .. } => unreachable!(), }; - connection.inner = ConnectionInner::Closed { + connection.inner = ConnectionInner::Reset { connection_handles_alive, message: { let ptr = usize::try_from(ptr).unwrap(); @@ -693,16 +697,16 @@ pub(crate) fn connection_closed(connection_id: u32, ptr: u32, len: u32) { .streams .range_mut((connection_id, u32::min_value())..=(connection_id, u32::max_value())) { - stream.closed = true; + stream.reset = true; stream.something_happened.notify(usize::max_value()); } } -pub(crate) fn stream_closed(connection_id: u32, stream_id: u32) { +pub(crate) fn stream_reset(connection_id: u32, stream_id: u32) { // Note that, as documented, it is illegal to call this function on single-stream substreams. // We can thus assume that the `stream_id` is valid. let mut lock = STATE.try_lock().unwrap(); let stream = lock.streams.get_mut(&(connection_id, stream_id)).unwrap(); - stream.closed = true; + stream.reset = true; stream.something_happened.notify(usize::max_value()); } diff --git a/src/libp2p/collection/single_stream.rs b/src/libp2p/collection/single_stream.rs index 9695c5e04d..3f50c776e9 100644 --- a/src/libp2p/collection/single_stream.rs +++ b/src/libp2p/collection/single_stream.rs @@ -482,6 +482,18 @@ where } } + /// Returns `true` if [`SingleStreamConnectionTask::reset`] has been called in the past. + pub fn is_reset_called(&self) -> bool { + matches!( + self.connection, + SingleStreamConnectionTaskInner::ShutdownWaitingAck { + initiator: ShutdownInitiator::Api, + } | SingleStreamConnectionTaskInner::ShutdownAcked { + initiator: ShutdownInitiator::Api, + } + ) + } + /// Reads data coming from the connection, updates the internal state machine, and writes data /// destined to the connection through the [`ReadWrite`]. ///