diff --git a/light-base/src/json_rpc_service.rs b/light-base/src/json_rpc_service.rs index 5304b1a649..9eb908a44b 100644 --- a/light-base/src/json_rpc_service.rs +++ b/light-base/src/json_rpc_service.rs @@ -43,9 +43,12 @@ use crate::{ network_service, platform::PlatformRef, runtime_service, sync_service, transactions_service, }; -use alloc::{format, string::String, sync::Arc, vec::Vec}; +use alloc::{ + format, + string::{String, ToString as _}, + sync::Arc, +}; use core::num::{NonZeroU32, NonZeroUsize}; -use futures_util::future; use smoldot::{ chain_spec, json_rpc::{self, service}, @@ -90,21 +93,6 @@ pub struct Config { pub fn service(config: Config) -> (Frontend, ServicePrototype) { let log_target = format!("json-rpc-{}", config.log_name); - // We are later going to spawn a bunch of tasks. Each task is associated with an "abort - // handle" that makes it possible to later abort it. We calculate here the number of handles - // that are necessary. - // This calculation must be in sync with the part of the code that spawns the tasks. Assertions - // are there in order to make sure that this is the case. - let num_handles = 1; // TODO: a bit ridiculous for this to be 1 - - let mut background_aborts = Vec::with_capacity(usize::try_from(num_handles).unwrap()); - let mut background_abort_registrations = Vec::with_capacity(background_aborts.capacity()); - for _ in 0..num_handles { - let (abort, reg) = future::AbortHandle::new_pair(); - background_aborts.push(abort); - background_abort_registrations.push(reg); - } - let (requests_processing_task, requests_responses_io) = service::client_main_task(service::Config { max_active_subscriptions: config.max_subscriptions, @@ -115,11 +103,9 @@ pub fn service(config: Config) -> (Frontend, ServicePrototype) { let frontend = Frontend { log_target: log_target.clone(), requests_responses_io: Arc::new(requests_responses_io), - background_aborts: Arc::from(background_aborts), }; let prototype = ServicePrototype { - background_abort_registrations, log_target, requests_processing_task, max_parallel_requests: config.max_parallel_requests, @@ -143,9 +129,6 @@ pub struct Frontend { /// Target to use when emitting logs. log_target: String, - - /// Handles to abort the background tasks. - background_aborts: Arc<[future::AbortHandle]>, } impl Frontend { @@ -156,31 +139,22 @@ impl Frontend { /// isn't called often enough. Use [`HandleRpcError::into_json_rpc_error`] to build the /// JSON-RPC response to immediately send back to the user. pub fn queue_rpc_request(&self, json_rpc_request: String) -> Result<(), HandleRpcError> { - // If the request isn't even a valid JSON-RPC request, we can't even send back a response. - // We have no choice but to immediately refuse the request. - if let Err(error) = json_rpc::parse::parse_call(&json_rpc_request) { - log::warn!( - target: &self.log_target, - "Refused malformed JSON-RPC request: {}", error - ); - return Err(HandleRpcError::MalformedJsonRpc(error)); - } - - // Logging the request before it is queued. - log::debug!( - target: &self.log_target, - "PendingRequestsQueue <= {}", - crate::util::truncated_str( - json_rpc_request.chars().filter(|c| !c.is_control()), - 100, - ) - ); + let log_friendly_request = + crate::util::truncated_str(json_rpc_request.chars().filter(|c| !c.is_control()), 100) + .to_string(); match self .requests_responses_io .try_send_request(json_rpc_request) { - Ok(()) => Ok(()), + Ok(()) => { + log::debug!( + target: &self.log_target, + "JSON-RPC => {}", + log_friendly_request + ); + Ok(()) + } Err(service::TrySendRequestError { cause: service::TrySendRequestErrorCause::TooManyPendingRequests, request, @@ -188,9 +162,17 @@ impl Frontend { json_rpc_request: request, }), Err(service::TrySendRequestError { - cause: service::TrySendRequestErrorCause::MalformedJson(err), + cause: service::TrySendRequestErrorCause::MalformedJson(error), .. - }) => Err(HandleRpcError::MalformedJsonRpc(err)), + }) => { + // If the request isn't even a valid JSON-RPC request, we can't even send back a + // response. We have no choice but to immediately refuse the request. + log::warn!( + target: &self.log_target, + "Refused malformed JSON-RPC request: {}", error + ); + Err(HandleRpcError::MalformedJsonRpc(error)) + } Err(service::TrySendRequestError { cause: service::TrySendRequestErrorCause::ClientMainTaskDestroyed, .. @@ -221,18 +203,6 @@ impl Frontend { } } -impl Drop for Frontend { - fn drop(&mut self) { - // Call `abort()` if this was the last instance of the `Arc` (and thus the - // last instance of `Frontend`). - if let Some(background_aborts) = Arc::get_mut(&mut self.background_aborts) { - for background_abort in background_aborts { - background_abort.abort(); - } - } - } -} - /// Prototype for a JSON-RPC service. Must be initialized using [`ServicePrototype::start`]. pub struct ServicePrototype { /// Task processing the requests. @@ -245,10 +215,6 @@ pub struct ServicePrototype { /// Value obtained through [`Config::max_parallel_requests`]. max_parallel_requests: NonZeroU32, - - /// List of abort handles. When tasks are spawned, each handle is associated with a task, so - /// that they can all be aborted. See [`Frontend::background_aborts`]. - background_abort_registrations: Vec, } /// Configuration for a JSON-RPC service. @@ -310,7 +276,6 @@ impl ServicePrototype { config, self.requests_processing_task, self.max_parallel_requests, - self.background_abort_registrations, ) } } diff --git a/light-base/src/json_rpc_service/background.rs b/light-base/src/json_rpc_service/background.rs index 3ae5493af9..b97e95847d 100644 --- a/light-base/src/json_rpc_service/background.rs +++ b/light-base/src/json_rpc_service/background.rs @@ -37,17 +37,16 @@ use core::{ sync::atomic, time::Duration, }; -use futures_util::{future, FutureExt as _}; +use futures_channel::oneshot; use smoldot::{ executor::{host, runtime_host}, - header, json_rpc::{self, methods, service}, libp2p::{multiaddr, PeerId}, - network::protocol, }; mod chain_head; mod getters; +mod legacy_state_sub; mod state_chain; mod transactions; @@ -84,9 +83,16 @@ struct Background { /// See [`StartConfig::transactions_service`]. transactions_service: Arc>, - /// Various information caches about blocks, to potentially reduce the number of network - /// requests to perform. - cache: Mutex, + /// Channel where to send requests that concern the legacy JSON-RPC API that are handled by + /// a dedicated task. + to_legacy: Mutex>>, + + /// When `state_getKeysPaged` is called and the response is truncated, the response is + /// inserted in this cache. The API user is likely to call `state_getKeysPaged` again with + /// the same parameters, in which case we hit the cache and avoid the networking requests. + /// The values are list of keys. + state_get_keys_paged_cache: + Mutex>, util::SipHasherBuild>>, /// Hash of the genesis block. /// Keeping the genesis block is important, as the genesis block hash is included in @@ -110,56 +116,7 @@ struct Background { >, } -struct Cache { - /// When the runtime service reports a new block, it is kept pinned and inserted in this LRU - /// cache. When an entry in removed from the cache, it is unpinned. - /// - /// JSON-RPC clients are more likely to ask for information about recent blocks and perform - /// calls on them, hence a cache of recent blocks. - recent_pinned_blocks: lru::LruCache<[u8; 32], Vec, fnv::FnvBuildHasher>, - - /// Subscription on the runtime service under which the blocks of - /// [`Cache::recent_pinned_blocks`] are pinned. - /// - /// Contains `None` only at initialization, in which case [`Cache::recent_pinned_blocks`] - /// is guaranteed to be empty. In other words, if a block is found in - /// [`Cache::recent_pinned_blocks`] then this field is guaranteed to be `Some`. - subscription_id: Option, - - /// State trie root hashes and numbers of blocks that were not in - /// [`Cache::recent_pinned_blocks`]. - /// - /// The state trie root hash can also be an `Err` if the network request failed or if the - /// header is of an invalid format. - /// - /// The state trie root hash and number are wrapped in a `Shared` future. When multiple - /// requests need the state trie root hash and number of the same block, they are only queried - /// once and the query is inserted in the cache while in progress. This way, the multiple - /// requests can all wait on that single future. - /// - /// Most of the time, the JSON-RPC client will query blocks that are found in - /// [`Cache::recent_pinned_blocks`], but occasionally it will query older blocks. When the - /// storage of an older block is queried, it is common for the JSON-RPC client to make several - /// storage requests to that same old block. In order to avoid having to retrieve the state - /// trie root hash multiple, we store these hashes in this LRU cache. - block_state_root_hashes_numbers: lru::LruCache< - [u8; 32], - future::MaybeDone< - future::Shared< - future::BoxFuture<'static, Result<([u8; 32], u64), StateTrieRootHashError>>, - >, - >, - fnv::FnvBuildHasher, - >, - - /// When `state_getKeysPaged` is called and the response is truncated, the response is - /// inserted in this cache. The API user is likely to call `state_getKeysPaged` again with - /// the same parameters, in which case we hit the cache and avoid the networking requests. - /// The values are list of keys. - state_get_keys_paged: lru::LruCache>, util::SipHasherBuild>, -} - -/// See [`Cache::state_get_keys_paged`]. +/// See [`Background::state_get_keys_paged_cache`]. #[derive(Debug, Clone, PartialEq, Eq, Hash)] struct GetKeysPagedCacheKey { /// Value of the `hash` parameter of the call to `state_getKeysPaged`. @@ -173,8 +130,14 @@ pub(super) fn start( config: StartConfig<'_, TPlat>, mut requests_processing_task: service::ClientMainTask, max_parallel_requests: NonZeroU32, - background_abort_registrations: Vec, ) { + let to_legacy_tx = legacy_state_sub::start_task(legacy_state_sub::Config { + platform: config.platform.clone(), + log_target: log_target.clone(), + sync_service: config.sync_service.clone(), + runtime_service: config.runtime_service.clone(), + }); + let me = Arc::new(Background { log_target, platform: config.platform, @@ -189,28 +152,16 @@ pub(super) fn start( sync_service: config.sync_service.clone(), runtime_service: config.runtime_service.clone(), transactions_service: config.transactions_service.clone(), - cache: Mutex::new(Cache { - recent_pinned_blocks: lru::LruCache::with_hasher( - NonZeroUsize::new(32).unwrap(), - Default::default(), - ), - subscription_id: None, - block_state_root_hashes_numbers: lru::LruCache::with_hasher( - NonZeroUsize::new(32).unwrap(), - Default::default(), - ), - state_get_keys_paged: lru::LruCache::with_hasher( - NonZeroUsize::new(2).unwrap(), - util::SipHasherBuild::new(rand::random()), - ), - }), + to_legacy: Mutex::new(to_legacy_tx.clone()), + state_get_keys_paged_cache: Mutex::new(lru::LruCache::with_hasher( + NonZeroUsize::new(2).unwrap(), + util::SipHasherBuild::new(rand::random()), + )), genesis_block_hash: config.genesis_block_hash, printed_legacy_json_rpc_warning: atomic::AtomicBool::new(false), chain_head_follow_tasks: Mutex::new(hashbrown::HashMap::with_hasher(Default::default())), }); - let mut background_abort_registrations = background_abort_registrations.into_iter(); - let (tx, rx) = async_channel::bounded( usize::try_from(max_parallel_requests.get()).unwrap_or(usize::max_value()), ); @@ -236,7 +187,23 @@ pub(super) fn start( subscription_start, } => { requests_processing_task = task; - tx.send(either::Right(subscription_start)).await.unwrap(); + match subscription_start.request() { + methods::MethodCall::chain_subscribeAllHeads {} + | methods::MethodCall::chain_subscribeNewHeads {} + | methods::MethodCall::chain_subscribeFinalizedHeads {} + | methods::MethodCall::state_subscribeRuntimeVersion {} + | methods::MethodCall::state_subscribeStorage { .. } => { + me.to_legacy + .lock() + .await + .send(legacy_state_sub::Message::SubscriptionStart( + subscription_start, + )) + .await + .unwrap(); + } + _ => tx.send(either::Right(subscription_start)).await.unwrap(), + } } service::Event::SubscriptionDestroyed { task, @@ -248,6 +215,14 @@ pub(super) fn start( .lock() .await .remove(&subscription_id); + me.to_legacy + .lock() + .await + .send(legacy_state_sub::Message::SubscriptionDestroyed { + subscription_id, + }) + .await + .unwrap(); } service::Event::SerializedRequestsIoClosed => { break; @@ -280,114 +255,11 @@ pub(super) fn start( }, ); } - - // Spawn one task dedicated to filling the `Cache` with new blocks from the runtime - // service. - // TODO: this is actually racy, as a block subscription task could report a new block to a client, and then client can query it, before this block has been been added to the cache - // TODO: extract to separate function - me.platform - .clone() - .spawn_task(format!("{}-cache-populate", me.log_target).into(), { - future::Abortable::new( - async move { - loop { - let mut cache = me.cache.lock().await; - - // Subscribe to new runtime service blocks in order to push them in the - // cache as soon as they are available. - // The buffer size should be large enough so that, if the CPU is busy, it - // doesn't become full before the execution of this task resumes. - // The maximum number of pinned block is ignored, as this maximum is a way to - // avoid malicious behaviors. This code is by definition not considered - // malicious. - let mut subscribe_all = me - .runtime_service - .subscribe_all( - "json-rpc-blocks-cache", - 32, - NonZeroUsize::new(usize::max_value()).unwrap(), - ) - .await; - - cache.subscription_id = Some(subscribe_all.new_blocks.id()); - cache.recent_pinned_blocks.clear(); - debug_assert!(cache.recent_pinned_blocks.cap().get() >= 1); - - let finalized_block_hash = header::hash_from_scale_encoded_header( - &subscribe_all.finalized_block_scale_encoded_header, - ); - cache.recent_pinned_blocks.put( - finalized_block_hash, - subscribe_all.finalized_block_scale_encoded_header, - ); - - for block in subscribe_all.non_finalized_blocks_ancestry_order { - if cache.recent_pinned_blocks.len() - == cache.recent_pinned_blocks.cap().get() - { - let (hash, _) = cache.recent_pinned_blocks.pop_lru().unwrap(); - subscribe_all.new_blocks.unpin_block(&hash).await; - } - - let hash = - header::hash_from_scale_encoded_header(&block.scale_encoded_header); - cache - .recent_pinned_blocks - .put(hash, block.scale_encoded_header); - } - - drop(cache); - - loop { - let notification = subscribe_all.new_blocks.next().await; - match notification { - Some(runtime_service::Notification::Block(block)) => { - let mut cache = me.cache.lock().await; - - if cache.recent_pinned_blocks.len() - == cache.recent_pinned_blocks.cap().get() - { - let (hash, _) = - cache.recent_pinned_blocks.pop_lru().unwrap(); - subscribe_all.new_blocks.unpin_block(&hash).await; - } - - let hash = header::hash_from_scale_encoded_header( - &block.scale_encoded_header, - ); - cache - .recent_pinned_blocks - .put(hash, block.scale_encoded_header); - } - Some(runtime_service::Notification::Finalized { .. }) - | Some(runtime_service::Notification::BestBlockChanged { - .. - }) => {} - None => break, - } - } - } - }, - background_abort_registrations.next().unwrap(), - ) - .map(|_: Result<(), _>| ()) - .boxed() - }); - - debug_assert!(background_abort_registrations.next().is_none()); } impl Background { /// Pulls one request from the inner state machine, and processes it. async fn handle_request(self: &Arc, request: service::RequestProcess) { - // TODO: restore some form of logging - /*log::debug!(target: &self.log_target, "PendingRequestsQueue => {}", - crate::util::truncated_str( - json_rpc_request.chars().filter(|c| !c.is_control()), - 100, - ) - );*/ - // Print a warning for legacy JSON-RPC functions. match request.request() { methods::MethodCall::account_nextIndex { .. } @@ -761,20 +633,12 @@ impl Background { methods::MethodCall::author_submitAndWatchExtrinsic { .. } => { self.submit_and_watch_transaction(request).await } - methods::MethodCall::chain_subscribeAllHeads {} => { - self.chain_subscribe_all_heads(request).await; - } - methods::MethodCall::chain_subscribeFinalizedHeads {} => { - self.chain_subscribe_finalized_heads(request).await; - } - methods::MethodCall::chain_subscribeNewHeads {} => { - self.chain_subscribe_new_heads(request).await; - } - methods::MethodCall::state_subscribeRuntimeVersion {} => { - self.state_subscribe_runtime_version(request).await; - } - methods::MethodCall::state_subscribeStorage { .. } => { - self.state_subscribe_storage(request).await; + methods::MethodCall::chain_subscribeAllHeads {} + | methods::MethodCall::chain_subscribeFinalizedHeads {} + | methods::MethodCall::chain_subscribeNewHeads {} + | methods::MethodCall::state_subscribeRuntimeVersion {} + | methods::MethodCall::state_subscribeStorage { .. } => { + unreachable!() } methods::MethodCall::chainHead_unstable_body { .. } => { @@ -850,97 +714,6 @@ impl Background { } } - /// Obtain the state trie root hash and number of the given block, and make sure to put it - /// in cache. - async fn state_trie_root_hash( - &self, - hash: &[u8; 32], - ) -> Result<([u8; 32], u64), StateTrieRootHashError> { - let fetch = { - // Try to find an existing entry in cache, and if not create one. - let mut cache_lock = self.cache.lock().await; - - // Look in `recent_pinned_blocks`. - match cache_lock - .recent_pinned_blocks - .get(hash) - .map(|h| header::decode(h, self.sync_service.block_number_bytes())) - { - Some(Ok(header)) => return Ok((*header.state_root, header.number)), - Some(Err(err)) => return Err(StateTrieRootHashError::HeaderDecodeError(err)), // TODO: can this actually happen? unclear - None => {} - } - - // Look in `block_state_root_hashes`. - match cache_lock.block_state_root_hashes_numbers.get(hash) { - Some(future::MaybeDone::Done(Ok(val))) => return Ok(*val), - Some(future::MaybeDone::Future(f)) => f.clone(), - Some(future::MaybeDone::Gone) => unreachable!(), // We never use `Gone`. - Some(future::MaybeDone::Done(Err( - err @ StateTrieRootHashError::HeaderDecodeError(_), - ))) => { - // In case of a fatal error, return immediately. - return Err(err.clone()); - } - Some(future::MaybeDone::Done(Err(StateTrieRootHashError::NetworkQueryError))) - | None => { - // No existing cache entry. Create the future that will perform the fetch - // but do not actually start doing anything now. - let fetch = { - let sync_service = self.sync_service.clone(); - let hash = *hash; - async move { - // The sync service knows which peers are potentially aware of - // this block. - let result = sync_service - .clone() - .block_query_unknown_number( - hash, - protocol::BlocksRequestFields { - header: true, - body: false, - justifications: false, - }, - 4, - Duration::from_secs(8), - NonZeroU32::new(2).unwrap(), - ) - .await; - - if let Ok(block) = result { - // If successful, the `block_query` function guarantees that the - // header is present and valid. - let header = block.header.unwrap(); - debug_assert_eq!( - header::hash_from_scale_encoded_header(&header), - hash - ); - let decoded = - header::decode(&header, sync_service.block_number_bytes()) - .unwrap(); - Ok((*decoded.state_root, decoded.number)) - } else { - // TODO: better error details? - Err(StateTrieRootHashError::NetworkQueryError) - } - } - }; - - // Insert the future in the cache, so that any other call will use the same - // future. - let wrapped = fetch.boxed().shared(); - cache_lock - .block_state_root_hashes_numbers - .put(*hash, future::maybe_done(wrapped.clone())); - wrapped - } - } - }; - - // We await separately to be certain that the lock isn't held anymore. - fetch.await - } - async fn storage_query( &self, keys: impl Iterator + Clone> + Clone, @@ -949,10 +722,25 @@ impl Background { timeout_per_request: Duration, max_parallel: NonZeroU32, ) -> Result>>, StorageQueryError> { - let (state_trie_root_hash, block_number) = self - .state_trie_root_hash(hash) - .await - .map_err(StorageQueryError::FindStorageRootHashError)?; + let (state_trie_root_hash, block_number) = { + let (tx, rx) = oneshot::channel(); + self.to_legacy + .lock() + .await + .send(legacy_state_sub::Message::BlockStateRootAndNumber { + block_hash: *hash, + result_tx: tx, + }) + .await + .unwrap(); + + match rx.await.unwrap() { + Ok(v) => v, + Err(err) => { + return Err(StorageQueryError::FindStorageRootHashError(err)); + } + } + }; let result = self .sync_service @@ -997,114 +785,124 @@ impl Background { self: &Arc, block_hash: &[u8; 32], ) -> Result, RuntimeCallError> { - let cache_lock = self.cache.lock().await; - // Try to find the block in the cache of recent blocks. Most of the time, the call target // should be in there. - let lock = if cache_lock.recent_pinned_blocks.contains(block_hash) { - // The runtime service has the block pinned, meaning that we can ask the runtime - // service to perform the call. - self.runtime_service - .pinned_block_runtime_access(cache_lock.subscription_id.unwrap(), block_hash) + if let Some(runtime_access) = { + let (tx, rx) = oneshot::channel(); + self.to_legacy + .lock() .await - .ok() - } else { - None + .send(legacy_state_sub::Message::RecentBlockRuntimeAccess { + block_hash: *block_hash, + result_tx: tx, + }) + .await + .unwrap(); + rx.await.unwrap() + } { + return Ok(runtime_access); }; - Ok(if let Some(lock) = lock { - lock - } else { - // Second situation: the block is not in the cache of recent blocks. This isn't great. - drop::>(cache_lock); - - // The only solution is to download the runtime of the block in question from the network. + // Second situation: the block is not in the cache of recent blocks. This isn't great. + // The only solution is to download the runtime of the block in question from the network. - // TODO: considering caching the runtime code the same way as the state trie root hash + // TODO: considering caching the runtime code the same way as the state trie root hash - // In order to grab the runtime code and perform the call network request, we need - // to know the state trie root hash and the height of the block. - let (state_trie_root_hash, block_number) = self - .state_trie_root_hash(block_hash) + // In order to grab the runtime code and perform the call network request, we need + // to know the state trie root hash and the height of the block. + let (state_trie_root_hash, block_number) = { + let (tx, rx) = oneshot::channel(); + self.to_legacy + .lock() .await - .map_err(RuntimeCallError::FindStorageRootHashError)?; - - // Download the runtime of this block. This takes a long time as the runtime is rather - // big (around 1MiB in general). - let (storage_code, storage_heap_pages) = { - let entries = self - .sync_service - .clone() - .storage_query( - block_number, - block_hash, - &state_trie_root_hash, - [ - sync_service::StorageRequestItem { - key: b":code".to_vec(), - ty: sync_service::StorageRequestItemTy::Value, - }, - sync_service::StorageRequestItem { - key: b":heappages".to_vec(), - ty: sync_service::StorageRequestItemTy::Value, - }, - ] - .into_iter(), - 3, - Duration::from_secs(20), - NonZeroU32::new(1).unwrap(), - ) - .await - .map_err(runtime_service::RuntimeCallError::StorageQuery) - .map_err(RuntimeCallError::Call)?; - // TODO: not elegant - let heap_pages = entries - .iter() - .find_map(|entry| match entry { - sync_service::StorageResultItem::Value { key, value } - if key == b":heappages" => - { - Some(value.clone()) // TODO: overhead - } - _ => None, - }) - .unwrap(); - let code = entries - .iter() - .find_map(|entry| match entry { - sync_service::StorageResultItem::Value { key, value } - if key == b":code" => - { - Some(value.clone()) // TODO: overhead - } - _ => None, - }) - .unwrap(); - (code, heap_pages) - }; + .send(legacy_state_sub::Message::BlockStateRootAndNumber { + block_hash: *block_hash, + result_tx: tx, + }) + .await + .unwrap(); - // Give the code and heap pages to the runtime service. The runtime service will - // try to find any similar runtime it might have, and if not will compile it. - let pinned_runtime_id = self - .runtime_service - .compile_and_pin_runtime(storage_code, storage_heap_pages) - .await; + match rx.await.unwrap() { + Ok(v) => v, + Err(err) => { + return Err(RuntimeCallError::FindStorageRootHashError(err)); + } + } + }; - let precall = self - .runtime_service - .pinned_runtime_access( - pinned_runtime_id.clone(), - *block_hash, + // Download the runtime of this block. This takes a long time as the runtime is rather + // big (around 1MiB in general). + let (storage_code, storage_heap_pages) = { + let entries = self + .sync_service + .clone() + .storage_query( block_number, - state_trie_root_hash, + block_hash, + &state_trie_root_hash, + [ + sync_service::StorageRequestItem { + key: b":code".to_vec(), + ty: sync_service::StorageRequestItemTy::Value, + }, + sync_service::StorageRequestItem { + key: b":heappages".to_vec(), + ty: sync_service::StorageRequestItemTy::Value, + }, + ] + .into_iter(), + 3, + Duration::from_secs(20), + NonZeroU32::new(1).unwrap(), ) - .await; + .await + .map_err(runtime_service::RuntimeCallError::StorageQuery) + .map_err(RuntimeCallError::Call)?; + // TODO: not elegant + let heap_pages = entries + .iter() + .find_map(|entry| match entry { + sync_service::StorageResultItem::Value { key, value } + if key == b":heappages" => + { + Some(value.clone()) // TODO: overhead + } + _ => None, + }) + .unwrap(); + let code = entries + .iter() + .find_map(|entry| match entry { + sync_service::StorageResultItem::Value { key, value } if key == b":code" => { + Some(value.clone()) // TODO: overhead + } + _ => None, + }) + .unwrap(); + (code, heap_pages) + }; - // TODO: consider keeping pinned runtimes in a cache instead - self.runtime_service.unpin_runtime(pinned_runtime_id).await; + // Give the code and heap pages to the runtime service. The runtime service will + // try to find any similar runtime it might have, and if not will compile it. + let pinned_runtime_id = self + .runtime_service + .compile_and_pin_runtime(storage_code, storage_heap_pages) + .await; + + let precall = self + .runtime_service + .pinned_runtime_access( + pinned_runtime_id.clone(), + *block_hash, + block_number, + state_trie_root_hash, + ) + .await; - precall - }) + // TODO: consider keeping pinned runtimes in a cache instead + self.runtime_service.unpin_runtime(pinned_runtime_id).await; + + Ok(precall) } /// Performs a runtime call to a random block. @@ -1315,7 +1113,7 @@ impl Background { enum StorageQueryError { /// Error while finding the storage root hash of the requested block. #[display(fmt = "Failed to obtain block state trie root: {_0}")] - FindStorageRootHashError(StateTrieRootHashError), + FindStorageRootHashError(legacy_state_sub::StateTrieRootHashError), /// Error while retrieving the storage item from other nodes. #[display(fmt = "{_0}")] StorageRetrieval(sync_service::StorageQueryError), @@ -1326,7 +1124,7 @@ enum StorageQueryError { enum RuntimeCallError { /// Error while finding the storage root hash of the requested block. #[display(fmt = "Failed to obtain block state trie root: {_0}")] - FindStorageRootHashError(StateTrieRootHashError), + FindStorageRootHashError(legacy_state_sub::StateTrieRootHashError), #[display(fmt = "{_0}")] Call(runtime_service::RuntimeCallError), #[display(fmt = "{_0}")] @@ -1344,15 +1142,6 @@ enum RuntimeCallError { }, } -/// Error potentially returned by [`Background::state_trie_root_hash`]. -#[derive(Debug, derive_more::Display, Clone)] -enum StateTrieRootHashError { - /// Failed to decode block header. - HeaderDecodeError(header::Error), - /// Error while fetching block header from network. - NetworkQueryError, -} - #[derive(Debug)] struct RuntimeCallResult { return_value: Vec, diff --git a/light-base/src/json_rpc_service/background/legacy_state_sub.rs b/light-base/src/json_rpc_service/background/legacy_state_sub.rs new file mode 100644 index 0000000000..013f13e67a --- /dev/null +++ b/light-base/src/json_rpc_service/background/legacy_state_sub.rs @@ -0,0 +1,1297 @@ +// Smoldot +// Copyright (C) 2023 Pierre Krieger +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use core::{ + future::Future, + iter, + num::{NonZeroU32, NonZeroUsize}, + pin::Pin, + time::Duration, +}; + +use alloc::{borrow::ToOwned as _, boxed::Box, format, string::String, sync::Arc, vec::Vec}; +use futures_channel::oneshot; +use futures_lite::{FutureExt as _, StreamExt as _}; +use futures_util::{future, FutureExt as _}; +use smoldot::{ + executor, header, + informant::HashDisplay, + json_rpc::{self, methods, service}, + network::protocol, +}; + +use crate::{platform::PlatformRef, runtime_service, sync_service}; + +/// Message that can be passed to the task started with [`start_task`]. +pub(super) enum Message { + /// JSON-RPC client has sent a subscription request. + /// + /// Only the legacy API subscription requests are supported. Any other will trigger a panic. + SubscriptionStart(service::SubscriptionStartProcess), + + /// JSON-RPC client has unsubscribed from something. + SubscriptionDestroyed { + /// Identifier of the subscription. Does not necessarily match any of the subscriptions + /// previously passed through [`Message::SubscriptionStart`]. + subscription_id: String, + }, + + /// The task must send back access to the runtime of the given block, or `None` if the block + /// isn't available in the cache. + RecentBlockRuntimeAccess { + /// Hash of the block to query. + block_hash: [u8; 32], + /// How to send back the result. + result_tx: oneshot::Sender>>, + }, + + /// The task must send back the current best block hash. + /// + /// Waits for the runtime service to be ready, which can potentially take a long time. + CurrentBestBlockHash { + /// How to send back the result. + result_tx: oneshot::Sender<[u8; 32]>, + }, + + /// The task must send back the state root and number the given block. If the block isn't + /// available in the cache, a network request is performed. + // TODO: refactor this message and the ones below to be more consistent + BlockStateRootAndNumber { + /// Hash of the block to query. + block_hash: [u8; 32], + /// How to send back the result. + result_tx: oneshot::Sender>, + }, + + /// The task must send back the number of the given block, or `None` if the block isn't + /// available in the cache. + BlockNumber { + /// Hash of the block to query. + block_hash: [u8; 32], + /// How to send back the result. + result_tx: oneshot::Sender>, + }, + + /// The task must send back the header of the given block, or `None` if the block isn't + /// available in the cache. + BlockHeader { + /// Hash of the block to query. + block_hash: [u8; 32], + /// How to send back the result. + result_tx: oneshot::Sender>>, + }, + + /// Internal message. Do not use. + StorageFetch { + /// Hash of the block the storage fetch targets. + block_hash: [u8; 32], + /// Results provided by the [`sync_service`]. + result: Result, sync_service::StorageQueryError>, + }, +} + +/// Configuration to pass to [`start_task`]. +pub(super) struct Config { + /// Access to the platform bindings. + pub platform: TPlat, + /// Prefix used for all the logging in this module. + pub log_target: String, + /// Sync service used to start networking requests. + pub sync_service: Arc>, + /// Runtime service used to subscribe to notifications regarding blocks and report them to + /// the JSON-RPC client. + pub runtime_service: Arc>, +} + +/// Error potentially returned by [`Message::BlockStateRootAndNumber`]. +#[derive(Debug, derive_more::Display, Clone)] +pub(super) enum StateTrieRootHashError { + /// Failed to decode block header. + HeaderDecodeError(header::Error), + /// Error while fetching block header from network. + NetworkQueryError, +} + +/// Spawn a task dedicated to holding a cache and fulfilling the legacy API subscriptions that the +/// JSON-RPC client starts. +pub(super) fn start_task( + config: Config, +) -> async_channel::Sender> { + let (requests_tx, requests_rx) = async_channel::bounded(8); + + config.platform.clone().spawn_task( + format!("{}-legacy-state-subscriptions", config.log_target).into(), + Box::pin(run(Task { + block_state_root_hashes_numbers: lru::LruCache::with_hasher( + NonZeroUsize::new(32).unwrap(), + Default::default(), + ), + log_target: config.log_target.clone(), + platform: config.platform.clone(), + best_block_report: Vec::with_capacity(4), + sync_service: config.sync_service, + runtime_service: config.runtime_service, + subscription: Subscription::NotCreated, + requests_tx: async_channel::Sender::downgrade(&requests_tx), + requests_rx, + all_heads_subscriptions: hashbrown::HashMap::with_capacity_and_hasher( + 2, + Default::default(), + ), + new_heads_subscriptions: hashbrown::HashMap::with_capacity_and_hasher( + 2, + Default::default(), + ), + finalized_heads_subscriptions: hashbrown::HashMap::with_capacity_and_hasher( + 2, + Default::default(), + ), + runtime_version_subscriptions: hashbrown::HashMap::with_capacity_and_hasher( + 2, + Default::default(), + ), + storage_subscriptions: hashbrown::HashMap::with_capacity_and_hasher( + 16, + Default::default(), + ), + storage_subscriptions_by_key: hashbrown::HashMap::with_capacity_and_hasher( + 16, + crate::util::SipHasherBuild::new(rand::random()), + ), + stale_storage_subscriptions: hashbrown::HashSet::with_capacity_and_hasher( + 16, + Default::default(), + ), + storage_query_in_progress: false, + })), + ); + + requests_tx +} + +struct Task { + /// See [`Config::log_target`]. + log_target: String, + /// See [`Config::platform`]. + platform: TPlat, + /// See [`Config::sync_service`]. + sync_service: Arc>, + /// See [`Config::runtime_service`]. + runtime_service: Arc>, + + /// State of the subscription towards the runtime service. + subscription: Subscription, + /// Whenever the subscription becomes active and the best block becomes available, it must be + /// sent on these channels as soon as possible. + best_block_report: Vec>, + + /// Sending side of [`Task::requests_rx`]. + requests_tx: async_channel::WeakSender>, + /// How to receive messages from the API user. + requests_rx: async_channel::Receiver>, + + /// List of all active `chain_subscribeAllHeads` subscriptions, indexed by the subscription ID. + // TODO: shrink_to_fit? + all_heads_subscriptions: hashbrown::HashMap, + /// List of all active `chain_subscribeNewHeads` subscriptions, indexed by the subscription ID. + // TODO: shrink_to_fit? + new_heads_subscriptions: hashbrown::HashMap, + // TODO: shrink_to_fit? + /// List of all active `chain_subscribeFinalizedHeads` subscriptions, indexed by the + /// subscription ID. + finalized_heads_subscriptions: + hashbrown::HashMap, + // TODO: shrink_to_fit? + /// List of all active `state_subscribeRuntimeVersion` subscriptions, indexed by the + /// subscription ID. + runtime_version_subscriptions: + hashbrown::HashMap, + + /// List of all active `state_subscribeStorage` subscriptions, indexed by the subscription ID. + /// The value is the subscription plus the list of keys requested by this subscription. + // TODO: shrink_to_fit? + storage_subscriptions: + hashbrown::HashMap>), fnv::FnvBuildHasher>, + /// Identical to [`Task::storage_subscriptions`] by indexed by requested key. The inner + /// `HashSet`s are never empty. + // TODO: shrink_to_fit? + storage_subscriptions_by_key: hashbrown::HashMap< + Vec, + hashbrown::HashSet, + crate::util::SipHasherBuild, + >, + /// List of storage subscriptions whose latest sent notification isn't about the current + /// best block. + stale_storage_subscriptions: hashbrown::HashSet, + /// `true` if there exists a background task currently fetching storage items for storage + /// subscriptions. This task will send a [`Message::StorageFetch`] once it's finished. + storage_query_in_progress: bool, + + /// State trie root hashes and numbers of blocks that were not in + /// [`Subscription::Active::pinned_blocks`]. + /// + /// The state trie root hash can also be an `Err` if the network request failed or if the + /// header is of an invalid format. + /// + /// The state trie root hash and number are wrapped in a `Shared` future. When multiple + /// requests need the state trie root hash and number of the same block, they are only queried + /// once and the query is inserted in the cache while in progress. This way, the multiple + /// requests can all wait on that single future. + /// + /// Most of the time, the JSON-RPC client will query blocks that are found in + /// [`Subscription::Active::pinned_blocks`], but occasionally it will query older blocks. When + /// the storage of an older block is queried, it is common for the JSON-RPC client to make + /// several storage requests to that same old block. In order to avoid having to retrieve the + /// state trie root hash multiple, we store these hashes in this LRU cache. + block_state_root_hashes_numbers: lru::LruCache< + [u8; 32], + future::MaybeDone< + future::Shared< + future::BoxFuture<'static, Result<([u8; 32], u64), StateTrieRootHashError>>, + >, + >, + fnv::FnvBuildHasher, + >, +} + +/// State of the subscription towards the runtime service. See [`Task::subscription`]. +enum Subscription { + /// Subscription is active. + Active { + /// Object representing the subscription. + subscription: runtime_service::Subscription, + + /// Hash of the current best block. Guaranteed to be in + /// [`Subscription::Active::pinned_blocks`]. + current_best_block: [u8; 32], + + /// If `Some`, the new heads and runtime version subscriptions haven't been updated about + /// the new current best block yet. Contains the previous best block that the + /// subscriptions are aware of. The previous best block is guaranteed to be in + /// [`Subscription::Active::pinned_blocks`]. + new_heads_and_runtime_subscriptions_stale: Option>, + + /// Hash of the current finalized block. Guaranteed to be in + /// [`Subscription::Active::pinned_blocks`]. + current_finalized_block: [u8; 32], + + /// If `true`, the finalized heads subscriptions haven't been updated about the new + /// current finalized block yet. + finalized_heads_subscriptions_stale: bool, + + /// When the runtime service reports a new block, it is kept pinned and inserted in this + /// list. + /// + /// Blocks are removed from this container and unpinned when they leave + /// [`Subscription::Active::finalized_and_pruned_lru`]. + /// + /// JSON-RPC clients are more likely to ask for information about recent blocks and + /// perform calls on them, hence a cache of recent blocks. + pinned_blocks: hashbrown::HashMap<[u8; 32], RecentBlock, fnv::FnvBuildHasher>, + + /// When a block is finalized or pruned, it is inserted into this LRU cache. The least + /// recently used blocks are removed and unpinned. + finalized_and_pruned_lru: lru::LruCache<[u8; 32], (), fnv::FnvBuildHasher>, + }, + + /// Wiating for the runtime service to start the subscription. Can potentially take a long + /// time. + Pending(Pin> + Send>>), + + /// Subscription not requested yet. Should transition to [`Subscription::Pending`] as soon + /// as possible. + NotCreated, +} + +struct RecentBlock { + scale_encoded_header: Vec, + // TODO: do we really need to keep the runtime version here, given that the block is still pinned in the runtime service? + runtime_version: Arc>, +} + +/// Actually run the task. +async fn run(mut task: Task) { + loop { + // Perform some internal state updates if necessary. + + // Process the content of `best_block_report` + if let Subscription::Active { + current_best_block, .. + } = &task.subscription + { + while let Some(sender) = task.best_block_report.pop() { + let _ = sender.send(*current_best_block); + } + task.best_block_report.shrink_to_fit(); + } + + // If the finalized heads subcriptions aren't up-to-date with the latest finalized block, + // report it to them. + if let Subscription::Active { + pinned_blocks, + current_finalized_block, + finalized_heads_subscriptions_stale, + .. + } = &mut task.subscription + { + if *finalized_heads_subscriptions_stale { + let finalized_block_header = &pinned_blocks + .get(current_finalized_block) + .unwrap() + .scale_encoded_header; + let finalized_block_json_rpc_header = + match methods::Header::from_scale_encoded_header( + finalized_block_header, + task.runtime_service.block_number_bytes(), + ) { + Ok(h) => h, + Err(error) => { + log::warn!( + target: &task.log_target, + "`chain_subscribeFinalizedHeads` subscription has skipped block \ + due to undecodable header. Hash: {}. Error: {}", + HashDisplay(current_finalized_block), + error, + ); + continue; + } + }; + + for (subscription_id, subscription) in &mut task.finalized_heads_subscriptions { + subscription + .send_notification(methods::ServerToClient::chain_finalizedHead { + subscription: subscription_id.as_str().into(), + result: finalized_block_json_rpc_header.clone(), + }) + .await; + } + + *finalized_heads_subscriptions_stale = false; + } + } + + // If the new heads and runtime version subscriptions aren't up-to-date with the latest + // best block, report it to them. + if let Subscription::Active { + pinned_blocks, + current_best_block, + new_heads_and_runtime_subscriptions_stale, + .. + } = &mut task.subscription + { + if let Some(previous_best_block) = new_heads_and_runtime_subscriptions_stale.take() { + let best_block_header = &pinned_blocks + .get(current_best_block) + .unwrap() + .scale_encoded_header; + let best_block_json_rpc_header = match methods::Header::from_scale_encoded_header( + &best_block_header, + task.runtime_service.block_number_bytes(), + ) { + Ok(h) => h, + Err(error) => { + log::warn!( + target: &task.log_target, + "`chain_subscribeNewHeads` subscription has skipped block due to \ + undecodable header. Hash: {}. Error: {}", + HashDisplay(current_best_block), + error, + ); + continue; + } + }; + + for (subscription_id, subscription) in &mut task.new_heads_subscriptions { + subscription + .send_notification(methods::ServerToClient::chain_newHead { + subscription: subscription_id.as_str().into(), + result: best_block_json_rpc_header.clone(), + }) + .await; + } + + let new_best_runtime = &pinned_blocks + .get(current_best_block) + .unwrap() + .runtime_version; + if previous_best_block.map_or(true, |prev_best_block| { + !Arc::ptr_eq( + new_best_runtime, + &pinned_blocks.get(&prev_best_block).unwrap().runtime_version, + ) + }) { + for (subscription_id, subscription) in &mut task.runtime_version_subscriptions { + subscription + .send_notification(methods::ServerToClient::state_runtimeVersion { + subscription: subscription_id.as_str().into(), + result: convert_runtime_version(new_best_runtime), + }) + .await; + } + } + + task.stale_storage_subscriptions + .extend(task.storage_subscriptions.keys().cloned()); + } + } + + // Start a task that fetches the storage items of the stale storage subscriptions. + if let Subscription::Active { + pinned_blocks, + current_best_block, + .. + } = &task.subscription + { + if !task.storage_query_in_progress && !task.stale_storage_subscriptions.is_empty() { + // If the header of the current best block can't be decoded, we don't start + // the task. + let (block_number, state_trie_root) = match header::decode( + &pinned_blocks + .get(current_best_block) + .unwrap() + .scale_encoded_header, + task.runtime_service.block_number_bytes(), + ) { + Ok(header) => (header.number, *header.state_root), + Err(_) => { + // Can't decode the header of the current best block. + // All the subscriptions are marked as non-stale, since they are up-to-date + // with the current best block. + // TODO: print warning? + task.stale_storage_subscriptions.clear(); + continue; + } + }; + + // Build the list of keys that must be requested by aggregating the keys requested + // by all stale storage subscriptions. + let mut keys = hashbrown::HashSet::with_hasher(crate::util::SipHasherBuild::new( + rand::random(), + )); + keys.extend( + task.stale_storage_subscriptions + .iter() + .map(|s_id| &task.storage_subscriptions.get(s_id).unwrap().1) + .flat_map(|keys_list| keys_list.iter().cloned()), + ); + + // If the list of keys to query is empty, we mark all subscriptions as no longer + // stale and loop again. This is necessary in order to prevent infinite loops if + // the JSON-RPC client subscribes to an empty list of items. + if keys.is_empty() { + task.stale_storage_subscriptions.clear(); + continue; + } + + // Start the task in the background. + // The task will send a `Message::StorageFetch` once it is done. + task.storage_query_in_progress = true; + task.platform.spawn_task( + format!("{}-storage-subscriptions-fetch", task.log_target).into(), + Box::pin({ + let block_hash = current_best_block.clone(); + let sync_service = task.sync_service.clone(); + // TODO: a bit overcomplicated because `WeakSender` doesn't implement `Clone`: https://github.com/smol-rs/async-channel/pull/62 + let requests_tx = async_channel::Sender::downgrade( + &task.requests_tx.upgrade().unwrap().clone(), + ); + async move { + let result = sync_service + .clone() + .storage_query( + block_number, + &block_hash, + &state_trie_root, + keys.into_iter() + .map(|key| sync_service::StorageRequestItem { + key, + ty: sync_service::StorageRequestItemTy::Value, + }), + 4, + Duration::from_secs(12), + NonZeroU32::new(2).unwrap(), + ) + .await; + if let Some(requests_tx) = requests_tx.upgrade() { + let _ = requests_tx + .send(Message::StorageFetch { block_hash, result }) + .await; + } + } + }), + ); + } + } + + enum WhatHappened<'a, TPlat: PlatformRef> { + SubscriptionNotification { + notification: runtime_service::Notification, + subscription: &'a mut runtime_service::Subscription, + pinned_blocks: + &'a mut hashbrown::HashMap<[u8; 32], RecentBlock, fnv::FnvBuildHasher>, + finalized_and_pruned_lru: &'a mut lru::LruCache<[u8; 32], (), fnv::FnvBuildHasher>, + current_best_block: &'a mut [u8; 32], + new_heads_and_runtime_subscriptions_stale: &'a mut Option>, + current_finalized_block: &'a mut [u8; 32], + finalized_heads_subscriptions_stale: &'a mut bool, + }, + SubscriptionDead, + SubscriptionReady(runtime_service::SubscribeAll), + Message(Message), + ForegroundDead, + } + + // Asynchronously wait for something to happen. This can potentially take a long time. + let event: WhatHappened<'_, TPlat> = { + let subscription_event = async { + match &mut task.subscription { + Subscription::NotCreated => WhatHappened::SubscriptionDead, + Subscription::Active { + subscription, + pinned_blocks, + finalized_and_pruned_lru, + current_best_block, + new_heads_and_runtime_subscriptions_stale, + current_finalized_block, + finalized_heads_subscriptions_stale, + } => match subscription.next().await { + Some(notification) => WhatHappened::SubscriptionNotification { + notification, + subscription, + pinned_blocks, + finalized_and_pruned_lru, + current_best_block, + new_heads_and_runtime_subscriptions_stale, + current_finalized_block, + finalized_heads_subscriptions_stale, + }, + None => WhatHappened::SubscriptionDead, + }, + Subscription::Pending(pending) => { + WhatHappened::SubscriptionReady(pending.await) + } + } + }; + + let message = async { + match task.requests_rx.next().await { + Some(msg) => WhatHappened::Message(msg), + None => WhatHappened::ForegroundDead, + } + }; + + subscription_event.or(message).await + }; + + // Perform internal state updates depending on what happened. + match event { + // Runtime service is now ready to give us blocks. + WhatHappened::SubscriptionReady(subscribe_all) => { + // We must transition to `Subscription::Active`. + let mut pinned_blocks = + hashbrown::HashMap::with_capacity_and_hasher(32, Default::default()); + let mut finalized_and_pruned_lru = lru::LruCache::with_hasher( + NonZeroUsize::new(32).unwrap(), + fnv::FnvBuildHasher::default(), + ); + + let finalized_block_hash = header::hash_from_scale_encoded_header( + &subscribe_all.finalized_block_scale_encoded_header, + ); + pinned_blocks.insert( + finalized_block_hash, + RecentBlock { + scale_encoded_header: subscribe_all.finalized_block_scale_encoded_header, + runtime_version: Arc::new(subscribe_all.finalized_block_runtime), + }, + ); + finalized_and_pruned_lru.put(finalized_block_hash, ()); + + let mut current_best_block = finalized_block_hash; + + for block in subscribe_all.non_finalized_blocks_ancestry_order { + let hash = header::hash_from_scale_encoded_header(&block.scale_encoded_header); + pinned_blocks.insert( + hash, + RecentBlock { + scale_encoded_header: block.scale_encoded_header, + runtime_version: match block.new_runtime { + Some(r) => Arc::new(r), + None => pinned_blocks + .get(&block.parent_hash) + .unwrap() + .runtime_version + .clone(), + }, + }, + ); + + if block.is_new_best { + current_best_block = hash; + } + } + + task.subscription = Subscription::Active { + subscription: subscribe_all.new_blocks, + pinned_blocks, + finalized_and_pruned_lru, + current_best_block, + new_heads_and_runtime_subscriptions_stale: Some(None), + current_finalized_block: finalized_block_hash, + finalized_heads_subscriptions_stale: true, + }; + } + + // A new non-finalized block has appeared! + WhatHappened::SubscriptionNotification { + notification: runtime_service::Notification::Block(block), + pinned_blocks, + current_best_block, + new_heads_and_runtime_subscriptions_stale, + .. + } => { + let json_rpc_header = match methods::Header::from_scale_encoded_header( + &block.scale_encoded_header, + task.runtime_service.block_number_bytes(), + ) { + Ok(h) => h, + Err(error) => { + log::warn!( + target: &task.log_target, + "`chain_subscribeAllHeads` subscription has skipped block due to \ + undecodable header. Hash: {}. Error: {}", + HashDisplay(&header::hash_from_scale_encoded_header( + &block.scale_encoded_header + )), + error, + ); + continue; + } + }; + + let hash = header::hash_from_scale_encoded_header(&block.scale_encoded_header); + let _was_in = pinned_blocks.insert( + hash, + RecentBlock { + scale_encoded_header: block.scale_encoded_header, + runtime_version: match block.new_runtime { + Some(r) => Arc::new(r), + None => pinned_blocks + .get(&block.parent_hash) + .unwrap() + .runtime_version + .clone(), + }, + }, + ); + debug_assert!(_was_in.is_none()); + + for (subscription_id, subscription) in &mut task.all_heads_subscriptions { + subscription + .send_notification(methods::ServerToClient::chain_allHead { + subscription: subscription_id.as_str().into(), + result: json_rpc_header.clone(), + }) + .await; + } + + if block.is_new_best { + *new_heads_and_runtime_subscriptions_stale = Some(Some(*current_best_block)); + *current_best_block = hash; + } + } + + // A block has been finalized. + WhatHappened::SubscriptionNotification { + notification: + runtime_service::Notification::Finalized { + hash: finalized_hash, + pruned_blocks, + best_block_hash: new_best_block_hash, + }, + pinned_blocks, + finalized_and_pruned_lru, + subscription, + current_best_block, + new_heads_and_runtime_subscriptions_stale, + current_finalized_block, + finalized_heads_subscriptions_stale, + } => { + *current_finalized_block = finalized_hash; + *finalized_heads_subscriptions_stale = true; + + // Add the pruned and finalized blocks to the LRU cache. The least-recently used + // entries in the cache are unpinned and no longer tracked. + // + // An important detail here is that the newly-finalized block is added to the list + // at the end, in order to guaranteed that it doesn't get removed. This is + // necessary in order to guarantee that the current finalized (and current best, + // if the best block is also the finalized block) remains pinned until at least + // a different block gets finalized. + for block_hash in pruned_blocks.into_iter().chain(iter::once(finalized_hash)) { + if finalized_and_pruned_lru.len() == finalized_and_pruned_lru.cap().get() { + let (hash_to_unpin, _) = finalized_and_pruned_lru.pop_lru().unwrap(); + subscription.unpin_block(&hash_to_unpin).await; + pinned_blocks.remove(&hash_to_unpin).unwrap(); + } + finalized_and_pruned_lru.put(block_hash, ()); + } + + if *current_best_block != new_best_block_hash { + *new_heads_and_runtime_subscriptions_stale = Some(Some(*current_best_block)); + *current_best_block = new_best_block_hash; + } + } + + // The current best block has now changed. + WhatHappened::SubscriptionNotification { + notification: + runtime_service::Notification::BestBlockChanged { + hash: new_best_hash, + .. + }, + current_best_block, + new_heads_and_runtime_subscriptions_stale, + .. + } => { + *new_heads_and_runtime_subscriptions_stale = Some(Some(*current_best_block)); + *current_best_block = new_best_hash; + } + + // Request from the JSON-RPC client. + WhatHappened::Message(Message::SubscriptionStart(request)) => match request.request() { + methods::MethodCall::chain_subscribeAllHeads {} => { + let subscription = request.accept(); + let subscription_id = subscription.subscription_id().to_owned(); + task.all_heads_subscriptions + .insert(subscription_id, subscription); + } + methods::MethodCall::chain_subscribeNewHeads {} => { + let mut subscription = request.accept(); + let subscription_id = subscription.subscription_id().to_owned(); + let to_send = if let Subscription::Active { + current_best_block, + pinned_blocks, + .. + } = &task.subscription + { + Some( + match methods::Header::from_scale_encoded_header( + &pinned_blocks + .get(current_best_block) + .unwrap() + .scale_encoded_header, + task.runtime_service.block_number_bytes(), + ) { + Ok(h) => h, + Err(error) => { + log::warn!( + target: &task.log_target, + "`chain_subscribeNewHeads` subscription has skipped \ + block due to undecodable header. Hash: {}. Error: {}", + HashDisplay(current_best_block), + error, + ); + continue; + } + }, + ) + } else { + None + }; + if let Some(to_send) = to_send { + subscription + .send_notification(methods::ServerToClient::chain_newHead { + subscription: subscription_id.as_str().into(), + result: to_send, + }) + .await; + } + task.new_heads_subscriptions + .insert(subscription_id, subscription); + } + methods::MethodCall::chain_subscribeFinalizedHeads {} => { + let mut subscription = request.accept(); + let subscription_id = subscription.subscription_id().to_owned(); + let to_send = if let Subscription::Active { + current_finalized_block, + pinned_blocks, + .. + } = &task.subscription + { + Some( + match methods::Header::from_scale_encoded_header( + &pinned_blocks + .get(current_finalized_block) + .unwrap() + .scale_encoded_header, + task.runtime_service.block_number_bytes(), + ) { + Ok(h) => h, + Err(error) => { + log::warn!( + target: &task.log_target, + "`chain_subscribeFinalizedHeads` subscription has skipped \ + block due to undecodable header. Hash: {}. Error: {}", + HashDisplay(current_finalized_block), + error, + ); + continue; + } + }, + ) + } else { + None + }; + if let Some(to_send) = to_send { + subscription + .send_notification(methods::ServerToClient::chain_finalizedHead { + subscription: subscription_id.as_str().into(), + result: to_send, + }) + .await; + } + task.finalized_heads_subscriptions + .insert(subscription_id, subscription); + } + methods::MethodCall::state_subscribeRuntimeVersion {} => { + let mut subscription = request.accept(); + let subscription_id = subscription.subscription_id().to_owned(); + let to_send = if let Subscription::Active { + current_best_block, + pinned_blocks, + .. + } = &task.subscription + { + Some(convert_runtime_version( + &pinned_blocks + .get(current_best_block) + .unwrap() + .runtime_version, + )) + } else { + None + }; + if let Some(to_send) = to_send { + subscription + .send_notification(methods::ServerToClient::state_runtimeVersion { + subscription: (&subscription_id).into(), + result: to_send, + }) + .await; + } + task.runtime_version_subscriptions + .insert(subscription_id, subscription); + } + methods::MethodCall::state_subscribeStorage { list } => { + // TODO: limit the size of `list` to avoid DoS attacks + if list.is_empty() { + // When the list of keys is empty, that means we want to subscribe to *all* + // storage changes. It is not possible to reasonably implement this in a + // light client. + request.fail(json_rpc::parse::ErrorResponse::ServerError( + -32000, + "Subscribing to all storage changes isn't supported", + )); + continue; + } + + let subscription = request.accept(); + let subscription_id = subscription.subscription_id().to_owned(); + task.stale_storage_subscriptions + .insert(subscription_id.clone()); + for key in &list { + task.storage_subscriptions_by_key + .entry(key.0.clone()) + .or_default() + .insert(subscription_id.clone()); + } + task.storage_subscriptions.insert( + subscription_id, + (subscription, list.into_iter().map(|l| l.0).collect()), + ); + } + + // Any other request. + _ => unreachable!(), // TODO: stronger typing to avoid this? + }, + + // JSON-RPC client has unsubscribed. + WhatHappened::Message(Message::SubscriptionDestroyed { subscription_id }) => { + // We don't know the type of the unsubscription, that's not a big deal. Just + // remove the entry from everywhere. + task.all_heads_subscriptions.remove(&subscription_id); + task.new_heads_subscriptions.remove(&subscription_id); + task.finalized_heads_subscriptions.remove(&subscription_id); + task.runtime_version_subscriptions.remove(&subscription_id); + if let Some((_, keys)) = task.storage_subscriptions.remove(&subscription_id) { + for key in keys { + let hashbrown::hash_map::Entry::Occupied(mut entry) = task.storage_subscriptions_by_key.entry(key) + else { unreachable!() }; + let _was_in = entry.get_mut().remove(&subscription_id); + debug_assert!(_was_in); + if entry.get().is_empty() { + entry.remove(); + } + } + } + task.stale_storage_subscriptions.remove(&subscription_id); + // TODO: shrink_to_fit? + } + + WhatHappened::Message(Message::RecentBlockRuntimeAccess { + block_hash, + result_tx, + }) => { + let subscription_id_with_block = if let Subscription::Active { + pinned_blocks: recent_pinned_blocks, + subscription, + .. + } = &task.subscription + { + if recent_pinned_blocks.contains_key(&block_hash) { + // The runtime service has the block pinned, meaning that we can ask the runtime + // service to perform the call. + Some(subscription.id()) + } else { + None + } + } else { + None + }; + + let access = if let Some(subscription_id) = subscription_id_with_block { + task.runtime_service + .pinned_block_runtime_access(subscription_id, &block_hash) + .await + .ok() + } else { + None + }; + + let _ = result_tx.send(access); + } + + WhatHappened::Message(Message::CurrentBestBlockHash { result_tx }) => { + match &task.subscription { + Subscription::Active { + current_best_block, .. + } => { + let _ = result_tx.send(*current_best_block); + } + Subscription::Pending(_) | Subscription::NotCreated => { + task.best_block_report.push(result_tx); + } + } + } + + WhatHappened::Message(Message::BlockNumber { + block_hash, + result_tx, + }) => { + if let Some(future) = task.block_state_root_hashes_numbers.get_mut(&block_hash) { + let _ = future.now_or_never(); + } + + let block_number = if let Subscription::Active { + pinned_blocks: recent_pinned_blocks, + .. + } = &mut task.subscription + { + match ( + recent_pinned_blocks.get(&block_hash).map(|b| { + header::decode( + &b.scale_encoded_header, + task.runtime_service.block_number_bytes(), + ) + }), + task.block_state_root_hashes_numbers.get(&block_hash), + ) { + (Some(Ok(header)), _) => Some(header.number), + (_, Some(future::MaybeDone::Done(Ok((_, num))))) => Some(*num), + _ => None, + } + } else { + None + }; + + let _ = result_tx.send(block_number); + } + + WhatHappened::Message(Message::BlockHeader { + block_hash, + result_tx, + }) => { + let header = if let Subscription::Active { + pinned_blocks: recent_pinned_blocks, + .. + } = &mut task.subscription + { + if let Some(block) = recent_pinned_blocks.get(&block_hash) { + Some(block.scale_encoded_header.clone()) + } else { + None + } + } else { + None + }; + + let _ = result_tx.send(header); + } + + WhatHappened::Message(Message::BlockStateRootAndNumber { + block_hash, + result_tx, + }) => { + let fetch = { + // Try to find an existing entry in cache, and if not create one. + + // Look in `recent_pinned_blocks`. + if let Subscription::Active { + pinned_blocks: recent_pinned_blocks, + .. + } = &mut task.subscription + { + match recent_pinned_blocks.get(&block_hash).map(|b| { + header::decode( + &b.scale_encoded_header, + task.runtime_service.block_number_bytes(), + ) + }) { + Some(Ok(header)) => { + let _ = result_tx.send(Ok((*header.state_root, header.number))); + continue; + } + Some(Err(err)) => { + let _ = result_tx + .send(Err(StateTrieRootHashError::HeaderDecodeError(err))); + continue; + } // TODO: can this actually happen? unclear + None => {} + } + } + + // Look in `block_state_root_hashes`. + match task.block_state_root_hashes_numbers.get(&block_hash) { + Some(future::MaybeDone::Done(Ok(val))) => { + let _ = result_tx.send(Ok(*val)); + continue; + } + Some(future::MaybeDone::Future(f)) => f.clone(), + Some(future::MaybeDone::Gone) => unreachable!(), // We never use `Gone`. + Some(future::MaybeDone::Done(Err( + err @ StateTrieRootHashError::HeaderDecodeError(_), + ))) => { + // In case of a fatal error, return immediately. + let _ = result_tx.send(Err(err.clone())); + continue; + } + Some(future::MaybeDone::Done(Err( + StateTrieRootHashError::NetworkQueryError, + ))) + | None => { + // No existing cache entry. Create the future that will perform the fetch + // but do not actually start doing anything now. + let fetch = { + let sync_service = task.sync_service.clone(); + async move { + // The sync service knows which peers are potentially aware of + // this block. + let result = sync_service + .clone() + .block_query_unknown_number( + block_hash, + protocol::BlocksRequestFields { + header: true, + body: false, + justifications: false, + }, + 4, + Duration::from_secs(8), + NonZeroU32::new(2).unwrap(), + ) + .await; + + if let Ok(block) = result { + // If successful, the `block_query` function guarantees that the + // header is present and valid. + let header = block.header.unwrap(); + debug_assert_eq!( + header::hash_from_scale_encoded_header(&header), + block_hash + ); + let decoded = header::decode( + &header, + sync_service.block_number_bytes(), + ) + .unwrap(); + Ok((*decoded.state_root, decoded.number)) + } else { + // TODO: better error details? + Err(StateTrieRootHashError::NetworkQueryError) + } + } + }; + + // Insert the future in the cache, so that any other call will use the same + // future. + let wrapped = (Box::pin(fetch) + as Pin + Send>>) + .shared(); + task.block_state_root_hashes_numbers + .put(block_hash, future::maybe_done(wrapped.clone())); + wrapped + } + } + }; + + // We await separately to be certain that the lock isn't held anymore. + // TODO: crappy design + task.platform + .spawn_task("dummy-adapter".into(), async move { + let outcome = fetch.await; + let _ = result_tx.send(outcome); + }); + } + + // Background task dedicated to performing a storage query for the storage + // subscription has finished. + WhatHappened::Message(Message::StorageFetch { + block_hash, + result: Ok(result), + }) => { + debug_assert!(task.storage_query_in_progress); + task.storage_query_in_progress = false; + + // Determine whether another storage query targeting a more up-to-date block + // must be started afterwards. + let is_up_to_date = match task.subscription { + Subscription::Active { + current_best_block, .. + } => current_best_block == block_hash, + Subscription::NotCreated | Subscription::Pending(_) => true, + }; + + // Because all the keys of all the subscriptions are merged into one network + // request, we must now attribute each item in the result back to its subscription. + // While this solution is a bit CPU-heavy, it is a more elegant solution than + // keeping track of subscription in the background task. + let mut notifications_to_send = hashbrown::HashMap::< + String, + Vec<(methods::HexString, Option)>, + _, + >::with_capacity_and_hasher( + task.storage_subscriptions.len(), + fnv::FnvBuildHasher::default(), + ); + for item in result { + let sync_service::StorageResultItem::Value { key, value } = item + else { unreachable!() }; + for subscription_id in task + .storage_subscriptions_by_key + .get(&key) + .into_iter() + .flat_map(|list| list.iter()) + { + notifications_to_send + .entry_ref(subscription_id) + .or_insert_with(Vec::new) + .push(( + methods::HexString(key.clone()), + value.clone().map(methods::HexString), + )); + } + } + + // Send the notifications and mark the subscriptions as no longer stale if + // relevant. + for (subscription_id, changes) in notifications_to_send { + if is_up_to_date { + task.stale_storage_subscriptions.remove(&subscription_id); + } + task.storage_subscriptions + .get_mut(&subscription_id) + .unwrap() + .0 + .send_notification(methods::ServerToClient::state_storage { + subscription: subscription_id.into(), + result: methods::StorageChangeSet { + block: methods::HashHexString(block_hash), + changes, + }, + }) + .await; + } + } + + // Background task dedicated to performing a storage query for the storage + // subscription has finished but was unsuccessful. + WhatHappened::Message(Message::StorageFetch { result: Err(_), .. }) => { + debug_assert!(task.storage_query_in_progress); + task.storage_query_in_progress = false; + // TODO: add a delay or something? + } + + // JSON-RPC service has been destroyed. Stop the task altogether. + WhatHappened::ForegroundDead => { + return; + } + + // The subscription towards the runtime service needs to be renewed. + WhatHappened::SubscriptionDead => { + // The buffer size should be large enough so that, if the CPU is busy, it + // doesn't become full before the execution of this task resumes. + // The maximum number of pinned block is ignored, as this maximum is a way to + // avoid malicious behaviors. This code is by definition not considered + // malicious. + let runtime_service = task.runtime_service.clone(); + task.subscription = Subscription::Pending(Box::pin(async move { + runtime_service + .subscribe_all( + "json-rpc-blocks-cache", + 32, + NonZeroUsize::new(usize::max_value()).unwrap(), + ) + .await + })); + } + } + } +} + +fn convert_runtime_version( + runtime: &Arc>, +) -> Option { + if let Ok(runtime_spec) = &**runtime { + let runtime_spec = runtime_spec.decode(); + Some(methods::RuntimeVersion { + spec_name: runtime_spec.spec_name.into(), + impl_name: runtime_spec.impl_name.into(), + authoring_version: u64::from(runtime_spec.authoring_version), + spec_version: u64::from(runtime_spec.spec_version), + impl_version: u64::from(runtime_spec.impl_version), + transaction_version: runtime_spec.transaction_version.map(u64::from), + state_version: runtime_spec.state_version.map(u8::from).map(u64::from), + apis: runtime_spec + .apis + .map(|api| (methods::HexString(api.name_hash.to_vec()), api.version)) + .collect(), + }) + } else { + None + } +} diff --git a/light-base/src/json_rpc_service/background/state_chain.rs b/light-base/src/json_rpc_service/background/state_chain.rs index cf64b14ec8..0950eded95 100644 --- a/light-base/src/json_rpc_service/background/state_chain.rs +++ b/light-base/src/json_rpc_service/background/state_chain.rs @@ -17,28 +17,19 @@ //! All legacy JSON-RPC method handlers that relate to the chain or the storage. -use super::{Background, GetKeysPagedCacheKey, PlatformRef}; +use super::{legacy_state_sub, Background, GetKeysPagedCacheKey, PlatformRef}; -use crate::{runtime_service, sync_service}; +use crate::sync_service; -use alloc::{borrow::ToOwned as _, format, string::ToString as _, sync::Arc, vec, vec::Vec}; -use async_lock::MutexGuard; -use core::{ - iter, - num::{NonZeroU32, NonZeroUsize}, - pin, - time::Duration, -}; -use futures_util::{future, stream, FutureExt as _, StreamExt as _}; +use alloc::{format, string::ToString as _, sync::Arc, vec, vec::Vec}; +use core::{iter, num::NonZeroU32, time::Duration}; +use futures_channel::oneshot; use smoldot::{ header, - informant::HashDisplay, json_rpc::{self, methods, service}, network::protocol, }; -mod sub_utils; - impl Background { /// Handles a call to [`methods::MethodCall::system_accountNextIndex`]. pub(super) async fn account_next_index(self: &Arc, request: service::RequestProcess) { @@ -46,9 +37,16 @@ impl Background { unreachable!() }; - let block_hash = header::hash_from_scale_encoded_header( - sub_utils::subscribe_best(&self.runtime_service).await.0, - ); + let block_hash = { + let (tx, rx) = oneshot::channel(); + self.to_legacy + .lock() + .await + .send(legacy_state_sub::Message::CurrentBestBlockHash { result_tx: tx }) + .await + .unwrap(); + rx.await.unwrap() + }; let result = self .runtime_call( @@ -95,9 +93,16 @@ impl Background { // `hash` equal to `None` means "the current best block". let hash = match hash { Some(h) => h.0, - None => header::hash_from_scale_encoded_header( - sub_utils::subscribe_best(&self.runtime_service).await.0, - ), + None => { + let (tx, rx) = oneshot::channel(); + self.to_legacy + .lock() + .await + .send(legacy_state_sub::Message::CurrentBestBlockHash { result_tx: tx }) + .await + .unwrap(); + rx.await.unwrap() + } }; // Try to determine the block number by looking for the block in cache. @@ -105,24 +110,17 @@ impl Background { // knowing it will lead to a better selection of peers, and thus increase the chances of // the requests succeeding. let block_number = { - let mut cache_lock = self.cache.lock().await; - let cache_lock = &mut *cache_lock; - - if let Some(future) = cache_lock.block_state_root_hashes_numbers.get_mut(&hash) { - let _ = future.now_or_never(); - } - - match ( - cache_lock - .recent_pinned_blocks - .get(&hash) - .map(|h| header::decode(h, self.sync_service.block_number_bytes())), - cache_lock.block_state_root_hashes_numbers.get(&hash), - ) { - (Some(Ok(header)), _) => Some(header.number), - (_, Some(future::MaybeDone::Done(Ok((_, num))))) => Some(*num), - _ => None, - } + let (tx, rx) = oneshot::channel(); + self.to_legacy + .lock() + .await + .send(legacy_state_sub::Message::BlockNumber { + block_hash: hash, + result_tx: tx, + }) + .await + .unwrap(); + rx.await.unwrap() }; // Block bodies and justifications aren't stored locally. Ask the network. @@ -198,9 +196,17 @@ impl Background { methods::HashHexString(self.genesis_block_hash), )), None => { - let best_block = header::hash_from_scale_encoded_header( - sub_utils::subscribe_best(&self.runtime_service).await.0, - ); + let best_block = { + let (tx, rx) = oneshot::channel(); + self.to_legacy + .lock() + .await + .send(legacy_state_sub::Message::CurrentBestBlockHash { result_tx: tx }) + .await + .unwrap(); + rx.await.unwrap() + }; + request.respond(methods::Response::chain_getBlockHash( methods::HashHexString(best_block), )); @@ -224,39 +230,57 @@ impl Background { // `hash` equal to `None` means "best block". let hash = match hash { Some(h) => h.0, - None => header::hash_from_scale_encoded_header( - sub_utils::subscribe_best(&self.runtime_service).await.0, - ), + None => { + let (tx, rx) = oneshot::channel(); + self.to_legacy + .lock() + .await + .send(legacy_state_sub::Message::CurrentBestBlockHash { result_tx: tx }) + .await + .unwrap(); + rx.await.unwrap() + } }; // Try to look in the cache of recent blocks. If not found, ask the peer-to-peer network. // `header` is `Err` if and only if the network request failed. let scale_encoded_header = { - let mut cache_lock = self.cache.lock().await; - if let Some(header) = cache_lock.recent_pinned_blocks.get(&hash) { - Ok(header.clone()) + let from_cache = { + let (tx, rx) = oneshot::channel(); + self.to_legacy + .lock() + .await + .send(legacy_state_sub::Message::BlockHeader { + block_hash: hash, + result_tx: tx, + }) + .await + .unwrap(); + rx.await.unwrap() + }; + + if let Some(header) = from_cache { + Ok(header) } else { // Header isn't known locally. We need to ask the network. // First, try to determine the block number by looking into the cache. // The request can be fulfilled no matter whether it is found, but knowing it will // lead to a better selection of peers, and thus increase the chances of the // requests succeeding. - let block_number = if let Some(future) = - cache_lock.block_state_root_hashes_numbers.get_mut(&hash) - { - let _ = future.now_or_never(); - - match future { - future::MaybeDone::Done(Ok((_, num))) => Some(*num), - _ => None, - } - } else { - None + let block_number = { + let (tx, rx) = oneshot::channel(); + self.to_legacy + .lock() + .await + .send(legacy_state_sub::Message::BlockNumber { + block_hash: hash, + result_tx: tx, + }) + .await + .unwrap(); + rx.await.unwrap() }; - // Release the lock as we're going to start a long asynchronous operation. - drop::>(cache_lock); - // Actual network query. let result = if let Some(block_number) = block_number { self.sync_service @@ -327,276 +351,6 @@ impl Background { } } - /// Handles a call to [`methods::MethodCall::chain_subscribeAllHeads`]. - pub(super) async fn chain_subscribe_all_heads( - self: &Arc, - request: service::SubscriptionStartProcess, - ) { - let methods::MethodCall::chain_subscribeAllHeads {} = request.request() else { - unreachable!() - }; - - self.platform - .spawn_task(format!("{}-subscribe-all-heads", self.log_target).into(), { - let log_target = self.log_target.clone(); - let sync_service = self.sync_service.clone(); - let runtime_service = self.runtime_service.clone(); - - async move { - let mut subscription = request.accept(); - let subscription_id = subscription.subscription_id().to_owned(); - - 'main_sub_loop: loop { - let mut new_blocks = { - // The buffer size should be large enough so that, if the CPU is busy, it - // doesn't become full before the execution of the runtime service resumes. - // The maximum number of pinned block is ignored, as this maximum is a way - // to avoid malicious behaviors. This code is by definition not considered - // malicious. - let subscribe_all = runtime_service - .subscribe_all( - "chain_subscribeAllHeads", - 64, - NonZeroUsize::new(usize::max_value()).unwrap(), - ) - .await; - - // The existing finalized and already-known blocks aren't reported to the - // user, but we need to unpin them on to the runtime service. - subscribe_all - .new_blocks - .unpin_block(&header::hash_from_scale_encoded_header( - &subscribe_all.finalized_block_scale_encoded_header, - )) - .await; - for block in subscribe_all.non_finalized_blocks_ancestry_order { - subscribe_all - .new_blocks - .unpin_block(&header::hash_from_scale_encoded_header( - &block.scale_encoded_header, - )) - .await; - } - - subscribe_all.new_blocks - }; - - loop { - let message = { - let next_new_block = pin::pin!(new_blocks.next()); - let next_message = pin::pin!(subscription.wait_until_stale()); - match future::select(next_new_block, next_message).await { - future::Either::Left((v, _)) => either::Left(v), - future::Either::Right((v, _)) => either::Right(v), - } - }; - - match message { - either::Left(None) => { - // Break from the inner loop in order to recreate the channel. - break; - } - either::Left(Some(runtime_service::Notification::Block(block))) => { - new_blocks - .unpin_block(&header::hash_from_scale_encoded_header( - &block.scale_encoded_header, - )) - .await; - - let header = match methods::Header::from_scale_encoded_header( - &block.scale_encoded_header, - sync_service.block_number_bytes(), - ) { - Ok(h) => h, - Err(error) => { - log::warn!( - target: &log_target, - "`chain_subscribeAllHeads` subscription has skipped \ - block due to undecodable header. Hash: {}. Error: {}", - HashDisplay(&header::hash_from_scale_encoded_header( - &block.scale_encoded_header - )), - error, - ); - continue; - } - }; - - subscription - .send_notification(methods::ServerToClient::chain_allHead { - subscription: (&subscription_id).into(), - result: header, - }) - .await; - } - either::Left(Some( - runtime_service::Notification::BestBlockChanged { .. }, - )) - | either::Left(Some(runtime_service::Notification::Finalized { - .. - })) => {} - either::Right(()) => { - break 'main_sub_loop; - } - } - } - } - } - }); - } - - /// Handles a call to [`methods::MethodCall::chain_subscribeFinalizedHeads`]. - pub(super) async fn chain_subscribe_finalized_heads( - self: &Arc, - request: service::SubscriptionStartProcess, - ) { - let methods::MethodCall::chain_subscribeFinalizedHeads {} = request.request() else { - unreachable!() - }; - - let mut blocks_list = { - let (finalized_block_header, finalized_blocks_subscription) = - sub_utils::subscribe_finalized(&self.runtime_service).await; - stream::once(future::ready(finalized_block_header)).chain(finalized_blocks_subscription) - }; - - self.platform.spawn_task( - format!("{}-subscribe-finalized-heads", self.log_target).into(), - { - let log_target = self.log_target.clone(); - let sync_service = self.sync_service.clone(); - - async move { - let mut subscription = request.accept(); - let subscription_id = subscription.subscription_id().to_owned(); - - loop { - let event = { - let unsubscribed = pin::pin!(subscription.wait_until_stale()); - match future::select(blocks_list.next(), unsubscribed).await { - future::Either::Left((ev, _)) => either::Left(ev), - future::Either::Right((ev, _)) => either::Right(ev), - } - }; - - match event { - either::Left(None) => { - // Stream returned by `subscribe_finalized` is always unlimited. - unreachable!() - } - either::Left(Some(header)) => { - let header = match methods::Header::from_scale_encoded_header( - &header, - sync_service.block_number_bytes(), - ) { - Ok(h) => h, - Err(error) => { - log::warn!( - target: &log_target, - "`chain_subscribeFinalizedHeads` subscription has skipped \ - block due to undecodable header. Hash: {}. Error: {}", - HashDisplay(&header::hash_from_scale_encoded_header( - &header - )), - error, - ); - continue; - } - }; - - subscription - .send_notification( - methods::ServerToClient::chain_finalizedHead { - subscription: (&subscription_id).into(), - result: header, - }, - ) - .await; - } - either::Right(()) => { - break; - } - } - } - } - }, - ); - } - - /// Handles a call to [`methods::MethodCall::chain_subscribeNewHeads`]. - pub(super) async fn chain_subscribe_new_heads( - self: &Arc, - request: service::SubscriptionStartProcess, - ) { - let methods::MethodCall::chain_subscribeNewHeads {} = request.request() else { - unreachable!() - }; - - let mut blocks_list = { - let (block_header, blocks_subscription) = - sub_utils::subscribe_best(&self.runtime_service).await; - stream::once(future::ready(block_header)).chain(blocks_subscription) - }; - - self.platform - .spawn_task(format!("{}-subscribe-new-heads", self.log_target).into(), { - let log_target = self.log_target.clone(); - let sync_service = self.sync_service.clone(); - - async move { - let mut subscription = request.accept(); - let subscription_id = subscription.subscription_id().to_owned(); - - loop { - let event = { - let unsubscribed = pin::pin!(subscription.wait_until_stale()); - match future::select(blocks_list.next(), unsubscribed).await { - future::Either::Left((ev, _)) => either::Left(ev), - future::Either::Right((ev, _)) => either::Right(ev), - } - }; - - match event { - either::Left(None) => { - // Stream returned by `subscribe_best` is always unlimited. - unreachable!() - } - either::Left(Some(header)) => { - let header = match methods::Header::from_scale_encoded_header( - &header, - sync_service.block_number_bytes(), - ) { - Ok(h) => h, - Err(error) => { - log::warn!( - target: &log_target, - "`chain_subscribeNewHeads` subscription has skipped block \ - due to undecodable header. Hash: {}. Error: {}", - HashDisplay(&header::hash_from_scale_encoded_header( - &header - )), - error, - ); - continue; - } - }; - - subscription - .send_notification(methods::ServerToClient::chain_newHead { - subscription: (&subscription_id).into(), - result: header, - }) - .await; - } - either::Right(()) => { - break; - } - } - } - } - }); - } - /// Handles a call to [`methods::MethodCall::payment_queryInfo`]. pub(super) async fn payment_query_info(self: &Arc, request: service::RequestProcess) { let methods::MethodCall::payment_queryInfo { @@ -609,9 +363,16 @@ impl Background { let block_hash = match block_hash { Some(h) => h.0, - None => header::hash_from_scale_encoded_header( - sub_utils::subscribe_best(&self.runtime_service).await.0, - ), + None => { + let (tx, rx) = oneshot::channel(); + self.to_legacy + .lock() + .await + .send(legacy_state_sub::Message::CurrentBestBlockHash { result_tx: tx }) + .await + .unwrap(); + rx.await.unwrap() + } }; let result = self @@ -667,9 +428,14 @@ impl Background { let block_hash = if let Some(hash) = hash { hash.0 } else { - header::hash_from_scale_encoded_header( - sub_utils::subscribe_best(&self.runtime_service).await.0, - ) + let (tx, rx) = oneshot::channel(); + self.to_legacy + .lock() + .await + .send(legacy_state_sub::Message::CurrentBestBlockHash { result_tx: tx }) + .await + .unwrap(); + rx.await.unwrap() }; let result = self @@ -703,21 +469,41 @@ impl Background { // `hash` equal to `None` means "best block". let hash = match hash { Some(h) => h.0, - None => header::hash_from_scale_encoded_header( - sub_utils::subscribe_best(&self.runtime_service).await.0, - ), + None => { + let (tx, rx) = oneshot::channel(); + self.to_legacy + .lock() + .await + .send(legacy_state_sub::Message::CurrentBestBlockHash { result_tx: tx }) + .await + .unwrap(); + rx.await.unwrap() + } }; // Obtain the state trie root and height of the requested block. // This is necessary to perform network storage queries. - let (state_root, block_number) = match self.state_trie_root_hash(&hash).await { - Ok(v) => v, - Err(err) => { - request.fail(json_rpc::parse::ErrorResponse::ServerError( - -32000, - &format!("Failed to fetch block information: {err}"), - )); - return; + let (state_root, block_number) = { + let (tx, rx) = oneshot::channel(); + self.to_legacy + .lock() + .await + .send(legacy_state_sub::Message::BlockStateRootAndNumber { + block_hash: hash, + result_tx: tx, + }) + .await + .unwrap(); + + match rx.await.unwrap() { + Ok(v) => v, + Err(err) => { + request.fail(json_rpc::parse::ErrorResponse::ServerError( + -32000, + &format!("Failed to fetch block information: {err}"), + )); + return; + } } }; @@ -773,9 +559,16 @@ impl Background { // `hash` equal to `None` means "best block". let hash = match hash { Some(h) => h.0, - None => header::hash_from_scale_encoded_header( - sub_utils::subscribe_best(&self.runtime_service).await.0, - ), + None => { + let (tx, rx) = oneshot::channel(); + self.to_legacy + .lock() + .await + .send(legacy_state_sub::Message::CurrentBestBlockHash { result_tx: tx }) + .await + .unwrap(); + rx.await.unwrap() + } }; // A prefix of `None` means "empty". @@ -785,10 +578,9 @@ impl Background { // same parameters, we store the untruncated responses in a cache. Check if we hit the // cache. if let Some(keys) = - self.cache + self.state_get_keys_paged_cache .lock() .await - .state_get_keys_paged .get(&GetKeysPagedCacheKey { hash, prefix: prefix.clone(), @@ -808,14 +600,27 @@ impl Background { // Obtain the state trie root and height of the requested block. // This is necessary to perform network storage queries. - let (state_root, block_number) = match self.state_trie_root_hash(&hash).await { - Ok(v) => v, - Err(err) => { - request.fail(json_rpc::parse::ErrorResponse::ServerError( - -32000, - &format!("Failed to fetch block information: {err}"), - )); - return; + let (state_root, block_number) = { + let (tx, rx) = oneshot::channel(); + self.to_legacy + .lock() + .await + .send(legacy_state_sub::Message::BlockStateRootAndNumber { + block_hash: hash, + result_tx: tx, + }) + .await + .unwrap(); + + match rx.await.unwrap() { + Ok(v) => v, + Err(err) => { + request.fail(json_rpc::parse::ErrorResponse::ServerError( + -32000, + &format!("Failed to fetch block information: {err}"), + )); + return; + } } }; @@ -859,10 +664,9 @@ impl Background { // JSON-RPC client will call the function again with the exact same parameters. // Thus, store the results in a cache. if out.len() != keys.len() { - self.cache + self.state_get_keys_paged_cache .lock() .await - .state_get_keys_paged .push(GetKeysPagedCacheKey { hash, prefix }, keys); } @@ -884,9 +688,14 @@ impl Background { let block_hash = if let Some(hash) = hash { hash.0 } else { - header::hash_from_scale_encoded_header( - sub_utils::subscribe_best(&self.runtime_service).await.0, - ) + let (tx, rx) = oneshot::channel(); + self.to_legacy + .lock() + .await + .send(legacy_state_sub::Message::CurrentBestBlockHash { result_tx: tx }) + .await + .unwrap(); + rx.await.unwrap() }; let result = self @@ -939,9 +748,16 @@ impl Background { let block_hash = match block_hash { Some(h) => h.0, - None => header::hash_from_scale_encoded_header( - sub_utils::subscribe_best(&self.runtime_service).await.0, - ), + None => { + let (tx, rx) = oneshot::channel(); + self.to_legacy + .lock() + .await + .send(legacy_state_sub::Message::CurrentBestBlockHash { result_tx: tx }) + .await + .unwrap(); + rx.await.unwrap() + } }; match self @@ -984,12 +800,19 @@ impl Background { unreachable!() }; - let hash = hash - .as_ref() - .map(|h| h.0) - .unwrap_or(header::hash_from_scale_encoded_header( - sub_utils::subscribe_best(&self.runtime_service).await.0, - )); + let hash = match hash { + Some(h) => h.0, + None => { + let (tx, rx) = oneshot::channel(); + self.to_legacy + .lock() + .await + .send(legacy_state_sub::Message::CurrentBestBlockHash { result_tx: tx }) + .await + .unwrap(); + rx.await.unwrap() + } + }; let fut = self.storage_query( iter::once(&key.0), @@ -1017,11 +840,16 @@ impl Background { unreachable!() }; - let best_block = header::hash_from_scale_encoded_header( - &sub_utils::subscribe_best(&self.runtime_service).await.0, - ); - - let cache = self.cache.lock().await; + let best_block = { + let (tx, rx) = oneshot::channel(); + self.to_legacy + .lock() + .await + .send(legacy_state_sub::Message::CurrentBestBlockHash { result_tx: tx }) + .await + .unwrap(); + rx.await.unwrap() + }; let at = at.as_ref().map(|h| h.0).unwrap_or(best_block); @@ -1030,8 +858,6 @@ impl Background { changes: Vec::new(), }; - drop(cache); - let fut = self.storage_query( keys.iter(), &at, @@ -1048,263 +874,4 @@ impl Background { request.respond(methods::Response::state_queryStorageAt(vec![out])); } - - /// Handles a call to [`methods::MethodCall::state_subscribeRuntimeVersion`]. - pub(super) async fn state_subscribe_runtime_version( - self: &Arc, - request: service::SubscriptionStartProcess, - ) { - let methods::MethodCall::state_subscribeRuntimeVersion {} = request.request() else { - unreachable!() - }; - - let runtime_service = self.runtime_service.clone(); - - self.platform.spawn_task( - format!("{}-subscribe-runtime-version", self.log_target).into(), - async move { - let mut subscription = request.accept(); - let subscription_id = subscription.subscription_id().to_owned(); - - let (current_spec, spec_changes) = - sub_utils::subscribe_runtime_version(&runtime_service).await; - let mut spec_changes = - pin::pin!(stream::iter(iter::once(current_spec)).chain(spec_changes)); - - loop { - let event = { - let unsubscribed = pin::pin!(subscription.wait_until_stale()); - match future::select(spec_changes.next(), unsubscribed).await { - future::Either::Left((ev, _)) => either::Left(ev), - future::Either::Right((ev, _)) => either::Right(ev), - } - }; - - match event { - either::Left(None) => { - // Stream returned by `subscribe_runtime_version` is always unlimited. - unreachable!() - } - either::Left(Some(new_runtime)) => { - if let Ok(runtime_spec) = new_runtime { - let runtime_spec = runtime_spec.decode(); - subscription - .send_notification( - methods::ServerToClient::state_runtimeVersion { - subscription: (&subscription_id).into(), - result: Some(methods::RuntimeVersion { - spec_name: runtime_spec.spec_name.into(), - impl_name: runtime_spec.impl_name.into(), - authoring_version: u64::from( - runtime_spec.authoring_version, - ), - spec_version: u64::from(runtime_spec.spec_version), - impl_version: u64::from(runtime_spec.impl_version), - transaction_version: runtime_spec - .transaction_version - .map(u64::from), - state_version: runtime_spec - .state_version - .map(u8::from) - .map(u64::from), - apis: runtime_spec - .apis - .map(|api| { - ( - methods::HexString( - api.name_hash.to_vec(), - ), - api.version, - ) - }) - .collect(), - }), - }, - ) - .await; - } else { - subscription - .send_notification( - methods::ServerToClient::state_runtimeVersion { - subscription: (&subscription_id).into(), - result: None, - }, - ) - .await; - } - } - either::Right(()) => { - break; - } - } - } - }, - ); - } - - /// Handles a call to [`methods::MethodCall::state_subscribeStorage`]. - pub(super) async fn state_subscribe_storage( - self: &Arc, - request: service::SubscriptionStartProcess, - ) { - let methods::MethodCall::state_subscribeStorage { list } = request.request() else { - unreachable!() - }; - - if list.is_empty() { - // When the list of keys is empty, that means we want to subscribe to *all* - // storage changes. It is not possible to reasonably implement this in a - // light client. - request.fail(json_rpc::parse::ErrorResponse::ServerError( - -32000, - "Subscribing to all storage changes isn't supported", - )); - return; - } - - // Build a stream of `methods::StorageChangeSet` items to send back to the user. - let storage_updates = { - let known_values = (0..list.len()).map(|_| None).collect::>(); - let runtime_service = self.runtime_service.clone(); - let sync_service = self.sync_service.clone(); - let log_target = self.log_target.clone(); - - stream::unfold( - (None, list, known_values), - move |(mut blocks_stream, list, mut known_values)| { - let sync_service = sync_service.clone(); - let runtime_service = runtime_service.clone(); - let log_target = log_target.clone(); - async move { - loop { - if blocks_stream.is_none() { - // TODO: why is this done against the runtime_service and not the sync_service? clarify - let (block_header, blocks_subscription) = - sub_utils::subscribe_best(&runtime_service).await; - blocks_stream = Some( - stream::once(future::ready(block_header)) - .chain(blocks_subscription), - ); - } - - let block = match blocks_stream.as_mut().unwrap().next().await { - Some(b) => b, - None => { - blocks_stream = None; - continue; - } - }; - - let block_hash = header::hash_from_scale_encoded_header(&block); - let (state_trie_root, block_number) = { - let decoded = - header::decode(&block, sync_service.block_number_bytes()) - .unwrap(); - (decoded.state_root, decoded.number) - }; - - let mut out = methods::StorageChangeSet { - block: methods::HashHexString(block_hash), - changes: Vec::new(), - }; - - for (key_index, key) in list.iter().enumerate() { - // TODO: parallelism? - match sync_service - .clone() - .storage_query( - block_number, - &block_hash, - state_trie_root, - iter::once(sync_service::StorageRequestItem { - key: key.0.clone(), - ty: sync_service::StorageRequestItemTy::Value, - }), - 4, - Duration::from_secs(12), - NonZeroU32::new(2).unwrap(), - ) - .await - { - Ok(mut values) => { - let Some(sync_service::StorageResultItem::Value { - value, - .. - }) = values.pop() - else { - unreachable!() - }; - match &mut known_values[key_index] { - Some(v) if *v == value => {} - v => { - *v = Some(value.clone()); - out.changes.push(( - key.clone(), - value.map(methods::HexString), - )); - } - } - } - Err(error) => { - log::log!( - target: &log_target, - if error.is_network_problem() { - log::Level::Debug - } else { - log::Level::Warn - }, - "state_subscribeStorage changes check failed: {}", - error - ); - } - } - } - - if !out.changes.is_empty() { - return Some((out, (blocks_stream, list, known_values))); - } - } - } - }, - ) - }; - - self.platform.spawn_task( - format!("{}-subscribe-storage", self.log_target).into(), - async move { - let mut subscription = request.accept(); - let subscription_id = subscription.subscription_id().to_owned(); - - let mut storage_updates = pin::pin!(storage_updates); - - loop { - let event = { - let unsubscribed = pin::pin!(subscription.wait_until_stale()); - match future::select(storage_updates.next(), unsubscribed).await { - future::Either::Left((ev, _)) => either::Left(ev), - future::Either::Right((ev, _)) => either::Right(ev), - } - }; - - match event { - either::Left(None) => { - // Stream created above is always unlimited. - unreachable!() - } - either::Left(Some(changes)) => { - subscription - .send_notification(methods::ServerToClient::state_storage { - subscription: (&subscription_id).into(), - result: changes, - }) - .await; - } - either::Right(()) => { - break; - } - } - } - }, - ) - } } diff --git a/light-base/src/json_rpc_service/background/state_chain/sub_utils.rs b/light-base/src/json_rpc_service/background/state_chain/sub_utils.rs deleted file mode 100644 index c50daa140a..0000000000 --- a/light-base/src/json_rpc_service/background/state_chain/sub_utils.rs +++ /dev/null @@ -1,454 +0,0 @@ -// Smoldot -// Copyright (C) 2019-2022 Parity Technologies (UK) Ltd. -// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . - -//! This module contains useful features built on top of the [`RuntimeService`] that are only used -//! by the JSON-RPC service. - -use crate::{ - platform::PlatformRef, - runtime_service::{Notification, RuntimeError, RuntimeService}, -}; - -use alloc::{sync::Arc, vec::Vec}; -use core::num::NonZeroUsize; -use futures_util::{future, stream, StreamExt as _}; -use smoldot::{executor, header}; - -/// Returns the current runtime version, plus an unlimited stream that produces one item every -/// time the specs of the runtime of the best block are changed. -/// -/// The future returned by this function waits until the runtime is available. This can take -/// a long time. -/// -/// The stream can generate an `Err` if the runtime in the best block is invalid. -/// -/// The stream is infinite. In other words it is guaranteed to never return `None`. -pub async fn subscribe_runtime_version( - runtime_service: &Arc>, -) -> ( - Result, - stream::BoxStream<'static, Result>, -) { - let mut master_stream = stream::unfold(runtime_service.clone(), |runtime_service| async move { - let subscribe_all = runtime_service - .subscribe_all("subscribe-runtime-version", 16, NonZeroUsize::new(24).unwrap()) - .await; - - // Map of runtimes by hash. Contains all non-finalized blocks, plus the current finalized - // block. - let mut headers = hashbrown::HashMap::< - [u8; 32], - Arc>, - fnv::FnvBuildHasher, - >::with_capacity_and_hasher(16, Default::default()); - - let current_finalized_hash = header::hash_from_scale_encoded_header( - &subscribe_all.finalized_block_scale_encoded_header, - ); - subscribe_all - .new_blocks - .unpin_block(¤t_finalized_hash) - .await; - - headers.insert( - current_finalized_hash, - Arc::new(subscribe_all.finalized_block_runtime), - ); - - let mut current_best = None; - for block in subscribe_all.non_finalized_blocks_ancestry_order { - let hash = header::hash_from_scale_encoded_header(&block.scale_encoded_header); - subscribe_all.new_blocks.unpin_block(&hash).await; - - if let Some(new_runtime) = block.new_runtime { - headers.insert(hash, Arc::new(new_runtime)); - } else { - let parent_runtime = headers - .get(&block.parent_hash) - .unwrap() - .clone(); - headers.insert(hash, parent_runtime); - } - - if block.is_new_best { - debug_assert!(current_best.is_none()); - current_best = Some(hash); - } - } - let current_best = current_best.unwrap_or(current_finalized_hash); - let current_best_runtime = (**headers.get(¤t_best).unwrap()).clone(); - - // Turns `subscribe_all.new_blocks` into a stream of headers. - let substream = stream::unfold( - ( - subscribe_all.new_blocks, - headers, - current_finalized_hash, - current_best, - ), - |( - mut new_blocks, - mut headers, - mut current_finalized_hash, - mut current_best, - )| async move { - loop { - match new_blocks.next().await? { - Notification::Block(block) => { - let hash = - header::hash_from_scale_encoded_header(&block.scale_encoded_header); - new_blocks.unpin_block(&hash).await; - - if let Some(new_runtime) = block.new_runtime { - headers.insert(hash, Arc::new(new_runtime)); - } else { - let parent_runtime = headers - .get(&block.parent_hash) - .unwrap() - .clone(); - headers.insert(hash, parent_runtime); - } - - if block.is_new_best { - let current_best_runtime = - headers.get(¤t_best).unwrap(); - let new_best_runtime = headers.get(&hash).unwrap(); - current_best = hash; - - if !Arc::ptr_eq(current_best_runtime, new_best_runtime) { - let runtime = (**new_best_runtime).clone(); - break Some(( - runtime, - ( - new_blocks, - headers, - current_finalized_hash, - current_best, - ), - )); - } - } - } - Notification::Finalized { - hash, - pruned_blocks, - best_block_hash, - } => { - let current_best_runtime = - headers.get(¤t_best).unwrap().clone(); - let new_best_runtime = - headers.get(&best_block_hash).unwrap().clone(); - - // Clean up the headers we won't need anymore. - for pruned_block in pruned_blocks { - let _was_in = headers.remove(&pruned_block); - debug_assert!(_was_in.is_some()); - } - - let _ = headers - .remove(¤t_finalized_hash) - .unwrap(); - current_finalized_hash = hash; - current_best = best_block_hash; - - if !Arc::ptr_eq(¤t_best_runtime, &new_best_runtime) { - let runtime = (*new_best_runtime).clone(); - break Some(( - runtime, - ( - new_blocks, - headers, - current_finalized_hash, - current_best, - ), - )); - } - } - Notification::BestBlockChanged { hash } => { - let current_best_runtime = - headers.get(¤t_best).unwrap().clone(); - let new_best_runtime = - headers.get(&hash).unwrap().clone(); - - current_best = hash; - - if !Arc::ptr_eq(¤t_best_runtime, &new_best_runtime) { - let runtime = (*new_best_runtime).clone(); - break Some(( - runtime, - ( - new_blocks, - headers, - current_finalized_hash, - current_best, - ), - )); - } - } - } - } - }, - ); - - // Prepend the current best block to the stream. - let substream = stream::once(future::ready(current_best_runtime)).chain(substream); - Some((substream, runtime_service)) - }) - .flatten() - .boxed(); - - // TODO: we don't dedup blocks; in other words the stream can produce the same block twice if the inner subscription drops - - // Now that we have a stream, extract the first element to be the first value. - let first_value = master_stream.next().await.unwrap(); - (first_value, master_stream) -} - -/// Returns the SCALE-encoded header of the current finalized block, plus an unlimited stream -/// that produces one item every time the finalized block is changed. -/// -/// This function only returns once the runtime of the current finalized block is known. This -/// might take a long time. -pub async fn subscribe_finalized( - runtime_service: &Arc>, -) -> (Vec, stream::BoxStream<'static, Vec>) { - let mut master_stream = stream::unfold(runtime_service.clone(), |runtime_service| async move { - let subscribe_all = runtime_service - .subscribe_all("subscribe-finalized", 16, NonZeroUsize::new(32).unwrap()) - .await; - - // Map of block headers by hash. Contains all non-finalized blocks headers. - let mut non_finalized_headers = - hashbrown::HashMap::<[u8; 32], Vec, fnv::FnvBuildHasher>::with_capacity_and_hasher( - 16, - Default::default(), - ); - - subscribe_all - .new_blocks - .unpin_block(&header::hash_from_scale_encoded_header( - &subscribe_all.finalized_block_scale_encoded_header, - )) - .await; - - for block in subscribe_all.non_finalized_blocks_ancestry_order { - let hash = header::hash_from_scale_encoded_header(&block.scale_encoded_header); - subscribe_all.new_blocks.unpin_block(&hash).await; - non_finalized_headers.insert(hash, block.scale_encoded_header); - } - - // Turns `subscribe_all.new_blocks` into a stream of headers. - let substream = stream::unfold( - (subscribe_all.new_blocks, non_finalized_headers), - |(mut new_blocks, mut non_finalized_headers)| async { - loop { - match new_blocks.next().await? { - Notification::Block(block) => { - let hash = - header::hash_from_scale_encoded_header(&block.scale_encoded_header); - new_blocks.unpin_block(&hash).await; - non_finalized_headers.insert(hash, block.scale_encoded_header); - } - Notification::Finalized { - hash, - pruned_blocks, - .. - } => { - // Clean up the headers we won't need anymore. - for pruned_block in pruned_blocks { - let _was_in = non_finalized_headers.remove(&pruned_block); - debug_assert!(_was_in.is_some()); - } - - let header = non_finalized_headers.remove(&hash).unwrap(); - break Some((header, (new_blocks, non_finalized_headers))); - } - Notification::BestBlockChanged { .. } => {} - } - } - }, - ); - - // Prepend the current finalized block to the stream. - let substream = stream::once(future::ready( - subscribe_all.finalized_block_scale_encoded_header, - )) - .chain(substream); - - Some((substream, runtime_service)) - }) - .flatten() - .boxed(); - - // TODO: we don't dedup blocks; in other words the stream can produce the same block twice if the inner subscription drops - - // Now that we have a stream, extract the first element to be the first value. - let first_value = master_stream.next().await.unwrap(); - (first_value, master_stream) -} - -/// Returns the SCALE-encoded header of the current best block, plus an unlimited stream that -/// produces one item every time the best block is changed. -/// -/// This function only returns once the runtime of the current best block is known. This might -/// take a long time. -pub async fn subscribe_best( - runtime_service: &Arc>, -) -> (Vec, stream::BoxStream<'static, Vec>) { - let mut master_stream = stream::unfold(runtime_service.clone(), |runtime_service| async move { - let subscribe_all = runtime_service - .subscribe_all("subscribe-best", 16, NonZeroUsize::new(32).unwrap()) - .await; - - // Map of block headers by hash. Contains all non-finalized blocks headers, plus the - // current finalized block header. - let mut headers = - hashbrown::HashMap::<[u8; 32], Vec, fnv::FnvBuildHasher>::with_capacity_and_hasher( - 16, - Default::default(), - ); - - let current_finalized_hash = header::hash_from_scale_encoded_header( - &subscribe_all.finalized_block_scale_encoded_header, - ); - - subscribe_all - .new_blocks - .unpin_block(¤t_finalized_hash) - .await; - - headers.insert( - current_finalized_hash, - subscribe_all.finalized_block_scale_encoded_header, - ); - - let mut current_best = None; - for block in subscribe_all.non_finalized_blocks_ancestry_order { - let hash = header::hash_from_scale_encoded_header(&block.scale_encoded_header); - subscribe_all.new_blocks.unpin_block(&hash).await; - headers.insert(hash, block.scale_encoded_header); - - if block.is_new_best { - debug_assert!(current_best.is_none()); - current_best = Some(hash); - } - } - let current_best = current_best.unwrap_or(current_finalized_hash); - let current_best_header = headers.get(¤t_best).unwrap().clone(); - - // Turns `subscribe_all.new_blocks` into a stream of headers. - let substream = stream::unfold( - ( - subscribe_all.new_blocks, - headers, - current_finalized_hash, - current_best, - ), - |( - mut new_blocks, - mut headers, - mut current_finalized_hash, - mut current_best, - )| async move { - loop { - match new_blocks.next().await? { - Notification::Block(block) => { - let hash = - header::hash_from_scale_encoded_header(&block.scale_encoded_header); - new_blocks.unpin_block(&hash).await; - headers.insert(hash, block.scale_encoded_header); - - if block.is_new_best { - current_best = hash; - let header = - headers.get(¤t_best).unwrap().clone(); - break Some(( - header, - ( - new_blocks, - headers, - current_finalized_hash, - current_best, - ), - )); - } - } - Notification::Finalized { - hash, - pruned_blocks, - best_block_hash, - } => { - // Clean up the headers we won't need anymore. - for pruned_block in pruned_blocks { - let _was_in = headers.remove(&pruned_block); - debug_assert!(_was_in.is_some()); - } - - let _ = headers - .remove(¤t_finalized_hash) - .unwrap(); - current_finalized_hash = hash; - - if best_block_hash != current_best { - current_best = best_block_hash; - let header = - headers.get(¤t_best).unwrap().clone(); - break Some(( - header, - ( - new_blocks, - headers, - current_finalized_hash, - current_best, - ), - )); - } - } - Notification::BestBlockChanged { hash } => { - if hash != current_best { - current_best = hash; - let header = - headers.get(¤t_best).unwrap().clone(); - break Some(( - header, - ( - new_blocks, - headers, - current_finalized_hash, - current_best, - ), - )); - } - } - } - } - }, - ); - - // Prepend the current best block to the stream. - let substream = stream::once(future::ready(current_best_header)).chain(substream); - Some((substream, runtime_service)) - }) - .flatten() - .boxed(); - - // TODO: we don't dedup blocks; in other words the stream can produce the same block twice if the inner subscription drops - - // Now that we have a stream, extract the first element to be the first value. - let first_value = master_stream.next().await.unwrap(); - (first_value, master_stream) -} diff --git a/wasm-node/CHANGELOG.md b/wasm-node/CHANGELOG.md index e2cb91c881..41c543a9d1 100644 --- a/wasm-node/CHANGELOG.md +++ b/wasm-node/CHANGELOG.md @@ -5,6 +5,9 @@ ### Changed - The `chainHead_unstable_storage` JSON-RPC function now supports a `type` equal to `closest-descendant-merkle-value` and no longer supports `closest-ancestor-merkle-value`, in accordance with the latest changes in the JSON-RPC API specification. ([#824](https://github.com/smol-dot/smoldot/pull/824)) +- Blocks are now reported to `chain_subscribeAllHeads` and `chain_subscribeNewHeads` subscribers only after they have been put in the cache, preventing race conditions where JSON-RPC clients suffer from a cache miss if they ask information about these blocks too quickly. ([#854](https://github.com/smol-dot/smoldot/pull/854)) +- Runtime updates are now always reported to `state_subscribeRuntimeVersion` subscribers immediately after the `chain_subscribeNewHeads` notification corresponding to the block containing the runtime update. They were previously reported in a pseudo-random order. ([#854](https://github.com/smol-dot/smoldot/pull/854)) +- All the storage subscriptions made using `state_subscribeStorage` are now queried together into a single networking request per block, instead of sending one networking query per storage key and per subscription. ([#854](https://github.com/smol-dot/smoldot/pull/854)) ## 1.0.11 - 2023-06-25