-
Notifications
You must be signed in to change notification settings - Fork 732
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
rpc-servers: Allow chainHead methods to be called from a single connection #3343
Conversation
Signed-off-by: Alexandru Vasile <[email protected]>
Signed-off-by: Alexandru Vasile <[email protected]>
Signed-off-by: Alexandru Vasile <[email protected]>
Signed-off-by: Alexandru Vasile <[email protected]>
Signed-off-by: Alexandru Vasile <[email protected]>
…o lexnv/chainhead-connections
The CI pipeline was cancelled due to failure one of the required jobs. |
/// "{"jsonrpc":"2.0","result":"tfMQUZekzJLorGlR","id":0}" | ||
/// ``` | ||
fn get_method_result(response: &MethodResponse) -> Option<String> { | ||
if response.is_error() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if response.is_error() { | |
if response.is_error() || !response.is_subscription() { |
} | ||
|
||
let result = response.as_result(); | ||
let Ok(value) = serde_json::from_str(result) else { return None }; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, this is awkward to decode the response from the final JSON string.
It would be cleaner and easier with a type that implements deserialize.
#[derive(serde::Deserialize)]
struct SubscriptionPayload {
result: String,
}
I guess the rpc v2 requires to subscription ID to be a string but jsonrpsee accepts integers as well.
/// This method handles positional and named `camelCase` parameters. | ||
fn get_subscription_id<'a>(params: Params<'a>) -> Option<String> { | ||
// Support positional parameters. | ||
if let Ok(follow_subscription) = params.sequence().next::<String>() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can call params.is_object()
here to do either parse it as a sequence or map.
however, you use params.one::<String>()
instead sequence here since you only care about one item
} | ||
|
||
// Support named parameters. | ||
let Ok(value) = params.parse::<serde_json::Value>() else { return None }; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
again custom type that implement Deserialize here would be cleaner
#[derive(serde::Deserialize)]
struct FollowSubscriptionPayload {
result: String,
}
#[derive(Default)] | ||
pub struct ConnectionData { | ||
/// Active `chainHead_follow` subscriptions for this connection. | ||
subscriptions: HashSet<String>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would probably add a type alias SubscriptionId here for readability.
/// subscription ID as a first parameter. | ||
/// | ||
/// This method handles positional and named `camelCase` parameters. | ||
fn get_subscription_id<'a>(params: Params<'a>) -> Option<String> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
subscription_id_from_request
/// ```ignore | ||
/// "{"jsonrpc":"2.0","result":"tfMQUZekzJLorGlR","id":0}" | ||
/// ``` | ||
fn get_method_result(response: &MethodResponse) -> Option<String> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
subscription_id_from_response
req.id(), | ||
jsonrpsee::types::error::ErrorObject::owned( | ||
-32602, | ||
"Invalid subscription ID", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A JSON-RPC error with error code -32602 is generated if one of the parameters doesn't correspond to the expected type (similarly to a missing parameter or an invalid parameter type).
I would probably add another error for this, the subscription ID is decoded successfully but the subscription is not "active" on the connection....
impl<F: Future<Output = MethodResponse>> Future for ResponseFuture<F> { | ||
type Output = F::Output; | ||
|
||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not sure but may be worth boxing the future for readability
match this { | ||
ResponseFutureProj::Ready { response } => | ||
Poll::Ready(response.take().expect("Value is set; qed")), | ||
ResponseFutureProj::Forward { fut } => fut.poll(cx), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why are you not removing subscriptions when chainHead_unstable_unfollow
is called?
|
||
fn call(&self, req: Request<'a>) -> Self::Future { | ||
const CHAIN_HEAD_FOLLOW: &str = "chainHead_unstable_follow"; | ||
const CHAIN_HEAD_CALL_METHODS: [&str; 8] = [ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A random thought is whether we can fetch these method names from the rpc module itself i.e, to keep in sync if a method is removed, modified and so on.
Before the server start you could do:
let chain_head_api = Vec<_> = rpc_api.method_names().filter(|m| m.starts_with("chainHead")).collect();
let chain_middleware = ChainHeadMiddleware::new(service, connection_data, chain_api);
Perhaps, we could add some integration tests for it at some point.
Replaced by: #3481 |
… context and limit connections (#3481) This PR ensures that the chainHead RPC class can be called only from within the same connection context. The chainHead methods are now registered as raw methods. - paritytech/jsonrpsee#1297 The concept of raw methods is introduced in jsonrpsee, which is an async method that exposes the connection ID: The raw method doesn't have the concept of a blocking method. Previously blocking methods are now spawning a blocking task to handle their blocking (ie DB) access. We spawn the same number of tasks as before, however we do that explicitly. Another approach would be implementing a RPC middleware that captures and decodes the method parameters: - #3343 However, that approach is prone to errors since the methods are hardcoded by name. Performace is affected by the double deserialization that needs to happen to extract the subscription ID we'd like to limit. Once from the middleware, and once from the methods itself. This PR paves the way to implement the chainHead connection limiter: - #1505 Registering tokens (subscription ID / operation ID) on the `RpcConnections` could be extended to return an error when the maximum number of operations is reached. While at it, have added an integration-test to ensure that chainHead methods can be called from within the same connection context. Before this is merged, a new JsonRPC release should be made to expose the `raw-methods`: - [x] Use jsonrpsee from crates io (blocked by: paritytech/jsonrpsee#1297) Closes: #3207 cc @paritytech/subxt-team --------- Signed-off-by: Alexandru Vasile <[email protected]> Co-authored-by: Niklas Adolfsson <[email protected]>
… context and limit connections (#3481) This PR ensures that the chainHead RPC class can be called only from within the same connection context. The chainHead methods are now registered as raw methods. - paritytech/jsonrpsee#1297 The concept of raw methods is introduced in jsonrpsee, which is an async method that exposes the connection ID: The raw method doesn't have the concept of a blocking method. Previously blocking methods are now spawning a blocking task to handle their blocking (ie DB) access. We spawn the same number of tasks as before, however we do that explicitly. Another approach would be implementing a RPC middleware that captures and decodes the method parameters: - #3343 However, that approach is prone to errors since the methods are hardcoded by name. Performace is affected by the double deserialization that needs to happen to extract the subscription ID we'd like to limit. Once from the middleware, and once from the methods itself. This PR paves the way to implement the chainHead connection limiter: - #1505 Registering tokens (subscription ID / operation ID) on the `RpcConnections` could be extended to return an error when the maximum number of operations is reached. While at it, have added an integration-test to ensure that chainHead methods can be called from within the same connection context. Before this is merged, a new JsonRPC release should be made to expose the `raw-methods`: - [x] Use jsonrpsee from crates io (blocked by: paritytech/jsonrpsee#1297) Closes: #3207 cc @paritytech/subxt-team --------- Signed-off-by: Alexandru Vasile <[email protected]> Co-authored-by: Niklas Adolfsson <[email protected]>
… context and limit connections (paritytech#3481) This PR ensures that the chainHead RPC class can be called only from within the same connection context. The chainHead methods are now registered as raw methods. - paritytech/jsonrpsee#1297 The concept of raw methods is introduced in jsonrpsee, which is an async method that exposes the connection ID: The raw method doesn't have the concept of a blocking method. Previously blocking methods are now spawning a blocking task to handle their blocking (ie DB) access. We spawn the same number of tasks as before, however we do that explicitly. Another approach would be implementing a RPC middleware that captures and decodes the method parameters: - paritytech#3343 However, that approach is prone to errors since the methods are hardcoded by name. Performace is affected by the double deserialization that needs to happen to extract the subscription ID we'd like to limit. Once from the middleware, and once from the methods itself. This PR paves the way to implement the chainHead connection limiter: - paritytech#1505 Registering tokens (subscription ID / operation ID) on the `RpcConnections` could be extended to return an error when the maximum number of operations is reached. While at it, have added an integration-test to ensure that chainHead methods can be called from within the same connection context. Before this is merged, a new JsonRPC release should be made to expose the `raw-methods`: - [x] Use jsonrpsee from crates io (blocked by: paritytech/jsonrpsee#1297) Closes: paritytech#3207 cc @paritytech/subxt-team --------- Signed-off-by: Alexandru Vasile <[email protected]> Co-authored-by: Niklas Adolfsson <[email protected]>
This PR adds middleware for the JSON rpc servers to ensure that the chainHead methods are called from a single connection.
The middleware intercepts the subscription IDs generated by the
chainHead_follow
.Any further chainHead function call (ie
chainHead_storage
) must provide a subscription ID that was generated by the middleware's connection.A middleware per connection is created in this context.
Closes: #3207
cc @paritytech/subxt-team