Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: make it possible to override method name in subscriptions #568

Merged
merged 23 commits into from
Nov 19, 2021
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
df60861
feat: override `method` subscription notif
niklasad1 Nov 17, 2021
d6eb4ea
Arrow syntax for overwrites (#569)
maciejhirsz Nov 18, 2021
86ea9db
Merge remote-tracking branch 'origin/master' into na-override-notif-m…
niklasad1 Nov 18, 2021
84aad42
check that unique notifs are used
niklasad1 Nov 18, 2021
eea1b2b
check that custom sub name is unique
niklasad1 Nov 18, 2021
e72a33a
cargo fmt
niklasad1 Nov 18, 2021
673699a
address grumbles
niklasad1 Nov 18, 2021
2913c0b
Update proc-macros/src/rpc_macro.rs
niklasad1 Nov 18, 2021
37f77d1
commit added tests
niklasad1 Nov 18, 2021
70477f9
Merge remote-tracking branch 'origin/na-override-notif-method-name' i…
niklasad1 Nov 18, 2021
66972ef
Update proc-macros/src/render_server.rs
niklasad1 Nov 19, 2021
5067f0d
Update proc-macros/src/render_server.rs
niklasad1 Nov 19, 2021
6886c2b
Update proc-macros/src/rpc_macro.rs
niklasad1 Nov 19, 2021
1eb0a80
Update proc-macros/src/rpc_macro.rs
niklasad1 Nov 19, 2021
2be32e9
Update utils/src/server/rpc_module.rs
niklasad1 Nov 19, 2021
b67be8f
grumbles
niklasad1 Nov 19, 2021
3e8a18e
fix long lines
niklasad1 Nov 19, 2021
48ebf92
Update utils/src/server/rpc_module.rs
niklasad1 Nov 19, 2021
b01fbe8
Update utils/src/server/rpc_module.rs
niklasad1 Nov 19, 2021
a1a9577
Update proc-macros/src/rpc_macro.rs
niklasad1 Nov 19, 2021
86b06ed
Update proc-macros/src/render_server.rs
niklasad1 Nov 19, 2021
b76efe8
Update proc-macros/src/render_server.rs
niklasad1 Nov 19, 2021
b20a0c3
more grumbles
niklasad1 Nov 19, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion benches/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ pub async fn ws_server(handle: tokio::runtime::Handle) -> (String, jsonrpsee::ws
module.register_method(SYNC_METHOD_NAME, |_, _| Ok("lo")).unwrap();
module.register_async_method(ASYNC_METHOD_NAME, |_, _| async { Ok("lo") }).unwrap();
module
.register_subscription(SUB_METHOD_NAME, UNSUB_METHOD_NAME, |_params, mut sink, _ctx| {
.register_subscription(SUB_METHOD_NAME, SUB_METHOD_NAME, UNSUB_METHOD_NAME, |_params, mut sink, _ctx| {
let x = "Hello";
tokio::spawn(async move { sink.send(&x) });
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion examples/proc_macro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ where
async fn storage_keys(&self, storage_key: StorageKey, hash: Option<Hash>) -> Result<Vec<StorageKey>, Error>;

/// Subscription that takes a `StorageKey` as input and produces a `Vec<Hash>`.
#[subscription(name = "subscribeStorage", item = Vec<Hash>)]
#[subscription(name = "subscribeStorage" => "override", item = Vec<Hash>)]
fn subscribe_storage(&self, keys: Option<Vec<StorageKey>>) -> Result<(), Error>;
}

Expand Down
4 changes: 2 additions & 2 deletions examples/ws_sub_with_params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ async fn run_server() -> anyhow::Result<SocketAddr> {
let server = WsServerBuilder::default().build("127.0.0.1:0").await?;
let mut module = RpcModule::new(());
module
.register_subscription("sub_one_param", "unsub_one_param", |params, mut sink, _| {
.register_subscription("sub_one_param", "sub_one_param", "unsub_one_param", |params, mut sink, _| {
let idx: usize = params.one()?;
std::thread::spawn(move || loop {
let _ = sink.send(&LETTERS.chars().nth(idx));
Expand All @@ -72,7 +72,7 @@ async fn run_server() -> anyhow::Result<SocketAddr> {
})
.unwrap();
module
.register_subscription("sub_params_two", "unsub_params_two", |params, mut sink, _| {
.register_subscription("sub_params_two", "params_two", "unsub_params_two", |params, mut sink, _| {
let (one, two): (usize, usize) = params.parse()?;
std::thread::spawn(move || loop {
let _ = sink.send(&LETTERS[one..two].to_string());
Expand Down
2 changes: 1 addition & 1 deletion examples/ws_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ async fn main() -> anyhow::Result<()> {
async fn run_server() -> anyhow::Result<SocketAddr> {
let server = WsServerBuilder::default().build("127.0.0.1:0").await?;
let mut module = RpcModule::new(());
module.register_subscription("subscribe_hello", "unsubscribe_hello", |_, mut sink, _| {
module.register_subscription("subscribe_hello", "s_hello", "unsubscribe_hello", |_, mut sink, _| {
std::thread::spawn(move || loop {
if let Err(Error::SubscriptionClosed(_)) = sink.send(&"hello my friend") {
return;
Expand Down
39 changes: 31 additions & 8 deletions proc-macros/src/attributes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use proc_macro2::{Span, TokenStream as TokenStream2, TokenTree};
use std::{fmt, iter};
use syn::parse::{Parse, ParseStream, Parser};
use syn::punctuated::Punctuated;
use syn::{spanned::Spanned, Attribute, Error, Token};
use syn::{spanned::Spanned, Attribute, Error, LitInt, LitStr, Token};

pub(crate) struct AttributeMeta {
pub path: syn::Path,
Expand All @@ -48,15 +48,22 @@ pub enum ParamKind {

#[derive(Debug, Clone)]
pub struct Resource {
pub name: syn::LitStr,
pub name: LitStr,
pub assign: Token![=],
pub value: syn::LitInt,
pub value: LitInt,
}

pub struct Aliases {
pub list: Punctuated<syn::LitStr, Token![,]>,
pub struct NameMapping {
pub name: String,
pub mapped: Option<String>,
}

pub struct Bracketed<T> {
pub list: Punctuated<T, Token![,]>,
}

pub type Aliases = Bracketed<LitStr>;

impl Parse for Argument {
fn parse(input: ParseStream) -> syn::Result<Self> {
let label = input.parse()?;
Expand Down Expand Up @@ -91,15 +98,31 @@ impl Parse for Resource {
}
}

impl Parse for Aliases {
impl Parse for NameMapping {
fn parse(input: ParseStream) -> syn::Result<Self> {
let name = input.parse::<LitStr>()?.value();

let mapped = if input.peek(Token![=>]) {
input.parse::<Token![=>]>()?;

Some(input.parse::<LitStr>()?.value())
} else {
None
};

Ok(NameMapping { name, mapped })
}
}

impl<T: Parse> Parse for Bracketed<T> {
fn parse(input: ParseStream) -> syn::Result<Self> {
let content;

syn::bracketed!(content in input);

let list = content.parse_terminated(Parse::parse)?;

Ok(Aliases { list })
Ok(Bracketed { list })
}
}

Expand Down Expand Up @@ -201,7 +224,7 @@ impl Argument {

/// Asserts that the argument is `key = "string"` and gets the value of the string
pub fn string(self) -> syn::Result<String> {
self.value::<syn::LitStr>().map(|lit| lit.value())
self.value::<LitStr>().map(|lit| lit.value())
}
}

Expand Down
12 changes: 11 additions & 1 deletion proc-macros/src/render_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ impl RpcDescription {
let rust_method_name = &sub.signature.sig.ident;
// Name of the RPC method to subscribe to (e.g. `foo_sub`).
let rpc_sub_name = self.rpc_identifier(&sub.name);
// Name of `method` in the subscription response.
let rpc_sub_name_override = sub.name_override.as_ref().map(|m| self.rpc_identifier(m));
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
// Name of the RPC method to unsubscribe (e.g. `foo_sub`).
let rpc_unsub_name = self.rpc_identifier(&sub.unsubscribe);
// `parsing` is the code associated with parsing structure from the
Expand All @@ -184,8 +186,16 @@ impl RpcDescription {
check_name(&rpc_sub_name, rust_method_name.span());
check_name(&rpc_unsub_name, rust_method_name.span());

let rpc_notif_name = match rpc_sub_name_override {
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
Some(notif) => {
check_name(&notif, rust_method_name.span());
notif
}
None => rpc_sub_name.clone(),
};

handle_register_result(quote! {
rpc.register_subscription(#rpc_sub_name, #rpc_unsub_name, |params, sink, context| {
rpc.register_subscription(#rpc_sub_name, #rpc_notif_name, #rpc_unsub_name, |params, sink, context| {
#parsing
context.as_ref().#rust_method_name(sink, #params_seq)
})
Expand Down
28 changes: 25 additions & 3 deletions proc-macros/src/rpc_macro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
//! Declaration of the JSON RPC generator procedural macros.

use crate::{
attributes::{optional, parse_param_kind, Aliases, Argument, AttributeMeta, MissingArgument, ParamKind, Resource},
attributes::{
optional, parse_param_kind, Aliases, Argument, AttributeMeta, MissingArgument, NameMapping, ParamKind, Resource,
},
helpers::extract_doc_comments,
};

Expand Down Expand Up @@ -95,6 +97,13 @@ impl RpcMethod {
#[derive(Debug, Clone)]
pub struct RpcSubscription {
pub name: String,
/// When subscribing to an RPC, users can override the content of the `method` field
/// in the JSON data sent to subscribers.
/// Each subscription thus has one method name to set up the subscription,
/// one to unsubscribe and, optionally, a third method name used to describe the
/// payload sent back from the server to subscribers.
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
/// If no override is provided, the subscription method name is used.
pub name_override: Option<String>,
pub docs: TokenStream2,
pub unsubscribe: String,
pub params: Vec<(syn::PatIdent, syn::Type)>,
Expand All @@ -111,7 +120,9 @@ impl RpcSubscription {
AttributeMeta::parse(attr)?.retain(["aliases", "item", "name", "param_kind", "unsubscribe_aliases"])?;

let aliases = parse_aliases(aliases)?;
let name = name?.string()?;
let map = name?.value::<NameMapping>()?;
let name = map.name;
let name_override = map.mapped;
let item = item?.value()?;
let param_kind = parse_param_kind(param_kind)?;
let unsubscribe_aliases = parse_aliases(unsubscribe_aliases)?;
Expand All @@ -135,7 +146,18 @@ impl RpcSubscription {
// We've analyzed attributes and don't need them anymore.
sub.attrs.clear();

Ok(Self { name, unsubscribe, unsubscribe_aliases, params, param_kind, item, signature: sub, aliases, docs })
Ok(Self {
name,
name_override,
unsubscribe,
unsubscribe_aliases,
params,
param_kind,
item,
signature: sub,
aliases,
docs,
})
}
}

Expand Down
13 changes: 13 additions & 0 deletions proc-macros/tests/ui/correct/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ pub trait Rpc {

#[subscription(name = "echo", aliases = ["ECHO"], item = u32, unsubscribe_aliases = ["NotInterested", "listenNoMore"])]
fn sub_with_params(&self, val: u32) -> RpcResult<()>;

// This will send data to subscribers with the `method` field in the JSON payload set to `foo_subscribe_override`
// because it's in the `foo` namespace.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh that's a good point, let's not forget to include that in the docs for the macro.

#[subscription(name = "subscribe_method" => "subscribe_override", item = u32)]
fn sub_with_override_notif_method(&self) -> RpcResult<()>;
}

pub struct RpcServerImpl;
Expand Down Expand Up @@ -68,6 +73,10 @@ impl RpcServer for RpcServerImpl {
sink.send(&val)?;
sink.send(&val)
}

fn sub_with_override_notif_method(&self, mut sink: SubscriptionSink) -> RpcResult<()> {
sink.send(&1)
}
}

pub async fn websocket_server() -> SocketAddr {
Expand Down Expand Up @@ -102,4 +111,8 @@ async fn main() {
assert_eq!(first_recv, Some("Response_A".to_string()));
let second_recv = sub.next().await.unwrap();
assert_eq!(second_recv, Some("Response_B".to_string()));

let mut sub = client.sub_with_override_notif_method().await.unwrap();
let recv = sub.next().await.unwrap();
assert_eq!(recv, Some(1));
}
12 changes: 12 additions & 0 deletions proc-macros/tests/ui/incorrect/sub/sub_dup_name_override.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
use jsonrpsee::{proc_macros::rpc, types::RpcResult};

// Subscription method must not use the same override name.
#[rpc(client, server)]
pub trait DupOverride {
#[subscription(name = "one" => "override", item = u8)]
fn one(&self) -> RpcResult<()>;
#[subscription(name = "two" => "override", item = u8)]
fn two(&self) -> RpcResult<()>;
}

fn main() {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
error: "override" is already defined
--> tests/ui/incorrect/sub/sub_dup_name_override.rs:9:5
|
9 | fn two(&self) -> RpcResult<()>;
| ^^^
33 changes: 19 additions & 14 deletions tests/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub async fn websocket_server_with_subscription() -> (SocketAddr, WsServerHandle
module.register_method("say_hello", |_, _| Ok("hello")).unwrap();

module
.register_subscription("subscribe_hello", "unsubscribe_hello", |_, mut sink, _| {
.register_subscription("subscribe_hello", "subscribe_hello", "unsubscribe_hello", |_, mut sink, _| {
std::thread::spawn(move || loop {
if let Err(Error::SubscriptionClosed(_)) = sink.send(&"hello from subscription") {
break;
Expand All @@ -52,7 +52,7 @@ pub async fn websocket_server_with_subscription() -> (SocketAddr, WsServerHandle
.unwrap();

module
.register_subscription("subscribe_foo", "unsubscribe_foo", |_, mut sink, _| {
.register_subscription("subscribe_foo", "subscribe_foo", "unsubscribe_foo", |_, mut sink, _| {
std::thread::spawn(move || loop {
if let Err(Error::SubscriptionClosed(_)) = sink.send(&1337) {
break;
Expand All @@ -64,21 +64,26 @@ pub async fn websocket_server_with_subscription() -> (SocketAddr, WsServerHandle
.unwrap();

module
.register_subscription("subscribe_add_one", "unsubscribe_add_one", |params, mut sink, _| {
let mut count: usize = params.one()?;
std::thread::spawn(move || loop {
count = count.wrapping_add(1);
if let Err(Error::SubscriptionClosed(_)) = sink.send(&count) {
break;
}
std::thread::sleep(Duration::from_millis(100));
});
Ok(())
})
.register_subscription(
"subscribe_add_one",
"subscribe_add_one",
"unsubscribe_add_one",
|params, mut sink, _| {
let mut count: usize = params.one()?;
std::thread::spawn(move || loop {
count = count.wrapping_add(1);
if let Err(Error::SubscriptionClosed(_)) = sink.send(&count) {
break;
}
std::thread::sleep(Duration::from_millis(100));
});
Ok(())
},
)
.unwrap();

module
.register_subscription("subscribe_noop", "unsubscribe_noop", |_, mut sink, _| {
.register_subscription("subscribe_noop", "subscribe_noop", "unsubscribe_noop", |_, mut sink, _| {
std::thread::spawn(move || {
std::thread::sleep(Duration::from_secs(1));
sink.close("Server closed the stream because it was lazy")
Expand Down
2 changes: 1 addition & 1 deletion tests/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ async fn ws_server_should_stop_subscription_after_client_drop() {
let mut module = RpcModule::new(tx);

module
.register_subscription("subscribe_hello", "unsubscribe_hello", |_, mut sink, mut tx| {
.register_subscription("subscribe_hello", "subscribe_hello", "unsubscribe_hello", |_, mut sink, mut tx| {
tokio::spawn(async move {
let close_err = loop {
if let Err(Error::SubscriptionClosed(err)) = sink.send(&1) {
Expand Down
2 changes: 1 addition & 1 deletion tests/tests/proc_macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ async fn multiple_blocking_calls_overlap() {

#[tokio::test]
async fn subscriptions_do_not_work_for_http_servers() {
let htserver = HttpServerBuilder::default().build("127.0.0.1:0".parse().unwrap()).unwrap();
let htserver = HttpServerBuilder::default().build("127.0.0.1:0").unwrap();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

let addr = htserver.local_addr().unwrap();
let htserver_url = format!("http://{}", addr);
let _handle = htserver.start(RpcServerImpl.into_rpc()).unwrap();
Expand Down
Loading