Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: bounded channels and backpressure #962

Merged
merged 68 commits into from
Feb 8, 2023
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
68 commits
Select commit Hold shift + click to select a range
1f2bd33
bounded channels
niklasad1 Dec 5, 2022
cf1b122
remove bounded subscriptions
niklasad1 Dec 7, 2022
5d67a41
remove resource limiting
niklasad1 Dec 7, 2022
cfa48ee
kill connection once message tx fails
niklasad1 Jan 4, 2023
e103908
switch to tokio::mpsc
niklasad1 Jan 5, 2023
44469eb
Merge remote-tracking branch 'origin/master' into na-server-bounded-c…
niklasad1 Jan 5, 2023
b320766
fix nits
niklasad1 Jan 9, 2023
f3a41d9
make futures_channel hard dependency
niklasad1 Jan 9, 2023
69ac4c4
add real backpressure to rx
niklasad1 Jan 9, 2023
d11ec7e
PoC with crossbeam queue
niklasad1 Jan 10, 2023
6ec941d
remove pipe_from_stream
niklasad1 Jan 13, 2023
2599b8f
bring back Pending and SubscriptionSink again
niklasad1 Jan 16, 2023
1254080
more refactoring
niklasad1 Jan 17, 2023
72f0b7d
add example of old APIs
niklasad1 Jan 17, 2023
ede6f4a
introduce opaque SubscriptionMessage
niklasad1 Jan 17, 2023
64e7da6
feat: make subscription callbacks async
niklasad1 Jan 25, 2023
a77a66a
fix tests
niklasad1 Jan 26, 2023
7e306b3
move non-jsonrpc spec types from types
niklasad1 Jan 26, 2023
1bb875e
Merge remote-tracking branch 'origin/master' into na-server-bounded-c…
niklasad1 Jan 26, 2023
a9002c9
fix nits
niklasad1 Jan 26, 2023
f3d19a5
improve docs
niklasad1 Jan 26, 2023
d6c0996
add pipe_from_stream APIs back
niklasad1 Jan 26, 2023
ec35e8f
cleanup
niklasad1 Jan 27, 2023
c6efb43
Update core/src/server/helpers.rs
niklasad1 Jan 27, 2023
a252c07
Update server/src/server.rs
niklasad1 Jan 27, 2023
78298cd
more cleanup
niklasad1 Jan 27, 2023
7214a5d
Merge remote-tracking branch 'origin/na-server-bounded-channels' into…
niklasad1 Jan 27, 2023
1001603
Update core/src/server/helpers.rs
niklasad1 Jan 30, 2023
701ff75
small fixes
niklasad1 Jan 30, 2023
7458334
Merge remote-tracking branch 'origin/master' into na-server-bounded-c…
niklasad1 Jan 31, 2023
f553649
Merge remote-tracking branch 'origin/master' into na-server-bounded-c…
niklasad1 Feb 1, 2023
93623e5
rpc module: add unit test for backpressure
niklasad1 Feb 2, 2023
91a58f8
Merge remote-tracking branch 'origin/master' into na-server-bounded-c…
niklasad1 Feb 2, 2023
f63c3ff
doc fixes
niklasad1 Feb 2, 2023
8fa7172
fix more nits
niklasad1 Feb 2, 2023
6abfcba
refactor: pipe_from_stream
niklasad1 Feb 3, 2023
2227a7b
fix examples: revert unintentional change
niklasad1 Feb 3, 2023
17a98ee
address grumbles
niklasad1 Feb 3, 2023
9a4cc19
revert: don't require subscriptions to return Result
niklasad1 Feb 3, 2023
109e930
Update core/src/server/helpers.rs
niklasad1 Feb 3, 2023
1487383
grumbles: simplify PendingSubscription
niklasad1 Feb 6, 2023
f468f03
grumbles: fix doc nits
niklasad1 Feb 6, 2023
01a5679
remove pipe_from_stream APIs again
niklasad1 Feb 6, 2023
62496e2
add backpressure test for ws server
niklasad1 Feb 7, 2023
dc37776
rpc module: add `send_timeout` APIs
niklasad1 Feb 7, 2023
3c98441
rpc module: add tokio/time
niklasad1 Feb 7, 2023
8e68156
cleanup
niklasad1 Feb 7, 2023
0226258
Update examples/Cargo.toml
niklasad1 Feb 7, 2023
2024cad
Update server/src/server.rs
niklasad1 Feb 7, 2023
898fcd3
Update server/src/server.rs
niklasad1 Feb 7, 2023
d306593
Update server/src/server.rs
niklasad1 Feb 7, 2023
5a03761
extract `build_message` to `SubscriptionMessage`
niklasad1 Feb 7, 2023
332d182
remove resource limiting leftover
niklasad1 Feb 7, 2023
748b9ac
Merge remote-tracking branch 'origin/na-server-bounded-channels' into…
niklasad1 Feb 8, 2023
e2c07e9
Update core/src/server/rpc_module.rs
niklasad1 Feb 8, 2023
935da72
Update examples/examples/ws_pubsub_broadcast.rs
niklasad1 Feb 8, 2023
ade35a7
Update examples/examples/ws_pubsub_broadcast.rs
niklasad1 Feb 8, 2023
bb89d00
revert unintentional change
niklasad1 Feb 8, 2023
4e95865
Merge remote-tracking branch 'origin/na-server-bounded-channels' into…
niklasad1 Feb 8, 2023
5aae651
Update examples/examples/ws_pubsub_with_params.rs
niklasad1 Feb 8, 2023
9b0d2ef
fix more nits
niklasad1 Feb 8, 2023
d25062b
improve SubscriptionEmptyErr
niklasad1 Feb 8, 2023
d3cbf69
clippy --fix
niklasad1 Feb 8, 2023
1f1148b
bring back subscription limit
niklasad1 Feb 8, 2023
fce6d45
server: `set_message_buffer_capacity`
niklasad1 Feb 8, 2023
c141dbf
rpc module: revert raw_json_request API
niklasad1 Feb 8, 2023
1a18f9b
subscribe_bounded -> subscribe
niklasad1 Feb 8, 2023
039a0d1
CallResponse -> CallOrSubscription
niklasad1 Feb 8, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions benches/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,13 @@ pub async fn ws_server(handle: tokio::runtime::Handle) -> (String, jsonrpsee::se
let mut module = gen_rpc_module();

module
.register_subscription(SUB_METHOD_NAME, SUB_METHOD_NAME, UNSUB_METHOD_NAME, |_params, mut sink, _ctx| {
.register_subscription(SUB_METHOD_NAME, SUB_METHOD_NAME, UNSUB_METHOD_NAME, |_params, pending, _ctx| {
let x = "Hello";
tokio::spawn(async move { sink.send(&x) });
tokio::spawn(async move {
let sink = pending.accept().await.unwrap();
let msg = sink.build_message(&x).unwrap();
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
sink.send(msg).await.unwrap();
});
Ok(())
})
.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ server = [
"tokio/rt",
"tokio/sync",
]
client = ["futures-util/sink", "futures-channel/sink", "futures-channel/std"]
client = ["futures-util/sink", "futures-channel/sink"]
async-client = [
"async-lock",
"client",
Expand Down
15 changes: 0 additions & 15 deletions core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,21 +111,6 @@ pub enum Error {
/// Access control verification of HTTP headers failed.
#[error("HTTP header: `{0}` value: `{1}` verification failed")]
HttpHeaderRejected(&'static str, String),
/// Failed to execute a method because a resource was already at capacity
#[error("Resource at capacity: {0}")]
ResourceAtCapacity(&'static str),
/// Failed to register a resource due to a name conflict
#[error("Resource name already taken: {0}")]
ResourceNameAlreadyTaken(&'static str),
/// Failed to initialize resources for a method at startup
#[error("Resource name `{0}` not found for method `{1}`")]
ResourceNameNotFoundForMethod(&'static str, &'static str),
/// Trying to claim resources for a method execution, but the method resources have not been initialized
#[error("Method `{0}` has uninitialized resources")]
UninitializedMethod(Box<str>),
/// Failed to register a resource due to a maximum number of resources already registered
#[error("Maximum number of resources reached")]
MaxResourcesReached,
/// Custom error.
#[error("Custom error: {0}")]
Custom(String),
Expand Down
157 changes: 55 additions & 102 deletions core/src/server/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@
// DEALINGS IN THE SOFTWARE.

use std::io;
use std::sync::Arc;

use crate::tracing::tx_log_from_str;
use crate::Error;
use futures_channel::mpsc;
use jsonrpsee_types::error::{ErrorCode, ErrorObject, ErrorResponse, OVERSIZED_RESPONSE_CODE, OVERSIZED_RESPONSE_MSG};
use jsonrpsee_types::{Id, InvalidRequest, Response};
use serde::Serialize;
use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore};
use tokio::sync::mpsc::{self, Permit};

use super::rpc_module::{DisconnectError, TrySendError};

/// Bounded writer that allows writing at most `max_len` bytes.
///
Expand Down Expand Up @@ -84,7 +84,7 @@ impl<'a> io::Write for &'a mut BoundedWriter {
#[derive(Clone, Debug)]
pub struct MethodSink {
/// Channel sender.
tx: mpsc::UnboundedSender<String>,
pub tx: mpsc::Sender<String>,
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
/// Max response size in bytes for a executed call.
max_response_size: u32,
/// Max log length.
Expand All @@ -93,12 +93,12 @@ pub struct MethodSink {

impl MethodSink {
/// Create a new `MethodSink` with unlimited response size.
pub fn new(tx: mpsc::UnboundedSender<String>) -> Self {
pub fn new(tx: mpsc::Sender<String>) -> Self {
MethodSink { tx, max_response_size: u32::MAX, max_log_length: u32::MAX }
}

/// Create a new `MethodSink` with a limited response size.
pub fn new_with_limit(tx: mpsc::UnboundedSender<String>, max_response_size: u32, max_log_length: u32) -> Self {
pub fn new_with_limit(tx: mpsc::Sender<String>, max_response_size: u32, max_log_length: u32) -> Self {
MethodSink { tx, max_response_size, max_log_length }
}

Expand All @@ -107,47 +107,68 @@ impl MethodSink {
self.tx.is_closed()
}

/// Send a JSON-RPC error to the client
pub fn send_error(&self, id: Id, error: ErrorObject) -> bool {
let json = match serde_json::to_string(&ErrorResponse::borrowed(error, id)) {
Ok(json) => json,
Err(err) => {
tracing::error!("Error serializing response: {:?}", err);
/// Same as [`tokio::sync::mpsc::Sender::closed`].
///
/// # Cancel safety
/// This method is cancel safe. Once the channel is closed,
/// it stays closed forever and all future calls to closed will return immediately.
pub async fn closed(&self) {
self.tx.closed().await
}

return false;
}
};
/// Get the max response size.
pub const fn max_response_size(&self) -> u32 {
lexnv marked this conversation as resolved.
Show resolved Hide resolved
self.max_response_size
}

tx_log_from_str(&json, self.max_log_length);
/// Non-blocking send which fails if the channel is closed or full
///
/// Returns the message if the send fails such that either can be thrown away or re-sent later.
pub fn try_send(&mut self, msg: String) -> Result<(), TrySendError> {
tx_log_from_str(&msg, self.max_log_length);
self.tx.try_send(msg)
}

if let Err(err) = self.send_raw(json) {
tracing::warn!("Error sending response {:?}", err);
false
} else {
true
/// Async send which will wait until there is space in channel buffer or that the subscription is disconnected.
pub async fn send(&self, msg: String) -> Result<(), DisconnectError<String>> {
tx_log_from_str(&msg, self.max_log_length);
self.tx.send(msg).await.map_err(Into::into)
}

/// Waits for channel capacity. Once capacity to send one message is available, it is reserved for the caller.
pub async fn reserve(&self) -> Result<MethodSinkPermit, DisconnectError<()>> {
match self.tx.reserve().await {
Ok(permit) => Ok(MethodSinkPermit { tx: permit, max_log_length: self.max_log_length }),
Err(e) => Err(e.into()),
}
}
}

/// A method sink with reserved spot in the bounded queue.
#[derive(Debug)]
pub struct MethodSinkPermit<'a> {
tx: Permit<'a, String>,
max_log_length: u32,
}

impl<'a> MethodSinkPermit<'a> {
/// Send a JSON-RPC error to the client
pub fn send_error(self, id: Id, error: ErrorObject) {
let json = serde_json::to_string(&ErrorResponse::borrowed(error, id)).expect("valid JSON; qed");

self.send_raw(json)
}

/// Helper for sending the general purpose `Error` as a JSON-RPC errors to the client.
pub fn send_call_error(&self, id: Id, err: Error) -> bool {
pub fn send_call_error(self, id: Id, err: Error) {
self.send_error(id, err.into())
}

/// Send a raw JSON-RPC message to the client, `MethodSink` does not check verify the validity
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
/// of the JSON being sent.
pub fn send_raw(&self, json: String) -> Result<(), mpsc::TrySendError<String>> {
pub fn send_raw(self, json: String) {
self.tx.send(json.clone());
tx_log_from_str(&json, self.max_log_length);
self.tx.unbounded_send(json)
}

/// Close the channel for any further messages.
pub fn close(&self) {
self.tx.close_channel();
}

/// Get the maximum number of permitted subscriptions.
pub const fn max_response_size(&self) -> u32 {
self.max_response_size
}
}

Expand All @@ -160,59 +181,6 @@ pub fn prepare_error(data: &[u8]) -> (Id<'_>, ErrorCode) {
}
}

/// A permitted subscription.
#[derive(Debug)]
pub struct SubscriptionPermit {
_permit: OwnedSemaphorePermit,
resource: Arc<Notify>,
}

impl SubscriptionPermit {
/// Get the handle to [`tokio::sync::Notify`].
pub fn handle(&self) -> Arc<Notify> {
self.resource.clone()
}
}

/// Wrapper over [`tokio::sync::Notify`] with bounds check.
#[derive(Debug, Clone)]
pub struct BoundedSubscriptions {
resource: Arc<Notify>,
guard: Arc<Semaphore>,
max: u32,
}

impl BoundedSubscriptions {
/// Create a new bounded subscription.
pub fn new(max_subscriptions: u32) -> Self {
Self {
resource: Arc::new(Notify::new()),
guard: Arc::new(Semaphore::new(max_subscriptions as usize)),
max: max_subscriptions,
}
}

/// Attempts to acquire a subscription slot.
///
/// Fails if `max_subscriptions` have been exceeded.
pub fn acquire(&self) -> Option<SubscriptionPermit> {
Arc::clone(&self.guard)
.try_acquire_owned()
.ok()
.map(|p| SubscriptionPermit { _permit: p, resource: self.resource.clone() })
}

/// Get the maximum number of permitted subscriptions.
pub const fn max(&self) -> u32 {
self.max
}

/// Close all subscriptions.
pub fn close(&self) {
self.resource.notify_waiters();
}
}

/// Represent the response to method call.
#[derive(Debug, Clone)]
pub struct MethodResponse {
Expand Down Expand Up @@ -331,7 +299,6 @@ impl BatchResponse {

#[cfg(test)]
mod tests {
use crate::server::helpers::BoundedSubscriptions;

use super::{BatchResponseBuilder, BoundedWriter, Id, MethodResponse, Response};

Expand All @@ -351,20 +318,6 @@ mod tests {
assert!(serde_json::to_writer(&mut writer, &"x".repeat(99)).is_err());
}

#[test]
fn bounded_subscriptions_work() {
let subs = BoundedSubscriptions::new(5);
let mut handles = Vec::new();

for _ in 0..5 {
handles.push(subs.acquire().unwrap());
}

assert!(subs.acquire().is_none());
handles.swap_remove(0);
assert!(subs.acquire().is_some());
}

#[test]
fn batch_with_single_works() {
let method = MethodResponse::response(Id::Number(1), "a", usize::MAX);
Expand Down
2 changes: 0 additions & 2 deletions core/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,5 @@
pub mod helpers;
/// Host filtering.
pub mod host_filtering;
/// Resource limiting. Create generic "resources" and configure their limits to ensure servers are not overloaded.
pub mod resource_limiting;
/// JSON-RPC "modules" group sets of methods that belong together and handles method/subscription registration.
pub mod rpc_module;
Loading