From aa10943b8eebf13e38231b1d6592ce0d3a80eb4f Mon Sep 17 00:00:00 2001 From: realbigsean Date: Fri, 24 Mar 2023 16:56:52 -0400 Subject: [PATCH] Partial processing (#4) * move beacon block and blob to eth2/types * rename gossip blob cache to data availability checker * lots of changes * fix some compilation issues * fix compilation issues * fix compilation issues * fix compilation issues * fix compilation issues * fix compilation issues * cargo fmt * use a common data structure for block import types * fix availability check on proposal import * refactor the blob cache and split the block wrapper into two types * add type conversion for signed block and block wrapper * fix beacon chain tests and do some renaming, add some comments --- beacon_node/beacon_chain/src/beacon_chain.rs | 259 ++++----- .../beacon_chain/src/blob_verification.rs | 343 +++++------- .../beacon_chain/src/block_verification.rs | 229 +++++--- beacon_node/beacon_chain/src/builder.rs | 12 +- .../src/data_availability_checker.rs | 516 ++++++++++++++++++ .../beacon_chain/src/early_attester_cache.rs | 2 +- .../beacon_chain/src/gossip_blob_cache.rs | 229 -------- beacon_node/beacon_chain/src/lib.rs | 2 +- .../tests/attestation_production.rs | 27 +- .../beacon_chain/tests/block_verification.rs | 3 +- .../tests/payload_invalidation.rs | 4 + beacon_node/beacon_chain/tests/tests.rs | 27 +- .../http_api/src/build_block_contents.rs | 4 +- beacon_node/http_api/src/publish_blocks.rs | 44 +- .../work_reprocessing_queue.rs | 3 +- .../beacon_processor/worker/gossip_methods.rs | 2 +- .../beacon_processor/worker/sync_methods.rs | 3 +- .../network/src/sync/block_lookups/mod.rs | 3 +- .../src/sync/block_lookups/parent_lookup.rs | 3 +- .../sync/block_lookups/single_block_lookup.rs | 3 +- beacon_node/network/src/sync/manager.rs | 7 +- common/eth2/src/types.rs | 29 + .../src/beacon_block_and_blob_sidecars.rs | 37 -- consensus/types/src/lib.rs | 2 - consensus/types/src/signed_beacon_block.rs | 8 - 25 files changed, 1019 insertions(+), 782 deletions(-) create mode 100644 beacon_node/beacon_chain/src/data_availability_checker.rs delete mode 100644 beacon_node/beacon_chain/src/gossip_blob_cache.rs delete mode 100644 consensus/types/src/beacon_block_and_blob_sidecars.rs diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 6029ee1e73d..f6a564659c5 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -7,17 +7,19 @@ use crate::attester_cache::{AttesterCache, AttesterCacheKey}; use crate::beacon_proposer_cache::compute_proposer_duties_from_head; use crate::beacon_proposer_cache::BeaconProposerCache; use crate::blob_cache::BlobCache; -use crate::blob_verification::{ - self, AsBlock, AvailableBlock, BlobError, BlockWrapper, GossipVerifiedBlob, -}; +use crate::blob_verification::{self, AsBlock, BlobError, BlockWrapper, GossipVerifiedBlob}; use crate::block_times_cache::BlockTimesCache; +use crate::block_verification::POS_PANDA_BANNER; use crate::block_verification::{ check_block_is_finalized_checkpoint_or_descendant, check_block_relevancy, get_block_root, - signature_verify_chain_segment, BlockError, ExecutedBlock, ExecutionPendingBlock, - GossipVerifiedBlock, IntoExecutionPendingBlock, POS_PANDA_BANNER, + signature_verify_chain_segment, AvailableExecutedBlock, BlockError, BlockImportData, + ExecutedBlock, ExecutionPendingBlock, GossipVerifiedBlock, IntoExecutionPendingBlock, }; pub use crate::canonical_head::{CanonicalHead, CanonicalHeadRwLock}; use crate::chain_config::ChainConfig; +use crate::data_availability_checker::{ + Availability, AvailabilityCheckError, AvailableBlock, DataAvailabilityChecker, +}; use crate::early_attester_cache::EarlyAttesterCache; use crate::errors::{BeaconChainError as Error, BlockProductionError}; use crate::eth1_chain::{Eth1Chain, Eth1ChainBackend}; @@ -25,7 +27,6 @@ use crate::eth1_finalization_cache::{Eth1FinalizationCache, Eth1FinalizationData use crate::events::ServerSentEventHandler; use crate::execution_payload::{get_execution_payload, NotifyExecutionLayer, PreparePayloadHandle}; use crate::fork_choice_signal::{ForkChoiceSignalRx, ForkChoiceSignalTx, ForkChoiceWaitResult}; -use crate::gossip_blob_cache::{Availability, AvailabilityCheckError, DataAvailabilityChecker}; use crate::head_tracker::HeadTracker; use crate::historical_blocks::HistoricalBlockError; use crate::kzg_utils; @@ -185,7 +186,7 @@ pub enum WhenSlotSkipped { Prev, } -#[derive(Debug)] +#[derive(Debug, PartialEq)] pub enum AvailabilityProcessingStatus { PendingBlobs(Vec), PendingBlock(Hash256), @@ -204,6 +205,17 @@ impl TryInto for AvailabilityProcessingStatus { } } +impl TryInto for AvailabilityProcessingStatus { + type Error = (); + + fn try_into(self) -> Result { + match self { + AvailabilityProcessingStatus::Imported(hash) => Ok(hash), + _ => Err(()), + } + } +} + /// The result of a chain segment processing. pub enum ChainSegmentResult { /// Processing this chain segment finished successfully. @@ -455,7 +467,7 @@ pub struct BeaconChain { /// Provides monitoring of a set of explicitly defined validators. pub validator_monitor: RwLock>, pub proposal_blob_cache: BlobCache, - pub data_availability_checker: DataAvailabilityChecker, + pub data_availability_checker: DataAvailabilityChecker, pub kzg: Option>, } @@ -2657,11 +2669,11 @@ impl BeaconChain { pub async fn process_blob( self: &Arc, - blob: Arc>, + blob: GossipVerifiedBlob, count_unrealized: CountUnrealized, ) -> Result> { self.check_availability_and_maybe_import( - |chain| chain.data_availability_checker.put_blob(blob), + |chain| chain.data_availability_checker.put_gossip_blob(blob), count_unrealized, ) .await @@ -2704,24 +2716,29 @@ impl BeaconChain { notify_execution_layer, )?; - // TODO(log required errors) let executed_block = self .clone() .into_executed_block(execution_pending) .await .map_err(|e| self.handle_block_error(e))?; - self.check_availability_and_maybe_import( - |chain| { - chain - .data_availability_checker - .check_block_availability(executed_block, |epoch| { - chain.block_needs_da_check(epoch) - }) - }, - count_unrealized, - ) - .await + match executed_block { + ExecutedBlock::Available(block) => { + self.import_available_block(Box::new(block), count_unrealized) + .await + } + ExecutedBlock::AvailabilityPending(block) => { + self.check_availability_and_maybe_import( + |chain| { + chain + .data_availability_checker + .put_pending_executed_block(block) + }, + count_unrealized, + ) + .await + } + } } /// Accepts a fully-verified block and awaits on it's payload verification handle to @@ -2734,13 +2751,8 @@ impl BeaconChain { ) -> Result, BlockError> { let ExecutionPendingBlock { block, - block_root, - state, - parent_block, - confirmed_state_roots, + import_data, payload_verification_handle, - parent_eth1_finalization_data, - consensus_context, } = execution_pending_block; let payload_verification_outcome = payload_verification_handle @@ -2777,16 +2789,11 @@ impl BeaconChain { .into_root() ); } - Ok(ExecutedBlock { + Ok(ExecutedBlock::new( block, - block_root, - state, - parent_block, - confirmed_state_roots, - parent_eth1_finalization_data, - consensus_context, + import_data, payload_verification_outcome, - }) + )) } fn handle_block_error(&self, e: BlockError) -> BlockError { @@ -2831,68 +2838,7 @@ impl BeaconChain { let availability = cache_fn(self.clone())?; match availability { Availability::Available(block) => { - let ExecutedBlock { - block, - block_root, - state, - parent_block, - parent_eth1_finalization_data, - confirmed_state_roots, - consensus_context, - payload_verification_outcome, - } = *block; - - let available_block = match block { - BlockWrapper::Available(block) => block, - BlockWrapper::AvailabilityPending(_) => { - todo!() // logic error - } - }; - - let slot = available_block.slot(); - - // import - let chain = self.clone(); - let result = self - .spawn_blocking_handle( - move || { - chain.import_block( - available_block, - block_root, - state, - confirmed_state_roots, - payload_verification_outcome.payload_verification_status, - count_unrealized, - parent_block, - parent_eth1_finalization_data, - consensus_context, - ) - }, - "payload_verification_handle", - ) - .await - .map_err(|e| { - let b = BlockError::from(e); - self.handle_block_error(b) - })?; - - match result { - // The block was successfully verified and imported. Yay. - Ok(block_root) => { - trace!( - self.log, - "Beacon block imported"; - "block_root" => ?block_root, - "block_slot" => slot, - ); - - // Increment the Prometheus counter for block processing successes. - metrics::inc_counter(&metrics::BLOCK_PROCESSING_SUCCESSES); - - Ok(AvailabilityProcessingStatus::Imported(block_root)) - } - Err(e) => Err(self.handle_block_error(e)), - } + self.import_available_block(block, count_unrealized).await } Availability::PendingBlock(block_root) => { Ok(AvailabilityProcessingStatus::PendingBlock(block_root)) @@ -2903,6 +2849,72 @@ impl BeaconChain { } } + async fn import_available_block( + self: &Arc, + block: Box>, + count_unrealized: CountUnrealized, + ) -> Result> { + let AvailableExecutedBlock { + block, + import_data, + payload_verification_outcome, + } = *block; + + let BlockImportData { + block_root, + state, + parent_block, + parent_eth1_finalization_data, + confirmed_state_roots, + consensus_context, + } = import_data; + + let slot = block.slot(); + + // import + let chain = self.clone(); + let result = self + .spawn_blocking_handle( + move || { + chain.import_block( + block, + block_root, + state, + confirmed_state_roots, + payload_verification_outcome.payload_verification_status, + count_unrealized, + parent_block, + parent_eth1_finalization_data, + consensus_context, + ) + }, + "payload_verification_handle", + ) + .await + .map_err(|e| { + let b = BlockError::from(e); + self.handle_block_error(b) + })?; + + match result { + // The block was successfully verified and imported. Yay. + Ok(block_root) => { + trace!( + self.log, + "Beacon block imported"; + "block_root" => ?block_root, + "block_slot" => slot, + ); + + // Increment the Prometheus counter for block processing successes. + metrics::inc_counter(&metrics::BLOCK_PROCESSING_SUCCESSES); + + Ok(AvailabilityProcessingStatus::Imported(block_root)) + } + Err(e) => Err(self.handle_block_error(e)), + } + } + /// Accepts a fully-verified and available block and imports it into the chain without performing any /// additional verification. /// @@ -2965,13 +2977,8 @@ impl BeaconChain { let mut fork_choice = self.canonical_head.fork_choice_write_lock(); // Do not import a block that doesn't descend from the finalized root. - let signed_block = check_block_is_finalized_checkpoint_or_descendant( - self, - &fork_choice, - BlockWrapper::from(signed_block), - )?; - // TODO(pawan): fix this atrocity - let signed_block = signed_block.into_available_block().unwrap(); + let signed_block = + check_block_is_finalized_checkpoint_or_descendant(self, &fork_choice, signed_block)?; let block = signed_block.message(); // Register the new block with the fork choice service. @@ -3092,27 +3099,17 @@ impl BeaconChain { ops.push(StoreOp::PutBlock(block_root, signed_block.clone())); ops.push(StoreOp::PutState(block.state_root(), &state)); - // Only consider blobs if the eip4844 fork is enabled. - if let Some(data_availability_boundary) = self.data_availability_boundary() { - let block_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch()); - let margin_epochs = self.store.get_config().blob_prune_margin_epochs; - let import_boundary = data_availability_boundary - margin_epochs; - - // Only store blobs at the data availability boundary, minus any configured epochs - // margin, or younger (of higher epoch number). - if block_epoch >= import_boundary { - if let Some(blobs) = blobs { - if !blobs.is_empty() { - //FIXME(sean) using this for debugging for now - info!( - self.log, "Writing blobs to store"; - "block_root" => ?block_root - ); - ops.push(StoreOp::PutBlobs(block_root, blobs)); - } - } + if let Some(blobs) = blobs { + if !blobs.is_empty() { + //FIXME(sean) using this for debugging for now + info!( + self.log, "Writing blobs to store"; + "block_root" => ?block_root + ); + ops.push(StoreOp::PutBlobs(block_root, blobs)); } } + let txn_lock = self.store.hot_db.begin_rw_transaction(); if let Err(e) = self.store.do_atomically_with_block_and_blobs_cache(ops) { @@ -6180,20 +6177,6 @@ impl BeaconChain { .map_or(false, |da_epoch| block_epoch >= da_epoch) } - /// The epoch that is a data availability boundary, or the latest finalized epoch. - /// `None` if the `Eip4844` fork is disabled. - pub fn finalized_data_availability_boundary(&self) -> Option { - self.data_availability_boundary().map(|boundary| { - std::cmp::max( - boundary, - self.canonical_head - .cached_head() - .finalized_checkpoint() - .epoch, - ) - }) - } - /// Returns `true` if we are at or past the `Eip4844` fork. This will always return `false` if /// the `Eip4844` fork is disabled. pub fn is_data_availability_check_required(&self) -> Result { diff --git a/beacon_node/beacon_chain/src/blob_verification.rs b/beacon_node/beacon_chain/src/blob_verification.rs index d2d8faa1a97..9d1a7c708ec 100644 --- a/beacon_node/beacon_chain/src/blob_verification.rs +++ b/beacon_node/beacon_chain/src/blob_verification.rs @@ -1,3 +1,4 @@ +use derivative::Derivative; use slot_clock::SlotClock; use std::sync::Arc; @@ -5,14 +6,15 @@ use crate::beacon_chain::{ BeaconChain, BeaconChainTypes, MAXIMUM_GOSSIP_CLOCK_DISPARITY, VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT, }; -use crate::gossip_blob_cache::AvailabilityCheckError; +use crate::data_availability_checker::{ + AvailabilityCheckError, AvailabilityPendingBlock, AvailableBlock, +}; +use crate::kzg_utils::{validate_blob, validate_blobs}; use crate::BeaconChainError; -use derivative::Derivative; -use state_processing::per_block_processing::eip4844::eip4844::verify_kzg_commitments_against_transactions; +use kzg::Kzg; use types::{ BeaconBlockRef, BeaconStateError, BlobSidecar, BlobSidecarList, Epoch, EthSpec, Hash256, KzgCommitment, SignedBeaconBlock, SignedBeaconBlockHeader, SignedBlobSidecar, Slot, - Transactions, }; #[derive(Debug)] @@ -134,8 +136,8 @@ pub struct GossipVerifiedBlob { } impl GossipVerifiedBlob { - pub fn to_blob(self) -> Arc> { - self.blob + pub fn block_root(&self) -> Hash256 { + self.blob.block_root } } @@ -261,167 +263,85 @@ pub fn validate_blob_sidecar_for_gossip( }) } -pub fn verify_data_availability( - _blob_sidecar: &BlobSidecarList, - kzg_commitments: &[KzgCommitment], - transactions: &Transactions, - _block_slot: Slot, - _block_root: Hash256, - chain: &BeaconChain, -) -> Result<(), BlobError> { - if verify_kzg_commitments_against_transactions::(transactions, kzg_commitments) - .is_err() - { - return Err(BlobError::TransactionCommitmentMismatch); - } - - // Validatate that the kzg proof is valid against the commitments and blobs - let _kzg = chain - .kzg - .as_ref() - .ok_or(BlobError::TrustedSetupNotInitialized)?; - - todo!("use `kzg_utils::validate_blobs` once the function is updated") - // if !kzg_utils::validate_blobs_sidecar( - // kzg, - // block_slot, - // block_root, - // kzg_commitments, - // blob_sidecar, - // ) - // .map_err(BlobError::KzgError)? - // { - // return Err(BlobError::InvalidKzgProof); - // } - // Ok(()) -} - -#[derive(Copy, Clone)] -pub enum DataAvailabilityCheckRequired { - Yes, - No, -} - -pub trait IntoAvailableBlock { - fn into_available_block( - self, - block_root: Hash256, - chain: &BeaconChain, - ) -> Result, BlobError>; +#[derive(Debug, Clone)] +pub struct KzgVerifiedBlob { + blob: Arc>, } -impl IntoAvailableBlock for BlockWrapper { - fn into_available_block( - self, - _block_root: Hash256, - _chain: &BeaconChain, - ) -> Result, BlobError> { - todo!() +impl KzgVerifiedBlob { + pub fn to_blob(self) -> Arc> { + self.blob } -} - -#[derive(Clone, Debug, PartialEq, Derivative)] -#[derivative(Hash(bound = "E: EthSpec"))] -pub struct AvailableBlock(AvailableBlockInner); - -#[derive(Clone, Debug, PartialEq, Derivative)] -#[derivative(Hash(bound = "E: EthSpec"))] -struct AvailableBlockInner { - block: Arc>, - blobs: VerifiedBlobs, -} - -impl AvailableBlock { - pub fn new( - block: Arc>, - blobs: Vec>>, - da_check: impl FnOnce(Epoch) -> bool, - ) -> Result { - if let Ok(block_kzg_commitments) = block.message().body().blob_kzg_commitments() { - if blobs.is_empty() && block_kzg_commitments.is_empty() { - return Ok(Self(AvailableBlockInner { - block, - blobs: VerifiedBlobs::EmptyBlobs, - })); - } - - if blobs.is_empty() { - if da_check(block.epoch()) { - return Ok(Self(AvailableBlockInner { - block, - blobs: VerifiedBlobs::NotRequired, - })); - } else { - return Err("Block within DA boundary but no blobs provided".to_string()); - } - } - - if blobs.len() != block_kzg_commitments.len() { - return Err(format!( - "Block commitments and blobs length must be the same. - Block commitments len: {}, blobs length: {}", - block_kzg_commitments.len(), - blobs.len() - )); - } - - for (block_commitment, blob) in block_kzg_commitments.iter().zip(blobs.iter()) { - if *block_commitment != blob.kzg_commitment { - return Err(format!( - "Invalid input. Blob and block commitment mismatch at index {}", - blob.index - )); - } - } - Ok(Self(AvailableBlockInner { - block, - blobs: VerifiedBlobs::Available(blobs.into()), - })) - } - // This is a pre 4844 block - else { - Ok(Self(AvailableBlockInner { - block, - blobs: VerifiedBlobs::PreEip4844, - })) - } + pub fn as_blob(&self) -> &BlobSidecar { + &self.blob } - - pub fn block(&self) -> &SignedBeaconBlock { - &self.0.block + pub fn clone_blob(&self) -> Arc> { + self.blob.clone() } + pub fn kzg_commitment(&self) -> KzgCommitment { + self.blob.kzg_commitment + } + pub fn block_root(&self) -> Hash256 { + self.blob.block_root + } +} - pub fn blobs(&self) -> Option> { - match &self.0.blobs { - VerifiedBlobs::EmptyBlobs | VerifiedBlobs::NotRequired | VerifiedBlobs::PreEip4844 => { - None - } - VerifiedBlobs::Available(blobs) => Some(blobs.clone()), - } +pub fn verify_kzg_for_blob( + blob: GossipVerifiedBlob, + kzg: &Kzg, +) -> Result, AvailabilityCheckError> { + //TODO(sean) remove clone + if validate_blob::( + kzg, + blob.blob.blob.clone(), + blob.blob.kzg_commitment, + blob.blob.kzg_proof, + ) + .map_err(AvailabilityCheckError::Kzg)? + { + Ok(KzgVerifiedBlob { blob: blob.blob }) + } else { + Err(AvailabilityCheckError::KzgVerificationFailed) } +} - pub fn deconstruct(self) -> (Arc>, Option>) { - match self.0.blobs { - VerifiedBlobs::EmptyBlobs | VerifiedBlobs::NotRequired | VerifiedBlobs::PreEip4844 => { - (self.0.block, None) - } - VerifiedBlobs::Available(blobs) => (self.0.block, Some(blobs)), - } +pub fn verify_kzg_for_blob_list( + blob_list: BlobSidecarList, + kzg: &Kzg, +) -> Result, AvailabilityCheckError> { + let (blobs, (commitments, proofs)): (Vec<_>, (Vec<_>, Vec<_>)) = blob_list + .clone() + .into_iter() + //TODO(sean) remove clone + .map(|blob| (blob.blob.clone(), (blob.kzg_commitment, blob.kzg_proof))) + .unzip(); + if validate_blobs::( + kzg, + commitments.as_slice(), + blobs.as_slice(), + proofs.as_slice(), + ) + .map_err(AvailabilityCheckError::Kzg)? + { + Ok(blob_list + .into_iter() + .map(|blob| KzgVerifiedBlob { blob }) + .collect()) + } else { + Err(AvailabilityCheckError::KzgVerificationFailed) } } -#[derive(Clone, Debug, PartialEq, Derivative)] -#[derivative(Hash(bound = "E: EthSpec"))] -pub enum VerifiedBlobs { - /// These blobs are available. - Available(BlobSidecarList), - /// This block is from outside the data availability boundary so doesn't require - /// a data availability check. - NotRequired, - /// The block's `kzg_commitments` field is empty so it does not contain any blobs. - EmptyBlobs, - /// This is a block prior to the 4844 fork, so doesn't require any blobs - PreEip4844, +pub type KzgVerifiedBlobList = Vec>; + +#[derive(Debug, Clone)] +pub enum MaybeAvailableBlock { + /// This variant is fully available. + /// i.e. for pre-eip4844 blocks, it contains a (`SignedBeaconBlock`, `Blobs::None`) and for + /// post-4844 blocks, it contains a `SignedBeaconBlock` and a Blobs variant other than `Blobs::None`. + Available(AvailableBlock), + /// This variant is not fully available and requires blobs to become fully available. + AvailabilityPending(AvailabilityPendingBlock), } pub trait AsBlock { @@ -434,67 +354,55 @@ pub trait AsBlock { fn as_block(&self) -> &SignedBeaconBlock; fn block_cloned(&self) -> Arc>; fn canonical_root(&self) -> Hash256; + fn into_block_wrapper(self) -> BlockWrapper; } -#[derive(Debug, Clone, Derivative)] -#[derivative(Hash(bound = "E: EthSpec"))] -pub enum BlockWrapper { - /// This variant is fully available. - /// i.e. for pre-4844 blocks, it contains a (`SignedBeaconBlock`, `Blobs::None`) and for - /// post-4844 blocks, it contains a `SignedBeaconBlock` and a Blobs variant other than `Blobs::None`. - Available(AvailableBlock), - /// This variant is not fully available and requires blobs to become fully available. - AvailabilityPending(Arc>), -} - -impl BlockWrapper { - pub fn into_available_block(self) -> Option> { - match self { - BlockWrapper::AvailabilityPending(_) => None, - BlockWrapper::Available(block) => Some(block), - } - } -} - -impl AsBlock for AvailableBlock { +impl AsBlock for MaybeAvailableBlock { fn slot(&self) -> Slot { - self.0.block.slot() + self.as_block().slot() } - fn epoch(&self) -> Epoch { - self.0.block.epoch() + self.as_block().epoch() } - fn parent_root(&self) -> Hash256 { - self.0.block.parent_root() + self.as_block().parent_root() } - fn state_root(&self) -> Hash256 { - self.0.block.state_root() + self.as_block().state_root() } - fn signed_block_header(&self) -> SignedBeaconBlockHeader { - self.0.block.signed_block_header() + self.as_block().signed_block_header() } - fn message(&self) -> BeaconBlockRef { - self.0.block.message() + self.as_block().message() } - fn as_block(&self) -> &SignedBeaconBlock { - &self.0.block + match &self { + MaybeAvailableBlock::Available(block) => block.as_block(), + MaybeAvailableBlock::AvailabilityPending(block) => block.as_block(), + } } - fn block_cloned(&self) -> Arc> { - self.0.block.clone() + match &self { + MaybeAvailableBlock::Available(block) => block.block_cloned(), + MaybeAvailableBlock::AvailabilityPending(block) => block.block_cloned(), + } } - fn canonical_root(&self) -> Hash256 { - self.0.block.canonical_root() + self.as_block().canonical_root() + } + + fn into_block_wrapper(self) -> BlockWrapper { + match self { + MaybeAvailableBlock::Available(available_block) => available_block.into_block_wrapper(), + MaybeAvailableBlock::AvailabilityPending(pending_block) => { + BlockWrapper::Block(pending_block.to_block()) + } + } } } -impl AsBlock for BlockWrapper { +impl AsBlock for &MaybeAvailableBlock { fn slot(&self) -> Slot { self.as_block().slot() } @@ -515,22 +423,33 @@ impl AsBlock for BlockWrapper { } fn as_block(&self) -> &SignedBeaconBlock { match &self { - BlockWrapper::Available(block) => &block.0.block, - BlockWrapper::AvailabilityPending(block) => block, + MaybeAvailableBlock::Available(block) => block.as_block(), + MaybeAvailableBlock::AvailabilityPending(block) => block.as_block(), } } fn block_cloned(&self) -> Arc> { match &self { - BlockWrapper::Available(block) => block.0.block.clone(), - BlockWrapper::AvailabilityPending(block) => block.clone(), + MaybeAvailableBlock::Available(block) => block.block_cloned(), + MaybeAvailableBlock::AvailabilityPending(block) => block.block_cloned(), } } fn canonical_root(&self) -> Hash256 { self.as_block().canonical_root() } + + fn into_block_wrapper(self) -> BlockWrapper { + self.clone().into_block_wrapper() + } +} + +#[derive(Debug, Clone, Derivative)] +#[derivative(Hash(bound = "E: EthSpec"))] +pub enum BlockWrapper { + Block(Arc>), + BlockAndBlobs(Arc>, BlobSidecarList), } -impl AsBlock for &BlockWrapper { +impl AsBlock for BlockWrapper { fn slot(&self) -> Slot { self.as_block().slot() } @@ -551,35 +470,33 @@ impl AsBlock for &BlockWrapper { } fn as_block(&self) -> &SignedBeaconBlock { match &self { - BlockWrapper::Available(block) => &block.0.block, - BlockWrapper::AvailabilityPending(block) => block, + BlockWrapper::Block(block) => block, + BlockWrapper::BlockAndBlobs(block, _) => block, } } fn block_cloned(&self) -> Arc> { match &self { - BlockWrapper::Available(block) => block.0.block.clone(), - BlockWrapper::AvailabilityPending(block) => block.clone(), + BlockWrapper::Block(block) => block.clone(), + BlockWrapper::BlockAndBlobs(block, _) => block.clone(), } } fn canonical_root(&self) -> Hash256 { self.as_block().canonical_root() } -} -impl From> for BlockWrapper { - fn from(block: SignedBeaconBlock) -> Self { - BlockWrapper::AvailabilityPending(Arc::new(block)) + fn into_block_wrapper(self) -> BlockWrapper { + self } } impl From>> for BlockWrapper { - fn from(block: Arc>) -> Self { - BlockWrapper::AvailabilityPending(block) + fn from(value: Arc>) -> Self { + Self::Block(value) } } -impl From> for BlockWrapper { - fn from(block: AvailableBlock) -> Self { - BlockWrapper::Available(block) +impl From> for BlockWrapper { + fn from(value: SignedBeaconBlock) -> Self { + Self::Block(Arc::new(value)) } } diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 9af753d8e97..6ecb16d5876 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -48,13 +48,15 @@ // returned alongside. #![allow(clippy::result_large_err)] -use crate::blob_verification::{AsBlock, AvailableBlock, BlobError, BlockWrapper}; +use crate::blob_verification::{AsBlock, BlobError, BlockWrapper, MaybeAvailableBlock}; +use crate::data_availability_checker::{ + AvailabilityCheckError, AvailabilityPendingBlock, AvailableBlock, +}; use crate::eth1_finalization_cache::Eth1FinalizationData; use crate::execution_payload::{ is_optimistic_candidate_block, validate_execution_payload_for_gossip, validate_merge_block, AllowOptimisticImport, NotifyExecutionLayer, PayloadNotifier, }; -use crate::gossip_blob_cache::AvailabilityCheckError; use crate::snapshot_cache::PreProcessingSnapshot; use crate::validator_monitor::HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS; use crate::validator_pubkey_cache::ValidatorPubkeyCache; @@ -93,9 +95,9 @@ use task_executor::JoinHandle; use tree_hash::TreeHash; use types::ExecPayload; use types::{ - BeaconBlockRef, BeaconState, BeaconStateError, BlindedPayload, BlobSidecar, ChainSpec, - CloneConfig, Epoch, EthSpec, ExecutionBlockHash, Hash256, InconsistentFork, PublicKey, - PublicKeyBytes, RelativeEpoch, SignedBeaconBlock, SignedBeaconBlockHeader, Slot, + BeaconBlockRef, BeaconState, BeaconStateError, BlindedPayload, ChainSpec, CloneConfig, Epoch, + EthSpec, ExecutionBlockHash, Hash256, InconsistentFork, PublicKey, PublicKeyBytes, + RelativeEpoch, SignedBeaconBlock, SignedBeaconBlockHeader, Slot, }; pub const POS_PANDA_BANNER: &str = r#" @@ -601,13 +603,14 @@ pub fn signature_verify_chain_segment( signature_verifier.include_all_signatures(block.as_block(), &mut consensus_context)?; - //FIXME(sean) batch kzg verification - consensus_context = consensus_context.set_kzg_commitments_consistent(true); + let maybe_available_block = chain + .data_availability_checker + .check_availability(block.clone())?; // Save the block and its consensus context. The context will have had its proposer index // and attesting indices filled in, which can be used to accelerate later block processing. signature_verified_blocks.push(SignatureVerifiedBlock { - block: block.clone(), + block: maybe_available_block, block_root: *block_root, parent: None, consensus_context, @@ -632,7 +635,7 @@ pub fn signature_verify_chain_segment( #[derive(Derivative)] #[derivative(Debug(bound = "T: BeaconChainTypes"))] pub struct GossipVerifiedBlock { - pub block: BlockWrapper, + pub block: MaybeAvailableBlock, pub block_root: Hash256, parent: Option>, consensus_context: ConsensusContext, @@ -641,7 +644,7 @@ pub struct GossipVerifiedBlock { /// A wrapper around a `SignedBeaconBlock` that indicates that all signatures (except the deposit /// signatures) have been verified. pub struct SignatureVerifiedBlock { - block: BlockWrapper, + block: MaybeAvailableBlock, block_root: Hash256, parent: Option>, consensus_context: ConsensusContext, @@ -664,32 +667,103 @@ type PayloadVerificationHandle = /// due to finality or some other event. A `ExecutionPendingBlock` should be imported into the /// `BeaconChain` immediately after it is instantiated. pub struct ExecutionPendingBlock { - pub block: BlockWrapper, - pub block_root: Hash256, - pub state: BeaconState, - pub parent_block: SignedBeaconBlock>, - pub parent_eth1_finalization_data: Eth1FinalizationData, - pub confirmed_state_roots: Vec, - pub consensus_context: ConsensusContext, + pub block: MaybeAvailableBlock, + pub import_data: BlockImportData, pub payload_verification_handle: PayloadVerificationHandle, } -#[derive(Clone)] -pub struct ExecutedBlock { - pub block: BlockWrapper, +pub enum ExecutedBlock { + Available(AvailableExecutedBlock), + AvailabilityPending(AvailabilityPendingExecutedBlock), +} + +impl ExecutedBlock { + pub fn as_block(&self) -> &SignedBeaconBlock { + match self { + Self::Available(available) => available.block.block(), + Self::AvailabilityPending(pending) => pending.block.as_block(), + } + } +} + +impl std::fmt::Debug for ExecutedBlock { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self.as_block()) + } +} + +impl ExecutedBlock { + pub fn new( + block: MaybeAvailableBlock, + import_data: BlockImportData, + payload_verification_outcome: PayloadVerificationOutcome, + ) -> Self { + match block { + MaybeAvailableBlock::Available(available_block) => { + Self::Available(AvailableExecutedBlock::new( + available_block, + import_data, + payload_verification_outcome, + )) + } + MaybeAvailableBlock::AvailabilityPending(pending_block) => { + Self::AvailabilityPending(AvailabilityPendingExecutedBlock::new( + pending_block, + import_data, + payload_verification_outcome, + )) + } + } + } +} + +pub struct AvailableExecutedBlock { + pub block: AvailableBlock, + pub import_data: BlockImportData, + pub payload_verification_outcome: PayloadVerificationOutcome, +} + +impl AvailableExecutedBlock { + pub fn new( + block: AvailableBlock, + import_data: BlockImportData, + payload_verification_outcome: PayloadVerificationOutcome, + ) -> Self { + Self { + block, + import_data, + payload_verification_outcome, + } + } +} + +pub struct AvailabilityPendingExecutedBlock { + pub block: AvailabilityPendingBlock, + pub import_data: BlockImportData, + pub payload_verification_outcome: PayloadVerificationOutcome, +} + +impl AvailabilityPendingExecutedBlock { + pub fn new( + block: AvailabilityPendingBlock, + import_data: BlockImportData, + payload_verification_outcome: PayloadVerificationOutcome, + ) -> Self { + Self { + block, + import_data, + payload_verification_outcome, + } + } +} + +pub struct BlockImportData { pub block_root: Hash256, pub state: BeaconState, pub parent_block: SignedBeaconBlock>, pub parent_eth1_finalization_data: Eth1FinalizationData, pub confirmed_state_roots: Vec, pub consensus_context: ConsensusContext, - pub payload_verification_outcome: PayloadVerificationOutcome, -} - -impl std::fmt::Debug for ExecutedBlock { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:?}", self.block) - } } /// Implemented on types that can be converted into a `ExecutionPendingBlock`. @@ -733,19 +807,20 @@ impl GossipVerifiedBlock { block: BlockWrapper, chain: &BeaconChain, ) -> Result> { + let maybe_available = chain.data_availability_checker.check_availability(block)?; // If the block is valid for gossip we don't supply it to the slasher here because // we assume it will be transformed into a fully verified block. We *do* need to supply // it to the slasher if an error occurs, because that's the end of this block's journey, // and it could be a repeat proposal (a likely cause for slashing!). - let header = block.signed_block_header(); - Self::new_without_slasher_checks(block, chain).map_err(|e| { + let header = maybe_available.signed_block_header(); + Self::new_without_slasher_checks(maybe_available, chain).map_err(|e| { process_block_slash_info(chain, BlockSlashInfo::from_early_error(header, e)) }) } /// As for new, but doesn't pass the block to the slasher. fn new_without_slasher_checks( - block: BlockWrapper, + block: MaybeAvailableBlock, chain: &BeaconChain, ) -> Result> { // Ensure the block is the correct structure for the fork at `block.slot()`. @@ -989,11 +1064,11 @@ impl SignatureVerifiedBlock { /// /// Returns an error if the block is invalid, or if the block was unable to be verified. pub fn new( - block: Arc>, + block: BlockWrapper, block_root: Hash256, chain: &BeaconChain, ) -> Result> { - let block = BlockWrapper::from(block); + let block = chain.data_availability_checker.check_availability(block)?; // Ensure the block is the correct structure for the fork at `block.slot()`. block .as_block() @@ -1040,7 +1115,7 @@ impl SignatureVerifiedBlock { /// As for `new` above but producing `BlockSlashInfo`. pub fn check_slashable( - block: Arc>, + block: BlockWrapper, block_root: Hash256, chain: &BeaconChain, ) -> Result>> { @@ -1149,7 +1224,7 @@ impl IntoExecutionPendingBlock for Arc IntoExecutionPendingBlock for Arc IntoExecutionPendingBlock for BlockWrapper { + /// Verifies the `SignedBeaconBlock` by first transforming it into a `SignatureVerifiedBlock` + /// and then using that implementation of `IntoExecutionPendingBlock` to complete verification. fn into_execution_pending_block_slashable( self, block_root: Hash256, chain: &Arc>, notify_execution_layer: NotifyExecutionLayer, - ) -> Result< - ExecutionPendingBlock, - BlockSlashInfo::EthSpec>>, - > { - match self { - BlockWrapper::AvailabilityPending(block) => block - .into_execution_pending_block_slashable(block_root, chain, notify_execution_layer), - BlockWrapper::Available(available_block) => { - let (block, blobs) = available_block.deconstruct(); - let mut execution_pending_block = block.into_execution_pending_block_slashable( - block_root, - chain, - notify_execution_layer, - )?; - let block = execution_pending_block.block.block_cloned(); + ) -> Result, BlockSlashInfo>> { + // Perform an early check to prevent wasting time on irrelevant blocks. + let block_root = check_block_relevancy(self.as_block(), block_root, chain) + .map_err(|e| BlockSlashInfo::SignatureNotChecked(self.signed_block_header(), e))?; - let blobs: Vec>> = match blobs { - Some(blob_list) => blob_list.into(), - None => vec![], - }; - let available_execution_pending_block = - AvailableBlock::new(block, blobs, |epoch| chain.block_needs_da_check(epoch)) - .map_err(|e| { - BlockSlashInfo::SignatureValid( - execution_pending_block.block.signed_block_header(), - BlockError::AvailabilityCheck( - AvailabilityCheckError::InvalidBlockOrBlob(e), - ), - ) - })?; - execution_pending_block.block = available_execution_pending_block.into(); - Ok(execution_pending_block) - } - } + SignatureVerifiedBlock::check_slashable(self, block_root, chain)? + .into_execution_pending_block_slashable(block_root, chain, notify_execution_layer) } - fn block(&self) -> &SignedBeaconBlock<::EthSpec> { + fn block(&self) -> &SignedBeaconBlock { self.as_block() } } @@ -1214,7 +1264,7 @@ impl ExecutionPendingBlock { /// /// Returns an error if the block is invalid, or if the block was unable to be verified. pub fn from_signature_verified_components( - block: BlockWrapper, + block: MaybeAvailableBlock, block_root: Hash256, parent: PreProcessingSnapshot, mut consensus_context: ConsensusContext, @@ -1244,7 +1294,7 @@ impl ExecutionPendingBlock { // because it will revert finalization. Note that the finalized block is stored in fork // choice, so we will not reject any child of the finalized block (this is relevant during // genesis). - return Err(BlockError::ParentUnknown(block)); + return Err(BlockError::ParentUnknown(block.into_block_wrapper())); } // Reject any block that exceeds our limit on skipped slots. @@ -1595,12 +1645,14 @@ impl ExecutionPendingBlock { Ok(Self { block, - block_root, - state, - parent_block: parent.beacon_block, - parent_eth1_finalization_data, - confirmed_state_roots, - consensus_context, + import_data: BlockImportData { + block_root, + state, + parent_block: parent.beacon_block, + parent_eth1_finalization_data, + confirmed_state_roots, + consensus_context, + }, payload_verification_handle, }) } @@ -1679,11 +1731,14 @@ fn check_block_against_finalized_slot( /// ## Warning /// /// Taking a lock on the `chain.canonical_head.fork_choice` might cause a deadlock here. -pub fn check_block_is_finalized_checkpoint_or_descendant( +pub fn check_block_is_finalized_checkpoint_or_descendant< + T: BeaconChainTypes, + B: AsBlock, +>( chain: &BeaconChain, fork_choice: &BeaconForkChoice, - block: BlockWrapper, -) -> Result, BlockError> { + block: B, +) -> Result> { if fork_choice.is_finalized_checkpoint_or_descendant(block.parent_root()) { Ok(block) } else { @@ -1704,7 +1759,7 @@ pub fn check_block_is_finalized_checkpoint_or_descendant( block_parent_root: block.parent_root(), }) } else { - Err(BlockError::ParentUnknown(block)) + Err(BlockError::ParentUnknown(block.into_block_wrapper())) } } } @@ -1776,8 +1831,8 @@ pub fn get_block_root(block: &SignedBeaconBlock) -> Hash256 { #[allow(clippy::type_complexity)] fn verify_parent_block_is_known( chain: &BeaconChain, - block: BlockWrapper, -) -> Result<(ProtoBlock, BlockWrapper), BlockError> { + block: MaybeAvailableBlock, +) -> Result<(ProtoBlock, MaybeAvailableBlock), BlockError> { if let Some(proto_block) = chain .canonical_head .fork_choice_read_lock() @@ -1785,7 +1840,7 @@ fn verify_parent_block_is_known( { Ok((proto_block, block)) } else { - Err(BlockError::ParentUnknown(block)) + Err(BlockError::ParentUnknown(block.into_block_wrapper())) } } @@ -1794,11 +1849,11 @@ fn verify_parent_block_is_known( /// Returns `Err(BlockError::ParentUnknown)` if the parent is not found, or if an error occurs /// whilst attempting the operation. #[allow(clippy::type_complexity)] -fn load_parent( +fn load_parent>( block_root: Hash256, - block: BlockWrapper, + block: B, chain: &BeaconChain, -) -> Result<(PreProcessingSnapshot, BlockWrapper), BlockError> { +) -> Result<(PreProcessingSnapshot, B), BlockError> { let spec = &chain.spec; // Reject any block if its parent is not known to fork choice. @@ -1816,7 +1871,7 @@ fn load_parent( .fork_choice_read_lock() .contains_block(&block.parent_root()) { - return Err(BlockError::ParentUnknown(block)); + return Err(BlockError::ParentUnknown(block.into_block_wrapper())); } let block_delay = chain diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 9c97983f2bd..0ed52ea6edc 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -1,10 +1,10 @@ use crate::beacon_chain::{CanonicalHead, BEACON_CHAIN_DB_KEY, ETH1_CACHE_DB_KEY, OP_POOL_DB_KEY}; use crate::blob_cache::BlobCache; +use crate::data_availability_checker::DataAvailabilityChecker; use crate::eth1_chain::{CachingEth1Backend, SszEth1}; use crate::eth1_finalization_cache::Eth1FinalizationCache; use crate::fork_choice_signal::ForkChoiceSignalTx; use crate::fork_revert::{reset_fork_choice_to_finalization, revert_to_fork_boundary}; -use crate::gossip_blob_cache::DataAvailabilityChecker; use crate::head_tracker::HeadTracker; use crate::migrate::{BackgroundMigrator, MigratorConfig}; use crate::persisted_beacon_chain::PersistedBeaconChain; @@ -786,14 +786,14 @@ where let canonical_head = CanonicalHead::new(fork_choice, Arc::new(head_snapshot)); let beacon_chain = BeaconChain { - spec: self.spec, + spec: self.spec.clone(), config: self.chain_config, store, task_executor: self .task_executor .ok_or("Cannot build without task executor")?, store_migrator, - slot_clock, + slot_clock: slot_clock.clone(), op_pool: self.op_pool.ok_or("Cannot build without op pool")?, // TODO: allow for persisting and loading the pool from disk. naive_aggregation_pool: <_>::default(), @@ -853,7 +853,11 @@ where slasher: self.slasher.clone(), validator_monitor: RwLock::new(validator_monitor), //TODO(sean) should we move kzg solely to the da checker? - data_availability_checker: DataAvailabilityChecker::new(kzg.clone()), + data_availability_checker: DataAvailabilityChecker::new( + slot_clock, + kzg.clone(), + self.spec, + ), proposal_blob_cache: BlobCache::default(), kzg, }; diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs new file mode 100644 index 00000000000..42e97a974e4 --- /dev/null +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -0,0 +1,516 @@ +use crate::blob_verification::{ + verify_kzg_for_blob, verify_kzg_for_blob_list, AsBlock, BlockWrapper, GossipVerifiedBlob, + KzgVerifiedBlob, KzgVerifiedBlobList, MaybeAvailableBlock, +}; +use crate::block_verification::{AvailabilityPendingExecutedBlock, AvailableExecutedBlock}; + +use kzg::Error as KzgError; +use kzg::Kzg; +use parking_lot::{Mutex, RwLock}; +use slot_clock::SlotClock; +use ssz_types::{Error, VariableList}; +use state_processing::per_block_processing::eip4844::eip4844::verify_kzg_commitments_against_transactions; +use std::collections::hash_map::Entry; +use std::collections::HashMap; +use std::sync::Arc; +use types::beacon_block_body::KzgCommitments; +use types::blob_sidecar::{BlobIdentifier, BlobSidecar}; +use types::consts::eip4844::MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS; +use types::{ + BeaconBlockRef, BlobSidecarList, ChainSpec, Epoch, EthSpec, ExecPayload, FullPayload, Hash256, + SignedBeaconBlock, SignedBeaconBlockHeader, Slot, +}; + +#[derive(Debug)] +pub enum AvailabilityCheckError { + DuplicateBlob(Hash256), + Kzg(KzgError), + KzgVerificationFailed, + KzgNotInitialized, + SszTypes(ssz_types::Error), + MissingBlobs, + NumBlobsMismatch { + num_kzg_commitments: usize, + num_blobs: usize, + }, + TxKzgCommitmentMismatch, + KzgCommitmentMismatch { + blob_index: u64, + }, + Pending, + IncorrectFork, +} + +impl From for AvailabilityCheckError { + fn from(value: Error) -> Self { + Self::SszTypes(value) + } +} + +/// This cache contains +/// - blobs that have been gossip verified +/// - commitments for blocks that have been gossip verified, but the commitments themselves +/// have not been verified against blobs +/// - blocks that have been fully verified and only require a data availability check +pub struct DataAvailabilityChecker { + rpc_blob_cache: RwLock>>>, + gossip_blob_cache: Mutex>>, + slot_clock: S, + kzg: Option>, + spec: ChainSpec, +} + +struct GossipBlobCache { + verified_blobs: Vec>, + executed_block: Option>, +} + +pub enum Availability { + PendingBlobs(Vec), + PendingBlock(Hash256), + Available(Box>), +} + +impl DataAvailabilityChecker { + pub fn new(slot_clock: S, kzg: Option>, spec: ChainSpec) -> Self { + Self { + rpc_blob_cache: <_>::default(), + gossip_blob_cache: <_>::default(), + slot_clock, + kzg, + spec, + } + } + + /// This first validate the KZG commitments included in the blob sidecar. + /// Check if we've cached other blobs for this block. If it completes a set and we also + /// have a block cached, return the Availability variant triggering block import. + /// Otherwise cache the blob sidecar. + /// + /// This should only accept gossip verified blobs, so we should not have to worry about dupes. + pub fn put_gossip_blob( + &self, + verified_blob: GossipVerifiedBlob, + ) -> Result, AvailabilityCheckError> { + let block_root = verified_blob.block_root(); + + let kzg_verified_blob = if let Some(kzg) = self.kzg.as_ref() { + verify_kzg_for_blob(verified_blob, kzg)? + } else { + return Err(AvailabilityCheckError::KzgNotInitialized); + }; + + //TODO(sean) can we just use a referece to the blob here? + let blob = kzg_verified_blob.clone_blob(); + + // check if we have a block + // check if the complete set matches the block + // verify, otherwise cache + + let mut blob_cache = self.gossip_blob_cache.lock(); + + // Gossip cache. + let availability = match blob_cache.entry(blob.block_root) { + Entry::Occupied(mut occupied_entry) => { + // All blobs reaching this cache should be gossip verified and gossip verification + // should filter duplicates, as well as validate indices. + let cache = occupied_entry.get_mut(); + + cache + .verified_blobs + .insert(blob.index as usize, kzg_verified_blob); + + if let Some(executed_block) = cache.executed_block.take() { + self.check_block_availability_or_cache(cache, executed_block)? + } else { + Availability::PendingBlock(block_root) + } + } + Entry::Vacant(vacant_entry) => { + let block_root = kzg_verified_blob.block_root(); + vacant_entry.insert(GossipBlobCache { + verified_blobs: vec![kzg_verified_blob], + executed_block: None, + }); + Availability::PendingBlock(block_root) + } + }; + + drop(blob_cache); + + // RPC cache. + self.rpc_blob_cache.write().insert(blob.id(), blob.clone()); + + Ok(availability) + } + + /// Check if we have all the blobs for a block. If we do, return the Availability variant that + /// triggers import of the block. + pub fn put_pending_executed_block( + &self, + executed_block: AvailabilityPendingExecutedBlock, + ) -> Result, AvailabilityCheckError> { + let mut guard = self.gossip_blob_cache.lock(); + let entry = guard.entry(executed_block.import_data.block_root); + + let availability = match entry { + Entry::Occupied(mut occupied_entry) => { + let cache: &mut GossipBlobCache = occupied_entry.get_mut(); + + self.check_block_availability_or_cache(cache, executed_block)? + } + Entry::Vacant(vacant_entry) => { + let kzg_commitments_len = executed_block.block.kzg_commitments()?.len(); + let mut blob_ids = Vec::with_capacity(kzg_commitments_len); + for i in 0..kzg_commitments_len { + blob_ids.push(BlobIdentifier { + block_root: executed_block.import_data.block_root, + index: i as u64, + }); + } + + vacant_entry.insert(GossipBlobCache { + verified_blobs: vec![], + executed_block: Some(executed_block), + }); + + Availability::PendingBlobs(blob_ids) + } + }; + + Ok(availability) + } + + fn check_block_availability_or_cache( + &self, + cache: &mut GossipBlobCache, + executed_block: AvailabilityPendingExecutedBlock, + ) -> Result, AvailabilityCheckError> { + let AvailabilityPendingExecutedBlock { + block, + import_data, + payload_verification_outcome, + } = executed_block; + let kzg_commitments_len = block.kzg_commitments()?.len(); + let verified_commitments_len = cache.verified_blobs.len(); + if kzg_commitments_len == verified_commitments_len { + //TODO(sean) can we remove this clone + let blobs = cache.verified_blobs.clone(); + let available_block = self.make_available(block, blobs)?; + Ok(Availability::Available(Box::new( + AvailableExecutedBlock::new( + available_block, + import_data, + payload_verification_outcome, + ), + ))) + } else { + let mut missing_blobs = Vec::with_capacity(kzg_commitments_len); + for i in 0..kzg_commitments_len { + if cache.verified_blobs.get(i).is_none() { + missing_blobs.push(BlobIdentifier { + block_root: import_data.block_root, + index: i as u64, + }) + } + } + + let _ = cache + .executed_block + .insert(AvailabilityPendingExecutedBlock::new( + block, + import_data, + payload_verification_outcome, + )); + + Ok(Availability::PendingBlobs(missing_blobs)) + } + } + + /// Checks if a block is available, returns a MaybeAvailableBlock enum that may include the fully + /// available block. + pub fn check_availability( + &self, + block: BlockWrapper, + ) -> Result, AvailabilityCheckError> { + match block { + BlockWrapper::Block(block) => self.check_availability_without_blobs(block), + BlockWrapper::BlockAndBlobs(block, blob_list) => { + let kzg = self + .kzg + .as_ref() + .ok_or(AvailabilityCheckError::KzgNotInitialized)?; + let verified_blobs = verify_kzg_for_blob_list(blob_list, kzg)?; + + Ok(MaybeAvailableBlock::Available( + self.check_availability_with_blobs(block, verified_blobs)?, + )) + } + } + } + + /// Checks if a block is available, returning an error if the block is not immediately available. + /// Does not access the gossip cache. + pub fn try_check_availability( + &self, + block: BlockWrapper, + ) -> Result, AvailabilityCheckError> { + match block { + BlockWrapper::Block(block) => { + let blob_requirements = self.get_blob_requirements(&block)?; + let blobs = match blob_requirements { + BlobRequirements::EmptyBlobs => VerifiedBlobs::EmptyBlobs, + BlobRequirements::NotRequired => VerifiedBlobs::NotRequired, + BlobRequirements::PreEip4844 => VerifiedBlobs::PreEip4844, + BlobRequirements::Required => return Err(AvailabilityCheckError::MissingBlobs), + }; + Ok(AvailableBlock { block, blobs }) + } + BlockWrapper::BlockAndBlobs(_, _) => Err(AvailabilityCheckError::Pending), + } + } + + /// Verifies a block against a set of KZG verified blobs. Returns an AvailableBlock if block's + /// commitments are consistent with the provided verified blob commitments. + pub fn check_availability_with_blobs( + &self, + block: Arc>, + blobs: KzgVerifiedBlobList, + ) -> Result, AvailabilityCheckError> { + match self.check_availability_without_blobs(block)? { + MaybeAvailableBlock::Available(block) => Ok(block), + MaybeAvailableBlock::AvailabilityPending(pending_block) => { + self.make_available(pending_block, blobs) + } + } + } + + /// Verifies a block as much as possible, returning a MaybeAvailableBlock enum that may include + /// an AvailableBlock if no blobs are required. Otherwise this will return an AvailabilityPendingBlock. + pub fn check_availability_without_blobs( + &self, + block: Arc>, + ) -> Result, AvailabilityCheckError> { + let blob_requirements = self.get_blob_requirements(&block)?; + let blobs = match blob_requirements { + BlobRequirements::EmptyBlobs => VerifiedBlobs::EmptyBlobs, + BlobRequirements::NotRequired => VerifiedBlobs::NotRequired, + BlobRequirements::PreEip4844 => VerifiedBlobs::PreEip4844, + BlobRequirements::Required => { + return Ok(MaybeAvailableBlock::AvailabilityPending( + AvailabilityPendingBlock { block }, + )) + } + }; + Ok(MaybeAvailableBlock::Available(AvailableBlock { + block, + blobs, + })) + } + + /// Verifies an AvailabilityPendingBlock against a set of KZG verified blobs. + /// This does not check whether a block *should* have blobs, these checks should must have been + /// completed when producing the AvailabilityPendingBlock. + pub fn make_available( + &self, + block: AvailabilityPendingBlock, + blobs: KzgVerifiedBlobList, + ) -> Result, AvailabilityCheckError> { + let block_kzg_commitments = block.kzg_commitments()?; + if blobs.len() != block_kzg_commitments.len() { + return Err(AvailabilityCheckError::NumBlobsMismatch { + num_kzg_commitments: block_kzg_commitments.len(), + num_blobs: blobs.len(), + }); + } + + for (block_commitment, blob) in block_kzg_commitments.iter().zip(blobs.iter()) { + if *block_commitment != blob.kzg_commitment() { + return Err(AvailabilityCheckError::KzgCommitmentMismatch { + blob_index: blob.as_blob().index, + }); + } + } + + let blobs = VariableList::new(blobs.into_iter().map(|blob| blob.to_blob()).collect())?; + + Ok(AvailableBlock { + block: block.block, + blobs: VerifiedBlobs::Available(blobs), + }) + } + + /// Determines the blob requirements for a block. Answers the question: "Does this block require + /// blobs?". + fn get_blob_requirements( + &self, + block: &Arc>>, + ) -> Result { + let verified_blobs = if let (Ok(block_kzg_commitments), Ok(payload)) = ( + block.message().body().blob_kzg_commitments(), + block.message().body().execution_payload(), + ) { + if let Some(transactions) = payload.transactions() { + let verified = verify_kzg_commitments_against_transactions::( + transactions, + block_kzg_commitments, + ) + .map_err(|_| AvailabilityCheckError::TxKzgCommitmentMismatch)?; + if !verified { + return Err(AvailabilityCheckError::TxKzgCommitmentMismatch); + } + } + + if self.da_check_required(block.epoch()) { + if block_kzg_commitments.is_empty() { + BlobRequirements::EmptyBlobs + } else { + BlobRequirements::Required + } + } else { + BlobRequirements::NotRequired + } + } else { + BlobRequirements::PreEip4844 + }; + Ok(verified_blobs) + } + + /// The epoch at which we require a data availability check in block processing. + /// `None` if the `Eip4844` fork is disabled. + pub fn data_availability_boundary(&self) -> Option { + self.spec.eip4844_fork_epoch.and_then(|fork_epoch| { + self.slot_clock + .now() + .map(|slot| slot.epoch(T::slots_per_epoch())) + .map(|current_epoch| { + std::cmp::max( + fork_epoch, + current_epoch.saturating_sub(*MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS), + ) + }) + }) + } + + /// Returns true if the given epoch lies within the da boundary and false otherwise. + pub fn da_check_required(&self, block_epoch: Epoch) -> bool { + self.data_availability_boundary() + .map_or(false, |da_epoch| block_epoch >= da_epoch) + } +} + +pub enum BlobRequirements { + Required, + /// This block is from outside the data availability boundary so doesn't require + /// a data availability check. + NotRequired, + /// The block's `kzg_commitments` field is empty so it does not contain any blobs. + EmptyBlobs, + /// This is a block prior to the 4844 fork, so doesn't require any blobs + PreEip4844, +} + +#[derive(Clone, Debug, PartialEq)] +pub struct AvailabilityPendingBlock { + block: Arc>, +} + +impl AvailabilityPendingBlock { + pub fn to_block(self) -> Arc> { + self.block + } + pub fn as_block(&self) -> &SignedBeaconBlock { + &self.block + } + pub fn block_cloned(&self) -> Arc> { + self.block.clone() + } + pub fn kzg_commitments(&self) -> Result<&KzgCommitments, AvailabilityCheckError> { + self.block + .message() + .body() + .blob_kzg_commitments() + .map_err(|_| AvailabilityCheckError::IncorrectFork) + } +} + +#[derive(Clone, Debug, PartialEq)] +pub struct AvailableBlock { + block: Arc>, + blobs: VerifiedBlobs, +} + +impl AvailableBlock { + pub fn block(&self) -> &SignedBeaconBlock { + &self.block + } + + pub fn deconstruct(self) -> (Arc>, Option>) { + match self.blobs { + VerifiedBlobs::EmptyBlobs | VerifiedBlobs::NotRequired | VerifiedBlobs::PreEip4844 => { + (self.block, None) + } + VerifiedBlobs::Available(blobs) => (self.block, Some(blobs)), + } + } +} + +#[derive(Clone, Debug, PartialEq)] +pub enum VerifiedBlobs { + /// These blobs are available. + Available(BlobSidecarList), + /// This block is from outside the data availability boundary so doesn't require + /// a data availability check. + NotRequired, + /// The block's `kzg_commitments` field is empty so it does not contain any blobs. + EmptyBlobs, + /// This is a block prior to the 4844 fork, so doesn't require any blobs + PreEip4844, +} + +impl AsBlock for AvailableBlock { + fn slot(&self) -> Slot { + self.block.slot() + } + + fn epoch(&self) -> Epoch { + self.block.epoch() + } + + fn parent_root(&self) -> Hash256 { + self.block.parent_root() + } + + fn state_root(&self) -> Hash256 { + self.block.state_root() + } + + fn signed_block_header(&self) -> SignedBeaconBlockHeader { + self.block.signed_block_header() + } + + fn message(&self) -> BeaconBlockRef { + self.block.message() + } + + fn as_block(&self) -> &SignedBeaconBlock { + &self.block + } + + fn block_cloned(&self) -> Arc> { + self.block.clone() + } + + fn canonical_root(&self) -> Hash256 { + self.block.canonical_root() + } + + fn into_block_wrapper(self) -> BlockWrapper { + let (block, blobs_opt) = self.deconstruct(); + if let Some(blobs) = blobs_opt { + BlockWrapper::BlockAndBlobs(block, blobs) + } else { + BlockWrapper::Block(block) + } + } +} diff --git a/beacon_node/beacon_chain/src/early_attester_cache.rs b/beacon_node/beacon_chain/src/early_attester_cache.rs index 7f32595dc34..082c2242c84 100644 --- a/beacon_node/beacon_chain/src/early_attester_cache.rs +++ b/beacon_node/beacon_chain/src/early_attester_cache.rs @@ -1,4 +1,4 @@ -use crate::blob_verification::AvailableBlock; +use crate::data_availability_checker::AvailableBlock; use crate::{ attester_cache::{CommitteeLengths, Error}, metrics, diff --git a/beacon_node/beacon_chain/src/gossip_blob_cache.rs b/beacon_node/beacon_chain/src/gossip_blob_cache.rs deleted file mode 100644 index 673fd6c9d4c..00000000000 --- a/beacon_node/beacon_chain/src/gossip_blob_cache.rs +++ /dev/null @@ -1,229 +0,0 @@ -use crate::blob_verification::{AsBlock, AvailableBlock, BlockWrapper}; -use crate::block_verification::ExecutedBlock; -use crate::kzg_utils::validate_blob; -use kzg::Error as KzgError; -use kzg::Kzg; -use parking_lot::{Mutex, RwLock}; -use ssz_types::Error; -use std::collections::hash_map::Entry; -use std::collections::HashMap; -use std::sync::Arc; -use types::blob_sidecar::{BlobIdentifier, BlobSidecar}; -use types::{Epoch, EthSpec, Hash256}; - -#[derive(Debug)] -pub enum AvailabilityCheckError { - DuplicateBlob(Hash256), - Kzg(KzgError), - SszTypes(ssz_types::Error), - InvalidBlockOrBlob(String), -} - -impl From for AvailabilityCheckError { - fn from(value: Error) -> Self { - Self::SszTypes(value) - } -} - -/// This cache contains -/// - blobs that have been gossip verified -/// - commitments for blocks that have been gossip verified, but the commitments themselves -/// have not been verified against blobs -/// - blocks that have been fully verified and only require a data availability check -pub struct DataAvailabilityChecker { - rpc_blob_cache: RwLock>>>, - gossip_blob_cache: Mutex>>, - kzg: Option>, -} - -pub enum Availability { - PendingBlobs(Vec), - PendingBlock(Hash256), - Available(Box>), -} - -struct GossipBlobCache { - verified_blobs: Vec>>, - executed_block: Option>, -} - -impl DataAvailabilityChecker { - pub fn new(kzg: Option>) -> Self { - Self { - rpc_blob_cache: <_>::default(), - gossip_blob_cache: <_>::default(), - kzg, - } - } - - /// When we receive a blob check if we've cached it. If it completes a set and we have the - /// corresponding commitments, verify the commitments. If it completes a set and we have a block - /// cached, verify the block and import it. - /// - /// This should only accept gossip verified blobs, so we should not have to worry about dupes. - // return an enum here that may include the full block - pub fn put_blob( - &self, - blob: Arc>, - ) -> Result, AvailabilityCheckError> { - let verified = if let Some(kzg) = self.kzg.as_ref() { - validate_blob::(kzg, blob.blob.clone(), blob.kzg_commitment, blob.kzg_proof) - .map_err(AvailabilityCheckError::Kzg)? - } else { - false - // error wrong fork - }; - - // TODO(remove clones) - - if verified { - let mut blob_cache = self.gossip_blob_cache.lock(); - - // Gossip cache. - blob_cache - .entry(blob.block_root) - .and_modify(|inner| { - // All blobs reaching this cache should be gossip verified and gossip verification - // should filter duplicates, as well as validate indices. - inner - .verified_blobs - .insert(blob.index as usize, blob.clone()); - - if let Some(executed_block) = inner.executed_block.take() { - let verified_commitments: Vec<_> = inner - .verified_blobs - .iter() - .map(|blob| blob.kzg_commitment) - .collect(); - if verified_commitments - == executed_block - .block - .as_block() - .message_eip4844() - .unwrap() //TODO(sean) errors - .body - .blob_kzg_commitments - .clone() - .to_vec() - { - // send to reprocessing queue ? - //TODO(sean) try_send? - //TODO(sean) errors - } else { - let _ = inner.executed_block.insert(executed_block); - } - } - }) - .or_insert(GossipBlobCache { - verified_blobs: vec![blob.clone()], - executed_block: None, - }); - - drop(blob_cache); - - // RPC cache. - self.rpc_blob_cache.write().insert(blob.id(), blob.clone()); - } - - Ok(Availability::PendingBlobs(vec![])) - } - - // return an enum here that may include the full block - pub fn check_block_availability( - &self, - executed_block: ExecutedBlock, - da_check_fn: impl FnOnce(Epoch) -> bool, - ) -> Result, AvailabilityCheckError> { - let block_clone = executed_block.block.clone(); - - let availability = match block_clone { - BlockWrapper::Available(_) => Availability::Available(Box::new(executed_block)), - BlockWrapper::AvailabilityPending(block) => { - if let Ok(kzg_commitments) = block.message().body().blob_kzg_commitments() { - // first check if the blockwrapper contains blobs, if so, use those - - let mut guard = self.gossip_blob_cache.lock(); - let entry = guard.entry(executed_block.block_root); - - match entry { - Entry::Occupied(mut occupied_entry) => { - let cache: &mut GossipBlobCache = occupied_entry.get_mut(); - - let verified_commitments: Vec<_> = cache - .verified_blobs - .iter() - .map(|blob| blob.kzg_commitment) - .collect(); - if verified_commitments == kzg_commitments.clone().to_vec() { - let removed: GossipBlobCache = occupied_entry.remove(); - - let ExecutedBlock { - block: _, - block_root, - state, - parent_block, - parent_eth1_finalization_data, - confirmed_state_roots, - consensus_context, - payload_verification_outcome, - } = executed_block; - - let available_block = - AvailableBlock::new(block, removed.verified_blobs, da_check_fn) - .map_err(AvailabilityCheckError::InvalidBlockOrBlob)? - .into(); - - let available_executed = ExecutedBlock { - block: available_block, - block_root, - state, - parent_block, - parent_eth1_finalization_data, - confirmed_state_roots, - consensus_context, - payload_verification_outcome, - }; - Availability::Available(Box::new(available_executed)) - } else { - let mut missing_blobs = Vec::with_capacity(kzg_commitments.len()); - for i in 0..kzg_commitments.len() { - if cache.verified_blobs.get(i).is_none() { - missing_blobs.push(BlobIdentifier { - block_root: executed_block.block_root, - index: i as u64, - }) - } - } - - //TODO(sean) add a check that missing blobs > 0 - - let _ = cache.executed_block.insert(executed_block); - // log that we cached the block? - Availability::PendingBlobs(missing_blobs) - } - } - Entry::Vacant(vacant_entry) => { - let mut blob_ids = Vec::with_capacity(kzg_commitments.len()); - for i in 0..kzg_commitments.len() { - blob_ids.push(BlobIdentifier { - block_root: executed_block.block_root, - index: i as u64, - }); - } - - vacant_entry.insert(GossipBlobCache { - verified_blobs: vec![], - executed_block: Some(executed_block), - }); - - Availability::PendingBlobs(blob_ids) - } - } - } else { - Availability::Available(Box::new(executed_block)) - } - } - }; - Ok(availability) - } -} diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 1a0247f99a6..a790dd3f672 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -15,6 +15,7 @@ pub mod builder; pub mod canonical_head; pub mod capella_readiness; pub mod chain_config; +pub mod data_availability_checker; mod early_attester_cache; mod errors; pub mod eth1_chain; @@ -23,7 +24,6 @@ pub mod events; pub mod execution_payload; pub mod fork_choice_signal; pub mod fork_revert; -pub mod gossip_blob_cache; mod head_tracker; pub mod historical_blocks; pub mod kzg_utils; diff --git a/beacon_node/beacon_chain/tests/attestation_production.rs b/beacon_node/beacon_chain/tests/attestation_production.rs index 9cfff81c388..feea5c7218b 100644 --- a/beacon_node/beacon_chain/tests/attestation_production.rs +++ b/beacon_node/beacon_chain/tests/attestation_production.rs @@ -1,9 +1,7 @@ #![cfg(not(debug_assertions))] -use beacon_chain::{ - blob_verification::{BlockWrapper, IntoAvailableBlock}, - test_utils::{AttestationStrategy, BeaconChainHarness, BlockStrategy}, -}; +use beacon_chain::blob_verification::BlockWrapper; +use beacon_chain::test_utils::{AttestationStrategy, BeaconChainHarness, BlockStrategy}; use beacon_chain::{StateSkipConfig, WhenSlotSkipped}; use lazy_static::lazy_static; use std::sync::Arc; @@ -135,6 +133,10 @@ async fn produces_attestations() { assert_eq!(data.target.root, target_root, "bad target root"); let block_wrapper: BlockWrapper = Arc::new(block.clone()).into(); + let available_block = chain + .data_availability_checker + .try_check_availability(block_wrapper) + .unwrap(); let early_attestation = { let proto_block = chain @@ -146,9 +148,7 @@ async fn produces_attestations() { .early_attester_cache .add_head_block( block_root, - block_wrapper - .into_available_block(block_root, chain) - .expect("should wrap into available block"), + available_block, proto_block, &state, &chain.spec, @@ -199,18 +199,19 @@ async fn early_attester_cache_old_request() { .get_block(&head.beacon_block_root) .unwrap(); - let block: BlockWrapper = head.beacon_block.clone().into(); + let block_wrapper: BlockWrapper = head.beacon_block.clone().into(); + let available_block = harness + .chain + .data_availability_checker + .try_check_availability(block_wrapper) + .unwrap(); - let chain = &harness.chain; harness .chain .early_attester_cache .add_head_block( head.beacon_block_root, - block - .clone() - .into_available_block(head.beacon_block_root, &chain) - .expect("should wrap into available block"), + available_block, head_proto_block, &head.beacon_state, &harness.chain.spec, diff --git a/beacon_node/beacon_chain/tests/block_verification.rs b/beacon_node/beacon_chain/tests/block_verification.rs index 8de906afd97..88364d6ff5f 100644 --- a/beacon_node/beacon_chain/tests/block_verification.rs +++ b/beacon_node/beacon_chain/tests/block_verification.rs @@ -1,7 +1,8 @@ #![cfg(not(debug_assertions))] +use beacon_chain::blob_verification::BlockWrapper; use beacon_chain::{ - blob_verification::{AsBlock, BlockWrapper}, + blob_verification::AsBlock, test_utils::{AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType}, }; use beacon_chain::{BeaconSnapshot, BlockError, ChainSegmentResult, NotifyExecutionLayer}; diff --git a/beacon_node/beacon_chain/tests/payload_invalidation.rs b/beacon_node/beacon_chain/tests/payload_invalidation.rs index d0f81652bbd..dbb2ebfaea5 100644 --- a/beacon_node/beacon_chain/tests/payload_invalidation.rs +++ b/beacon_node/beacon_chain/tests/payload_invalidation.rs @@ -702,6 +702,8 @@ async fn invalidates_all_descendants() { NotifyExecutionLayer::Yes, ) .await + .unwrap() + .try_into() .unwrap(); rig.recompute_head().await; @@ -799,6 +801,8 @@ async fn switches_heads() { NotifyExecutionLayer::Yes, ) .await + .unwrap() + .try_into() .unwrap(); rig.recompute_head().await; diff --git a/beacon_node/beacon_chain/tests/tests.rs b/beacon_node/beacon_chain/tests/tests.rs index 384fcbe5db6..a2308061d66 100644 --- a/beacon_node/beacon_chain/tests/tests.rs +++ b/beacon_node/beacon_chain/tests/tests.rs @@ -681,19 +681,20 @@ async fn run_skip_slot_test(skip_slots: u64) { Slot::new(0) ); - assert_eq!( - harness_b - .chain - .process_block( - harness_a.chain.head_snapshot().beacon_block_root, - harness_a.chain.head_snapshot().beacon_block.clone(), - CountUnrealized::True, - NotifyExecutionLayer::Yes, - ) - .await - .unwrap(), - harness_a.chain.head_snapshot().beacon_block_root - ); + let status = harness_b + .chain + .process_block( + harness_a.chain.head_snapshot().beacon_block_root, + harness_a.chain.head_snapshot().beacon_block.clone(), + CountUnrealized::True, + NotifyExecutionLayer::Yes, + ) + .await + .unwrap(); + + let root: Hash256 = status.try_into().unwrap(); + + assert_eq!(root, harness_a.chain.head_snapshot().beacon_block_root); harness_b.chain.recompute_head_at_current_slot().await; diff --git a/beacon_node/http_api/src/build_block_contents.rs b/beacon_node/http_api/src/build_block_contents.rs index 6512197ef66..d40fef1d908 100644 --- a/beacon_node/http_api/src/build_block_contents.rs +++ b/beacon_node/http_api/src/build_block_contents.rs @@ -1,7 +1,7 @@ use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProductionError}; -use eth2::types::BlockContents; +use eth2::types::{BeaconBlockAndBlobSidecars, BlockContents}; use std::sync::Arc; -use types::{AbstractExecPayload, BeaconBlock, BeaconBlockAndBlobSidecars, ForkName}; +use types::{AbstractExecPayload, BeaconBlock, ForkName}; type Error = warp::reject::Rejection; diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index a302194241b..c470686ad7f 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -1,10 +1,10 @@ use crate::metrics; -use beacon_chain::blob_verification::AsBlock; -use beacon_chain::blob_verification::BlockWrapper; + +use beacon_chain::blob_verification::{AsBlock, BlockWrapper}; use beacon_chain::validator_monitor::{get_block_delay_ms, timestamp_now}; use beacon_chain::{AvailabilityProcessingStatus, NotifyExecutionLayer}; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, CountUnrealized}; -use eth2::types::SignedBlockContents; +use eth2::types::{SignedBlockContents, VariableList}; use execution_layer::ProvenancedPayload; use lighthouse_network::PubsubMessage; use network::NetworkMessage; @@ -70,32 +70,28 @@ pub async fn publish_block( } SignedBeaconBlock::Eip4844(_) => { crate::publish_pubsub_message(network_tx, PubsubMessage::BeaconBlock(block.clone()))?; - if let Some(blobs) = maybe_blobs { - for (blob_index, blob) in blobs.into_iter().enumerate() { + if let Some(signed_blobs) = maybe_blobs { + for (blob_index, blob) in signed_blobs.clone().into_iter().enumerate() { crate::publish_pubsub_message( network_tx, PubsubMessage::BlobSidecar(Box::new((blob_index as u64, blob))), )?; } + let blobs_vec = signed_blobs.into_iter().map(|blob| blob.message).collect(); + let blobs = VariableList::new(blobs_vec).map_err(|e| { + warp_utils::reject::custom_server_error(format!("Invalid blobs length: {e:?}")) + })?; + BlockWrapper::BlockAndBlobs(block, blobs) + } else { + block.into() } - block.into() } }; // Determine the delay after the start of the slot, register it with metrics. - let available_block = match wrapped_block.clone().into_available_block() { - Some(available_block) => available_block, - None => { - error!( - log, - "Invalid block provided to HTTP API unavailable block"; //TODO(sean) probably want a real error here - ); - return Err(warp_utils::reject::broadcast_without_import( - "unavailable block".to_string(), - )); - } - }; - + let block_clone = wrapped_block.block_cloned(); + let slot = block_clone.message().slot(); + let proposer_index = block_clone.message().proposer_index(); match chain .process_block( block_root, @@ -111,14 +107,14 @@ pub async fn publish_block( "Valid block from HTTP API"; "block_delay" => ?delay, "root" => format!("{}", root), - "proposer_index" => available_block.message().proposer_index(), - "slot" => available_block.slot(), + "proposer_index" => proposer_index, + "slot" =>slot, ); // Notify the validator monitor. chain.validator_monitor.read().register_api_block( seen_timestamp, - available_block.message(), + block_clone.message(), root, &chain.slot_clock, ); @@ -134,7 +130,7 @@ pub async fn publish_block( late_block_logging( &chain, seen_timestamp, - available_block.message(), + block_clone.message(), root, "local", &log, @@ -166,7 +162,7 @@ pub async fn publish_block( log, "Block from HTTP API already known"; "block" => ?block_root, - "slot" => available_block.slot(), + "slot" => slot, ); Ok(()) } diff --git a/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs b/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs index e344a61132b..c667c516d29 100644 --- a/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs +++ b/beacon_node/network/src/beacon_processor/work_reprocessing_queue.rs @@ -13,7 +13,8 @@ use super::MAX_SCHEDULED_WORK_QUEUE_LEN; use crate::metrics; use crate::sync::manager::BlockProcessType; -use beacon_chain::blob_verification::{AsBlock, BlockWrapper}; +use beacon_chain::blob_verification::AsBlock; +use beacon_chain::blob_verification::BlockWrapper; use beacon_chain::{BeaconChainTypes, GossipVerifiedBlock, MAXIMUM_GOSSIP_CLOCK_DISPARITY}; use fnv::FnvHashMap; use futures::task::Poll; diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index f65691295cf..69e167b4d03 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -683,7 +683,7 @@ impl Worker { // TODO match self .chain - .process_blob(verified_blob.to_blob(), CountUnrealized::True) + .process_blob(verified_blob, CountUnrealized::True) .await { Ok(AvailabilityProcessingStatus::Imported(_hash)) => { diff --git a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs index 542bd9af34a..09e98cb183c 100644 --- a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs @@ -7,7 +7,8 @@ use crate::beacon_processor::DuplicateCache; use crate::metrics; use crate::sync::manager::{BlockProcessType, SyncMessage}; use crate::sync::{BatchProcessResult, ChainId}; -use beacon_chain::blob_verification::{AsBlock, BlockWrapper}; +use beacon_chain::blob_verification::AsBlock; +use beacon_chain::blob_verification::BlockWrapper; use beacon_chain::{AvailabilityProcessingStatus, CountUnrealized}; use beacon_chain::{ BeaconChainError, BeaconChainTypes, BlockError, ChainSegmentResult, HistoricalBlockError, diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 690d56644cd..77e659a2682 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -2,7 +2,8 @@ use std::collections::hash_map::Entry; use std::collections::HashMap; use std::time::Duration; -use beacon_chain::blob_verification::{AsBlock, BlockWrapper}; +use beacon_chain::blob_verification::AsBlock; +use beacon_chain::blob_verification::BlockWrapper; use beacon_chain::{BeaconChainTypes, BlockError}; use fnv::FnvHashMap; use lighthouse_network::rpc::{RPCError, RPCResponseErrorCode}; diff --git a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs index b6de52d7059..f066191c00a 100644 --- a/beacon_node/network/src/sync/block_lookups/parent_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/parent_lookup.rs @@ -1,5 +1,6 @@ use super::RootBlockTuple; -use beacon_chain::blob_verification::{AsBlock, BlockWrapper}; +use beacon_chain::blob_verification::AsBlock; +use beacon_chain::blob_verification::BlockWrapper; use beacon_chain::BeaconChainTypes; use lighthouse_network::PeerId; use store::Hash256; diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 02482d6192d..60911dbb395 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -1,5 +1,6 @@ use super::RootBlockTuple; -use beacon_chain::blob_verification::{AsBlock, BlockWrapper}; +use beacon_chain::blob_verification::AsBlock; +use beacon_chain::blob_verification::BlockWrapper; use beacon_chain::get_block_root; use lighthouse_network::{rpc::BlocksByRootRequest, PeerId}; use rand::seq::IteratorRandom; diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index ad7cad32163..482bfab7083 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -42,7 +42,8 @@ use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent as BeaconWorkEven use crate::service::NetworkMessage; use crate::status::ToStatusMessage; use crate::sync::range_sync::ByRangeRequestType; -use beacon_chain::blob_verification::{AsBlock, BlockWrapper}; +use beacon_chain::blob_verification::AsBlock; +use beacon_chain::blob_verification::BlockWrapper; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, EngineState}; use futures::StreamExt; use lighthouse_network::rpc::methods::MAX_REQUEST_BLOCKS; @@ -743,7 +744,7 @@ impl SyncManager { self.block_lookups.single_block_lookup_response( id, peer_id, - maybe_block.map(BlockWrapper::AvailabilityPending), + maybe_block.map(BlockWrapper::Block), seen_timestamp, &mut self.network, ) @@ -758,7 +759,7 @@ impl SyncManager { BlockOrBlob::Block(maybe_block) => self.block_lookups.parent_lookup_response( id, peer_id, - maybe_block.map(BlockWrapper::AvailabilityPending), + maybe_block.map(BlockWrapper::Block), seen_timestamp, &mut self.network, ), diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index bc27ddb4743..543b3fda668 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -1396,3 +1396,32 @@ pub struct SignedBeaconBlockAndBlobSidecars, pub signed_blob_sidecars: SignedBlobSidecarList, } + +#[derive(Debug, Clone, Serialize, Deserialize, Encode)] +#[serde(bound = "T: EthSpec, Payload: AbstractExecPayload")] +pub struct BeaconBlockAndBlobSidecars> { + pub block: BeaconBlock, + pub blob_sidecars: BlobSidecarList, +} + +impl> ForkVersionDeserialize + for BeaconBlockAndBlobSidecars +{ + fn deserialize_by_fork<'de, D: serde::Deserializer<'de>>( + value: serde_json::value::Value, + fork_name: ForkName, + ) -> Result { + #[derive(Deserialize)] + #[serde(bound = "T: EthSpec")] + struct Helper { + block: serde_json::Value, + blob_sidecars: BlobSidecarList, + } + let helper: Helper = serde_json::from_value(value).map_err(serde::de::Error::custom)?; + + Ok(Self { + block: BeaconBlock::deserialize_by_fork::<'de, D>(helper.block, fork_name)?, + blob_sidecars: helper.blob_sidecars, + }) + } +} diff --git a/consensus/types/src/beacon_block_and_blob_sidecars.rs b/consensus/types/src/beacon_block_and_blob_sidecars.rs deleted file mode 100644 index 78e70419614..00000000000 --- a/consensus/types/src/beacon_block_and_blob_sidecars.rs +++ /dev/null @@ -1,37 +0,0 @@ -use crate::{ - AbstractExecPayload, BeaconBlock, BlobSidecarList, EthSpec, ForkName, ForkVersionDeserialize, -}; -use derivative::Derivative; -use serde_derive::{Deserialize, Serialize}; -use ssz_derive::Encode; -use tree_hash_derive::TreeHash; - -#[derive(Debug, Clone, Serialize, Deserialize, Encode, TreeHash, Derivative)] -#[derivative(PartialEq, Hash(bound = "T: EthSpec"))] -#[serde(bound = "T: EthSpec, Payload: AbstractExecPayload")] -pub struct BeaconBlockAndBlobSidecars> { - pub block: BeaconBlock, - pub blob_sidecars: BlobSidecarList, -} - -impl> ForkVersionDeserialize - for BeaconBlockAndBlobSidecars -{ - fn deserialize_by_fork<'de, D: serde::Deserializer<'de>>( - value: serde_json::value::Value, - fork_name: ForkName, - ) -> Result { - #[derive(Deserialize)] - #[serde(bound = "T: EthSpec")] - struct Helper { - block: serde_json::Value, - blob_sidecars: BlobSidecarList, - } - let helper: Helper = serde_json::from_value(value).map_err(serde::de::Error::custom)?; - - Ok(Self { - block: BeaconBlock::deserialize_by_fork::<'de, D>(helper.block, fork_name)?, - blob_sidecars: helper.blob_sidecars, - }) - } -} diff --git a/consensus/types/src/lib.rs b/consensus/types/src/lib.rs index c8664426057..14f3ff3560b 100644 --- a/consensus/types/src/lib.rs +++ b/consensus/types/src/lib.rs @@ -97,7 +97,6 @@ pub mod slot_data; #[cfg(feature = "sqlite")] pub mod sqlite; -pub mod beacon_block_and_blob_sidecars; pub mod blob_sidecar; pub mod signed_blob; pub mod transaction; @@ -113,7 +112,6 @@ pub use crate::beacon_block::{ BeaconBlock, BeaconBlockAltair, BeaconBlockBase, BeaconBlockCapella, BeaconBlockEip4844, BeaconBlockMerge, BeaconBlockRef, BeaconBlockRefMut, BlindedBeaconBlock, EmptyBlock, }; -pub use crate::beacon_block_and_blob_sidecars::BeaconBlockAndBlobSidecars; pub use crate::beacon_block_body::{ BeaconBlockBody, BeaconBlockBodyAltair, BeaconBlockBodyBase, BeaconBlockBodyCapella, BeaconBlockBodyEip4844, BeaconBlockBodyMerge, BeaconBlockBodyRef, BeaconBlockBodyRefMut, diff --git a/consensus/types/src/signed_beacon_block.rs b/consensus/types/src/signed_beacon_block.rs index 52800f07821..301cfd5f878 100644 --- a/consensus/types/src/signed_beacon_block.rs +++ b/consensus/types/src/signed_beacon_block.rs @@ -35,14 +35,6 @@ impl From for Hash256 { } } -#[derive(Debug)] -pub enum BlobReconstructionError { - /// No blobs for the specified block where we would expect blobs. - UnavailableBlobs, - /// Blobs provided for a pre-Eip4844 fork. - InconsistentFork, -} - /// A `BeaconBlock` and a signature from its proposer. #[superstruct( variants(Base, Altair, Merge, Capella, Eip4844),