diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 564e2582e6c..59abb860c80 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -73,7 +73,7 @@ use slog::{crit, debug, error, info, trace, warn, Logger}; use slot_clock::SlotClock; use ssz::Encode; use state_processing::{ - common::{get_attesting_indices_from_state, get_indexed_attestation}, + common::get_attesting_indices_from_state, per_block_processing, per_block_processing::{ errors::AttestationValidationError, verify_attestation_for_block_inclusion, @@ -2587,6 +2587,7 @@ impl BeaconChain { confirmed_state_roots, payload_verification_handle, parent_eth1_finalization_data, + consensus_context, } = execution_pending_block; let PayloadVerificationOutcome { @@ -2640,6 +2641,7 @@ impl BeaconChain { count_unrealized, parent_block, parent_eth1_finalization_data, + consensus_context, ) }, "payload_verification_handle", @@ -2665,70 +2667,36 @@ impl BeaconChain { count_unrealized: CountUnrealized, parent_block: SignedBlindedBeaconBlock, parent_eth1_finalization_data: Eth1FinalizationData, + mut consensus_context: ConsensusContext, ) -> Result> { + // ----------------------------- BLOCK NOT YET ATTESTABLE ---------------------------------- + // Everything in this initial section is on the hot path between processing the block and + // being able to attest to it. DO NOT add any extra processing in this initial section + // unless it must run before fork choice. + // ----------------------------------------------------------------------------------------- let current_slot = self.slot()?; let current_epoch = current_slot.epoch(T::EthSpec::slots_per_epoch()); + let block = signed_block.message(); + let post_exec_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_POST_EXEC_PROCESSING); - let attestation_observation_timer = - metrics::start_timer(&metrics::BLOCK_PROCESSING_ATTESTATION_OBSERVATION); - - // Iterate through the attestations in the block and register them as an "observed - // attestation". This will stop us from propagating them on the gossip network. - for a in signed_block.message().body().attestations() { - match self.observed_attestations.write().observe_item(a, None) { - // If the observation was successful or if the slot for the attestation was too - // low, continue. - // - // We ignore `SlotTooLow` since this will be very common whilst syncing. - Ok(_) | Err(AttestationObservationError::SlotTooLow { .. }) => {} - Err(e) => return Err(BlockError::BeaconChainError(e.into())), - } - } - - metrics::stop_timer(attestation_observation_timer); - - // If a slasher is configured, provide the attestations from the block. - if let Some(slasher) = self.slasher.as_ref() { - for attestation in signed_block.message().body().attestations() { - let committee = - state.get_beacon_committee(attestation.data.slot, attestation.data.index)?; - let indexed_attestation = get_indexed_attestation(committee.committee, attestation) - .map_err(|e| BlockError::BeaconChainError(e.into()))?; - slasher.accept_attestation(indexed_attestation); - } - } + // Check against weak subjectivity checkpoint. + self.check_block_against_weak_subjectivity_checkpoint(block, block_root, &state)?; // If there are new validators in this block, update our pubkey cache. // - // We perform this _before_ adding the block to fork choice because the pubkey cache is - // used by attestation processing which will only process an attestation if the block is - // known to fork choice. This ordering ensure that the pubkey cache is always up-to-date. - self.validator_pubkey_cache + // The only keys imported here will be ones for validators deposited in this block, because + // the cache *must* already have been updated for the parent block when it was imported. + // Newly deposited validators are not active and their keys are not required by other parts + // of block processing. The reason we do this here and not after making the block attestable + // is so we don't have to think about lock ordering with respect to the fork choice lock. + // There are a bunch of places where we lock both fork choice and the pubkey cache and it + // would be difficult to check that they all lock fork choice first. + let mut kv_store_ops = self + .validator_pubkey_cache .try_write_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT) .ok_or(Error::ValidatorPubkeyCacheLockTimeout)? .import_new_pubkeys(&state)?; - // For the current and next epoch of this state, ensure we have the shuffling from this - // block in our cache. - for relative_epoch in &[RelativeEpoch::Current, RelativeEpoch::Next] { - let shuffling_id = AttestationShufflingId::new(block_root, &state, *relative_epoch)?; - - let shuffling_is_cached = self - .shuffling_cache - .try_read_for(ATTESTATION_CACHE_LOCK_TIMEOUT) - .ok_or(Error::AttestationCacheLockTimeout)? - .contains(&shuffling_id); - - if !shuffling_is_cached { - state.build_committee_cache(*relative_epoch, &self.spec)?; - let committee_cache = state.committee_cache(*relative_epoch)?; - self.shuffling_cache - .try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT) - .ok_or(Error::AttestationCacheLockTimeout)? - .insert_committee_cache(shuffling_id, committee_cache); - } - } - // Apply the state to the attester cache, only if it is from the previous epoch or later. // // In a perfect scenario there should be no need to add previous-epoch states to the cache. @@ -2740,52 +2708,7 @@ impl BeaconChain { .map_err(BeaconChainError::from)?; } - // Alias for readability. - let block = signed_block.message(); - - // Only perform the weak subjectivity check if it was configured. - if let Some(wss_checkpoint) = self.config.weak_subjectivity_checkpoint { - // Note: we're using the finalized checkpoint from the head state, rather than fork - // choice. - // - // We are doing this to ensure that we detect changes in finalization. It's possible - // that fork choice has already been updated to the finalized checkpoint in the block - // we're importing. - let current_head_finalized_checkpoint = - self.canonical_head.cached_head().finalized_checkpoint(); - // Compare the existing finalized checkpoint with the incoming block's finalized checkpoint. - let new_finalized_checkpoint = state.finalized_checkpoint(); - - // This ensures we only perform the check once. - if (current_head_finalized_checkpoint.epoch < wss_checkpoint.epoch) - && (wss_checkpoint.epoch <= new_finalized_checkpoint.epoch) - { - if let Err(e) = - self.verify_weak_subjectivity_checkpoint(wss_checkpoint, block_root, &state) - { - let mut shutdown_sender = self.shutdown_sender(); - crit!( - self.log, - "Weak subjectivity checkpoint verification failed while importing block!"; - "block_root" => ?block_root, - "parent_root" => ?block.parent_root(), - "old_finalized_epoch" => ?current_head_finalized_checkpoint.epoch, - "new_finalized_epoch" => ?new_finalized_checkpoint.epoch, - "weak_subjectivity_epoch" => ?wss_checkpoint.epoch, - "error" => ?e, - ); - crit!(self.log, "You must use the `--purge-db` flag to clear the database and restart sync. You may be on a hostile network."); - shutdown_sender - .try_send(ShutdownReason::Failure( - "Weak subjectivity checkpoint verification failed. Provided block root is not a checkpoint." - )) - .map_err(|err| BlockError::BeaconChainError(BeaconChainError::WeakSubjectivtyShutdownError(err)))?; - return Err(BlockError::WeakSubjectivityConflict); - } - } - } - - // Take an exclusive write-lock on fork choice. It's very important prevent deadlocks by + // Take an exclusive write-lock on fork choice. It's very important to prevent deadlocks by // avoiding taking other locks whilst holding this lock. let mut fork_choice = self.canonical_head.fork_choice_write_lock(); @@ -2815,77 +2738,6 @@ impl BeaconChain { .map_err(|e| BlockError::BeaconChainError(e.into()))?; } - // Allow the validator monitor to learn about a new valid state. - self.validator_monitor - .write() - .process_valid_state(current_slot.epoch(T::EthSpec::slots_per_epoch()), &state); - let validator_monitor = self.validator_monitor.read(); - - // Register each attester slashing in the block with fork choice. - for attester_slashing in block.body().attester_slashings() { - fork_choice.on_attester_slashing(attester_slashing); - } - - // Register each attestation in the block with the fork choice service. - for attestation in block.body().attestations() { - let _fork_choice_attestation_timer = - metrics::start_timer(&metrics::FORK_CHOICE_PROCESS_ATTESTATION_TIMES); - let attestation_target_epoch = attestation.data.target.epoch; - - let committee = - state.get_beacon_committee(attestation.data.slot, attestation.data.index)?; - let indexed_attestation = get_indexed_attestation(committee.committee, attestation) - .map_err(|e| BlockError::BeaconChainError(e.into()))?; - - match fork_choice.on_attestation( - current_slot, - &indexed_attestation, - AttestationFromBlock::True, - &self.spec, - ) { - Ok(()) => Ok(()), - // Ignore invalid attestations whilst importing attestations from a block. The - // block might be very old and therefore the attestations useless to fork choice. - Err(ForkChoiceError::InvalidAttestation(_)) => Ok(()), - Err(e) => Err(BlockError::BeaconChainError(e.into())), - }?; - - // To avoid slowing down sync, only register attestations for the - // `observed_block_attesters` if they are from the previous epoch or later. - if attestation_target_epoch + 1 >= current_epoch { - let mut observed_block_attesters = self.observed_block_attesters.write(); - for &validator_index in &indexed_attestation.attesting_indices { - if let Err(e) = observed_block_attesters - .observe_validator(attestation_target_epoch, validator_index as usize) - { - debug!( - self.log, - "Failed to register observed block attester"; - "error" => ?e, - "epoch" => attestation_target_epoch, - "validator_index" => validator_index, - ) - } - } - } - - // Only register this with the validator monitor when the block is sufficiently close to - // the current slot. - if VALIDATOR_MONITOR_HISTORIC_EPOCHS as u64 * T::EthSpec::slots_per_epoch() - + block.slot().as_u64() - >= current_slot.as_u64() - { - match fork_choice.get_block(&block.parent_root()) { - Some(parent_block) => validator_monitor.register_attestation_in_block( - &indexed_attestation, - parent_block.slot, - &self.spec, - ), - None => warn!(self.log, "Failed to get parent block"; "slot" => %block.slot()), - } - } - } - // If the block is recent enough and it was not optimistically imported, check to see if it // becomes the head block. If so, apply it to the early attester cache. This will allow // attestations to the block without waiting for the block and state to be inserted to the @@ -2934,56 +2786,28 @@ impl BeaconChain { ), } } + drop(post_exec_timer); - // Register sync aggregate with validator monitor - if let Ok(sync_aggregate) = block.body().sync_aggregate() { - // `SyncCommittee` for the sync_aggregate should correspond to the duty slot - let duty_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch()); - let sync_committee = self.sync_committee_at_epoch(duty_epoch)?; - let participant_pubkeys = sync_committee - .pubkeys - .iter() - .zip(sync_aggregate.sync_committee_bits.iter()) - .filter_map(|(pubkey, bit)| bit.then_some(pubkey)) - .collect::>(); - - validator_monitor.register_sync_aggregate_in_block( - block.slot(), - block.parent_root(), - participant_pubkeys, - ); - } - - for exit in block.body().voluntary_exits() { - validator_monitor.register_block_voluntary_exit(&exit.message) - } - - for slashing in block.body().attester_slashings() { - validator_monitor.register_block_attester_slashing(slashing) - } - - for slashing in block.body().proposer_slashings() { - validator_monitor.register_block_proposer_slashing(slashing) - } - - drop(validator_monitor); - - // Only present some metrics for blocks from the previous epoch or later. - // - // This helps avoid noise in the metrics during sync. - if block.slot().epoch(T::EthSpec::slots_per_epoch()) + 1 >= self.epoch()? { - metrics::observe( - &metrics::OPERATIONS_PER_BLOCK_ATTESTATION, - block.body().attestations().len() as f64, - ); + // ---------------------------- BLOCK PROBABLY ATTESTABLE ---------------------------------- + // Most blocks are now capable of being attested to thanks to the `early_attester_cache` + // cache above. Resume non-essential processing. + // ----------------------------------------------------------------------------------------- - if let Ok(sync_aggregate) = block.body().sync_aggregate() { - metrics::set_gauge( - &metrics::BLOCK_SYNC_AGGREGATE_SET_BITS, - sync_aggregate.num_set_bits() as i64, - ); - } - } + self.import_block_update_shuffling_cache(block_root, &mut state)?; + self.import_block_observe_attestations( + block, + &state, + &mut consensus_context, + current_epoch, + ); + self.import_block_update_validator_monitor( + block, + &state, + &mut consensus_context, + current_slot, + parent_block.slot(), + ); + self.import_block_update_slasher(block, &state, &mut consensus_context); let db_write_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_DB_WRITE); @@ -3000,7 +2824,9 @@ impl BeaconChain { ops.push(StoreOp::PutState(block.state_root(), &state)); let txn_lock = self.store.hot_db.begin_rw_transaction(); - if let Err(e) = self.store.do_atomically(ops) { + kv_store_ops.extend(self.store.convert_to_kv_batch(ops)?); + + if let Err(e) = self.store.hot_db.do_atomically(kv_store_ops) { error!( self.log, "Database write failed!"; @@ -3008,6 +2834,10 @@ impl BeaconChain { "error" => ?e, ); + // Clear the early attester cache to prevent attestations which we would later be unable + // to verify due to the failure. + self.early_attester_cache.clear(); + // Since the write failed, try to revert the canonical head back to what was stored // in the database. This attempts to prevent inconsistency between the database and // fork choice. @@ -3050,6 +2880,7 @@ impl BeaconChain { eth1_deposit_index: state.eth1_deposit_index(), }; let current_finalized_checkpoint = state.finalized_checkpoint(); + self.snapshot_cache .try_write_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT) .ok_or(Error::SnapshotCacheLockTimeout) @@ -3057,7 +2888,7 @@ impl BeaconChain { snapshot_cache.insert( BeaconSnapshot { beacon_state: state, - beacon_block: signed_block, + beacon_block: signed_block.clone(), beacon_block_root: block_root, }, None, @@ -3076,22 +2907,312 @@ impl BeaconChain { self.head_tracker .register_block(block_root, parent_root, slot); - // Send an event to the `events` endpoint after fully processing the block. - if let Some(event_handler) = self.event_handler.as_ref() { - if event_handler.has_block_subscribers() { - event_handler.register(EventKind::Block(SseBlock { - slot, - block: block_root, - execution_optimistic: payload_verification_status.is_optimistic(), - })); + metrics::stop_timer(db_write_timer); + + metrics::inc_counter(&metrics::BLOCK_PROCESSING_SUCCESSES); + + // Update the deposit contract cache. + self.import_block_update_deposit_contract_finalization( + block, + block_root, + current_epoch, + current_finalized_checkpoint, + current_eth1_finalization_data, + parent_eth1_finalization_data, + parent_block.slot(), + ); + + // Inform the unknown block cache, in case it was waiting on this block. + self.pre_finalization_block_cache + .block_processed(block_root); + + self.import_block_update_metrics_and_events( + block, + block_root, + block_time_imported, + payload_verification_status, + current_slot, + ); + + Ok(block_root) + } + + /// Check block's consistentency with any configured weak subjectivity checkpoint. + fn check_block_against_weak_subjectivity_checkpoint( + &self, + block: BeaconBlockRef, + block_root: Hash256, + state: &BeaconState, + ) -> Result<(), BlockError> { + // Only perform the weak subjectivity check if it was configured. + let wss_checkpoint = if let Some(checkpoint) = self.config.weak_subjectivity_checkpoint { + checkpoint + } else { + return Ok(()); + }; + // Note: we're using the finalized checkpoint from the head state, rather than fork + // choice. + // + // We are doing this to ensure that we detect changes in finalization. It's possible + // that fork choice has already been updated to the finalized checkpoint in the block + // we're importing. + let current_head_finalized_checkpoint = + self.canonical_head.cached_head().finalized_checkpoint(); + // Compare the existing finalized checkpoint with the incoming block's finalized checkpoint. + let new_finalized_checkpoint = state.finalized_checkpoint(); + + // This ensures we only perform the check once. + if current_head_finalized_checkpoint.epoch < wss_checkpoint.epoch + && wss_checkpoint.epoch <= new_finalized_checkpoint.epoch + { + if let Err(e) = + self.verify_weak_subjectivity_checkpoint(wss_checkpoint, block_root, state) + { + let mut shutdown_sender = self.shutdown_sender(); + crit!( + self.log, + "Weak subjectivity checkpoint verification failed while importing block!"; + "block_root" => ?block_root, + "parent_root" => ?block.parent_root(), + "old_finalized_epoch" => ?current_head_finalized_checkpoint.epoch, + "new_finalized_epoch" => ?new_finalized_checkpoint.epoch, + "weak_subjectivity_epoch" => ?wss_checkpoint.epoch, + "error" => ?e + ); + crit!( + self.log, + "You must use the `--purge-db` flag to clear the database and restart sync. \ + You may be on a hostile network." + ); + shutdown_sender + .try_send(ShutdownReason::Failure( + "Weak subjectivity checkpoint verification failed. \ + Provided block root is not a checkpoint.", + )) + .map_err(|err| { + BlockError::BeaconChainError( + BeaconChainError::WeakSubjectivtyShutdownError(err), + ) + })?; + return Err(BlockError::WeakSubjectivityConflict); } } + Ok(()) + } - metrics::stop_timer(db_write_timer); + /// Process a block for the validator monitor, including all its constituent messages. + fn import_block_update_validator_monitor( + &self, + block: BeaconBlockRef, + state: &BeaconState, + ctxt: &mut ConsensusContext, + current_slot: Slot, + parent_block_slot: Slot, + ) { + // Only register blocks with the validator monitor when the block is sufficiently close to + // the current slot. + if VALIDATOR_MONITOR_HISTORIC_EPOCHS as u64 * T::EthSpec::slots_per_epoch() + + block.slot().as_u64() + < current_slot.as_u64() + { + return; + } - metrics::inc_counter(&metrics::BLOCK_PROCESSING_SUCCESSES); + // Allow the validator monitor to learn about a new valid state. + self.validator_monitor + .write() + .process_valid_state(current_slot.epoch(T::EthSpec::slots_per_epoch()), state); - let block_delay_total = get_slot_delay_ms(block_time_imported, slot, &self.slot_clock); + let validator_monitor = self.validator_monitor.read(); + + // Sync aggregate. + if let Ok(sync_aggregate) = block.body().sync_aggregate() { + // `SyncCommittee` for the sync_aggregate should correspond to the duty slot + let duty_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch()); + + match self.sync_committee_at_epoch(duty_epoch) { + Ok(sync_committee) => { + let participant_pubkeys = sync_committee + .pubkeys + .iter() + .zip(sync_aggregate.sync_committee_bits.iter()) + .filter_map(|(pubkey, bit)| bit.then_some(pubkey)) + .collect::>(); + + validator_monitor.register_sync_aggregate_in_block( + block.slot(), + block.parent_root(), + participant_pubkeys, + ); + } + Err(e) => { + warn!( + self.log, + "Unable to fetch sync committee"; + "epoch" => duty_epoch, + "purpose" => "validator monitor", + "error" => ?e, + ); + } + } + } + + // Attestations. + for attestation in block.body().attestations() { + let indexed_attestation = match ctxt.get_indexed_attestation(state, attestation) { + Ok(indexed) => indexed, + Err(e) => { + debug!( + self.log, + "Failed to get indexed attestation"; + "purpose" => "validator monitor", + "attestation_slot" => attestation.data.slot, + "error" => ?e, + ); + continue; + } + }; + validator_monitor.register_attestation_in_block( + indexed_attestation, + parent_block_slot, + &self.spec, + ); + } + + for exit in block.body().voluntary_exits() { + validator_monitor.register_block_voluntary_exit(&exit.message) + } + + for slashing in block.body().attester_slashings() { + validator_monitor.register_block_attester_slashing(slashing) + } + + for slashing in block.body().proposer_slashings() { + validator_monitor.register_block_proposer_slashing(slashing) + } + } + + /// Iterate through the attestations in the block and register them as "observed". + /// + /// This will stop us from propagating them on the gossip network. + fn import_block_observe_attestations( + &self, + block: BeaconBlockRef, + state: &BeaconState, + ctxt: &mut ConsensusContext, + current_epoch: Epoch, + ) { + // To avoid slowing down sync, only observe attestations if the block is from the + // previous epoch or later. + if state.current_epoch() + 1 < current_epoch { + return; + } + + let _timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_ATTESTATION_OBSERVATION); + + for a in block.body().attestations() { + match self.observed_attestations.write().observe_item(a, None) { + // If the observation was successful or if the slot for the attestation was too + // low, continue. + // + // We ignore `SlotTooLow` since this will be very common whilst syncing. + Ok(_) | Err(AttestationObservationError::SlotTooLow { .. }) => {} + Err(e) => { + debug!( + self.log, + "Failed to register observed attestation"; + "error" => ?e, + "epoch" => a.data.target.epoch + ); + } + } + + let indexed_attestation = match ctxt.get_indexed_attestation(state, a) { + Ok(indexed) => indexed, + Err(e) => { + debug!( + self.log, + "Failed to get indexed attestation"; + "purpose" => "observation", + "attestation_slot" => a.data.slot, + "error" => ?e, + ); + continue; + } + }; + + let mut observed_block_attesters = self.observed_block_attesters.write(); + + for &validator_index in &indexed_attestation.attesting_indices { + if let Err(e) = observed_block_attesters + .observe_validator(a.data.target.epoch, validator_index as usize) + { + debug!( + self.log, + "Failed to register observed block attester"; + "error" => ?e, + "epoch" => a.data.target.epoch, + "validator_index" => validator_index, + ) + } + } + } + } + + /// If a slasher is configured, provide the attestations from the block. + fn import_block_update_slasher( + &self, + block: BeaconBlockRef, + state: &BeaconState, + ctxt: &mut ConsensusContext, + ) { + if let Some(slasher) = self.slasher.as_ref() { + for attestation in block.body().attestations() { + let indexed_attestation = match ctxt.get_indexed_attestation(state, attestation) { + Ok(indexed) => indexed, + Err(e) => { + debug!( + self.log, + "Failed to get indexed attestation"; + "purpose" => "slasher", + "attestation_slot" => attestation.data.slot, + "error" => ?e, + ); + continue; + } + }; + slasher.accept_attestation(indexed_attestation.clone()); + } + } + } + + fn import_block_update_metrics_and_events( + &self, + block: BeaconBlockRef, + block_root: Hash256, + block_time_imported: Duration, + payload_verification_status: PayloadVerificationStatus, + current_slot: Slot, + ) { + // Only present some metrics for blocks from the previous epoch or later. + // + // This helps avoid noise in the metrics during sync. + if block.slot() + 2 * T::EthSpec::slots_per_epoch() >= current_slot { + metrics::observe( + &metrics::OPERATIONS_PER_BLOCK_ATTESTATION, + block.body().attestations().len() as f64, + ); + + if let Ok(sync_aggregate) = block.body().sync_aggregate() { + metrics::set_gauge( + &metrics::BLOCK_SYNC_AGGREGATE_SET_BITS, + sync_aggregate.num_set_bits() as i64, + ); + } + } + + let block_delay_total = + get_slot_delay_ms(block_time_imported, block.slot(), &self.slot_clock); // Do not write to the cache for blocks older than 2 epochs, this helps reduce writes to // the cache during sync. @@ -3123,62 +3244,105 @@ impl BeaconChain { ); } - // Do not write to eth1 finalization cache for blocks older than 5 epochs - // this helps reduce noise during sync - if block_delay_total - < self.slot_clock.slot_duration() * 5 * (T::EthSpec::slots_per_epoch() as u32) - { - let parent_block_epoch = parent_block.slot().epoch(T::EthSpec::slots_per_epoch()); - if parent_block_epoch < current_epoch { - // we've crossed epoch boundary, store Eth1FinalizationData - let (checkpoint, eth1_finalization_data) = - if current_slot % T::EthSpec::slots_per_epoch() == 0 { - // current block is the checkpoint - ( - Checkpoint { - epoch: current_epoch, - root: block_root, - }, - current_eth1_finalization_data, - ) - } else { - // parent block is the checkpoint - ( - Checkpoint { - epoch: current_epoch, - root: parent_block.canonical_root(), - }, - parent_eth1_finalization_data, - ) - }; + if let Some(event_handler) = self.event_handler.as_ref() { + if event_handler.has_block_subscribers() { + event_handler.register(EventKind::Block(SseBlock { + slot: block.slot(), + block: block_root, + execution_optimistic: payload_verification_status.is_optimistic(), + })); + } + } + } - if let Some(finalized_eth1_data) = self - .eth1_finalization_cache - .try_write_for(ETH1_FINALIZATION_CACHE_LOCK_TIMEOUT) - .and_then(|mut cache| { - cache.insert(checkpoint, eth1_finalization_data); - cache.finalize(¤t_finalized_checkpoint) - }) - { - if let Some(eth1_chain) = self.eth1_chain.as_ref() { - let finalized_deposit_count = finalized_eth1_data.deposit_count; - eth1_chain.finalize_eth1_data(finalized_eth1_data); - debug!( - self.log, - "called eth1_chain.finalize_eth1_data()"; - "epoch" => current_finalized_checkpoint.epoch, - "deposit count" => finalized_deposit_count, - ); - } - } + fn import_block_update_shuffling_cache( + &self, + block_root: Hash256, + state: &mut BeaconState, + ) -> Result<(), BlockError> { + // For the current and next epoch of this state, ensure we have the shuffling from this + // block in our cache. + for relative_epoch in [RelativeEpoch::Current, RelativeEpoch::Next] { + let shuffling_id = AttestationShufflingId::new(block_root, state, relative_epoch)?; + + let shuffling_is_cached = self + .shuffling_cache + .try_read_for(ATTESTATION_CACHE_LOCK_TIMEOUT) + .ok_or(Error::AttestationCacheLockTimeout)? + .contains(&shuffling_id); + + if !shuffling_is_cached { + state.build_committee_cache(relative_epoch, &self.spec)?; + let committee_cache = state.committee_cache(relative_epoch)?; + self.shuffling_cache + .try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT) + .ok_or(Error::AttestationCacheLockTimeout)? + .insert_committee_cache(shuffling_id, committee_cache); } } + Ok(()) + } - // Inform the unknown block cache, in case it was waiting on this block. - self.pre_finalization_block_cache - .block_processed(block_root); + #[allow(clippy::too_many_arguments)] + fn import_block_update_deposit_contract_finalization( + &self, + block: BeaconBlockRef, + block_root: Hash256, + current_epoch: Epoch, + current_finalized_checkpoint: Checkpoint, + current_eth1_finalization_data: Eth1FinalizationData, + parent_eth1_finalization_data: Eth1FinalizationData, + parent_block_slot: Slot, + ) { + // Do not write to eth1 finalization cache for blocks older than 5 epochs. + if block.slot().epoch(T::EthSpec::slots_per_epoch()) + 5 < current_epoch { + return; + } - Ok(block_root) + let parent_block_epoch = parent_block_slot.epoch(T::EthSpec::slots_per_epoch()); + if parent_block_epoch < current_epoch { + // we've crossed epoch boundary, store Eth1FinalizationData + let (checkpoint, eth1_finalization_data) = + if block.slot() % T::EthSpec::slots_per_epoch() == 0 { + // current block is the checkpoint + ( + Checkpoint { + epoch: current_epoch, + root: block_root, + }, + current_eth1_finalization_data, + ) + } else { + // parent block is the checkpoint + ( + Checkpoint { + epoch: current_epoch, + root: block.parent_root(), + }, + parent_eth1_finalization_data, + ) + }; + + if let Some(finalized_eth1_data) = self + .eth1_finalization_cache + .try_write_for(ETH1_FINALIZATION_CACHE_LOCK_TIMEOUT) + .and_then(|mut cache| { + cache.insert(checkpoint, eth1_finalization_data); + cache.finalize(¤t_finalized_checkpoint) + }) + { + if let Some(eth1_chain) = self.eth1_chain.as_ref() { + let finalized_deposit_count = finalized_eth1_data.deposit_count; + eth1_chain.finalize_eth1_data(finalized_eth1_data); + debug!( + self.log, + "called eth1_chain.finalize_eth1_data()"; + "epoch" => current_finalized_checkpoint.epoch, + "deposit count" => finalized_deposit_count, + ); + } + } + } } /// If configured, wait for the fork choice run at the start of the slot to complete. @@ -3559,10 +3723,12 @@ impl BeaconChain { // This will be a lot slower but guards against bugs in block production and can be // quickly rolled out without a release. if self.config.paranoid_block_proposal { + let mut tmp_ctxt = ConsensusContext::new(state.slot()); attestations.retain(|att| { verify_attestation_for_block_inclusion( &state, att, + &mut tmp_ctxt, VerifySignatures::True, &self.spec, ) diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 1fdc1518a26..ab317e96b96 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -52,22 +52,22 @@ use crate::validator_monitor::HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOC use crate::validator_pubkey_cache::ValidatorPubkeyCache; use crate::{ beacon_chain::{ - BeaconForkChoice, BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT, MAXIMUM_GOSSIP_CLOCK_DISPARITY, - VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT, + BeaconForkChoice, ForkChoiceError, BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT, + MAXIMUM_GOSSIP_CLOCK_DISPARITY, VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT, }, metrics, BeaconChain, BeaconChainError, BeaconChainTypes, }; use derivative::Derivative; use eth2::types::EventKind; use execution_layer::PayloadStatus; -use fork_choice::PayloadVerificationStatus; +use fork_choice::{AttestationFromBlock, PayloadVerificationStatus}; use parking_lot::RwLockReadGuard; use proto_array::Block as ProtoBlock; use safe_arith::ArithError; use slog::{debug, error, warn, Logger}; use slot_clock::SlotClock; use ssz::Encode; -use state_processing::per_block_processing::is_merge_transition_block; +use state_processing::per_block_processing::{errors::IntoWithIndex, is_merge_transition_block}; use state_processing::{ block_signature_verifier::{BlockSignatureVerifier, Error as BlockSignatureVerifierError}, per_block_processing, per_slot_processing, @@ -550,8 +550,22 @@ pub fn signature_verify_chain_segment( let pubkey_cache = get_validator_pubkey_cache(chain)?; let mut signature_verifier = get_signature_verifier(&state, &pubkey_cache, &chain.spec); + let mut signature_verified_blocks = Vec::with_capacity(chain_segment.len()); + for (block_root, block) in &chain_segment { - signature_verifier.include_all_signatures(block, Some(*block_root), None)?; + let mut consensus_context = + ConsensusContext::new(block.slot()).set_current_block_root(*block_root); + + signature_verifier.include_all_signatures(block, &mut consensus_context)?; + + // Save the block and its consensus context. The context will have had its proposer index + // and attesting indices filled in, which can be used to accelerate later block processing. + signature_verified_blocks.push(SignatureVerifiedBlock { + block: block.clone(), + block_root: *block_root, + parent: None, + consensus_context, + }); } if signature_verifier.verify().is_err() { @@ -560,22 +574,6 @@ pub fn signature_verify_chain_segment( drop(pubkey_cache); - let mut signature_verified_blocks = chain_segment - .into_iter() - .map(|(block_root, block)| { - // Proposer index has already been verified above during signature verification. - let consensus_context = ConsensusContext::new(block.slot()) - .set_current_block_root(block_root) - .set_proposer_index(block.message().proposer_index()); - SignatureVerifiedBlock { - block, - block_root, - parent: None, - consensus_context, - } - }) - .collect::>(); - if let Some(signature_verified_block) = signature_verified_blocks.first_mut() { signature_verified_block.parent = Some(parent); } @@ -625,6 +623,7 @@ pub struct ExecutionPendingBlock { pub parent_block: SignedBeaconBlock>, pub parent_eth1_finalization_data: Eth1FinalizationData, pub confirmed_state_roots: Vec, + pub consensus_context: ConsensusContext, pub payload_verification_handle: PayloadVerificationHandle, } @@ -951,13 +950,14 @@ impl SignatureVerifiedBlock { let mut signature_verifier = get_signature_verifier(&state, &pubkey_cache, &chain.spec); - signature_verifier.include_all_signatures(&block, Some(block_root), None)?; + let mut consensus_context = + ConsensusContext::new(block.slot()).set_current_block_root(block_root); + + signature_verifier.include_all_signatures(&block, &mut consensus_context)?; if signature_verifier.verify().is_ok() { Ok(Self { - consensus_context: ConsensusContext::new(block.slot()) - .set_current_block_root(block_root) - .set_proposer_index(block.message().proposer_index()), + consensus_context, block, block_root, parent: Some(parent), @@ -1002,16 +1002,16 @@ impl SignatureVerifiedBlock { // Gossip verification has already checked the proposer index. Use it to check the RANDAO // signature. - let verified_proposer_index = Some(block.message().proposer_index()); + let mut consensus_context = from.consensus_context; signature_verifier - .include_all_signatures_except_proposal(&block, verified_proposer_index)?; + .include_all_signatures_except_proposal(&block, &mut consensus_context)?; if signature_verifier.verify().is_ok() { Ok(Self { block, block_root: from.block_root, parent: Some(parent), - consensus_context: from.consensus_context, + consensus_context, }) } else { Err(BlockError::InvalidSignature) @@ -1138,6 +1138,79 @@ impl ExecutionPendingBlock { check_block_relevancy(&block, block_root, chain)?; + // Define a future that will verify the execution payload with an execution engine. + // + // We do this as early as possible so that later parts of this function can run in parallel + // with the payload verification. + let payload_notifier = PayloadNotifier::new( + chain.clone(), + block.clone(), + &parent.pre_state, + notify_execution_layer, + )?; + let is_valid_merge_transition_block = + is_merge_transition_block(&parent.pre_state, block.message().body()); + let payload_verification_future = async move { + let chain = payload_notifier.chain.clone(); + let block = payload_notifier.block.clone(); + + // If this block triggers the merge, check to ensure that it references valid execution + // blocks. + // + // The specification defines this check inside `on_block` in the fork-choice specification, + // however we perform the check here for two reasons: + // + // - There's no point in importing a block that will fail fork choice, so it's best to fail + // early. + // - Doing the check here means we can keep our fork-choice implementation "pure". I.e., no + // calls to remote servers. + if is_valid_merge_transition_block { + validate_merge_block(&chain, block.message(), AllowOptimisticImport::Yes).await?; + }; + + // The specification declares that this should be run *inside* `per_block_processing`, + // however we run it here to keep `per_block_processing` pure (i.e., no calls to external + // servers). + let payload_verification_status = payload_notifier.notify_new_payload().await?; + + // If the payload did not validate or invalidate the block, check to see if this block is + // valid for optimistic import. + if payload_verification_status.is_optimistic() { + let block_hash_opt = block + .message() + .body() + .execution_payload() + .map(|full_payload| full_payload.execution_payload.block_hash); + + // Ensure the block is a candidate for optimistic import. + if !is_optimistic_candidate_block(&chain, block.slot(), block.parent_root()).await? + { + warn!( + chain.log, + "Rejecting optimistic block"; + "block_hash" => ?block_hash_opt, + "msg" => "the execution engine is not synced" + ); + return Err(ExecutionPayloadError::UnverifiedNonOptimisticCandidate.into()); + } + } + + Ok(PayloadVerificationOutcome { + payload_verification_status, + is_valid_merge_transition_block, + }) + }; + // Spawn the payload verification future as a new task, but don't wait for it to complete. + // The `payload_verification_future` will be awaited later to ensure verification completed + // successfully. + let payload_verification_handle = chain + .task_executor + .spawn_handle( + payload_verification_future, + "execution_payload_verification", + ) + .ok_or(BeaconChainError::RuntimeShutdown)?; + /* * Advance the given `parent.beacon_state` to the slot of the given `block`. */ @@ -1242,80 +1315,11 @@ impl ExecutionPendingBlock { summaries.push(summary); } } + metrics::stop_timer(catchup_timer); let block_slot = block.slot(); let state_current_epoch = state.current_epoch(); - // Define a future that will verify the execution payload with an execution engine (but - // don't execute it yet). - let payload_notifier = - PayloadNotifier::new(chain.clone(), block.clone(), &state, notify_execution_layer)?; - let is_valid_merge_transition_block = - is_merge_transition_block(&state, block.message().body()); - let payload_verification_future = async move { - let chain = payload_notifier.chain.clone(); - let block = payload_notifier.block.clone(); - - // If this block triggers the merge, check to ensure that it references valid execution - // blocks. - // - // The specification defines this check inside `on_block` in the fork-choice specification, - // however we perform the check here for two reasons: - // - // - There's no point in importing a block that will fail fork choice, so it's best to fail - // early. - // - Doing the check here means we can keep our fork-choice implementation "pure". I.e., no - // calls to remote servers. - if is_valid_merge_transition_block { - validate_merge_block(&chain, block.message(), AllowOptimisticImport::Yes).await?; - }; - - // The specification declares that this should be run *inside* `per_block_processing`, - // however we run it here to keep `per_block_processing` pure (i.e., no calls to external - // servers). - // - // It is important that this function is called *after* `per_slot_processing`, since the - // `randao` may change. - let payload_verification_status = payload_notifier.notify_new_payload().await?; - - // If the payload did not validate or invalidate the block, check to see if this block is - // valid for optimistic import. - if payload_verification_status.is_optimistic() { - let block_hash_opt = block - .message() - .body() - .execution_payload() - .map(|full_payload| full_payload.execution_payload.block_hash); - - // Ensure the block is a candidate for optimistic import. - if !is_optimistic_candidate_block(&chain, block.slot(), block.parent_root()).await? - { - warn!( - chain.log, - "Rejecting optimistic block"; - "block_hash" => ?block_hash_opt, - "msg" => "the execution engine is not synced" - ); - return Err(ExecutionPayloadError::UnverifiedNonOptimisticCandidate.into()); - } - } - - Ok(PayloadVerificationOutcome { - payload_verification_status, - is_valid_merge_transition_block, - }) - }; - // Spawn the payload verification future as a new task, but don't wait for it to complete. - // The `payload_verification_future` will be awaited later to ensure verification completed - // successfully. - let payload_verification_handle = chain - .task_executor - .spawn_handle( - payload_verification_future, - "execution_payload_verification", - ) - .ok_or(BeaconChainError::RuntimeShutdown)?; - // If the block is sufficiently recent, notify the validator monitor. if let Some(slot) = chain.slot_clock.now() { let epoch = slot.epoch(T::EthSpec::slots_per_epoch()); @@ -1342,8 +1346,6 @@ impl ExecutionPendingBlock { } } - metrics::stop_timer(catchup_timer); - /* * Build the committee caches on the state. */ @@ -1433,6 +1435,44 @@ impl ExecutionPendingBlock { }); } + /* + * Apply the block's attestations to fork choice. + * + * We're running in parallel with the payload verification at this point, so this is + * free real estate. + */ + let current_slot = chain.slot()?; + let mut fork_choice = chain.canonical_head.fork_choice_write_lock(); + + // Register each attester slashing in the block with fork choice. + for attester_slashing in block.message().body().attester_slashings() { + fork_choice.on_attester_slashing(attester_slashing); + } + + // Register each attestation in the block with fork choice. + for (i, attestation) in block.message().body().attestations().iter().enumerate() { + let _fork_choice_attestation_timer = + metrics::start_timer(&metrics::FORK_CHOICE_PROCESS_ATTESTATION_TIMES); + + let indexed_attestation = consensus_context + .get_indexed_attestation(&state, attestation) + .map_err(|e| BlockError::PerBlockProcessingError(e.into_with_index(i)))?; + + match fork_choice.on_attestation( + current_slot, + indexed_attestation, + AttestationFromBlock::True, + &chain.spec, + ) { + Ok(()) => Ok(()), + // Ignore invalid attestations whilst importing attestations from a block. The + // block might be very old and therefore the attestations useless to fork choice. + Err(ForkChoiceError::InvalidAttestation(_)) => Ok(()), + Err(e) => Err(BlockError::BeaconChainError(e.into())), + }?; + } + drop(fork_choice); + Ok(Self { block, block_root, @@ -1440,6 +1480,7 @@ impl ExecutionPendingBlock { parent_block: parent.beacon_block, parent_eth1_finalization_data, confirmed_state_roots, + consensus_context, payload_verification_handle, }) } diff --git a/beacon_node/beacon_chain/src/execution_payload.rs b/beacon_node/beacon_chain/src/execution_payload.rs index c859aa54fc9..85f7629bb79 100644 --- a/beacon_node/beacon_chain/src/execution_payload.rs +++ b/beacon_node/beacon_chain/src/execution_payload.rs @@ -69,6 +69,7 @@ impl PayloadNotifier { // are cheap and doing them here ensures we protect the execution engine from junk. partially_verify_execution_payload( state, + block.slot(), block.message().execution_payload()?, &chain.spec, ) @@ -373,7 +374,8 @@ pub fn get_execution_payload< let spec = &chain.spec; let current_epoch = state.current_epoch(); let is_merge_transition_complete = is_merge_transition_complete(state); - let timestamp = compute_timestamp_at_slot(state, spec).map_err(BeaconStateError::from)?; + let timestamp = + compute_timestamp_at_slot(state, state.slot(), spec).map_err(BeaconStateError::from)?; let random = *state.get_randao_mix(current_epoch)?; let latest_execution_payload_header_block_hash = state.latest_execution_payload_header()?.block_hash; diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index ead4a540254..b37c5afc35f 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -64,6 +64,11 @@ lazy_static! { "beacon_block_processing_state_root_seconds", "Time spent calculating the state root when processing a block." ); + pub static ref BLOCK_PROCESSING_POST_EXEC_PROCESSING: Result = try_create_histogram_with_buckets( + "beacon_block_processing_post_exec_pre_attestable_seconds", + "Time between finishing execution processing and the block becoming attestable", + linear_buckets(5e-3, 5e-3, 10) + ); pub static ref BLOCK_PROCESSING_DB_WRITE: Result = try_create_histogram( "beacon_block_processing_db_write_seconds", "Time spent writing a newly processed block and state to DB" diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index a0f42ec214a..b88966b41a9 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -586,7 +586,7 @@ where pub fn get_timestamp_at_slot(&self) -> u64 { let state = self.get_current_state(); - compute_timestamp_at_slot(&state, &self.spec).unwrap() + compute_timestamp_at_slot(&state, state.slot(), &self.spec).unwrap() } pub fn get_current_state_and_root(&self) -> (BeaconState, Hash256) { diff --git a/beacon_node/beacon_chain/src/validator_pubkey_cache.rs b/beacon_node/beacon_chain/src/validator_pubkey_cache.rs index 60fdb607c86..26aea2d2722 100644 --- a/beacon_node/beacon_chain/src/validator_pubkey_cache.rs +++ b/beacon_node/beacon_chain/src/validator_pubkey_cache.rs @@ -3,7 +3,8 @@ use crate::{BeaconChainTypes, BeaconStore}; use ssz::{Decode, Encode}; use std::collections::HashMap; use std::convert::TryInto; -use store::{DBColumn, Error as StoreError, StoreItem}; +use std::marker::PhantomData; +use store::{DBColumn, Error as StoreError, KeyValueStore, KeyValueStoreOp, StoreItem}; use types::{BeaconState, Hash256, PublicKey, PublicKeyBytes}; /// Provides a mapping of `validator_index -> validator_publickey`. @@ -14,21 +15,17 @@ use types::{BeaconState, Hash256, PublicKey, PublicKeyBytes}; /// 2. To reduce the amount of public key _decompression_ required. A `BeaconState` stores public /// keys in compressed form and they are needed in decompressed form for signature verification. /// Decompression is expensive when many keys are involved. -/// -/// The cache has a `backing` that it uses to maintain a persistent, on-disk -/// copy of itself. This allows it to be restored between process invocations. pub struct ValidatorPubkeyCache { pubkeys: Vec, indices: HashMap, pubkey_bytes: Vec, - store: BeaconStore, + _phantom: PhantomData, } impl ValidatorPubkeyCache { /// Create a new public key cache using the keys in `state.validators`. /// - /// Also creates a new persistence file, returning an error if there is already a file at - /// `persistence_path`. + /// The new cache will be updated with the keys from `state` and immediately written to disk. pub fn new( state: &BeaconState, store: BeaconStore, @@ -37,10 +34,11 @@ impl ValidatorPubkeyCache { pubkeys: vec![], indices: HashMap::new(), pubkey_bytes: vec![], - store, + _phantom: PhantomData, }; - cache.import_new_pubkeys(state)?; + let store_ops = cache.import_new_pubkeys(state)?; + store.hot_db.do_atomically(store_ops)?; Ok(cache) } @@ -69,17 +67,19 @@ impl ValidatorPubkeyCache { pubkeys, indices, pubkey_bytes, - store, + _phantom: PhantomData, }) } /// Scan the given `state` and add any new validator public keys. /// /// Does not delete any keys from `self` if they don't appear in `state`. + /// + /// NOTE: The caller *must* commit the returned I/O batch as part of the block import process. pub fn import_new_pubkeys( &mut self, state: &BeaconState, - ) -> Result<(), BeaconChainError> { + ) -> Result, BeaconChainError> { if state.validators().len() > self.pubkeys.len() { self.import( state.validators()[self.pubkeys.len()..] @@ -87,12 +87,12 @@ impl ValidatorPubkeyCache { .map(|v| v.pubkey), ) } else { - Ok(()) + Ok(vec![]) } } /// Adds zero or more validators to `self`. - fn import(&mut self, validator_keys: I) -> Result<(), BeaconChainError> + fn import(&mut self, validator_keys: I) -> Result, BeaconChainError> where I: Iterator + ExactSizeIterator, { @@ -100,6 +100,7 @@ impl ValidatorPubkeyCache { self.pubkeys.reserve(validator_keys.len()); self.indices.reserve(validator_keys.len()); + let mut store_ops = Vec::with_capacity(validator_keys.len()); for pubkey in validator_keys { let i = self.pubkeys.len(); @@ -107,17 +108,11 @@ impl ValidatorPubkeyCache { return Err(BeaconChainError::DuplicateValidatorPublicKey); } - // The item is written to disk _before_ it is written into - // the local struct. - // - // This means that a pubkey cache read from disk will always be equivalent to or - // _later than_ the cache that was running in the previous instance of Lighthouse. - // - // The motivation behind this ordering is that we do not want to have states that - // reference a pubkey that is not in our cache. However, it's fine to have pubkeys - // that are never referenced in a state. - self.store - .put_item(&DatabasePubkey::key_for_index(i), &DatabasePubkey(pubkey))?; + // Stage the new validator key for writing to disk. + // It will be committed atomically when the block that introduced it is written to disk. + // Notably it is NOT written while the write lock on the cache is held. + // See: https://github.com/sigp/lighthouse/issues/2327 + store_ops.push(DatabasePubkey(pubkey).as_kv_store_op(DatabasePubkey::key_for_index(i))); self.pubkeys.push( (&pubkey) @@ -129,7 +124,7 @@ impl ValidatorPubkeyCache { self.indices.insert(pubkey, i); } - Ok(()) + Ok(store_ops) } /// Get the public key for a validator with index `i`. @@ -296,9 +291,10 @@ mod test { // Add some more keypairs. let (state, keypairs) = get_state(12); - cache + let ops = cache .import_new_pubkeys(&state) .expect("should import pubkeys"); + store.hot_db.do_atomically(ops).unwrap(); check_cache_get(&cache, &keypairs[..]); drop(cache); diff --git a/consensus/ssz_types/src/bitfield.rs b/consensus/ssz_types/src/bitfield.rs index b0cf4551eea..0539cc7d2c6 100644 --- a/consensus/ssz_types/src/bitfield.rs +++ b/consensus/ssz_types/src/bitfield.rs @@ -22,7 +22,7 @@ pub trait BitfieldBehaviour: Clone {} /// A marker struct used to declare SSZ `Variable` behaviour on a `Bitfield`. /// /// See the [`Bitfield`](struct.Bitfield.html) docs for usage. -#[derive(Clone, PartialEq, Debug)] +#[derive(Clone, PartialEq, Eq, Debug)] pub struct Variable { _phantom: PhantomData, } @@ -30,7 +30,7 @@ pub struct Variable { /// A marker struct used to declare SSZ `Fixed` behaviour on a `Bitfield`. /// /// See the [`Bitfield`](struct.Bitfield.html) docs for usage. -#[derive(Clone, PartialEq, Debug)] +#[derive(Clone, PartialEq, Eq, Debug)] pub struct Fixed { _phantom: PhantomData, } @@ -96,7 +96,7 @@ pub type BitVector = Bitfield>; /// byte (by `Vec` index) stores the lowest bit-indices and the right-most bit stores the lowest /// bit-index. E.g., `smallvec![0b0000_0001, 0b0000_0010]` has bits `0, 9` set. #[derive(Clone, Debug, Derivative)] -#[derivative(PartialEq, Hash(bound = ""))] +#[derivative(PartialEq, Eq, Hash(bound = ""))] pub struct Bitfield { bytes: SmallVec<[u8; SMALLVEC_LEN]>, len: usize, diff --git a/consensus/state_processing/src/consensus_context.rs b/consensus/state_processing/src/consensus_context.rs index fdd3f95a65b..0bd5f61aff8 100644 --- a/consensus/state_processing/src/consensus_context.rs +++ b/consensus/state_processing/src/consensus_context.rs @@ -1,8 +1,11 @@ +use crate::common::get_indexed_attestation; +use crate::per_block_processing::errors::{AttestationInvalid, BlockOperationError}; +use std::collections::{hash_map::Entry, HashMap}; use std::marker::PhantomData; use tree_hash::TreeHash; use types::{ - BeaconState, BeaconStateError, ChainSpec, EthSpec, ExecPayload, Hash256, SignedBeaconBlock, - Slot, + Attestation, AttestationData, BeaconState, BeaconStateError, BitList, ChainSpec, Epoch, + EthSpec, ExecPayload, Hash256, IndexedAttestation, SignedBeaconBlock, Slot, }; #[derive(Debug)] @@ -13,6 +16,9 @@ pub struct ConsensusContext { proposer_index: Option, /// Block root of the block at `slot`. current_block_root: Option, + /// Cache of indexed attestations constructed during block processing. + indexed_attestations: + HashMap<(AttestationData, BitList), IndexedAttestation>, _phantom: PhantomData, } @@ -20,6 +26,7 @@ pub struct ConsensusContext { pub enum ContextError { BeaconState(BeaconStateError), SlotMismatch { slot: Slot, expected: Slot }, + EpochMismatch { epoch: Epoch, expected: Epoch }, } impl From for ContextError { @@ -34,6 +41,7 @@ impl ConsensusContext { slot, proposer_index: None, current_block_root: None, + indexed_attestations: HashMap::new(), _phantom: PhantomData, } } @@ -43,13 +51,39 @@ impl ConsensusContext { self } + /// Strict method for fetching the proposer index. + /// + /// Gets the proposer index for `self.slot` while ensuring that it matches `state.slot()`. This + /// method should be used in block processing and almost everywhere the proposer index is + /// required. If the slot check is too restrictive, see `get_proposer_index_from_epoch_state`. pub fn get_proposer_index( &mut self, state: &BeaconState, spec: &ChainSpec, ) -> Result { self.check_slot(state.slot())?; + self.get_proposer_index_no_checks(state, spec) + } + + /// More liberal method for fetching the proposer index. + /// + /// Fetches the proposer index for `self.slot` but does not require the state to be from an + /// exactly matching slot (merely a matching epoch). This is useful in batch verification where + /// we want to extract the proposer index from a single state for every slot in the epoch. + pub fn get_proposer_index_from_epoch_state( + &mut self, + state: &BeaconState, + spec: &ChainSpec, + ) -> Result { + self.check_epoch(state.current_epoch())?; + self.get_proposer_index_no_checks(state, spec) + } + fn get_proposer_index_no_checks( + &mut self, + state: &BeaconState, + spec: &ChainSpec, + ) -> Result { if let Some(proposer_index) = self.proposer_index { return Ok(proposer_index); } @@ -89,4 +123,39 @@ impl ConsensusContext { }) } } + + fn check_epoch(&self, epoch: Epoch) -> Result<(), ContextError> { + let expected = self.slot.epoch(T::slots_per_epoch()); + if epoch == expected { + Ok(()) + } else { + Err(ContextError::EpochMismatch { epoch, expected }) + } + } + + pub fn get_indexed_attestation( + &mut self, + state: &BeaconState, + attestation: &Attestation, + ) -> Result<&IndexedAttestation, BlockOperationError> { + let key = ( + attestation.data.clone(), + attestation.aggregation_bits.clone(), + ); + + match self.indexed_attestations.entry(key) { + Entry::Occupied(occupied) => Ok(occupied.into_mut()), + Entry::Vacant(vacant) => { + let committee = + state.get_beacon_committee(attestation.data.slot, attestation.data.index)?; + let indexed_attestation = + get_indexed_attestation(committee.committee, attestation)?; + Ok(vacant.insert(indexed_attestation)) + } + } + } + + pub fn num_cached_indexed_attestations(&self) -> usize { + self.indexed_attestations.len() + } } diff --git a/consensus/state_processing/src/per_block_processing.rs b/consensus/state_processing/src/per_block_processing.rs index cccc8eacd9f..7d0cb01aebc 100644 --- a/consensus/state_processing/src/per_block_processing.rs +++ b/consensus/state_processing/src/per_block_processing.rs @@ -111,16 +111,13 @@ pub fn per_block_processing>( let verify_signatures = match block_signature_strategy { BlockSignatureStrategy::VerifyBulk => { // Verify all signatures in the block at once. - let block_root = Some(ctxt.get_current_block_root(signed_block)?); - let proposer_index = Some(ctxt.get_proposer_index(state, spec)?); block_verify!( BlockSignatureVerifier::verify_entire_block( state, |i| get_pubkey_from_state(state, i), |pk_bytes| pk_bytes.decompress().ok().map(Cow::Owned), signed_block, - block_root, - proposer_index, + ctxt, spec ) .is_ok(), @@ -339,6 +336,7 @@ pub fn get_new_eth1_data( /// https://github.com/ethereum/consensus-specs/blob/v1.1.5/specs/merge/beacon-chain.md#process_execution_payload pub fn partially_verify_execution_payload>( state: &BeaconState, + block_slot: Slot, payload: &Payload, spec: &ChainSpec, ) -> Result<(), BlockProcessingError> { @@ -359,7 +357,7 @@ pub fn partially_verify_execution_payload>( } ); - let timestamp = compute_timestamp_at_slot(state, spec)?; + let timestamp = compute_timestamp_at_slot(state, block_slot, spec)?; block_verify!( payload.timestamp() == timestamp, BlockProcessingError::ExecutionInvalidTimestamp { @@ -383,7 +381,7 @@ pub fn process_execution_payload>( payload: &Payload, spec: &ChainSpec, ) -> Result<(), BlockProcessingError> { - partially_verify_execution_payload(state, payload, spec)?; + partially_verify_execution_payload(state, state.slot(), payload, spec)?; *state.latest_execution_payload_header_mut()? = payload.to_execution_payload_header(); @@ -420,9 +418,10 @@ pub fn is_execution_enabled>( /// https://github.com/ethereum/consensus-specs/blob/dev/specs/merge/beacon-chain.md#compute_timestamp_at_slot pub fn compute_timestamp_at_slot( state: &BeaconState, + block_slot: Slot, spec: &ChainSpec, ) -> Result { - let slots_since_genesis = state.slot().as_u64().safe_sub(spec.genesis_slot.as_u64())?; + let slots_since_genesis = block_slot.as_u64().safe_sub(spec.genesis_slot.as_u64())?; slots_since_genesis .safe_mul(spec.seconds_per_slot) .and_then(|since_genesis| state.genesis_time().safe_add(since_genesis)) diff --git a/consensus/state_processing/src/per_block_processing/block_signature_verifier.rs b/consensus/state_processing/src/per_block_processing/block_signature_verifier.rs index 7584df14ec9..5e52ff8cb83 100644 --- a/consensus/state_processing/src/per_block_processing/block_signature_verifier.rs +++ b/consensus/state_processing/src/per_block_processing/block_signature_verifier.rs @@ -1,14 +1,13 @@ #![allow(clippy::integer_arithmetic)] use super::signature_sets::{Error as SignatureSetError, *}; -use crate::common::get_indexed_attestation; use crate::per_block_processing::errors::{AttestationInvalid, BlockOperationError}; +use crate::{ConsensusContext, ContextError}; use bls::{verify_signature_sets, PublicKey, PublicKeyBytes, SignatureSet}; use rayon::prelude::*; use std::borrow::Cow; use types::{ - BeaconState, BeaconStateError, ChainSpec, EthSpec, ExecPayload, Hash256, IndexedAttestation, - SignedBeaconBlock, + BeaconState, BeaconStateError, ChainSpec, EthSpec, ExecPayload, Hash256, SignedBeaconBlock, }; pub type Result = std::result::Result; @@ -28,6 +27,8 @@ pub enum Error { IncorrectBlockProposer { block: u64, local_shuffling: u64 }, /// Failed to load a signature set. The block may be invalid or we failed to process it. SignatureSetError(SignatureSetError), + /// Error related to the consensus context, likely the proposer index or block root calc. + ContextError(ContextError), } impl From for Error { @@ -36,6 +37,12 @@ impl From for Error { } } +impl From for Error { + fn from(e: ContextError) -> Error { + Error::ContextError(e) + } +} + impl From for Error { fn from(e: SignatureSetError) -> Error { match e { @@ -122,12 +129,11 @@ where get_pubkey: F, decompressor: D, block: &'a SignedBeaconBlock, - block_root: Option, - verified_proposer_index: Option, + ctxt: &mut ConsensusContext, spec: &'a ChainSpec, ) -> Result<()> { let mut verifier = Self::new(state, get_pubkey, decompressor, spec); - verifier.include_all_signatures(block, block_root, verified_proposer_index)?; + verifier.include_all_signatures(block, ctxt)?; verifier.verify() } @@ -135,11 +141,14 @@ where pub fn include_all_signatures>( &mut self, block: &'a SignedBeaconBlock, - block_root: Option, - verified_proposer_index: Option, + ctxt: &mut ConsensusContext, ) -> Result<()> { + let block_root = Some(ctxt.get_current_block_root(block)?); + let verified_proposer_index = + Some(ctxt.get_proposer_index_from_epoch_state(self.state, self.spec)?); + self.include_block_proposal(block, block_root, verified_proposer_index)?; - self.include_all_signatures_except_proposal(block, verified_proposer_index)?; + self.include_all_signatures_except_proposal(block, ctxt)?; Ok(()) } @@ -149,12 +158,14 @@ where pub fn include_all_signatures_except_proposal>( &mut self, block: &'a SignedBeaconBlock, - verified_proposer_index: Option, + ctxt: &mut ConsensusContext, ) -> Result<()> { + let verified_proposer_index = + Some(ctxt.get_proposer_index_from_epoch_state(self.state, self.spec)?); self.include_randao_reveal(block, verified_proposer_index)?; self.include_proposer_slashings(block)?; self.include_attester_slashings(block)?; - self.include_attestations(block)?; + self.include_attestations(block, ctxt)?; // Deposits are not included because they can legally have invalid signatures. self.include_exits(block)?; self.include_sync_aggregate(block)?; @@ -260,7 +271,8 @@ where pub fn include_attestations>( &mut self, block: &'a SignedBeaconBlock, - ) -> Result>> { + ctxt: &mut ConsensusContext, + ) -> Result<()> { self.sets .sets .reserve(block.message().body().attestations().len()); @@ -270,28 +282,18 @@ where .body() .attestations() .iter() - .try_fold( - Vec::with_capacity(block.message().body().attestations().len()), - |mut vec, attestation| { - let committee = self - .state - .get_beacon_committee(attestation.data.slot, attestation.data.index)?; - let indexed_attestation = - get_indexed_attestation(committee.committee, attestation)?; - - self.sets.push(indexed_attestation_signature_set( - self.state, - self.get_pubkey.clone(), - &attestation.signature, - &indexed_attestation, - self.spec, - )?); - - vec.push(indexed_attestation); - - Ok(vec) - }, - ) + .try_for_each(|attestation| { + let indexed_attestation = ctxt.get_indexed_attestation(self.state, attestation)?; + + self.sets.push(indexed_attestation_signature_set( + self.state, + self.get_pubkey.clone(), + &attestation.signature, + indexed_attestation, + self.spec, + )?); + Ok(()) + }) .map_err(Error::into) } diff --git a/consensus/state_processing/src/per_block_processing/process_operations.rs b/consensus/state_processing/src/per_block_processing/process_operations.rs index 1000586e660..9f27c4c9a1e 100644 --- a/consensus/state_processing/src/per_block_processing/process_operations.rs +++ b/consensus/state_processing/src/per_block_processing/process_operations.rs @@ -57,8 +57,14 @@ pub mod base { // Verify and apply each attestation. for (i, attestation) in attestations.iter().enumerate() { - verify_attestation_for_block_inclusion(state, attestation, verify_signatures, spec) - .map_err(|e| e.into_with_index(i))?; + verify_attestation_for_block_inclusion( + state, + attestation, + ctxt, + verify_signatures, + spec, + ) + .map_err(|e| e.into_with_index(i))?; let pending_attestation = PendingAttestation { aggregation_bits: attestation.aggregation_bits.clone(), @@ -94,19 +100,11 @@ pub mod altair { ctxt: &mut ConsensusContext, spec: &ChainSpec, ) -> Result<(), BlockProcessingError> { - let proposer_index = ctxt.get_proposer_index(state, spec)?; attestations .iter() .enumerate() .try_for_each(|(i, attestation)| { - process_attestation( - state, - attestation, - i, - proposer_index, - verify_signatures, - spec, - ) + process_attestation(state, attestation, i, ctxt, verify_signatures, spec) }) } @@ -114,16 +112,24 @@ pub mod altair { state: &mut BeaconState, attestation: &Attestation, att_index: usize, - proposer_index: u64, + ctxt: &mut ConsensusContext, verify_signatures: VerifySignatures, spec: &ChainSpec, ) -> Result<(), BlockProcessingError> { state.build_committee_cache(RelativeEpoch::Previous, spec)?; state.build_committee_cache(RelativeEpoch::Current, spec)?; - let indexed_attestation = - verify_attestation_for_block_inclusion(state, attestation, verify_signatures, spec) - .map_err(|e| e.into_with_index(att_index))?; + let proposer_index = ctxt.get_proposer_index(state, spec)?; + + let attesting_indices = &verify_attestation_for_block_inclusion( + state, + attestation, + ctxt, + verify_signatures, + spec, + ) + .map_err(|e| e.into_with_index(att_index))? + .attesting_indices; // Matching roots, participation flag indices let data = &attestation.data; @@ -135,7 +141,7 @@ pub mod altair { let total_active_balance = state.get_total_active_balance()?; let base_reward_per_increment = BaseRewardPerIncrement::new(total_active_balance, spec)?; let mut proposer_reward_numerator = 0; - for index in &indexed_attestation.attesting_indices { + for index in attesting_indices { let index = *index as usize; for (flag_index, &weight) in PARTICIPATION_FLAG_WEIGHTS.iter().enumerate() { diff --git a/consensus/state_processing/src/per_block_processing/verify_attestation.rs b/consensus/state_processing/src/per_block_processing/verify_attestation.rs index 5d8113af4f0..303a6e3913a 100644 --- a/consensus/state_processing/src/per_block_processing/verify_attestation.rs +++ b/consensus/state_processing/src/per_block_processing/verify_attestation.rs @@ -1,7 +1,7 @@ use super::errors::{AttestationInvalid as Invalid, BlockOperationError}; use super::VerifySignatures; -use crate::common::get_indexed_attestation; use crate::per_block_processing::is_valid_indexed_attestation; +use crate::ConsensusContext; use safe_arith::SafeArith; use types::*; @@ -15,12 +15,13 @@ fn error(reason: Invalid) -> BlockOperationError { /// to `state`. Otherwise, returns a descriptive `Err`. /// /// Optionally verifies the aggregate signature, depending on `verify_signatures`. -pub fn verify_attestation_for_block_inclusion( +pub fn verify_attestation_for_block_inclusion<'ctxt, T: EthSpec>( state: &BeaconState, attestation: &Attestation, + ctxt: &'ctxt mut ConsensusContext, verify_signatures: VerifySignatures, spec: &ChainSpec, -) -> Result> { +) -> Result<&'ctxt IndexedAttestation> { let data = &attestation.data; verify!( @@ -39,7 +40,7 @@ pub fn verify_attestation_for_block_inclusion( } ); - verify_attestation_for_state(state, attestation, verify_signatures, spec) + verify_attestation_for_state(state, attestation, ctxt, verify_signatures, spec) } /// Returns `Ok(())` if `attestation` is a valid attestation to the chain that precedes the given @@ -49,12 +50,13 @@ pub fn verify_attestation_for_block_inclusion( /// prior blocks in `state`. /// /// Spec v0.12.1 -pub fn verify_attestation_for_state( +pub fn verify_attestation_for_state<'ctxt, T: EthSpec>( state: &BeaconState, attestation: &Attestation, + ctxt: &'ctxt mut ConsensusContext, verify_signatures: VerifySignatures, spec: &ChainSpec, -) -> Result> { +) -> Result<&'ctxt IndexedAttestation> { let data = &attestation.data; verify!( @@ -66,9 +68,8 @@ pub fn verify_attestation_for_state( verify_casper_ffg_vote(attestation, state)?; // Check signature and bitfields - let committee = state.get_beacon_committee(attestation.data.slot, attestation.data.index)?; - let indexed_attestation = get_indexed_attestation(committee.committee, attestation)?; - is_valid_indexed_attestation(state, &indexed_attestation, verify_signatures, spec)?; + let indexed_attestation = ctxt.get_indexed_attestation(state, attestation)?; + is_valid_indexed_attestation(state, indexed_attestation, verify_signatures, spec)?; Ok(indexed_attestation) } diff --git a/lcli/src/transition_blocks.rs b/lcli/src/transition_blocks.rs index 84d0a517652..44a1772ccd2 100644 --- a/lcli/src/transition_blocks.rs +++ b/lcli/src/transition_blocks.rs @@ -339,6 +339,10 @@ fn do_transition( .map_err(|e| format!("Unable to build caches: {:?}", e))?; debug!("Build all caches (again): {:?}", t.elapsed()); + let mut ctxt = ConsensusContext::new(pre_state.slot()) + .set_current_block_root(block_root) + .set_proposer_index(block.message().proposer_index()); + if !config.no_signature_verification { let get_pubkey = move |validator_index| { validator_pubkey_cache @@ -359,18 +363,20 @@ fn do_transition( get_pubkey, decompressor, &block, - Some(block_root), - Some(block.message().proposer_index()), + &mut ctxt, spec, ) .map_err(|e| format!("Invalid block signature: {:?}", e))?; debug!("Batch verify block signatures: {:?}", t.elapsed()); + + // Signature verification should prime the indexed attestation cache. + assert_eq!( + ctxt.num_cached_indexed_attestations(), + block.message().body().attestations().len() + ); } let t = Instant::now(); - let mut ctxt = ConsensusContext::new(pre_state.slot()) - .set_current_block_root(block_root) - .set_proposer_index(block.message().proposer_index()); per_block_processing( &mut pre_state, &block, diff --git a/testing/ef_tests/src/cases/operations.rs b/testing/ef_tests/src/cases/operations.rs index a351a597c0a..aaa725f567a 100644 --- a/testing/ef_tests/src/cases/operations.rs +++ b/testing/ef_tests/src/cases/operations.rs @@ -80,7 +80,6 @@ impl Operation for Attestation { _: &Operations, ) -> Result<(), BlockProcessingError> { let mut ctxt = ConsensusContext::new(state.slot()); - let proposer_index = ctxt.get_proposer_index(state, spec)?; match state { BeaconState::Base(_) => base::process_attestations( state, @@ -89,14 +88,9 @@ impl Operation for Attestation { &mut ctxt, spec, ), - BeaconState::Altair(_) | BeaconState::Merge(_) => altair::process_attestation( - state, - self, - 0, - proposer_index, - VerifySignatures::True, - spec, - ), + BeaconState::Altair(_) | BeaconState::Merge(_) => { + altair::process_attestation(state, self, 0, &mut ctxt, VerifySignatures::True, spec) + } } } }