Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: async API when Response has been processed. #1281

Merged
merged 29 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
4773227
feat: add `RpcModule::register_raw_method`
niklasad1 Jan 30, 2024
ee9b8ea
add proc macro support
niklasad1 Jan 30, 2024
b595e5f
rename API
niklasad1 Jan 30, 2024
304c938
simplify API with MethodResponse::notify_when_sent
niklasad1 Jan 31, 2024
973e62d
improve notify API
niklasad1 Jan 31, 2024
90da71b
fix nits
niklasad1 Jan 31, 2024
53520d8
introduce ResponsePayloadV2
niklasad1 Jan 31, 2024
f71d046
impl ResponsePayloadV2 for T
niklasad1 Feb 1, 2024
dc88a6d
cleanup
niklasad1 Feb 1, 2024
51ce17e
client: proc macro support for custom ret_ty
niklasad1 Feb 1, 2024
ffea2ca
add tests
niklasad1 Feb 2, 2024
f549bf4
address grumbles
niklasad1 Feb 2, 2024
2f1fdd8
remove unused code
niklasad1 Feb 2, 2024
0c98f5c
fix tests
niklasad1 Feb 3, 2024
db7e08c
proc: revert unrelated changes
niklasad1 Feb 5, 2024
925108a
remove panics; move should be enough
niklasad1 Feb 5, 2024
d256661
bring back UI tests
niklasad1 Feb 5, 2024
0eba672
grumbles: remove NotifiedError
niklasad1 Feb 5, 2024
8391579
break stuff for uniform API
niklasad1 Feb 5, 2024
f4b5a92
make more stuff private
niklasad1 Feb 5, 2024
e835a72
remove ResponseErrorUnit type alias
niklasad1 Feb 5, 2024
7f93b20
fix ui tests
niklasad1 Feb 5, 2024
7006a00
Update proc-macros/src/render_server.rs
niklasad1 Feb 5, 2024
487df31
Rename ws_notify_on_method_answered.rs to response_payload_notify_on_…
niklasad1 Feb 5, 2024
f9d7fc3
Merge remote-tracking branch 'origin/master' into low-level-api-v1
niklasad1 Feb 6, 2024
da41c62
remove unit_error APIs
niklasad1 Feb 6, 2024
a83c766
Merge remote-tracking branch 'origin/low-level-api-v1' into low-level…
niklasad1 Feb 6, 2024
802f77f
replace notify_on_x with notify_on_completion
niklasad1 Feb 6, 2024
2836906
Update server/src/transport/ws.rs
niklasad1 Feb 6, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
269 changes: 4 additions & 265 deletions core/src/server/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,9 @@
use std::io;
use std::time::Duration;

use jsonrpsee_types::error::{
reject_too_big_batch_response, ErrorCode, ErrorObject, OVERSIZED_RESPONSE_CODE, OVERSIZED_RESPONSE_MSG,
};
use jsonrpsee_types::{Id, InvalidRequest, Response, ResponsePayload};
use serde::Serialize;
use serde_json::value::to_raw_value;
use jsonrpsee_types::{ErrorCode, ErrorObject, Id, InvalidRequest, Response, ResponsePayload};
use tokio::sync::mpsc;

use crate::server::LOG_TARGET;

use super::{DisconnectError, SendTimeoutError, SubscriptionMessage, TrySendError};

/// Bounded writer that allows writing at most `max_len` bytes.
Expand Down Expand Up @@ -100,7 +93,7 @@ impl MethodSink {
}

/// Create a new `MethodSink` with a limited response size.
pub fn new_with_limit(tx: mpsc::Sender<String>, max_response_size: u32 ) -> Self {
pub fn new_with_limit(tx: mpsc::Sender<String>, max_response_size: u32) -> Self {
MethodSink { tx, max_response_size }
}

Expand Down Expand Up @@ -169,216 +162,10 @@ pub fn prepare_error(data: &[u8]) -> (Id<'_>, ErrorCode) {
}
}

/// Represents a response to a method call.
///
/// NOTE: A subscription is also a method call but it's
/// possible determine whether a method response
/// is "subscription" or "ordinary method call"
/// by calling [`MethodResponse::is_subscription`]
#[derive(Debug, Clone)]
pub struct MethodResponse {
/// Serialized JSON-RPC response,
pub result: String,
/// Indicates whether the call was successful or not.
pub success_or_error: MethodResponseResult,
/// Indicates whether the call was a subscription response.
pub is_subscription: bool,
}

impl MethodResponse {
/// Returns whether the call was successful.
pub fn is_success(&self) -> bool {
self.success_or_error.is_success()
}

/// Returns whether the call failed.
pub fn is_error(&self) -> bool {
self.success_or_error.is_success()
}

/// Returns whether the call is a subscription.
pub fn is_subscription(&self) -> bool {
self.is_subscription
}
}

/// Represent the outcome of a method call success or failed.
#[derive(Debug, Copy, Clone)]
pub enum MethodResponseResult {
/// The method call was successful.
Success,
/// The method call failed with error code.
Failed(i32),
}

impl MethodResponseResult {
/// Returns whether the call was successful.
pub fn is_success(&self) -> bool {
matches!(self, MethodResponseResult::Success)
}

/// Returns whether the call failed.
pub fn is_error(&self) -> bool {
matches!(self, MethodResponseResult::Failed(_))
}

/// Get the error code
///
/// Returns `Some(error code)` if the call failed.
pub fn as_error_code(&self) -> Option<i32> {
match self {
Self::Failed(e) => Some(*e),
_ => None,
}
}
}

impl MethodResponse {
/// This is similar to [`MethodResponse::response`] but sets a flag to indicate
/// that response is a subscription.
pub fn subscription_response<T>(id: Id, result: ResponsePayload<T>, max_response_size: usize) -> Self
where
T: Serialize + Clone,
{
let mut rp = Self::response(id, result, max_response_size);
rp.is_subscription = true;
rp
}

/// Create a new method response.
///
/// If the serialization of `result` exceeds `max_response_size` then
/// the response is changed to an JSON-RPC error object.
pub fn response<T>(id: Id, result: ResponsePayload<T>, max_response_size: usize) -> Self
where
T: Serialize + Clone,
{
let mut writer = BoundedWriter::new(max_response_size);

let success_or_error = if let ResponsePayload::Error(ref e) = result {
MethodResponseResult::Failed(e.code())
} else {
MethodResponseResult::Success
};

match serde_json::to_writer(&mut writer, &Response::new(result, id.clone())) {
Ok(_) => {
// Safety - serde_json does not emit invalid UTF-8.
let result = unsafe { String::from_utf8_unchecked(writer.into_bytes()) };

Self { result, success_or_error, is_subscription: false }
}
Err(err) => {
tracing::error!(target: LOG_TARGET, "Error serializing response: {:?}", err);

if err.is_io() {
let data = to_raw_value(&format!("Exceeded max limit of {max_response_size}")).ok();
let err_code = OVERSIZED_RESPONSE_CODE;

let err = ResponsePayload::error_borrowed(ErrorObject::borrowed(
err_code,
OVERSIZED_RESPONSE_MSG,
data.as_deref(),
));
let result =
serde_json::to_string(&Response::new(err, id)).expect("JSON serialization infallible; qed");

Self { result, success_or_error: MethodResponseResult::Failed(err_code), is_subscription: false }
} else {
let err_code = ErrorCode::InternalError;
let result = serde_json::to_string(&Response::new(err_code.into(), id))
.expect("JSON serialization infallible; qed");
Self {
result,
success_or_error: MethodResponseResult::Failed(err_code.code()),
is_subscription: false,
}
}
}
}
}

/// This is similar to [`MethodResponse::error`] but sets a flag to indicate
/// that error is a subscription.
pub fn subscription_error<'a>(id: Id, err: impl Into<ErrorObject<'a>>) -> Self {
let mut rp = Self::error(id, err);
rp.is_subscription = true;
rp
}

/// Create a [`MethodResponse`] from a JSON-RPC error.
pub fn error<'a>(id: Id, err: impl Into<ErrorObject<'a>>) -> Self {
let err: ErrorObject = err.into();
let err_code = err.code();
let err = ResponsePayload::error_borrowed(err);
let result = serde_json::to_string(&Response::new(err, id)).expect("JSON serialization infallible; qed");
Self { result, success_or_error: MethodResponseResult::Failed(err_code), is_subscription: false }
}
}

/// Builder to build a `BatchResponse`.
#[derive(Debug, Clone, Default)]
pub struct BatchResponseBuilder {
/// Serialized JSON-RPC response,
result: String,
/// Max limit for the batch
max_response_size: usize,
}

impl BatchResponseBuilder {
/// Create a new batch response builder with limit.
pub fn new_with_limit(limit: usize) -> Self {
let mut initial = String::with_capacity(2048);
initial.push('[');

Self { result: initial, max_response_size: limit }
}

/// Append a result from an individual method to the batch response.
///
/// Fails if the max limit is exceeded and returns to error response to
/// return early in order to not process method call responses which are thrown away anyway.
pub fn append(&mut self, response: &MethodResponse) -> Result<(), MethodResponse> {
// `,` will occupy one extra byte for each entry
// on the last item the `,` is replaced by `]`.
let len = response.result.len() + self.result.len() + 1;

if len > self.max_response_size {
Err(MethodResponse::error(Id::Null, reject_too_big_batch_response(self.max_response_size)))
} else {
self.result.push_str(&response.result);
self.result.push(',');
Ok(())
}
}

/// Check if the batch is empty.
pub fn is_empty(&self) -> bool {
self.result.len() <= 1
}

/// Finish the batch response
pub fn finish(mut self) -> String {
if self.result.len() == 1 {
batch_response_error(Id::Null, ErrorObject::from(ErrorCode::InvalidRequest))
} else {
self.result.pop();
self.result.push(']');
self.result
}
}
}

/// Create a JSON-RPC error response.
pub fn batch_response_error(id: Id, err: impl Into<ErrorObject<'static>>) -> String {
let err = ResponsePayload::error_borrowed(err);
serde_json::to_string(&Response::new(err, id)).expect("JSON serialization infallible; qed")
}

#[cfg(test)]
mod tests {
use super::{BatchResponseBuilder, BoundedWriter, Id, MethodResponse, Response};
use jsonrpsee_types::ResponsePayload;
use crate::server::BoundedWriter;
use jsonrpsee_types::{Id, Response, ResponsePayload};

#[test]
fn bounded_serializer_work() {
Expand All @@ -396,52 +183,4 @@ mod tests {
// NOTE: `"` is part of the serialization so 101 characters.
assert!(serde_json::to_writer(&mut writer, &"x".repeat(99)).is_err());
}

#[test]
fn batch_with_single_works() {
let method = MethodResponse::response(Id::Number(1), ResponsePayload::result_borrowed(&"a"), usize::MAX);
assert_eq!(method.result.len(), 37);

// Recall a batch appends two bytes for the `[]`.
let mut builder = BatchResponseBuilder::new_with_limit(39);
builder.append(&method).unwrap();
let batch = builder.finish();

assert_eq!(batch, r#"[{"jsonrpc":"2.0","result":"a","id":1}]"#)
}

#[test]
fn batch_with_multiple_works() {
let m1 = MethodResponse::response(Id::Number(1), ResponsePayload::result_borrowed(&"a"), usize::MAX);
assert_eq!(m1.result.len(), 37);

// Recall a batch appends two bytes for the `[]` and one byte for `,` to append a method call.
// so it should be 2 + (37 * n) + (n-1)
let limit = 2 + (37 * 2) + 1;
let mut builder = BatchResponseBuilder::new_with_limit(limit);
builder.append(&m1).unwrap();
builder.append(&m1).unwrap();
let batch = builder.finish();

assert_eq!(batch, r#"[{"jsonrpc":"2.0","result":"a","id":1},{"jsonrpc":"2.0","result":"a","id":1}]"#)
}

#[test]
fn batch_empty_err() {
let batch = BatchResponseBuilder::new_with_limit(1024).finish();

let exp_err = r#"{"jsonrpc":"2.0","error":{"code":-32600,"message":"Invalid request"},"id":null}"#;
assert_eq!(batch, exp_err);
}

#[test]
fn batch_too_big() {
let method = MethodResponse::response(Id::Number(1), ResponsePayload::result_borrowed(&"a".repeat(28)), 128);
assert_eq!(method.result.len(), 64);

let batch = BatchResponseBuilder::new_with_limit(63).append(&method).unwrap_err();

let exp_err = r#"{"jsonrpc":"2.0","error":{"code":-32011,"message":"The batch response was too large","data":"Exceeded max limit of 63"},"id":null}"#;
assert_eq!(batch.result, exp_err);
}
}
Loading
Loading