From aa6f11141b455982d83436235bbbd1f337b3d9e5 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Mon, 13 Feb 2023 10:20:22 +0100 Subject: [PATCH 1/2] client: rename `max_buffer_capacity_per_sub` rename `max_notifs_per_subscription` to `max_buffer_capacity_per_sub` --- client/wasm-client/src/lib.rs | 21 +++++++----- client/ws-client/src/lib.rs | 14 ++++---- client/ws-client/src/tests.rs | 2 +- core/src/client/async_client/mod.rs | 53 ++++++++++++++++++----------- core/src/client/mod.rs | 9 ++--- tests/tests/integration_tests.rs | 9 +++-- 6 files changed, 66 insertions(+), 42 deletions(-) diff --git a/client/wasm-client/src/lib.rs b/client/wasm-client/src/lib.rs index f5322b7110..ced915d322 100644 --- a/client/wasm-client/src/lib.rs +++ b/client/wasm-client/src/lib.rs @@ -63,7 +63,7 @@ use jsonrpsee_core::Error; pub struct WasmClientBuilder { id_kind: IdKind, max_concurrent_requests: usize, - max_notifs_per_subscription: usize, + max_buffer_capacity_per_subscription: usize, max_log_length: u32, request_timeout: Duration, } @@ -74,7 +74,7 @@ impl Default for WasmClientBuilder { id_kind: IdKind::Number, max_log_length: 4096, max_concurrent_requests: 256, - max_notifs_per_subscription: 1024, + max_buffer_capacity_per_subscription: 1024, request_timeout: Duration::from_secs(60), } } @@ -93,9 +93,9 @@ impl WasmClientBuilder { self } - /// See documentation [`ClientBuilder::max_notifs_per_subscription`] (default is 1024). - pub fn max_notifs_per_subscription(mut self, max: usize) -> Self { - self.max_notifs_per_subscription = max; + /// See documentation [`ClientBuilder::max_buffer_capacity_per_subscription`] (default is 1024). + pub fn max_buffer_capacity_per_subscription(mut self, max: usize) -> Self { + self.max_buffer_capacity_per_subscription = max; self } @@ -115,15 +115,20 @@ impl WasmClientBuilder { /// Build the client with specified URL to connect to. pub async fn build(self, url: impl AsRef) -> Result { - let Self { max_log_length, id_kind, request_timeout, max_concurrent_requests, max_notifs_per_subscription } = - self; + let Self { + max_log_length, + id_kind, + request_timeout, + max_concurrent_requests, + max_buffer_capacity_per_subscription, + } = self; let (sender, receiver) = web::connect(url).await.map_err(|e| Error::Transport(e.into()))?; let builder = ClientBuilder::default() .set_max_logging_length(max_log_length) .request_timeout(request_timeout) .id_format(id_kind) - .max_notifs_per_subscription(max_notifs_per_subscription) + .max_buffer_capacity_per_subscription(max_buffer_capacity_per_subscription) .max_concurrent_requests(max_concurrent_requests); Ok(builder.build_with_wasm(sender, receiver)) diff --git a/client/ws-client/src/lib.rs b/client/ws-client/src/lib.rs index 6c4520d4ed..0f1c032a51 100644 --- a/client/ws-client/src/lib.rs +++ b/client/ws-client/src/lib.rs @@ -83,7 +83,7 @@ pub struct WsClientBuilder { ping_interval: Option, headers: http::HeaderMap, max_concurrent_requests: usize, - max_notifs_per_subscription: usize, + max_buffer_capacity_per_subscription: usize, max_redirections: usize, id_kind: IdKind, max_log_length: u32, @@ -100,7 +100,7 @@ impl Default for WsClientBuilder { ping_interval: None, headers: HeaderMap::new(), max_concurrent_requests: 256, - max_notifs_per_subscription: 1024, + max_buffer_capacity_per_subscription: 1024, max_redirections: 5, id_kind: IdKind::Number, max_log_length: 4096, @@ -181,9 +181,9 @@ impl WsClientBuilder { self } - /// See documentation [`ClientBuilder::max_notifs_per_subscription`] (default is 1024). - pub fn max_notifs_per_subscription(mut self, max: usize) -> Self { - self.max_notifs_per_subscription = max; + /// See documentation [`ClientBuilder::max_buffer_capacity_per_subscription`] (default is 1024). + pub fn max_buffer_capacity_per_subscription(mut self, max: usize) -> Self { + self.max_buffer_capacity_per_subscription = max; self } @@ -224,7 +224,7 @@ impl WsClientBuilder { ping_interval, headers, max_redirections, - max_notifs_per_subscription, + max_buffer_capacity_per_subscription, id_kind, max_log_length, } = self; @@ -242,7 +242,7 @@ impl WsClientBuilder { let (sender, receiver) = transport_builder.build(uri).await.map_err(|e| Error::Transport(e.into()))?; let mut client = ClientBuilder::default() - .max_notifs_per_subscription(max_notifs_per_subscription) + .max_buffer_capacity_per_subscription(max_buffer_capacity_per_subscription) .request_timeout(request_timeout) .max_concurrent_requests(max_concurrent_requests) .id_format(id_kind) diff --git a/client/ws-client/src/tests.rs b/client/ws-client/src/tests.rs index 51d518f382..60a5cecf2d 100644 --- a/client/ws-client/src/tests.rs +++ b/client/ws-client/src/tests.rs @@ -201,7 +201,7 @@ async fn notification_without_polling_doesnt_make_client_unuseable() { let uri = to_ws_uri_string(server.local_addr()); let client = WsClientBuilder::default() - .max_notifs_per_subscription(4) + .max_buffer_capacity_per_subscription(4) .build(&uri) .with_default_timeout() .await diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index 874cdaef87..48dfe53fb8 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -75,7 +75,7 @@ impl ErrorFromBack { pub struct ClientBuilder { request_timeout: Duration, max_concurrent_requests: usize, - max_notifs_per_subscription: usize, + max_buffer_capacity_per_subscription: usize, id_kind: IdKind, max_log_length: u32, ping_interval: Option, @@ -86,7 +86,7 @@ impl Default for ClientBuilder { Self { request_timeout: Duration::from_secs(60), max_concurrent_requests: 256, - max_notifs_per_subscription: 1024, + max_buffer_capacity_per_subscription: 1024, id_kind: IdKind::Number, max_log_length: 4096, ping_interval: None, @@ -107,7 +107,7 @@ impl ClientBuilder { self } - /// Set max concurrent notification capacity for each subscription; when the capacity is exceeded the subscription + /// Set max buffer capacity for each subscription; when the capacity is exceeded the subscription /// will be dropped (default is 1024). /// /// You may prevent the subscription from being dropped by polling often enough @@ -118,8 +118,8 @@ impl ClientBuilder { /// # Panics /// /// This function panics if `max` is 0. - pub fn max_notifs_per_subscription(mut self, max: usize) -> Self { - self.max_notifs_per_subscription = max; + pub fn max_buffer_capacity_per_subscription(mut self, max: usize) -> Self { + self.max_buffer_capacity_per_subscription = max; self } @@ -168,7 +168,7 @@ impl ClientBuilder { { let (to_back, from_front) = mpsc::channel(self.max_concurrent_requests); let (err_tx, err_rx) = oneshot::channel(); - let max_notifs_per_subscription = self.max_notifs_per_subscription; + let max_buffer_capacity_per_subscription = self.max_buffer_capacity_per_subscription; let ping_interval = self.ping_interval; let (on_exit_tx, on_exit_rx) = oneshot::channel(); @@ -178,7 +178,7 @@ impl ClientBuilder { receiver, from_front, err_tx, - max_notifs_per_subscription, + max_buffer_capacity_per_subscription, ping_interval, on_exit_rx, ) @@ -204,11 +204,20 @@ impl ClientBuilder { { let (to_back, from_front) = mpsc::channel(self.max_concurrent_requests); let (err_tx, err_rx) = oneshot::channel(); - let max_notifs_per_subscription = self.max_notifs_per_subscription; + let max_buffer_capacity_per_subscription = self.max_buffer_capacity_per_subscription; let (on_exit_tx, on_exit_rx) = oneshot::channel(); wasm_bindgen_futures::spawn_local(async move { - background_task(sender, receiver, from_front, err_tx, max_notifs_per_subscription, None, on_exit_rx).await; + background_task( + sender, + receiver, + from_front, + err_tx, + max_buffer_capacity_per_subscription, + None, + on_exit_rx, + ) + .await; }); Client { to_back, @@ -500,14 +509,14 @@ async fn handle_backend_messages( message: Option>, manager: &mut RequestManager, sender: &mut S, - max_notifs_per_subscription: usize, + max_buffer_capacity_per_subscription: usize, ) -> Result<(), Error> { // Handle raw messages of form `ReceivedMessage::Bytes` (Vec) or ReceivedMessage::Data` (String). async fn handle_recv_message( raw: &[u8], manager: &mut RequestManager, sender: &mut S, - max_notifs_per_subscription: usize, + max_buffer_capacity_per_subscription: usize, ) -> Result<(), Error> { let first_non_whitespace = raw.iter().find(|byte| !byte.is_ascii_whitespace()); @@ -515,7 +524,7 @@ async fn handle_backend_messages( Some(b'{') => { // Single response to a request. if let Ok(single) = serde_json::from_slice::>(raw) { - match process_single_response(manager, single, max_notifs_per_subscription) { + match process_single_response(manager, single, max_buffer_capacity_per_subscription) { Ok(Some(unsub)) => { stop_subscription(sender, manager, unsub).await; } @@ -599,10 +608,10 @@ async fn handle_backend_messages( tracing::debug!("Received pong"); } Some(Ok(ReceivedMessage::Bytes(raw))) => { - handle_recv_message(raw.as_ref(), manager, sender, max_notifs_per_subscription).await?; + handle_recv_message(raw.as_ref(), manager, sender, max_buffer_capacity_per_subscription).await?; } Some(Ok(ReceivedMessage::Text(raw))) => { - handle_recv_message(raw.as_ref(), manager, sender, max_notifs_per_subscription).await?; + handle_recv_message(raw.as_ref(), manager, sender, max_buffer_capacity_per_subscription).await?; } Some(Err(e)) => { return Err(Error::Transport(e.into())); @@ -620,7 +629,7 @@ async fn handle_frontend_messages( message: FrontToBack, manager: &mut RequestManager, sender: &mut S, - max_notifs_per_subscription: usize, + max_buffer_capacity_per_subscription: usize, ) { match message { FrontToBack::Batch(batch) => { @@ -678,7 +687,7 @@ async fn handle_frontend_messages( } // User called `register_notification` on the front-end. FrontToBack::RegisterNotification(reg) => { - let (subscribe_tx, subscribe_rx) = mpsc::channel(max_notifs_per_subscription); + let (subscribe_tx, subscribe_rx) = mpsc::channel(max_buffer_capacity_per_subscription); if manager.insert_notification_handler(®.method, subscribe_tx).is_ok() { let _ = reg.send_back.send(Ok((subscribe_rx, reg.method))); @@ -699,7 +708,7 @@ async fn background_task( receiver: R, frontend: mpsc::Receiver, front_error: oneshot::Sender, - max_notifs_per_subscription: usize, + max_buffer_capacity_per_subscription: usize, ping_interval: Option, on_exit: oneshot::Receiver<()>, ) where @@ -746,7 +755,13 @@ async fn background_task( break; }; - handle_frontend_messages(frontend_value, &mut manager, &mut sender, max_notifs_per_subscription).await; + handle_frontend_messages( + frontend_value, + &mut manager, + &mut sender, + max_buffer_capacity_per_subscription, + ) + .await; // Advance frontend, save backend. message_fut = future::select(frontend.next(), backend); @@ -758,7 +773,7 @@ async fn background_task( backend_value, &mut manager, &mut sender, - max_notifs_per_subscription, + max_buffer_capacity_per_subscription, ) .await { diff --git a/core/src/client/mod.rs b/core/src/client/mod.rs index 3ef88968a3..c1e69ff62a 100644 --- a/core/src/client/mod.rs +++ b/core/src/client/mod.rs @@ -38,7 +38,6 @@ use crate::params::BatchRequestBuilder; use crate::traits::ToRpcParams; use async_trait::async_trait; use core::marker::PhantomData; -use futures_util::future::FutureExt; use futures_util::stream::{Stream, StreamExt}; use jsonrpsee_types::{ErrorObject, Id, SubscriptionId}; use serde::de::DeserializeOwned; @@ -216,8 +215,10 @@ pub enum SubscriptionKind { /// Active subscription on the client. /// -/// It will automatically unsubscribe in the [`Subscription::drop`] so no need to explicitly call -/// the `unsubscribe method` if it is an an subscription based on [`SubscriptionId`]. +/// It will try to `unsubscribe` in the drop implementation +/// but it may fail if the underlying buffer is full. +/// Thus, if you want to ensure it's actually unsubscribe then +/// [`Subscription::unsubscribe`] is recommended to use. #[derive(Debug)] pub struct Subscription { /// Channel to send requests to the background task. @@ -381,7 +382,7 @@ impl Drop for Subscription { Some(SubscriptionKind::Subscription(sub_id)) => FrontToBack::SubscriptionClosed(sub_id), None => return, }; - let _ = self.to_back.send(msg).now_or_never(); + let _ = self.to_back.try_send(msg); } } diff --git a/tests/tests/integration_tests.rs b/tests/tests/integration_tests.rs index 4d4381d4ec..69fcf7d615 100644 --- a/tests/tests/integration_tests.rs +++ b/tests/tests/integration_tests.rs @@ -207,8 +207,11 @@ async fn ws_subscription_several_clients_with_drop() { let mut clients = Vec::with_capacity(10); for _ in 0..10 { - let client = - WsClientBuilder::default().max_notifs_per_subscription(u32::MAX as usize).build(&server_url).await.unwrap(); + let client = WsClientBuilder::default() + .max_buffer_capacity_per_subscription(u32::MAX as usize) + .build(&server_url) + .await + .unwrap(); let hello_sub: Subscription = client.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await.unwrap(); let foo_sub: Subscription = @@ -254,7 +257,7 @@ async fn ws_subscription_without_polling_doesnt_make_client_unuseable() { let server_addr = server_with_subscription().await; let server_url = format!("ws://{}", server_addr); - let client = WsClientBuilder::default().max_notifs_per_subscription(4).build(&server_url).await.unwrap(); + let client = WsClientBuilder::default().max_buffer_capacity_per_subscription(4).build(&server_url).await.unwrap(); let mut hello_sub: Subscription = client.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await.unwrap(); From 0f9460c134f8db928c6ad7d4f3062ec844ec4f2c Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Mon, 13 Feb 2023 13:22:45 +0100 Subject: [PATCH 2/2] Update core/src/client/mod.rs --- core/src/client/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/client/mod.rs b/core/src/client/mod.rs index c1e69ff62a..a31ce200b6 100644 --- a/core/src/client/mod.rs +++ b/core/src/client/mod.rs @@ -217,7 +217,7 @@ pub enum SubscriptionKind { /// /// It will try to `unsubscribe` in the drop implementation /// but it may fail if the underlying buffer is full. -/// Thus, if you want to ensure it's actually unsubscribe then +/// Thus, if you want to ensure it's actually unsubscribed then /// [`Subscription::unsubscribe`] is recommended to use. #[derive(Debug)] pub struct Subscription {