Skip to content

Commit

Permalink
Refactor the JSON-RPC cache and handling of legacy API subscriptions (#…
Browse files Browse the repository at this point in the history
…854)

* Move cache populating task to separate module

* Move main loop to separate function

* Pass individual components to the task rather than the Background

* Move `chain_subscribeAllHeads` to new task

* Move chain_subscribeNewHeads to new task

* Add a `Message` enum for messages

* Notify when subscription is destroyed

* Make runtime_access ask the cache through a message

* Query block number from cache

* Ask block header from cache through a message

* Ask for the block state root and number by sending a message

* Move `state_get_keys_paged` as a separate field

* The `Cache` is now scoped to `legacy_state_sub`

* Remove background abort registration system

* Simplify `Frontend::queue_rpc_request`

* Remove TODOs and update CHANGELOG

* Perform the re-subscription within the task

* Inline the fields of `Cache` within `Task`

* Move recent pinned blocks to Subscription::Active as it makes sense there

* Simplify `start_task`

* Wrap recent blocks in a struct

* Add `runtime_version` field to `RecentBlock`

* Fix pinning strategy

* Handle finalized subscriptions in new task

* Fix runtime_version todo!()

* Remove obsolete TODO

* Keep current best and finalized and report them immediately

* Fix unused imports and variables

* Ask the best block from the new task instead of using sub_utils

* Move runtime subscription to new task

* Remove unused function

* Move `state_subscribeStorage` to new task

* Update CHANGELOG

* Small tweaks and restore logging incoming requests

* Remove obsolete code

* Add TODO

* Change creation API to return a sender

* Use a Config struct and pass proper seeds

* Add lots of comments and tweaks

* Report finalized block separately

* Simplify notifying best block update

* Split the blocks more for readability

* Fix all warnings

* Small tweak

* PR link

* Docfix and error moved
  • Loading branch information
tomaka authored Jul 6, 2023
1 parent ff29867 commit e97f1d3
Show file tree
Hide file tree
Showing 6 changed files with 1,715 additions and 1,548 deletions.
87 changes: 26 additions & 61 deletions light-base/src/json_rpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,12 @@ use crate::{
network_service, platform::PlatformRef, runtime_service, sync_service, transactions_service,
};

use alloc::{format, string::String, sync::Arc, vec::Vec};
use alloc::{
format,
string::{String, ToString as _},
sync::Arc,
};
use core::num::{NonZeroU32, NonZeroUsize};
use futures_util::future;
use smoldot::{
chain_spec,
json_rpc::{self, service},
Expand Down Expand Up @@ -90,21 +93,6 @@ pub struct Config {
pub fn service(config: Config) -> (Frontend, ServicePrototype) {
let log_target = format!("json-rpc-{}", config.log_name);

// We are later going to spawn a bunch of tasks. Each task is associated with an "abort
// handle" that makes it possible to later abort it. We calculate here the number of handles
// that are necessary.
// This calculation must be in sync with the part of the code that spawns the tasks. Assertions
// are there in order to make sure that this is the case.
let num_handles = 1; // TODO: a bit ridiculous for this to be 1

let mut background_aborts = Vec::with_capacity(usize::try_from(num_handles).unwrap());
let mut background_abort_registrations = Vec::with_capacity(background_aborts.capacity());
for _ in 0..num_handles {
let (abort, reg) = future::AbortHandle::new_pair();
background_aborts.push(abort);
background_abort_registrations.push(reg);
}

let (requests_processing_task, requests_responses_io) =
service::client_main_task(service::Config {
max_active_subscriptions: config.max_subscriptions,
Expand All @@ -115,11 +103,9 @@ pub fn service(config: Config) -> (Frontend, ServicePrototype) {
let frontend = Frontend {
log_target: log_target.clone(),
requests_responses_io: Arc::new(requests_responses_io),
background_aborts: Arc::from(background_aborts),
};

let prototype = ServicePrototype {
background_abort_registrations,
log_target,
requests_processing_task,
max_parallel_requests: config.max_parallel_requests,
Expand All @@ -143,9 +129,6 @@ pub struct Frontend {

/// Target to use when emitting logs.
log_target: String,

/// Handles to abort the background tasks.
background_aborts: Arc<[future::AbortHandle]>,
}

impl Frontend {
Expand All @@ -156,41 +139,40 @@ impl Frontend {
/// isn't called often enough. Use [`HandleRpcError::into_json_rpc_error`] to build the
/// JSON-RPC response to immediately send back to the user.
pub fn queue_rpc_request(&self, json_rpc_request: String) -> Result<(), HandleRpcError> {
// If the request isn't even a valid JSON-RPC request, we can't even send back a response.
// We have no choice but to immediately refuse the request.
if let Err(error) = json_rpc::parse::parse_call(&json_rpc_request) {
log::warn!(
target: &self.log_target,
"Refused malformed JSON-RPC request: {}", error
);
return Err(HandleRpcError::MalformedJsonRpc(error));
}

// Logging the request before it is queued.
log::debug!(
target: &self.log_target,
"PendingRequestsQueue <= {}",
crate::util::truncated_str(
json_rpc_request.chars().filter(|c| !c.is_control()),
100,
)
);
let log_friendly_request =
crate::util::truncated_str(json_rpc_request.chars().filter(|c| !c.is_control()), 100)
.to_string();

match self
.requests_responses_io
.try_send_request(json_rpc_request)
{
Ok(()) => Ok(()),
Ok(()) => {
log::debug!(
target: &self.log_target,
"JSON-RPC => {}",
log_friendly_request
);
Ok(())
}
Err(service::TrySendRequestError {
cause: service::TrySendRequestErrorCause::TooManyPendingRequests,
request,
}) => Err(HandleRpcError::TooManyPendingRequests {
json_rpc_request: request,
}),
Err(service::TrySendRequestError {
cause: service::TrySendRequestErrorCause::MalformedJson(err),
cause: service::TrySendRequestErrorCause::MalformedJson(error),
..
}) => Err(HandleRpcError::MalformedJsonRpc(err)),
}) => {
// If the request isn't even a valid JSON-RPC request, we can't even send back a
// response. We have no choice but to immediately refuse the request.
log::warn!(
target: &self.log_target,
"Refused malformed JSON-RPC request: {}", error
);
Err(HandleRpcError::MalformedJsonRpc(error))
}
Err(service::TrySendRequestError {
cause: service::TrySendRequestErrorCause::ClientMainTaskDestroyed,
..
Expand Down Expand Up @@ -221,18 +203,6 @@ impl Frontend {
}
}

impl Drop for Frontend {
fn drop(&mut self) {
// Call `abort()` if this was the last instance of the `Arc<AbortHandle>` (and thus the
// last instance of `Frontend`).
if let Some(background_aborts) = Arc::get_mut(&mut self.background_aborts) {
for background_abort in background_aborts {
background_abort.abort();
}
}
}
}

/// Prototype for a JSON-RPC service. Must be initialized using [`ServicePrototype::start`].
pub struct ServicePrototype {
/// Task processing the requests.
Expand All @@ -245,10 +215,6 @@ pub struct ServicePrototype {

/// Value obtained through [`Config::max_parallel_requests`].
max_parallel_requests: NonZeroU32,

/// List of abort handles. When tasks are spawned, each handle is associated with a task, so
/// that they can all be aborted. See [`Frontend::background_aborts`].
background_abort_registrations: Vec<future::AbortRegistration>,
}

/// Configuration for a JSON-RPC service.
Expand Down Expand Up @@ -310,7 +276,6 @@ impl ServicePrototype {
config,
self.requests_processing_task,
self.max_parallel_requests,
self.background_abort_registrations,
)
}
}
Expand Down
Loading

0 comments on commit e97f1d3

Please sign in to comment.