Skip to content

Commit

Permalink
Stabilize chainHead functions (#1748)
Browse files Browse the repository at this point in the history
* Stabilize `chainHead` functions

* PR link
  • Loading branch information
tomaka authored Apr 4, 2024
1 parent fba1a8a commit 88275b4
Show file tree
Hide file tree
Showing 8 changed files with 157 additions and 166 deletions.
18 changes: 8 additions & 10 deletions full-node/src/json_rpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ fn spawn_client_main_task(
client_main_task = task;

match request_process.request() {
methods::MethodCall::chainHead_unstable_header {
methods::MethodCall::chainHead_v1_header {
follow_subscription,
..
} => {
Expand All @@ -585,10 +585,10 @@ fn spawn_client_main_task(
// TODO racy; doesn't handle situation where follow subscription stops
} else {
request_process
.respond(methods::Response::chainHead_unstable_header(None));
.respond(methods::Response::chainHead_v1_header(None));
}
}
methods::MethodCall::chainHead_unstable_unpin {
methods::MethodCall::chainHead_v1_unpin {
follow_subscription,
hash_or_hashes,
} => {
Expand All @@ -614,14 +614,12 @@ fn spawn_client_main_task(

match outcome_rx.await {
Err(_) => {
request_process.respond(
methods::Response::chainHead_unstable_unpin(()),
);
request_process
.respond(methods::Response::chainHead_v1_unpin(()));
}
Ok(Ok(())) => {
request_process.respond(
methods::Response::chainHead_unstable_unpin(()),
);
request_process
.respond(methods::Response::chainHead_v1_unpin(()));
}
Ok(Err(())) => {
request_process.fail(service::ErrorResponse::InvalidParams);
Expand All @@ -645,7 +643,7 @@ fn spawn_client_main_task(

match subscription_start.request() {
// TODO: enforce limit to number of subscriptions
methods::MethodCall::chainHead_unstable_follow { with_runtime } => {
methods::MethodCall::chainHead_v1_follow { with_runtime } => {
let (tx, rx) = async_channel::bounded(16);
let subscription_id =
chain_head_subscriptions::spawn_chain_head_subscription_task(
Expand Down
111 changes: 50 additions & 61 deletions full-node/src/json_rpc_service/chain_head_subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ pub struct Config {
/// Receiver for actions that the JSON-RPC client wants to perform.
pub receiver: async_channel::Receiver<Message>,

/// `chainHead_unstable_follow` subscription start handle.
/// `chainHead_v1_follow` subscription start handle.
pub chain_head_follow_subscription: service::SubscriptionStartProcess,

/// Parameter that was passed by the user when requesting `chainHead_unstable_follow`.
/// Parameter that was passed by the user when requesting `chainHead_v1_follow`.
pub with_runtime: bool,

/// Consensus service of the chain.
Expand All @@ -68,7 +68,7 @@ pub enum Message {
},
}

/// Spawns a new tasks dedicated to handling a `chainHead_unstable_follow` subscription.
/// Spawns a new tasks dedicated to handling a `chainHead_v1_follow` subscription.
///
/// Returns the identifier of the subscription.
pub async fn spawn_chain_head_subscription_task(config: Config) -> String {
Expand All @@ -93,7 +93,7 @@ pub async fn spawn_chain_head_subscription_task(config: Config) -> String {

pinned_blocks.insert(consensus_service_subscription.finalized_block_hash);
json_rpc_subscription
.send_notification(methods::ServerToClient::chainHead_unstable_followEvent {
.send_notification(methods::ServerToClient::chainHead_v1_followEvent {
subscription: (&json_rpc_subscription_id).into(),
result: methods::FollowEvent::Initialized {
finalized_block_hash: methods::HashHexString(
Expand All @@ -115,7 +115,7 @@ pub async fn spawn_chain_head_subscription_task(config: Config) -> String {
for block in consensus_service_subscription.non_finalized_blocks_ancestry_order {
pinned_blocks.insert(block.block_hash);
json_rpc_subscription
.send_notification(methods::ServerToClient::chainHead_unstable_followEvent {
.send_notification(methods::ServerToClient::chainHead_v1_followEvent {
subscription: (&json_rpc_subscription_id).into(),
result: methods::FollowEvent::NewBlock {
block_hash: methods::HashHexString(block.block_hash),
Expand All @@ -134,7 +134,7 @@ pub async fn spawn_chain_head_subscription_task(config: Config) -> String {
if block.is_new_best {
current_best_block = block.block_hash;
json_rpc_subscription
.send_notification(methods::ServerToClient::chainHead_unstable_followEvent {
.send_notification(methods::ServerToClient::chainHead_v1_followEvent {
subscription: (&json_rpc_subscription_id).into(),
result: methods::FollowEvent::BestBlockChanged {
best_block_hash: methods::HashHexString(block.block_hash),
Expand Down Expand Up @@ -172,8 +172,7 @@ pub async fn spawn_chain_head_subscription_task(config: Config) -> String {
match wake_up_reason {
WakeUpReason::ForegroundClosed => return,
WakeUpReason::Foreground(Message::Header { request }) => {
let methods::MethodCall::chainHead_unstable_header { hash, .. } =
request.request()
let methods::MethodCall::chainHead_v1_header { hash, .. } = request.request()
else {
unreachable!()
};
Expand All @@ -190,7 +189,7 @@ pub async fn spawn_chain_head_subscription_task(config: Config) -> String {

match database_outcome {
Ok(Some(header)) => {
request.respond(methods::Response::chainHead_unstable_header(Some(
request.respond(methods::Response::chainHead_v1_header(Some(
methods::HexString(header),
)))
}
Expand Down Expand Up @@ -228,35 +227,31 @@ pub async fn spawn_chain_head_subscription_task(config: Config) -> String {
}) => {
pinned_blocks.insert(block.block_hash);
json_rpc_subscription
.send_notification(
methods::ServerToClient::chainHead_unstable_followEvent {
subscription: (&json_rpc_subscription_id).into(),
result: methods::FollowEvent::NewBlock {
block_hash: methods::HashHexString(block.block_hash),
new_runtime: if let (Some(new_runtime), true) =
(&block.runtime_update, config.with_runtime)
{
Some(convert_runtime_spec(new_runtime.runtime_version()))
} else {
None
},
parent_block_hash: methods::HashHexString(block.parent_hash),
.send_notification(methods::ServerToClient::chainHead_v1_followEvent {
subscription: (&json_rpc_subscription_id).into(),
result: methods::FollowEvent::NewBlock {
block_hash: methods::HashHexString(block.block_hash),
new_runtime: if let (Some(new_runtime), true) =
(&block.runtime_update, config.with_runtime)
{
Some(convert_runtime_spec(new_runtime.runtime_version()))
} else {
None
},
parent_block_hash: methods::HashHexString(block.parent_hash),
},
)
})
.await;

if block.is_new_best {
current_best_block = block.block_hash;
json_rpc_subscription
.send_notification(
methods::ServerToClient::chainHead_unstable_followEvent {
subscription: (&json_rpc_subscription_id).into(),
result: methods::FollowEvent::BestBlockChanged {
best_block_hash: methods::HashHexString(block.block_hash),
},
.send_notification(methods::ServerToClient::chainHead_v1_followEvent {
subscription: (&json_rpc_subscription_id).into(),
result: methods::FollowEvent::BestBlockChanged {
best_block_hash: methods::HashHexString(block.block_hash),
},
)
})
.await;
}
}
Expand All @@ -268,49 +263,43 @@ pub async fn spawn_chain_head_subscription_task(config: Config) -> String {
},
) => {
json_rpc_subscription
.send_notification(
methods::ServerToClient::chainHead_unstable_followEvent {
subscription: (&json_rpc_subscription_id).into(),
result: methods::FollowEvent::Finalized {
// As specified in the JSON-RPC spec, the list must be ordered
// in increasing block number. Consequently we have to reverse
// the list.
finalized_blocks_hashes: finalized_blocks_newest_to_oldest
.into_iter()
.map(methods::HashHexString)
.rev()
.collect(),
pruned_blocks_hashes: pruned_blocks_hashes
.into_iter()
.map(methods::HashHexString)
.collect(),
},
.send_notification(methods::ServerToClient::chainHead_v1_followEvent {
subscription: (&json_rpc_subscription_id).into(),
result: methods::FollowEvent::Finalized {
// As specified in the JSON-RPC spec, the list must be ordered
// in increasing block number. Consequently we have to reverse
// the list.
finalized_blocks_hashes: finalized_blocks_newest_to_oldest
.into_iter()
.map(methods::HashHexString)
.rev()
.collect(),
pruned_blocks_hashes: pruned_blocks_hashes
.into_iter()
.map(methods::HashHexString)
.collect(),
},
)
})
.await;

if best_block_hash != current_best_block {
current_best_block = best_block_hash;
json_rpc_subscription
.send_notification(
methods::ServerToClient::chainHead_unstable_followEvent {
subscription: (&json_rpc_subscription_id).into(),
result: methods::FollowEvent::BestBlockChanged {
best_block_hash: methods::HashHexString(best_block_hash),
},
.send_notification(methods::ServerToClient::chainHead_v1_followEvent {
subscription: (&json_rpc_subscription_id).into(),
result: methods::FollowEvent::BestBlockChanged {
best_block_hash: methods::HashHexString(best_block_hash),
},
)
})
.await;
}
}
WakeUpReason::ConsensusSubscriptionStop => {
json_rpc_subscription
.send_notification(
methods::ServerToClient::chainHead_unstable_followEvent {
subscription: (&json_rpc_subscription_id).into(),
result: methods::FollowEvent::Stop {},
},
)
.send_notification(methods::ServerToClient::chainHead_v1_followEvent {
subscription: (&json_rpc_subscription_id).into(),
result: methods::FollowEvent::Stop {},
})
.await;
}
}
Expand Down
6 changes: 3 additions & 3 deletions full-node/tests/author.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ fn basic_block_generated() {

loop {
client.send_json_rpc_request(
r#"{"jsonrpc":"2.0","id":1,"method":"chainHead_unstable_follow","params":[false]}"#
r#"{"jsonrpc":"2.0","id":1,"method":"chainHead_v1_follow","params":[false]}"#
.to_owned(),
);

Expand All @@ -60,11 +60,11 @@ fn basic_block_generated() {
match json_rpc::methods::parse_notification(&client.next_json_rpc_response().await)
.unwrap()
{
json_rpc::methods::ServerToClient::chainHead_unstable_followEvent {
json_rpc::methods::ServerToClient::chainHead_v1_followEvent {
result: json_rpc::methods::FollowEvent::NewBlock { .. },
..
} => return, // Test success
json_rpc::methods::ServerToClient::chainHead_unstable_followEvent {
json_rpc::methods::ServerToClient::chainHead_v1_followEvent {
result: json_rpc::methods::FollowEvent::Stop { .. },
..
} => break,
Expand Down
24 changes: 12 additions & 12 deletions lib/src/json_rpc/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -452,41 +452,41 @@ define_methods! {
system_version() -> Cow<'a, str>,

// The functions below are experimental and are defined in the document https://github.com/paritytech/json-rpc-interface-spec/
chainHead_unstable_body(
chainHead_v1_body(
#[rename = "followSubscription"] follow_subscription: Cow<'a, str>,
hash: HashHexString
) -> ChainHeadBodyCallReturn<'a>,
chainHead_unstable_call(
chainHead_v1_call(
#[rename = "followSubscription"] follow_subscription: Cow<'a, str>,
hash: HashHexString,
function: Cow<'a, str>,
#[rename = "callParameters"] call_parameters: HexString
) -> ChainHeadBodyCallReturn<'a>,
chainHead_unstable_follow(
chainHead_v1_follow(
#[rename = "withRuntime"] with_runtime: bool
) -> Cow<'a, str>,
chainHead_unstable_header(
chainHead_v1_header(
#[rename = "followSubscription"] follow_subscription: Cow<'a, str>,
hash: HashHexString
) -> Option<HexString>,
chainHead_unstable_stopOperation(
chainHead_v1_stopOperation(
#[rename = "followSubscription"] follow_subscription: Cow<'a, str>,
#[rename = "operationId"] operation_id: Cow<'a, str>
) -> (),
chainHead_unstable_storage(
chainHead_v1_storage(
#[rename = "followSubscription"] follow_subscription: Cow<'a, str>,
hash: HashHexString,
items: Vec<ChainHeadStorageRequestItem>,
#[rename = "childTrie"] child_trie: Option<HexString>
) -> ChainHeadStorageReturn<'a>,
chainHead_unstable_continue(
chainHead_v1_continue(
#[rename = "followSubscription"] follow_subscription: Cow<'a, str>,
#[rename = "operationId"] operation_id: Cow<'a, str>
) -> (),
chainHead_unstable_unfollow(
chainHead_v1_unfollow(
#[rename = "followSubscription"] follow_subscription: Cow<'a, str>
) -> (),
chainHead_unstable_unpin(
chainHead_v1_unpin(
#[rename = "followSubscription"] follow_subscription: Cow<'a, str>,
#[rename = "hashOrHashes"] hash_or_hashes: HashHexStringSingleOrArray
) -> (),
Expand Down Expand Up @@ -523,7 +523,7 @@ define_methods! {
state_storage(subscription: Cow<'a, str>, result: StorageChangeSet) -> (),

// The functions below are experimental and are defined in the document https://github.com/paritytech/json-rpc-interface-spec/
chainHead_unstable_followEvent(subscription: Cow<'a, str>, result: FollowEvent<'a>) -> (),
chainHead_v1_followEvent(subscription: Cow<'a, str>, result: FollowEvent<'a>) -> (),
transactionWatch_unstable_watchEvent(subscription: Cow<'a, str>, result: TransactionWatchEvent<'a>) -> (),

// This function is a custom addition in smoldot. As of the writing of this comment, there is
Expand Down Expand Up @@ -1265,15 +1265,15 @@ mod tests {
fn no_params_refused() {
// No `params` field in the request.
let err = super::parse_jsonrpc_client_to_server(
r#"{"jsonrpc":"2.0","id":2,"method":"chainHead_unstable_follow"}"#,
r#"{"jsonrpc":"2.0","id":2,"method":"chainHead_v1_follow"}"#,
);

assert!(matches!(
err,
Err(super::ParseClientToServerError::Method {
request_id: "2",
error: super::MethodError::MissingParameters {
rpc_method: "chainHead_unstable_follow"
rpc_method: "chainHead_v1_follow"
}
})
));
Expand Down
Loading

0 comments on commit 88275b4

Please sign in to comment.