Skip to content

Commit

Permalink
Partial processing (#4)
Browse files Browse the repository at this point in the history
* move beacon block and blob to eth2/types

* rename gossip blob cache to data availability checker

* lots of changes

* fix some compilation issues

* fix compilation issues

* fix compilation issues

* fix compilation issues

* fix compilation issues

* fix compilation issues

* cargo fmt

* use a common data structure for block import types

* fix availability check on proposal import

* refactor the blob cache and split the block wrapper into two types

* add type conversion for signed block and block wrapper

* fix beacon chain tests and do some renaming, add some comments
  • Loading branch information
realbigsean authored Mar 24, 2023
1 parent 882110f commit aa10943
Show file tree
Hide file tree
Showing 25 changed files with 1,019 additions and 782 deletions.
259 changes: 121 additions & 138 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,26 @@ use crate::attester_cache::{AttesterCache, AttesterCacheKey};
use crate::beacon_proposer_cache::compute_proposer_duties_from_head;
use crate::beacon_proposer_cache::BeaconProposerCache;
use crate::blob_cache::BlobCache;
use crate::blob_verification::{
self, AsBlock, AvailableBlock, BlobError, BlockWrapper, GossipVerifiedBlob,
};
use crate::blob_verification::{self, AsBlock, BlobError, BlockWrapper, GossipVerifiedBlob};
use crate::block_times_cache::BlockTimesCache;
use crate::block_verification::POS_PANDA_BANNER;
use crate::block_verification::{
check_block_is_finalized_checkpoint_or_descendant, check_block_relevancy, get_block_root,
signature_verify_chain_segment, BlockError, ExecutedBlock, ExecutionPendingBlock,
GossipVerifiedBlock, IntoExecutionPendingBlock, POS_PANDA_BANNER,
signature_verify_chain_segment, AvailableExecutedBlock, BlockError, BlockImportData,
ExecutedBlock, ExecutionPendingBlock, GossipVerifiedBlock, IntoExecutionPendingBlock,
};
pub use crate::canonical_head::{CanonicalHead, CanonicalHeadRwLock};
use crate::chain_config::ChainConfig;
use crate::data_availability_checker::{
Availability, AvailabilityCheckError, AvailableBlock, DataAvailabilityChecker,
};
use crate::early_attester_cache::EarlyAttesterCache;
use crate::errors::{BeaconChainError as Error, BlockProductionError};
use crate::eth1_chain::{Eth1Chain, Eth1ChainBackend};
use crate::eth1_finalization_cache::{Eth1FinalizationCache, Eth1FinalizationData};
use crate::events::ServerSentEventHandler;
use crate::execution_payload::{get_execution_payload, NotifyExecutionLayer, PreparePayloadHandle};
use crate::fork_choice_signal::{ForkChoiceSignalRx, ForkChoiceSignalTx, ForkChoiceWaitResult};
use crate::gossip_blob_cache::{Availability, AvailabilityCheckError, DataAvailabilityChecker};
use crate::head_tracker::HeadTracker;
use crate::historical_blocks::HistoricalBlockError;
use crate::kzg_utils;
Expand Down Expand Up @@ -185,7 +186,7 @@ pub enum WhenSlotSkipped {
Prev,
}

#[derive(Debug)]
#[derive(Debug, PartialEq)]
pub enum AvailabilityProcessingStatus {
PendingBlobs(Vec<BlobIdentifier>),
PendingBlock(Hash256),
Expand All @@ -204,6 +205,17 @@ impl TryInto<SignedBeaconBlockHash> for AvailabilityProcessingStatus {
}
}

impl TryInto<Hash256> for AvailabilityProcessingStatus {
type Error = ();

fn try_into(self) -> Result<Hash256, Self::Error> {
match self {
AvailabilityProcessingStatus::Imported(hash) => Ok(hash),
_ => Err(()),
}
}
}

/// The result of a chain segment processing.
pub enum ChainSegmentResult<T: EthSpec> {
/// Processing this chain segment finished successfully.
Expand Down Expand Up @@ -455,7 +467,7 @@ pub struct BeaconChain<T: BeaconChainTypes> {
/// Provides monitoring of a set of explicitly defined validators.
pub validator_monitor: RwLock<ValidatorMonitor<T::EthSpec>>,
pub proposal_blob_cache: BlobCache<T::EthSpec>,
pub data_availability_checker: DataAvailabilityChecker<T::EthSpec>,
pub data_availability_checker: DataAvailabilityChecker<T::EthSpec, T::SlotClock>,
pub kzg: Option<Arc<Kzg>>,
}

Expand Down Expand Up @@ -2657,11 +2669,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

pub async fn process_blob(
self: &Arc<Self>,
blob: Arc<BlobSidecar<T::EthSpec>>,
blob: GossipVerifiedBlob<T::EthSpec>,
count_unrealized: CountUnrealized,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
self.check_availability_and_maybe_import(
|chain| chain.data_availability_checker.put_blob(blob),
|chain| chain.data_availability_checker.put_gossip_blob(blob),
count_unrealized,
)
.await
Expand Down Expand Up @@ -2704,24 +2716,29 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
notify_execution_layer,
)?;

// TODO(log required errors)
let executed_block = self
.clone()
.into_executed_block(execution_pending)
.await
.map_err(|e| self.handle_block_error(e))?;

self.check_availability_and_maybe_import(
|chain| {
chain
.data_availability_checker
.check_block_availability(executed_block, |epoch| {
chain.block_needs_da_check(epoch)
})
},
count_unrealized,
)
.await
match executed_block {
ExecutedBlock::Available(block) => {
self.import_available_block(Box::new(block), count_unrealized)
.await
}
ExecutedBlock::AvailabilityPending(block) => {
self.check_availability_and_maybe_import(
|chain| {
chain
.data_availability_checker
.put_pending_executed_block(block)
},
count_unrealized,
)
.await
}
}
}

/// Accepts a fully-verified block and awaits on it's payload verification handle to
Expand All @@ -2734,13 +2751,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
) -> Result<ExecutedBlock<T::EthSpec>, BlockError<T::EthSpec>> {
let ExecutionPendingBlock {
block,
block_root,
state,
parent_block,
confirmed_state_roots,
import_data,
payload_verification_handle,
parent_eth1_finalization_data,
consensus_context,
} = execution_pending_block;

let payload_verification_outcome = payload_verification_handle
Expand Down Expand Up @@ -2777,16 +2789,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.into_root()
);
}
Ok(ExecutedBlock {
Ok(ExecutedBlock::new(
block,
block_root,
state,
parent_block,
confirmed_state_roots,
parent_eth1_finalization_data,
consensus_context,
import_data,
payload_verification_outcome,
})
))
}

fn handle_block_error(&self, e: BlockError<T::EthSpec>) -> BlockError<T::EthSpec> {
Expand Down Expand Up @@ -2831,68 +2838,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let availability = cache_fn(self.clone())?;
match availability {
Availability::Available(block) => {
let ExecutedBlock {
block,
block_root,
state,
parent_block,
parent_eth1_finalization_data,
confirmed_state_roots,
consensus_context,
payload_verification_outcome,
} = *block;

let available_block = match block {
BlockWrapper::Available(block) => block,
BlockWrapper::AvailabilityPending(_) => {
todo!() // logic error
}
};

let slot = available_block.slot();

// import
let chain = self.clone();
let result = self
.spawn_blocking_handle(
move || {
chain.import_block(
available_block,
block_root,
state,
confirmed_state_roots,
payload_verification_outcome.payload_verification_status,
count_unrealized,
parent_block,
parent_eth1_finalization_data,
consensus_context,
)
},
"payload_verification_handle",
)
.await
.map_err(|e| {
let b = BlockError::from(e);
self.handle_block_error(b)
})?;

match result {
// The block was successfully verified and imported. Yay.
Ok(block_root) => {
trace!(
self.log,
"Beacon block imported";
"block_root" => ?block_root,
"block_slot" => slot,
);

// Increment the Prometheus counter for block processing successes.
metrics::inc_counter(&metrics::BLOCK_PROCESSING_SUCCESSES);

Ok(AvailabilityProcessingStatus::Imported(block_root))
}
Err(e) => Err(self.handle_block_error(e)),
}
self.import_available_block(block, count_unrealized).await
}
Availability::PendingBlock(block_root) => {
Ok(AvailabilityProcessingStatus::PendingBlock(block_root))
Expand All @@ -2903,6 +2849,72 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}

async fn import_available_block(
self: &Arc<Self>,
block: Box<AvailableExecutedBlock<T::EthSpec>>,
count_unrealized: CountUnrealized,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
let AvailableExecutedBlock {
block,
import_data,
payload_verification_outcome,
} = *block;

let BlockImportData {
block_root,
state,
parent_block,
parent_eth1_finalization_data,
confirmed_state_roots,
consensus_context,
} = import_data;

let slot = block.slot();

// import
let chain = self.clone();
let result = self
.spawn_blocking_handle(
move || {
chain.import_block(
block,
block_root,
state,
confirmed_state_roots,
payload_verification_outcome.payload_verification_status,
count_unrealized,
parent_block,
parent_eth1_finalization_data,
consensus_context,
)
},
"payload_verification_handle",
)
.await
.map_err(|e| {
let b = BlockError::from(e);
self.handle_block_error(b)
})?;

match result {
// The block was successfully verified and imported. Yay.
Ok(block_root) => {
trace!(
self.log,
"Beacon block imported";
"block_root" => ?block_root,
"block_slot" => slot,
);

// Increment the Prometheus counter for block processing successes.
metrics::inc_counter(&metrics::BLOCK_PROCESSING_SUCCESSES);

Ok(AvailabilityProcessingStatus::Imported(block_root))
}
Err(e) => Err(self.handle_block_error(e)),
}
}

/// Accepts a fully-verified and available block and imports it into the chain without performing any
/// additional verification.
///
Expand Down Expand Up @@ -2965,13 +2977,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let mut fork_choice = self.canonical_head.fork_choice_write_lock();

// Do not import a block that doesn't descend from the finalized root.
let signed_block = check_block_is_finalized_checkpoint_or_descendant(
self,
&fork_choice,
BlockWrapper::from(signed_block),
)?;
// TODO(pawan): fix this atrocity
let signed_block = signed_block.into_available_block().unwrap();
let signed_block =
check_block_is_finalized_checkpoint_or_descendant(self, &fork_choice, signed_block)?;
let block = signed_block.message();

// Register the new block with the fork choice service.
Expand Down Expand Up @@ -3092,27 +3099,17 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
ops.push(StoreOp::PutBlock(block_root, signed_block.clone()));
ops.push(StoreOp::PutState(block.state_root(), &state));

// Only consider blobs if the eip4844 fork is enabled.
if let Some(data_availability_boundary) = self.data_availability_boundary() {
let block_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch());
let margin_epochs = self.store.get_config().blob_prune_margin_epochs;
let import_boundary = data_availability_boundary - margin_epochs;

// Only store blobs at the data availability boundary, minus any configured epochs
// margin, or younger (of higher epoch number).
if block_epoch >= import_boundary {
if let Some(blobs) = blobs {
if !blobs.is_empty() {
//FIXME(sean) using this for debugging for now
info!(
self.log, "Writing blobs to store";
"block_root" => ?block_root
);
ops.push(StoreOp::PutBlobs(block_root, blobs));
}
}
if let Some(blobs) = blobs {
if !blobs.is_empty() {
//FIXME(sean) using this for debugging for now
info!(
self.log, "Writing blobs to store";
"block_root" => ?block_root
);
ops.push(StoreOp::PutBlobs(block_root, blobs));
}
}

let txn_lock = self.store.hot_db.begin_rw_transaction();

if let Err(e) = self.store.do_atomically_with_block_and_blobs_cache(ops) {
Expand Down Expand Up @@ -6180,20 +6177,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.map_or(false, |da_epoch| block_epoch >= da_epoch)
}

/// The epoch that is a data availability boundary, or the latest finalized epoch.
/// `None` if the `Eip4844` fork is disabled.
pub fn finalized_data_availability_boundary(&self) -> Option<Epoch> {
self.data_availability_boundary().map(|boundary| {
std::cmp::max(
boundary,
self.canonical_head
.cached_head()
.finalized_checkpoint()
.epoch,
)
})
}

/// Returns `true` if we are at or past the `Eip4844` fork. This will always return `false` if
/// the `Eip4844` fork is disabled.
pub fn is_data_availability_check_required(&self) -> Result<bool, Error> {
Expand Down
Loading

0 comments on commit aa10943

Please sign in to comment.