Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(rpc module): fail subscription calls with bad params #728

Merged
merged 44 commits into from
Apr 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
d6d9e5a
fix(rpc module): fail subscription with bad params
niklasad1 Apr 7, 2022
cabdb41
draft; show my point
niklasad1 Apr 8, 2022
8c8a590
fix tests
niklasad1 Apr 8, 2022
520e27d
fix build
niklasad1 Apr 8, 2022
3d750e1
add tests for proc macros too
niklasad1 Apr 8, 2022
03b7787
add tests for bad params in proc macros
niklasad1 Apr 9, 2022
c7f8eed
fix nits
niklasad1 Apr 9, 2022
1b34a49
commit all files
niklasad1 Apr 9, 2022
3d5256d
add ugly fix for proc macro code
niklasad1 Apr 11, 2022
e62d1ac
add more user friendly API
niklasad1 Apr 11, 2022
b40f95b
make SubscriptionSink::close take mut self
niklasad1 Apr 11, 2022
aff9ee7
fix grumbles
niklasad1 Apr 11, 2022
17ef15b
show james some code
niklasad1 Apr 11, 2022
4bc82f6
Update core/src/server/rpc_module.rs
niklasad1 Apr 11, 2022
7f4cd8f
remove needless clone
niklasad1 Apr 11, 2022
66c6dff
fix build
niklasad1 Apr 11, 2022
568ae7c
Merge remote-tracking branch 'origin/na-jsonrpsee-pubsub-should-not-r…
niklasad1 Apr 11, 2022
82a5a87
client fix docs + error type
niklasad1 Apr 11, 2022
68a52a0
simplify code: merge connect reset and unsubscribe close reason
niklasad1 Apr 11, 2022
a3e50ed
remove unknown close reason
niklasad1 Apr 11, 2022
9c041bd
refactor: remove Error::SubscriptionClosed
niklasad1 Apr 12, 2022
e857f20
add some nice APIs to ErrorObjectOwned
niklasad1 Apr 12, 2022
bdc9075
unify api
niklasad1 Apr 12, 2022
8ebdf6c
address grumbles
niklasad1 Apr 13, 2022
84048b4
remove redundant methods for close and reject
niklasad1 Apr 14, 2022
332f377
proc macro: compile err when subscription -> Result
niklasad1 Apr 14, 2022
cb6da3d
rpc module: fix test subscription test
niklasad1 Apr 14, 2022
698c8eb
Update core/src/server/rpc_module.rs
niklasad1 Apr 14, 2022
d86c231
Update core/src/server/rpc_module.rs
niklasad1 Apr 14, 2022
e6e3aea
Update core/src/server/rpc_module.rs
niklasad1 Apr 14, 2022
d741f43
Update core/src/server/rpc_module.rs
niklasad1 Apr 14, 2022
7c6f67f
Update core/src/server/rpc_module.rs
niklasad1 Apr 14, 2022
409acd1
Update proc-macros/src/lib.rs
niklasad1 Apr 14, 2022
61921d1
address grumbles
niklasad1 Apr 14, 2022
02fce6b
Merge remote-tracking branch 'origin/na-jsonrpsee-pubsub-should-not-r…
niklasad1 Apr 14, 2022
06951c9
remove faulty comment
niklasad1 Apr 14, 2022
d81fbf4
Update core/src/server/rpc_module.rs
niklasad1 Apr 15, 2022
def9392
Update core/src/server/rpc_module.rs
niklasad1 Apr 15, 2022
2576d0f
Update core/src/server/rpc_module.rs
niklasad1 Apr 15, 2022
a921d47
Update core/src/server/rpc_module.rs
niklasad1 Apr 15, 2022
036ea6b
Update core/src/server/rpc_module.rs
niklasad1 Apr 15, 2022
07ba3b3
fix: don't send `RPC Call failed: error`.
niklasad1 Apr 16, 2022
83081d1
remove debug assert
niklasad1 Apr 16, 2022
3da7b0f
Merge remote-tracking branch 'origin/na-jsonrpsee-pubsub-should-not-r…
niklasad1 Apr 16, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions benches/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,13 @@ pub async fn ws_server(handle: tokio::runtime::Handle) -> (String, jsonrpsee::ws
let mut module = gen_rpc_module();

module
.register_subscription(SUB_METHOD_NAME, SUB_METHOD_NAME, UNSUB_METHOD_NAME, |_params, mut sink, _ctx| {
.register_subscription(SUB_METHOD_NAME, SUB_METHOD_NAME, UNSUB_METHOD_NAME, |_params, pending, _ctx| {
let mut sink = match pending.accept() {
Some(sink) => sink,
_ => return,
};
let x = "Hello";
tokio::spawn(async move { sink.send(&x) });
Ok(())
})
.unwrap();

Expand Down
5 changes: 3 additions & 2 deletions client/http-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use crate::types::{ErrorResponse, Id, NotificationSer, ParamsSer, RequestSer, Re
use async_trait::async_trait;
use jsonrpsee_core::client::{CertificateStore, ClientT, IdKind, RequestIdManager, Subscription, SubscriptionClientT};
use jsonrpsee_core::{Error, TEN_MB_SIZE_BYTES};
use jsonrpsee_types::error::CallError;
use rustc_hash::FxHashMap;
use serde::de::DeserializeOwned;

Expand Down Expand Up @@ -147,7 +148,7 @@ impl ClientT for HttpClient {
Ok(response) => response,
Err(_) => {
let err: ErrorResponse = serde_json::from_slice(&body).map_err(Error::ParseError)?;
return Err(Error::Call(err.error.to_call_error()));
return Err(Error::Call(CallError::Custom(err.error_object().clone().into_owned())));
}
};

Expand Down Expand Up @@ -186,7 +187,7 @@ impl ClientT for HttpClient {

let rps: Vec<Response<_>> =
serde_json::from_slice(&body).map_err(|_| match serde_json::from_slice::<ErrorResponse>(&body) {
Ok(e) => Error::Call(e.error.to_call_error()),
Ok(e) => Error::Call(CallError::Custom(e.error_object().clone().into_owned())),
Err(e) => Error::ParseError(e),
})?;

Expand Down
19 changes: 10 additions & 9 deletions client/http-client/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use jsonrpsee_core::Error;
use jsonrpsee_test_utils::helpers::*;
use jsonrpsee_test_utils::mocks::Id;
use jsonrpsee_test_utils::TimeoutFutureExt;
use jsonrpsee_types::error::CallError;
use jsonrpsee_types::error::{CallError, ErrorObjectOwned};

#[tokio::test]
async fn method_call_works() {
Expand Down Expand Up @@ -98,34 +98,34 @@ async fn response_with_wrong_id() {
async fn response_method_not_found() {
let err =
run_request_with_response(method_not_found(Id::Num(0))).with_default_timeout().await.unwrap().unwrap_err();
assert_jsonrpc_error_response(err, ErrorObject::from(ErrorCode::MethodNotFound).to_call_error());
assert_jsonrpc_error_response(err, ErrorObject::from(ErrorCode::MethodNotFound).into_owned());
}

#[tokio::test]
async fn response_parse_error() {
let err = run_request_with_response(parse_error(Id::Num(0))).with_default_timeout().await.unwrap().unwrap_err();
assert_jsonrpc_error_response(err, ErrorObject::from(ErrorCode::ParseError).to_call_error());
assert_jsonrpc_error_response(err, ErrorObject::from(ErrorCode::ParseError).into_owned());
}

#[tokio::test]
async fn invalid_request_works() {
let err =
run_request_with_response(invalid_request(Id::Num(0_u64))).with_default_timeout().await.unwrap().unwrap_err();
assert_jsonrpc_error_response(err, ErrorObject::from(ErrorCode::InvalidRequest).to_call_error());
assert_jsonrpc_error_response(err, ErrorObject::from(ErrorCode::InvalidRequest).into_owned());
}

#[tokio::test]
async fn invalid_params_works() {
let err =
run_request_with_response(invalid_params(Id::Num(0_u64))).with_default_timeout().await.unwrap().unwrap_err();
assert_jsonrpc_error_response(err, ErrorObject::from(ErrorCode::InvalidParams).to_call_error());
assert_jsonrpc_error_response(err, ErrorObject::from(ErrorCode::InvalidParams).into_owned());
}

#[tokio::test]
async fn internal_error_works() {
let err =
run_request_with_response(internal_error(Id::Num(0_u64))).with_default_timeout().await.unwrap().unwrap_err();
assert_jsonrpc_error_response(err, ErrorObject::from(ErrorCode::InternalError).to_call_error());
assert_jsonrpc_error_response(err, ErrorObject::from(ErrorCode::InternalError).into_owned());
}

#[tokio::test]
Expand Down Expand Up @@ -172,10 +172,11 @@ async fn run_request_with_response(response: String) -> Result<String, Error> {
client.request("say_hello", None).with_default_timeout().await.unwrap()
}

fn assert_jsonrpc_error_response(err: Error, exp: CallError) {
fn assert_jsonrpc_error_response(err: Error, exp: ErrorObjectOwned) {
let exp = CallError::Custom(exp);
match &err {
Error::Call(e) => {
assert_eq!(e.to_string(), exp.to_string());
Error::Call(err) => {
assert_eq!(err.to_string(), exp.to_string());
}
e => panic!("Expected error: \"{}\", got: {:?}", err, e),
};
Expand Down
15 changes: 8 additions & 7 deletions client/ws-client/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use jsonrpsee_core::Error;
use jsonrpsee_test_utils::helpers::*;
use jsonrpsee_test_utils::mocks::{Id, WebSocketTestServer};
use jsonrpsee_test_utils::TimeoutFutureExt;
use jsonrpsee_types::error::CallError;
use jsonrpsee_types::error::{CallError, ErrorObjectOwned};
use serde_json::Value as JsonValue;

#[tokio::test]
Expand Down Expand Up @@ -109,34 +109,34 @@ async fn response_with_wrong_id() {
async fn response_method_not_found() {
let err =
run_request_with_response(method_not_found(Id::Num(0))).with_default_timeout().await.unwrap().unwrap_err();
assert_error_response(err, ErrorObject::from(ErrorCode::MethodNotFound).to_call_error());
assert_error_response(err, ErrorObject::from(ErrorCode::MethodNotFound).into_owned());
}

#[tokio::test]
async fn parse_error_works() {
let err = run_request_with_response(parse_error(Id::Num(0))).with_default_timeout().await.unwrap().unwrap_err();
assert_error_response(err, ErrorObject::from(ErrorCode::ParseError).to_call_error());
assert_error_response(err, ErrorObject::from(ErrorCode::ParseError).into_owned());
}

#[tokio::test]
async fn invalid_request_works() {
let err =
run_request_with_response(invalid_request(Id::Num(0_u64))).with_default_timeout().await.unwrap().unwrap_err();
assert_error_response(err, ErrorObject::from(ErrorCode::InvalidRequest).to_call_error());
assert_error_response(err, ErrorObject::from(ErrorCode::InvalidRequest).into_owned());
}

#[tokio::test]
async fn invalid_params_works() {
let err =
run_request_with_response(invalid_params(Id::Num(0_u64))).with_default_timeout().await.unwrap().unwrap_err();
assert_error_response(err, ErrorObject::from(ErrorCode::InvalidParams).to_call_error());
assert_error_response(err, ErrorObject::from(ErrorCode::InvalidParams).into_owned());
}

#[tokio::test]
async fn internal_error_works() {
let err =
run_request_with_response(internal_error(Id::Num(0_u64))).with_default_timeout().await.unwrap().unwrap_err();
assert_error_response(err, ErrorObject::from(ErrorCode::InternalError).to_call_error());
assert_error_response(err, ErrorObject::from(ErrorCode::InternalError).into_owned());
}

#[tokio::test]
Expand Down Expand Up @@ -283,7 +283,8 @@ async fn run_request_with_response(response: String) -> Result<String, Error> {
client.request("say_hello", None).with_default_timeout().await.unwrap()
}

fn assert_error_response(err: Error, exp: CallError) {
fn assert_error_response(err: Error, exp: ErrorObjectOwned) {
let exp = CallError::Custom(exp);
match &err {
Error::Call(e) => {
assert_eq!(e.to_string(), exp.to_string());
Expand Down
47 changes: 32 additions & 15 deletions core/src/client/async_client/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ use std::time::Duration;

use crate::client::async_client::manager::{RequestManager, RequestStatus};
use crate::client::{RequestMessage, TransportSenderT};
use crate::error::SubscriptionClosed;
use crate::Error;

use futures_channel::{mpsc, oneshot};
use jsonrpsee_types::error::CallError;
use jsonrpsee_types::response::SubscriptionError;
use jsonrpsee_types::{
ErrorResponse, Id, Notification, ParamsSer, RequestSer, Response, SubscriptionId, SubscriptionResponse,
};
Expand Down Expand Up @@ -81,20 +82,14 @@ pub(crate) fn process_subscription_response(
let sub_id = response.params.subscription.into_owned();
let request_id = match manager.get_request_id_by_subscription_id(&sub_id) {
Some(request_id) => request_id,
None => return Err(None),
None => {
tracing::error!("Subscription ID: {:?} is not an active subscription", sub_id);
return Err(None);
}
};

match manager.as_subscription_mut(&request_id) {
Some(send_back_sink) => match send_back_sink.try_send(response.params.result.clone()) {
// The server sent a subscription closed notification, then close down the subscription.
Ok(()) if serde_json::from_value::<SubscriptionClosed>(response.params.result).is_ok() => {
if manager.remove_subscription(request_id, sub_id.clone()).is_some() {
Ok(())
} else {
tracing::error!("The server tried to close down an invalid subscription: {:?}", sub_id);
Err(None)
}
}
Some(send_back_sink) => match send_back_sink.try_send(response.params.result) {
Ok(()) => Ok(()),
Err(err) => {
tracing::error!("Dropping subscription {:?} error: {:?}", sub_id, err);
Expand All @@ -110,6 +105,27 @@ pub(crate) fn process_subscription_response(
}
}

/// Attempts to close a subscription when a [`SubscriptionError`] is received.
///
/// Returns `Ok(())` if the subscription was removed
/// Return `Err(e)` if the subscription was not found.
pub(crate) fn process_subscription_close_response(
manager: &mut RequestManager,
response: SubscriptionError<JsonValue>,
) -> Result<(), Error> {
let sub_id = response.params.subscription.into_owned();
let request_id = match manager.get_request_id_by_subscription_id(&sub_id) {
Some(request_id) => request_id,
None => {
tracing::error!("The server tried to close down an invalid subscription: {:?}", sub_id);
return Err(Error::InvalidSubscriptionId);
}
};

manager.remove_subscription(request_id, sub_id).expect("Both request ID and sub ID in RequestManager; qed");
Ok(())
}

/// Attempts to process an incoming notification
///
/// Returns Ok() if the response was successfully handled
Expand Down Expand Up @@ -217,17 +233,18 @@ pub(crate) fn build_unsubscribe_message(
/// Returns `Ok` if the response was successfully sent.
/// Returns `Err(_)` if the response ID was not found.
pub(crate) fn process_error_response(manager: &mut RequestManager, err: ErrorResponse) -> Result<(), Error> {
let id = err.id.clone().into_owned();
let id = err.id().clone().into_owned();

match manager.request_status(&id) {
RequestStatus::PendingMethodCall => {
let send_back = manager.complete_pending_call(id).expect("State checked above; qed");
let _ = send_back.map(|s| s.send(Err(Error::Call(err.error.to_call_error()))));
let _ =
send_back.map(|s| s.send(Err(Error::Call(CallError::Custom(err.error_object().clone().into_owned())))));
Ok(())
}
RequestStatus::PendingSubscription => {
let (_, send_back, _) = manager.complete_pending_subscription(id).expect("State checked above; qed");
let _ = send_back.send(Err(Error::Call(err.error.to_call_error())));
let _ = send_back.send(Err(Error::Call(CallError::Custom(err.error_object().clone().into_owned()))));
Ok(())
}
_ => Err(Error::InvalidRequestId),
Expand Down
13 changes: 10 additions & 3 deletions core/src/client/async_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ mod manager;
use std::time::Duration;

use crate::client::{
BatchMessage, ClientT, RegisterNotificationMessage, RequestMessage, Subscription, SubscriptionClientT,
SubscriptionKind, SubscriptionMessage, TransportReceiverT, TransportSenderT,
async_client::helpers::process_subscription_close_response, BatchMessage, ClientT, RegisterNotificationMessage,
RequestMessage, Subscription, SubscriptionClientT, SubscriptionKind, SubscriptionMessage, TransportReceiverT,
TransportSenderT,
};
use helpers::{
build_unsubscribe_message, call_with_timeout, process_batch_response, process_error_response, process_notification,
Expand All @@ -20,7 +21,8 @@ use futures_util::future::Either;
use futures_util::sink::SinkExt;
use futures_util::stream::StreamExt;
use jsonrpsee_types::{
ErrorResponse, Id, Notification, NotificationSer, ParamsSer, RequestSer, Response, SubscriptionResponse,
response::SubscriptionError, ErrorResponse, Id, Notification, NotificationSer, ParamsSer, RequestSer, Response,
SubscriptionResponse,
};
use serde::de::DeserializeOwned;
use tokio::sync::Mutex;
Expand Down Expand Up @@ -489,6 +491,11 @@ async fn background_task<S: TransportSenderT, R: TransportReceiverT>(
let _ = stop_subscription(&mut sender, &mut manager, unsub).await;
}
}
// Subscription error response.
else if let Ok(response) = serde_json::from_str::<SubscriptionError<_>>(&raw) {
tracing::debug!("[backend]: recv subscription closed {:?}", response);
let _ = process_subscription_close_response(&mut manager, response);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So here, the backend has sent back something like this I think?:

{ jsonrpc: "2.0", params: { subscription: "abc123", error: { code, message, data } } }

As a result, we close the subscription on the client. Right now, we don't pass on that error to the client though? (this may have been what you were getting at in a previous comment and I'm just catching up now :))

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, because I was lazy because o the lifetime issues with serde.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK that makes sense; if it's a bit of a pain to do, let's add an issue to address this separately if it's not important to Substrate!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no this is just the client code so shouldn't matter

}
// Incoming Notification
else if let Ok(notif) = serde_json::from_str::<Notification<_>>(&raw) {
tracing::debug!("[backend]: recv notification {:?}", notif);
Expand Down
20 changes: 4 additions & 16 deletions core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::task;

use crate::error::{Error, SubscriptionClosed};
use crate::error::Error;
use async_trait::async_trait;
use core::marker::PhantomData;
use futures_channel::{mpsc, oneshot};
use futures_util::future::FutureExt;
use futures_util::sink::SinkExt;
use futures_util::stream::{Stream, StreamExt};
use jsonrpsee_types::{Id, ParamsSer, SubscriptionId};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use serde::de::DeserializeOwned;
use serde_json::Value as JsonValue;

#[doc(hidden)]
Expand Down Expand Up @@ -162,17 +162,6 @@ pub enum SubscriptionKind {
Method(String),
}

/// Internal type to detect whether a subscription response from
/// the server was a valid notification or should be treated as an error.
#[derive(Debug, Deserialize, Serialize)]
#[serde(untagged)]
pub enum NotifResponse<Notif> {
/// Successful response.
Ok(Notif),
/// Subscription was closed.
Err(SubscriptionClosed),
}

/// Active subscription on the client.
///
/// It will automatically unsubscribe in the [`Subscription::drop`] so no need to explicitly call
Expand Down Expand Up @@ -301,9 +290,8 @@ where
type Item = Result<Notif, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Option<Self::Item>> {
let n = futures_util::ready!(self.notifs_rx.poll_next_unpin(cx));
let res = n.map(|n| match serde_json::from_value::<NotifResponse<Notif>>(n) {
Ok(NotifResponse::Ok(parsed)) => Ok(parsed),
Ok(NotifResponse::Err(e)) => Err(Error::SubscriptionClosed(e)),
let res = n.map(|n| match serde_json::from_value::<Notif>(n) {
Ok(parsed) => Ok(parsed),
Err(e) => Err(Error::ParseError(e)),
});
task::Poll::Ready(res)
Expand Down
Loading