Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: rename max_notifs_per_subscription to max_buffer_capacity_per_subscription #1012

Merged
merged 2 commits into from
Feb 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions client/wasm-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ use jsonrpsee_core::Error;
pub struct WasmClientBuilder {
id_kind: IdKind,
max_concurrent_requests: usize,
max_notifs_per_subscription: usize,
max_buffer_capacity_per_subscription: usize,
max_log_length: u32,
request_timeout: Duration,
}
Expand All @@ -74,7 +74,7 @@ impl Default for WasmClientBuilder {
id_kind: IdKind::Number,
max_log_length: 4096,
max_concurrent_requests: 256,
max_notifs_per_subscription: 1024,
max_buffer_capacity_per_subscription: 1024,
request_timeout: Duration::from_secs(60),
}
}
Expand All @@ -93,9 +93,9 @@ impl WasmClientBuilder {
self
}

/// See documentation [`ClientBuilder::max_notifs_per_subscription`] (default is 1024).
pub fn max_notifs_per_subscription(mut self, max: usize) -> Self {
self.max_notifs_per_subscription = max;
/// See documentation [`ClientBuilder::max_buffer_capacity_per_subscription`] (default is 1024).
pub fn max_buffer_capacity_per_subscription(mut self, max: usize) -> Self {
self.max_buffer_capacity_per_subscription = max;
self
}

Expand All @@ -115,15 +115,20 @@ impl WasmClientBuilder {

/// Build the client with specified URL to connect to.
pub async fn build(self, url: impl AsRef<str>) -> Result<Client, Error> {
let Self { max_log_length, id_kind, request_timeout, max_concurrent_requests, max_notifs_per_subscription } =
self;
let Self {
max_log_length,
id_kind,
request_timeout,
max_concurrent_requests,
max_buffer_capacity_per_subscription,
} = self;
let (sender, receiver) = web::connect(url).await.map_err(|e| Error::Transport(e.into()))?;

let builder = ClientBuilder::default()
.set_max_logging_length(max_log_length)
.request_timeout(request_timeout)
.id_format(id_kind)
.max_notifs_per_subscription(max_notifs_per_subscription)
.max_buffer_capacity_per_subscription(max_buffer_capacity_per_subscription)
.max_concurrent_requests(max_concurrent_requests);

Ok(builder.build_with_wasm(sender, receiver))
Expand Down
14 changes: 7 additions & 7 deletions client/ws-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ pub struct WsClientBuilder {
ping_interval: Option<Duration>,
headers: http::HeaderMap,
max_concurrent_requests: usize,
max_notifs_per_subscription: usize,
max_buffer_capacity_per_subscription: usize,
max_redirections: usize,
id_kind: IdKind,
max_log_length: u32,
Expand All @@ -100,7 +100,7 @@ impl Default for WsClientBuilder {
ping_interval: None,
headers: HeaderMap::new(),
max_concurrent_requests: 256,
max_notifs_per_subscription: 1024,
max_buffer_capacity_per_subscription: 1024,
max_redirections: 5,
id_kind: IdKind::Number,
max_log_length: 4096,
Expand Down Expand Up @@ -181,9 +181,9 @@ impl WsClientBuilder {
self
}

/// See documentation [`ClientBuilder::max_notifs_per_subscription`] (default is 1024).
pub fn max_notifs_per_subscription(mut self, max: usize) -> Self {
self.max_notifs_per_subscription = max;
/// See documentation [`ClientBuilder::max_buffer_capacity_per_subscription`] (default is 1024).
pub fn max_buffer_capacity_per_subscription(mut self, max: usize) -> Self {
self.max_buffer_capacity_per_subscription = max;
self
}

Expand Down Expand Up @@ -224,7 +224,7 @@ impl WsClientBuilder {
ping_interval,
headers,
max_redirections,
max_notifs_per_subscription,
max_buffer_capacity_per_subscription,
id_kind,
max_log_length,
} = self;
Expand All @@ -242,7 +242,7 @@ impl WsClientBuilder {
let (sender, receiver) = transport_builder.build(uri).await.map_err(|e| Error::Transport(e.into()))?;

let mut client = ClientBuilder::default()
.max_notifs_per_subscription(max_notifs_per_subscription)
.max_buffer_capacity_per_subscription(max_buffer_capacity_per_subscription)
.request_timeout(request_timeout)
.max_concurrent_requests(max_concurrent_requests)
.id_format(id_kind)
Expand Down
2 changes: 1 addition & 1 deletion client/ws-client/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ async fn notification_without_polling_doesnt_make_client_unuseable() {

let uri = to_ws_uri_string(server.local_addr());
let client = WsClientBuilder::default()
.max_notifs_per_subscription(4)
.max_buffer_capacity_per_subscription(4)
.build(&uri)
.with_default_timeout()
.await
Expand Down
53 changes: 34 additions & 19 deletions core/src/client/async_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl ErrorFromBack {
pub struct ClientBuilder {
request_timeout: Duration,
max_concurrent_requests: usize,
max_notifs_per_subscription: usize,
max_buffer_capacity_per_subscription: usize,
id_kind: IdKind,
max_log_length: u32,
ping_interval: Option<Duration>,
Expand All @@ -86,7 +86,7 @@ impl Default for ClientBuilder {
Self {
request_timeout: Duration::from_secs(60),
max_concurrent_requests: 256,
max_notifs_per_subscription: 1024,
max_buffer_capacity_per_subscription: 1024,
id_kind: IdKind::Number,
max_log_length: 4096,
ping_interval: None,
Expand All @@ -107,7 +107,7 @@ impl ClientBuilder {
self
}

/// Set max concurrent notification capacity for each subscription; when the capacity is exceeded the subscription
/// Set max buffer capacity for each subscription; when the capacity is exceeded the subscription
/// will be dropped (default is 1024).
///
/// You may prevent the subscription from being dropped by polling often enough
Expand All @@ -118,8 +118,8 @@ impl ClientBuilder {
/// # Panics
///
/// This function panics if `max` is 0.
pub fn max_notifs_per_subscription(mut self, max: usize) -> Self {
self.max_notifs_per_subscription = max;
pub fn max_buffer_capacity_per_subscription(mut self, max: usize) -> Self {
self.max_buffer_capacity_per_subscription = max;
self
}

Expand Down Expand Up @@ -168,7 +168,7 @@ impl ClientBuilder {
{
let (to_back, from_front) = mpsc::channel(self.max_concurrent_requests);
let (err_tx, err_rx) = oneshot::channel();
let max_notifs_per_subscription = self.max_notifs_per_subscription;
let max_buffer_capacity_per_subscription = self.max_buffer_capacity_per_subscription;
let ping_interval = self.ping_interval;
let (on_exit_tx, on_exit_rx) = oneshot::channel();

Expand All @@ -178,7 +178,7 @@ impl ClientBuilder {
receiver,
from_front,
err_tx,
max_notifs_per_subscription,
max_buffer_capacity_per_subscription,
ping_interval,
on_exit_rx,
)
Expand All @@ -204,11 +204,20 @@ impl ClientBuilder {
{
let (to_back, from_front) = mpsc::channel(self.max_concurrent_requests);
let (err_tx, err_rx) = oneshot::channel();
let max_notifs_per_subscription = self.max_notifs_per_subscription;
let max_buffer_capacity_per_subscription = self.max_buffer_capacity_per_subscription;
let (on_exit_tx, on_exit_rx) = oneshot::channel();

wasm_bindgen_futures::spawn_local(async move {
background_task(sender, receiver, from_front, err_tx, max_notifs_per_subscription, None, on_exit_rx).await;
background_task(
sender,
receiver,
from_front,
err_tx,
max_buffer_capacity_per_subscription,
None,
on_exit_rx,
)
.await;
});
Client {
to_back,
Expand Down Expand Up @@ -500,22 +509,22 @@ async fn handle_backend_messages<S: TransportSenderT, R: TransportReceiverT>(
message: Option<Result<ReceivedMessage, R::Error>>,
manager: &mut RequestManager,
sender: &mut S,
max_notifs_per_subscription: usize,
max_buffer_capacity_per_subscription: usize,
) -> Result<(), Error> {
// Handle raw messages of form `ReceivedMessage::Bytes` (Vec<u8>) or ReceivedMessage::Data` (String).
async fn handle_recv_message<S: TransportSenderT>(
raw: &[u8],
manager: &mut RequestManager,
sender: &mut S,
max_notifs_per_subscription: usize,
max_buffer_capacity_per_subscription: usize,
) -> Result<(), Error> {
let first_non_whitespace = raw.iter().find(|byte| !byte.is_ascii_whitespace());

match first_non_whitespace {
Some(b'{') => {
// Single response to a request.
if let Ok(single) = serde_json::from_slice::<Response<_>>(raw) {
match process_single_response(manager, single, max_notifs_per_subscription) {
match process_single_response(manager, single, max_buffer_capacity_per_subscription) {
Ok(Some(unsub)) => {
stop_subscription(sender, manager, unsub).await;
}
Expand Down Expand Up @@ -599,10 +608,10 @@ async fn handle_backend_messages<S: TransportSenderT, R: TransportReceiverT>(
tracing::debug!("Received pong");
}
Some(Ok(ReceivedMessage::Bytes(raw))) => {
handle_recv_message(raw.as_ref(), manager, sender, max_notifs_per_subscription).await?;
handle_recv_message(raw.as_ref(), manager, sender, max_buffer_capacity_per_subscription).await?;
}
Some(Ok(ReceivedMessage::Text(raw))) => {
handle_recv_message(raw.as_ref(), manager, sender, max_notifs_per_subscription).await?;
handle_recv_message(raw.as_ref(), manager, sender, max_buffer_capacity_per_subscription).await?;
}
Some(Err(e)) => {
return Err(Error::Transport(e.into()));
Expand All @@ -620,7 +629,7 @@ async fn handle_frontend_messages<S: TransportSenderT>(
message: FrontToBack,
manager: &mut RequestManager,
sender: &mut S,
max_notifs_per_subscription: usize,
max_buffer_capacity_per_subscription: usize,
) {
match message {
FrontToBack::Batch(batch) => {
Expand Down Expand Up @@ -678,7 +687,7 @@ async fn handle_frontend_messages<S: TransportSenderT>(
}
// User called `register_notification` on the front-end.
FrontToBack::RegisterNotification(reg) => {
let (subscribe_tx, subscribe_rx) = mpsc::channel(max_notifs_per_subscription);
let (subscribe_tx, subscribe_rx) = mpsc::channel(max_buffer_capacity_per_subscription);

if manager.insert_notification_handler(&reg.method, subscribe_tx).is_ok() {
let _ = reg.send_back.send(Ok((subscribe_rx, reg.method)));
Expand All @@ -699,7 +708,7 @@ async fn background_task<S, R>(
receiver: R,
frontend: mpsc::Receiver<FrontToBack>,
front_error: oneshot::Sender<Error>,
max_notifs_per_subscription: usize,
max_buffer_capacity_per_subscription: usize,
ping_interval: Option<Duration>,
on_exit: oneshot::Receiver<()>,
) where
Expand Down Expand Up @@ -746,7 +755,13 @@ async fn background_task<S, R>(
break;
};

handle_frontend_messages(frontend_value, &mut manager, &mut sender, max_notifs_per_subscription).await;
handle_frontend_messages(
frontend_value,
&mut manager,
&mut sender,
max_buffer_capacity_per_subscription,
)
.await;

// Advance frontend, save backend.
message_fut = future::select(frontend.next(), backend);
Expand All @@ -758,7 +773,7 @@ async fn background_task<S, R>(
backend_value,
&mut manager,
&mut sender,
max_notifs_per_subscription,
max_buffer_capacity_per_subscription,
)
.await
{
Expand Down
9 changes: 5 additions & 4 deletions core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ use crate::params::BatchRequestBuilder;
use crate::traits::ToRpcParams;
use async_trait::async_trait;
use core::marker::PhantomData;
use futures_util::future::FutureExt;
use futures_util::stream::{Stream, StreamExt};
use jsonrpsee_types::{ErrorObject, Id, SubscriptionId};
use serde::de::DeserializeOwned;
Expand Down Expand Up @@ -216,8 +215,10 @@ pub enum SubscriptionKind {

/// Active subscription on the client.
///
/// It will automatically unsubscribe in the [`Subscription::drop`] so no need to explicitly call
/// the `unsubscribe method` if it is an an subscription based on [`SubscriptionId`].
/// It will try to `unsubscribe` in the drop implementation
/// but it may fail if the underlying buffer is full.
/// Thus, if you want to ensure it's actually unsubscribed then
/// [`Subscription::unsubscribe`] is recommended to use.
#[derive(Debug)]
pub struct Subscription<Notif> {
/// Channel to send requests to the background task.
Expand Down Expand Up @@ -381,7 +382,7 @@ impl<Notif> Drop for Subscription<Notif> {
Some(SubscriptionKind::Subscription(sub_id)) => FrontToBack::SubscriptionClosed(sub_id),
None => return,
};
let _ = self.to_back.send(msg).now_or_never();
let _ = self.to_back.try_send(msg);
}
}

Expand Down
9 changes: 6 additions & 3 deletions tests/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,11 @@ async fn ws_subscription_several_clients_with_drop() {

let mut clients = Vec::with_capacity(10);
for _ in 0..10 {
let client =
WsClientBuilder::default().max_notifs_per_subscription(u32::MAX as usize).build(&server_url).await.unwrap();
let client = WsClientBuilder::default()
.max_buffer_capacity_per_subscription(u32::MAX as usize)
.build(&server_url)
.await
.unwrap();
let hello_sub: Subscription<String> =
client.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await.unwrap();
let foo_sub: Subscription<u64> =
Expand Down Expand Up @@ -254,7 +257,7 @@ async fn ws_subscription_without_polling_doesnt_make_client_unuseable() {
let server_addr = server_with_subscription().await;
let server_url = format!("ws://{}", server_addr);

let client = WsClientBuilder::default().max_notifs_per_subscription(4).build(&server_url).await.unwrap();
let client = WsClientBuilder::default().max_buffer_capacity_per_subscription(4).build(&server_url).await.unwrap();
let mut hello_sub: Subscription<JsonValue> =
client.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await.unwrap();

Expand Down