Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: split client and server errors #1122

Merged
merged 24 commits into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions client/http-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ use hyper::body::HttpBody;
use hyper::http::HeaderMap;
use hyper::Body;
use jsonrpsee_core::client::{
generate_batch_id_range, BatchResponse, CertificateStore, ClientT, IdKind, RequestIdManager, Subscription,
generate_batch_id_range, BatchResponse, CertificateStore, ClientT, Error, IdKind, RequestIdManager, Subscription,
SubscriptionClientT,
};
use jsonrpsee_core::params::BatchRequestBuilder;
use jsonrpsee_core::traits::ToRpcParams;
use jsonrpsee_core::{Error, JsonRawValue, TEN_MB_SIZE_BYTES};
use jsonrpsee_core::{JsonRawValue, TEN_MB_SIZE_BYTES};
use jsonrpsee_types::{ErrorObject, InvalidRequestId, ResponseSuccess, TwoPointZero};
use serde::de::DeserializeOwned;
use tower::layer::util::Identity;
Expand Down
18 changes: 9 additions & 9 deletions client/http-client/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::types::error::{ErrorCode, ErrorObject};
use crate::HttpClientBuilder;
use jsonrpsee_core::client::{BatchResponse, ClientT, IdKind};
use jsonrpsee_core::params::BatchRequestBuilder;
use jsonrpsee_core::Error;
use jsonrpsee_core::ClientError;
use jsonrpsee_core::{rpc_params, DeserializeOwned};
use jsonrpsee_test_utils::helpers::*;
use jsonrpsee_test_utils::mocks::Id;
Expand Down Expand Up @@ -59,8 +59,8 @@ async fn method_call_with_wrong_id_kind() {
http_server_with_hardcoded_response(ok_response(exp.into(), Id::Num(0))).with_default_timeout().await.unwrap();
let uri = format!("http://{server_addr}");
let client = HttpClientBuilder::default().id_format(IdKind::String).build(&uri).unwrap();
let res: Result<String, Error> = client.request("o", rpc_params![]).with_default_timeout().await.unwrap();
assert!(matches!(res, Err(Error::InvalidRequestId(_))));
let res: Result<String, ClientError> = client.request("o", rpc_params![]).with_default_timeout().await.unwrap();
assert!(matches!(res, Err(ClientError::InvalidRequestId(_))));
}

#[tokio::test]
Expand Down Expand Up @@ -96,7 +96,7 @@ async fn response_with_wrong_id() {
.await
.unwrap()
.unwrap_err();
assert!(matches!(err, Error::InvalidRequestId(_)));
assert!(matches!(err, ClientError::InvalidRequestId(_)));
}

#[tokio::test]
Expand Down Expand Up @@ -137,7 +137,7 @@ async fn internal_error_works() {
async fn subscription_response_to_request() {
let req = r#"{"jsonrpc":"2.0","method":"subscribe_hello","params":{"subscription":"3px4FrtxSYQ1zBKW154NoVnrDhrq764yQNCXEgZyM6Mu","result":"hello my friend"}}"#.to_string();
let err = run_request_with_response(req).with_default_timeout().await.unwrap().unwrap_err();
assert!(matches!(err, Error::ParseError(_)));
assert!(matches!(err, ClientError::ParseError(_)));
}

#[tokio::test]
Expand Down Expand Up @@ -261,23 +261,23 @@ async fn batch_request_out_of_order_response() {
async fn run_batch_request_with_response<T: Send + DeserializeOwned + std::fmt::Debug + Clone + 'static>(
batch: BatchRequestBuilder<'_>,
response: String,
) -> Result<BatchResponse<T>, Error> {
) -> Result<BatchResponse<T>, ClientError> {
let server_addr = http_server_with_hardcoded_response(response).with_default_timeout().await.unwrap();
let uri = format!("http://{server_addr}");
let client = HttpClientBuilder::default().build(&uri).unwrap();
client.batch_request(batch).with_default_timeout().await.unwrap()
}

async fn run_request_with_response(response: String) -> Result<String, Error> {
async fn run_request_with_response(response: String) -> Result<String, ClientError> {
let server_addr = http_server_with_hardcoded_response(response).with_default_timeout().await.unwrap();
let uri = format!("http://{server_addr}");
let client = HttpClientBuilder::default().build(&uri).unwrap();
client.request("say_hello", rpc_params![]).with_default_timeout().await.unwrap()
}

fn assert_jsonrpc_error_response(err: Error, exp: ErrorObjectOwned) {
fn assert_jsonrpc_error_response(err: ClientError, exp: ErrorObjectOwned) {
match &err {
Error::Call(err) => {
ClientError::Call(err) => {
assert_eq!(err, &exp);
}
e => panic!("Expected error: \"{err}\", got: {e:?}"),
Expand Down
3 changes: 1 addition & 2 deletions client/http-client/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ use hyper::body::{Body, HttpBody};
use hyper::client::{Client, HttpConnector};
use hyper::http::{HeaderMap, HeaderValue};
use jsonrpsee_core::client::CertificateStore;
use jsonrpsee_core::error::GenericTransportError;
use jsonrpsee_core::http_helpers;
use jsonrpsee_core::tracing::client::{rx_log_from_bytes, tx_log_from_str};
use jsonrpsee_core::{http_helpers, GenericTransportError};
use std::error::Error as StdError;
use std::future::Future;
use std::pin::Pin;
Expand Down
3 changes: 1 addition & 2 deletions client/wasm-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ pub use jsonrpsee_types as types;
use std::time::Duration;

use jsonrpsee_client_transport::web;
use jsonrpsee_core::client::{ClientBuilder, IdKind};
use jsonrpsee_core::Error;
use jsonrpsee_core::client::{ClientBuilder, Error, IdKind};

/// Builder for [`Client`].
///
Expand Down
2 changes: 1 addition & 1 deletion client/ws-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ readme.workspace = true
publish = true

[dependencies]
http = "0.2.0"
jsonrpsee-types = { workspace = true }
jsonrpsee-client-transport = { workspace = true, features = ["ws"] }
jsonrpsee-core = { workspace = true, features = ["async-client"] }
http = "0.2.0"
url = "2.4.0"

[dev-dependencies]
Expand Down
19 changes: 9 additions & 10 deletions client/ws-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,17 @@
#[cfg(test)]
mod tests;

pub use http::{HeaderMap, HeaderValue};
pub use jsonrpsee_core::client::Client as WsClient;
pub use jsonrpsee_types as types;

pub use http::{HeaderMap, HeaderValue};
use std::time::Duration;
use url::Url;

use jsonrpsee_client_transport::ws::{AsyncRead, AsyncWrite, WsTransportClientBuilder};
use jsonrpsee_core::client::{
CertificateStore, ClientBuilder, IdKind, MaybeSend, TransportReceiverT, TransportSenderT,
CertificateStore, ClientBuilder, Error, IdKind, MaybeSend, TransportReceiverT, TransportSenderT,
};
use jsonrpsee_core::{Error, TEN_MB_SIZE_BYTES};
use jsonrpsee_core::TEN_MB_SIZE_BYTES;
use std::time::Duration;
use url::Url;

/// Builder for [`WsClient`].
///
Expand Down Expand Up @@ -220,7 +219,7 @@ impl WsClientBuilder {
/// ## Panics
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
///
/// Panics if being called outside of `tokio` runtime context.
pub async fn build_with_transport<S, R>(self, sender: S, receiver: R) -> Result<WsClient, Error>
pub fn build_with_transport<S, R>(self, sender: S, receiver: R) -> WsClient
where
S: TransportSenderT + Send,
R: TransportReceiverT + Send,
Expand All @@ -246,7 +245,7 @@ impl WsClientBuilder {
client = client.ping_interval(interval);
}

Ok(client.build_with_tokio(sender, receiver))
client.build_with_tokio(sender, receiver)
}

/// Build the [`WsClient`] with specified data stream, using [`WsTransportClientBuilder::build_with_stream`].
Expand All @@ -271,7 +270,7 @@ impl WsClientBuilder {
let (sender, receiver) =
transport_builder.build_with_stream(uri, data_stream).await.map_err(|e| Error::Transport(e.into()))?;

let ws_client = self.build_with_transport(sender, receiver).await?;
let ws_client = self.build_with_transport(sender, receiver);
Ok(ws_client)
}

Expand All @@ -294,7 +293,7 @@ impl WsClientBuilder {
let uri = Url::parse(url.as_ref()).map_err(|e| Error::Transport(e.into()))?;
let (sender, receiver) = transport_builder.build(uri).await.map_err(|e| Error::Transport(e.into()))?;

let ws_client = self.build_with_transport(sender, receiver).await?;
let ws_client = self.build_with_transport(sender, receiver);
Ok(ws_client)
}
}
5 changes: 2 additions & 3 deletions client/ws-client/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,9 @@
use crate::types::error::{ErrorCode, ErrorObject};
use crate::WsClientBuilder;

use jsonrpsee_core::client::{BatchResponse, ClientT, SubscriptionClientT};
use jsonrpsee_core::client::{IdKind, Subscription};
use jsonrpsee_core::client::{BatchResponse, ClientT, Error, IdKind, Subscription, SubscriptionClientT};
use jsonrpsee_core::params::BatchRequestBuilder;
use jsonrpsee_core::{rpc_params, DeserializeOwned, Error};
use jsonrpsee_core::{rpc_params, DeserializeOwned};
use jsonrpsee_test_utils::helpers::*;
use jsonrpsee_test_utils::mocks::{Id, WebSocketTestServer};
use jsonrpsee_test_utils::TimeoutFutureExt;
Expand Down
2 changes: 0 additions & 2 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,11 @@ futures-util = { version = "0.3.14", default-features = false, optional = true }
hyper = { version = "0.14.10", default-features = false, features = ["stream"], optional = true }
rustc-hash = { version = "1", optional = true }
rand = { version = "0.8", optional = true }
soketto = { version = "0.7.1", optional = true }
parking_lot = { version = "0.12", optional = true }
tokio = { version = "1.16", optional = true }
wasm-bindgen-futures = { version = "0.4.19", optional = true }
futures-timer = { version = "3", optional = true }


[features]
default = []
http-helpers = ["hyper", "futures-util"]
Expand Down
17 changes: 8 additions & 9 deletions core/src/client/async_client/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,9 @@

use crate::client::async_client::LOG_TARGET;
use crate::client::async_client::manager::{RequestManager, RequestStatus};
use crate::client::{RequestMessage, TransportSenderT};
use crate::client::{RequestMessage, TransportSenderT, Error};
use crate::params::ArrayParams;
use crate::traits::ToRpcParams;
use crate::Error;

use futures_timer::Delay;
use futures_util::future::{self, Either};
Expand All @@ -56,7 +55,7 @@ pub(crate) fn process_batch_response(
manager: &mut RequestManager,
rps: Vec<InnerBatchResponse>,
range: Range<u64>,
) -> Result<(), Error> {
) -> Result<(), InvalidRequestId> {
let mut responses = Vec::with_capacity(rps.len());

let start_idx = range.start;
Expand All @@ -65,7 +64,7 @@ pub(crate) fn process_batch_response(
Some(state) => state,
None => {
tracing::warn!(target: LOG_TARGET, "Received unknown batch response");
return Err(InvalidRequestId::NotPendingRequest(format!("{:?}", range)).into());
return Err(InvalidRequestId::NotPendingRequest(format!("{:?}", range)));
}
};

Expand All @@ -81,7 +80,7 @@ pub(crate) fn process_batch_response(
if let Some(elem) = maybe_elem {
*elem = rp.result;
} else {
return Err(InvalidRequestId::NotPendingRequest(rp.id.to_string()).into());
return Err(InvalidRequestId::NotPendingRequest(rp.id.to_string()));
}
}

Expand Down Expand Up @@ -155,7 +154,7 @@ pub(crate) fn process_notification(manager: &mut RequestManager, notif: Notifica
Ok(()) => (),
Err(err) => {
tracing::warn!(target: LOG_TARGET, "Could not send notification, dropping handler for {:?} error: {:?}", notif.method, err);
let _ = manager.remove_notification_handler(notif.method.into_owned());
let _ = manager.remove_notification_handler(&notif.method);
}
},
None => {
Expand All @@ -173,7 +172,7 @@ pub(crate) fn process_single_response(
manager: &mut RequestManager,
response: Response<JsonValue>,
max_capacity_per_subscription: usize,
) -> Result<Option<RequestMessage>, Error> {
) -> Result<Option<RequestMessage>, InvalidRequestId> {
let response_id = response.id.clone().into_owned();
let result = ResponseSuccess::try_from(response).map(|s| s.result).map_err(Error::Call);

Expand All @@ -182,7 +181,7 @@ pub(crate) fn process_single_response(
let send_back_oneshot = match manager.complete_pending_call(response_id.clone()) {
Some(Some(send)) => send,
Some(None) => return Ok(None),
None => return Err(InvalidRequestId::NotPendingRequest(response_id.to_string()).into()),
None => return Err(InvalidRequestId::NotPendingRequest(response_id.to_string())),
};

let _ = send_back_oneshot.send(result);
Expand Down Expand Up @@ -223,7 +222,7 @@ pub(crate) fn process_single_response(
}

RequestStatus::Subscription | RequestStatus::Invalid => {
Err(InvalidRequestId::NotPendingRequest(response_id.to_string()).into())
Err(InvalidRequestId::NotPendingRequest(response_id.to_string()))
}
}
}
Expand Down
17 changes: 8 additions & 9 deletions core/src/client/async_client/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ use std::{
ops::Range,
};

use crate::{client::BatchEntry, Error};
use crate::{
client::BatchEntry,
client::Error, error::RegisterMethodError,
};
use jsonrpsee_types::{Id, SubscriptionId};
use rustc_hash::FxHashMap;
use serde_json::value::Value as JsonValue;
Expand Down Expand Up @@ -183,22 +186,18 @@ impl RequestManager {
&mut self,
method: &str,
send_back: SubscriptionSink,
) -> Result<(), Error> {
) -> Result<(), RegisterMethodError> {
if let Entry::Vacant(handle) = self.notification_handlers.entry(method.to_owned()) {
handle.insert(send_back);
Ok(())
} else {
Err(Error::MethodAlreadyRegistered(method.to_owned()))
Err(RegisterMethodError::AlreadyRegistered(method.to_owned()))
}
}

/// Removes a notification handler.
pub(crate) fn remove_notification_handler(&mut self, method: String) -> Result<(), Error> {
if self.notification_handlers.remove(&method).is_some() {
Ok(())
} else {
Err(Error::UnregisteredNotification(method))
}
pub(crate) fn remove_notification_handler(&mut self, method: &str) -> Option<SubscriptionSink> {
self.notification_handlers.remove(method)
}

/// Tries to complete a pending subscription.
Expand Down
19 changes: 11 additions & 8 deletions core/src/client/async_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ mod manager;

use crate::client::async_client::helpers::{process_subscription_close_response, InnerBatchResponse};
use crate::client::{
BatchMessage, BatchResponse, ClientT, ReceivedMessage, RegisterNotificationMessage, RequestMessage, Subscription,
SubscriptionClientT, SubscriptionKind, SubscriptionMessage, TransportReceiverT, TransportSenderT,
BatchMessage, BatchResponse, ClientT, ReceivedMessage, RegisterNotificationMessage, RequestMessage,
Subscription, SubscriptionClientT, SubscriptionKind, SubscriptionMessage, TransportReceiverT, TransportSenderT, Error
};
use crate::error::Error;
use crate::params::BatchRequestBuilder;
use crate::error::RegisterMethodError;
use crate::params::{BatchRequestBuilder, EmptyBatchRequest};
use crate::tracing::client::{rx_log_from_json, tx_log_from_str};
use crate::traits::ToRpcParams;
use crate::JsonRawValue;
Expand Down Expand Up @@ -506,7 +506,9 @@ impl SubscriptionClientT for Client {
Notif: DeserializeOwned,
{
if subscribe_method == unsubscribe_method {
return Err(Error::SubscriptionNameConflict(unsubscribe_method.to_owned()));
return Err(RegisterMethodError::SubscriptionNameConflict(
unsubscribe_method.to_owned(),
).into());
}

let guard = self.id_manager.next_request_two_ids()?;
Expand Down Expand Up @@ -654,7 +656,7 @@ fn handle_backend_messages<R: TransportReceiverT>(
range.end += 1;
process_batch_response(&mut manager.lock(), batch, range)?;
} else {
return Err(Error::EmptyBatchRequest);
return Err(EmptyBatchRequest.into());
}
} else {
return Err(unparse_error(raw));
Expand Down Expand Up @@ -762,12 +764,13 @@ async fn handle_frontend_messages<S: TransportSenderT>(
if manager.lock().insert_notification_handler(&reg.method, subscribe_tx).is_ok() {
let _ = reg.send_back.send(Ok((subscribe_rx, reg.method)));
} else {
let _ = reg.send_back.send(Err(Error::MethodAlreadyRegistered(reg.method)));
let _ =
reg.send_back.send(Err(RegisterMethodError::AlreadyRegistered(reg.method).into()));
}
}
// User dropped the NotificationHandler for this method
FrontToBack::UnregisterNotification(method) => {
let _ = manager.lock().remove_notification_handler(method);
let _ = manager.lock().remove_notification_handler(&method);
}
};

Expand Down
Loading
Loading