Skip to content

Commit

Permalink
Prioritise important parts of block processing (sigp#3696)
Browse files Browse the repository at this point in the history
## Issue Addressed

Closes sigp#2327

## Proposed Changes

This is an extension of some ideas I implemented while working on `tree-states`:

- Cache the indexed attestations from blocks in the `ConsensusContext`. Previously we were re-computing them 3-4 times over.
- Clean up `import_block` by splitting each part into `import_block_XXX`.
- Move some stuff off hot paths, specifically:
  - Relocate non-essential tasks that were running between receiving the payload verification status and priming the early attester cache. These tasks are moved after the cache priming:
    - Attestation observation
    - Validator monitor updates
    - Slasher updates
    - Updating the shuffling cache
  - Fork choice attestation observation now happens at the end of block verification in parallel with payload verification (this seems to save 5-10ms).
  - Payload verification now happens _before_ advancing the pre-state and writing it to disk! States were previously being written eagerly and adding ~20-30ms in front of verifying the execution payload. State catchup also sometimes takes ~500ms if we get a cache miss and need to rebuild the tree hash cache.

The remaining task that's taking substantial time (~20ms) is importing the block to fork choice. I _think_ this is because of pull-tips, and we should be able to optimise it out with a clever total active balance cache in the state (which would be computed in parallel with payload verification). I've decided to leave that for future work though. For now it can be observed via the new `beacon_block_processing_post_exec_pre_attestable_seconds` metric.


Co-authored-by: Michael Sproul <[email protected]>
  • Loading branch information
2 people authored and macladson committed Dec 30, 2022
1 parent 26adb3d commit c0beffd
Show file tree
Hide file tree
Showing 14 changed files with 783 additions and 496 deletions.
732 changes: 449 additions & 283 deletions beacon_node/beacon_chain/src/beacon_chain.rs

Large diffs are not rendered by default.

241 changes: 141 additions & 100 deletions beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,22 +52,22 @@ use crate::validator_monitor::HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOC
use crate::validator_pubkey_cache::ValidatorPubkeyCache;
use crate::{
beacon_chain::{
BeaconForkChoice, BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT, MAXIMUM_GOSSIP_CLOCK_DISPARITY,
VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT,
BeaconForkChoice, ForkChoiceError, BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT,
MAXIMUM_GOSSIP_CLOCK_DISPARITY, VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT,
},
metrics, BeaconChain, BeaconChainError, BeaconChainTypes,
};
use derivative::Derivative;
use eth2::types::EventKind;
use execution_layer::PayloadStatus;
use fork_choice::PayloadVerificationStatus;
use fork_choice::{AttestationFromBlock, PayloadVerificationStatus};
use parking_lot::RwLockReadGuard;
use proto_array::Block as ProtoBlock;
use safe_arith::ArithError;
use slog::{debug, error, warn, Logger};
use slot_clock::SlotClock;
use ssz::Encode;
use state_processing::per_block_processing::is_merge_transition_block;
use state_processing::per_block_processing::{errors::IntoWithIndex, is_merge_transition_block};
use state_processing::{
block_signature_verifier::{BlockSignatureVerifier, Error as BlockSignatureVerifierError},
per_block_processing, per_slot_processing,
Expand Down Expand Up @@ -550,8 +550,22 @@ pub fn signature_verify_chain_segment<T: BeaconChainTypes>(
let pubkey_cache = get_validator_pubkey_cache(chain)?;
let mut signature_verifier = get_signature_verifier(&state, &pubkey_cache, &chain.spec);

let mut signature_verified_blocks = Vec::with_capacity(chain_segment.len());

for (block_root, block) in &chain_segment {
signature_verifier.include_all_signatures(block, Some(*block_root), None)?;
let mut consensus_context =
ConsensusContext::new(block.slot()).set_current_block_root(*block_root);

signature_verifier.include_all_signatures(block, &mut consensus_context)?;

// Save the block and its consensus context. The context will have had its proposer index
// and attesting indices filled in, which can be used to accelerate later block processing.
signature_verified_blocks.push(SignatureVerifiedBlock {
block: block.clone(),
block_root: *block_root,
parent: None,
consensus_context,
});
}

if signature_verifier.verify().is_err() {
Expand All @@ -560,22 +574,6 @@ pub fn signature_verify_chain_segment<T: BeaconChainTypes>(

drop(pubkey_cache);

let mut signature_verified_blocks = chain_segment
.into_iter()
.map(|(block_root, block)| {
// Proposer index has already been verified above during signature verification.
let consensus_context = ConsensusContext::new(block.slot())
.set_current_block_root(block_root)
.set_proposer_index(block.message().proposer_index());
SignatureVerifiedBlock {
block,
block_root,
parent: None,
consensus_context,
}
})
.collect::<Vec<_>>();

if let Some(signature_verified_block) = signature_verified_blocks.first_mut() {
signature_verified_block.parent = Some(parent);
}
Expand Down Expand Up @@ -625,6 +623,7 @@ pub struct ExecutionPendingBlock<T: BeaconChainTypes> {
pub parent_block: SignedBeaconBlock<T::EthSpec, BlindedPayload<T::EthSpec>>,
pub parent_eth1_finalization_data: Eth1FinalizationData,
pub confirmed_state_roots: Vec<Hash256>,
pub consensus_context: ConsensusContext<T::EthSpec>,
pub payload_verification_handle: PayloadVerificationHandle<T::EthSpec>,
}

Expand Down Expand Up @@ -951,13 +950,14 @@ impl<T: BeaconChainTypes> SignatureVerifiedBlock<T> {

let mut signature_verifier = get_signature_verifier(&state, &pubkey_cache, &chain.spec);

signature_verifier.include_all_signatures(&block, Some(block_root), None)?;
let mut consensus_context =
ConsensusContext::new(block.slot()).set_current_block_root(block_root);

signature_verifier.include_all_signatures(&block, &mut consensus_context)?;

if signature_verifier.verify().is_ok() {
Ok(Self {
consensus_context: ConsensusContext::new(block.slot())
.set_current_block_root(block_root)
.set_proposer_index(block.message().proposer_index()),
consensus_context,
block,
block_root,
parent: Some(parent),
Expand Down Expand Up @@ -1002,16 +1002,16 @@ impl<T: BeaconChainTypes> SignatureVerifiedBlock<T> {

// Gossip verification has already checked the proposer index. Use it to check the RANDAO
// signature.
let verified_proposer_index = Some(block.message().proposer_index());
let mut consensus_context = from.consensus_context;
signature_verifier
.include_all_signatures_except_proposal(&block, verified_proposer_index)?;
.include_all_signatures_except_proposal(&block, &mut consensus_context)?;

if signature_verifier.verify().is_ok() {
Ok(Self {
block,
block_root: from.block_root,
parent: Some(parent),
consensus_context: from.consensus_context,
consensus_context,
})
} else {
Err(BlockError::InvalidSignature)
Expand Down Expand Up @@ -1138,6 +1138,79 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {

check_block_relevancy(&block, block_root, chain)?;

// Define a future that will verify the execution payload with an execution engine.
//
// We do this as early as possible so that later parts of this function can run in parallel
// with the payload verification.
let payload_notifier = PayloadNotifier::new(
chain.clone(),
block.clone(),
&parent.pre_state,
notify_execution_layer,
)?;
let is_valid_merge_transition_block =
is_merge_transition_block(&parent.pre_state, block.message().body());
let payload_verification_future = async move {
let chain = payload_notifier.chain.clone();
let block = payload_notifier.block.clone();

// If this block triggers the merge, check to ensure that it references valid execution
// blocks.
//
// The specification defines this check inside `on_block` in the fork-choice specification,
// however we perform the check here for two reasons:
//
// - There's no point in importing a block that will fail fork choice, so it's best to fail
// early.
// - Doing the check here means we can keep our fork-choice implementation "pure". I.e., no
// calls to remote servers.
if is_valid_merge_transition_block {
validate_merge_block(&chain, block.message(), AllowOptimisticImport::Yes).await?;
};

// The specification declares that this should be run *inside* `per_block_processing`,
// however we run it here to keep `per_block_processing` pure (i.e., no calls to external
// servers).
let payload_verification_status = payload_notifier.notify_new_payload().await?;

// If the payload did not validate or invalidate the block, check to see if this block is
// valid for optimistic import.
if payload_verification_status.is_optimistic() {
let block_hash_opt = block
.message()
.body()
.execution_payload()
.map(|full_payload| full_payload.execution_payload.block_hash);

// Ensure the block is a candidate for optimistic import.
if !is_optimistic_candidate_block(&chain, block.slot(), block.parent_root()).await?
{
warn!(
chain.log,
"Rejecting optimistic block";
"block_hash" => ?block_hash_opt,
"msg" => "the execution engine is not synced"
);
return Err(ExecutionPayloadError::UnverifiedNonOptimisticCandidate.into());
}
}

Ok(PayloadVerificationOutcome {
payload_verification_status,
is_valid_merge_transition_block,
})
};
// Spawn the payload verification future as a new task, but don't wait for it to complete.
// The `payload_verification_future` will be awaited later to ensure verification completed
// successfully.
let payload_verification_handle = chain
.task_executor
.spawn_handle(
payload_verification_future,
"execution_payload_verification",
)
.ok_or(BeaconChainError::RuntimeShutdown)?;

/*
* Advance the given `parent.beacon_state` to the slot of the given `block`.
*/
Expand Down Expand Up @@ -1242,80 +1315,11 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
summaries.push(summary);
}
}
metrics::stop_timer(catchup_timer);

let block_slot = block.slot();
let state_current_epoch = state.current_epoch();

// Define a future that will verify the execution payload with an execution engine (but
// don't execute it yet).
let payload_notifier =
PayloadNotifier::new(chain.clone(), block.clone(), &state, notify_execution_layer)?;
let is_valid_merge_transition_block =
is_merge_transition_block(&state, block.message().body());
let payload_verification_future = async move {
let chain = payload_notifier.chain.clone();
let block = payload_notifier.block.clone();

// If this block triggers the merge, check to ensure that it references valid execution
// blocks.
//
// The specification defines this check inside `on_block` in the fork-choice specification,
// however we perform the check here for two reasons:
//
// - There's no point in importing a block that will fail fork choice, so it's best to fail
// early.
// - Doing the check here means we can keep our fork-choice implementation "pure". I.e., no
// calls to remote servers.
if is_valid_merge_transition_block {
validate_merge_block(&chain, block.message(), AllowOptimisticImport::Yes).await?;
};

// The specification declares that this should be run *inside* `per_block_processing`,
// however we run it here to keep `per_block_processing` pure (i.e., no calls to external
// servers).
//
// It is important that this function is called *after* `per_slot_processing`, since the
// `randao` may change.
let payload_verification_status = payload_notifier.notify_new_payload().await?;

// If the payload did not validate or invalidate the block, check to see if this block is
// valid for optimistic import.
if payload_verification_status.is_optimistic() {
let block_hash_opt = block
.message()
.body()
.execution_payload()
.map(|full_payload| full_payload.execution_payload.block_hash);

// Ensure the block is a candidate for optimistic import.
if !is_optimistic_candidate_block(&chain, block.slot(), block.parent_root()).await?
{
warn!(
chain.log,
"Rejecting optimistic block";
"block_hash" => ?block_hash_opt,
"msg" => "the execution engine is not synced"
);
return Err(ExecutionPayloadError::UnverifiedNonOptimisticCandidate.into());
}
}

Ok(PayloadVerificationOutcome {
payload_verification_status,
is_valid_merge_transition_block,
})
};
// Spawn the payload verification future as a new task, but don't wait for it to complete.
// The `payload_verification_future` will be awaited later to ensure verification completed
// successfully.
let payload_verification_handle = chain
.task_executor
.spawn_handle(
payload_verification_future,
"execution_payload_verification",
)
.ok_or(BeaconChainError::RuntimeShutdown)?;

// If the block is sufficiently recent, notify the validator monitor.
if let Some(slot) = chain.slot_clock.now() {
let epoch = slot.epoch(T::EthSpec::slots_per_epoch());
Expand All @@ -1342,8 +1346,6 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
}
}

metrics::stop_timer(catchup_timer);

/*
* Build the committee caches on the state.
*/
Expand Down Expand Up @@ -1433,13 +1435,52 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
});
}

/*
* Apply the block's attestations to fork choice.
*
* We're running in parallel with the payload verification at this point, so this is
* free real estate.
*/
let current_slot = chain.slot()?;
let mut fork_choice = chain.canonical_head.fork_choice_write_lock();

// Register each attester slashing in the block with fork choice.
for attester_slashing in block.message().body().attester_slashings() {
fork_choice.on_attester_slashing(attester_slashing);
}

// Register each attestation in the block with fork choice.
for (i, attestation) in block.message().body().attestations().iter().enumerate() {
let _fork_choice_attestation_timer =
metrics::start_timer(&metrics::FORK_CHOICE_PROCESS_ATTESTATION_TIMES);

let indexed_attestation = consensus_context
.get_indexed_attestation(&state, attestation)
.map_err(|e| BlockError::PerBlockProcessingError(e.into_with_index(i)))?;

match fork_choice.on_attestation(
current_slot,
indexed_attestation,
AttestationFromBlock::True,
&chain.spec,
) {
Ok(()) => Ok(()),
// Ignore invalid attestations whilst importing attestations from a block. The
// block might be very old and therefore the attestations useless to fork choice.
Err(ForkChoiceError::InvalidAttestation(_)) => Ok(()),
Err(e) => Err(BlockError::BeaconChainError(e.into())),
}?;
}
drop(fork_choice);

Ok(Self {
block,
block_root,
state,
parent_block: parent.beacon_block,
parent_eth1_finalization_data,
confirmed_state_roots,
consensus_context,
payload_verification_handle,
})
}
Expand Down
4 changes: 3 additions & 1 deletion beacon_node/beacon_chain/src/execution_payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ impl<T: BeaconChainTypes> PayloadNotifier<T> {
// are cheap and doing them here ensures we protect the execution engine from junk.
partially_verify_execution_payload(
state,
block.slot(),
block.message().execution_payload()?,
&chain.spec,
)
Expand Down Expand Up @@ -373,7 +374,8 @@ pub fn get_execution_payload<
let spec = &chain.spec;
let current_epoch = state.current_epoch();
let is_merge_transition_complete = is_merge_transition_complete(state);
let timestamp = compute_timestamp_at_slot(state, spec).map_err(BeaconStateError::from)?;
let timestamp =
compute_timestamp_at_slot(state, state.slot(), spec).map_err(BeaconStateError::from)?;
let random = *state.get_randao_mix(current_epoch)?;
let latest_execution_payload_header_block_hash =
state.latest_execution_payload_header()?.block_hash;
Expand Down
5 changes: 5 additions & 0 deletions beacon_node/beacon_chain/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ lazy_static! {
"beacon_block_processing_state_root_seconds",
"Time spent calculating the state root when processing a block."
);
pub static ref BLOCK_PROCESSING_POST_EXEC_PROCESSING: Result<Histogram> = try_create_histogram_with_buckets(
"beacon_block_processing_post_exec_pre_attestable_seconds",
"Time between finishing execution processing and the block becoming attestable",
linear_buckets(5e-3, 5e-3, 10)
);
pub static ref BLOCK_PROCESSING_DB_WRITE: Result<Histogram> = try_create_histogram(
"beacon_block_processing_db_write_seconds",
"Time spent writing a newly processed block and state to DB"
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,7 @@ where

pub fn get_timestamp_at_slot(&self) -> u64 {
let state = self.get_current_state();
compute_timestamp_at_slot(&state, &self.spec).unwrap()
compute_timestamp_at_slot(&state, state.slot(), &self.spec).unwrap()
}

pub fn get_current_state_and_root(&self) -> (BeaconState<E>, Hash256) {
Expand Down
Loading

0 comments on commit c0beffd

Please sign in to comment.