Rewrite the warp syncing code to use call proofs #2578

Merged: 62 commits merged into main from warp-sync-call-proofs on Aug 5, 2022
Commits (62 total; the diff below shows changes from 56 of them):
60627e9
VirtualMachineParamsGet -> ChainInfoQuery
tomaka Aug 1, 2022
2036e45
Add call proofs requests to AllSync
tomaka Aug 1, 2022
474e521
Start adding a requests system to warp_sync.rs
tomaka Aug 2, 2022
f413a21
warp_sync_source no longer public
tomaka Aug 2, 2022
8323a7f
Start adding call proof requests
tomaka Aug 2, 2022
7e880ec
Provide block_hash in warp_sync::RequestDetail
tomaka Aug 2, 2022
4efb51c
Fix implementation in sync_service
tomaka Aug 2, 2022
5c197e3
Send the call proofs
tomaka Aug 2, 2022
13abcd4
Store the call proof responses
tomaka Aug 3, 2022
9fc643e
Use the call proofs
tomaka Aug 3, 2022
dd479e9
Remove StorageGet and NextKey variants from warp_sync
tomaka Aug 3, 2022
07a70df
Add RequestDetail::WarpSyncRequest
tomaka Aug 3, 2022
1ecb2b9
Inline PostVerificationState
tomaka Aug 3, 2022
46ec8b2
Add a Phase enum to ChainInfoQuery
tomaka Aug 3, 2022
cbe5707
Do the warp sync as part of ChainInfoQuery
tomaka Aug 3, 2022
d632883
Remove GrandpaWarpSync and Verifier variants
tomaka Aug 3, 2022
c3a157b
Remove ChainInfoQuery::current_source
tomaka Aug 3, 2022
1ca81c4
Remove source_id field, instead get source on the fly
tomaka Aug 3, 2022
30a546f
Remove WaitingForSources struct
tomaka Aug 3, 2022
77cb3ee
Merge InProgressWarpSync and ChainInfoQuery into one
tomaka Aug 3, 2022
ff59fd9
Remove warp_sync_header()
tomaka Aug 3, 2022
656e5a4
Have a run() function that drives the CPU process
tomaka Aug 4, 2022
5470748
Properly transition requests
tomaka Aug 4, 2022
ff1a01f
Update inject_error function
tomaka Aug 4, 2022
dc91084
as_chain_information is in fact correct
tomaka Aug 4, 2022
f41a52a
Add warp_sync::Config::requests_capacity
tomaka Aug 4, 2022
5d86d54
Update remove_source
tomaka Aug 4, 2022
dfd2b75
Store the raw downloaded runtime code and compile later
tomaka Aug 4, 2022
1b0c670
Store the downloaded fragments and verify them in run()
tomaka Aug 4, 2022
71319c6
Small tweak to run()
tomaka Aug 4, 2022
97c742d
handle_response_ok no longer takes ownership
tomaka Aug 4, 2022
3f816aa
Tweaks to downloaded_proof
tomaka Aug 4, 2022
5a0316c
Tweak desired_requests so that rustfmt is happy
tomaka Aug 4, 2022
fd437dd
Small tweaks to desired_requests
tomaka Aug 4, 2022
8b47c71
Add a TRq user data for requests
tomaka Aug 4, 2022
991f734
Remove functions a bit
tomaka Aug 4, 2022
7471b55
Split RequestDetail from DesiredRequest, to remove state_trie_root field
tomaka Aug 4, 2022
324a9f5
Use Cow for the function name and parameters
tomaka Aug 4, 2022
1d654b5
Split PreVerification phase in two
tomaka Aug 4, 2022
c459589
Immediately create a Verifier after download
tomaka Aug 4, 2022
424d9d1
Extract body of run() to different sub-structs
tomaka Aug 4, 2022
53612d7
run() -> process_one() and tweaks
tomaka Aug 4, 2022
3b424d0
BuildChainInformation::verify() -> build()
tomaka Aug 4, 2022
e1eb68f
Fix one todo in VerifyWarpSyncFragment::verify
tomaka Aug 4, 2022
b66e95e
Restore verifying fragments one by one
tomaka Aug 4, 2022
781d8e4
Only verify one fragment at a time in warp_sync
tomaka Aug 4, 2022
c7f18fe
CHANGELOG
tomaka Aug 4, 2022
04010db
Remove all remaining todo!()s from warp_sync.rs
tomaka Aug 4, 2022
d1dafbc
Fix most warnings
tomaka Aug 4, 2022
9c3016a
Properly check block hash on warp sync request success
tomaka Aug 4, 2022
7d27437
Try again sources if all sources banned
tomaka Aug 4, 2022
b7f881f
Tweak usage
tomaka Aug 4, 2022
0b54af2
Properly remove obsolete requests when removing source
tomaka Aug 4, 2022
79b0f7e
Improve docs somewhat
tomaka Aug 4, 2022
3eaaf67
Move process_one to the bottom
tomaka Aug 4, 2022
48d12d5
PR number
tomaka Aug 4, 2022
93c18c5
Rustfmt
tomaka Aug 4, 2022
c5c959f
Fix doclink
tomaka Aug 4, 2022
e8f20f4
Fix spellcheck
tomaka Aug 4, 2022
53f64ba
Properly change phase if multiple sets of fragments to download
tomaka Aug 4, 2022
39a0273
No longer ignore responses from banned sources, to prevent infinite loop
tomaka Aug 5, 2022
31f2c3f
Merge branch 'main' into warp-sync-call-proofs
mergify[bot] Aug 5, 2022
27 changes: 12 additions & 15 deletions bin/full-node/src/run/consensus_service.rs
@@ -533,10 +533,6 @@ impl SyncBackground {
| all::ResponseOutcome::NotFinalizedChain { .. }
| all::ResponseOutcome::AllAlreadyInChain { .. } => {
}
all::ResponseOutcome::WarpSyncError { .. } |
all::ResponseOutcome::WarpSyncFinished { .. } => {
unreachable!()
}
}
}
},
@@ -802,15 +798,15 @@ impl SyncBackground {
// Locally-authored blocks source.
match (request_details, &self.authored_block) {
(
all::RequestDetail::BlocksRequest {
all::DesiredRequest::BlocksRequest {
first_block_hash: None,
first_block_height,
..
},
Some((authored_height, _, _, _)),
) if first_block_height == authored_height => true,
(
all::RequestDetail::BlocksRequest {
all::DesiredRequest::BlocksRequest {
first_block_hash: Some(first_block_hash),
first_block_height,
..
@@ -834,7 +830,7 @@ impl SyncBackground {
request_info.num_blocks_clamp(NonZeroU64::new(64).unwrap());

match request_info {
all::RequestDetail::BlocksRequest { .. }
all::DesiredRequest::BlocksRequest { .. }
if source_id == self.block_author_sync_source =>
{
tracing::debug!("queue-locally-authored-block-for-import");
@@ -847,7 +843,7 @@ impl SyncBackground {
// Create a request that is immediately answered right below.
let request_id = self.sync.add_request(
source_id,
request_info,
request_info.into(),
future::AbortHandle::new_pair().0, // Temporary dummy.
);

@@ -863,7 +859,7 @@ impl SyncBackground {
);
}

all::RequestDetail::BlocksRequest {
all::DesiredRequest::BlocksRequest {
first_block_hash,
first_block_height,
ascending,
@@ -905,15 +901,14 @@ impl SyncBackground {
);

let (request, abort) = future::abortable(request);
let request_id = self
.sync
.add_request(source_id, request_info.clone(), abort);
let request_id = self.sync.add_request(source_id, request_info.into(), abort);

self.block_requests_finished
.push(request.map(move |r| (request_id, r)).boxed());
}
all::RequestDetail::GrandpaWarpSync { .. }
| all::RequestDetail::StorageGet { .. } => {
all::DesiredRequest::GrandpaWarpSync { .. }
| all::DesiredRequest::StorageGet { .. }
| all::DesiredRequest::RuntimeCallMerkleProof { .. } => {
// Not used in "full" mode.
unreachable!()
}
@@ -937,7 +932,9 @@ impl SyncBackground {
self.sync = idle;
break;
}
all::ProcessOne::VerifyWarpSyncFragment(_) => unreachable!(),
all::ProcessOne::VerifyWarpSyncFragment(_)
| all::ProcessOne::WarpSyncError { .. }
| all::ProcessOne::WarpSyncFinished { .. } => unreachable!(),
all::ProcessOne::VerifyBodyHeader(verify) => {
let hash_to_verify = verify.hash();
let height_to_verify = verify.height();
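The `all::RequestDetail` → `all::DesiredRequest` renames and the new `request_info.into()` calls in this file come from the split introduced in commit 7471b55 ("Split RequestDetail from DesiredRequest, to remove state_trie_root field"). The toy model below only sketches the shape of such a split; the type and field names are simplified stand-ins for illustration, not the real smoldot definitions.

```rust
// Simplified, hypothetical model of the DesiredRequest/RequestDetail split
// (not the actual smoldot types). A desired request may carry hints that are
// only needed to build the network request (such as the state trie root for
// a storage query); converting it into the stored request detail drops them.
#[allow(dead_code)]
enum DesiredRequest {
    StorageGet {
        block_hash: [u8; 32],
        state_trie_root: [u8; 32], // hint used when emitting the request
        keys: Vec<Vec<u8>>,
    },
}

#[allow(dead_code)]
enum RequestDetail {
    StorageGet {
        block_hash: [u8; 32],
        keys: Vec<Vec<u8>>, // the trie-root hint is not kept here
    },
}

impl From<DesiredRequest> for RequestDetail {
    fn from(desired: DesiredRequest) -> Self {
        match desired {
            DesiredRequest::StorageGet {
                block_hash, keys, ..
            } => RequestDetail::StorageGet { block_hash, keys },
        }
    }
}

fn main() {
    let desired = DesiredRequest::StorageGet {
        block_hash: [0; 32],
        state_trie_root: [0; 32],
        keys: vec![b":code".to_vec()],
    };
    // Mirrors the `add_request(source_id, request_info.into(), abort)` calls
    // in the diff: the desired request is converted when it is submitted.
    let _detail: RequestDetail = desired.into();
}
```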
166 changes: 122 additions & 44 deletions bin/light-base/src/sync_service/standalone.rs
@@ -29,6 +29,7 @@ use smoldot::{
};
use std::{
collections::{HashMap, HashSet},
iter,
marker::PhantomData,
num::{NonZeroU32, NonZeroU64},
sync::Arc,
@@ -81,6 +82,7 @@ pub(super) async fn start_standalone_chain<TPlat: Platform>(
pending_block_requests: stream::FuturesUnordered::new(),
pending_grandpa_requests: stream::FuturesUnordered::new(),
pending_storage_requests: stream::FuturesUnordered::new(),
pending_call_proof_requests: stream::FuturesUnordered::new(),
warp_sync_taking_long_time_warning: future::Either::Left(TPlat::sleep(
Duration::from_secs(15),
))
@@ -277,8 +279,7 @@ pub(super) async fn start_standalone_chain<TPlat: Platform>(

(request_id, result) = task.pending_storage_requests.select_next_some() => {
// A storage request has been finished.
// `result` is an error if the block request got cancelled by the sync state
// machine.
// `result` is an error if the request got cancelled by the sync state machine.
if let Ok(result) = result {
// Inject the result of the request into the sync state machine.
task.sync.storage_get_response(
@@ -293,6 +294,26 @@ pub(super) async fn start_standalone_chain<TPlat: Platform>(
}
},

(request_id, result) = task.pending_call_proof_requests.select_next_some() => {
// A call proof request has been finished.
// `result` is an error if the request got cancelled by the sync state machine.
if let Ok(result) = result {
// Inject the result of the request into the sync state machine.
task.sync.call_proof_response(
request_id,
match result {
Ok(ref r) => Ok(r.decode().into_iter()),
Err(err) => Err(err),
}
).1

} else {
// The sync state machine has emitted a `Action::Cancel` earlier, and is
// thus no longer interested in the response.
continue;
}
},

() = &mut task.warp_sync_taking_long_time_warning => {
log::warn!(
target: &task.log_target,
@@ -325,42 +346,6 @@ pub(super) async fn start_standalone_chain<TPlat: Platform>(
| all::ResponseOutcome::Queued
| all::ResponseOutcome::NotFinalizedChain { .. }
| all::ResponseOutcome::AllAlreadyInChain { .. } => {}
all::ResponseOutcome::WarpSyncError { error } => {
log::warn!(
target: &task.log_target,
"Error during GrandPa warp syncing: {}",
error
);
}
all::ResponseOutcome::WarpSyncFinished {
finalized_block_runtime,
finalized_storage_code,
finalized_storage_heap_pages,
} => {
let finalized_header = task.sync.finalized_block_header();
log::info!(
target: &task.log_target,
"GrandPa warp sync finished to #{} ({})",
finalized_header.number,
HashDisplay(&finalized_header.hash(task.sync.block_number_bytes()))
);

task.warp_sync_taking_long_time_warning =
future::Either::Right(future::pending()).fuse();

debug_assert!(task.known_finalized_runtime.is_none());
task.known_finalized_runtime = Some(FinalizedBlockRuntime {
virtual_machine: finalized_block_runtime,
storage_code: finalized_storage_code,
storage_heap_pages: finalized_storage_heap_pages,
});

task.network_up_to_date_finalized = false;
task.network_up_to_date_best = false;
// Since there is a gap in the blocks, all active notifications to all blocks
// must be cleared.
task.all_notifications.clear();
}
}
}
}
@@ -446,6 +431,17 @@ struct Task<TPlat: Platform> {
>,
>,

/// List of call proof requests currently in progress.
pending_call_proof_requests: stream::FuturesUnordered<
future::BoxFuture<
'static,
(
all::RequestId,
Result<Result<network::service::EncodedMerkleProof, ()>, future::Aborted>,
),
>,
>,

platform: PhantomData<fn() -> TPlat>,
}

@@ -475,7 +471,7 @@ impl<TPlat: Platform> Task<TPlat> {
request_detail.num_blocks_clamp(NonZeroU64::new(64).unwrap());

match request_detail {
all::RequestDetail::BlocksRequest {
all::DesiredRequest::BlocksRequest {
first_block_hash,
first_block_height,
ascending,
@@ -514,13 +510,13 @@ impl<TPlat: Platform> Task<TPlat> {
);

let (block_request, abort) = future::abortable(block_request);
let request_id = self.sync.add_request(source_id, request_detail, abort);
let request_id = self.sync.add_request(source_id, request_detail.into(), abort);

self.pending_block_requests
.push(async move { (request_id, block_request.await) }.boxed());
}

all::RequestDetail::GrandpaWarpSync {
all::DesiredRequest::GrandpaWarpSync {
sync_start_block_hash,
} => {
let peer_id = self.sync[source_id].0.clone(); // TODO: why does this require cloning? weird borrow chk issue
@@ -537,13 +533,13 @@ impl<TPlat: Platform> Task<TPlat> {
);

let (grandpa_request, abort) = future::abortable(grandpa_request);
let request_id = self.sync.add_request(source_id, request_detail, abort);
let request_id = self.sync.add_request(source_id, request_detail.into(), abort);

self.pending_grandpa_requests
.push(async move { (request_id, grandpa_request.await) }.boxed());
}

all::RequestDetail::StorageGet {
all::DesiredRequest::StorageGet {
block_hash,
state_trie_root,
ref keys,
@@ -583,11 +579,48 @@ impl<TPlat: Platform> Task<TPlat> {
};

let (storage_request, abort) = future::abortable(storage_request);
let request_id = self.sync.add_request(source_id, request_detail, abort);
let request_id = self.sync.add_request(source_id, request_detail.into(), abort);

self.pending_storage_requests
.push(async move { (request_id, storage_request.await) }.boxed());
}

all::DesiredRequest::RuntimeCallMerkleProof {
block_hash,
ref function_name,
ref parameter_vectored,
} => {
let peer_id = self.sync[source_id].0.clone(); // TODO: why does this require cloning? weird borrow chk issue
let network_service = self.network_service.clone();
let network_chain_index = self.network_chain_index;
// TODO: all this copying is done because of lifetime requirements in NetworkService::call_proof_request; maybe check if it can be avoided
let parameter_vectored = parameter_vectored.clone();
let function_name = function_name.clone();

let call_proof_request = async move {
let rq = network_service.call_proof_request(
network_chain_index,
peer_id,
network::protocol::CallProofRequestConfig {
block_hash,
method: &function_name,
parameter_vectored: iter::once(parameter_vectored),
},
Duration::from_secs(16),
);

match rq.await {
Ok(p) => Ok(p),
Err(_) => Err(()),
}
};

let (call_proof_request, abort) = future::abortable(call_proof_request);
let request_id = self.sync.add_request(source_id, request_detail.into(), abort);

self.pending_call_proof_requests
.push(async move { (request_id, call_proof_request.await) }.boxed());
}
}

true
@@ -607,6 +640,51 @@ impl<TPlat: Platform> Task<TPlat> {
return (self, false);
}

all::ProcessOne::WarpSyncError { sync, error } => {
self.sync = sync;
log::warn!(
target: &self.log_target,
"Error during GrandPa warp syncing: {}",
error
);
return (self, true);
}

all::ProcessOne::WarpSyncFinished {
sync,
finalized_block_runtime,
finalized_storage_code,
finalized_storage_heap_pages,
} => {
self.sync = sync;

let finalized_header = self.sync.finalized_block_header();
log::info!(
target: &self.log_target,
"GrandPa warp sync finished to #{} ({})",
finalized_header.number,
HashDisplay(&finalized_header.hash(self.sync.block_number_bytes()))
);

self.warp_sync_taking_long_time_warning =
future::Either::Right(future::pending()).fuse();

debug_assert!(self.known_finalized_runtime.is_none());
self.known_finalized_runtime = Some(FinalizedBlockRuntime {
virtual_machine: finalized_block_runtime,
storage_code: finalized_storage_code,
storage_heap_pages: finalized_storage_heap_pages,
});

self.network_up_to_date_finalized = false;
self.network_up_to_date_best = false;
// Since there is a gap in the blocks, all active notifications to all blocks
// must be cleared.
self.all_notifications.clear();

return (self, true);
}

all::ProcessOne::VerifyWarpSyncFragment(verify) => {
// Grandpa warp sync fragment to verify.
let sender_peer_id = verify.proof_sender().1 .0.clone(); // TODO: unnecessary cloning most of the time
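The new `pending_call_proof_requests` field and its `select_next_some()` branch above follow the same pattern already used for block, GrandPa warp sync and storage requests: each network request is wrapped in `future::abortable`, tagged with its request ID, and pushed into a `FuturesUnordered` so responses can be consumed in whatever order they complete. The standalone sketch below demonstrates that pattern with the `futures` crate (0.3); the request IDs and response payloads are invented for illustration.

```rust
// Sketch of the request-tracking pattern used in the sync service:
// abortable futures, tagged with a request ID, drained from a FuturesUnordered.
use futures::{
    future::{self, FutureExt as _},
    stream::{FuturesUnordered, StreamExt as _},
};

fn main() {
    futures::executor::block_on(async {
        let mut pending: FuturesUnordered<
            future::BoxFuture<'static, (u64, Result<String, future::Aborted>)>,
        > = FuturesUnordered::new();

        // Start two "requests". The abort handle is what the sync state
        // machine would keep in order to cancel an obsolete request.
        for request_id in 0..2u64 {
            let request = async move { format!("response #{}", request_id) };
            let (request, _abort_handle) = future::abortable(request);
            pending.push(async move { (request_id, request.await) }.boxed());
        }

        // Drain responses as they complete; a cancelled request shows up as
        // `Err(Aborted)` and is simply skipped, mirroring the `continue`
        // branches in the select loop above.
        while let Some((request_id, result)) = pending.next().await {
            match result {
                Ok(response) => println!("request {} finished: {}", request_id, response),
                Err(_aborted) => continue,
            }
        }
    });
}
```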
5 changes: 5 additions & 0 deletions bin/wasm-node/CHANGELOG.md
@@ -2,6 +2,11 @@

## Unreleased

### Changed

- The GRANDPA warp sync algorithm now downloads Merkle proofs of all the necessary storage items at once, rather than one by one sequentially. This removes approximately 11 networking round-trips and thus significantly reduces the time the warp syncing takes. ([#2578](https://github.com/paritytech/smoldot/pull/2578))
- The GRANDPA warp sync implementation has been considerably refactored. It is possible that unintended changes in behaviour have accidentally been introduced. ([#2578](https://github.com/paritytech/smoldot/pull/2578))

## 0.6.27 - 2022-07-29

### Changed
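To make the round-trip claim in the changelog entry concrete, the sketch below contrasts the two strategies in a self-contained toy model. `FixedLatency`, the key list and the 200 ms figure are assumptions made up for the example; this is not smoldot's networking API.

```rust
use std::time::Duration;

/// Invented stand-in for the networking layer; not a smoldot API.
struct FixedLatency(Duration);

impl FixedLatency {
    /// Old behaviour: one storage item proven per request, so each item
    /// costs a full round-trip.
    fn storage_proof(&self, _key: &[u8]) -> Duration {
        self.0
    }

    /// New behaviour: a single runtime-call Merkle proof request returns a
    /// proof covering every storage item the call touches.
    fn call_proof(&self, _method: &str) -> Duration {
        self.0
    }
}

fn main() {
    // Items the warp sync needs once the fragments are verified; the exact
    // list here is an assumption for the example.
    let keys: Vec<&[u8]> = vec![&b":code"[..], &b":heappages"[..], &b":grandpa_authorities"[..]];

    let net = FixedLatency(Duration::from_millis(200));

    // Sequential per-item requests: latency scales with the number of items.
    let sequential: Duration = keys.iter().copied().map(|key| net.storage_proof(key)).sum();

    // Batched call-proof request: one round-trip regardless of item count.
    let batched = net.call_proof("Core_version");

    println!("sequential: {:?}, batched: {:?}", sequential, batched);
}
```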