From f9fefae98857bbac2d68cce0bc0d6adc1074d37d Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 21 Sep 2023 12:34:01 +0000 Subject: [PATCH] Warp sync state machine refactoring (#1060) * Use let-else construct * Add sources_by_finalized_height * Allow querying runtime and chain information from any source * Add comment * Implement Index * Merge `DownloadFragments` and `PendingVerify` stages * Bugfix with state inconsistency * Download warp sync fragments in parallel with verifying the previous batch * Merge Phase::RuntimeDownload with Phase::ChainInformationDownload * Remove `Phase` enum and move everything to base state machine * Small comment tweaks * Verify fragments even if source has disconnected * Small refactor * Implement correctly fetching from proof * Fix wasm build * Embed the verifier within warp_sync.rs * Add TODO * Also check the justification target number * PR link and more changelog * Polish the fragment verification error * Add `Config::pause_if_blocks_gap_lower_than` * `WarpSync` no longer stops after success * Rename `start_chain_information` * Move runtime calls to main state machine for parallelism * CHANGELOG update * Don't warp sync the same block if there's no point in doing that * Remove `already_tried` system * Move things around * Store the header in scale-encoded form and cache some fields * Docfix * Small doc tweaks * Don't download call proofs if there are fragments to verify * Add TODO * Add TODO * Better reporting on fragment verification * Make sure that headers are above the requested height * Fix the calculation of the warp sync minimum gap * Docfix * Index requests by source to remove a `O(n)` operation * Update the finalized block of the source when receiving a response * Remove TODO * Option to Result<(), Error> * Add TODO * Download the runtime and call proofs only once there's no fragment download * Update warp sync state machine on commit message * Fix `sources_by_finalized_height` state mismatch * Add TODO * Punish 
sources that send back invalid data * Punish sources, part 2 * Adjust code that handles empty lists of fragments * Add TODO * Move things around * Tweak error types --- lib/src/sync/all.rs | 492 ++--- lib/src/sync/warp_sync.rs | 2281 +++++++++++++-------- lib/src/sync/warp_sync/verifier.rs | 223 -- light-base/src/sync_service/standalone.rs | 81 +- wasm-node/CHANGELOG.md | 5 + 5 files changed, 1594 insertions(+), 1488 deletions(-) delete mode 100644 lib/src/sync/warp_sync/verifier.rs diff --git a/lib/src/sync/all.rs b/lib/src/sync/all.rs index b0ddc20f8f..d3ad62dd91 100644 --- a/lib/src/sync/all.rs +++ b/lib/src/sync/all.rs @@ -33,6 +33,7 @@ use crate::{ chain::{blocks_tree, chain_information}, executor::host, + finality::grandpa, header, sync::{all_forks, optimistic, warp_sync}, trie::Nibble, @@ -49,7 +50,9 @@ use core::{ pub use crate::executor::vm::ExecHint; pub use warp_sync::{ - ConfigCodeTrieNodeHint, FragmentError as WarpSyncFragmentError, WarpSyncFragment, + BuildChainInformationError as WarpSyncBuildChainInformationError, + BuildRuntimeError as WarpSyncBuildRuntimeError, ConfigCodeTrieNodeHint, VerifyFragmentError, + WarpSyncFragment, }; /// Configuration for the [`AllSync`]. @@ -157,8 +160,6 @@ pub enum Status<'a, TSrc> { /// Warp syncing algorithm has reached the head of the finalized chain and is downloading and /// building the chain information. WarpSyncChainInformation { - /// Source from which the chain information is being downloaded. - source: (SourceId, &'a TSrc), /// Hash of the highest block that is proven to be finalized. /// /// This isn't necessarily the same block as returned by @@ -198,9 +199,12 @@ impl AllSync { sources_capacity: config.sources_capacity, requests_capacity: config.sources_capacity, // TODO: ?! add as config? code_trie_node_hint: config.code_trie_node_hint, + num_download_ahead_fragments: 128, // TODO: make configurable? + warp_sync_minimum_gap: 64, // TODO: make configurable? 
}) { Ok(inner) => AllSyncInner::GrandpaWarpSync { - inner: warp_sync::WarpSync::InProgress(inner), + inner, + ready_to_transition: None, }, Err(( chain_information, @@ -246,12 +250,7 @@ impl AllSync { pub fn as_chain_information(&self) -> chain_information::ValidChainInformationRef { match &self.inner { AllSyncInner::AllForks(sync) => sync.as_chain_information(), - AllSyncInner::GrandpaWarpSync { - inner: warp_sync::WarpSync::InProgress(sync), - } => sync.as_chain_information(), - AllSyncInner::GrandpaWarpSync { - inner: warp_sync::WarpSync::Finished(sync), - } => (&sync.chain_information).into(), + AllSyncInner::GrandpaWarpSync { inner, .. } => inner.as_chain_information(), AllSyncInner::Optimistic { inner } => inner.as_chain_information(), AllSyncInner::Poisoned => unreachable!(), } @@ -261,9 +260,7 @@ impl AllSync { pub fn status(&self) -> Status { match &self.inner { AllSyncInner::AllForks(_) => Status::Sync, - AllSyncInner::GrandpaWarpSync { - inner: warp_sync::WarpSync::InProgress(sync), - } => match sync.status() { + AllSyncInner::GrandpaWarpSync { inner, .. } => match inner.status() { warp_sync::Status::Fragments { source: None, finalized_block_hash, @@ -283,18 +280,13 @@ impl AllSync { finalized_block_number, }, warp_sync::Status::ChainInformation { - source: (_, user_data), finalized_block_hash, finalized_block_number, } => Status::WarpSyncChainInformation { - source: (user_data.outer_source_id, &user_data.user_data), finalized_block_hash, finalized_block_number, }, }, - AllSyncInner::GrandpaWarpSync { - inner: warp_sync::WarpSync::Finished(_), - } => Status::Sync, AllSyncInner::Optimistic { .. 
} => Status::Sync, // TODO: right now we don't differentiate between AllForks and Optimistic, as they're kind of similar anyway AllSyncInner::Poisoned => unreachable!(), } @@ -305,12 +297,9 @@ impl AllSync { match &self.inner { AllSyncInner::AllForks(sync) => sync.finalized_block_header(), AllSyncInner::Optimistic { inner } => inner.finalized_block_header(), - AllSyncInner::GrandpaWarpSync { - inner: warp_sync::WarpSync::InProgress(sync), - } => sync.as_chain_information().as_ref().finalized_block_header, - AllSyncInner::GrandpaWarpSync { - inner: warp_sync::WarpSync::Finished(sync), - } => sync.chain_information.as_ref().finalized_block_header, + AllSyncInner::GrandpaWarpSync { inner, .. } => { + inner.as_chain_information().as_ref().finalized_block_header + } AllSyncInner::Poisoned => unreachable!(), } } @@ -430,7 +419,10 @@ impl AllSync { // `inner` is temporarily replaced with `Poisoned`. A new value must be put back before // returning. match mem::replace(&mut self.inner, AllSyncInner::Poisoned) { - AllSyncInner::GrandpaWarpSync { mut inner } => { + AllSyncInner::GrandpaWarpSync { + mut inner, + ready_to_transition, + } => { let outer_source_id_entry = self.shared.sources.vacant_entry(); let outer_source_id = SourceId(outer_source_id_entry.key()); @@ -441,23 +433,13 @@ impl AllSync { best_block_hash, }; - let inner_source_id = match &mut inner { - warp_sync::WarpSync::InProgress(sync) => sync.add_source(source_extra), - warp_sync::WarpSync::Finished(sync) => { - let new_id = sync.sources_ordered.last().map_or( - warp_sync::SourceId::min_value(), - |(id, _, _)| { - id.checked_add(1).unwrap_or_else(|| panic!()) // TODO: don't panic? 
- }, - ); - sync.sources_ordered.push((new_id, 0, source_extra)); - new_id - } - }; - + let inner_source_id = inner.add_source(source_extra); outer_source_id_entry.insert(SourceMapping::GrandpaWarpSync(inner_source_id)); - self.inner = AllSyncInner::GrandpaWarpSync { inner }; + self.inner = AllSyncInner::GrandpaWarpSync { + inner, + ready_to_transition, + }; outer_source_id } AllSyncInner::AllForks(mut all_forks) => { @@ -579,31 +561,10 @@ impl AllSync { (user_data.user_data, requests) } ( - AllSyncInner::GrandpaWarpSync { inner }, + AllSyncInner::GrandpaWarpSync { inner, .. }, SourceMapping::GrandpaWarpSync(source_id), ) => { - let (user_data, requests) = match inner { - warp_sync::WarpSync::InProgress(inner) => { - let (ud, requests) = inner.remove_source(source_id); - (ud, either::Left(requests)) - } - warp_sync::WarpSync::Finished(inner) => { - let index = inner - .sources_ordered - .binary_search_by_key(&source_id, |(id, _, _)| *id) - .unwrap_or_else(|_| panic!()); - let (_, _, user_data) = inner.sources_ordered.remove(index); - let (requests_of_source, requests_back) = - mem::take(&mut inner.in_progress_requests) - .into_iter() - .partition(|(s, ..)| *s == source_id); - inner.in_progress_requests = requests_back; - let requests_of_source = requests_of_source - .into_iter() - .map(|(_, rq_id, ud, _)| (rq_id, ud)); - (user_data, either::Right(requests_of_source)) - } - }; + let (user_data, requests) = inner.remove_source(source_id); let requests = requests .map(|(_inner_request_id, request_inner_user_data)| { @@ -649,20 +610,9 @@ impl AllSync { /// Returns the list of sources in this state machine. 
pub fn sources(&'_ self) -> impl Iterator + '_ { match &self.inner { - AllSyncInner::GrandpaWarpSync { - inner: warp_sync::WarpSync::InProgress(sync), - } => { - let iter = sync.sources().map(move |id| sync[id].outer_source_id); - either::Left(either::Left(iter)) - } - AllSyncInner::GrandpaWarpSync { - inner: warp_sync::WarpSync::Finished(sync), - } => { - let iter = sync - .sources_ordered - .iter() - .map(move |(_, _, ud)| ud.outer_source_id); - either::Left(either::Right(iter)) + AllSyncInner::GrandpaWarpSync { inner, .. } => { + let iter = inner.sources().map(move |id| inner[id].outer_source_id); + either::Left(iter) } AllSyncInner::Optimistic { inner: sync } => { let iter = sync.sources().map(move |id| sync[id].outer_source_id); @@ -737,28 +687,10 @@ impl AllSync { let hash = &inner[*src].best_block_hash; (height, hash) } - ( - AllSyncInner::GrandpaWarpSync { - inner: warp_sync::WarpSync::InProgress(sync), - }, - SourceMapping::GrandpaWarpSync(src), - ) => { - let ud = &sync[*src]; + (AllSyncInner::GrandpaWarpSync { inner, .. }, SourceMapping::GrandpaWarpSync(src)) => { + let ud = &inner[*src]; (ud.best_block_number, &ud.best_block_hash) } - ( - AllSyncInner::GrandpaWarpSync { - inner: warp_sync::WarpSync::Finished(sync), - }, - SourceMapping::GrandpaWarpSync(src), - ) => { - let index = sync - .sources_ordered - .binary_search_by_key(src, |(id, _, _)| *id) - .unwrap_or_else(|_| panic!()); - let user_data = &sync.sources_ordered[index].2; - (user_data.best_block_number, &user_data.best_block_hash) - } (AllSyncInner::Poisoned, _) => unreachable!(), // Invalid combinations of syncing state machine and source id. @@ -799,44 +731,17 @@ impl AllSync { // TODO: is this correct? inner.source_best_block(*src) >= height } - ( - AllSyncInner::GrandpaWarpSync { - inner: warp_sync::WarpSync::InProgress(sync), - }, - SourceMapping::GrandpaWarpSync(src), - ) => { + (AllSyncInner::GrandpaWarpSync { inner, .. 
}, SourceMapping::GrandpaWarpSync(src)) => { assert!( height - > sync + > inner .as_chain_information() .as_ref() .finalized_block_header .number ); - let user_data = &sync[*src]; - user_data.best_block_hash == *hash && user_data.best_block_number == height - } - ( - AllSyncInner::GrandpaWarpSync { - inner: warp_sync::WarpSync::Finished(sync), - }, - SourceMapping::GrandpaWarpSync(src), - ) => { - assert!( - height - > sync - .chain_information - .as_ref() - .finalized_block_header - .number - ); - - let index = sync - .sources_ordered - .binary_search_by_key(src, |(id, _, _)| *id) - .unwrap_or_else(|_| panic!()); - let user_data = &sync.sources_ordered[index].2; + let user_data = &inner[*src]; user_data.best_block_hash == *hash && user_data.best_block_number == height } @@ -868,12 +773,10 @@ impl AllSync { hash: &[u8; 32], ) -> impl Iterator + '_ { match &self.inner { - AllSyncInner::GrandpaWarpSync { - inner: warp_sync::WarpSync::InProgress(sync), - } => { + AllSyncInner::GrandpaWarpSync { inner, .. 
} => { assert!( height - > sync + > inner .as_chain_information() .as_ref() .finalized_block_header @@ -881,38 +784,15 @@ impl AllSync { ); let hash = *hash; - let iter = sync + let iter = inner .sources() .filter(move |source_id| { - let user_data = &sync[*source_id]; + let user_data = &inner[*source_id]; user_data.best_block_hash == hash && user_data.best_block_number == height }) - .map(move |id| sync[id].outer_source_id); + .map(move |id| inner[id].outer_source_id); - either::Right(either::Left(iter)) - } - AllSyncInner::GrandpaWarpSync { - inner: warp_sync::WarpSync::Finished(sync), - } => { - assert!( - height - > sync - .chain_information - .as_ref() - .finalized_block_header - .number - ); - - let hash = *hash; - let iter = sync - .sources_ordered - .iter() - .filter(move |(_, _, user_data)| { - user_data.best_block_hash == hash && user_data.best_block_number == height - }) - .map(move |(_, _, ud)| ud.outer_source_id); - - either::Right(either::Right(iter)) + either::Right(iter) } AllSyncInner::AllForks(sync) => { let iter = sync @@ -993,11 +873,9 @@ impl AllSync { ) }); - either::Right(either::Left(iter)) + either::Right(iter) } - AllSyncInner::GrandpaWarpSync { - inner: warp_sync::WarpSync::InProgress(inner), - } => { + AllSyncInner::GrandpaWarpSync { inner, .. } => { let iter = inner .desired_requests() .map(move |(_, src_user_data, rq_detail)| { @@ -1036,9 +914,6 @@ impl AllSync { either::Left(either::Left(iter)) } - AllSyncInner::GrandpaWarpSync { - inner: warp_sync::WarpSync::Finished(_), - } => either::Right(either::Right(iter::empty())), AllSyncInner::Poisoned => unreachable!(), } } @@ -1127,9 +1002,7 @@ impl AllSync { return outer_request_id; } ( - AllSyncInner::GrandpaWarpSync { - inner: warp_sync::WarpSync::InProgress(inner), - }, + AllSyncInner::GrandpaWarpSync { inner, .. 
}, RequestDetail::GrandpaWarpSync { sync_start_block_hash, }, @@ -1157,9 +1030,7 @@ impl AllSync { return outer_request_id; } ( - AllSyncInner::GrandpaWarpSync { - inner: warp_sync::WarpSync::InProgress(inner), - }, + AllSyncInner::GrandpaWarpSync { inner, .. }, RequestDetail::StorageGet { block_hash, keys }, ) => { let inner_source_id = match self.shared.sources.get(source_id.0).unwrap() { @@ -1186,9 +1057,7 @@ impl AllSync { return outer_request_id; } ( - AllSyncInner::GrandpaWarpSync { - inner: warp_sync::WarpSync::InProgress(inner), - }, + AllSyncInner::GrandpaWarpSync { inner, .. }, RequestDetail::RuntimeCallMerkleProof { block_hash, function_name, @@ -1280,17 +1149,20 @@ impl AllSync { pub fn process_one(mut self) -> ProcessOne { match self.inner { AllSyncInner::GrandpaWarpSync { - inner: warp_sync::WarpSync::InProgress(inner), + inner, + ready_to_transition: None, } => match inner.process_one() { warp_sync::ProcessOne::Idle(inner) => { self.inner = AllSyncInner::GrandpaWarpSync { - inner: warp_sync::WarpSync::InProgress(inner), + inner, + ready_to_transition: None, }; ProcessOne::AllSync(self) } warp_sync::ProcessOne::VerifyWarpSyncFragment(inner) => { ProcessOne::VerifyWarpSyncFragment(WarpSyncFragmentVerify { inner, + ready_to_transition: None, shared: self.shared, marker: marker::PhantomData, }) @@ -1298,6 +1170,7 @@ impl AllSync { warp_sync::ProcessOne::BuildRuntime(inner) => { ProcessOne::WarpSyncBuildRuntime(WarpSyncBuildRuntime { inner, + ready_to_transition: None, shared: self.shared, marker: marker::PhantomData, }) @@ -1311,7 +1184,8 @@ impl AllSync { } }, AllSyncInner::GrandpaWarpSync { - inner: warp_sync::WarpSync::Finished(success), + inner, + ready_to_transition: Some(ready_to_transition), } => { let ( new_inner, @@ -1320,7 +1194,9 @@ impl AllSync { finalized_storage_heap_pages, finalized_storage_code_merkle_value, finalized_storage_code_closest_ancestor_excluding, - ) = self.shared.transition_grandpa_warp_sync_all_forks(success); + ) = self + 
.shared + .transition_grandpa_warp_sync_all_forks(inner, ready_to_transition); self.inner = AllSyncInner::AllForks(new_inner); ProcessOne::WarpSyncFinished { sync: self, @@ -1420,7 +1296,7 @@ impl AllSync { } } ( - AllSyncInner::GrandpaWarpSync { inner }, + AllSyncInner::GrandpaWarpSync { inner, .. }, &SourceMapping::GrandpaWarpSync(source_id), ) => { match header::decode( @@ -1433,16 +1309,7 @@ impl AllSync { // in the user data. It will be useful later when transitioning to another // syncing strategy. if is_best { - let user_data = match inner { - warp_sync::WarpSync::InProgress(sync) => &mut sync[source_id], - warp_sync::WarpSync::Finished(sync) => { - let index = sync - .sources_ordered - .binary_search_by_key(&source_id, |(id, _, _)| *id) - .unwrap_or_else(|_| panic!()); - &mut sync.sources_ordered[index].2 - } - }; + let user_data = &mut inner[source_id]; user_data.best_block_number = header.number; user_data.best_block_hash = header.hash(self.shared.block_number_bytes); } @@ -1484,27 +1351,11 @@ impl AllSync { } (AllSyncInner::Optimistic { .. }, _) => {} // TODO: the optimistic sync could get some help from the finalized block ( - AllSyncInner::GrandpaWarpSync { - inner: warp_sync::WarpSync::InProgress(inner), - }, + AllSyncInner::GrandpaWarpSync { inner, .. }, SourceMapping::GrandpaWarpSync(source_id), ) => { inner.set_source_finality_state(*source_id, finalized_block_height); } - ( - AllSyncInner::GrandpaWarpSync { - inner: warp_sync::WarpSync::Finished(sync), - }, - SourceMapping::GrandpaWarpSync(src), - ) => { - let index = sync - .sources_ordered - .binary_search_by_key(src, |(id, _, _)| *id) - .unwrap_or_else(|_| panic!()); - // TODO: the warp syncing algorithm could maybe be interested in the finalized block height - let n = &mut sync.sources_ordered[index].1; - *n = cmp::max(*n, finalized_block_height); - } // Invalid internal states. 
(AllSyncInner::AllForks(_), _) => unreachable!(), @@ -1535,11 +1386,26 @@ impl AllSync { } } } + ( + AllSyncInner::GrandpaWarpSync { inner, .. }, + SourceMapping::GrandpaWarpSync(source_id), + ) => { + let block_number = match grandpa::commit::decode::decode_grandpa_commit( + &scale_encoded_message, + inner.block_number_bytes(), + ) { + Ok(msg) => msg.message.target_number, + Err(_) => return GrandpaCommitMessageOutcome::Discarded, + }; + + inner.set_source_finality_state(*source_id, block_number); + GrandpaCommitMessageOutcome::Discarded + } (AllSyncInner::Optimistic { .. }, _) => GrandpaCommitMessageOutcome::Discarded, - (AllSyncInner::GrandpaWarpSync { .. }, _) => GrandpaCommitMessageOutcome::Discarded, // Invalid internal states. (AllSyncInner::AllForks(_), _) => unreachable!(), + (AllSyncInner::GrandpaWarpSync { .. }, _) => unreachable!(), (AllSyncInner::Poisoned, _) => unreachable!(), } } @@ -1724,34 +1590,15 @@ impl AllSync { let request = self.shared.requests.remove(request_id.0); match (&mut self.inner, request) { - ( - AllSyncInner::GrandpaWarpSync { - inner: warp_sync::WarpSync::InProgress(grandpa), - }, - RequestMapping::WarpSync(request_id), - ) => { + (AllSyncInner::GrandpaWarpSync { inner, .. 
}, RequestMapping::WarpSync(request_id)) => { let user_data = if let Some((fragments, is_finished)) = response { - grandpa.warp_sync_request_success(request_id, fragments, is_finished) + inner.warp_sync_request_success(request_id, fragments, is_finished) } else { - grandpa.fail_request(request_id) + inner.fail_request(request_id) }; (user_data.user_data, ResponseOutcome::Queued) } - ( - AllSyncInner::GrandpaWarpSync { - inner: warp_sync::WarpSync::Finished(sync), - }, - RequestMapping::WarpSync(request_id), - ) => { - let pos = sync - .in_progress_requests - .iter() - .position(|(_, id, ..)| *id == request_id) - .unwrap(); - let (_, _, user_data, _) = sync.in_progress_requests.remove(pos); - (user_data.user_data, ResponseOutcome::Outdated) - } // Only the GrandPa warp syncing ever starts GrandPa warp sync requests. (_, RequestMapping::Inline(_, _, user_data)) => { @@ -1787,48 +1634,34 @@ impl AllSync { ) { ( AllSyncInner::GrandpaWarpSync { - inner: warp_sync::WarpSync::InProgress(mut sync), + mut inner, + ready_to_transition, }, Ok(response), RequestMapping::WarpSync(request_id), ) => { - let user_data = sync.storage_get_success(request_id, response); + let user_data = inner.storage_get_success(request_id, response); self.inner = AllSyncInner::GrandpaWarpSync { - inner: warp_sync::WarpSync::InProgress(sync), + inner, + ready_to_transition, }; (user_data.user_data, ResponseOutcome::Queued) } ( AllSyncInner::GrandpaWarpSync { - inner: warp_sync::WarpSync::InProgress(mut sync), + mut inner, + ready_to_transition, }, Err(_), RequestMapping::WarpSync(request_id), ) => { - let user_data = sync.fail_request(request_id).user_data; + let user_data = inner.fail_request(request_id).user_data; self.inner = AllSyncInner::GrandpaWarpSync { - inner: warp_sync::WarpSync::InProgress(sync), + inner, + ready_to_transition, }; (user_data, ResponseOutcome::Queued) } - ( - AllSyncInner::GrandpaWarpSync { - inner: warp_sync::WarpSync::Finished(mut sync), - }, - _, - 
RequestMapping::WarpSync(request_id), - ) => { - let pos = sync - .in_progress_requests - .iter() - .position(|(_, id, ..)| *id == request_id) - .unwrap(); - let (_, _, user_data, _) = sync.in_progress_requests.remove(pos); - self.inner = AllSyncInner::GrandpaWarpSync { - inner: warp_sync::WarpSync::Finished(sync), - }; - (user_data.user_data, ResponseOutcome::Outdated) - } // Only the GrandPa warp syncing ever starts GrandPa warp sync requests. (other, _, RequestMapping::Inline(_, _, user_data)) => { self.inner = other; @@ -1866,49 +1699,35 @@ impl AllSync { ) { ( AllSyncInner::GrandpaWarpSync { - inner: warp_sync::WarpSync::InProgress(mut sync), + mut inner, + ready_to_transition, }, Ok(response), RequestMapping::WarpSync(request_id), ) => { - let user_data = sync.runtime_call_merkle_proof_success(request_id, response); + let user_data = inner.runtime_call_merkle_proof_success(request_id, response); self.inner = AllSyncInner::GrandpaWarpSync { - inner: warp_sync::WarpSync::InProgress(sync), + inner, + ready_to_transition, }; (user_data.user_data, ResponseOutcome::Queued) } ( AllSyncInner::GrandpaWarpSync { - inner: warp_sync::WarpSync::InProgress(mut sync), + mut inner, + ready_to_transition, }, Err(_), RequestMapping::WarpSync(request_id), ) => { - let user_data = sync.fail_request(request_id); + let user_data = inner.fail_request(request_id); // TODO: notify user of the problem self.inner = AllSyncInner::GrandpaWarpSync { - inner: warp_sync::WarpSync::InProgress(sync), + inner, + ready_to_transition, }; (user_data.user_data, ResponseOutcome::Queued) } - ( - AllSyncInner::GrandpaWarpSync { - inner: warp_sync::WarpSync::Finished(mut sync), - }, - _, - RequestMapping::WarpSync(request_id), - ) => { - let pos = sync - .in_progress_requests - .iter() - .position(|(_, id, ..)| *id == request_id) - .unwrap(); - let (_, _, user_data, _) = sync.in_progress_requests.remove(pos); - self.inner = AllSyncInner::GrandpaWarpSync { - inner: warp_sync::WarpSync::Finished(sync), 
- }; - (user_data.user_data, ResponseOutcome::Outdated) - } // Only the GrandPa warp syncing ever starts call proof requests. (other, _, RequestMapping::Inline(_, _, user_data)) => { self.inner = other; @@ -1933,23 +1752,8 @@ impl ops::Index for AllSync { (AllSyncInner::Optimistic { inner }, SourceMapping::Optimistic(src)) => { &inner[*src].user_data } - ( - AllSyncInner::GrandpaWarpSync { - inner: warp_sync::WarpSync::InProgress(sync), - }, - SourceMapping::GrandpaWarpSync(src), - ) => &sync[*src].user_data, - ( - AllSyncInner::GrandpaWarpSync { - inner: warp_sync::WarpSync::Finished(sync), - }, - SourceMapping::GrandpaWarpSync(src), - ) => { - let index = sync - .sources_ordered - .binary_search_by_key(src, |(id, _, _)| *id) - .unwrap_or_else(|_| panic!()); - &sync.sources_ordered[index].2.user_data + (AllSyncInner::GrandpaWarpSync { inner, .. }, SourceMapping::GrandpaWarpSync(src)) => { + &inner[*src].user_data } (AllSyncInner::Poisoned, _) => unreachable!(), @@ -1980,23 +1784,8 @@ impl ops::IndexMut for AllSync { (AllSyncInner::Optimistic { inner }, SourceMapping::Optimistic(src)) => { &mut inner[*src].user_data } - ( - AllSyncInner::GrandpaWarpSync { - inner: warp_sync::WarpSync::InProgress(sync), - }, - SourceMapping::GrandpaWarpSync(src), - ) => &mut sync[*src].user_data, - ( - AllSyncInner::GrandpaWarpSync { - inner: warp_sync::WarpSync::Finished(sync), - }, - SourceMapping::GrandpaWarpSync(src), - ) => { - let index = sync - .sources_ordered - .binary_search_by_key(src, |(id, _, _)| *id) - .unwrap_or_else(|_| panic!()); - &mut sync.sources_ordered[index].2.user_data + (AllSyncInner::GrandpaWarpSync { inner, .. 
}, SourceMapping::GrandpaWarpSync(src)) => { + &mut inner[*src].user_data } (AllSyncInner::Poisoned, _) => unreachable!(), @@ -2809,6 +2598,7 @@ pub struct WarpSyncFragmentVerify { GrandpaWarpSyncSourceExtra, GrandpaWarpSyncRequestExtra, >, + ready_to_transition: Option, shared: Shared, marker: marker::PhantomData>, } @@ -2816,29 +2606,38 @@ pub struct WarpSyncFragmentVerify { impl WarpSyncFragmentVerify { /// Returns the identifier and user data of the source that has sent the fragment to be /// verified. - pub fn proof_sender(&self) -> (SourceId, &TSrc) { - let (_, ud) = self.inner.proof_sender(); - (ud.outer_source_id, &ud.user_data) + /// + /// Returns `None` if the source has been removed since the fragments have been downloaded. + pub fn proof_sender(&self) -> Option<(SourceId, &TSrc)> { + let (_, ud) = self.inner.proof_sender()?; + Some((ud.outer_source_id, &ud.user_data)) } /// Perform the verification. /// /// A randomness seed must be provided and will be used during the verification. Note that the /// verification is nonetheless deterministic. + /// + /// On success, returns the block hash and height that have been verified as being part of + /// the chain. 
pub fn perform( self, randomness_seed: [u8; 32], - ) -> (AllSync, Result<(), WarpSyncFragmentError>) { - let (next_grandpa_warp_sync, error) = self.inner.verify(randomness_seed); + ) -> ( + AllSync, + Result<([u8; 32], u64), VerifyFragmentError>, + ) { + let (next_grandpa_warp_sync, result) = self.inner.verify(randomness_seed); ( AllSync { inner: AllSyncInner::GrandpaWarpSync { - inner: warp_sync::WarpSync::InProgress(next_grandpa_warp_sync), + inner: next_grandpa_warp_sync, + ready_to_transition: self.ready_to_transition, }, shared: self.shared, }, - error.map_or(Ok(()), Result::Err), + result, ) } } @@ -2848,6 +2647,7 @@ impl WarpSyncFragmentVerify { pub struct WarpSyncBuildRuntime { inner: warp_sync::BuildRuntime, GrandpaWarpSyncRequestExtra>, + ready_to_transition: Option, shared: Shared, marker: marker::PhantomData>, } @@ -2858,25 +2658,25 @@ impl WarpSyncBuildRuntime { /// Assuming that the warp syncing goes to completion, the provided parameters are used to /// compile the runtime that will be yielded in /// [`ProcessOne::WarpSyncFinished::finalized_block_runtime`]. - // TODO: better error type pub fn build( self, exec_hint: ExecHint, allow_unresolved_imports: bool, - ) -> (AllSync, Result<(), warp_sync::Error>) { - let (warp_sync_status, error) = self.inner.build(exec_hint, allow_unresolved_imports); + ) -> ( + AllSync, + Result<(), WarpSyncBuildRuntimeError>, + ) { + let (warp_sync_status, outcome) = self.inner.build(exec_hint, allow_unresolved_imports); ( AllSync { inner: AllSyncInner::GrandpaWarpSync { inner: warp_sync_status, + ready_to_transition: self.ready_to_transition, }, shared: self.shared, }, - match error { - Some(err) => Err(err), - None => Ok(()), - }, + outcome, ) } } @@ -2894,20 +2694,28 @@ pub struct WarpSyncBuildChainInformation { impl WarpSyncBuildChainInformation { /// Builds the chain information. 
- // TODO: better error type - pub fn build(self) -> (AllSync, Result<(), warp_sync::Error>) { - let (warp_sync_status, error) = self.inner.build(); + pub fn build( + self, + ) -> ( + AllSync, + Result<(), WarpSyncBuildChainInformationError>, + ) { + let (warp_sync_status, outcome) = self.inner.build(); + + let (ready_to_transition, outcome) = match outcome { + Ok(info) => (Some(info), Ok(())), + Err(err) => (None, Err(err)), + }; + ( AllSync { inner: AllSyncInner::GrandpaWarpSync { inner: warp_sync_status, + ready_to_transition, }, shared: self.shared, }, - match error { - Some(err) => Err(err), - None => Ok(()), - }, + outcome, ) } } @@ -2916,6 +2724,7 @@ enum AllSyncInner { GrandpaWarpSync { inner: warp_sync::WarpSync, GrandpaWarpSyncRequestExtra>, + ready_to_transition: Option, }, Optimistic { inner: optimistic::OptimisticSync< @@ -2990,10 +2799,11 @@ impl Shared { /// strategy. fn transition_grandpa_warp_sync_all_forks( &mut self, - grandpa: warp_sync::Success< + grandpa: warp_sync::WarpSync< GrandpaWarpSyncSourceExtra, GrandpaWarpSyncRequestExtra, >, + ready_to_transition: warp_sync::RuntimeInformation, ) -> ( all_forks::AllForksSync, AllForksRequestExtra, AllForksSourceExtra>, host::HostVmPrototype, @@ -3002,6 +2812,8 @@ impl Shared { Option>, Option>, ) { + let grandpa = grandpa.deconstruct(); + let mut all_forks = all_forks::AllForksSync::new(all_forks::Config { chain_information: grandpa.chain_information, block_number_bytes: self.block_number_bytes, @@ -3083,7 +2895,9 @@ impl Shared { } }; - all_forks.update_source_finality_state(updated_source_id, finalized_block_height); + if let Some(finalized_block_height) = finalized_block_height { + all_forks.update_source_finality_state(updated_source_id, finalized_block_height); + } self.sources[source.outer_source_id.0] = SourceMapping::AllForks(updated_source_id); } @@ -3099,11 +2913,11 @@ impl Shared { ( all_forks, - grandpa.finalized_runtime, - grandpa.finalized_storage_code, - 
grandpa.finalized_storage_heap_pages, - grandpa.finalized_storage_code_merkle_value, - grandpa.finalized_storage_code_closest_ancestor_excluding, + ready_to_transition.finalized_runtime, + ready_to_transition.finalized_storage_code, + ready_to_transition.finalized_storage_heap_pages, + ready_to_transition.finalized_storage_code_merkle_value, + ready_to_transition.finalized_storage_code_closest_ancestor_excluding, ) } } diff --git a/lib/src/sync/warp_sync.rs b/lib/src/sync/warp_sync.rs index bf9a0a29ce..fc3d61273c 100644 --- a/lib/src/sync/warp_sync.rs +++ b/lib/src/sync/warp_sync.rs @@ -83,14 +83,16 @@ //! API user. //! //! Similarly, at any given moment, this state machine holds a list of requests that concern these -//! sources. Use [`InProgressWarpSync::desired_requests`] to determine which requests will be -//! useful to the progress of the warp syncing, then use [`InProgressWarpSync::add_request`] to -//! update the state machine with a newly-started request. +//! sources. Use [`WarpSync::desired_requests`] to determine which requests will be useful to the +//! progress of the warp syncing, then use [`WarpSync::add_request`] to update the state machine +//! with a newly-started request. //! -//! Use [`InProgressWarpSync::process_one`] in order to run verifications of the payloads that have +//! Use [`WarpSync::process_one`] in order to run verifications of the payloads that have //! previously been downloaded. //! 
+// TODO: this module is "vulnerable" to situations where new malicious sources are continuously added with a high finalized block, as the state machine will repeatedly try to download from that source and fail + use crate::{ chain::chain_information::{ self, ChainInformationConsensusRef, ChainInformationFinality, ChainInformationFinalityRef, @@ -101,47 +103,21 @@ use crate::{ host::{self, HostVmPrototype}, vm::ExecHint, }, - header::{self, Header}, + finality::justification, + header, + informant::HashDisplay, trie::{self, proof_decode}, }; use alloc::{ borrow::{Cow, ToOwned as _}, + collections::{BTreeSet, VecDeque}, vec, vec::Vec, }; -use core::{iter, mem, ops}; +use core::{cmp, fmt, iter, mem, ops}; pub use trie::Nibble; -pub use verifier::{Error as FragmentError, WarpSyncFragment}; - -mod verifier; - -/// Problem encountered during a call to [`start_warp_sync()`]. -#[derive(Debug, derive_more::Display)] -pub enum Error { - /// The chain doesn't include any storage item at `:code`. - #[display(fmt = "The chain doesn't include any storage item at `:code`")] - MissingCode, - /// The storage item at `:heappages` is in an incorrect format. - #[display(fmt = "Invalid heap pages value: {_0}")] - InvalidHeapPages(executor::InvalidHeapPagesError), - /// Error building the runtime of the chain. - #[display(fmt = "Error building the runtime: {_0}")] - RuntimeBuild(executor::host::NewErr), - /// Error building the chain information. - #[display(fmt = "Error building the chain information: {_0}")] - ChainInformationBuild(chain_information::build::Error), - /// Failed to verify Merkle proof. - // TODO: this is a non-fatal error contrary to all the other errors in this enum - InvalidMerkleProof(proof_decode::Error), - /// Merkle proof is missing the necessary entries. - // TODO: this is a non-fatal error contrary to all the other errors in this enum - MerkleProofEntriesMissing, - /// Warp sync requires fetching the key that follows another one. 
This isn't implemented in - /// smoldot. - NextKeyUnimplemented, -} /// The configuration for [`start_warp_sync()`]. #[derive(Debug)] @@ -166,6 +142,24 @@ pub struct Config { /// instead of downloading it. If the hint doesn't match, an extra round-trip will be needed, /// but if the hint matches it saves a big download. pub code_trie_node_hint: Option, + + /// Number of warp sync fragments after which the state machine will pause downloading new + /// ones until the ones that have been downloaded are verified. + /// + /// A too low value will cause stalls, while a high value will use more memory and runs the + /// risk of wasting more bandwidth in case the downloaded fragments need to be thrown away. + pub num_download_ahead_fragments: usize, + + /// If the height of the current local finalized block is `N`, the warp sync state machine + /// will not attempt to warp sync to blocks whose height inferior or equal to `N + k` where + /// `k` is the value in this field. + /// + /// Because warp syncing is a relatively expensive process, it is not worth performing it + /// between two blocks that are too close to each other. + /// + /// The ideal value of this field depends on the block production rate and the time it takes + /// to answer requests. + pub warp_sync_minimum_gap: usize, } /// See [`Config::code_trie_node_hint`]. @@ -186,9 +180,13 @@ pub struct ConfigCodeTrieNodeHint { /// On error, returns the [`ValidChainInformation`] that was provided in the configuration. pub fn start_warp_sync( config: Config, -) -> Result, (ValidChainInformation, WarpSyncInitError)> { +) -> Result, (ValidChainInformation, WarpSyncInitError)> { match config.start_chain_information.as_ref().finality { - ChainInformationFinalityRef::Grandpa { .. } => {} + // TODO: we make sure that `finalized_scheduled_change` is `None` because it seems complicated to support, but ideally it would be supported + ChainInformationFinalityRef::Grandpa { + finalized_scheduled_change: None, + .. 
+ } => {} _ => { return Err(( config.start_chain_information, @@ -207,21 +205,44 @@ pub fn start_warp_sync( } } - Ok(InProgressWarpSync { - phase: Phase::DownloadFragments { - warp_sync_fragments_download: None, - }, - warped_header: config + let warped_header = config + .start_chain_information + .as_ref() + .finalized_block_header + .scale_encoding_vec(config.block_number_bytes); + + Ok(WarpSync { + warped_header_number: config + .start_chain_information + .as_ref() + .finalized_block_header + .number, + warped_header_state_root: *config .start_chain_information .as_ref() .finalized_block_header - .into(), + .state_root, + warped_header_hash: header::hash_from_scale_encoded_header(&warped_header), + warped_header, warped_finality: config.start_chain_information.as_ref().finality.into(), - start_chain_information: config.start_chain_information, + warped_block_ty: WarpedBlockTy::AlreadyVerified, + runtime_calls: runtime_calls_default_value( + config.start_chain_information.as_ref().consensus, + ), + verified_chain_information: config.start_chain_information, code_trie_node_hint: config.code_trie_node_hint, + num_download_ahead_fragments: config.num_download_ahead_fragments, + warp_sync_minimum_gap: config.warp_sync_minimum_gap, block_number_bytes: config.block_number_bytes, sources: slab::Slab::with_capacity(config.sources_capacity), + sources_by_finalized_height: BTreeSet::new(), in_progress_requests: slab::Slab::with_capacity(config.requests_capacity), + in_progress_requests_by_source: BTreeSet::new(), + warp_sync_fragments_download: None, + verify_queue: VecDeque::new(), + runtime_download: RuntimeDownload::NotStarted { + hint_doesnt_match: false, + }, }) } @@ -245,19 +266,29 @@ impl SourceId { pub fn min_value() -> Self { SourceId(usize::min_value()) } - - pub fn checked_add(&self, n: u8) -> Option { - Some(SourceId(self.0.checked_add(usize::from(n))?)) - } } -/// The result of a successful warp sync. 
-pub struct Success { +/// A [`WarpSync`] that has been deconstructed. +// TODO: consider removing this entirely +pub struct Deconstructed { /// The synced chain information. pub chain_information: ValidChainInformation, + /// The list of sources that were added to the state machine, with their finalized block + /// height and user data. + /// The list is ordered by [`SourceId`]. + // TODO: use a struct? + // TODO: this `Option` is weird + pub sources_ordered: Vec<(SourceId, Option, TSrc)>, + + /// The list of requests that were added to the state machine. + pub in_progress_requests: Vec<(SourceId, RequestId, TRq, RequestDetail)>, +} + +// TODO: consider removing this entirely +pub struct RuntimeInformation { /// The runtime constructed in `VirtualMachineParamsGet`. Corresponds to the runtime of the - /// finalized block of [`Success::chain_information`]. + /// finalized block of [`WarpSync::as_chain_information`]. pub finalized_runtime: HostVmPrototype, /// Storage value at the `:code` key of the finalized block. @@ -271,122 +302,124 @@ pub struct Success { /// Closest ancestor of the `:code` trie node of the finalized block excluding `:code` itself. pub finalized_storage_code_closest_ancestor_excluding: Option>, - - /// The list of sources that were added to the state machine, with their finalized block - /// height and user data. - /// The list is ordered by [`SourceId`]. - // TODO: use a struct? - pub sources_ordered: Vec<(SourceId, u64, TSrc)>, - - /// The list of requests that were added to the state machine. - pub in_progress_requests: Vec<(SourceId, RequestId, TRq, RequestDetail)>, } -/// The warp sync state machine. -#[derive(derive_more::From)] -pub enum WarpSync { - /// Warp syncing is over. 
- Finished(Success), - /// Warp syncing is in progress, - InProgress(InProgressWarpSync), -} - -impl ops::Index for InProgressWarpSync { - type Output = TSrc; - - #[track_caller] - fn index(&self, source_id: SourceId) -> &TSrc { - debug_assert!(self.sources.contains(source_id.0)); - &self.sources[source_id.0].user_data - } -} +/// Fragment to be verified. +#[derive(Debug)] +pub struct WarpSyncFragment { + /// Header of a block in the chain. + pub scale_encoded_header: Vec, -impl ops::IndexMut for InProgressWarpSync { - #[track_caller] - fn index_mut(&mut self, source_id: SourceId) -> &mut TSrc { - debug_assert!(self.sources.contains(source_id.0)); - &mut self.sources[source_id.0].user_data - } + /// Justification that proves the finality of [`WarpSyncFragment::scale_encoded_header`]. + pub scale_encoded_justification: Vec, } -/// Warp syncing process now obtaining the chain information. -pub struct InProgressWarpSync { - /// See [`Phase`]. - phase: Phase, - /// Finalized block of the chain we warp synced to. Initially identical to the value in - /// [`InProgressWarpSync::start_chain_information`]. - warped_header: Header, +/// Warp syncing process state machine. +pub struct WarpSync { + /// SCALE-encoded header of the finalized block of the chain we warp synced to. Initially + /// identical to the value in [`WarpSync::verified_chain_information`]. + warped_header: Vec, + /// Hash of the block in [`WarpSync::warped_header`]. + warped_header_hash: [u8; 32], + /// State trie root hash of the block in [`WarpSync::warped_header`]. + warped_header_state_root: [u8; 32], + /// Number of the block in [`WarpSync::warped_header`]. + warped_header_number: u64, /// Information about the finality of the chain at the point where we warp synced to. - /// Initially identical to the value in [`InProgressWarpSync::start_chain_information`]. + /// Initially identical to the value in [`WarpSync::verified_chain_information`]. 
warped_finality: ChainInformationFinality, + /// Information about the block described by [`WarpSync::warped_header`] and + /// [`WarpSync::warped_finality`]. + warped_block_ty: WarpedBlockTy, /// See [`Config::code_trie_node_hint`]. code_trie_node_hint: Option, - /// Starting point of the warp syncing, as provided to [`start_warp_sync`]. - start_chain_information: ValidChainInformation, - /// Number of bytes used to encode the block number in headers. + /// Starting point of the warp syncing as provided to [`start_warp_sync`], or latest chain + /// information that was warp synced to. + verified_chain_information: ValidChainInformation, + /// See [`Config::num_download_ahead_fragments`]. + num_download_ahead_fragments: usize, + /// See [`Config::warp_sync_minimum_gap`]. + warp_sync_minimum_gap: usize, + /// See [`Config::block_number_bytes`]. block_number_bytes: usize, - /// List of requests that have been added using [`InProgressWarpSync::add_source`]. + /// List of requests that have been added using [`WarpSync::add_source`]. sources: slab::Slab>, - /// List of requests that have been added using [`InProgressWarpSync::add_request`]. + /// Subset of the entries as [`WarpSync::sources`] whose [`Source::finalized_block_height`] + /// is `Ok`. Indexed by [`Source::finalized_block_height`]. + sources_by_finalized_height: BTreeSet<(u64, SourceId)>, + /// List of requests that have been added using [`WarpSync::add_request`]. in_progress_requests: slab::Slab<(SourceId, TRq, RequestDetail)>, + /// Identical to [`WarpSync::in_progress_requests`], but indexed differently. + in_progress_requests_by_source: BTreeSet<(SourceId, RequestId)>, + /// Request that is downloading warp sync fragments, if any has been started yet. + warp_sync_fragments_download: Option, + /// Queue of fragments that have been downloaded and need to be verified. + verify_queue: VecDeque, + /// State of the download of the runtime and chain information call proofs. 
+ runtime_download: RuntimeDownload, + /// For each call required by the chain information builder, whether it has been downloaded yet. + runtime_calls: + hashbrown::HashMap, +} + +/// See [`WarpSync::sources`]. +#[derive(Debug, Copy, Clone)] +struct Source { + /// User data chosen by the API user. + user_data: TSrc, + /// Height of the finalized block of the source, as reported by the source. Contains `Err` + /// if the source has sent invalid fragments or proofs in the past. + finalized_block_height: Result, +} + +/// See [`WarpSync::warped_block_ty`]. +enum WarpedBlockTy { + /// Block is equal to the finalized block in [`WarpSync::verified_chain_information`]. + AlreadyVerified, + /// Block is known to not be warp-syncable due to an incompatibility between smoldot and + /// the chain. + KnownBad, + /// Block is expected to be warp syncable. + Normal, } -enum Phase { - /// Downloading warp sync fragments. - DownloadFragments { - /// Request that is downloading warp sync fragments, if any has been started yet. - warp_sync_fragments_download: Option, +/// See [`WarpSync::runtime_download`]. +enum RuntimeDownload { + NotStarted { + hint_doesnt_match: bool, }, - /// Warp sync fragments have been downloaded. Now to verify them. - PendingVerify { - /// Source the fragments have been obtained from - downloaded_source: SourceId, - /// `true` if the source has indicated that there is no more fragment afterwards, in other - /// words that the last fragment corresponds to the current finalized block of the chain. - final_set_of_fragments: bool, - /// Contains the downloaded fragments. - /// Always `Some`, but wrapped within an `Option` in order to permit extracting - /// temporarily. - verifier: Option, + Downloading { + hint_doesnt_match: bool, + request_id: RequestId, }, - /// All warp sync fragments have been verified, and we are now downloading the runtime of the - /// finalized block of the chain. 
- RuntimeDownload { - /// Request that is downloading the runtime, if any has been started yet. - runtime_download: Option, - /// Source we downloaded the last fragments from. Assuming that the source isn't malicious, - /// it is guaranteed to have access to the storage of the finalized block. - warp_sync_source_id: SourceId, - /// `true` if it is known that [`InProgressWarpSync::code_trie_node_hint`] doesn't match - /// the storage of the header we warp synced to. + NotVerified { + /// Source the runtime has been obtained from. `None` if the source has been removed. + downloaded_source: Option, hint_doesnt_match: bool, - /// Merkle proof containing the runtime information, or `None` if it was not downloaded yet. - downloaded_runtime: Option>, + trie_proof: Vec, }, - /// All warp sync fragments have been verified, we have downloaded the runtime of the finalized - /// block, and we are now downloading and computing the information of the chain. - ChainInformationDownload { - /// Source we downloaded the last fragments from. Assuming that the source isn't malicious, - /// it is guaranteed to have access to the storage of the finalized block. - warp_sync_source_id: SourceId, - /// Merkle proof containing the runtime information. - /// Always `Some`, but wrapped within an `Option` in order to allow extraction. - downloaded_runtime: Option, - /// State machine that builds the chain information. - /// This state machine is built ahead of time but isn't driven until everything has been - /// downloaded. - /// Always `Some`, but wrapped within an `Option` in order to allow extraction. - chain_info_builder: Option, - /// For each call required by the chain information builder, whether it has been - /// downloaded yet. 
- calls: hashbrown::HashMap< - chain_information::build::RuntimeCall, - CallProof, - fnv::FnvBuildHasher, - >, + Verified { + downloaded_runtime: DownloadedRuntime, + chain_info_builder: chain_information::build::ChainInformationBuild, }, } +/// See [`WarpSync::verify_queue`]. +struct PendingVerify { + /// Source the fragments have been obtained from. `None` if the source has been removed. + downloaded_source: Option, + /// `true` if the source has indicated that there is no more fragment afterwards, in other + /// words that the last fragment corresponds to the current finalized block of the chain. + final_set_of_fragments: bool, + /// List of fragments to verify. Can be empty. + fragments: Vec, + /// Number of fragments at the start of [`PendingVerify::fragments`] that have already been + /// verified. Must always be strictly inferior to `fragments.len()`, unless the list of + /// fragments is empty. + next_fragment_to_verify_index: usize, +} + +/// See [`RuntimeDownload::Verified`]. struct DownloadedRuntime { /// Storage item at the `:code` key. `None` if there is no entry at that key. storage_code: Option>, @@ -398,13 +431,56 @@ struct DownloadedRuntime { closest_ancestor_excluding: Option>, } +/// See [`WarpSync::runtime_calls`]. enum CallProof { NotStarted, Downloading(RequestId), - Downloaded(Vec), + Downloaded { + /// Source the proof has been obtained from. `None` if the source has been removed. + downloaded_source: Option, + proof: Vec, + }, +} + +/// Returns the default value for [`WarpSync::runtime_calls`]. +/// +/// Contains the list of calls that we anticipate the chain information builder will make. This +/// assumes that the runtime is the latest version available. 
+fn runtime_calls_default_value( + verified_chain_information_consensus: chain_information::ChainInformationConsensusRef, +) -> hashbrown::HashMap { + let mut list = hashbrown::HashMap::with_capacity_and_hasher(8, Default::default()); + match verified_chain_information_consensus { + ChainInformationConsensusRef::Aura { .. } => { + list.insert( + chain_information::build::RuntimeCall::AuraApiAuthorities, + CallProof::NotStarted, + ); + list.insert( + chain_information::build::RuntimeCall::AuraApiSlotDuration, + CallProof::NotStarted, + ); + } + ChainInformationConsensusRef::Babe { .. } => { + list.insert( + chain_information::build::RuntimeCall::BabeApiCurrentEpoch, + CallProof::NotStarted, + ); + list.insert( + chain_information::build::RuntimeCall::BabeApiNextEpoch, + CallProof::NotStarted, + ); + list.insert( + chain_information::build::RuntimeCall::BabeApiConfiguration, + CallProof::NotStarted, + ); + } + ChainInformationConsensusRef::Unknown => {} + } + list } -/// See [`InProgressWarpSync::status`]. +/// See [`WarpSync::status`]. #[derive(Debug)] pub enum Status<'a, TSrc> { /// Warp syncing algorithm is downloading Grandpa warp sync fragments containing a finality @@ -415,7 +491,7 @@ pub enum Status<'a, TSrc> { /// Hash of the highest block that is proven to be finalized. /// /// This isn't necessarily the same block as returned by - /// [`InProgressWarpSync::as_chain_information`], as this function first has to download + /// [`WarpSync::as_chain_information`], as this function first has to download /// extra information compared to just the finalized block. finalized_block_hash: [u8; 32], /// Height of the block indicated by [`Status::ChainInformation::finalized_block_hash`]. @@ -424,12 +500,10 @@ pub enum Status<'a, TSrc> { /// Warp syncing algorithm has reached the head of the finalized chain and is downloading and /// building the chain information. ChainInformation { - /// Source from which the chain information is being downloaded. 
- source: (SourceId, &'a TSrc), /// Hash of the highest block that is proven to be finalized. /// /// This isn't necessarily the same block as returned by - /// [`InProgressWarpSync::as_chain_information`], as this function first has to download + /// [`WarpSync::as_chain_information`], as this function first has to download /// extra information compared to just the finalized block. finalized_block_hash: [u8; 32], /// Height of the block indicated by [`Status::ChainInformation::finalized_block_hash`]. @@ -437,7 +511,7 @@ pub enum Status<'a, TSrc> { }, } -impl InProgressWarpSync { +impl WarpSync { /// Returns the value that was initially passed in [`Config::block_number_bytes`]. pub fn block_number_bytes(&self) -> usize { self.block_number_bytes @@ -450,55 +524,60 @@ impl InProgressWarpSync { // produce a full chain information struct. Such struct can only be produced after the // entire warp syncing has succeeded. If if it still in progress, all we can return is // the starting point. - (&self.start_chain_information).into() + (&self.verified_chain_information).into() } /// Returns the current status of the warp syncing. pub fn status(&self) -> Status { - match self.phase { - Phase::DownloadFragments { - warp_sync_fragments_download, - } => { - let finalized_block_hash = self.warped_header.hash(self.block_number_bytes); - - let source_id = warp_sync_fragments_download - .as_ref() - .map(|&RequestId(rq_id)| self.in_progress_requests.get(rq_id).unwrap().0); + match &self.runtime_download { + RuntimeDownload::NotStarted { .. 
} => { + let finalized_block_hash = self.warped_header_hash; + + let source_id = + if let Some(warp_sync_fragments_download) = self.warp_sync_fragments_download { + Some( + self.in_progress_requests + .get(warp_sync_fragments_download.0) + .unwrap() + .0, + ) + } else { + self.verify_queue.back().and_then(|f| f.downloaded_source) + }; Status::Fragments { source: source_id.map(|id| (id, &self.sources[id.0].user_data)), finalized_block_hash, - finalized_block_number: self.warped_header.number, + finalized_block_number: self.warped_header_number, } } - Phase::PendingVerify { - downloaded_source, .. - } => Status::Fragments { - source: Some(( - downloaded_source, - &self.sources[downloaded_source.0].user_data, - )), - finalized_block_hash: self.warped_header.hash(self.block_number_bytes), - finalized_block_number: self.warped_header.number, - }, - Phase::RuntimeDownload { - warp_sync_source_id, - .. - } - | Phase::ChainInformationDownload { - warp_sync_source_id, - .. - } => Status::ChainInformation { - source: ( - warp_sync_source_id, - &self.sources[warp_sync_source_id.0].user_data, - ), - finalized_block_hash: self.warped_header.hash(self.block_number_bytes), - finalized_block_number: self.warped_header.number, + _ => Status::ChainInformation { + finalized_block_hash: self.warped_header_hash, + finalized_block_number: self.warped_header_number, }, } } + pub fn deconstruct(mut self) -> Deconstructed { + Deconstructed { + chain_information: self.verified_chain_information, + sources_ordered: mem::take(&mut self.sources) + .into_iter() + .map(|(id, source)| { + ( + SourceId(id), + source.finalized_block_height.ok(), + source.user_data, + ) + }) + .collect(), + in_progress_requests: mem::take(&mut self.in_progress_requests) + .into_iter() + .map(|(id, (src_id, user_data, detail))| (src_id, RequestId(id), user_data, detail)) + .collect(), + } + } + /// Returns a list of all known sources stored in the state machine. 
pub fn sources(&'_ self) -> impl Iterator + '_ { self.sources.iter().map(|(id, _)| SourceId(id)) @@ -507,13 +586,18 @@ impl InProgressWarpSync { /// Add a source to the list of sources. /// /// The source has a finalized block height of 0, which should later be updated using - /// [`InProgressWarpSync::set_source_finality_state`]. + /// [`WarpSync::set_source_finality_state`]. pub fn add_source(&mut self, user_data: TSrc) -> SourceId { - SourceId(self.sources.insert(Source { + let source_id = SourceId(self.sources.insert(Source { user_data, - already_tried: false, - finalized_block_height: 0, - })) + finalized_block_height: Ok(0), + })); + + let _inserted = self.sources_by_finalized_height.insert((0, source_id)); + debug_assert!(_inserted); + debug_assert!(self.sources.len() >= self.sources_by_finalized_height.len()); + + source_id } /// Removes a source from the list of sources. In addition to the user data associated to this @@ -529,74 +613,78 @@ impl InProgressWarpSync { to_remove: SourceId, ) -> (TSrc, impl Iterator + '_) { debug_assert!(self.sources.contains(to_remove.0)); - let removed = self.sources.remove(to_remove.0).user_data; + let removed = self.sources.remove(to_remove.0); - if let Phase::RuntimeDownload { - warp_sync_source_id, - .. + if let Ok(finalized_block_height) = removed.finalized_block_height { + let _was_in = self + .sources_by_finalized_height + .remove(&(finalized_block_height, to_remove)); + debug_assert!(_was_in); } - | Phase::ChainInformationDownload { - warp_sync_source_id, - .. - } = &self.phase - { - if to_remove == *warp_sync_source_id { - self.phase = Phase::DownloadFragments { - warp_sync_fragments_download: None, - }; + debug_assert!(self.sources.len() >= self.sources_by_finalized_height.len()); + + // We make sure to not leave invalid source IDs in the state of `self`. 
+ // TODO: O(n) + for item in &mut self.verify_queue { + if item.downloaded_source == Some(to_remove) { + item.downloaded_source = None; } - } else if let Phase::PendingVerify { + } + if let RuntimeDownload::NotVerified { downloaded_source, .. - } = &mut self.phase + } = &mut self.runtime_download { - // We make sure to not leave invalid source IDs in the state of `self`. - // While it is a waste of bandwidth to completely remove a proof that has already - // been downloaded if the source disconnects, it is in practice not something that is - // supposed to happen. - if *downloaded_source == to_remove { - self.phase = Phase::DownloadFragments { - warp_sync_fragments_download: None, - }; + if *downloaded_source == Some(to_remove) { + *downloaded_source = None; + } + } + for (_, call_proof) in &mut self.runtime_calls { + if let CallProof::Downloaded { + downloaded_source, .. + } = call_proof + { + if *downloaded_source == Some(to_remove) { + *downloaded_source = None; + } } } let obsolete_requests_indices = self - .in_progress_requests - .iter() - .filter_map(|(id, (src, _, _))| if *src == to_remove { Some(id) } else { None }) + .in_progress_requests_by_source + .range( + (to_remove, RequestId(usize::min_value())) + ..=(to_remove, RequestId(usize::max_value())), + ) + .map(|(_, rq_id)| rq_id.0) .collect::>(); let mut obsolete_requests = Vec::with_capacity(obsolete_requests_indices.len()); for index in obsolete_requests_indices { let (_, user_data, _) = self.in_progress_requests.remove(index); - match &mut self.phase { - Phase::DownloadFragments { - warp_sync_fragments_download, - } => { - if *warp_sync_fragments_download == Some(RequestId(index)) { - *warp_sync_fragments_download = None; - } - } - Phase::RuntimeDownload { - runtime_download, .. 
- } => { - if *runtime_download == Some(RequestId(index)) { - *runtime_download = None; - } + self.in_progress_requests_by_source + .remove(&(to_remove, RequestId(index))); + if self.warp_sync_fragments_download == Some(RequestId(index)) { + self.warp_sync_fragments_download = None; + } + for call in self.runtime_calls.values_mut() { + if matches!(call, CallProof::Downloading(rq_id) if *rq_id == RequestId(index)) { + *call = CallProof::NotStarted; } - Phase::ChainInformationDownload { calls, .. } => { - for call in calls.values_mut() { - if matches!(call, CallProof::Downloading(rq_id) if *rq_id == RequestId(index)) - { - *call = CallProof::NotStarted; - } - } + } + if let RuntimeDownload::Downloading { + request_id, + hint_doesnt_match, + } = &mut self.runtime_download + { + if *request_id == RequestId(index) { + self.runtime_download = RuntimeDownload::NotStarted { + hint_doesnt_match: *hint_doesnt_match, + }; } - _ => {} } obsolete_requests.push((RequestId(index), user_data)); } - (removed, obsolete_requests.into_iter()) + (removed.user_data, obsolete_requests.into_iter()) } /// Sets the finalized block height of the given source. @@ -606,56 +694,123 @@ impl InProgressWarpSync { /// Panics if `source_id` is invalid. /// pub fn set_source_finality_state(&mut self, source_id: SourceId, finalized_block_height: u64) { - self.sources[source_id.0].finalized_block_height = finalized_block_height; + if let Ok(stored_height) = self.sources[source_id.0].finalized_block_height.as_mut() { + // Small optimization. No need to do anything more if the block doesn't actuall change. + if *stored_height == finalized_block_height { + return; + } + + // Note that if the new finalized block is below the former one (which is not something + // that is ever supposed to happen), we should in principle cancel the requests + // targeting that source that require a specific block height. In practice, however, + // we don't care as again this isn't supposed to ever happen. 
While ongoing requests + // might fail as a result, this is handled the same way as a regular request failure. + + let _was_in = self + .sources_by_finalized_height + .remove(&(*stored_height, source_id)); + debug_assert!(_was_in); + let _inserted = self + .sources_by_finalized_height + .insert((finalized_block_height, source_id)); + debug_assert!(_inserted); + + *stored_height = finalized_block_height; + } } /// Returns a list of requests that should be started in order to drive the warp syncing /// process to completion. /// /// Once a request that matches a desired request is added through - /// [`InProgressWarpSync::add_request`], it is no longer returned by this function. + /// [`WarpSync::add_request`], it is no longer returned by this function. pub fn desired_requests( &'_ self, ) -> impl Iterator + '_ { // If we are in the fragments download phase, return a fragments download request. - let warp_sync_request = if let Phase::DownloadFragments { - warp_sync_fragments_download: None, - } = &self.phase - { - // TODO: it feels like a hack to try again sources that have failed in the past; also, this means that the already_tried mechanism only works once - let all_sources_already_tried = self.sources.iter().all(|(_, s)| s.already_tried); - - let start_block_hash = self.warped_header.hash(self.block_number_bytes); - - // Combine the request with every single available source. - either::Left(self.sources.iter().filter_map(move |(src_id, src)| { - // TODO: also filter by source finalized block? 
so that we don't request from sources below us - if all_sources_already_tried || !src.already_tried { - Some(( - SourceId(src_id), - &src.user_data, - DesiredRequest::WarpSyncRequest { - block_hash: start_block_hash, - }, - )) + let mut desired_warp_sync_request = if self.warp_sync_fragments_download.is_none() { + if self.verify_queue.iter().fold(0, |sum, entry| { + sum + entry.fragments.len() - entry.next_fragment_to_verify_index + }) < self.num_download_ahead_fragments + { + // Block hash to request. + let start_block_hash = self + .verify_queue + .back() + .and_then(|entry| entry.fragments.last()) + .map(|fragment| { + header::hash_from_scale_encoded_header(&fragment.scale_encoded_header) + }) + .unwrap_or(self.warped_header_hash); + + // Calculate the block number at the tail of the verify queue. + // Contains `None` if the verify queue has a problem such as an indecodable header. + // In that situation, we don't start any new request and wait for the verify + // queue to empty itself. + let verify_queue_tail_block_number = self + .verify_queue + .back() + .map(|entry| { + entry + .fragments + .last() + .and_then(|fragment| { + header::decode( + &fragment.scale_encoded_header, + self.block_number_bytes, + ) + .ok() + }) + .map(|header| header.number) + }) + .unwrap_or(Some(self.warped_header_number)); + let warp_sync_minimum_gap = self.warp_sync_minimum_gap; + + if let Some(verify_queue_tail_block_number) = verify_queue_tail_block_number { + // Combine the request with every single available source. 
+ either::Left(self.sources.iter().filter_map(move |(src_id, src)| { + if src.finalized_block_height.map_or(true, |h| { + h <= verify_queue_tail_block_number.saturating_add( + u64::try_from(warp_sync_minimum_gap).unwrap_or(u64::max_value()), + ) + }) { + return None; + } + + Some(( + SourceId(src_id), + &src.user_data, + DesiredRequest::WarpSyncRequest { + block_hash: start_block_hash, + }, + )) + })) } else { - None + either::Right(iter::empty()) } - })) + } else { + either::Right(iter::empty()) + } } else { either::Right(iter::empty()) - }; + } + .peekable(); // If we are in the appropriate phase, and we are not currently downloading the runtime, // return a runtime download request. - let runtime_parameters_get = if let Phase::RuntimeDownload { - runtime_download: None, - downloaded_runtime: None, - warp_sync_source_id, - hint_doesnt_match, - .. - } = &self.phase - { + let desired_runtime_parameters_get = if let ( + WarpedBlockTy::Normal, + RuntimeDownload::NotStarted { hint_doesnt_match }, + None, + true, + None, + ) = ( + &self.warped_block_ty, + &self.runtime_download, + self.warp_sync_fragments_download, + self.verify_queue.is_empty(), + desired_warp_sync_request.peek(), + ) { let code_key_to_request = if let (false, Some(hint)) = (*hint_doesnt_match, self.code_trie_node_hint.as_ref()) { @@ -669,41 +824,59 @@ impl InProgressWarpSync { Cow::Borrowed(&b":code"[..]) }; - Some(( - *warp_sync_source_id, - &self.sources[warp_sync_source_id.0].user_data, - DesiredRequest::StorageGetMerkleProof { - block_hash: self.warped_header.hash(self.block_number_bytes), - state_trie_root: self.warped_header.state_root, - keys: vec![code_key_to_request.to_vec(), b":heappages".to_vec()], - }, - )) + // Sources are ordered by increasing finalized block height, in order to + // have the highest chance for the block to not be pruned. + let sources_with_block = self + .sources_by_finalized_height + .range((self.warped_header_number, SourceId(usize::min_value()))..) 
+ .map(|(_, src_id)| src_id); + + either::Left(sources_with_block.map(move |source_id| { + ( + *source_id, + &self.sources[source_id.0].user_data, + DesiredRequest::StorageGetMerkleProof { + block_hash: self.warped_header_hash, + state_trie_root: self.warped_header_state_root, + keys: vec![code_key_to_request.to_vec(), b":heappages".to_vec()], + }, + ) + })) } else { - None + either::Right(iter::empty()) }; - // If we are in the appropriate phase, return the list of runtime calls indicated by the - // chain information builder state machine. - let call_proofs = if let Phase::ChainInformationDownload { - warp_sync_source_id, - calls, - .. - } = &self.phase + // Return the list of runtime calls indicated by the chain information builder state + // machine. + let desired_call_proofs = if matches!(self.warped_block_ty, WarpedBlockTy::Normal) + && self.warp_sync_fragments_download.is_none() + && self.verify_queue.is_empty() + && desired_warp_sync_request.peek().is_none() { either::Left( - calls + self.runtime_calls .iter() .filter(|(_, v)| matches!(v, CallProof::NotStarted)) - .map(|(call, _)| { - ( - *warp_sync_source_id, - &self.sources[warp_sync_source_id.0].user_data, - DesiredRequest::RuntimeCallMerkleProof { - block_hash: self.warped_header.hash(self.block_number_bytes), - function_name: call.function_name().into(), - parameter_vectored: Cow::Owned(call.parameter_vectored_vec()), - }, - ) + .map(|(call, _)| DesiredRequest::RuntimeCallMerkleProof { + block_hash: self.warped_header_hash, + function_name: call.function_name().into(), + parameter_vectored: Cow::Owned(call.parameter_vectored_vec()), + }) + .flat_map(move |request_detail| { + // Sources are ordered by increasing finalized block height, in order to + // have the highest chance for the block to not be pruned. + let sources_with_block = self + .sources_by_finalized_height + .range((self.warped_header_number, SourceId(usize::min_value()))..) 
+ .map(|(_, src_id)| src_id); + + sources_with_block.map(move |source_id| { + ( + *source_id, + &self.sources[source_id.0].user_data, + request_detail.clone(), + ) + }) }), ) } else { @@ -711,15 +884,15 @@ impl InProgressWarpSync { }; // Chain all these demanded requests together. - warp_sync_request - .chain(runtime_parameters_get.into_iter()) - .chain(call_proofs) + desired_warp_sync_request + .chain(desired_runtime_parameters_get) + .chain(desired_call_proofs) } /// Inserts a new request in the data structure. /// /// > **Note**: The request doesn't necessarily have to match a request returned by - /// > [`InProgressWarpSync::desired_requests`]. + /// > [`WarpSync::desired_requests`]. /// /// # Panic /// @@ -736,24 +909,26 @@ impl InProgressWarpSync { let request_slot = self.in_progress_requests.vacant_entry(); let request_id = RequestId(request_slot.key()); - match (&detail, &mut self.phase) { - ( - RequestDetail::WarpSyncRequest { block_hash }, - Phase::DownloadFragments { - warp_sync_fragments_download: warp_sync_fragments_download @ None, - }, - ) if *block_hash == self.warped_header.hash(self.block_number_bytes) => { - *warp_sync_fragments_download = Some(request_id); + match (&detail, &mut self.runtime_download) { + (RequestDetail::WarpSyncRequest { block_hash }, _) + if self.warp_sync_fragments_download.is_none() + && *block_hash + == self + .verify_queue + .back() + .and_then(|entry| entry.fragments.last()) + .map(|fragment| { + header::hash_from_scale_encoded_header( + &fragment.scale_encoded_header, + ) + }) + .unwrap_or(self.warped_header_hash) => + { + self.warp_sync_fragments_download = Some(request_id); } ( RequestDetail::StorageGetMerkleProof { block_hash, keys }, - Phase::RuntimeDownload { - downloaded_runtime: None, - runtime_download: runtime_download @ None, - hint_doesnt_match, - warp_sync_source_id, - .. 
- }, + RuntimeDownload::NotStarted { hint_doesnt_match }, ) => { let code_key_to_request = if let (false, Some(hint)) = (*hint_doesnt_match, self.code_trie_node_hint.as_ref()) @@ -768,12 +943,17 @@ impl InProgressWarpSync { Cow::Borrowed(&b":code"[..]) }; - if source_id == *warp_sync_source_id - && *block_hash == self.warped_header.hash(self.block_number_bytes) + if self.sources[source_id.0] + .finalized_block_height + .map_or(false, |h| h >= self.warped_header_number) + && *block_hash == self.warped_header_hash && keys.iter().any(|k| *k == *code_key_to_request) && keys.iter().any(|k| k == b":heappages") { - *runtime_download = Some(request_id); + self.runtime_download = RuntimeDownload::Downloading { + hint_doesnt_match: *hint_doesnt_match, + request_id, + }; } } ( @@ -782,11 +962,14 @@ impl InProgressWarpSync { function_name, parameter_vectored, }, - Phase::ChainInformationDownload { calls, .. }, + _, ) => { - for (info, status) in calls { + for (info, status) in &mut self.runtime_calls { if matches!(status, CallProof::NotStarted) - && *block_hash == self.warped_header.hash(self.block_number_bytes) + && self.sources[source_id.0] + .finalized_block_height + .map_or(false, |h| h >= self.warped_header_number) + && *block_hash == self.warped_header_hash && function_name == info.function_name() && parameters_equal(parameter_vectored, info.parameter_vectored()) { @@ -799,78 +982,51 @@ impl InProgressWarpSync { } request_slot.insert((source_id, user_data, detail)); + let _was_inserted = self + .in_progress_requests_by_source + .insert((source_id, request_id)); + debug_assert!(_was_inserted); request_id } /// Removes the given request from the state machine. Returns the user data that was associated /// to it. /// + /// > **Note**: The state machine might want to re-start the same request again. It is out of + /// > the scope of this module to keep track of requests that don't succeed. + /// /// # Panic /// /// Panics if the [`RequestId`] is invalid. 
/// + // TODO: rename to `cancel_request` to convey the meaning that nothing negative will happen to the source pub fn fail_request(&mut self, id: RequestId) -> TRq { - match &mut self.phase { - Phase::DownloadFragments { - warp_sync_fragments_download, - } => { - if *warp_sync_fragments_download == Some(id) { - *warp_sync_fragments_download = None; - } - } - Phase::RuntimeDownload { - runtime_download, .. - } => { - if *runtime_download == Some(id) { - *runtime_download = None; - } - } - Phase::ChainInformationDownload { calls, .. } => { - for call in calls.values_mut() { - if matches!(call, CallProof::Downloading(rq_id) if *rq_id == id) { - *call = CallProof::NotStarted; - } - } - } - _ => {} + if self.warp_sync_fragments_download == Some(id) { + self.warp_sync_fragments_download = None; } - match (self.in_progress_requests.remove(id.0), &mut self.phase) { - ((source_id, user_data, RequestDetail::WarpSyncRequest { .. }), _) => { - // TODO: check that block hash matches starting point? ^ - self.sources[source_id.0].already_tried = true; - user_data + for call in self.runtime_calls.values_mut() { + if matches!(call, CallProof::Downloading(rq_id) if *rq_id == id) { + *call = CallProof::NotStarted; } - ( - ( - source_id, - user_data, - RequestDetail::RuntimeCallMerkleProof { .. } - | RequestDetail::StorageGetMerkleProof { .. }, - ), - Phase::RuntimeDownload { - warp_sync_source_id, - .. - }, - ) if source_id == *warp_sync_source_id => { - // If the source has failed a request, we jump back to downloading fragments - // in order to try a different source. 
- self.sources[source_id.0].already_tried = true; - self.phase = Phase::DownloadFragments { - warp_sync_fragments_download: None, - }; - user_data + } + + if let RuntimeDownload::Downloading { + request_id, + hint_doesnt_match, + } = &mut self.runtime_download + { + if *request_id == id { + self.runtime_download = RuntimeDownload::NotStarted { + hint_doesnt_match: *hint_doesnt_match, + } } - ( - ( - _, - user_data, - RequestDetail::RuntimeCallMerkleProof { .. } - | RequestDetail::StorageGetMerkleProof { .. }, - ), - _, - ) => user_data, } + + let (source_id, user_data, _) = self.in_progress_requests.remove(id.0); + let _was_removed = self.in_progress_requests_by_source.remove(&(source_id, id)); + debug_assert!(_was_removed); + user_data } /// Injects a successful Merkle proof and removes the given request from the state machine. @@ -885,14 +1041,22 @@ impl InProgressWarpSync { // Remove the request from the list, obtaining its user data. // If the request corresponds to the runtime parameters we're looking for, the function // continues below, otherwise we return early. - let user_data = match (self.in_progress_requests.remove(id.0), &self.phase) { + let (source_id, hint_doesnt_match, user_data) = match ( + self.in_progress_requests.remove(id.0), + &self.runtime_download, + ) { ( - (_, user_data, _), - Phase::RuntimeDownload { - runtime_download, .. + (source_id, user_data, _), + RuntimeDownload::Downloading { + request_id, + hint_doesnt_match, }, - ) if *runtime_download == Some(id) => user_data, - ((_, user_data, RequestDetail::StorageGetMerkleProof { .. }), _) => return user_data, + ) if *request_id == id => (source_id, *hint_doesnt_match, user_data), + ((source_id, user_data, RequestDetail::StorageGetMerkleProof { .. 
}), _) => { + let _was_removed = self.in_progress_requests_by_source.remove(&(source_id, id)); + debug_assert!(_was_removed); + return user_data; + } ( ( _, @@ -904,15 +1068,14 @@ impl InProgressWarpSync { ) => panic!(), }; - if let Phase::RuntimeDownload { - downloaded_runtime, .. - } = &mut self.phase - { - *downloaded_runtime = Some(merkle_proof); - } else { - // This is checked at the beginning of this function. - unreachable!() - } + self.runtime_download = RuntimeDownload::NotVerified { + downloaded_source: Some(source_id), + hint_doesnt_match, + trie_proof: merkle_proof, + }; + + let _was_removed = self.in_progress_requests_by_source.remove(&(source_id, id)); + debug_assert!(_was_removed); user_data } @@ -930,36 +1093,37 @@ impl InProgressWarpSync { request_id: RequestId, response: Vec, ) -> TRq { - match ( - self.in_progress_requests.remove(request_id.0), - &mut self.phase, - ) { - ((_, user_data, _), Phase::ChainInformationDownload { ref mut calls, .. }) => { - for call in calls.values_mut() { - if matches!(call, CallProof::Downloading(rq_id) if *rq_id == request_id) { - *call = CallProof::Downloaded(response); - break; - } - } + let (source_id, user_data, RequestDetail::RuntimeCallMerkleProof { .. }) = + self.in_progress_requests.remove(request_id.0) + else { + // Wrong request type. + panic!() + }; - user_data + for call in self.runtime_calls.values_mut() { + if matches!(call, CallProof::Downloading(rq_id) if *rq_id == request_id) { + *call = CallProof::Downloaded { + downloaded_source: Some(source_id), + proof: response, + }; + break; } + } - // Uninteresting request. - ((_, user_data, RequestDetail::RuntimeCallMerkleProof { .. }), _) => user_data, + let _was_removed = self + .in_progress_requests_by_source + .remove(&(source_id, request_id)); + debug_assert!(_was_removed); - // Wrong request type. - ( - (_, _, RequestDetail::StorageGetMerkleProof { .. }) - | (_, _, RequestDetail::WarpSyncRequest { .. 
}), - _, - ) => panic!(), - } + user_data } /// Injects a successful response and removes the given request from the state machine. Returns /// the user data that was associated to it. /// + /// If the header of the last fragment of the response is decodable, this function updates + /// the finalized block of the source. + /// /// # Panic /// /// Panics if the [`RequestId`] is invalid. @@ -972,66 +1136,94 @@ impl InProgressWarpSync { fragments: Vec, final_set_of_fragments: bool, ) -> TRq { - match ( - self.in_progress_requests.remove(request_id.0), - &mut self.phase, - ) { - ( - (rq_source_id, user_data, _), - Phase::DownloadFragments { - warp_sync_fragments_download, - }, - ) if *warp_sync_fragments_download == Some(request_id) => { - // TODO: why this? - self.sources[rq_source_id.0].already_tried = true; - - let verifier = verifier::Verifier::new( - (&self.warped_finality).into(), - self.block_number_bytes, - fragments, - final_set_of_fragments, - ); + let (rq_source_id, user_data) = match self.in_progress_requests.remove(request_id.0) { + (rq_source_id, user_data, RequestDetail::WarpSyncRequest { .. }) => { + (rq_source_id, user_data) + } + (_, _, _) => panic!(), + }; + + debug_assert!(self.sources.contains(rq_source_id.0)); - self.phase = Phase::PendingVerify { - final_set_of_fragments, - downloaded_source: rq_source_id, - verifier: Some(verifier), + // Since we send requests only to sources with an appropriate finalized block, we make + // sure that the finalized block of the source that sent the response matches the + // fragments that it sent. + // If we didn't do that, it would be possible for example to warp sync to block 200 while + // believing that the source is only at block 199, and thus the warp syncing would stall. 
+ if let Some(last_header) = fragments + .last() + .and_then(|h| header::decode(&h.scale_encoded_header, self.block_number_bytes).ok()) + { + if let Ok(src_finalized_height) = + self.sources[rq_source_id.0].finalized_block_height.as_mut() + { + let new_height = if final_set_of_fragments { + // If the source indicated that this is the last fragment, then we know that + // it's also equal to their finalized block. + last_header.number + } else { + // If this is not the last fragment, we know that the finalized block of the + // source is *at least* the one provided. + // TODO: could maybe do + gap or something? + cmp::max(*src_finalized_height, last_header.number.saturating_add(1)) }; - user_data - } - ((_, user_data, RequestDetail::WarpSyncRequest { .. }), _) => { - // Uninteresting download. We simply ignore the response. - user_data + if *src_finalized_height != new_height { + let _was_in = self + .sources_by_finalized_height + .remove(&(*src_finalized_height, rq_source_id)); + debug_assert!(_was_in); + + *src_finalized_height = new_height; + + let _inserted = self + .sources_by_finalized_height + .insert((*src_finalized_height, rq_source_id)); + debug_assert!(_inserted); + } } - ((_, _, _), _) => panic!(), } + + if self.warp_sync_fragments_download == Some(request_id) { + self.warp_sync_fragments_download = None; + + self.verify_queue.push_back(PendingVerify { + final_set_of_fragments, + downloaded_source: Some(rq_source_id), + fragments, + next_fragment_to_verify_index: 0, + }); + } + + let _was_removed = self + .in_progress_requests_by_source + .remove(&(rq_source_id, request_id)); + debug_assert!(_was_removed); + + user_data } /// Start processing one CPU operation. /// /// This function takes ownership of `self` and yields it back after the operation is finished. + // TODO: take a `&mut self` instead of `self` ; requires many changes in all.rs pub fn process_one(self) -> ProcessOne { - if let Phase::ChainInformationDownload { calls, .. 
} = &self.phase { - // If we've downloaded everything that was needed, switch to "build chain information" - // mode. - if calls + // If we've downloaded everything that was needed, switch to "build chain information" + // mode. + if matches!(self.runtime_download, RuntimeDownload::Verified { .. }) + && self + .runtime_calls .values() - .all(|c| matches!(c, CallProof::Downloaded(_))) - { - return ProcessOne::BuildChainInformation(BuildChainInformation { inner: self }); - } + .all(|c| matches!(c, CallProof::Downloaded { .. })) + { + return ProcessOne::BuildChainInformation(BuildChainInformation { inner: self }); } - if let Phase::RuntimeDownload { - downloaded_runtime: Some(_), - .. - } = &self.phase - { + if let RuntimeDownload::NotVerified { .. } = &self.runtime_download { return ProcessOne::BuildRuntime(BuildRuntime { inner: self }); } - if let Phase::PendingVerify { .. } = &self.phase { + if !self.verify_queue.is_empty() { return ProcessOne::VerifyWarpSyncFragment(VerifyWarpSyncFragment { inner: self }); } @@ -1039,18 +1231,45 @@ impl InProgressWarpSync { } } -#[derive(Debug, Copy, Clone)] -struct Source { - user_data: TSrc, - /// `true` if this source has been in a past warp sync request and we should try a different - /// source. 
- already_tried: bool, - finalized_block_height: u64, -} +impl ops::Index for WarpSync { + type Output = TSrc; + + #[track_caller] + fn index(&self, source_id: SourceId) -> &TSrc { + debug_assert!(self.sources.contains(source_id.0)); + &self.sources[source_id.0].user_data + } +} + +impl ops::IndexMut for WarpSync { + #[track_caller] + fn index_mut(&mut self, source_id: SourceId) -> &mut TSrc { + debug_assert!(self.sources.contains(source_id.0)); + &mut self.sources[source_id.0].user_data + } +} + +impl ops::Index for WarpSync { + type Output = TRq; + + #[track_caller] + fn index(&self, request_id: RequestId) -> &TRq { + debug_assert!(self.in_progress_requests.contains(request_id.0)); + &self.in_progress_requests[request_id.0].1 + } +} + +impl ops::IndexMut for WarpSync { + #[track_caller] + fn index_mut(&mut self, request_id: RequestId) -> &mut TRq { + debug_assert!(self.in_progress_requests.contains(request_id.0)); + &mut self.in_progress_requests[request_id.0].1 + } +} /// Information about a request that the warp sync state machine would like to start. /// -/// See [`InProgressWarpSync::desired_requests`]. +/// See [`WarpSync::desired_requests`]. #[derive(Debug, Clone)] pub enum DesiredRequest { /// A warp sync request should be start. @@ -1082,7 +1301,7 @@ pub enum DesiredRequest { /// Information about a request to add to the state machine. /// -/// See [`InProgressWarpSync::add_request`]. +/// See [`WarpSync::add_request`]. #[derive(Debug, Clone)] pub enum RequestDetail { /// See [`DesiredRequest::WarpSyncRequest`]. @@ -1112,11 +1331,14 @@ pub enum RequestDetail { #[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash)] pub struct RequestId(usize); -/// Return value of [`InProgressWarpSync::process_one`]. +/// Return value of [`WarpSync::process_one`]. pub enum ProcessOne { /// Nothing to verify at the moment. The state machine is yielded back. - Idle(InProgressWarpSync), + Idle(WarpSync), /// Ready to verify a warp sync fragment. 
+ /// + /// > **Note**: In case where a source has sent an empty list of fragment, which is invalid, + /// > this variant will "verify" the list and produce an error. VerifyWarpSyncFragment(VerifyWarpSyncFragment), /// Ready to build the runtime of the chain.. BuildRuntime(BuildRuntime), @@ -1125,518 +1347,781 @@ pub enum ProcessOne { } /// Ready to verify a warp sync fragment. +/// +/// > **Note**: In case where a source has sent an empty list of fragment, which is invalid, +/// > this variant will "verify" the list and produce an error. pub struct VerifyWarpSyncFragment { - inner: InProgressWarpSync, + inner: WarpSync, } impl VerifyWarpSyncFragment { /// Returns the source that has sent the fragments that we are about to verify, and its user /// data. - pub fn proof_sender(&self) -> (SourceId, &TSrc) { - if let Phase::PendingVerify { - downloaded_source, .. - } = &self.inner.phase - { - ( - *downloaded_source, - &self.inner.sources[downloaded_source.0].user_data, - ) - } else { - unreachable!() - } + /// + /// Returns `None` if the source has been removed since the fragments have been downloaded. + pub fn proof_sender(&self) -> Option<(SourceId, &TSrc)> { + let entry_to_verify = self.inner.verify_queue.front().unwrap(); + let source_id = entry_to_verify.downloaded_source?; + Some((source_id, &self.inner.sources[source_id.0].user_data)) } /// Verify one warp sync fragment. /// /// Must be passed a randomly-generated value that is used by the verification process. Note /// that the verification is still deterministic. - // TODO: does this API make sense? refactor or explain what this error is + /// + /// On success, returns the block hash and height that have been verified as being part of + /// the chain. + /// On error, returns why the verification has failed. The warp syncing process still + /// continues. 
pub fn verify( mut self, randomness_seed: [u8; 32], - ) -> (InProgressWarpSync, Option) { - if let Phase::PendingVerify { - verifier, - final_set_of_fragments, - downloaded_source, - .. - } = &mut self.inner.phase - { - match verifier.take().unwrap().next(randomness_seed) { - Ok(verifier::Next::NotFinished(next_verifier)) => { - *verifier = Some(next_verifier); + ) -> ( + WarpSync, + Result<([u8; 32], u64), VerifyFragmentError>, + ) { + // A `VerifyWarpSyncFragment` is only ever created if `verify_queue` is non-empty. + debug_assert!(!self.inner.verify_queue.is_empty()); + let fragments_to_verify = self + .inner + .verify_queue + .front_mut() + .unwrap_or_else(|| unreachable!()); + + // The source has sent an empty list of fragments. This is invalid. + if fragments_to_verify.fragments.is_empty() { + if let Some(SourceId(downloaded_source)) = fragments_to_verify.downloaded_source { + self.inner.sources[downloaded_source].finalized_block_height = Err(()); + } + self.inner.verify_queue.pop_front().unwrap(); + return (self.inner, Err(VerifyFragmentError::EmptyProof)); + } + + // Given that the list of fragments is non-empty, we are assuming that there are still + // fragments to verify, otherwise this entry should have been removed in a previous + // iteration. + let fragment_to_verify = fragments_to_verify + .fragments + .get(fragments_to_verify.next_fragment_to_verify_index) + .unwrap_or_else(|| unreachable!()); + + // It has been checked at the warp sync initialization that the finality algorithm is + // indeed Grandpa. + let chain_information::ChainInformationFinality::Grandpa { + after_finalized_block_authorities_set_id, + finalized_triggered_authorities, + .. // TODO: support finalized_scheduled_change? difficult to implement + } = &mut self.inner.warped_finality + else { + unreachable!() + }; + + // Decode the header and justification of the fragment. 
+ let fragment_header_hash = + header::hash_from_scale_encoded_header(&fragment_to_verify.scale_encoded_header); + let fragment_decoded_header = match header::decode( + &fragment_to_verify.scale_encoded_header, + self.inner.block_number_bytes, + ) { + Ok(j) => j, + Err(err) => { + if let Some(SourceId(source_id)) = fragments_to_verify.downloaded_source { + self.inner.sources[source_id].finalized_block_height = Err(()); } - Ok(verifier::Next::EmptyProof) => { - self.inner.warped_header = self - .inner - .start_chain_information - .as_ref() - .finalized_block_header - .into(); - self.inner.warped_finality = - self.inner.start_chain_information.as_ref().finality.into(); - self.inner.phase = Phase::RuntimeDownload { - runtime_download: None, - warp_sync_source_id: *downloaded_source, - downloaded_runtime: None, - hint_doesnt_match: false, - }; + self.inner.verify_queue.clear(); + self.inner.warp_sync_fragments_download = None; + return (self.inner, Err(VerifyFragmentError::InvalidHeader(err))); + } + }; + let fragment_decoded_justification = match justification::decode::decode_grandpa( + &fragment_to_verify.scale_encoded_justification, + self.inner.block_number_bytes, + ) { + Ok(j) => j, + Err(err) => { + if let Some(SourceId(source_id)) = fragments_to_verify.downloaded_source { + self.inner.sources[source_id].finalized_block_height = Err(()); } - Ok(verifier::Next::Success { - scale_encoded_header, - chain_information_finality, - }) => { - // As the verification of the fragment has succeeded, we are sure that the header - // is valid and can decode it. 
- self.inner.warped_header = - header::decode(&scale_encoded_header, self.inner.block_number_bytes) - .unwrap() - .into(); - self.inner.warped_finality = chain_information_finality; - - if *final_set_of_fragments { - self.inner.phase = Phase::RuntimeDownload { - runtime_download: None, - warp_sync_source_id: *downloaded_source, - downloaded_runtime: None, - hint_doesnt_match: false, - }; - } else { - self.inner.phase = Phase::DownloadFragments { - warp_sync_fragments_download: None, - }; + self.inner.verify_queue.clear(); + self.inner.warp_sync_fragments_download = None; + return ( + self.inner, + Err(VerifyFragmentError::InvalidJustification(err)), + ); + } + }; + + // Make sure that the header would actually advance the warp sync process forward. + if fragment_decoded_header.number <= self.inner.warped_header_number { + if let Some(SourceId(source_id)) = fragments_to_verify.downloaded_source { + self.inner.sources[source_id].finalized_block_height = Err(()); + } + self.inner.verify_queue.clear(); + self.inner.warp_sync_fragments_download = None; + return ( + self.inner, + Err(VerifyFragmentError::BlockNumberNotIncrementing), + ); + } + + // Make sure that the justification indeed corresponds to the header. 
+ if *fragment_decoded_justification.target_hash != fragment_header_hash + || fragment_decoded_justification.target_number != fragment_decoded_header.number + { + let error = VerifyFragmentError::TargetHashMismatch { + justification_target_hash: *fragment_decoded_justification.target_hash, + justification_target_height: fragment_decoded_justification.target_number, + header_hash: fragment_header_hash, + header_height: fragment_decoded_header.number, + }; + if let Some(SourceId(source_id)) = fragments_to_verify.downloaded_source { + self.inner.sources[source_id].finalized_block_height = Err(()); + } + self.inner.verify_queue.clear(); + self.inner.warp_sync_fragments_download = None; + return (self.inner, Err(error)); + } + + // Check whether the justification is valid. + if let Err(err) = justification::verify::verify(justification::verify::Config { + justification: fragment_decoded_justification, + block_number_bytes: self.inner.block_number_bytes, + authorities_list: finalized_triggered_authorities + .iter() + .map(|a| &a.public_key[..]), + authorities_set_id: *after_finalized_block_authorities_set_id, + randomness_seed, + }) { + if let Some(SourceId(source_id)) = fragments_to_verify.downloaded_source { + self.inner.sources[source_id].finalized_block_height = Err(()); + } + self.inner.verify_queue.clear(); + self.inner.warp_sync_fragments_download = None; + return ( + self.inner, + Err(VerifyFragmentError::JustificationVerify(err)), + ); + } + + // Try to grab the new list of authorities from the header. + let new_authorities_list = fragment_decoded_header + .digest + .logs() + .find_map(|log_item| match log_item { + header::DigestItemRef::GrandpaConsensus(grandpa_log_item) => match grandpa_log_item + { + header::GrandpaConsensusLogRef::ScheduledChange(change) + | header::GrandpaConsensusLogRef::ForcedChange { change, .. 
} => { + Some(change.next_authorities) } - } - Err(error) => { - self.inner.phase = Phase::DownloadFragments { - warp_sync_fragments_download: None, - }; - return (self.inner, Some(error)); - } + _ => None, + }, + _ => None, + }) + .map(|next_authorities| { + next_authorities + .map(header::GrandpaAuthority::from) + .collect() + }); + + // Fragments must only include headers containing an update to the list of authorities, + // unless it's the very head of the chain. + if new_authorities_list.is_none() + && (!fragments_to_verify.final_set_of_fragments + || fragments_to_verify.next_fragment_to_verify_index + != fragments_to_verify.fragments.len() - 1) + { + if let Some(SourceId(source_id)) = fragments_to_verify.downloaded_source { + self.inner.sources[source_id].finalized_block_height = Err(()); } + self.inner.verify_queue.clear(); + self.inner.warp_sync_fragments_download = None; + return (self.inner, Err(VerifyFragmentError::NonMinimalProof)); + } - (self.inner, None) - } else { - unreachable!() + // Verification of the fragment has succeeded 🎉. We can now update `self`. 
+ fragments_to_verify.next_fragment_to_verify_index += 1; + self.inner.warped_header_number = fragment_decoded_header.number; + self.inner.warped_header_state_root = *fragment_decoded_header.state_root; + self.inner.warped_header_hash = fragment_header_hash; + self.inner.warped_header = fragment_to_verify.scale_encoded_header.clone(); // TODO: figure out how to remove this clone() + self.inner.warped_block_ty = WarpedBlockTy::Normal; + self.inner.runtime_download = RuntimeDownload::NotStarted { + hint_doesnt_match: false, + }; + self.inner.runtime_calls = + runtime_calls_default_value(self.inner.verified_chain_information.as_ref().consensus); + if let Some(new_authorities_list) = new_authorities_list { + *finalized_triggered_authorities = new_authorities_list; + *after_finalized_block_authorities_set_id += 1; + } + if let Some(SourceId(source_id)) = fragments_to_verify.downloaded_source { + let src_finalized = &mut self.inner.sources[source_id].finalized_block_height; + if src_finalized.is_err() { + self.inner.sources[source_id].finalized_block_height = + Ok(self.inner.warped_header_number); + } + } + if fragments_to_verify.next_fragment_to_verify_index == fragments_to_verify.fragments.len() + { + self.inner.verify_queue.pop_front().unwrap(); + } + + // Returning. + let result = Ok(( + self.inner.warped_header_hash, + self.inner.warped_header_number, + )); + (self.inner, result) + } +} + +/// Error potentially returned by [`VerifyWarpSyncFragment::verify`]. +#[derive(Debug)] +pub enum VerifyFragmentError { + /// Justification found within the fragment is invalid. + JustificationVerify(justification::verify::Error), + /// Mismatch between the block targeted by the justification and the header. + TargetHashMismatch { + /// Hash of the block the justification targets. + justification_target_hash: [u8; 32], + /// Height of the block the justification targets. + justification_target_height: u64, + /// Hash of the header. 
+ header_hash: [u8; 32], + /// Height of the header. + header_height: u64, + }, + /// Warp sync fragment doesn't contain an authorities list change when it should. + NonMinimalProof, + /// Header does not actually advance the warp syncing process. This means that a source has + /// sent a header below the requested hash. + BlockNumberNotIncrementing, + /// Warp sync proof is empty. + EmptyProof, + /// Failed to decode header. + InvalidHeader(header::Error), + /// Failed to decode justification. + InvalidJustification(justification::decode::Error), +} + +impl fmt::Display for VerifyFragmentError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + VerifyFragmentError::JustificationVerify(err) => fmt::Display::fmt(err, f), + VerifyFragmentError::TargetHashMismatch { + justification_target_hash, + justification_target_height, + header_hash, + header_height, + } => { + write!( + f, + "Justification target (hash: {}, height: {}) doesn't match the associated header (hash: {}, height: {})", + HashDisplay(justification_target_hash), + justification_target_height, + HashDisplay(header_hash), + header_height, + ) + } + VerifyFragmentError::NonMinimalProof => write!( + f, + "Warp sync proof fragment doesn't contain an authorities list change" + ), + VerifyFragmentError::BlockNumberNotIncrementing => write!( + f, + "Warp sync proof header doesn't advance the warp syncing process" + ), + VerifyFragmentError::EmptyProof => write!(f, "Warp sync proof is empty"), + VerifyFragmentError::InvalidHeader(err) => write!(f, "Failed to decode header: {err}"), + VerifyFragmentError::InvalidJustification(err) => { + write!(f, "Failed to decode justification: {err}") + } } } } +/// Problem encountered during a call to [`BuildRuntime::build`] or +/// [`BuildChainInformation::build`] that can be attributed to the source sending invalid data. 
+#[derive(Debug, derive_more::Display)] +#[display(fmt = "{error}")] +pub struct SourceMisbehavior { + /// Source that committed the felony. `None` if the source has been removed between the moment + /// when the request has succeeded and when it has been verified. + pub source_id: Option, + /// Error that the source made. + pub error: SourceMisbehaviorTy, +} + +/// See [`SourceMisbehavior::error`]. +#[derive(Debug, derive_more::Display)] +pub enum SourceMisbehaviorTy { + /// Failed to verify Merkle proof. + InvalidMerkleProof(proof_decode::Error), + /// Merkle proof is missing the necessary entries. + MerkleProofEntriesMissing, +} + +/// Problem encountered during a call to [`BuildRuntime::build`]. +#[derive(Debug, derive_more::Display)] +pub enum BuildRuntimeError { + /// The chain doesn't include any storage item at `:code`. + #[display(fmt = "The chain doesn't include any storage item at `:code`")] + MissingCode, + /// The storage item at `:heappages` is in an incorrect format. + #[display(fmt = "Invalid heap pages value: {_0}")] + InvalidHeapPages(executor::InvalidHeapPagesError), + /// Error building the runtime of the chain. + #[display(fmt = "Error building the runtime: {_0}")] + RuntimeBuild(executor::host::NewErr), + /// Source that has sent a proof didn't behave properly. + SourceMisbehavior(SourceMisbehavior), +} + /// Ready to build the runtime of the finalized chain. pub struct BuildRuntime { - inner: InProgressWarpSync, + inner: WarpSync, } impl BuildRuntime { /// Build the runtime of the chain. /// - /// This function might return a [`WarpSync::Finished`], indicating the end of the warp sync. - /// /// Must be passed parameters used for the construction of the runtime: a hint as to whether /// the runtime is trusted and/or will be executed again, and whether unresolved function /// imports are allowed. 
- // TODO: refactor this error or explain what it is pub fn build( mut self, exec_hint: ExecHint, allow_unresolved_imports: bool, - ) -> (WarpSync, Option) { - if let Phase::RuntimeDownload { - downloaded_runtime, - warp_sync_source_id, + ) -> (WarpSync, Result<(), BuildRuntimeError>) { + let RuntimeDownload::NotVerified { + downloaded_source, hint_doesnt_match, - .. - } = &mut self.inner.phase - { - let downloaded_runtime = downloaded_runtime.take().unwrap(); - let decoded_downloaded_runtime = - match proof_decode::decode_and_verify_proof(proof_decode::Config { - proof: &downloaded_runtime[..], - }) { - Ok(p) => p, - Err(err) => { - self.inner.phase = Phase::DownloadFragments { - warp_sync_fragments_download: None, - }; - return ( - WarpSync::InProgress(self.inner), - Some(Error::InvalidMerkleProof(err)), - ); - } - }; + trie_proof, + } = &mut self.inner.runtime_download + else { + unreachable!() + }; - let ( - finalized_storage_code_merkle_value, - finalized_storage_code_closest_ancestor_excluding, - ) = { - let code_nibbles = - trie::bytes_to_nibbles(b":code".iter().copied()).collect::>(); - match decoded_downloaded_runtime.closest_ancestor_in_proof( - &self.inner.warped_header.state_root, - &code_nibbles[..code_nibbles.len() - 1], - ) { - Ok(Some(closest_ancestor_key)) => { - let next_nibble = code_nibbles[closest_ancestor_key.len()]; - let merkle_value = decoded_downloaded_runtime - .trie_node_info( - &self.inner.warped_header.state_root, - closest_ancestor_key, - ) - .unwrap() - .children - .child(next_nibble) - .merkle_value(); - - match merkle_value { - Some(mv) => (mv.to_owned(), closest_ancestor_key.to_vec()), - None => { - self.inner.phase = Phase::DownloadFragments { - warp_sync_fragments_download: None, - }; - return ( - WarpSync::InProgress(self.inner), - Some(Error::MissingCode), - ); - } - } - } - Ok(None) => { - self.inner.phase = Phase::DownloadFragments { - warp_sync_fragments_download: None, - }; - return (WarpSync::InProgress(self.inner), 
Some(Error::MissingCode)); - } - Err(proof_decode::IncompleteProofError { .. }) => { - self.inner.phase = Phase::DownloadFragments { - warp_sync_fragments_download: None, - }; - return ( - WarpSync::InProgress(self.inner), - Some(Error::MerkleProofEntriesMissing), - ); + let downloaded_runtime = mem::take(trie_proof); + let decoded_downloaded_runtime = + match proof_decode::decode_and_verify_proof(proof_decode::Config { + proof: &downloaded_runtime[..], + }) { + Ok(p) => p, + Err(err) => { + let downloaded_source = *downloaded_source; + if let Some(SourceId(downloaded_source)) = downloaded_source { + self.inner.sources[downloaded_source].finalized_block_height = Err(()); } + self.inner.runtime_download = RuntimeDownload::NotStarted { + hint_doesnt_match: *hint_doesnt_match, + }; + return ( + self.inner, + Err(BuildRuntimeError::SourceMisbehavior(SourceMisbehavior { + source_id: downloaded_source, + error: SourceMisbehaviorTy::InvalidMerkleProof(err), + })), + ); } }; - let finalized_storage_code = if let (false, Some(hint)) = - (*hint_doesnt_match, self.inner.code_trie_node_hint.as_ref()) - { - if hint.merkle_value == finalized_storage_code_merkle_value { - &hint.storage_value - } else { - *hint_doesnt_match = true; - return (WarpSync::InProgress(self.inner), None); - } - } else { - match decoded_downloaded_runtime - .storage_value(&self.inner.warped_header.state_root, b":code") - { - Ok(Some((code, _))) => code, - Ok(None) => { - self.inner.phase = Phase::DownloadFragments { - warp_sync_fragments_download: None, - }; - return (WarpSync::InProgress(self.inner), Some(Error::MissingCode)); + let ( + finalized_storage_code_merkle_value, + finalized_storage_code_closest_ancestor_excluding, + ) = { + let code_nibbles = trie::bytes_to_nibbles(b":code".iter().copied()).collect::>(); + match decoded_downloaded_runtime.closest_ancestor_in_proof( + &self.inner.warped_header_state_root, + &code_nibbles[..code_nibbles.len() - 1], + ) { + Ok(Some(closest_ancestor_key)) => { + 
let next_nibble = code_nibbles[closest_ancestor_key.len()]; + let merkle_value = decoded_downloaded_runtime + .trie_node_info(&self.inner.warped_header_state_root, closest_ancestor_key) + .unwrap() + .children + .child(next_nibble) + .merkle_value(); + + match merkle_value { + Some(mv) => (mv.to_owned(), closest_ancestor_key.to_vec()), + None => { + self.inner.warped_block_ty = WarpedBlockTy::KnownBad; + self.inner.runtime_download = RuntimeDownload::NotStarted { + hint_doesnt_match: *hint_doesnt_match, + }; + return (self.inner, Err(BuildRuntimeError::MissingCode)); + } } - Err(proof_decode::IncompleteProofError { .. }) => { - return ( - WarpSync::InProgress(self.inner), - Some(Error::MerkleProofEntriesMissing), - ); + } + Ok(None) => { + self.inner.warped_block_ty = WarpedBlockTy::KnownBad; + self.inner.runtime_download = RuntimeDownload::NotStarted { + hint_doesnt_match: *hint_doesnt_match, + }; + return (self.inner, Err(BuildRuntimeError::MissingCode)); + } + Err(proof_decode::IncompleteProofError { .. 
}) => { + let downloaded_source = *downloaded_source; + if let Some(SourceId(downloaded_source)) = downloaded_source { + self.inner.sources[downloaded_source].finalized_block_height = Err(()); } + self.inner.runtime_download = RuntimeDownload::NotStarted { + hint_doesnt_match: *hint_doesnt_match, + }; + return ( + self.inner, + Err(BuildRuntimeError::SourceMisbehavior(SourceMisbehavior { + source_id: downloaded_source, + error: SourceMisbehaviorTy::MerkleProofEntriesMissing, + })), + ); } - }; + } + }; - let finalized_storage_heappages = match decoded_downloaded_runtime - .storage_value(&self.inner.warped_header.state_root, b":heappages") + let finalized_storage_code = if let (false, Some(hint)) = + (*hint_doesnt_match, self.inner.code_trie_node_hint.as_ref()) + { + if hint.merkle_value == finalized_storage_code_merkle_value { + &hint.storage_value + } else { + self.inner.runtime_download = RuntimeDownload::NotStarted { + hint_doesnt_match: true, + }; + return (self.inner, Ok(())); + } + } else { + match decoded_downloaded_runtime + .storage_value(&self.inner.warped_header_state_root, b":code") { - Ok(val) => val.map(|(v, _)| v), + Ok(Some((code, _))) => code, + Ok(None) => { + self.inner.warped_block_ty = WarpedBlockTy::KnownBad; + self.inner.runtime_download = RuntimeDownload::NotStarted { + hint_doesnt_match: *hint_doesnt_match, + }; + return (self.inner, Err(BuildRuntimeError::MissingCode)); + } Err(proof_decode::IncompleteProofError { .. 
}) => { + let downloaded_source = *downloaded_source; + if let Some(SourceId(downloaded_source)) = downloaded_source { + self.inner.sources[downloaded_source].finalized_block_height = Err(()); + } return ( - WarpSync::InProgress(self.inner), - Some(Error::MerkleProofEntriesMissing), + self.inner, + Err(BuildRuntimeError::SourceMisbehavior(SourceMisbehavior { + source_id: downloaded_source, + error: SourceMisbehaviorTy::MerkleProofEntriesMissing, + })), ); } - }; + } + }; - let decoded_heap_pages = - match executor::storage_heap_pages_to_value(finalized_storage_heappages) { - Ok(hp) => hp, - Err(err) => { - self.inner.phase = Phase::DownloadFragments { - warp_sync_fragments_download: None, - }; - return ( - WarpSync::InProgress(self.inner), - Some(Error::InvalidHeapPages(err)), - ); - } - }; + let finalized_storage_heappages = match decoded_downloaded_runtime + .storage_value(&self.inner.warped_header_state_root, b":heappages") + { + Ok(val) => val.map(|(v, _)| v), + Err(proof_decode::IncompleteProofError { .. 
}) => { + let downloaded_source = *downloaded_source; + if let Some(SourceId(downloaded_source)) = downloaded_source { + self.inner.sources[downloaded_source].finalized_block_height = Err(()); + } + return ( + self.inner, + Err(BuildRuntimeError::SourceMisbehavior(SourceMisbehavior { + source_id: downloaded_source, + error: SourceMisbehaviorTy::MerkleProofEntriesMissing, + })), + ); + } + }; - let runtime = match HostVmPrototype::new(host::Config { - module: &finalized_storage_code, - heap_pages: decoded_heap_pages, - exec_hint, - allow_unresolved_imports, - }) { - Ok(runtime) => runtime, + let decoded_heap_pages = + match executor::storage_heap_pages_to_value(finalized_storage_heappages) { + Ok(hp) => hp, Err(err) => { - self.inner.phase = Phase::DownloadFragments { - warp_sync_fragments_download: None, + self.inner.warped_block_ty = WarpedBlockTy::KnownBad; + self.inner.runtime_download = RuntimeDownload::NotStarted { + hint_doesnt_match: *hint_doesnt_match, }; - return ( - WarpSync::InProgress(self.inner), - Some(Error::RuntimeBuild(err)), - ); + return (self.inner, Err(BuildRuntimeError::InvalidHeapPages(err))); } }; - let chain_info_builder = chain_information::build::ChainInformationBuild::new( - chain_information::build::Config { - finalized_block_header: - chain_information::build::ConfigFinalizedBlockHeader::Any { - scale_encoded_header: self - .inner - .warped_header - .scale_encoding_vec(self.inner.block_number_bytes), - known_finality: Some((&self.inner.warped_finality).clone()), - }, - block_number_bytes: self.inner.block_number_bytes, - runtime, + let runtime = match HostVmPrototype::new(host::Config { + module: &finalized_storage_code, + heap_pages: decoded_heap_pages, + exec_hint, + allow_unresolved_imports, + }) { + Ok(runtime) => runtime, + Err(err) => { + self.inner.warped_block_ty = WarpedBlockTy::KnownBad; + self.inner.runtime_download = RuntimeDownload::NotStarted { + hint_doesnt_match: *hint_doesnt_match, + }; + return (self.inner, 
Err(BuildRuntimeError::RuntimeBuild(err))); + } + }; + + let chain_info_builder = chain_information::build::ChainInformationBuild::new( + chain_information::build::Config { + finalized_block_header: chain_information::build::ConfigFinalizedBlockHeader::Any { + scale_encoded_header: self.inner.warped_header.clone(), + known_finality: Some((&self.inner.warped_finality).clone()), }, + block_number_bytes: self.inner.block_number_bytes, + runtime, + }, + ); + + if let chain_information::build::ChainInformationBuild::InProgress(in_progress) = + &chain_info_builder + { + for call in in_progress.remaining_calls() { + if let hashbrown::hash_map::Entry::Vacant(entry) = + self.inner.runtime_calls.entry(call) + { + entry.insert(CallProof::NotStarted); + } + } + } + + self.inner.runtime_download = RuntimeDownload::Verified { + downloaded_runtime: DownloadedRuntime { + storage_code: Some(finalized_storage_code.to_vec()), + storage_heap_pages: finalized_storage_heappages.map(|v| v.to_vec()), + code_merkle_value: Some(finalized_storage_code_merkle_value), + closest_ancestor_excluding: Some(finalized_storage_code_closest_ancestor_excluding), + }, + chain_info_builder, + }; + + (self.inner, Ok(())) + } +} + +/// Problem encountered during a call to [`BuildChainInformation::build`]. +#[derive(Debug, derive_more::Display)] +pub enum BuildChainInformationError { + /// Error building the chain information. + #[display(fmt = "Error building the chain information: {_0}")] + ChainInformationBuild(chain_information::build::Error), + /// Source that has sent a proof didn't behave properly. + SourceMisbehavior(SourceMisbehavior), +} + +/// Ready to verify the parameters of the chain against the finalized block. +pub struct BuildChainInformation { + inner: WarpSync, +} + +impl BuildChainInformation { + /// Build the information about the chain. + pub fn build( + mut self, + ) -> ( + WarpSync, + Result, + ) { + let RuntimeDownload::Verified { + mut chain_info_builder, + downloaded_runtime, + .. 
+ } = mem::replace( + &mut self.inner.runtime_download, + RuntimeDownload::NotStarted { + hint_doesnt_match: false, + }, + ) + else { + unreachable!() + }; + + let runtime_calls = mem::take(&mut self.inner.runtime_calls); + + debug_assert!(runtime_calls + .values() + .all(|c| matches!(c, CallProof::Downloaded { .. }))); + + // Decode all the Merkle proofs that have been received. + let calls = { + let mut decoded_proofs = hashbrown::HashMap::with_capacity_and_hasher( + runtime_calls.len(), + fnv::FnvBuildHasher::default(), ); - let (calls, chain_info_builder) = match chain_info_builder { + for (call, proof) in runtime_calls { + let CallProof::Downloaded { + proof, + downloaded_source, + } = proof + else { + unreachable!() + }; + + let decoded_proof = + match proof_decode::decode_and_verify_proof(proof_decode::Config { + proof: proof.into_iter(), + }) { + Ok(d) => d, + Err(err) => { + if let Some(SourceId(downloaded_source)) = downloaded_source { + self.inner.sources[downloaded_source].finalized_block_height = + Err(()); + } + return ( + self.inner, + Err(BuildChainInformationError::SourceMisbehavior( + SourceMisbehavior { + source_id: downloaded_source, + error: SourceMisbehaviorTy::InvalidMerkleProof(err), + }, + )), + ); + } + }; + + decoded_proofs.insert(call, (decoded_proof, downloaded_source)); + } + + decoded_proofs + }; + + loop { + let in_progress = match chain_info_builder { chain_information::build::ChainInformationBuild::Finished { result: Ok(chain_information), virtual_machine, } => { + // This `if` is necessary as in principle we might have continued warp syncing + // after downloading everything needed but before building the chain + // information. 
+ if self.inner.warped_header_number + == chain_information.as_ref().finalized_block_header.number + { + self.inner.warped_block_ty = WarpedBlockTy::AlreadyVerified; + } + self.inner.verified_chain_information = chain_information; + self.inner.runtime_calls = runtime_calls_default_value( + self.inner.verified_chain_information.as_ref().consensus, + ); return ( - WarpSync::Finished(Success { - chain_information, + self.inner, + Ok(RuntimeInformation { finalized_runtime: virtual_machine, - finalized_storage_code: Some(finalized_storage_code.to_owned()), - finalized_storage_heap_pages: finalized_storage_heappages - .map(|v| v.to_vec()), - finalized_storage_code_merkle_value: Some( - finalized_storage_code_merkle_value, - ), - finalized_storage_code_closest_ancestor_excluding: Some( - finalized_storage_code_closest_ancestor_excluding, - ), - sources_ordered: mem::take(&mut self.inner.sources) - .into_iter() - .map(|(id, source)| { - ( - SourceId(id), - source.finalized_block_height, - source.user_data, - ) - }) - .collect(), - in_progress_requests: mem::take(&mut self.inner.in_progress_requests) - .into_iter() - .map(|(id, (src_id, user_data, detail))| { - (src_id, RequestId(id), user_data, detail) - }) - .collect(), + finalized_storage_code: downloaded_runtime.storage_code, + finalized_storage_heap_pages: downloaded_runtime.storage_heap_pages, + finalized_storage_code_merkle_value: downloaded_runtime + .code_merkle_value, + finalized_storage_code_closest_ancestor_excluding: downloaded_runtime + .closest_ancestor_excluding, }), - None, ); } chain_information::build::ChainInformationBuild::Finished { result: Err(err), .. 
} => { - self.inner.phase = Phase::DownloadFragments { - warp_sync_fragments_download: None, - }; + self.inner.warped_block_ty = WarpedBlockTy::KnownBad; return ( - WarpSync::InProgress(self.inner), - Some(Error::ChainInformationBuild(err)), + self.inner, + Err(BuildChainInformationError::ChainInformationBuild(err)), ); } chain_information::build::ChainInformationBuild::InProgress(in_progress) => { - let calls = in_progress - .remaining_calls() - .map(|call| (call, CallProof::NotStarted)) - .collect(); - (calls, in_progress) + in_progress } }; - self.inner.phase = Phase::ChainInformationDownload { - warp_sync_source_id: *warp_sync_source_id, - downloaded_runtime: Some(DownloadedRuntime { - storage_code: Some(finalized_storage_code.to_vec()), - storage_heap_pages: finalized_storage_heappages.map(|v| v.to_vec()), - code_merkle_value: Some(finalized_storage_code_merkle_value), - closest_ancestor_excluding: Some( - finalized_storage_code_closest_ancestor_excluding, - ), - }), - chain_info_builder: Some(chain_info_builder), - calls, - }; - - (WarpSync::InProgress(self.inner), None) - } else { - unreachable!() - } - } -} - -/// Ready to verify the parameters of the chain against the finalized block. -pub struct BuildChainInformation { - inner: InProgressWarpSync, -} - -impl BuildChainInformation { - /// Build the information about the chain. - /// - /// This function might return a [`WarpSync::Finished`], indicating the end of the warp sync. - // TODO: refactor this error or explain what it is - pub fn build(mut self) -> (WarpSync, Option) { - if let Phase::ChainInformationDownload { - chain_info_builder, - downloaded_runtime, - calls, - .. - } = &mut self.inner.phase - { - debug_assert!(calls - .values() - .all(|c| matches!(c, CallProof::Downloaded(_)))); - - // Decode all the Merkle proofs that have been received. 
- let calls = { - let mut decoded_proofs = hashbrown::HashMap::with_capacity_and_hasher( - calls.len(), - fnv::FnvBuildHasher::default(), - ); - - for (call, proof) in calls { - let CallProof::Downloaded(proof) = mem::replace(proof, CallProof::NotStarted) - else { - unreachable!() - }; - let decoded_proof = - match proof_decode::decode_and_verify_proof(proof_decode::Config { - proof: proof.into_iter(), - }) { - Ok(d) => d, - Err(err) => { - self.inner.phase = Phase::DownloadFragments { - warp_sync_fragments_download: None, - }; - return ( - WarpSync::InProgress(self.inner), - Some(Error::InvalidMerkleProof(err)), - ); + chain_info_builder = match in_progress { + chain_information::build::InProgress::StorageGet(get) => { + // TODO: child tries not supported + let (proof, downloaded_source) = calls.get(&get.call_in_progress()).unwrap(); + let value = match proof + .storage_value(&self.inner.warped_header_state_root, get.key().as_ref()) + { + Ok(v) => v, + Err(proof_decode::IncompleteProofError { .. }) => { + if let Some(SourceId(downloaded_source)) = *downloaded_source { + self.inner.sources[downloaded_source].finalized_block_height = + Err(()); } - }; - decoded_proofs.insert(*call, decoded_proof); - } - - decoded_proofs - }; + return ( + self.inner, + Err(BuildChainInformationError::SourceMisbehavior( + SourceMisbehavior { + source_id: *downloaded_source, + error: SourceMisbehaviorTy::MerkleProofEntriesMissing, + }, + )), + ); + } + }; - let mut chain_info_builder = chain_info_builder.take().unwrap(); - - loop { - match chain_info_builder { - chain_information::build::InProgress::StorageGet(get) => { - // TODO: child tries not supported - let proof = calls.get(&get.call_in_progress()).unwrap(); - let value = match proof - .storage_value(&self.inner.warped_header.state_root, get.key().as_ref()) - { - Ok(v) => v, - Err(proof_decode::IncompleteProofError { .. 
}) => { - self.inner.phase = Phase::DownloadFragments { - warp_sync_fragments_download: None, - }; - return ( - WarpSync::InProgress(self.inner), - Some(Error::MerkleProofEntriesMissing), - ); - } - }; - - match get.inject_value(value.map(|(val, ver)| (iter::once(val), ver))) { - chain_information::build::ChainInformationBuild::Finished { - result: Ok(chain_information), - virtual_machine, - } => { - let downloaded_runtime = downloaded_runtime.take().unwrap(); - - return ( - WarpSync::Finished(Success { - chain_information, - finalized_runtime: virtual_machine, - finalized_storage_code: downloaded_runtime.storage_code, - finalized_storage_heap_pages: downloaded_runtime - .storage_heap_pages, - finalized_storage_code_merkle_value: downloaded_runtime - .code_merkle_value, - finalized_storage_code_closest_ancestor_excluding: - downloaded_runtime.closest_ancestor_excluding, - sources_ordered: mem::take(&mut self.inner.sources) - .into_iter() - .map(|(id, source)| { - ( - SourceId(id), - source.finalized_block_height, - source.user_data, - ) - }) - .collect(), - in_progress_requests: mem::take( - &mut self.inner.in_progress_requests, - ) - .into_iter() - .map(|(id, (src_id, user_data, detail))| { - (src_id, RequestId(id), user_data, detail) - }) - .collect(), - }), - None, - ); - } - chain_information::build::ChainInformationBuild::Finished { - result: Err(err), - .. 
- } => { - self.inner.phase = Phase::DownloadFragments { - warp_sync_fragments_download: None, - }; - return ( - WarpSync::InProgress(self.inner), - Some(Error::ChainInformationBuild(err)), - ); + get.inject_value(value.map(|(val, ver)| (iter::once(val), ver))) + } + chain_information::build::InProgress::NextKey(nk) => { + // TODO: child tries not supported + let (proof, downloaded_source) = calls.get(&nk.call_in_progress()).unwrap(); + let value = match proof.next_key( + &self.inner.warped_header_state_root, + &nk.key().collect::>(), // TODO: overhead + nk.or_equal(), + &nk.prefix().collect::>(), // TODO: overhead + nk.branch_nodes(), + ) { + Ok(v) => v, + Err(proof_decode::IncompleteProofError { .. }) => { + if let Some(SourceId(downloaded_source)) = *downloaded_source { + self.inner.sources[downloaded_source].finalized_block_height = + Err(()); } - chain_information::build::ChainInformationBuild::InProgress( - in_progress, - ) => { - chain_info_builder = in_progress; + return ( + self.inner, + Err(BuildChainInformationError::SourceMisbehavior( + SourceMisbehavior { + source_id: *downloaded_source, + error: SourceMisbehaviorTy::MerkleProofEntriesMissing, + }, + )), + ); + } + }; + nk.inject_key(value.map(|v| v.iter().copied())) + } + chain_information::build::InProgress::ClosestDescendantMerkleValue(mv) => { + // TODO: child tries not supported + let (proof, downloaded_source) = calls.get(&mv.call_in_progress()).unwrap(); + let value = match proof.closest_descendant_merkle_value( + &self.inner.warped_header_state_root, + &mv.key().collect::>(), // TODO: overhead + ) { + Ok(v) => v, + Err(proof_decode::IncompleteProofError { .. 
}) => { + if let Some(SourceId(downloaded_source)) = *downloaded_source { + self.inner.sources[downloaded_source].finalized_block_height = + Err(()); } + return ( + self.inner, + Err(BuildChainInformationError::SourceMisbehavior( + SourceMisbehavior { + source_id: *downloaded_source, + error: SourceMisbehaviorTy::MerkleProofEntriesMissing, + }, + )), + ); } - } - chain_information::build::InProgress::NextKey(_) - | chain_information::build::InProgress::ClosestDescendantMerkleValue(_) => { - // TODO: implement - self.inner.phase = Phase::DownloadFragments { - warp_sync_fragments_download: None, - }; - return ( - WarpSync::InProgress(self.inner), - Some(Error::NextKeyUnimplemented), - ); - } + }; + mv.inject_merkle_value(value) } - } - } else { - unreachable!() + }; } } } diff --git a/lib/src/sync/warp_sync/verifier.rs b/lib/src/sync/warp_sync/verifier.rs deleted file mode 100644 index 94382b3ef2..0000000000 --- a/lib/src/sync/warp_sync/verifier.rs +++ /dev/null @@ -1,223 +0,0 @@ -// Smoldot -// Copyright (C) 2019-2022 Parity Technologies (UK) Ltd. -// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . 
- -// TODO: really needs documentation - -use crate::chain::chain_information::{ChainInformationFinality, ChainInformationFinalityRef}; -use crate::finality; -use crate::finality::justification::verify::{ - verify, Config as VerifyConfig, Error as VerifyError, -}; -use crate::header::{self, DigestItemRef, GrandpaAuthority, GrandpaConsensusLogRef}; -use crate::informant::HashDisplay; - -use alloc::vec::Vec; -use core::fmt; - -#[derive(Debug)] -pub enum Error { - Verify(VerifyError), - TargetHashMismatch { - justification_target_hash: [u8; 32], - justification_target_height: u64, - header_hash: [u8; 32], - }, - NonMinimalProof, - EmptyProof, - InvalidHeader(header::Error), - InvalidJustification(finality::justification::decode::Error), - WrongChainAlgorithm, -} - -impl fmt::Display for Error { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - Error::Verify(err) => fmt::Display::fmt(err, f), - Error::TargetHashMismatch { - justification_target_hash, - justification_target_height, - header_hash, - } => { - write!( - f, - "Justification target hash ({}, height: {}) doesn't match the hash of the associated header ({})", - HashDisplay(justification_target_hash), - justification_target_height, - HashDisplay(header_hash) - ) - } - Error::NonMinimalProof => write!( - f, - "Warp sync proof fragment doesn't contain an authorities list change" - ), - Error::EmptyProof => write!(f, "Warp sync proof is empty"), - Error::InvalidHeader(_) => write!(f, "Failed to decode header"), - Error::InvalidJustification(_) => write!(f, "Failed to decode justification"), - Error::WrongChainAlgorithm => { - write!(f, "Chain information doesn't use the Grandpa algorithm") - } - } - } -} - -#[derive(Debug)] -pub struct Verifier { - /// If `true`, the verification should instantly fail with an error. 
- wrong_chain_algorithm: bool, - - index: usize, - authorities_set_id: u64, - authorities_list: Vec, - fragments: Vec, - is_proof_complete: bool, - - block_number_bytes: usize, -} - -impl Verifier { - pub fn new( - start_chain_information_finality: ChainInformationFinalityRef, - block_number_bytes: usize, - warp_sync_response_fragments: Vec, - is_proof_complete: bool, - ) -> Self { - let (wrong_chain_algorithm, authorities_list, authorities_set_id) = - match start_chain_information_finality { - ChainInformationFinalityRef::Grandpa { - finalized_triggered_authorities, - after_finalized_block_authorities_set_id, - .. - } => { - let authorities_list = finalized_triggered_authorities.to_vec(); - ( - false, - authorities_list, - after_finalized_block_authorities_set_id, - ) - } - _ => (true, Vec::new(), 0), - }; - - Self { - wrong_chain_algorithm, - index: 0, - authorities_set_id, - authorities_list, - fragments: warp_sync_response_fragments, - is_proof_complete, - block_number_bytes, - } - } - - pub fn next(mut self, randomness_seed: [u8; 32]) -> Result { - if self.wrong_chain_algorithm { - return Err(Error::WrongChainAlgorithm); - } - - if self.fragments.is_empty() { - if self.is_proof_complete { - return Ok(Next::EmptyProof); - } - return Err(Error::EmptyProof); - } - - debug_assert!(self.fragments.len() > self.index); - let fragment = &self.fragments[self.index]; - - let fragment_header_hash = - header::hash_from_scale_encoded_header(&fragment.scale_encoded_header); - let justification = finality::justification::decode::decode_grandpa( - &fragment.scale_encoded_justification, - self.block_number_bytes, - ) - .map_err(Error::InvalidJustification)?; - if *justification.target_hash != fragment_header_hash { - return Err(Error::TargetHashMismatch { - justification_target_hash: *justification.target_hash, - justification_target_height: justification.target_number, - header_hash: fragment_header_hash, - }); - } - - verify(VerifyConfig { - justification, - 
block_number_bytes: self.block_number_bytes, - authorities_list: self.authorities_list.iter().map(|a| &a.public_key[..]), - authorities_set_id: self.authorities_set_id, - randomness_seed, - }) - .map_err(Error::Verify)?; - - let authorities_list = - header::decode(&fragment.scale_encoded_header, self.block_number_bytes) - .map_err(Error::InvalidHeader)? - .digest - .logs() - .find_map(|log_item| match log_item { - DigestItemRef::GrandpaConsensus(grandpa_log_item) => match grandpa_log_item { - GrandpaConsensusLogRef::ScheduledChange(change) - | GrandpaConsensusLogRef::ForcedChange { change, .. } => { - Some(change.next_authorities) - } - _ => None, - }, - _ => None, - }) - .map(|next_authorities| next_authorities.map(GrandpaAuthority::from).collect()); - - self.index += 1; - - if let Some(authorities_list) = authorities_list { - self.authorities_list = authorities_list; - self.authorities_set_id += 1; - } else if !self.is_proof_complete || self.index != self.fragments.len() { - return Err(Error::NonMinimalProof); - } - - if self.index == self.fragments.len() { - Ok(Next::Success { - scale_encoded_header: fragment.scale_encoded_header.clone(), // TODO: cloning :-/ - chain_information_finality: ChainInformationFinality::Grandpa { - after_finalized_block_authorities_set_id: self.authorities_set_id, - finalized_triggered_authorities: self.authorities_list, - finalized_scheduled_change: None, - }, - }) - } else { - Ok(Next::NotFinished(self)) - } - } -} - -pub enum Next { - NotFinished(Verifier), - EmptyProof, - Success { - scale_encoded_header: Vec, - chain_information_finality: ChainInformationFinality, - }, -} - -/// Fragment to be verified. -#[derive(Debug)] -pub struct WarpSyncFragment { - /// Header of a block in the chain. - pub scale_encoded_header: Vec, - - /// Justification that proves the finality of [`WarpSyncFragment::scale_encoded_header`]. 
- pub scale_encoded_justification: Vec, -} diff --git a/light-base/src/sync_service/standalone.rs b/light-base/src/sync_service/standalone.rs index 580f8c215c..3acf57c8c7 100644 --- a/light-base/src/sync_service/standalone.rs +++ b/light-base/src/sync_service/standalone.rs @@ -21,7 +21,13 @@ use super::{ }; use crate::{network_service, platform::PlatformRef, util}; -use alloc::{borrow::ToOwned as _, boxed::Box, string::String, sync::Arc, vec::Vec}; +use alloc::{ + borrow::{Cow, ToOwned as _}, + boxed::Box, + string::{String, ToString as _}, + sync::Arc, + vec::Vec, +}; use core::{ iter, num::{NonZeroU32, NonZeroU64}, @@ -309,6 +315,7 @@ pub(super) async fn start_standalone_chain( .1 } RequestOutcome::Block(Err(_)) => { + // TODO: should disconnect peer task.sync .blocks_request_response(request_id, Err::, _>(())) .1 @@ -332,6 +339,7 @@ pub(super) async fn start_standalone_chain( .1 } RequestOutcome::WarpSync(Err(_)) => { + // TODO: should disconnect peer task.sync.grandpa_warp_sync_response_err(request_id); continue; } @@ -363,21 +371,19 @@ pub(super) async fn start_standalone_chain( ); } all::Status::WarpSyncFragments { - source: Some((_, (peer_id, _))), finalized_block_hash, finalized_block_number, + .. } | all::Status::WarpSyncChainInformation { - source: (_, (peer_id, _)), finalized_block_hash, finalized_block_number, } => { log::warn!( target: &task.log_target, - "GrandPa warp sync in progress. Block: #{} (0x{}). Peer attempt: {}.", + "GrandPa warp sync in progress. 
Block: #{} (0x{}).", finalized_block_number, - HashDisplay(&finalized_block_hash), - peer_id + HashDisplay(&finalized_block_hash) ); } }; @@ -679,8 +685,11 @@ impl Task { ); } Err(err) => { + // TODO: should disconnect peer log::debug!(target: &self.log_target, "Sync => WarpSyncRuntimeBuild(error={})", err); - log::warn!(target: &self.log_target, "Failed to compile runtime during warp syncing process: {}", err); + if !matches!(err, all::WarpSyncBuildRuntimeError::SourceMisbehavior(_)) { + log::warn!(target: &self.log_target, "Failed to compile runtime during warp syncing process: {}", err); + } } }; self.sync = new_sync; @@ -693,8 +702,14 @@ impl Task { log::debug!(target: &self.log_target, "Sync => WarpSyncBuildChainInformation(success=true)") } Err(err) => { + // TODO: should disconnect peer log::debug!(target: &self.log_target, "Sync => WarpSyncBuildChainInformation(error={})", err); - log::warn!(target: &self.log_target, "Failed to build the chain information during warp syncing process: {}", err); + if !matches!( + err, + all::WarpSyncBuildChainInformationError::SourceMisbehavior(_) + ) { + log::warn!(target: &self.log_target, "Failed to build the chain information during warp syncing process: {}", err); + } } }; self.sync = new_sync; @@ -739,7 +754,10 @@ impl Task { all::ProcessOne::VerifyWarpSyncFragment(verify) => { // Grandpa warp sync fragment to verify. 
- let sender_peer_id = verify.proof_sender().1 .0.clone(); // TODO: unnecessary cloning most of the time + let sender_peer_id = verify + .proof_sender() + .map(|(_, (peer_id, _))| Cow::Owned(peer_id.to_string())) // TODO: unnecessary cloning most of the time + .unwrap_or(Cow::Borrowed("")); let (sync, result) = verify.perform({ let mut seed = [0; 32]; @@ -748,25 +766,32 @@ impl Task { }); self.sync = sync; - if let Err(err) = result { - let maybe_forced_change = matches!(err, all::WarpSyncFragmentError::Verify(_)); - log::warn!( - target: &self.log_target, - "Failed to verify warp sync fragment from {}: {}{}", - sender_peer_id, - err, - if maybe_forced_change { - ". This might be caused by a forced GrandPa authorities change having \ - been enacted on the chain. If this is the case, please update the \ - chain specification with a checkpoint past this forced change." - } else { "" } - ); - } else { - log::debug!( - target: &self.log_target, - "Sync => WarpSyncFragmentVerified(sender={})", - sender_peer_id, - ); + match result { + Ok((fragment_hash, fragment_number)) => { + // TODO: must call `set_local_grandpa_state` and `set_local_best_block` so that other peers notify us of neighbor packets + log::debug!( + target: &self.log_target, + "Sync => WarpSyncFragmentVerified(sender={}, verified_hash={}, verified_height={fragment_number})", + sender_peer_id, + HashDisplay(&fragment_hash) + ); + } + Err(err) => { + // TODO: should disconnect peer + let maybe_forced_change = + matches!(err, all::VerifyFragmentError::JustificationVerify(_)); + log::warn!( + target: &self.log_target, + "Failed to verify warp sync fragment from {}: {}{}", + sender_peer_id, + err, + if maybe_forced_change { + ". This might be caused by a forced GrandPa authorities change having \ + been enacted on the chain. If this is the case, please update the \ + chain specification with a checkpoint past this forced change." 
+ } else { "" } + ); + } } } diff --git a/wasm-node/CHANGELOG.md b/wasm-node/CHANGELOG.md index 65b748b7c5..e60bdcf126 100644 --- a/wasm-node/CHANGELOG.md +++ b/wasm-node/CHANGELOG.md @@ -4,11 +4,16 @@ ### Changed +- During the warp syncing process, smoldot will now download the runtime and call proofs from any peer whose finalized block is superior or equal to the target block, rather than always the peer that was used to download the warp sync fragments. ([#1060](https://github.com/smol-dot/smoldot/pull/1060)) +- During the warp syncing process, smoldot will now download warp sync fragments in parallel of verifying previously-downloaded fragments. This is expected to speed up the warp syncing process. ([#1060](https://github.com/smol-dot/smoldot/pull/1060)) +- When a warp sync response contains an invalid warp sync fragment, the earlier valid fragments are now still used to make the warp syncing progress instead of being thrown away. ([#1060](https://github.com/smol-dot/smoldot/pull/1060)) +- During the warp sync process, the runtime call Merkle proofs are now downloaded in parallel of the runtime. This should save several networking round trips. Because the list of runtime calls to perform depend on the runtime version, starting to download the Merkle proofs before the runtime has been fully obtained is built upon the assumption that the runtime is at the latest version. ([#1060](https://github.com/smol-dot/smoldot/pull/1060)) - The `index` field of `bestChainBlockIncluded` events of `chainHead_unstable_follow` subscriptions is now a number rather than a string, in accordance with the latest changes in the JSON-RPC API specification. ([#1097](https://github.com/smol-dot/smoldot/pull/1097)) ### Fixed - Justifications are no longer downloaded for blocks that can't be finalized because an earlier block needs to be finalized first. 
([#1127](https://github.com/smol-dot/smoldot/pull/1127)) +- Fix warp sync process stagnating if a source sends a header whose height is lower than or equal to the currently warp synced block. ([#1060](https://github.com/smol-dot/smoldot/pull/1060)) ## 2.0.1 - 2023-09-08