Skip to content

Commit

Permalink
Move chain_subscribeNewHeads to new task
Browse files Browse the repository at this point in the history
  • Loading branch information
tomaka committed Jul 4, 2023
1 parent 62a6570 commit c4a1d59
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 88 deletions.
5 changes: 3 additions & 2 deletions light-base/src/json_rpc_service/background.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,8 @@ pub(super) fn start<TPlat: PlatformRef>(
} => {
requests_processing_task = task;
match subscription_start.request() {
methods::MethodCall::chain_subscribeAllHeads {} => {
methods::MethodCall::chain_subscribeAllHeads {}
| methods::MethodCall::chain_subscribeNewHeads {} => {
me.to_legacy
.lock()
.await
Expand Down Expand Up @@ -702,7 +703,7 @@ impl<TPlat: PlatformRef> Background<TPlat> {
self.chain_subscribe_finalized_heads(request).await;
}
methods::MethodCall::chain_subscribeNewHeads {} => {
self.chain_subscribe_new_heads(request).await;
unreachable!()
}
methods::MethodCall::state_subscribeRuntimeVersion {} => {
self.state_subscribe_runtime_version(request).await;
Expand Down
38 changes: 33 additions & 5 deletions light-base/src/json_rpc_service/background/legacy_state_sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ pub(super) fn start_task<TPlat: PlatformRef>(
8,
Default::default(),
),
new_heads_subscriptions: hashbrown::HashMap::with_capacity_and_hasher(
8,
Default::default(),
),
})
.await;

Expand All @@ -126,6 +130,8 @@ struct Task<TPlat: PlatformRef> {
requests_rx: async_channel::Receiver<service::SubscriptionStartProcess>,
// TODO: shrink_to_fit?
all_heads_subscriptions: hashbrown::HashMap<String, service::Subscription, fnv::FnvBuildHasher>,
// TODO: shrink_to_fit?
new_heads_subscriptions: hashbrown::HashMap<String, service::Subscription, fnv::FnvBuildHasher>,
}

async fn run<TPlat: PlatformRef>(mut task: Task<TPlat>) {
Expand Down Expand Up @@ -153,8 +159,8 @@ async fn run<TPlat: PlatformRef>(mut task: Task<TPlat>) {
Err(error) => {
log::warn!(
target: &task.log_target,
"`chain_subscribeAllHeads` subscription has skipped \
block due to undecodable header. Hash: {}. Error: {}",
"`chain_subscribeAllHeads` or `chain_subscribeNewHeads` subscription \
has skipped block due to undecodable header. Hash: {}. Error: {}",
HashDisplay(&header::hash_from_scale_encoded_header(
&block.scale_encoded_header
)),
Expand All @@ -179,17 +185,39 @@ async fn run<TPlat: PlatformRef>(mut task: Task<TPlat>) {
})
.await;
}

if block.is_new_best {
for (subscription_id, subscription) in &mut task.new_heads_subscriptions {
subscription
.send_notification(methods::ServerToClient::chain_newHead {
subscription: subscription_id.as_str().into(),
result: json_rpc_header.clone(),
})
.await;
}
}
}
either::Left(Some(runtime_service::Notification::Finalized { .. })) => {}
either::Left(Some(runtime_service::Notification::BestBlockChanged {
hash, ..
})) => {
// TODO: report a chain_newHead subscription
}
either::Left(Some(runtime_service::Notification::Finalized { .. }))
| either::Left(Some(runtime_service::Notification::BestBlockChanged { .. })) => {}

either::Right(Some(request)) => match request.request() {
methods::MethodCall::chain_subscribeNewHeads {} => {
methods::MethodCall::chain_subscribeAllHeads {} => {
let subscription = request.accept();
let subscription_id = subscription.subscription_id().to_owned();
task.all_heads_subscriptions
.insert(subscription_id, subscription);
}
methods::MethodCall::chain_subscribeNewHeads {} => {
let subscription = request.accept();
let subscription_id = subscription.subscription_id().to_owned();
// TODO: must immediately send the current best block
task.new_heads_subscriptions
.insert(subscription_id, subscription);
}
_ => unreachable!(), // TODO: stronger typing to avoid this?
},

Expand Down
83 changes: 2 additions & 81 deletions light-base/src/json_rpc_service/background/state_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,11 @@
use super::{Background, GetKeysPagedCacheKey, PlatformRef};

use crate::{runtime_service, sync_service};
use crate::sync_service;

use alloc::{borrow::ToOwned as _, format, string::ToString as _, sync::Arc, vec, vec::Vec};
use async_lock::MutexGuard;
use core::{
iter,
num::{NonZeroU32, NonZeroUsize},
pin,
time::Duration,
};
use core::{iter, num::NonZeroU32, pin, time::Duration};
use futures_util::{future, stream, FutureExt as _, StreamExt as _};
use smoldot::{
header,
Expand Down Expand Up @@ -405,80 +400,6 @@ impl<TPlat: PlatformRef> Background<TPlat> {
);
}

/// Handles a call to [`methods::MethodCall::chain_subscribeNewHeads`].
pub(super) async fn chain_subscribe_new_heads(
self: &Arc<Self>,
request: service::SubscriptionStartProcess,
) {
let methods::MethodCall::chain_subscribeNewHeads {} = request.request() else {
unreachable!()
};

let mut blocks_list = {
let (block_header, blocks_subscription) =
sub_utils::subscribe_best(&self.runtime_service).await;
stream::once(future::ready(block_header)).chain(blocks_subscription)
};

self.platform
.spawn_task(format!("{}-subscribe-new-heads", self.log_target).into(), {
let log_target = self.log_target.clone();
let sync_service = self.sync_service.clone();

async move {
let mut subscription = request.accept();
let subscription_id = subscription.subscription_id().to_owned();

loop {
let event = {
let unsubscribed = pin::pin!(subscription.wait_until_stale());
match future::select(blocks_list.next(), unsubscribed).await {
future::Either::Left((ev, _)) => either::Left(ev),
future::Either::Right((ev, _)) => either::Right(ev),
}
};

match event {
either::Left(None) => {
// Stream returned by `subscribe_best` is always unlimited.
unreachable!()
}
either::Left(Some(header)) => {
let header = match methods::Header::from_scale_encoded_header(
&header,
sync_service.block_number_bytes(),
) {
Ok(h) => h,
Err(error) => {
log::warn!(
target: &log_target,
"`chain_subscribeNewHeads` subscription has skipped block \
due to undecodable header. Hash: {}. Error: {}",
HashDisplay(&header::hash_from_scale_encoded_header(
&header
)),
error,
);
continue;
}
};

subscription
.send_notification(methods::ServerToClient::chain_newHead {
subscription: (&subscription_id).into(),
result: header,
})
.await;
}
either::Right(()) => {
break;
}
}
}
}
});
}

/// Handles a call to [`methods::MethodCall::payment_queryInfo`].
pub(super) async fn payment_query_info(self: &Arc<Self>, request: service::RequestProcess) {
let methods::MethodCall::payment_queryInfo {
Expand Down

0 comments on commit c4a1d59

Please sign in to comment.