diff --git a/lib/src/json_rpc/methods.rs b/lib/src/json_rpc/methods.rs
index 8af916cd72..527d6dcd44 100644
--- a/lib/src/json_rpc/methods.rs
+++ b/lib/src/json_rpc/methods.rs
@@ -478,9 +478,8 @@ define_methods! {
     chainHead_unstable_storage(
         #[rename = "followSubscription"] follow_subscription: Cow<'a, str>,
         hash: HashHexString,
-        key: HexString,
+        items: Vec<ChainHeadStorageRequestItem>,
         #[rename = "childTrie"] child_trie: Option<HexString>,
-        #[rename = "type"] ty: ChainHeadStorageType,
         #[rename = "networkConfig"] network_config: Option<NetworkConfig>
     ) -> Cow<'a, str>,
     chainHead_unstable_storageContinue(
@@ -734,6 +733,26 @@ pub enum ChainHeadCallEvent<'a> {
     Disjoint {},
 }
 
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+pub struct ChainHeadStorageRequestItem {
+    pub key: HexString,
+    #[serde(rename = "type")]
+    pub ty: ChainHeadStorageType,
+}
+
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+pub struct ChainHeadStorageResponseItem {
+    pub key: HexString,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub value: Option<HexString>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub hash: Option<HexString>,
+    #[serde(rename = "merkle-value-key", skip_serializing_if = "Option::is_none")]
+    pub merkle_value_key: Option<String>, // TODO: `String` because the number of hex digits can be uneven
+    #[serde(rename = "merkle-value", skip_serializing_if = "Option::is_none")]
+    pub merkle_value: Option<HexString>,
+}
+
 #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
 pub enum ChainHeadStorageType {
     #[serde(rename = "value")]
@@ -751,15 +770,9 @@ pub enum ChainHeadStorageType {
 #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
 #[serde(tag = "event")]
 pub enum ChainHeadStorageEvent<'a> {
-    #[serde(rename = "item")]
-    Item {
-        key: HexString,
-        #[serde(skip_serializing_if = "Option::is_none")]
-        value: Option<HexString>,
-        #[serde(skip_serializing_if = "Option::is_none")]
-        hash: Option<HexString>,
-        #[serde(rename = "merkle-value", skip_serializing_if = "Option::is_none")]
-        merkle_value: Option<HexString>,
+    #[serde(rename = "items")]
+    Items {
+        items: Vec<ChainHeadStorageResponseItem>,
     },
     #[serde(rename = "done")]
     Done,
diff --git a/lib/src/trie/prefix_proof.rs b/lib/src/trie/prefix_proof.rs
index 5db1ef1b7f..db1d5aa461 100644
--- a/lib/src/trie/prefix_proof.rs
+++ b/lib/src/trie/prefix_proof.rs
@@ -269,6 +269,7 @@ impl PrefixScan {
             if next.is_empty() && self.next_queries.is_empty() {
                 return Ok(ResumeOutcome::Success {
                     entries: self.final_result,
+                    full_storage_values_required: self.full_storage_values_required,
                 });
             }
 
@@ -303,6 +304,8 @@ pub enum ResumeOutcome {
     Success {
         /// List of entries whose key starts with the requested prefix.
         entries: Vec<(Vec<u8>, StorageValue)>,
+        /// Value that was passed as [`Config::full_storage_values_required`].
+        full_storage_values_required: bool,
     },
 }
diff --git a/lib/src/trie/prefix_proof/tests.rs b/lib/src/trie/prefix_proof/tests.rs
index 89eed6506d..5468495c2e 100644
--- a/lib/src/trie/prefix_proof/tests.rs
+++ b/lib/src/trie/prefix_proof/tests.rs
@@ -14500,7 +14500,7 @@ fn regression_test_174() {
                 prefix_scan = scan;
                 continue;
             }
-            Ok(ResumeOutcome::Success { mut entries }) => {
+            Ok(ResumeOutcome::Success { mut entries, .. }) => {
                 let mut expected = EXPECTED.to_owned();
                 expected.sort();
                 entries.sort_by(|(key1, _), (key2, _)| key1.cmp(&key2));
diff --git a/lib/src/trie/proof_decode.rs b/lib/src/trie/proof_decode.rs
index 399058d29b..45f99dccd4 100644
--- a/lib/src/trie/proof_decode.rs
+++ b/lib/src/trie/proof_decode.rs
@@ -1011,6 +1011,35 @@ impl<T: AsRef<[u8]>> DecodedTrieProof<T> {
             }
         }
     }
+
+    /// Returns the key and Merkle value of the closest ancestor to the given key.
+    ///
+    /// Returns `None` if the key has no ancestor within the trie.
+    pub fn closest_ancestor_merkle_value<'a>(
+        &'a self,
+        trie_root_merkle_value: &[u8; 32],
+        key: &[nibble::Nibble],
+    ) -> Result<Option<(&'a [nibble::Nibble], trie_node::MerkleValueOutput)>, IncompleteProofError>
+    {
+        let (full_key, (_, node_value_range, _)) =
+            match self.closest_ancestor(trie_root_merkle_value, key) {
+                Ok(Some(v)) => v,
+                Ok(None) => return Ok(None),
+                Err(err) => return Err(err),
+            };
+
+        let node_value = &self.proof.as_ref()[node_value_range.clone()];
+        if node_value.len() < 32 {
+            Ok(Some((
+                full_key,
+                trie_node::MerkleValueOutput::from_bytes(node_value),
+            )))
+        } else {
+            let hash = blake2_rfc::blake2b::blake2b(32, &[], node_value);
+            let merkle_value = trie_node::MerkleValueOutput::from_bytes(hash.as_bytes());
+            Ok(Some((full_key, merkle_value)))
+        }
+    }
 }
 
 /// Proof doesn't contain enough information to answer the request.
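The branch at the end of `closest_ancestor_merkle_value` encodes the standard trie rule: a node whose encoded value is shorter than 32 bytes is inlined, so its node value *is* its Merkle value; otherwise the Merkle value is the 32-byte blake2b hash of the node value. A minimal standalone sketch of that rule (the helper name is hypothetical, not part of the patch):

```rust
// Sketch of the rule used above: inlined nodes (< 32 bytes) use the node value
// itself as Merkle value; larger nodes use the blake2b-256 hash of the node value.
fn merkle_value_of_node_value(node_value: &[u8]) -> Vec<u8> {
    if node_value.len() < 32 {
        node_value.to_vec()
    } else {
        blake2_rfc::blake2b::blake2b(32, &[], node_value)
            .as_bytes()
            .to_vec()
    }
}
```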
diff --git a/light-base/src/json_rpc_service/background.rs b/light-base/src/json_rpc_service/background.rs
index cc9fa2965a..8d0b195c79 100644
--- a/light-base/src/json_rpc_service/background.rs
+++ b/light-base/src/json_rpc_service/background.rs
@@ -948,7 +948,10 @@ impl<TPlat: PlatformRef> Background<TPlat> {
                 block_number,
                 hash,
                 &state_trie_root_hash,
-                keys,
+                keys.clone().map(|key| sync_service::StorageRequestItem {
+                    key: key.as_ref().to_vec(), // TODO: overhead
+                    ty: sync_service::StorageRequestItemTy::Value,
+                }),
                 total_attempts,
                 timeout_per_request,
                 max_parallel,
@@ -956,6 +959,22 @@ impl<TPlat: PlatformRef> Background<TPlat> {
             )
             .await
             .map_err(StorageQueryError::StorageRetrieval)?;
 
+        let result = keys
+            .map(|key| {
+                result
+                    .iter()
+                    .find_map(|entry| match entry {
+                        sync_service::StorageResultItem::Value { key: k, value }
+                            if k == key.as_ref() =>
+                        {
+                            Some(value.clone()) // TODO: overhead
+                        }
+                        _ => None,
+                    })
+                    .unwrap()
+            })
+            .collect();
+
         Ok(result)
     }
 
@@ -1000,14 +1019,24 @@ impl<TPlat: PlatformRef> Background<TPlat> {
         // Download the runtime of this block. This takes a long time as the runtime is rather
        // big (around 1MiB in general).
         let (storage_code, storage_heap_pages) = {
-            let mut code_query_result = self
+            let entries = self
                 .sync_service
                 .clone()
                 .storage_query(
                     block_number,
                     block_hash,
                     &state_trie_root_hash,
-                    iter::once(&b":code"[..]).chain(iter::once(&b":heappages"[..])),
+                    [
+                        sync_service::StorageRequestItem {
+                            key: b":code".to_vec(),
+                            ty: sync_service::StorageRequestItemTy::Value,
+                        },
+                        sync_service::StorageRequestItem {
+                            key: b":heappages".to_vec(),
+                            ty: sync_service::StorageRequestItemTy::Value,
+                        },
+                    ]
+                    .into_iter(),
                     3,
                     Duration::from_secs(20),
                     NonZeroU32::new(1).unwrap(),
@@ -1015,8 +1044,29 @@ impl<TPlat: PlatformRef> Background<TPlat> {
                 )
                 .await
                 .map_err(runtime_service::RuntimeCallError::StorageQuery)
                 .map_err(RuntimeCallError::Call)?;
-            let heap_pages = code_query_result.pop().unwrap();
-            let code = code_query_result.pop().unwrap();
+            // TODO: not elegant
+            let heap_pages = entries
+                .iter()
+                .find_map(|entry| match entry {
+                    sync_service::StorageResultItem::Value { key, value }
+                        if key == b":heappages" =>
+                    {
+                        Some(value.clone()) // TODO: overhead
+                    }
+                    _ => None,
+                })
+                .unwrap();
+            let code = entries
+                .iter()
+                .find_map(|entry| match entry {
+                    sync_service::StorageResultItem::Value { key, value }
+                        if key == b":code" =>
+                    {
+                        Some(value.clone()) // TODO: overhead
+                    }
+                    _ => None,
+                })
+                .unwrap();
             (code, heap_pages)
         };
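Both lookups above scan the unordered result list with the same `find_map` pattern (hence the `TODO: not elegant`). A hypothetical helper that factors it out, assuming the `sync_service` types introduced by this patch are in scope:

```rust
// Returns `Some(storage_value)` if the result list contains a `Value` item for
// `wanted`; the inner `Option` is `None` when the key has no storage value.
fn find_value(
    entries: &[sync_service::StorageResultItem],
    wanted: &[u8],
) -> Option<Option<Vec<u8>>> {
    entries.iter().find_map(|entry| match entry {
        sync_service::StorageResultItem::Value { key, value } if key.as_slice() == wanted => {
            Some(value.clone()) // same clone overhead as the call sites above
        }
        _ => None,
    })
}

// Usage at the call sites above would then be, for example:
// let heap_pages = find_value(&entries, b":heappages").unwrap();
```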
diff --git a/light-base/src/json_rpc_service/background/chain_head.rs b/light-base/src/json_rpc_service/background/chain_head.rs
index eb4de51d02..dfeb5b4ab9 100644
--- a/light-base/src/json_rpc_service/background/chain_head.rs
+++ b/light-base/src/json_rpc_service/background/chain_head.rs
@@ -838,9 +838,8 @@ impl<TPlat: PlatformRef> ChainHeadFollowTask<TPlat> {
     async fn start_chain_head_storage(&mut self, request: service::SubscriptionStartProcess) {
         let methods::MethodCall::chainHead_unstable_storage {
             hash,
-            key,
+            items,
             child_trie,
-            ty,
             network_config,
             ..
         } = request.request()
@@ -877,26 +876,6 @@
...
             return;
         }
 
-        let is_hash = match ty {
-            methods::ChainHeadStorageType::Value => false,
-            methods::ChainHeadStorageType::Hash => true,
-            methods::ChainHeadStorageType::DescendantsValues
-            | methods::ChainHeadStorageType::DescendantsHashes
-            | methods::ChainHeadStorageType::ClosestAncestorMerkleValue => {
-                // TODO: implement this
-                request.fail(json_rpc::parse::ErrorResponse::ServerError(
-                    -32000,
-                    "Child key storage queries not supported yet",
-                ));
-                log::warn!(
-                    target: &self.log_target,
-                    "chainHead_unstable_storage has been called with a type other than value or hash. \
-                    This isn't supported by smoldot yet."
-                );
-                return;
-            }
-        };
-
         let mut subscription = request.accept();
         let subscription_id = subscription.subscription_id().to_owned();
 
@@ -927,11 +906,36 @@
             }
         };
 
+        // Perform some API conversions.
+        let queries = items
+            .into_iter()
+            .map(|item| sync_service::StorageRequestItem {
+                key: item.key.0,
+                ty: match item.ty {
+                    methods::ChainHeadStorageType::Value => {
+                        sync_service::StorageRequestItemTy::Value
+                    }
+                    methods::ChainHeadStorageType::Hash => {
+                        sync_service::StorageRequestItemTy::Hash
+                    }
+                    methods::ChainHeadStorageType::ClosestAncestorMerkleValue => {
+                        sync_service::StorageRequestItemTy::ClosestAncestorMerkleValue
+                    }
+                    methods::ChainHeadStorageType::DescendantsValues => {
+                        sync_service::StorageRequestItemTy::DescendantsValues
+                    }
+                    methods::ChainHeadStorageType::DescendantsHashes => {
+                        sync_service::StorageRequestItemTy::DescendantsHashes
+                    }
+                },
+            })
+            .collect::<Vec<_>>();
+
         let future = sync_service.clone().storage_query(
             decoded_header.number,
             &hash.0,
             decoded_header.state_root,
-            iter::once(key.0.clone()), // TODO: clone :-/
+            queries.into_iter(),
             cmp::min(10, network_config.total_attempts),
             Duration::from_millis(u64::from(cmp::min(
                 20000,
@@ -952,29 +956,66 @@
...
         };
 
         match outcome {
-            Ok(values) => {
-                // `storage_query` returns a list of values because it can perform
-                // multiple queries at once. In our situation, we only start one query
-                // and as such the outcome only ever contains one element.
-                debug_assert_eq!(values.len(), 1);
-                let value = values.into_iter().next().unwrap();
-                if let Some(mut value) = value {
-                    if is_hash {
-                        value = blake2_rfc::blake2b::blake2b(8, &[], &value)
-                            .as_bytes()
-                            .to_vec();
-                    }
+            Ok(entries) => {
+                // Perform some API conversions.
+                let items = entries
+                    .into_iter()
+                    .filter_map(|item| match item {
+                        sync_service::StorageResultItem::Value { key, value } => {
+                            Some(methods::ChainHeadStorageResponseItem {
+                                key: methods::HexString(key),
+                                value: Some(methods::HexString(value?)),
+                                hash: None,
+                                merkle_value: None,
+                                merkle_value_key: None,
+                            })
+                        }
+                        sync_service::StorageResultItem::Hash { key, hash } => {
+                            Some(methods::ChainHeadStorageResponseItem {
+                                key: methods::HexString(key),
+                                value: None,
+                                hash: Some(methods::HexString(hash?.to_vec())),
+                                merkle_value: None,
+                                merkle_value_key: None,
+                            })
+                        }
+                        sync_service::StorageResultItem::DescendantValue { key, value, .. } => {
+                            Some(methods::ChainHeadStorageResponseItem {
+                                key: methods::HexString(key),
+                                value: Some(methods::HexString(value)),
+                                hash: None,
+                                merkle_value: None,
+                                merkle_value_key: None,
+                            })
+                        }
+                        sync_service::StorageResultItem::DescendantHash { key, hash, .. } => {
+                            Some(methods::ChainHeadStorageResponseItem {
+                                key: methods::HexString(key),
+                                value: None,
+                                hash: Some(methods::HexString(hash.to_vec())),
+                                merkle_value: None,
+                                merkle_value_key: None,
+                            })
+                        }
+                        sync_service::StorageResultItem::ClosestAncestorMerkleValue { requested_key, merkle_value } => {
+                            let (merkle_value_of, merkle_value) = merkle_value?;
+                            Some(methods::ChainHeadStorageResponseItem {
+                                key: methods::HexString(requested_key),
+                                value: None,
+                                hash: None,
+                                merkle_value: Some(methods::HexString(merkle_value)),
+                                merkle_value_key: Some(format!("0x{}", merkle_value_of.iter().map(|n| format!("{:x}", n)).collect::<String>())),
+                            })
+                        }
+                    })
+                    .collect::<Vec<_>>();
+
+                if !items.is_empty() {
                     subscription
                         .send_notification(
                             methods::ServerToClient::chainHead_unstable_storageEvent {
                                 subscription: (&subscription_id).into(),
-                                result: methods::ChainHeadStorageEvent::Item {
-                                    key,
-                                    value: Some(methods::HexString(value)),
-                                    hash: None,
-                                    merkle_value: None,
-                                },
+                                result: methods::ChainHeadStorageEvent::Items { items },
                             },
                         )
                         .await;
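The `merkle-value-key` produced above is the key of the closest ancestor expressed in nibbles, so the resulting hex string can have an odd number of digits — which is why `ChainHeadStorageResponseItem::merkle_value_key` is a `String` rather than a `HexString`. A standalone sketch of the encoding (hypothetical helper, using plain `u8` nibbles instead of `nibble::Nibble`):

```rust
// Encodes a nibble path as "0x" followed by one hex digit per nibble. Unlike
// byte-oriented hex, the digit count can be odd.
fn nibbles_to_hex(nibbles: &[u8]) -> String {
    format!(
        "0x{}",
        nibbles.iter().map(|n| format!("{:x}", n)).collect::<String>()
    )
}

// Example: the 5-nibble path [0xa, 0x1, 0xb, 0x2, 0xc] encodes as "0xa1b2c".
```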
diff --git a/light-base/src/json_rpc_service/background/state_chain.rs b/light-base/src/json_rpc_service/background/state_chain.rs
index 823ecaec0b..3aa20f3c53 100644
--- a/light-base/src/json_rpc_service/background/state_chain.rs
+++ b/light-base/src/json_rpc_service/background/state_chain.rs
@@ -19,7 +19,7 @@
 use super::{Background, GetKeysPagedCacheKey, PlatformRef};
 
-use crate::runtime_service;
+use crate::{runtime_service, sync_service};
 use alloc::{borrow::ToOwned as _, format, string::ToString as _, sync::Arc, vec, vec::Vec};
 use async_lock::MutexGuard;
@@ -705,11 +705,14 @@
         let outcome = self
             .sync_service
             .clone()
-            .storage_prefix_keys_query(
+            .storage_query(
                 block_number,
                 &hash,
-                &prefix.0,
                 &state_root,
+                iter::once(sync_service::StorageRequestItem {
+                    key: prefix.0,
+                    ty: sync_service::StorageRequestItemTy::DescendantsHashes,
+                }),
                 3,
                 Duration::from_secs(12),
                 NonZeroU32::new(1).unwrap(),
@@ -717,8 +720,16 @@
             )
             .await;
 
         match outcome {
-            Ok(keys) => {
-                let out = keys.into_iter().map(methods::HexString).collect::<Vec<_>>();
+            Ok(entries) => {
+                let out = entries
+                    .into_iter()
+                    .map(|item| match item {
+                        sync_service::StorageResultItem::DescendantHash { key, .. } => {
+                            methods::HexString(key)
+                        }
+                        _ => unreachable!(),
+                    })
+                    .collect::<Vec<_>>();
                 request.respond(methods::Response::state_getKeys(out))
             }
             Err(error) => request.fail(json_rpc::parse::ErrorResponse::ServerError(
@@ -785,11 +796,14 @@
         let outcome = self
             .sync_service
             .clone()
-            .storage_prefix_keys_query(
+            .storage_query(
                 block_number,
                 &hash,
-                &prefix,
                 &state_root,
+                iter::once(sync_service::StorageRequestItem {
+                    key: prefix.clone(),
+                    ty: sync_service::StorageRequestItemTy::DescendantsHashes,
+                }),
                 3,
                 Duration::from_secs(12),
                 NonZeroU32::new(1).unwrap(),
@@ -797,12 +811,20 @@
             )
             .await;
 
         match outcome {
-            Ok(keys) => {
+            Ok(entries) => {
                 // TODO: instead of requesting all keys with that prefix from the network, pass `start_key` to the network service
+                let keys = entries
+                    .into_iter()
+                    .map(|item| match item {
+                        sync_service::StorageResultItem::DescendantHash { key, .. } => key,
+                        _ => unreachable!(),
+                    })
+                    .collect::<Vec<_>>();
+
                 let out = keys
                     .iter()
-                    .filter(|k| start_key.as_ref().map_or(true, |start| *k >= &start.0)) // TODO: not sure if start should be in the set or not?
-                    .cloned() // TODO: instead of cloning, make `Response::state_getKeysPaged` accept references
+                    .cloned()
+                    .filter(|k| start_key.as_ref().map_or(true, |start| *k >= start.0)) // TODO: not sure if start should be in the set or not?
                     .map(methods::HexString)
                     .take(usize::try_from(count).unwrap_or(usize::max_value()))
                     .collect::<Vec<_>>();
@@ -1161,7 +1183,10 @@
                     block_number,
                     &block_hash,
                     state_trie_root,
-                    iter::once(&key.0),
+                    iter::once(sync_service::StorageRequestItem {
+                        key: key.0.clone(),
+                        ty: sync_service::StorageRequestItemTy::Value,
+                    }),
                     4,
                     Duration::from_secs(12),
                     NonZeroU32::new(2).unwrap(),
@@ -1169,7 +1194,8 @@
                 )
                 .await
             {
                 Ok(mut values) => {
-                    let value = values.pop().unwrap();
+                    let Some(sync_service::StorageResultItem::Value { value, .. }) = values.pop()
+                    else { unreachable!() };
                     match &mut known_values[key_index] {
                         Some(v) if *v == value => {}
                         v => {
diff --git a/light-base/src/runtime_service.rs b/light-base/src/runtime_service.rs
index 0e031af630..3fc1ea49bd 100644
--- a/light-base/src/runtime_service.rs
+++ b/light-base/src/runtime_service.rs
@@ -1977,7 +1977,17 @@
                 block_number,
                 &block_hash,
                 &state_root,
-                iter::once(&b":code"[..]).chain(iter::once(&b":heappages"[..])),
+                [
+                    sync_service::StorageRequestItem {
+                        key: b":code".to_vec(),
+                        ty: sync_service::StorageRequestItemTy::Value,
+                    },
+                    sync_service::StorageRequestItem {
+                        key: b":heappages".to_vec(),
+                        ty: sync_service::StorageRequestItemTy::Value,
+                    },
+                ]
+                .into_iter(),
                 3,
                 Duration::from_secs(20),
                 NonZeroU32::new(3).unwrap(),
@@ -1985,9 +1995,31 @@
             )
             .await;
 
         let result = match result {
-            Ok(mut c) => {
-                let heap_pages = c.pop().unwrap();
-                let code = c.pop().unwrap();
+            Ok(entries) => {
+                let heap_pages = entries
+                    .iter()
+                    .find_map(|entry| match entry {
+                        sync_service::StorageResultItem::Value {
+                            key,
+                            value,
+                        } if key == b":heappages" => {
+                            Some(value.clone()) // TODO: overhead
+                        }
+                        _ => None,
+                    })
+                    .unwrap();
+                let code = entries
+                    .iter()
+                    .find_map(|entry| match entry {
+                        sync_service::StorageResultItem::Value {
+                            key,
+                            value,
+                        } if key == b":code" => {
+                            Some(value.clone()) // TODO: overhead
+                        }
+                        _ => None,
+                    })
+                    .unwrap();
                 Ok((code, heap_pages))
             }
             Err(error) => Err(RuntimeDownloadError::StorageQuery(error)),
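Both `state_getKeys`/`state_getKeysPaged` call sites above replace the removed `storage_prefix_keys_query` with a `DescendantsHashes` request and then keep only the keys. A hypothetical extraction helper, assuming the `sync_service` types from this patch; using `filter_map` instead of `map` + `unreachable!()` would also tolerate stray items of other kinds:

```rust
// Collects the keys of all `DescendantHash` items, i.e. every key that exists
// under the requested prefix. The hashes themselves are ignored since only the
// keys are needed by these JSON-RPC functions.
fn descendant_keys(entries: Vec<sync_service::StorageResultItem>) -> Vec<Vec<u8>> {
    entries
        .into_iter()
        .filter_map(|item| match item {
            sync_service::StorageResultItem::DescendantHash { key, .. } => Some(key),
            _ => None,
        })
        .collect()
}
```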
diff --git a/light-base/src/sync_service.rs b/light-base/src/sync_service.rs
index 9faffb51b3..689c09cd27 100644
--- a/light-base/src/sync_service.rs
+++ b/light-base/src/sync_service.rs
@@ -28,11 +28,12 @@
 use crate::{network_service, platform::PlatformRef, runtime_service};
 
-use alloc::{borrow::ToOwned as _, boxed::Box, format, string::String, sync::Arc, vec::Vec};
+use alloc::{boxed::Box, format, string::String, sync::Arc, vec::Vec};
 use async_lock::Mutex;
-use core::{fmt, num::NonZeroU32, time::Duration};
+use core::{fmt, mem, num::NonZeroU32, time::Duration};
 use futures_channel::{mpsc, oneshot};
 use futures_util::{stream, SinkExt as _};
+use rand::seq::IteratorRandom as _;
 use smoldot::{
     chain,
     executor::host,
@@ -377,42 +378,131 @@
         Err(())
     }
 
-    /// Performs one or more storage proof requests in order to find the value of the given
-    /// `requested_keys`.
+    /// Performs one or more storage proof requests in order to fulfill the `requests` passed as
+    /// parameter.
     ///
     /// Must be passed a block hash, a block number, and the Merkle value of the root node of the
     /// storage trie of this same block. The value of `block_number` corresponds to the value
-    /// in the [`smoldot::header::HeaderRef::number`] field, and the value of `storage_trie_root`
+    /// in the [`smoldot::header::HeaderRef::number`] field, and the value of `main_trie_root_hash`
     /// corresponds to the value in the [`smoldot::header::HeaderRef::state_root`] field.
     ///
-    /// Returns the storage values of `requested_keys` in the storage of the block, or an error if
-    /// it couldn't be determined. If `Ok`, the `Vec` is guaranteed to have the same number of
-    /// elements as `requested_keys`.
+    /// The result will contain items corresponding to the requests, but in no particular order.
     ///
-    /// This function is equivalent to calling
-    /// [`network_service::NetworkService::storage_proof_request`] and verifying the proof,
-    /// potentially multiple times until it succeeds. The number of attempts and the selection of
-    /// peers is done through reasonable heuristics.
+    /// See the documentation of [`StorageRequestItem`] and [`StorageResultItem`] for more
+    /// information.
+    // TODO: should return the items in a streaming way, so that we don't need to wait for all the queries to have finished
     pub async fn storage_query(
         self: Arc<Self>,
         block_number: u64,
         block_hash: &[u8; 32],
-        storage_trie_root: &[u8; 32],
-        requested_keys: impl Iterator<Item = impl AsRef<[u8]> + Clone> + Clone,
+        main_trie_root_hash: &[u8; 32],
+        requests: impl Iterator<Item = StorageRequestItem>,
         total_attempts: u32,
         timeout_per_request: Duration,
         _max_parallel: NonZeroU32,
-    ) -> Result<Vec<Option<Vec<u8>>>, StorageQueryError> {
-        let mut outcome_errors =
-            Vec::with_capacity(usize::try_from(total_attempts).unwrap_or(usize::max_value()));
-
-        // TODO: better peers selection ; don't just take the first
+    ) -> Result<Vec<StorageResultItem>, StorageQueryError> {
+        // TODO: this should probably be extracted to a state machine in `/lib`, with unit tests
+        // TODO: big requests should be split into multiple smaller ones
         // TODO: handle max_parallel
-        for target in self
-            .peers_assumed_know_blocks(block_number, block_hash)
-            .await
-            .take(usize::try_from(total_attempts).unwrap_or(usize::max_value()))
-        {
+        enum RequestImpl {
+            PrefixScan {
+                requested_key: Vec<u8>,
+                scan: prefix_proof::PrefixScan,
+            },
+            ValueOrHash {
+                key: Vec<u8>,
+                hash: bool,
+            },
+            ClosestAncestorMerkleValue {
+                key: Vec<u8>,
+            },
+        }
+
+        let mut requests_remaining = requests
+            .map(|request| match request.ty {
+                StorageRequestItemTy::DescendantsHashes
+                | StorageRequestItemTy::DescendantsValues => RequestImpl::PrefixScan {
+                    scan: prefix_proof::prefix_scan(prefix_proof::Config {
+                        prefix: &request.key,
+                        trie_root_hash: *main_trie_root_hash,
+                        full_storage_values_required: matches!(
+                            request.ty,
+                            StorageRequestItemTy::DescendantsValues
+                        ),
+                    }),
+                    requested_key: request.key,
+                },
+                StorageRequestItemTy::Value => RequestImpl::ValueOrHash {
+                    key: request.key,
+                    hash: false,
+                },
+                StorageRequestItemTy::Hash => RequestImpl::ValueOrHash {
+                    key: request.key,
+                    hash: true,
+                },
+                StorageRequestItemTy::ClosestAncestorMerkleValue => {
+                    RequestImpl::ClosestAncestorMerkleValue { key: request.key }
+                }
+            })
+            .collect::<Vec<_>>();
+
+        let total_attempts = usize::try_from(total_attempts).unwrap_or(usize::max_value());
+        let mut outcome_errors = Vec::with_capacity(total_attempts);
+
+        let mut final_results =
+            Vec::<StorageResultItem>::with_capacity(requests_remaining.len() * 4);
+
+        loop {
+            // Check if we're done.
+            if requests_remaining.is_empty() {
+                return Ok(final_results);
+            }
+
+            if outcome_errors.len() >= total_attempts {
+                return Err(StorageQueryError {
+                    errors: outcome_errors,
+                });
+            }
+
+            // Choose peer to query.
+            // TODO: better peers selection
+            let Some(target) = self
+                .peers_assumed_know_blocks(block_number, block_hash)
+                .await
+                .choose(&mut rand::thread_rng())
+            else {
+                // No peer knows this block. Returning with a failure.
+                return Err(StorageQueryError {
+                    errors: outcome_errors,
+                });
+            };
+
+            // Build the list of keys to request.
+            let keys_to_request = {
+                let mut keys = hashbrown::HashSet::with_capacity_and_hasher(
+                    requests_remaining.len() * 4,
+                    fnv::FnvBuildHasher::default(),
+                );
+
+                for request in &requests_remaining {
+                    match request {
+                        RequestImpl::PrefixScan { scan, .. } => {
+                            keys.extend(scan.requested_keys().map(|nibbles| {
+                                trie::nibbles_to_bytes_suffix_extend(nibbles).collect::<Vec<_>>()
+                            }));
+                        }
+                        RequestImpl::ValueOrHash { key, .. } => {
+                            keys.insert(key.clone());
+                        }
+                        RequestImpl::ClosestAncestorMerkleValue { key } => {
+                            keys.insert(key.clone());
+                        }
+                    }
+                }
+
+                keys
+            };
+
             let result = self
                 .network_service
                 .clone()
                 .storage_proof_request(
                     self.network_chain_index,
                     target,
                     protocol::StorageProofRequestConfig {
                         block_hash: *block_hash,
-                        keys: requested_keys.clone(),
+                        keys: keys_to_request.into_iter(),
                     },
                     timeout_per_request,
                 )
-                .await
-                .map_err(StorageQueryErrorDetail::Network)
-                .and_then(|outcome| {
-                    let decoded = outcome.decode();
-                    let decoded = proof_decode::decode_and_verify_proof(proof_decode::Config {
-                        proof: decoded,
-                    })
-                    .map_err(StorageQueryErrorDetail::ProofVerification)?;
-
-                    let mut result = Vec::with_capacity(requested_keys.clone().count());
-                    for key in requested_keys.clone() {
-                        result.push(
-                            decoded
-                                .storage_value(storage_trie_root, key.as_ref())
-                                .map_err(|_| StorageQueryErrorDetail::MissingProofEntry)?
-                                .map(|(v, _)| v.to_owned()),
-                        );
-                    }
-                    debug_assert_eq!(result.len(), result.capacity());
-                    Ok(result)
-                });
+                .await;
 
-            match result {
-                Ok(values) => return Ok(values),
+            let proof = match result {
+                Ok(r) => r,
                 Err(err) => {
-                    outcome_errors.push(err);
+                    outcome_errors.push(StorageQueryErrorDetail::Network(err));
+                    continue;
                 }
-            }
-        }
+            };
 
-        Err(StorageQueryError {
-            errors: outcome_errors,
-        })
-    }
+            let decoded_proof = match proof_decode::decode_and_verify_proof(proof_decode::Config {
+                proof: proof.decode(),
+            }) {
+                Ok(d) => d,
+                Err(err) => {
+                    outcome_errors.push(StorageQueryErrorDetail::ProofVerification(err));
+                    continue;
+                }
+            };
 
-    pub async fn storage_prefix_keys_query(
-        self: Arc<Self>,
-        block_number: u64,
-        block_hash: &[u8; 32],
-        prefix: &[u8],
-        storage_trie_root: &[u8; 32],
-        total_attempts: u32,
-        timeout_per_request: Duration,
-        _max_parallel: NonZeroU32,
-    ) -> Result<Vec<Vec<u8>>, StorageQueryError> {
-        let mut prefix_scan = prefix_proof::prefix_scan(prefix_proof::Config {
-            prefix,
-            trie_root_hash: *storage_trie_root,
-            full_storage_values_required: false,
-        });
-
-        'main_scan: loop {
-            let mut outcome_errors =
-                Vec::with_capacity(usize::try_from(total_attempts).unwrap_or(usize::max_value()));
-
-            // TODO: better peers selection ; don't just take the first
-            // TODO: handle max_parallel
-            // TODO: is the number of keys is large, split into multiple requests
-            for target in self
-                .peers_assumed_know_blocks(block_number, block_hash)
-                .await
-                .take(usize::try_from(total_attempts).unwrap_or(usize::max_value()))
-            {
-                let result = self
-                    .network_service
-                    .clone()
-                    .storage_proof_request(
-                        self.network_chain_index,
-                        target,
-                        protocol::StorageProofRequestConfig {
-                            block_hash: *block_hash,
-                            keys: prefix_scan.requested_keys().map(|nibbles| {
-                                trie::nibbles_to_bytes_suffix_extend(nibbles).collect::<Vec<_>>()
-                            }),
-                        },
-                        timeout_per_request,
-                    )
-                    .await
-                    .map_err(StorageQueryErrorDetail::Network);
-
-                match result {
-                    Ok(proof) => {
-                        match prefix_scan.resume(proof.decode()) {
+            for request in mem::take(&mut requests_remaining) {
+                match request {
+                    RequestImpl::PrefixScan {
+                        scan,
+                        requested_key,
+                    } => {
+                        match scan.resume(proof.decode()) {
                             Ok(prefix_proof::ResumeOutcome::InProgress(scan)) => {
-                                // Continue next step of the proof.
-                                prefix_scan = scan;
-                                continue 'main_scan;
+                                requests_remaining.push(RequestImpl::PrefixScan {
+                                    scan,
+                                    requested_key,
+                                });
+                            }
+                            Ok(prefix_proof::ResumeOutcome::Success {
+                                entries,
+                                full_storage_values_required,
+                            }) => {
+                                // The value of `full_storage_values_required` determines whether
+                                // we wanted full values (`true`) or hashes (`false`).
+                                for (key, value) in entries {
+                                    match value {
+                                        prefix_proof::StorageValue::Hash(hash) => {
+                                            debug_assert!(!full_storage_values_required);
+                                            final_results.push(StorageResultItem::DescendantHash {
+                                                key,
+                                                hash,
+                                                requested_key: requested_key.clone(),
+                                            });
+                                        }
+                                        prefix_proof::StorageValue::Value(value)
+                                            if full_storage_values_required =>
+                                        {
+                                            final_results.push(
+                                                StorageResultItem::DescendantValue {
+                                                    requested_key: requested_key.clone(),
+                                                    key,
+                                                    value,
+                                                },
+                                            );
+                                        }
+                                        prefix_proof::StorageValue::Value(value) => {
+                                            let hashed_value =
+                                                blake2_rfc::blake2b::blake2b(32, &[], &value);
+                                            final_results.push(StorageResultItem::DescendantHash {
+                                                key,
+                                                hash: *<&[u8; 32]>::try_from(
+                                                    hashed_value.as_bytes(),
+                                                )
+                                                .unwrap(),
+                                                requested_key: requested_key.clone(),
+                                            });
+                                        }
+                                    }
+                                }
+                            }
+                            Err((_, prefix_proof::Error::InvalidProof(err))) => {
+                                // Since we decode the proof above, this is never supposed to
+                                // be reachable.
+                                debug_assert!(false);
+                                outcome_errors
+                                    .push(StorageQueryErrorDetail::ProofVerification(err));
                             }
-                            Ok(prefix_proof::ResumeOutcome::Success { entries }) => {
-                                // TODO :overhead
-                                return Ok(entries.into_iter().map(|(key, _)| key).collect());
+                            Err((_, prefix_proof::Error::MissingProofEntry)) => {
+                                outcome_errors.push(StorageQueryErrorDetail::MissingProofEntry);
                             }
-                            Err((scan, err)) => {
-                                prefix_scan = scan;
-                                outcome_errors.push(match err {
-                                    prefix_proof::Error::InvalidProof(err) => {
-                                        StorageQueryErrorDetail::ProofVerification(err)
+                        }
+                    }
+                    RequestImpl::ValueOrHash { key, hash } => {
+                        // TODO: overhead
+                        match decoded_proof.trie_node_info(
+                            main_trie_root_hash,
+                            &trie::bytes_to_nibbles(key.iter().copied()).collect::<Vec<_>>(),
+                        ) {
+                            Ok(node_info) => match node_info.storage_value {
+                                proof_decode::StorageValue::HashKnownValueMissing(h) if hash => {
+                                    final_results.push(StorageResultItem::Hash {
+                                        key,
+                                        hash: Some(*h),
+                                    });
+                                }
+                                proof_decode::StorageValue::HashKnownValueMissing(_) => {
+                                    outcome_errors.push(StorageQueryErrorDetail::MissingProofEntry);
+                                }
+                                proof_decode::StorageValue::Known { value, .. } => {
+                                    if hash {
+                                        let hashed_value =
+                                            blake2_rfc::blake2b::blake2b(32, &[], value);
+                                        final_results.push(StorageResultItem::Hash {
+                                            key,
+                                            hash: Some(
+                                                *<&[u8; 32]>::try_from(hashed_value.as_bytes())
+                                                    .unwrap(),
+                                            ),
+                                        });
+                                    } else {
+                                        final_results.push(StorageResultItem::Value {
+                                            key,
+                                            value: Some(value.to_vec()),
+                                        });
                                     }
-                                    prefix_proof::Error::MissingProofEntry => {
-                                        StorageQueryErrorDetail::MissingProofEntry
+                                }
+                                proof_decode::StorageValue::None => {
+                                    if hash {
+                                        final_results
+                                            .push(StorageResultItem::Hash { key, hash: None });
+                                    } else {
+                                        final_results
+                                            .push(StorageResultItem::Value { key, value: None });
                                     }
-                                });
+                                }
+                            },
+                            Err(proof_decode::IncompleteProofError { .. }) => {
+                                outcome_errors.push(StorageQueryErrorDetail::MissingProofEntry);
                             }
                         }
                     }
-                    Err(err) => {
-                        outcome_errors.push(err);
+                    RequestImpl::ClosestAncestorMerkleValue { key } => {
+                        match decoded_proof.closest_ancestor_merkle_value(
+                            main_trie_root_hash,
+                            &trie::bytes_to_nibbles(key.iter().copied()).collect::<Vec<_>>(),
+                        ) {
+                            Ok(Some((ancestor_key, merkle_value))) => {
+                                final_results.push(StorageResultItem::ClosestAncestorMerkleValue {
+                                    requested_key: key,
+                                    merkle_value: Some((
+                                        ancestor_key.to_vec(),
+                                        merkle_value.as_ref().to_vec(),
+                                    )),
+                                })
+                            }
+                            Ok(None) => {
+                                final_results.push(StorageResultItem::ClosestAncestorMerkleValue {
+                                    requested_key: key,
+                                    merkle_value: None,
+                                })
+                            }
+                            Err(proof_decode::IncompleteProofError { .. }) => {
+                                outcome_errors.push(StorageQueryErrorDetail::MissingProofEntry);
+                            }
+                        }
                     }
                 }
             }
-
-            return Err(StorageQueryError {
-                errors: outcome_errors,
-            });
         }
     }
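For reference, a usage sketch of the reworked `storage_query` (hypothetical call site, not part of the patch; the surrounding plumbing — block hash, trie root, platform type — is assumed): a single call can now mix request types, and because the returned items are unordered, callers match on the `StorageResultItem` variants defined in the next hunk.

```rust
use alloc::sync::Arc;
use core::{num::NonZeroU32, time::Duration};

async fn example_query<TPlat: PlatformRef>(
    sync_service: Arc<SyncService<TPlat>>,
    block_number: u64,
    block_hash: [u8; 32],
    state_trie_root: [u8; 32],
) -> Result<(), StorageQueryError> {
    let results = sync_service
        .storage_query(
            block_number,
            &block_hash,
            &state_trie_root,
            [
                // Hash of the `:code` storage value.
                StorageRequestItem {
                    key: b":code".to_vec(),
                    ty: StorageRequestItemTy::Hash,
                },
                // One `DescendantValue` item per key starting with `:grandpa`.
                StorageRequestItem {
                    key: b":grandpa".to_vec(),
                    ty: StorageRequestItemTy::DescendantsValues,
                },
            ]
            .into_iter(),
            3,
            Duration::from_secs(20),
            NonZeroU32::new(1).unwrap(),
        )
        .await?;

    for item in results {
        match item {
            StorageResultItem::Hash { hash, .. } => {
                // `hash` is `None` if `:code` has no storage value at this block.
                let _ = hash;
            }
            StorageResultItem::DescendantValue { key, value, .. } => {
                let _ = (key, value);
            }
            _ => {}
        }
    }
    Ok(())
}
```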
@@ -598,6 +737,95 @@
     }
 }
 
+/// An item requested with [`SyncService::storage_query`].
+#[derive(Debug, Clone)]
+pub struct StorageRequestItem {
+    /// Key to request. Exactly what is requested depends on [`StorageRequestItem::ty`].
+    pub key: Vec<u8>,
+    /// Detail about what is being requested.
+    pub ty: StorageRequestItemTy,
+}
+
+/// See [`StorageRequestItem::ty`].
+#[derive(Debug, Clone)]
+pub enum StorageRequestItemTy {
+    /// The storage value associated to the [`StorageRequestItem::key`] is requested.
+    /// A [`StorageResultItem::Value`] will be returned containing the potential value.
+    Value,
+
+    /// The hash of the storage value associated to the [`StorageRequestItem::key`] is requested.
+    /// A [`StorageResultItem::Hash`] will be returned containing the potential hash.
+    Hash,
+
+    /// The list of the descendants of the [`StorageRequestItem::key`] (including the `key`
+    /// itself) that have a storage value is requested.
+    ///
+    /// Zero or more [`StorageResultItem::DescendantValue`] will be returned where the
+    /// [`StorageResultItem::DescendantValue::requested_key`] is equal to
+    /// [`StorageRequestItem::key`].
+    DescendantsValues,
+
+    /// The list of the descendants of the [`StorageRequestItem::key`] (including the `key`
+    /// itself) that have a storage value is requested.
+    ///
+    /// Zero or more [`StorageResultItem::DescendantHash`] will be returned where the
+    /// [`StorageResultItem::DescendantHash::requested_key`] is equal to
+    /// [`StorageRequestItem::key`].
+    DescendantsHashes,
+
+    /// The Merkle value of the trie node that is the closest ancestor to
+    /// [`StorageRequestItem::key`] is requested.
+    /// A [`StorageResultItem::ClosestAncestorMerkleValue`] will be returned where
+    /// [`StorageResultItem::ClosestAncestorMerkleValue::requested_key`] is equal to
+    /// [`StorageRequestItem::key`].
+    ClosestAncestorMerkleValue,
+}
+
+/// An item returned by [`SyncService::storage_query`].
+#[derive(Debug, Clone)]
+pub enum StorageResultItem {
+    /// Corresponds to a [`StorageRequestItemTy::Value`].
+    Value {
+        /// Key that was requested. Equal to the value of [`StorageRequestItem::key`].
+        key: Vec<u8>,
+        /// Storage value of the key, or `None` if there is no storage value associated with that
+        /// key.
+        value: Option<Vec<u8>>,
+    },
+    /// Corresponds to a [`StorageRequestItemTy::Hash`].
+    Hash {
+        /// Key that was requested. Equal to the value of [`StorageRequestItem::key`].
+        key: Vec<u8>,
+        /// Hash of the storage value of the key, or `None` if there is no storage value
+        /// associated with that key.
+        hash: Option<[u8; 32]>,
+    },
+    /// Corresponds to a [`StorageRequestItemTy::DescendantsValues`].
+    DescendantValue {
+        /// Key that was requested. Equal to the value of [`StorageRequestItem::key`].
+        requested_key: Vec<u8>,
+        /// Equal or a descendant of [`StorageResultItem::DescendantValue::requested_key`].
+        key: Vec<u8>,
+        /// Storage value associated with [`StorageResultItem::DescendantValue::key`].
+        value: Vec<u8>,
+    },
+    /// Corresponds to a [`StorageRequestItemTy::DescendantsHashes`].
+    DescendantHash {
+        /// Key that was requested. Equal to the value of [`StorageRequestItem::key`].
+        requested_key: Vec<u8>,
+        /// Equal or a descendant of [`StorageResultItem::DescendantHash::requested_key`].
+        key: Vec<u8>,
+        /// Hash of the storage value associated with [`StorageResultItem::DescendantHash::key`].
+        hash: [u8; 32],
+    },
+    /// Corresponds to a [`StorageRequestItemTy::ClosestAncestorMerkleValue`].
+    ClosestAncestorMerkleValue {
+        /// Key that was requested. Equal to the value of [`StorageRequestItem::key`].
+        requested_key: Vec<u8>,
+        /// Key and Merkle value of the closest ancestor, or `None` if the requested key has no
+        /// ancestor within the trie.
+        merkle_value: Option<(Vec<u8>, Vec<u8>)>,
+    },
+}
+
 /// Error that can happen when calling [`SyncService::storage_query`].
 #[derive(Debug, Clone)]
 pub struct StorageQueryError {
diff --git a/wasm-node/CHANGELOG.md b/wasm-node/CHANGELOG.md
index a265604a32..37dc32298f 100644
--- a/wasm-node/CHANGELOG.md
+++ b/wasm-node/CHANGELOG.md
@@ -2,6 +2,12 @@
 
 ## Unreleased
 
+### Changed
+
+- Add support for the `descendants-values`, `descendants-hashes`, and `closest-ancestor-merkle-value` types for the `chainHead_unstable_storage` JSON-RPC function. ([#813](https://github.com/smol-dot/smoldot/pull/813))
+- The `chainHead_unstable_storage` JSON-RPC function now accepts an array of `items` as parameter instead of a `key` and `type`, in accordance with the latest changes in the JSON-RPC API specification. ([#813](https://github.com/smol-dot/smoldot/pull/813))
+- The `chainHead_unstable_storage` JSON-RPC function now generates `items` notifications containing an array of multiple `items`, in accordance with the latest changes in the JSON-RPC API specification. ([#813](https://github.com/smol-dot/smoldot/pull/813))
+
 ### Fixed
 
 - Fix not absorbing the JavaScript exception triggered by the browser when connecting to a `ws://` node when smoldot is embedded in a web page served over `https://`. ([#795](https://github.com/smol-dot/smoldot/pull/795), [#800](https://github.com/smol-dot/smoldot/pull/800))
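To make the CHANGELOG entries concrete, a sketch of the new parameter shape as a serde round-trip (assumptions: `serde_json` as the test harness, `HexString` deserializing from a `"0x…"`-prefixed string, and the `"hash"` rename inferred from the kebab-case type names listed above):

```rust
#[test]
fn chain_head_storage_request_item_decodes() {
    // `0x3a636f6465` is the hex encoding of the `:code` key.
    let item: methods::ChainHeadStorageRequestItem =
        serde_json::from_str(r#"{ "key": "0x3a636f6465", "type": "hash" }"#).unwrap();
    assert!(matches!(item.ty, methods::ChainHeadStorageType::Hash));
}
```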