Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor the JSON-RPC cache and handling of legacy API subscriptions #854

Merged
merged 46 commits into from
Jul 6, 2023
Merged
Changes from 1 commit
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
238a2ae
Move cache populating task to separate module
tomaka Jul 4, 2023
95357d2
Move main loop to separate function
tomaka Jul 4, 2023
5e7a60b
Pass individual components to the task rather than the Background
tomaka Jul 4, 2023
62a6570
Move `chain_subscribeAllHeads` to new task
tomaka Jul 4, 2023
c4a1d59
Move chain_subscribeNewHeads to new task
tomaka Jul 4, 2023
fc85534
Add a `Message` enum for messages
tomaka Jul 4, 2023
b7f70c0
Notify when subscription is destroyed
tomaka Jul 4, 2023
a0cd7b8
Make runtime_access ask the cache through a message
tomaka Jul 4, 2023
7e6084e
Query block number from cache
tomaka Jul 4, 2023
7e9a4c4
Ask block header from cache through a message
tomaka Jul 4, 2023
532cb53
Ask for the block state root and number by sending a message
tomaka Jul 4, 2023
173ca4f
Move `state_get_keys_paged` as a separate field
tomaka Jul 4, 2023
cf756d3
The `Cache` is now scoped to `legacy_state_sub`
tomaka Jul 4, 2023
9d6dd9e
Remove background abort registration system
tomaka Jul 4, 2023
c5fe695
Simplify `Frontend::queue_rpc_request`
tomaka Jul 4, 2023
ed5ebe4
Remove TODOs and update CHANGELOG
tomaka Jul 4, 2023
aeae663
Perform the re-subscription within the task
tomaka Jul 4, 2023
af2ff4b
Inline the fields of `Cache` within `Task`
tomaka Jul 4, 2023
8ee9ec4
Move recent pinned blocks to Subscription::Active as it makes sense t…
tomaka Jul 4, 2023
0f655d0
Simplify `start_task`
tomaka Jul 4, 2023
54bb53e
Wrap recent blocks in a struct
tomaka Jul 4, 2023
d9a9187
Add `runtime_version` field to `RecentBlock`
tomaka Jul 4, 2023
1c6bbe6
Fix pinning strategy
tomaka Jul 5, 2023
d95f273
Handle finalized subscriptions in new task
tomaka Jul 5, 2023
fd965d9
Fix runtime_version todo!()
tomaka Jul 5, 2023
25c72d1
Remove obsolete TODO
tomaka Jul 5, 2023
679318b
Keep current best and finalized and report them immediately
tomaka Jul 5, 2023
7ec026a
Fix unused imports and variables
tomaka Jul 5, 2023
6323c44
Ask the best block from the new task instead of using sub_utils
tomaka Jul 5, 2023
827445f
Move runtime subscription to new task
tomaka Jul 5, 2023
61a35c0
Remove unused function
tomaka Jul 5, 2023
f6351e5
Move `state_subscribeStorage` to new task
tomaka Jul 5, 2023
9b941eb
Update CHANGELOG
tomaka Jul 5, 2023
5658f95
Small tweaks and restore logging incoming requests
tomaka Jul 5, 2023
210cefd
Remove obsolete code
tomaka Jul 5, 2023
c071fa3
Add TODO
tomaka Jul 5, 2023
4cd5c7b
Change creation API to return a sender
tomaka Jul 5, 2023
382efd2
Use a Config struct and pass proper seeds
tomaka Jul 5, 2023
41ea4cc
Add lots of comments and tweaks
tomaka Jul 5, 2023
8d56473
Report finalized block separately
tomaka Jul 5, 2023
1c371aa
Simplify notifying best block update
tomaka Jul 5, 2023
e510594
Split the blocks more for readability
tomaka Jul 5, 2023
cb9ec78
Fix all warnings
tomaka Jul 5, 2023
66c460f
Small tweak
tomaka Jul 5, 2023
9934527
PR link
tomaka Jul 5, 2023
9e360ab
Docfix and error moved
tomaka Jul 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Move chain_subscribeNewHeads to new task
tomaka committed Jul 4, 2023

Verified

This commit was signed with the committer’s verified signature.
tomaka Pierre Krieger
commit c4a1d5994079dfb865191551eebc8aaf3376c2e0
5 changes: 3 additions & 2 deletions light-base/src/json_rpc_service/background.rs
Original file line number Diff line number Diff line change
@@ -245,7 +245,8 @@ pub(super) fn start<TPlat: PlatformRef>(
} => {
requests_processing_task = task;
match subscription_start.request() {
methods::MethodCall::chain_subscribeAllHeads {} => {
methods::MethodCall::chain_subscribeAllHeads {}
| methods::MethodCall::chain_subscribeNewHeads {} => {
me.to_legacy
.lock()
.await
@@ -702,7 +703,7 @@ impl<TPlat: PlatformRef> Background<TPlat> {
self.chain_subscribe_finalized_heads(request).await;
}
methods::MethodCall::chain_subscribeNewHeads {} => {
self.chain_subscribe_new_heads(request).await;
unreachable!()
}
methods::MethodCall::state_subscribeRuntimeVersion {} => {
self.state_subscribe_runtime_version(request).await;
38 changes: 33 additions & 5 deletions light-base/src/json_rpc_service/background/legacy_state_sub.rs
Original file line number Diff line number Diff line change
@@ -105,6 +105,10 @@ pub(super) fn start_task<TPlat: PlatformRef>(
8,
Default::default(),
),
new_heads_subscriptions: hashbrown::HashMap::with_capacity_and_hasher(
8,
Default::default(),
),
})
.await;

@@ -126,6 +130,8 @@ struct Task<TPlat: PlatformRef> {
requests_rx: async_channel::Receiver<service::SubscriptionStartProcess>,
// TODO: shrink_to_fit?
all_heads_subscriptions: hashbrown::HashMap<String, service::Subscription, fnv::FnvBuildHasher>,
// TODO: shrink_to_fit?
new_heads_subscriptions: hashbrown::HashMap<String, service::Subscription, fnv::FnvBuildHasher>,
}

async fn run<TPlat: PlatformRef>(mut task: Task<TPlat>) {
@@ -153,8 +159,8 @@ async fn run<TPlat: PlatformRef>(mut task: Task<TPlat>) {
Err(error) => {
log::warn!(
target: &task.log_target,
"`chain_subscribeAllHeads` subscription has skipped \
block due to undecodable header. Hash: {}. Error: {}",
"`chain_subscribeAllHeads` or `chain_subscribeNewHeads` subscription \
has skipped block due to undecodable header. Hash: {}. Error: {}",
HashDisplay(&header::hash_from_scale_encoded_header(
&block.scale_encoded_header
)),
@@ -179,17 +185,39 @@ async fn run<TPlat: PlatformRef>(mut task: Task<TPlat>) {
})
.await;
}

if block.is_new_best {
for (subscription_id, subscription) in &mut task.new_heads_subscriptions {
subscription
.send_notification(methods::ServerToClient::chain_newHead {
subscription: subscription_id.as_str().into(),
result: json_rpc_header.clone(),
})
.await;
}
}
}
either::Left(Some(runtime_service::Notification::Finalized { .. })) => {}
either::Left(Some(runtime_service::Notification::BestBlockChanged {
hash, ..
})) => {
// TODO: report a chain_newHead subscription
}
either::Left(Some(runtime_service::Notification::Finalized { .. }))
| either::Left(Some(runtime_service::Notification::BestBlockChanged { .. })) => {}

either::Right(Some(request)) => match request.request() {
methods::MethodCall::chain_subscribeNewHeads {} => {
methods::MethodCall::chain_subscribeAllHeads {} => {
let subscription = request.accept();
let subscription_id = subscription.subscription_id().to_owned();
task.all_heads_subscriptions
.insert(subscription_id, subscription);
}
methods::MethodCall::chain_subscribeNewHeads {} => {
let subscription = request.accept();
let subscription_id = subscription.subscription_id().to_owned();
// TODO: must immediately send the current best block
task.new_heads_subscriptions
.insert(subscription_id, subscription);
}
_ => unreachable!(), // TODO: stronger typing to avoid this?
},

83 changes: 2 additions & 81 deletions light-base/src/json_rpc_service/background/state_chain.rs
Original file line number Diff line number Diff line change
@@ -19,16 +19,11 @@

use super::{Background, GetKeysPagedCacheKey, PlatformRef};

use crate::{runtime_service, sync_service};
use crate::sync_service;

use alloc::{borrow::ToOwned as _, format, string::ToString as _, sync::Arc, vec, vec::Vec};
use async_lock::MutexGuard;
use core::{
iter,
num::{NonZeroU32, NonZeroUsize},
pin,
time::Duration,
};
use core::{iter, num::NonZeroU32, pin, time::Duration};
use futures_util::{future, stream, FutureExt as _, StreamExt as _};
use smoldot::{
header,
@@ -405,80 +400,6 @@ impl<TPlat: PlatformRef> Background<TPlat> {
);
}

/// Handles a call to [`methods::MethodCall::chain_subscribeNewHeads`].
pub(super) async fn chain_subscribe_new_heads(
self: &Arc<Self>,
request: service::SubscriptionStartProcess,
) {
let methods::MethodCall::chain_subscribeNewHeads {} = request.request() else {
unreachable!()
};

let mut blocks_list = {
let (block_header, blocks_subscription) =
sub_utils::subscribe_best(&self.runtime_service).await;
stream::once(future::ready(block_header)).chain(blocks_subscription)
};

self.platform
.spawn_task(format!("{}-subscribe-new-heads", self.log_target).into(), {
let log_target = self.log_target.clone();
let sync_service = self.sync_service.clone();

async move {
let mut subscription = request.accept();
let subscription_id = subscription.subscription_id().to_owned();

loop {
let event = {
let unsubscribed = pin::pin!(subscription.wait_until_stale());
match future::select(blocks_list.next(), unsubscribed).await {
future::Either::Left((ev, _)) => either::Left(ev),
future::Either::Right((ev, _)) => either::Right(ev),
}
};

match event {
either::Left(None) => {
// Stream returned by `subscribe_best` is always unlimited.
unreachable!()
}
either::Left(Some(header)) => {
let header = match methods::Header::from_scale_encoded_header(
&header,
sync_service.block_number_bytes(),
) {
Ok(h) => h,
Err(error) => {
log::warn!(
target: &log_target,
"`chain_subscribeNewHeads` subscription has skipped block \
due to undecodable header. Hash: {}. Error: {}",
HashDisplay(&header::hash_from_scale_encoded_header(
&header
)),
error,
);
continue;
}
};

subscription
.send_notification(methods::ServerToClient::chain_newHead {
subscription: (&subscription_id).into(),
result: header,
})
.await;
}
either::Right(()) => {
break;
}
}
}
}
});
}

/// Handles a call to [`methods::MethodCall::payment_queryInfo`].
pub(super) async fn payment_query_info(self: &Arc<Self>, request: service::RequestProcess) {
let methods::MethodCall::payment_queryInfo {