Skip to content

Commit

Permalink
refactor: split client and server errors (#1122)
Browse files Browse the repository at this point in the history
* split client and server error

* cargo fmt

* remove old code

* fix tests

* cargo fmt

* fix rustdoc links

* make clippy happy

* Update server/src/future.rs

* Update client/http-client/src/client.rs

* Update client/http-client/src/client.rs

* Update core/src/client/async_client/helpers.rs

* Update core/src/client/async_client/helpers.rs

* Update core/src/client/async_client/mod.rs

* Update core/src/client/async_client/mod.rs

* fix more todos

* unused dep: soketto

* remove unused error variant

* Update core/src/lib.rs

* cargo fmt

* fix grumbles: move client error to own mod

* simplify imports

* Update core/src/client/error.rs
  • Loading branch information
niklasad1 authored Dec 6, 2023
1 parent 0a39c1a commit 0ca84f9
Show file tree
Hide file tree
Showing 35 changed files with 288 additions and 283 deletions.
4 changes: 2 additions & 2 deletions client/http-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ use hyper::body::HttpBody;
use hyper::http::HeaderMap;
use hyper::Body;
use jsonrpsee_core::client::{
generate_batch_id_range, BatchResponse, CertificateStore, ClientT, IdKind, RequestIdManager, Subscription,
generate_batch_id_range, BatchResponse, CertificateStore, ClientT, Error, IdKind, RequestIdManager, Subscription,
SubscriptionClientT,
};
use jsonrpsee_core::params::BatchRequestBuilder;
use jsonrpsee_core::traits::ToRpcParams;
use jsonrpsee_core::{Error, JsonRawValue, TEN_MB_SIZE_BYTES};
use jsonrpsee_core::{JsonRawValue, TEN_MB_SIZE_BYTES};
use jsonrpsee_types::{ErrorObject, InvalidRequestId, ResponseSuccess, TwoPointZero};
use serde::de::DeserializeOwned;
use tower::layer::util::Identity;
Expand Down
18 changes: 9 additions & 9 deletions client/http-client/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::types::error::{ErrorCode, ErrorObject};
use crate::HttpClientBuilder;
use jsonrpsee_core::client::{BatchResponse, ClientT, IdKind};
use jsonrpsee_core::params::BatchRequestBuilder;
use jsonrpsee_core::Error;
use jsonrpsee_core::ClientError;
use jsonrpsee_core::{rpc_params, DeserializeOwned};
use jsonrpsee_test_utils::helpers::*;
use jsonrpsee_test_utils::mocks::Id;
Expand Down Expand Up @@ -59,8 +59,8 @@ async fn method_call_with_wrong_id_kind() {
http_server_with_hardcoded_response(ok_response(exp.into(), Id::Num(0))).with_default_timeout().await.unwrap();
let uri = format!("http://{server_addr}");
let client = HttpClientBuilder::default().id_format(IdKind::String).build(&uri).unwrap();
let res: Result<String, Error> = client.request("o", rpc_params![]).with_default_timeout().await.unwrap();
assert!(matches!(res, Err(Error::InvalidRequestId(_))));
let res: Result<String, ClientError> = client.request("o", rpc_params![]).with_default_timeout().await.unwrap();
assert!(matches!(res, Err(ClientError::InvalidRequestId(_))));
}

#[tokio::test]
Expand Down Expand Up @@ -96,7 +96,7 @@ async fn response_with_wrong_id() {
.await
.unwrap()
.unwrap_err();
assert!(matches!(err, Error::InvalidRequestId(_)));
assert!(matches!(err, ClientError::InvalidRequestId(_)));
}

#[tokio::test]
Expand Down Expand Up @@ -137,7 +137,7 @@ async fn internal_error_works() {
async fn subscription_response_to_request() {
let req = r#"{"jsonrpc":"2.0","method":"subscribe_hello","params":{"subscription":"3px4FrtxSYQ1zBKW154NoVnrDhrq764yQNCXEgZyM6Mu","result":"hello my friend"}}"#.to_string();
let err = run_request_with_response(req).with_default_timeout().await.unwrap().unwrap_err();
assert!(matches!(err, Error::ParseError(_)));
assert!(matches!(err, ClientError::ParseError(_)));
}

#[tokio::test]
Expand Down Expand Up @@ -261,23 +261,23 @@ async fn batch_request_out_of_order_response() {
async fn run_batch_request_with_response<T: Send + DeserializeOwned + std::fmt::Debug + Clone + 'static>(
batch: BatchRequestBuilder<'_>,
response: String,
) -> Result<BatchResponse<T>, Error> {
) -> Result<BatchResponse<T>, ClientError> {
let server_addr = http_server_with_hardcoded_response(response).with_default_timeout().await.unwrap();
let uri = format!("http://{server_addr}");
let client = HttpClientBuilder::default().build(&uri).unwrap();
client.batch_request(batch).with_default_timeout().await.unwrap()
}

async fn run_request_with_response(response: String) -> Result<String, Error> {
async fn run_request_with_response(response: String) -> Result<String, ClientError> {
let server_addr = http_server_with_hardcoded_response(response).with_default_timeout().await.unwrap();
let uri = format!("http://{server_addr}");
let client = HttpClientBuilder::default().build(&uri).unwrap();
client.request("say_hello", rpc_params![]).with_default_timeout().await.unwrap()
}

fn assert_jsonrpc_error_response(err: Error, exp: ErrorObjectOwned) {
fn assert_jsonrpc_error_response(err: ClientError, exp: ErrorObjectOwned) {
match &err {
Error::Call(err) => {
ClientError::Call(err) => {
assert_eq!(err, &exp);
}
e => panic!("Expected error: \"{err}\", got: {e:?}"),
Expand Down
3 changes: 1 addition & 2 deletions client/http-client/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ use hyper::body::{Body, HttpBody};
use hyper::client::{Client, HttpConnector};
use hyper::http::{HeaderMap, HeaderValue};
use jsonrpsee_core::client::CertificateStore;
use jsonrpsee_core::error::GenericTransportError;
use jsonrpsee_core::http_helpers;
use jsonrpsee_core::tracing::client::{rx_log_from_bytes, tx_log_from_str};
use jsonrpsee_core::{http_helpers, GenericTransportError};
use std::error::Error as StdError;
use std::future::Future;
use std::pin::Pin;
Expand Down
3 changes: 1 addition & 2 deletions client/wasm-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ pub use jsonrpsee_types as types;
use std::time::Duration;

use jsonrpsee_client_transport::web;
use jsonrpsee_core::client::{ClientBuilder, IdKind};
use jsonrpsee_core::Error;
use jsonrpsee_core::client::{ClientBuilder, Error, IdKind};

/// Builder for [`Client`].
///
Expand Down
2 changes: 1 addition & 1 deletion client/ws-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ readme.workspace = true
publish = true

[dependencies]
http = "0.2.0"
jsonrpsee-types = { workspace = true }
jsonrpsee-client-transport = { workspace = true, features = ["ws"] }
jsonrpsee-core = { workspace = true, features = ["async-client"] }
http = "0.2.0"
url = "2.4.0"

[dev-dependencies]
Expand Down
19 changes: 9 additions & 10 deletions client/ws-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,17 @@
#[cfg(test)]
mod tests;

pub use http::{HeaderMap, HeaderValue};
pub use jsonrpsee_core::client::Client as WsClient;
pub use jsonrpsee_types as types;

pub use http::{HeaderMap, HeaderValue};
use std::time::Duration;
use url::Url;

use jsonrpsee_client_transport::ws::{AsyncRead, AsyncWrite, WsTransportClientBuilder};
use jsonrpsee_core::client::{
CertificateStore, ClientBuilder, IdKind, MaybeSend, TransportReceiverT, TransportSenderT,
CertificateStore, ClientBuilder, Error, IdKind, MaybeSend, TransportReceiverT, TransportSenderT,
};
use jsonrpsee_core::{Error, TEN_MB_SIZE_BYTES};
use jsonrpsee_core::TEN_MB_SIZE_BYTES;
use std::time::Duration;
use url::Url;

/// Builder for [`WsClient`].
///
Expand Down Expand Up @@ -220,7 +219,7 @@ impl WsClientBuilder {
/// ## Panics
///
/// Panics if being called outside of `tokio` runtime context.
pub async fn build_with_transport<S, R>(self, sender: S, receiver: R) -> Result<WsClient, Error>
pub fn build_with_transport<S, R>(self, sender: S, receiver: R) -> WsClient
where
S: TransportSenderT + Send,
R: TransportReceiverT + Send,
Expand All @@ -246,7 +245,7 @@ impl WsClientBuilder {
client = client.ping_interval(interval);
}

Ok(client.build_with_tokio(sender, receiver))
client.build_with_tokio(sender, receiver)
}

/// Build the [`WsClient`] with specified data stream, using [`WsTransportClientBuilder::build_with_stream`].
Expand All @@ -271,7 +270,7 @@ impl WsClientBuilder {
let (sender, receiver) =
transport_builder.build_with_stream(uri, data_stream).await.map_err(|e| Error::Transport(e.into()))?;

let ws_client = self.build_with_transport(sender, receiver).await?;
let ws_client = self.build_with_transport(sender, receiver);
Ok(ws_client)
}

Expand All @@ -294,7 +293,7 @@ impl WsClientBuilder {
let uri = Url::parse(url.as_ref()).map_err(|e| Error::Transport(e.into()))?;
let (sender, receiver) = transport_builder.build(uri).await.map_err(|e| Error::Transport(e.into()))?;

let ws_client = self.build_with_transport(sender, receiver).await?;
let ws_client = self.build_with_transport(sender, receiver);
Ok(ws_client)
}
}
5 changes: 2 additions & 3 deletions client/ws-client/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,9 @@
use crate::types::error::{ErrorCode, ErrorObject};
use crate::WsClientBuilder;

use jsonrpsee_core::client::{BatchResponse, ClientT, SubscriptionClientT};
use jsonrpsee_core::client::{IdKind, Subscription};
use jsonrpsee_core::client::{BatchResponse, ClientT, Error, IdKind, Subscription, SubscriptionClientT};
use jsonrpsee_core::params::BatchRequestBuilder;
use jsonrpsee_core::{rpc_params, DeserializeOwned, Error};
use jsonrpsee_core::{rpc_params, DeserializeOwned};
use jsonrpsee_test_utils::helpers::*;
use jsonrpsee_test_utils::mocks::{Id, WebSocketTestServer};
use jsonrpsee_test_utils::TimeoutFutureExt;
Expand Down
2 changes: 0 additions & 2 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,11 @@ futures-util = { version = "0.3.14", default-features = false, optional = true }
hyper = { version = "0.14.10", default-features = false, features = ["stream"], optional = true }
rustc-hash = { version = "1", optional = true }
rand = { version = "0.8", optional = true }
soketto = { version = "0.7.1", optional = true }
parking_lot = { version = "0.12", optional = true }
tokio = { version = "1.16", optional = true }
wasm-bindgen-futures = { version = "0.4.19", optional = true }
futures-timer = { version = "3", optional = true }


[features]
default = []
http-helpers = ["hyper", "futures-util"]
Expand Down
17 changes: 8 additions & 9 deletions core/src/client/async_client/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,9 @@

use crate::client::async_client::LOG_TARGET;
use crate::client::async_client::manager::{RequestManager, RequestStatus};
use crate::client::{RequestMessage, TransportSenderT};
use crate::client::{RequestMessage, TransportSenderT, Error};
use crate::params::ArrayParams;
use crate::traits::ToRpcParams;
use crate::Error;

use futures_timer::Delay;
use futures_util::future::{self, Either};
Expand All @@ -56,7 +55,7 @@ pub(crate) fn process_batch_response(
manager: &mut RequestManager,
rps: Vec<InnerBatchResponse>,
range: Range<u64>,
) -> Result<(), Error> {
) -> Result<(), InvalidRequestId> {
let mut responses = Vec::with_capacity(rps.len());

let start_idx = range.start;
Expand All @@ -65,7 +64,7 @@ pub(crate) fn process_batch_response(
Some(state) => state,
None => {
tracing::warn!(target: LOG_TARGET, "Received unknown batch response");
return Err(InvalidRequestId::NotPendingRequest(format!("{:?}", range)).into());
return Err(InvalidRequestId::NotPendingRequest(format!("{:?}", range)));
}
};

Expand All @@ -81,7 +80,7 @@ pub(crate) fn process_batch_response(
if let Some(elem) = maybe_elem {
*elem = rp.result;
} else {
return Err(InvalidRequestId::NotPendingRequest(rp.id.to_string()).into());
return Err(InvalidRequestId::NotPendingRequest(rp.id.to_string()));
}
}

Expand Down Expand Up @@ -155,7 +154,7 @@ pub(crate) fn process_notification(manager: &mut RequestManager, notif: Notifica
Ok(()) => (),
Err(err) => {
tracing::warn!(target: LOG_TARGET, "Could not send notification, dropping handler for {:?} error: {:?}", notif.method, err);
let _ = manager.remove_notification_handler(notif.method.into_owned());
let _ = manager.remove_notification_handler(&notif.method);
}
},
None => {
Expand All @@ -173,7 +172,7 @@ pub(crate) fn process_single_response(
manager: &mut RequestManager,
response: Response<JsonValue>,
max_capacity_per_subscription: usize,
) -> Result<Option<RequestMessage>, Error> {
) -> Result<Option<RequestMessage>, InvalidRequestId> {
let response_id = response.id.clone().into_owned();
let result = ResponseSuccess::try_from(response).map(|s| s.result).map_err(Error::Call);

Expand All @@ -182,7 +181,7 @@ pub(crate) fn process_single_response(
let send_back_oneshot = match manager.complete_pending_call(response_id.clone()) {
Some(Some(send)) => send,
Some(None) => return Ok(None),
None => return Err(InvalidRequestId::NotPendingRequest(response_id.to_string()).into()),
None => return Err(InvalidRequestId::NotPendingRequest(response_id.to_string())),
};

let _ = send_back_oneshot.send(result);
Expand Down Expand Up @@ -223,7 +222,7 @@ pub(crate) fn process_single_response(
}

RequestStatus::Subscription | RequestStatus::Invalid => {
Err(InvalidRequestId::NotPendingRequest(response_id.to_string()).into())
Err(InvalidRequestId::NotPendingRequest(response_id.to_string()))
}
}
}
Expand Down
17 changes: 8 additions & 9 deletions core/src/client/async_client/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ use std::{
ops::Range,
};

use crate::{client::BatchEntry, Error};
use crate::{
client::BatchEntry,
client::Error, error::RegisterMethodError,
};
use jsonrpsee_types::{Id, SubscriptionId};
use rustc_hash::FxHashMap;
use serde_json::value::Value as JsonValue;
Expand Down Expand Up @@ -183,22 +186,18 @@ impl RequestManager {
&mut self,
method: &str,
send_back: SubscriptionSink,
) -> Result<(), Error> {
) -> Result<(), RegisterMethodError> {
if let Entry::Vacant(handle) = self.notification_handlers.entry(method.to_owned()) {
handle.insert(send_back);
Ok(())
} else {
Err(Error::MethodAlreadyRegistered(method.to_owned()))
Err(RegisterMethodError::AlreadyRegistered(method.to_owned()))
}
}

/// Removes a notification handler.
pub(crate) fn remove_notification_handler(&mut self, method: String) -> Result<(), Error> {
if self.notification_handlers.remove(&method).is_some() {
Ok(())
} else {
Err(Error::UnregisteredNotification(method))
}
pub(crate) fn remove_notification_handler(&mut self, method: &str) -> Option<SubscriptionSink> {
self.notification_handlers.remove(method)
}

/// Tries to complete a pending subscription.
Expand Down
19 changes: 11 additions & 8 deletions core/src/client/async_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ mod manager;

use crate::client::async_client::helpers::{process_subscription_close_response, InnerBatchResponse};
use crate::client::{
BatchMessage, BatchResponse, ClientT, ReceivedMessage, RegisterNotificationMessage, RequestMessage, Subscription,
SubscriptionClientT, SubscriptionKind, SubscriptionMessage, TransportReceiverT, TransportSenderT,
BatchMessage, BatchResponse, ClientT, ReceivedMessage, RegisterNotificationMessage, RequestMessage,
Subscription, SubscriptionClientT, SubscriptionKind, SubscriptionMessage, TransportReceiverT, TransportSenderT, Error
};
use crate::error::Error;
use crate::params::BatchRequestBuilder;
use crate::error::RegisterMethodError;
use crate::params::{BatchRequestBuilder, EmptyBatchRequest};
use crate::tracing::client::{rx_log_from_json, tx_log_from_str};
use crate::traits::ToRpcParams;
use crate::JsonRawValue;
Expand Down Expand Up @@ -506,7 +506,9 @@ impl SubscriptionClientT for Client {
Notif: DeserializeOwned,
{
if subscribe_method == unsubscribe_method {
return Err(Error::SubscriptionNameConflict(unsubscribe_method.to_owned()));
return Err(RegisterMethodError::SubscriptionNameConflict(
unsubscribe_method.to_owned(),
).into());
}

let guard = self.id_manager.next_request_two_ids()?;
Expand Down Expand Up @@ -654,7 +656,7 @@ fn handle_backend_messages<R: TransportReceiverT>(
range.end += 1;
process_batch_response(&mut manager.lock(), batch, range)?;
} else {
return Err(Error::EmptyBatchRequest);
return Err(EmptyBatchRequest.into());
}
} else {
return Err(unparse_error(raw));
Expand Down Expand Up @@ -762,12 +764,13 @@ async fn handle_frontend_messages<S: TransportSenderT>(
if manager.lock().insert_notification_handler(&reg.method, subscribe_tx).is_ok() {
let _ = reg.send_back.send(Ok((subscribe_rx, reg.method)));
} else {
let _ = reg.send_back.send(Err(Error::MethodAlreadyRegistered(reg.method)));
let _ =
reg.send_back.send(Err(RegisterMethodError::AlreadyRegistered(reg.method).into()));
}
}
// User dropped the NotificationHandler for this method
FrontToBack::UnregisterNotification(method) => {
let _ = manager.lock().remove_notification_handler(method);
let _ = manager.lock().remove_notification_handler(&method);
}
};

Expand Down
Loading

0 comments on commit 0ca84f9

Please sign in to comment.