Skip to content

Commit

Permalink
Finish backporting changes to JSON-RPC API (#966)
Browse files Browse the repository at this point in the history
* Clean up `ChainHeadFollowTask::run` a bit

* Add the new events and produce them from the subscriptions

* Assign operation IDs

* Send operations notifications to the main task

* Body, call, and storage are now regular functions

* Adjust the return values of body/call/storage

* Properly check available operation slots

* Add TODO regarding slots

* Remove unused events

* Merge the `stop` functions into `chainHead_unstable_stopOperation`

* Track operations in the chain head follow struct

* Properly implement stopOperation

* CHANGELOG

* Some fixes to stopping operations

* Fix test
  • Loading branch information
tomaka authored Jul 26, 2023
1 parent 1702f17 commit 86ed62f
Show file tree
Hide file tree
Showing 6 changed files with 574 additions and 377 deletions.
113 changes: 62 additions & 51 deletions lib/src/json_rpc/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,13 +449,13 @@ define_methods! {
chainHead_unstable_body(
#[rename = "followSubscription"] follow_subscription: Cow<'a, str>,
hash: HashHexString
) -> Cow<'a, str>,
) -> ChainHeadBodyCallReturn<'a>,
chainHead_unstable_call(
#[rename = "followSubscription"] follow_subscription: Cow<'a, str>,
hash: HashHexString,
function: Cow<'a, str>,
#[rename = "callParameters"] call_parameters: HexString
) -> Cow<'a, str>,
) -> ChainHeadBodyCallReturn<'a>,
chainHead_unstable_follow(
#[rename = "withRuntime"] with_runtime: bool
) -> Cow<'a, str>,
Expand All @@ -464,21 +464,16 @@ define_methods! {
#[rename = "followSubscription"] follow_subscription: Cow<'a, str>,
hash: HashHexString
) -> Option<HexString>,
chainHead_unstable_stopBody(
subscription: Cow<'a, str>
) -> (),
chainHead_unstable_stopCall(
subscription: Cow<'a, str>
) -> (),
chainHead_unstable_stopStorage(
subscription: Cow<'a, str>
chainHead_unstable_stopOperation(
#[rename = "followSubscription"] follow_subscription: Cow<'a, str>,
#[rename = "operationId"] operation_id: Cow<'a, str>
) -> (),
chainHead_unstable_storage(
#[rename = "followSubscription"] follow_subscription: Cow<'a, str>,
hash: HashHexString,
items: Vec<ChainHeadStorageRequestItem>,
#[rename = "childTrie"] child_trie: Option<HexString>
) -> Cow<'a, str>,
) -> ChainHeadStorageReturn<'a>,
chainHead_unstable_continue(
#[rename = "followSubscription"] follow_subscription: Cow<'a, str>,
#[rename = "operationId"] operation_id: Cow<'a, str>
Expand Down Expand Up @@ -520,10 +515,7 @@ define_methods! {
state_storage(subscription: Cow<'a, str>, result: StorageChangeSet) -> (),

// The functions below are experimental and are defined in the document https://github.com/paritytech/json-rpc-interface-spec/
chainHead_unstable_bodyEvent(subscription: Cow<'a, str>, result: ChainHeadBodyEvent) -> (),
chainHead_unstable_callEvent(subscription: Cow<'a, str>, result: ChainHeadCallEvent<'a>) -> (),
chainHead_unstable_followEvent(subscription: Cow<'a, str>, result: FollowEvent<'a>) -> (),
chainHead_unstable_storageEvent(subscription: Cow<'a, str>, result: ChainHeadStorageEvent<'a>) -> (),
transaction_unstable_watchEvent(subscription: Cow<'a, str>, result: TransactionWatchEvent<'a>) -> (),

// This function is a custom addition in smoldot. As of the writing of this comment, there is
Expand Down Expand Up @@ -710,32 +702,70 @@ pub enum FollowEvent<'a> {
#[serde(rename = "prunedBlockHashes")]
pruned_blocks_hashes: Vec<HashHexString>,
},
#[serde(rename = "operation-body-done")]
OperationBodyDone {
#[serde(rename = "operationId")]
operation_id: Cow<'a, str>,
value: Vec<HexString>,
},
#[serde(rename = "operation-call-done")]
OperationCallDone {
#[serde(rename = "operationId")]
operation_id: Cow<'a, str>,
output: HexString,
},
#[serde(rename = "operation-inaccessible")]
OperationInaccessible {
#[serde(rename = "operationId")]
operation_id: Cow<'a, str>,
},
#[serde(rename = "operation-storage-items")]
OperationStorageItems {
#[serde(rename = "operationId")]
operation_id: Cow<'a, str>,
items: Vec<ChainHeadStorageResponseItem>,
},
#[serde(rename = "operation-storage-done")]
OperationStorageDone {
#[serde(rename = "operationId")]
operation_id: Cow<'a, str>,
},
#[serde(rename = "operation-waiting-for-continue")]
OperationWaitingForContinue,
#[serde(rename = "operation-error")]
OperationError {
#[serde(rename = "operationId")]
operation_id: Cow<'a, str>,
error: Cow<'a, str>,
},
#[serde(rename = "stop")]
Stop {},
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(tag = "event")]
pub enum ChainHeadBodyEvent {
#[serde(rename = "done")]
Done { value: Vec<HexString> },
#[serde(rename = "inaccessible")]
Inaccessible {},
#[serde(rename = "disjoint")]
Disjoint {},
#[serde(tag = "result")]
pub enum ChainHeadBodyCallReturn<'a> {
#[serde(rename = "started")]
Started {
#[serde(rename = "operationId")]
operation_id: Cow<'a, str>,
},
#[serde(rename = "limitReached")]
LimitReached {},
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(tag = "event")]
pub enum ChainHeadCallEvent<'a> {
#[serde(rename = "done")]
Done { output: HexString },
#[serde(rename = "inaccessible")]
Inaccessible {},
#[serde(rename = "error")]
Error { error: Cow<'a, str> },
#[serde(rename = "disjoint")]
Disjoint {},
#[serde(tag = "result")]
pub enum ChainHeadStorageReturn<'a> {
#[serde(rename = "started")]
Started {
#[serde(rename = "operationId")]
operation_id: Cow<'a, str>,
#[serde(rename = "discardedItems")]
discarded_items: usize,
},
#[serde(rename = "limitReached")]
LimitReached {},
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
Expand Down Expand Up @@ -773,25 +803,6 @@ pub enum ChainHeadStorageType {
DescendantsHashes,
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(tag = "event")]
pub enum ChainHeadStorageEvent<'a> {
#[serde(rename = "items")]
Items {
items: Vec<ChainHeadStorageResponseItem>,
},
#[serde(rename = "done")]
Done,
#[serde(rename = "waiting-for-continue")]
WaitingForContinue,
#[serde(rename = "inaccessible")]
Inaccessible {},
#[serde(rename = "error")]
Error { error: Cow<'a, str> },
#[serde(rename = "disjoint")]
Disjoint {},
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(tag = "event")]
pub enum TransactionWatchEvent<'a> {
Expand Down
30 changes: 5 additions & 25 deletions lib/src/json_rpc/service/client_main_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,9 +429,13 @@ impl ClientMainTask {
| methods::MethodCall::rpc_methods { .. }
| methods::MethodCall::sudo_unstable_p2pDiscover { .. }
| methods::MethodCall::sudo_unstable_version { .. }
| methods::MethodCall::chainHead_unstable_body { .. }
| methods::MethodCall::chainHead_unstable_call { .. }
| methods::MethodCall::chainHead_unstable_continue { .. }
| methods::MethodCall::chainHead_unstable_finalizedDatabase { .. }
| methods::MethodCall::chainHead_unstable_header { .. }
| methods::MethodCall::chainHead_unstable_stopOperation { .. }
| methods::MethodCall::chainHead_unstable_storage { .. }
| methods::MethodCall::chainHead_unstable_unpin { .. } => {
// Simple one-request-one-response.
return Event::HandleRequest {
Expand All @@ -455,10 +459,7 @@ impl ClientMainTask {
| methods::MethodCall::state_subscribeStorage { .. }
| methods::MethodCall::transaction_unstable_submitAndWatch { .. }
| methods::MethodCall::network_unstable_subscribeEvents { .. }
| methods::MethodCall::chainHead_unstable_body { .. }
| methods::MethodCall::chainHead_unstable_call { .. }
| methods::MethodCall::chainHead_unstable_follow { .. }
| methods::MethodCall::chainHead_unstable_storage { .. } => {
| methods::MethodCall::chainHead_unstable_follow { .. } => {
// Subscription starting requests.

// We must check the maximum number of subscriptions.
Expand Down Expand Up @@ -530,9 +531,6 @@ impl ClientMainTask {
| methods::MethodCall::network_unstable_unsubscribeEvents {
subscription, ..
}
| methods::MethodCall::chainHead_unstable_stopBody { subscription, .. }
| methods::MethodCall::chainHead_unstable_stopStorage { subscription, .. }
| methods::MethodCall::chainHead_unstable_stopCall { subscription, .. }
| methods::MethodCall::chainHead_unstable_unfollow {
follow_subscription: subscription,
..
Expand Down Expand Up @@ -560,15 +558,6 @@ impl ClientMainTask {
methods::MethodCall::network_unstable_unsubscribeEvents {
..
} => methods::Response::network_unstable_unsubscribeEvents(()),
methods::MethodCall::chainHead_unstable_stopBody { .. } => {
methods::Response::chainHead_unstable_stopBody(())
}
methods::MethodCall::chainHead_unstable_stopStorage {
..
} => methods::Response::chainHead_unstable_stopStorage(()),
methods::MethodCall::chainHead_unstable_stopCall { .. } => {
methods::Response::chainHead_unstable_stopCall(())
}
methods::MethodCall::chainHead_unstable_unfollow { .. } => {
methods::Response::chainHead_unstable_unfollow(())
}
Expand Down Expand Up @@ -1178,18 +1167,9 @@ impl SubscriptionStartProcess {
&self.subscription_id,
))
}
methods::MethodCall::chainHead_unstable_body { .. } => {
methods::Response::chainHead_unstable_body(Cow::Borrowed(&self.subscription_id))
}
methods::MethodCall::chainHead_unstable_call { .. } => {
methods::Response::chainHead_unstable_call(Cow::Borrowed(&self.subscription_id))
}
methods::MethodCall::chainHead_unstable_follow { .. } => {
methods::Response::chainHead_unstable_follow(Cow::Borrowed(&self.subscription_id))
}
methods::MethodCall::chainHead_unstable_storage { .. } => {
methods::Response::chainHead_unstable_storage(Cow::Borrowed(&self.subscription_id))
}
_ => unreachable!(),
}
.to_json_response(request_id);
Expand Down
33 changes: 15 additions & 18 deletions light-base/src/json_rpc_service/background.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,7 @@ struct Background<TPlat: PlatformRef> {
chain_head_follow_tasks: Mutex<
hashbrown::HashMap<
String,
service::DeliverSender<
either::Either<service::RequestProcess, service::SubscriptionStartProcess>,
>,
service::DeliverSender<service::RequestProcess>,
fnv::FnvBuildHasher,
>,
>,
Expand Down Expand Up @@ -349,9 +347,7 @@ impl<TPlat: PlatformRef> Background<TPlat> {
| methods::MethodCall::chainHead_unstable_follow { .. }
| methods::MethodCall::chainHead_unstable_genesisHash { .. }
| methods::MethodCall::chainHead_unstable_header { .. }
| methods::MethodCall::chainHead_unstable_stopBody { .. }
| methods::MethodCall::chainHead_unstable_stopCall { .. }
| methods::MethodCall::chainHead_unstable_stopStorage { .. }
| methods::MethodCall::chainHead_unstable_stopOperation { .. }
| methods::MethodCall::chainHead_unstable_storage { .. }
| methods::MethodCall::chainHead_unstable_unfollow { .. }
| methods::MethodCall::chainHead_unstable_unpin { .. }
Expand Down Expand Up @@ -449,12 +445,24 @@ impl<TPlat: PlatformRef> Background<TPlat> {
self.system_version(request).await;
}

methods::MethodCall::chainHead_unstable_body { .. } => {
self.chain_head_unstable_body(request).await;
}
methods::MethodCall::chainHead_unstable_call { .. } => {
self.chain_head_call(request).await;
}
methods::MethodCall::chainHead_unstable_continue { .. } => {
self.chain_head_continue(request).await;
}
methods::MethodCall::chainHead_unstable_genesisHash {} => {
self.chain_head_unstable_genesis_hash(request).await;
}
methods::MethodCall::chainHead_unstable_storage { .. } => {
self.chain_head_storage(request).await;
}
methods::MethodCall::chainHead_unstable_stopOperation { .. } => {
self.chain_head_stop_operation(request).await;
}
methods::MethodCall::chainHead_unstable_header { .. } => {
self.chain_head_unstable_header(request).await;
}
Expand Down Expand Up @@ -613,9 +621,7 @@ impl<TPlat: PlatformRef> Background<TPlat> {
| methods::MethodCall::chainHead_unstable_follow { .. }
| methods::MethodCall::chainHead_unstable_genesisHash { .. }
| methods::MethodCall::chainHead_unstable_header { .. }
| methods::MethodCall::chainHead_unstable_stopBody { .. }
| methods::MethodCall::chainHead_unstable_stopCall { .. }
| methods::MethodCall::chainHead_unstable_stopStorage { .. }
| methods::MethodCall::chainHead_unstable_stopOperation { .. }
| methods::MethodCall::chainHead_unstable_storage { .. }
| methods::MethodCall::chainHead_unstable_unfollow { .. }
| methods::MethodCall::chainHead_unstable_unpin { .. }
Expand Down Expand Up @@ -645,15 +651,6 @@ impl<TPlat: PlatformRef> Background<TPlat> {
unreachable!()
}

methods::MethodCall::chainHead_unstable_body { .. } => {
self.chain_head_unstable_body(request).await;
}
methods::MethodCall::chainHead_unstable_call { .. } => {
self.chain_head_call(request).await;
}
methods::MethodCall::chainHead_unstable_storage { .. } => {
self.chain_head_storage(request).await;
}
methods::MethodCall::chainHead_unstable_follow { .. } => {
self.chain_head_follow(request).await;
}
Expand Down
Loading

0 comments on commit 86ed62f

Please sign in to comment.