Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor the JSON-RPC cache and handling of legacy API subscriptions #854

Merged
merged 46 commits into from
Jul 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
238a2ae
Move cache populating task to separate module
tomaka Jul 4, 2023
95357d2
Move main loop to separate function
tomaka Jul 4, 2023
5e7a60b
Pass individual components to the task rather than the Background
tomaka Jul 4, 2023
62a6570
Move `chain_subscribeAllHeads` to new task
tomaka Jul 4, 2023
c4a1d59
Move chain_subscribeNewHeads to new task
tomaka Jul 4, 2023
fc85534
Add a `Message` enum for messages
tomaka Jul 4, 2023
b7f70c0
Notify when subscription is destroyed
tomaka Jul 4, 2023
a0cd7b8
Make runtime_access ask the cache through a message
tomaka Jul 4, 2023
7e6084e
Query block number from cache
tomaka Jul 4, 2023
7e9a4c4
Ask block header from cache through a message
tomaka Jul 4, 2023
532cb53
Ask for the block state root and number by sending a message
tomaka Jul 4, 2023
173ca4f
Move `state_get_keys_paged` as a separate field
tomaka Jul 4, 2023
cf756d3
The `Cache` is now scoped to `legacy_state_sub`
tomaka Jul 4, 2023
9d6dd9e
Remove background abort registration system
tomaka Jul 4, 2023
c5fe695
Simplify `Frontend::queue_rpc_request`
tomaka Jul 4, 2023
ed5ebe4
Remove TODOs and update CHANGELOG
tomaka Jul 4, 2023
aeae663
Perform the re-subscription within the task
tomaka Jul 4, 2023
af2ff4b
Inline the fields of `Cache` within `Task`
tomaka Jul 4, 2023
8ee9ec4
Move recent pinned blocks to Subscription::Active as it makes sense t…
tomaka Jul 4, 2023
0f655d0
Simplify `start_task`
tomaka Jul 4, 2023
54bb53e
Wrap recent blocks in a struct
tomaka Jul 4, 2023
d9a9187
Add `runtime_version` field to `RecentBlock`
tomaka Jul 4, 2023
1c6bbe6
Fix pinning strategy
tomaka Jul 5, 2023
d95f273
Handle finalized subscriptions in new task
tomaka Jul 5, 2023
fd965d9
Fix runtime_version todo!()
tomaka Jul 5, 2023
25c72d1
Remove obsolete TODO
tomaka Jul 5, 2023
679318b
Keep current best and finalized and report them immediately
tomaka Jul 5, 2023
7ec026a
Fix unused imports and variables
tomaka Jul 5, 2023
6323c44
Ask the best block from the new task instead of using sub_utils
tomaka Jul 5, 2023
827445f
Move runtime subscription to new task
tomaka Jul 5, 2023
61a35c0
Remove unused function
tomaka Jul 5, 2023
f6351e5
Move `state_subscribeStorage` to new task
tomaka Jul 5, 2023
9b941eb
Update CHANGELOG
tomaka Jul 5, 2023
5658f95
Small tweaks and restore logging incoming requests
tomaka Jul 5, 2023
210cefd
Remove obsolete code
tomaka Jul 5, 2023
c071fa3
Add TODO
tomaka Jul 5, 2023
4cd5c7b
Change creation API to return a sender
tomaka Jul 5, 2023
382efd2
Use a Config struct and pass proper seeds
tomaka Jul 5, 2023
41ea4cc
Add lots of comments and tweaks
tomaka Jul 5, 2023
8d56473
Report finalized block separately
tomaka Jul 5, 2023
1c371aa
Simplify notifying best block update
tomaka Jul 5, 2023
e510594
Split the blocks more for readability
tomaka Jul 5, 2023
cb9ec78
Fix all warnings
tomaka Jul 5, 2023
66c460f
Small tweak
tomaka Jul 5, 2023
9934527
PR link
tomaka Jul 5, 2023
9e360ab
Docfix and error moved
tomaka Jul 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 26 additions & 61 deletions light-base/src/json_rpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,12 @@ use crate::{
network_service, platform::PlatformRef, runtime_service, sync_service, transactions_service,
};

use alloc::{format, string::String, sync::Arc, vec::Vec};
use alloc::{
format,
string::{String, ToString as _},
sync::Arc,
};
use core::num::{NonZeroU32, NonZeroUsize};
use futures_util::future;
use smoldot::{
chain_spec,
json_rpc::{self, service},
Expand Down Expand Up @@ -90,21 +93,6 @@ pub struct Config {
pub fn service(config: Config) -> (Frontend, ServicePrototype) {
let log_target = format!("json-rpc-{}", config.log_name);

// We are later going to spawn a bunch of tasks. Each task is associated with an "abort
// handle" that makes it possible to later abort it. We calculate here the number of handles
// that are necessary.
// This calculation must be in sync with the part of the code that spawns the tasks. Assertions
// are there in order to make sure that this is the case.
let num_handles = 1; // TODO: a bit ridiculous for this to be 1

let mut background_aborts = Vec::with_capacity(usize::try_from(num_handles).unwrap());
let mut background_abort_registrations = Vec::with_capacity(background_aborts.capacity());
for _ in 0..num_handles {
let (abort, reg) = future::AbortHandle::new_pair();
background_aborts.push(abort);
background_abort_registrations.push(reg);
}

let (requests_processing_task, requests_responses_io) =
service::client_main_task(service::Config {
max_active_subscriptions: config.max_subscriptions,
Expand All @@ -115,11 +103,9 @@ pub fn service(config: Config) -> (Frontend, ServicePrototype) {
let frontend = Frontend {
log_target: log_target.clone(),
requests_responses_io: Arc::new(requests_responses_io),
background_aborts: Arc::from(background_aborts),
};

let prototype = ServicePrototype {
background_abort_registrations,
log_target,
requests_processing_task,
max_parallel_requests: config.max_parallel_requests,
Expand All @@ -143,9 +129,6 @@ pub struct Frontend {

/// Target to use when emitting logs.
log_target: String,

/// Handles to abort the background tasks.
background_aborts: Arc<[future::AbortHandle]>,
}

impl Frontend {
Expand All @@ -156,41 +139,40 @@ impl Frontend {
/// isn't called often enough. Use [`HandleRpcError::into_json_rpc_error`] to build the
/// JSON-RPC response to immediately send back to the user.
pub fn queue_rpc_request(&self, json_rpc_request: String) -> Result<(), HandleRpcError> {
// If the request isn't even a valid JSON-RPC request, we can't even send back a response.
// We have no choice but to immediately refuse the request.
if let Err(error) = json_rpc::parse::parse_call(&json_rpc_request) {
log::warn!(
target: &self.log_target,
"Refused malformed JSON-RPC request: {}", error
);
return Err(HandleRpcError::MalformedJsonRpc(error));
}

// Logging the request before it is queued.
log::debug!(
target: &self.log_target,
"PendingRequestsQueue <= {}",
crate::util::truncated_str(
json_rpc_request.chars().filter(|c| !c.is_control()),
100,
)
);
let log_friendly_request =
crate::util::truncated_str(json_rpc_request.chars().filter(|c| !c.is_control()), 100)
.to_string();

match self
.requests_responses_io
.try_send_request(json_rpc_request)
{
Ok(()) => Ok(()),
Ok(()) => {
log::debug!(
target: &self.log_target,
"JSON-RPC => {}",
log_friendly_request
);
Ok(())
}
Err(service::TrySendRequestError {
cause: service::TrySendRequestErrorCause::TooManyPendingRequests,
request,
}) => Err(HandleRpcError::TooManyPendingRequests {
json_rpc_request: request,
}),
Err(service::TrySendRequestError {
cause: service::TrySendRequestErrorCause::MalformedJson(err),
cause: service::TrySendRequestErrorCause::MalformedJson(error),
..
}) => Err(HandleRpcError::MalformedJsonRpc(err)),
}) => {
// If the request isn't even a valid JSON-RPC request, we can't even send back a
// response. We have no choice but to immediately refuse the request.
log::warn!(
target: &self.log_target,
"Refused malformed JSON-RPC request: {}", error
);
Err(HandleRpcError::MalformedJsonRpc(error))
}
Err(service::TrySendRequestError {
cause: service::TrySendRequestErrorCause::ClientMainTaskDestroyed,
..
Expand Down Expand Up @@ -221,18 +203,6 @@ impl Frontend {
}
}

impl Drop for Frontend {
fn drop(&mut self) {
// Call `abort()` if this was the last instance of the `Arc<AbortHandle>` (and thus the
// last instance of `Frontend`).
if let Some(background_aborts) = Arc::get_mut(&mut self.background_aborts) {
for background_abort in background_aborts {
background_abort.abort();
}
}
}
}

/// Prototype for a JSON-RPC service. Must be initialized using [`ServicePrototype::start`].
pub struct ServicePrototype {
/// Task processing the requests.
Expand All @@ -245,10 +215,6 @@ pub struct ServicePrototype {

/// Value obtained through [`Config::max_parallel_requests`].
max_parallel_requests: NonZeroU32,

/// List of abort handles. When tasks are spawned, each handle is associated with a task, so
/// that they can all be aborted. See [`Frontend::background_aborts`].
background_abort_registrations: Vec<future::AbortRegistration>,
}

/// Configuration for a JSON-RPC service.
Expand Down Expand Up @@ -310,7 +276,6 @@ impl ServicePrototype {
config,
self.requests_processing_task,
self.max_parallel_requests,
self.background_abort_registrations,
)
}
}
Expand Down
Loading