Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stabilize chainHead functions #1748

Merged
merged 2 commits into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 8 additions & 10 deletions full-node/src/json_rpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ fn spawn_client_main_task(
client_main_task = task;

match request_process.request() {
methods::MethodCall::chainHead_unstable_header {
methods::MethodCall::chainHead_v1_header {
follow_subscription,
..
} => {
Expand All @@ -585,10 +585,10 @@ fn spawn_client_main_task(
// TODO racy; doesn't handle situation where follow subscription stops
} else {
request_process
.respond(methods::Response::chainHead_unstable_header(None));
.respond(methods::Response::chainHead_v1_header(None));
}
}
methods::MethodCall::chainHead_unstable_unpin {
methods::MethodCall::chainHead_v1_unpin {
follow_subscription,
hash_or_hashes,
} => {
Expand All @@ -614,14 +614,12 @@ fn spawn_client_main_task(

match outcome_rx.await {
Err(_) => {
request_process.respond(
methods::Response::chainHead_unstable_unpin(()),
);
request_process
.respond(methods::Response::chainHead_v1_unpin(()));
}
Ok(Ok(())) => {
request_process.respond(
methods::Response::chainHead_unstable_unpin(()),
);
request_process
.respond(methods::Response::chainHead_v1_unpin(()));
}
Ok(Err(())) => {
request_process.fail(service::ErrorResponse::InvalidParams);
Expand All @@ -645,7 +643,7 @@ fn spawn_client_main_task(

match subscription_start.request() {
// TODO: enforce limit to number of subscriptions
methods::MethodCall::chainHead_unstable_follow { with_runtime } => {
methods::MethodCall::chainHead_v1_follow { with_runtime } => {
let (tx, rx) = async_channel::bounded(16);
let subscription_id =
chain_head_subscriptions::spawn_chain_head_subscription_task(
Expand Down
111 changes: 50 additions & 61 deletions full-node/src/json_rpc_service/chain_head_subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ pub struct Config {
/// Receiver for actions that the JSON-RPC client wants to perform.
pub receiver: async_channel::Receiver<Message>,

/// `chainHead_unstable_follow` subscription start handle.
/// `chainHead_v1_follow` subscription start handle.
pub chain_head_follow_subscription: service::SubscriptionStartProcess,

/// Parameter that was passed by the user when requesting `chainHead_unstable_follow`.
/// Parameter that was passed by the user when requesting `chainHead_v1_follow`.
pub with_runtime: bool,

/// Consensus service of the chain.
Expand All @@ -68,7 +68,7 @@ pub enum Message {
},
}

/// Spawns a new tasks dedicated to handling a `chainHead_unstable_follow` subscription.
/// Spawns a new tasks dedicated to handling a `chainHead_v1_follow` subscription.
///
/// Returns the identifier of the subscription.
pub async fn spawn_chain_head_subscription_task(config: Config) -> String {
Expand All @@ -93,7 +93,7 @@ pub async fn spawn_chain_head_subscription_task(config: Config) -> String {

pinned_blocks.insert(consensus_service_subscription.finalized_block_hash);
json_rpc_subscription
.send_notification(methods::ServerToClient::chainHead_unstable_followEvent {
.send_notification(methods::ServerToClient::chainHead_v1_followEvent {
subscription: (&json_rpc_subscription_id).into(),
result: methods::FollowEvent::Initialized {
finalized_block_hash: methods::HashHexString(
Expand All @@ -115,7 +115,7 @@ pub async fn spawn_chain_head_subscription_task(config: Config) -> String {
for block in consensus_service_subscription.non_finalized_blocks_ancestry_order {
pinned_blocks.insert(block.block_hash);
json_rpc_subscription
.send_notification(methods::ServerToClient::chainHead_unstable_followEvent {
.send_notification(methods::ServerToClient::chainHead_v1_followEvent {
subscription: (&json_rpc_subscription_id).into(),
result: methods::FollowEvent::NewBlock {
block_hash: methods::HashHexString(block.block_hash),
Expand All @@ -134,7 +134,7 @@ pub async fn spawn_chain_head_subscription_task(config: Config) -> String {
if block.is_new_best {
current_best_block = block.block_hash;
json_rpc_subscription
.send_notification(methods::ServerToClient::chainHead_unstable_followEvent {
.send_notification(methods::ServerToClient::chainHead_v1_followEvent {
subscription: (&json_rpc_subscription_id).into(),
result: methods::FollowEvent::BestBlockChanged {
best_block_hash: methods::HashHexString(block.block_hash),
Expand Down Expand Up @@ -172,8 +172,7 @@ pub async fn spawn_chain_head_subscription_task(config: Config) -> String {
match wake_up_reason {
WakeUpReason::ForegroundClosed => return,
WakeUpReason::Foreground(Message::Header { request }) => {
let methods::MethodCall::chainHead_unstable_header { hash, .. } =
request.request()
let methods::MethodCall::chainHead_v1_header { hash, .. } = request.request()
else {
unreachable!()
};
Expand All @@ -190,7 +189,7 @@ pub async fn spawn_chain_head_subscription_task(config: Config) -> String {

match database_outcome {
Ok(Some(header)) => {
request.respond(methods::Response::chainHead_unstable_header(Some(
request.respond(methods::Response::chainHead_v1_header(Some(
methods::HexString(header),
)))
}
Expand Down Expand Up @@ -228,35 +227,31 @@ pub async fn spawn_chain_head_subscription_task(config: Config) -> String {
}) => {
pinned_blocks.insert(block.block_hash);
json_rpc_subscription
.send_notification(
methods::ServerToClient::chainHead_unstable_followEvent {
subscription: (&json_rpc_subscription_id).into(),
result: methods::FollowEvent::NewBlock {
block_hash: methods::HashHexString(block.block_hash),
new_runtime: if let (Some(new_runtime), true) =
(&block.runtime_update, config.with_runtime)
{
Some(convert_runtime_spec(new_runtime.runtime_version()))
} else {
None
},
parent_block_hash: methods::HashHexString(block.parent_hash),
.send_notification(methods::ServerToClient::chainHead_v1_followEvent {
subscription: (&json_rpc_subscription_id).into(),
result: methods::FollowEvent::NewBlock {
block_hash: methods::HashHexString(block.block_hash),
new_runtime: if let (Some(new_runtime), true) =
(&block.runtime_update, config.with_runtime)
{
Some(convert_runtime_spec(new_runtime.runtime_version()))
} else {
None
},
parent_block_hash: methods::HashHexString(block.parent_hash),
},
)
})
.await;

if block.is_new_best {
current_best_block = block.block_hash;
json_rpc_subscription
.send_notification(
methods::ServerToClient::chainHead_unstable_followEvent {
subscription: (&json_rpc_subscription_id).into(),
result: methods::FollowEvent::BestBlockChanged {
best_block_hash: methods::HashHexString(block.block_hash),
},
.send_notification(methods::ServerToClient::chainHead_v1_followEvent {
subscription: (&json_rpc_subscription_id).into(),
result: methods::FollowEvent::BestBlockChanged {
best_block_hash: methods::HashHexString(block.block_hash),
},
)
})
.await;
}
}
Expand All @@ -268,49 +263,43 @@ pub async fn spawn_chain_head_subscription_task(config: Config) -> String {
},
) => {
json_rpc_subscription
.send_notification(
methods::ServerToClient::chainHead_unstable_followEvent {
subscription: (&json_rpc_subscription_id).into(),
result: methods::FollowEvent::Finalized {
// As specified in the JSON-RPC spec, the list must be ordered
// in increasing block number. Consequently we have to reverse
// the list.
finalized_blocks_hashes: finalized_blocks_newest_to_oldest
.into_iter()
.map(methods::HashHexString)
.rev()
.collect(),
pruned_blocks_hashes: pruned_blocks_hashes
.into_iter()
.map(methods::HashHexString)
.collect(),
},
.send_notification(methods::ServerToClient::chainHead_v1_followEvent {
subscription: (&json_rpc_subscription_id).into(),
result: methods::FollowEvent::Finalized {
// As specified in the JSON-RPC spec, the list must be ordered
// in increasing block number. Consequently we have to reverse
// the list.
finalized_blocks_hashes: finalized_blocks_newest_to_oldest
.into_iter()
.map(methods::HashHexString)
.rev()
.collect(),
pruned_blocks_hashes: pruned_blocks_hashes
.into_iter()
.map(methods::HashHexString)
.collect(),
},
)
})
.await;

if best_block_hash != current_best_block {
current_best_block = best_block_hash;
json_rpc_subscription
.send_notification(
methods::ServerToClient::chainHead_unstable_followEvent {
subscription: (&json_rpc_subscription_id).into(),
result: methods::FollowEvent::BestBlockChanged {
best_block_hash: methods::HashHexString(best_block_hash),
},
.send_notification(methods::ServerToClient::chainHead_v1_followEvent {
subscription: (&json_rpc_subscription_id).into(),
result: methods::FollowEvent::BestBlockChanged {
best_block_hash: methods::HashHexString(best_block_hash),
},
)
})
.await;
}
}
WakeUpReason::ConsensusSubscriptionStop => {
json_rpc_subscription
.send_notification(
methods::ServerToClient::chainHead_unstable_followEvent {
subscription: (&json_rpc_subscription_id).into(),
result: methods::FollowEvent::Stop {},
},
)
.send_notification(methods::ServerToClient::chainHead_v1_followEvent {
subscription: (&json_rpc_subscription_id).into(),
result: methods::FollowEvent::Stop {},
})
.await;
}
}
Expand Down
6 changes: 3 additions & 3 deletions full-node/tests/author.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ fn basic_block_generated() {

loop {
client.send_json_rpc_request(
r#"{"jsonrpc":"2.0","id":1,"method":"chainHead_unstable_follow","params":[false]}"#
r#"{"jsonrpc":"2.0","id":1,"method":"chainHead_v1_follow","params":[false]}"#
.to_owned(),
);

Expand All @@ -60,11 +60,11 @@ fn basic_block_generated() {
match json_rpc::methods::parse_notification(&client.next_json_rpc_response().await)
.unwrap()
{
json_rpc::methods::ServerToClient::chainHead_unstable_followEvent {
json_rpc::methods::ServerToClient::chainHead_v1_followEvent {
result: json_rpc::methods::FollowEvent::NewBlock { .. },
..
} => return, // Test success
json_rpc::methods::ServerToClient::chainHead_unstable_followEvent {
json_rpc::methods::ServerToClient::chainHead_v1_followEvent {
result: json_rpc::methods::FollowEvent::Stop { .. },
..
} => break,
Expand Down
24 changes: 12 additions & 12 deletions lib/src/json_rpc/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -452,41 +452,41 @@ define_methods! {
system_version() -> Cow<'a, str>,

// The functions below are experimental and are defined in the document https://github.com/paritytech/json-rpc-interface-spec/
chainHead_unstable_body(
chainHead_v1_body(
#[rename = "followSubscription"] follow_subscription: Cow<'a, str>,
hash: HashHexString
) -> ChainHeadBodyCallReturn<'a>,
chainHead_unstable_call(
chainHead_v1_call(
#[rename = "followSubscription"] follow_subscription: Cow<'a, str>,
hash: HashHexString,
function: Cow<'a, str>,
#[rename = "callParameters"] call_parameters: HexString
) -> ChainHeadBodyCallReturn<'a>,
chainHead_unstable_follow(
chainHead_v1_follow(
#[rename = "withRuntime"] with_runtime: bool
) -> Cow<'a, str>,
chainHead_unstable_header(
chainHead_v1_header(
#[rename = "followSubscription"] follow_subscription: Cow<'a, str>,
hash: HashHexString
) -> Option<HexString>,
chainHead_unstable_stopOperation(
chainHead_v1_stopOperation(
#[rename = "followSubscription"] follow_subscription: Cow<'a, str>,
#[rename = "operationId"] operation_id: Cow<'a, str>
) -> (),
chainHead_unstable_storage(
chainHead_v1_storage(
#[rename = "followSubscription"] follow_subscription: Cow<'a, str>,
hash: HashHexString,
items: Vec<ChainHeadStorageRequestItem>,
#[rename = "childTrie"] child_trie: Option<HexString>
) -> ChainHeadStorageReturn<'a>,
chainHead_unstable_continue(
chainHead_v1_continue(
#[rename = "followSubscription"] follow_subscription: Cow<'a, str>,
#[rename = "operationId"] operation_id: Cow<'a, str>
) -> (),
chainHead_unstable_unfollow(
chainHead_v1_unfollow(
#[rename = "followSubscription"] follow_subscription: Cow<'a, str>
) -> (),
chainHead_unstable_unpin(
chainHead_v1_unpin(
#[rename = "followSubscription"] follow_subscription: Cow<'a, str>,
#[rename = "hashOrHashes"] hash_or_hashes: HashHexStringSingleOrArray
) -> (),
Expand Down Expand Up @@ -523,7 +523,7 @@ define_methods! {
state_storage(subscription: Cow<'a, str>, result: StorageChangeSet) -> (),

// The functions below are experimental and are defined in the document https://github.com/paritytech/json-rpc-interface-spec/
chainHead_unstable_followEvent(subscription: Cow<'a, str>, result: FollowEvent<'a>) -> (),
chainHead_v1_followEvent(subscription: Cow<'a, str>, result: FollowEvent<'a>) -> (),
transactionWatch_unstable_watchEvent(subscription: Cow<'a, str>, result: TransactionWatchEvent<'a>) -> (),

// This function is a custom addition in smoldot. As of the writing of this comment, there is
Expand Down Expand Up @@ -1265,15 +1265,15 @@ mod tests {
fn no_params_refused() {
// No `params` field in the request.
let err = super::parse_jsonrpc_client_to_server(
r#"{"jsonrpc":"2.0","id":2,"method":"chainHead_unstable_follow"}"#,
r#"{"jsonrpc":"2.0","id":2,"method":"chainHead_v1_follow"}"#,
);

assert!(matches!(
err,
Err(super::ParseClientToServerError::Method {
request_id: "2",
error: super::MethodError::MissingParameters {
rpc_method: "chainHead_unstable_follow"
rpc_method: "chainHead_v1_follow"
}
})
));
Expand Down
Loading
Loading