diff --git a/lib/src/sync/all.rs b/lib/src/sync/all.rs index aafa9ddbda..e5a946f886 100644 --- a/lib/src/sync/all.rs +++ b/lib/src/sync/all.rs @@ -2924,9 +2924,7 @@ impl Shared { } }; - if let Some(finalized_block_height) = finalized_block_height { - all_forks.update_source_finality_state(updated_source_id, finalized_block_height); - } + all_forks.update_source_finality_state(updated_source_id, finalized_block_height); self.sources[source.outer_source_id.0] = SourceMapping::AllForks(updated_source_id); } diff --git a/lib/src/sync/optimistic.rs b/lib/src/sync/optimistic.rs index 7833dc1b5e..1fdbf23ce3 100644 --- a/lib/src/sync/optimistic.rs +++ b/lib/src/sync/optimistic.rs @@ -211,11 +211,6 @@ struct Source { /// Best block that the source has reported having. best_block_number: u64, - /// If `true`, this source is banned and shouldn't use be used to request blocks. - /// Note that the ban is lifted if the source is removed. This ban isn't meant to be a line of - /// defense against malicious peers but rather an optimization. - banned: bool, - /// Number of requests that use this source. num_ongoing_requests: u32, } @@ -384,7 +379,6 @@ impl OptimisticSync { Source { user_data: source, best_block_number, - banned: false, num_ongoing_requests: 0, }, ); @@ -658,15 +652,6 @@ impl OptimisticSync { .unwrap() .num_ongoing_requests -= 1; - self.inner.sources.get_mut(&source_id).unwrap().banned = true; - - // If all sources are banned, unban them. - if self.inner.sources.iter().all(|(_, s)| s.banned) { - for src in self.inner.sources.values_mut() { - src.banned = false; - } - } - user_data } @@ -870,24 +855,12 @@ impl BlockVerify { scale_encoded_extrinsics: block.scale_encoded_extrinsics, verified_header, scale_encoded_justifications: block.scale_encoded_justifications, - source_id, }, new_best_hash, new_best_number, } } Err(reason) => { - if let Some(src) = self.inner.sources.get_mut(&source_id) { - src.banned = true; - } - - // If all sources are banned, unban them. - if self.inner.sources.iter().all(|(_, s)| s.banned) { - for src in self.inner.sources.values_mut() { - src.banned = false; - } - } - self.inner.make_requests_obsolete(&self.chain); let previous_best_height = self.chain.best_block_header().number; @@ -944,7 +917,6 @@ pub struct BlockVerifySuccess { verified_header: blocks_tree::VerifiedHeader, scale_encoded_extrinsics: Vec>, scale_encoded_justifications: Vec<([u8; 4], Vec)>, - source_id: SourceId, } impl BlockVerifySuccess { @@ -1014,19 +986,7 @@ impl BlockVerifySuccess { /// Reject the block and mark it as bad. pub fn reject_bad_block(mut self) -> OptimisticSync { - if let Some(src) = self.parent.inner.sources.get_mut(&self.source_id) { - src.banned = true; - } - - // If all sources are banned, unban them. - if self.parent.inner.sources.iter().all(|(_, s)| s.banned) { - for src in self.parent.inner.sources.values_mut() { - src.banned = false; - } - } - self.parent.inner.make_requests_obsolete(&self.parent.chain); - self.parent } @@ -1083,7 +1043,7 @@ impl JustificationVerify { OptimisticSync, JustificationVerification, ) { - let (consensus_engine_id, justification, source_id) = + let (consensus_engine_id, justification, _) = self.inner.pending_encoded_justifications.next().unwrap(); let mut apply = match self.chain.verify_justification( @@ -1093,17 +1053,6 @@ impl JustificationVerify { ) { Ok(a) => a, Err(error) => { - if let Some(source) = self.inner.sources.get_mut(&source_id) { - source.banned = true; - } - - // If all sources are banned, unban them. - if self.inner.sources.iter().all(|(_, s)| s.banned) { - for src in self.inner.sources.values_mut() { - src.banned = false; - } - } - let chain = blocks_tree::NonFinalizedTree::new( self.inner.finalized_chain_information.clone(), ); diff --git a/lib/src/sync/warp_sync.rs b/lib/src/sync/warp_sync.rs index 7242403ee3..ab9fc31d40 100644 --- a/lib/src/sync/warp_sync.rs +++ b/lib/src/sync/warp_sync.rs @@ -91,8 +91,6 @@ //! previously been downloaded. //! -// TODO: this module is "vulnerable" to situations where new malicious sources are continuously added with a high finalized block, as the state machine will repeatedly try to download from that source and fail - use crate::{ chain::chain_information::{ self, ChainInformationConsensusRef, ChainInformationFinality, ChainInformationFinalityRef, @@ -278,8 +276,7 @@ pub struct Deconstructed { /// height and user data. /// The list is ordered by [`SourceId`]. // TODO: use a struct? - // TODO: this `Option` is weird - pub sources_ordered: Vec<(SourceId, Option, TSrc)>, + pub sources_ordered: Vec<(SourceId, u64, TSrc)>, /// The list of requests that were added to the state machine. pub in_progress_requests: Vec<(SourceId, RequestId, TRq, RequestDetail)>, @@ -367,9 +364,8 @@ pub struct WarpSync { struct Source { /// User data chosen by the API user. user_data: TSrc, - /// Height of the finalized block of the source, as reported by the source. Contains `Err` - /// if the source has sent invalid fragments or proofs in the past. - finalized_block_height: Result, + /// Height of the finalized block of the source, as reported by the source. + finalized_block_height: u64, } /// SeeĀ [`WarpSync::warped_block_ty`]. @@ -566,7 +562,7 @@ impl WarpSync { .map(|(id, source)| { ( SourceId(id), - source.finalized_block_height.ok(), + source.finalized_block_height, source.user_data, ) }) @@ -603,7 +599,7 @@ impl WarpSync { pub fn add_source(&mut self, user_data: TSrc) -> SourceId { let source_id = SourceId(self.sources.insert(Source { user_data, - finalized_block_height: Ok(0), + finalized_block_height: 0, })); let _inserted = self.sources_by_finalized_height.insert((0, source_id)); @@ -627,13 +623,10 @@ impl WarpSync { ) -> (TSrc, impl Iterator + '_) { debug_assert!(self.sources.contains(to_remove.0)); let removed = self.sources.remove(to_remove.0); - - if let Ok(finalized_block_height) = removed.finalized_block_height { - let _was_in = self - .sources_by_finalized_height - .remove(&(finalized_block_height, to_remove)); - debug_assert!(_was_in); - } + let _was_in = self + .sources_by_finalized_height + .remove(&(removed.finalized_block_height, to_remove)); + debug_assert!(_was_in); debug_assert!(self.sources.len() >= self.sources_by_finalized_height.len()); // We make sure to not leave invalid source IDs in the state of `self`. @@ -707,29 +700,29 @@ impl WarpSync { /// Panics if `source_id` is invalid. /// pub fn set_source_finality_state(&mut self, source_id: SourceId, finalized_block_height: u64) { - if let Ok(stored_height) = self.sources[source_id.0].finalized_block_height.as_mut() { - // Small optimization. No need to do anything more if the block doesn't actuall change. - if *stored_height == finalized_block_height { - return; - } + let stored_height = &mut self.sources[source_id.0].finalized_block_height; - // Note that if the new finalized block is below the former one (which is not something - // that is ever supposed to happen), we should in principle cancel the requests - // targeting that source that require a specific block height. In practice, however, - // we don't care as again this isn't supposed to ever happen. While ongoing requests - // might fail as a result, this is handled the same way as a regular request failure. + // Small optimization. No need to do anything more if the block doesn't actuall change. + if *stored_height == finalized_block_height { + return; + } - let _was_in = self - .sources_by_finalized_height - .remove(&(*stored_height, source_id)); - debug_assert!(_was_in); - let _inserted = self - .sources_by_finalized_height - .insert((finalized_block_height, source_id)); - debug_assert!(_inserted); + // Note that if the new finalized block is below the former one (which is not something + // that is ever supposed to happen), we should in principle cancel the requests + // targeting that source that require a specific block height. In practice, however, + // we don't care as again this isn't supposed to ever happen. While ongoing requests + // might fail as a result, this is handled the same way as a regular request failure. + + let _was_in = self + .sources_by_finalized_height + .remove(&(*stored_height, source_id)); + debug_assert!(_was_in); + let _inserted = self + .sources_by_finalized_height + .insert((finalized_block_height, source_id)); + debug_assert!(_inserted); - *stored_height = finalized_block_height; - } + *stored_height = finalized_block_height; } /// Returns a list of requests that should be started in order to drive the warp syncing @@ -782,11 +775,11 @@ impl WarpSync { if let Some(verify_queue_tail_block_number) = verify_queue_tail_block_number { // Combine the request with every single available source. either::Left(self.sources.iter().filter_map(move |(src_id, src)| { - if src.finalized_block_height.map_or(true, |h| { - h <= verify_queue_tail_block_number.saturating_add( + if src.finalized_block_height + <= verify_queue_tail_block_number.saturating_add( u64::try_from(warp_sync_minimum_gap).unwrap_or(u64::max_value()), ) - }) { + { return None; } @@ -956,9 +949,7 @@ impl WarpSync { Cow::Borrowed(&b":code"[..]) }; - if self.sources[source_id.0] - .finalized_block_height - .map_or(false, |h| h >= self.warped_header_number) + if self.sources[source_id.0].finalized_block_height >= self.warped_header_number && *block_hash == self.warped_header_hash && keys.iter().any(|k| *k == *code_key_to_request) && keys.iter().any(|k| k == b":heappages") @@ -979,9 +970,8 @@ impl WarpSync { ) => { for (info, status) in &mut self.runtime_calls { if matches!(status, CallProof::NotStarted) - && self.sources[source_id.0] - .finalized_block_height - .map_or(false, |h| h >= self.warped_header_number) + && self.sources[source_id.0].finalized_block_height + >= self.warped_header_number && *block_hash == self.warped_header_hash && function_name == info.function_name() && parameters_equal(parameter_vectored, info.parameter_vectored()) @@ -1177,33 +1167,31 @@ impl WarpSync { .last() .and_then(|h| header::decode(&h.scale_encoded_header, self.block_number_bytes).ok()) { - if let Ok(src_finalized_height) = - self.sources[rq_source_id.0].finalized_block_height.as_mut() - { - let new_height = if final_set_of_fragments { - // If the source indicated that this is the last fragment, then we know that - // it's also equal to their finalized block. - last_header.number - } else { - // If this is not the last fragment, we know that the finalized block of the - // source is *at least* the one provided. - // TODO: could maybe do + gap or something? - cmp::max(*src_finalized_height, last_header.number.saturating_add(1)) - }; + let src_finalized_height = &mut self.sources[rq_source_id.0].finalized_block_height; + + let new_height = if final_set_of_fragments { + // If the source indicated that this is the last fragment, then we know that + // it's also equal to their finalized block. + last_header.number + } else { + // If this is not the last fragment, we know that the finalized block of the + // source is *at least* the one provided. + // TODO: could maybe do + gap or something? + cmp::max(*src_finalized_height, last_header.number.saturating_add(1)) + }; - if *src_finalized_height != new_height { - let _was_in = self - .sources_by_finalized_height - .remove(&(*src_finalized_height, rq_source_id)); - debug_assert!(_was_in); + if *src_finalized_height != new_height { + let _was_in = self + .sources_by_finalized_height + .remove(&(*src_finalized_height, rq_source_id)); + debug_assert!(_was_in); - *src_finalized_height = new_height; + *src_finalized_height = new_height; - let _inserted = self - .sources_by_finalized_height - .insert((*src_finalized_height, rq_source_id)); - debug_assert!(_inserted); - } + let _inserted = self + .sources_by_finalized_height + .insert((*src_finalized_height, rq_source_id)); + debug_assert!(_inserted); } } @@ -1414,9 +1402,6 @@ impl VerifyWarpSyncFragment { // The source has sent an empty list of fragments. This is invalid. if fragments_to_verify.fragments.is_empty() { - if let Some(SourceId(downloaded_source)) = fragments_to_verify.downloaded_source { - self.inner.sources[downloaded_source].finalized_block_height = Err(()); - } self.inner.verify_queue.pop_front().unwrap(); return (self.inner, Err(VerifyFragmentError::EmptyProof)); } @@ -1449,9 +1434,6 @@ impl VerifyWarpSyncFragment { ) { Ok(j) => j, Err(err) => { - if let Some(SourceId(source_id)) = fragments_to_verify.downloaded_source { - self.inner.sources[source_id].finalized_block_height = Err(()); - } self.inner.verify_queue.clear(); self.inner.warp_sync_fragments_download = None; return (self.inner, Err(VerifyFragmentError::InvalidHeader(err))); @@ -1463,9 +1445,6 @@ impl VerifyWarpSyncFragment { ) { Ok(j) => j, Err(err) => { - if let Some(SourceId(source_id)) = fragments_to_verify.downloaded_source { - self.inner.sources[source_id].finalized_block_height = Err(()); - } self.inner.verify_queue.clear(); self.inner.warp_sync_fragments_download = None; return ( @@ -1477,9 +1456,6 @@ impl VerifyWarpSyncFragment { // Make sure that the header would actually advance the warp sync process forward. if fragment_decoded_header.number <= self.inner.warped_header_number { - if let Some(SourceId(source_id)) = fragments_to_verify.downloaded_source { - self.inner.sources[source_id].finalized_block_height = Err(()); - } self.inner.verify_queue.clear(); self.inner.warp_sync_fragments_download = None; return ( @@ -1498,9 +1474,6 @@ impl VerifyWarpSyncFragment { header_hash: fragment_header_hash, header_height: fragment_decoded_header.number, }; - if let Some(SourceId(source_id)) = fragments_to_verify.downloaded_source { - self.inner.sources[source_id].finalized_block_height = Err(()); - } self.inner.verify_queue.clear(); self.inner.warp_sync_fragments_download = None; return (self.inner, Err(error)); @@ -1516,9 +1489,6 @@ impl VerifyWarpSyncFragment { authorities_set_id: *after_finalized_block_authorities_set_id, randomness_seed, }) { - if let Some(SourceId(source_id)) = fragments_to_verify.downloaded_source { - self.inner.sources[source_id].finalized_block_height = Err(()); - } self.inner.verify_queue.clear(); self.inner.warp_sync_fragments_download = None; return ( @@ -1555,9 +1525,6 @@ impl VerifyWarpSyncFragment { || fragments_to_verify.next_fragment_to_verify_index != fragments_to_verify.fragments.len() - 1) { - if let Some(SourceId(source_id)) = fragments_to_verify.downloaded_source { - self.inner.sources[source_id].finalized_block_height = Err(()); - } self.inner.verify_queue.clear(); self.inner.warp_sync_fragments_download = None; return (self.inner, Err(VerifyFragmentError::NonMinimalProof)); @@ -1579,13 +1546,6 @@ impl VerifyWarpSyncFragment { *finalized_triggered_authorities = new_authorities_list; *after_finalized_block_authorities_set_id += 1; } - if let Some(SourceId(source_id)) = fragments_to_verify.downloaded_source { - let src_finalized = &mut self.inner.sources[source_id].finalized_block_height; - if src_finalized.is_err() { - self.inner.sources[source_id].finalized_block_height = - Ok(self.inner.warped_header_number); - } - } if fragments_to_verify.next_fragment_to_verify_index == fragments_to_verify.fragments.len() { self.inner.verify_queue.pop_front().unwrap(); @@ -1735,9 +1695,6 @@ impl BuildRuntime { Ok(p) => p, Err(err) => { let downloaded_source = *downloaded_source; - if let Some(SourceId(downloaded_source)) = downloaded_source { - self.inner.sources[downloaded_source].finalized_block_height = Err(()); - } self.inner.runtime_download = RuntimeDownload::NotStarted { hint_doesnt_match: *hint_doesnt_match, }; @@ -1789,9 +1746,6 @@ impl BuildRuntime { } Err(proof_decode::IncompleteProofError { .. }) => { let downloaded_source = *downloaded_source; - if let Some(SourceId(downloaded_source)) = downloaded_source { - self.inner.sources[downloaded_source].finalized_block_height = Err(()); - } self.inner.runtime_download = RuntimeDownload::NotStarted { hint_doesnt_match: *hint_doesnt_match, }; @@ -1831,9 +1785,6 @@ impl BuildRuntime { } Err(proof_decode::IncompleteProofError { .. }) => { let downloaded_source = *downloaded_source; - if let Some(SourceId(downloaded_source)) = downloaded_source { - self.inner.sources[downloaded_source].finalized_block_height = Err(()); - } return ( self.inner, Err(BuildRuntimeError::SourceMisbehavior(SourceMisbehavior { @@ -1851,9 +1802,6 @@ impl BuildRuntime { Ok(val) => val.map(|(v, _)| v), Err(proof_decode::IncompleteProofError { .. }) => { let downloaded_source = *downloaded_source; - if let Some(SourceId(downloaded_source)) = downloaded_source { - self.inner.sources[downloaded_source].finalized_block_height = Err(()); - } return ( self.inner, Err(BuildRuntimeError::SourceMisbehavior(SourceMisbehavior { @@ -1994,10 +1942,6 @@ impl BuildChainInformation { }) { Ok(d) => d, Err(err) => { - if let Some(SourceId(downloaded_source)) = downloaded_source { - self.inner.sources[downloaded_source].finalized_block_height = - Err(()); - } return ( self.inner, Err(BuildChainInformationError::SourceMisbehavior( @@ -2071,10 +2015,6 @@ impl BuildChainInformation { { Ok(v) => v, Err(proof_decode::IncompleteProofError { .. }) => { - if let Some(SourceId(downloaded_source)) = *downloaded_source { - self.inner.sources[downloaded_source].finalized_block_height = - Err(()); - } return ( self.inner, Err(BuildChainInformationError::SourceMisbehavior( @@ -2101,10 +2041,6 @@ impl BuildChainInformation { ) { Ok(v) => v, Err(proof_decode::IncompleteProofError { .. }) => { - if let Some(SourceId(downloaded_source)) = *downloaded_source { - self.inner.sources[downloaded_source].finalized_block_height = - Err(()); - } return ( self.inner, Err(BuildChainInformationError::SourceMisbehavior( @@ -2127,10 +2063,6 @@ impl BuildChainInformation { ) { Ok(v) => v, Err(proof_decode::IncompleteProofError { .. }) => { - if let Some(SourceId(downloaded_source)) = *downloaded_source { - self.inner.sources[downloaded_source].finalized_block_height = - Err(()); - } return ( self.inner, Err(BuildChainInformationError::SourceMisbehavior( diff --git a/wasm-node/CHANGELOG.md b/wasm-node/CHANGELOG.md index affaa49484..f090432300 100644 --- a/wasm-node/CHANGELOG.md +++ b/wasm-node/CHANGELOG.md @@ -2,6 +2,10 @@ ## Unreleased +### Fixed + +- Fix state mismatch during warp syncing if a peer sends a bad header, justification, or proof. ([#1498](https://github.com/smol-dot/smoldot/pull/1498)) + ## 2.0.15 - 2023-12-20 ### Changed