Commit 4373a28: Revert epoch parameter refactor.
jimmygchen committed Jun 28, 2024 · 1 parent 876ea3b
Showing 8 changed files with 42 additions and 25 deletions.
2 changes: 1 addition & 1 deletion beacon_node/http_api/src/publish_blocks.rs
@@ -273,7 +273,7 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlockConten
     }
 
     if let Some(gossip_verified_data_columns) = gossip_verified_data_columns {
-        let custody_columns_indices = network_globals.custody_columns(&chain.spec);
+        let custody_columns_indices = network_globals.custody_columns(block.epoch(), &chain.spec);
 
         let custody_columns = gossip_verified_data_columns
             .into_iter()
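
For reference, the `block.epoch()` in the added line is just the block's slot integer-divided by the slots-per-epoch constant. A minimal standalone sketch of that relation, assuming the mainnet preset of 32 slots per epoch (`epoch_of` is a hypothetical helper; Lighthouse's `types` crate expresses this as `Slot::epoch(slots_per_epoch)`):

// Minimal sketch of the slot -> epoch relation behind `block.epoch()`.
// The value 32 is the mainnet slots-per-epoch preset, assumed here.
fn epoch_of(slot: u64, slots_per_epoch: u64) -> u64 {
    slot / slots_per_epoch
}

fn main() {
    let slots_per_epoch = 32;
    assert_eq!(epoch_of(0, slots_per_epoch), 0);  // first slot of epoch 0
    assert_eq!(epoch_of(31, slots_per_epoch), 0); // last slot of epoch 0
    assert_eq!(epoch_of(32, slots_per_epoch), 1); // first slot of epoch 1
}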
9 changes: 5 additions & 4 deletions beacon_node/lighthouse_network/src/types/globals.rs
@@ -9,7 +9,7 @@ use itertools::Itertools;
 use parking_lot::RwLock;
 use std::collections::HashSet;
 use types::data_column_sidecar::ColumnIndex;
-use types::{ChainSpec, DataColumnSubnetId, EthSpec};
+use types::{ChainSpec, DataColumnSubnetId, Epoch, EthSpec};
 
 pub struct NetworkGlobals<E: EthSpec> {
     /// The current local ENR.
@@ -113,7 +113,7 @@ impl<E: EthSpec> NetworkGlobals<E> {
     }
 
     /// Compute custody data columns the node is assigned to custody.
-    pub fn custody_columns(&self, spec: &ChainSpec) -> Vec<ColumnIndex> {
+    pub fn custody_columns(&self, _epoch: Epoch, spec: &ChainSpec) -> Vec<ColumnIndex> {
         let enr = self.local_enr();
         let node_id = enr.node_id().raw().into();
         let custody_subnet_count = enr.custody_subnet_count::<E>(spec);
@@ -178,7 +178,7 @@
 mod test {
     use super::*;
     use std::str::FromStr;
-    use types::{EthSpec, MainnetEthSpec as E};
+    use types::{Epoch, EthSpec, MainnetEthSpec as E};
 
     #[test]
     fn test_custody_count_default() {
@@ -188,7 +188,8 @@ mod test {
             / spec.data_column_sidecar_subnet_count
             * spec.custody_requirement;
         let globals = NetworkGlobals::<E>::new_test_globals(vec![], &log);
-        let columns = globals.custody_columns(&spec);
+        let any_epoch = Epoch::new(0);
+        let columns = globals.custody_columns(any_epoch, &spec);
         assert_eq!(
             columns.len(),
             default_custody_requirement_column_count as usize
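
The expected count asserted in `test_custody_count_default` divides a total column count (its first operand is cut off above the hunk) by `data_column_sidecar_subnet_count`, then multiplies by `custody_requirement`. A standalone sketch of that arithmetic; the concrete parameter values below are illustrative assumptions, not values taken from this diff:

// Sketch of the default custody-column count computed in the test above.
// Only the formula (columns per subnet * required subnets) comes from the
// diff; the numbers are assumed for illustration.
fn main() {
    let number_of_columns: u64 = 128;               // assumed
    let data_column_sidecar_subnet_count: u64 = 32; // assumed
    let custody_requirement: u64 = 1;               // assumed

    let default_custody_requirement_column_count =
        number_of_columns / data_column_sidecar_subnet_count * custody_requirement;

    // 128 columns over 32 subnets = 4 columns per subnet; custodying
    // 1 subnet therefore means custodying 4 columns.
    assert_eq!(default_custody_requirement_column_count, 4);
}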
8 changes: 4 additions & 4 deletions beacon_node/network/src/sync/manager.rs
@@ -728,10 +728,10 @@ impl<T: BeaconChainTypes> SyncManager<T> {
                 }
             }
             SyncMessage::SampleBlock(block_root, block_slot) => {
-                debug!(self.log, "Received SampleBlock message"; "block_root" => %block_root, "block_slot" => block_slot);
-                if let Some((requester, result)) = self
-                    .sampling
-                    .on_new_sample_request(block_root, &mut self.network)
+                debug!(self.log, "Received SampleBlock message"; "block_root" => %block_root);
+                if let Some((requester, result)) =
+                    self.sampling
+                        .on_new_sample_request(block_root, block_slot, &mut self.network)
                 {
                     self.on_sampling_result(requester, result)
                 }
22 changes: 13 additions & 9 deletions beacon_node/network/src/sync/network_context.rs
@@ -25,9 +25,7 @@ use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessStatus, EngineStat
 use fnv::FnvHashMap;
 use lighthouse_network::rpc::methods::{BlobsByRangeRequest, DataColumnsByRangeRequest};
 use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError};
-use lighthouse_network::{
-    Client, NetworkGlobals, PeerAction, PeerId, ReportSource, Request,
-};
+use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource, Request};
 pub use requests::LookupVerifyError;
 use slog::{debug, error, warn};
 use slot_clock::SlotClock;
@@ -38,8 +36,7 @@ use std::time::Duration;
 use tokio::sync::mpsc;
 use types::blob_sidecar::FixedBlobSidecarList;
 use types::{
-    BlobSidecar, ColumnIndex, DataColumnSidecar, EthSpec, Hash256,
-    SignedBeaconBlock, Slot,
+    BlobSidecar, ColumnIndex, DataColumnSidecar, Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot,
 };
 
 pub mod custody;
@@ -238,7 +235,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
         }
     }
 
-    pub fn get_custodial_peers(&self, column_index: ColumnIndex) -> Vec<PeerId> {
+    // TODO(das): epoch argument left here in case custody rotation is implemented
+    pub fn get_custodial_peers(&self, _epoch: Epoch, column_index: ColumnIndex) -> Vec<PeerId> {
         self.network_globals()
             .custody_peers_for_column(column_index, &self.chain.spec)
     }
@@ -335,10 +333,12 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
 
         let expects_custody_columns = if matches!(batch_type, ByRangeRequestType::BlocksAndColumns)
         {
-            let custody_indexes = self.network_globals().custody_columns(&self.chain.spec);
+            let custody_indexes = self
+                .network_globals()
+                .custody_columns(epoch, &self.chain.spec);
 
             for column_index in &custody_indexes {
-                let custody_peer_ids = self.get_custodial_peers(*column_index);
+                let custody_peer_ids = self.get_custodial_peers(epoch, *column_index);
                 let Some(custody_peer) = custody_peer_ids.first().cloned() else {
                     // TODO(das): this will be pretty bad UX. To improve we should:
                     // - Attempt to fetch custody requests first, before requesting blocks
@@ -658,7 +658,11 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
             .imported_custody_column_indexes(&block_root)
             .unwrap_or_default();
 
-        let custody_indexes_duty = self.network_globals().custody_columns(&self.chain.spec);
+        // TODO(das): figure out how to pass block.slot if we end up doing rotation
+        let block_epoch = Epoch::new(0);
+        let custody_indexes_duty = self
+            .network_globals()
+            .custody_columns(block_epoch, &self.chain.spec);
 
         // Include only the blob indexes not yet imported (received through gossip)
         let custody_indexes_to_fetch = custody_indexes_duty
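
Both `custody_columns` and `get_custodial_peers` now accept an epoch they currently ignore, so call sites that have no real epoch, like the `Epoch::new(0)` placeholder above, keep compiling and behave identically until custody rotation lands. A toy sketch of this accept-and-ignore pattern; all names, types, and values here are illustrative, not Lighthouse's:

// Toy sketch of the "accept-and-ignore" epoch pattern kept by this revert.
// Everything below is an illustrative assumption, not Lighthouse code.
type Epoch = u64;
type ColumnIndex = u64;

struct Globals;

impl Globals {
    // `_epoch` is unused today; a custody-rotation scheme would mix it into
    // the derivation so the result changes across epochs.
    fn custody_columns(&self, _epoch: Epoch) -> Vec<ColumnIndex> {
        vec![7, 39, 71, 103] // stand-in for the node-id-derived set
    }
}

fn main() {
    let g = Globals;
    // Because the epoch is ignored, a placeholder is harmless today,
    // yet every call site is already epoch-aware if rotation is added.
    assert_eq!(g.custody_columns(0), g.custody_columns(12345));
}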
7 changes: 5 additions & 2 deletions beacon_node/network/src/sync/network_context/custody.rs
@@ -13,7 +13,7 @@ use slog::{debug, warn};
 use std::time::Duration;
 use std::{collections::HashMap, marker::PhantomData, sync::Arc};
 use types::EthSpec;
-use types::{data_column_sidecar::ColumnIndex, DataColumnSidecar, Hash256};
+use types::{data_column_sidecar::ColumnIndex, DataColumnSidecar, Epoch, Hash256};
 
 use super::{LookupRequestResult, PeerGroup, ReqId, RpcResponseResult, SyncNetworkContext};
 
@@ -34,6 +34,7 @@ type DataColumnSidecarVec<E> = Vec<Arc<DataColumnSidecar<E>>>;
 
 pub struct ActiveCustodyRequest<T: BeaconChainTypes> {
     block_root: Hash256,
+    block_epoch: Epoch,
     custody_id: CustodyId,
     /// List of column indices this request needs to download to complete successfully
     column_requests: FnvHashMap<ColumnIndex, ColumnRequest<T::EthSpec>>,
@@ -79,6 +80,8 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
     ) -> Self {
         Self {
             block_root,
+            // TODO(das): use actual epoch if there's rotation
+            block_epoch: Epoch::new(0),
             custody_id,
             column_requests: HashMap::from_iter(
                 column_indices
@@ -245,7 +248,7 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
 
         // TODO: When is a fork and only a subset of your peers know about a block, we should only
         // query the peers on that fork. Should this case be handled? How to handle it?
-        let custodial_peers = cx.get_custodial_peers(*column_index);
+        let custodial_peers = cx.get_custodial_peers(self.block_epoch, *column_index);
 
         // TODO(das): cache this computation in a OneCell or similar to prevent having to
         // run it every loop
1 change: 0 additions & 1 deletion beacon_node/network/src/sync/range_sync/chain.rs
@@ -887,7 +887,6 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
     ) -> ProcessingResult {
         if let Some(batch) = self.batches.get_mut(&batch_id) {
             let (request, batch_type) = batch.to_blocks_by_range_request();
-
             match network.block_components_by_range_request(
                 peer,
                 batch_type,
17 changes: 13 additions & 4 deletions beacon_node/network/src/sync/sampling.rs
@@ -10,7 +10,7 @@ use std::{
     collections::hash_map::Entry, collections::HashMap, marker::PhantomData, sync::Arc,
     time::Duration,
 };
-use types::{data_column_sidecar::ColumnIndex, ChainSpec, DataColumnSidecar, Hash256};
+use types::{data_column_sidecar::ColumnIndex, ChainSpec, DataColumnSidecar, Hash256, Slot};
 
 pub type SamplingResult = Result<(), SamplingError>;
 
@@ -57,13 +57,15 @@ impl<T: BeaconChainTypes> Sampling<T> {
     pub fn on_new_sample_request(
         &mut self,
         block_root: Hash256,
+        block_slot: Slot,
         cx: &mut SyncNetworkContext<T>,
     ) -> Option<(SamplingRequester, SamplingResult)> {
         let id = SamplingRequester::ImportedBlock(block_root);
 
         let request = match self.requests.entry(id) {
             Entry::Vacant(e) => e.insert(ActiveSamplingRequest::new(
                 block_root,
+                block_slot,
                 id,
                 &self.sampling_config,
                 self.log.clone(),
@@ -161,6 +163,7 @@ impl<T: BeaconChainTypes> Sampling<T> {
 
 pub struct ActiveSamplingRequest<T: BeaconChainTypes> {
     block_root: Hash256,
+    block_slot: Slot,
     requester_id: SamplingRequester,
     column_requests: FnvHashMap<ColumnIndex, ActiveColumnSampleRequest>,
     column_shuffle: Vec<ColumnIndex>,
@@ -195,6 +198,7 @@ pub enum SamplingConfig {
 impl<T: BeaconChainTypes> ActiveSamplingRequest<T> {
     fn new(
         block_root: Hash256,
+        block_slot: Slot,
         requester_id: SamplingRequester,
         sampling_config: &SamplingConfig,
         log: slog::Logger,
@@ -208,6 +212,7 @@ impl<T: BeaconChainTypes> ActiveSamplingRequest<T> {
 
         Self {
             block_root,
+            block_slot,
             requester_id,
             column_requests: <_>::default(),
             column_shuffle,
@@ -396,7 +401,7 @@ impl<T: BeaconChainTypes> ActiveSamplingRequest<T> {
             .entry(column_index)
             .or_insert(ActiveColumnSampleRequest::new(column_index));
 
-        if request.request(self.block_root, self.requester_id, cx)? {
+        if request.request(self.block_root, self.block_slot, self.requester_id, cx)? {
             sent_requests += 1
         }
     }
@@ -422,7 +427,7 @@ mod request {
     use beacon_chain::BeaconChainTypes;
     use lighthouse_network::PeerId;
     use std::collections::HashSet;
-    use types::{data_column_sidecar::ColumnIndex, Hash256};
+    use types::{data_column_sidecar::ColumnIndex, EthSpec, Hash256, Slot};
 
     pub(crate) struct ActiveColumnSampleRequest {
         column_index: ColumnIndex,
@@ -473,6 +478,7 @@ mod request {
         pub(crate) fn request<T: BeaconChainTypes>(
             &mut self,
             block_root: Hash256,
+            block_slot: Slot,
             requester: SamplingRequester,
             cx: &mut SyncNetworkContext<T>,
         ) -> Result<bool, SamplingError> {
@@ -484,7 +490,10 @@ mod request {
 
             // TODO: When is a fork and only a subset of your peers know about a block, sampling should only
             // be queried on the peers on that fork. Should this case be handled? How to handle it?
-            let peer_ids = cx.get_custodial_peers(self.column_index);
+            let peer_ids = cx.get_custodial_peers(
+                block_slot.epoch(<T::EthSpec as EthSpec>::slots_per_epoch()),
+                self.column_index,
+            );
 
             // TODO(das) randomize custodial peer and avoid failing peers
             if let Some(peer_id) = peer_ids.first().cloned() {
1 change: 1 addition & 0 deletions consensus/types/src/data_column_subnet_id.rs
@@ -36,6 +36,7 @@ impl DataColumnSubnetId {
     }
 
     /// Compute required subnets to subscribe to given the node id.
+    /// TODO(das): Add epoch param
     #[allow(clippy::arithmetic_side_effects)]
     pub fn compute_custody_subnets<E: EthSpec>(
         node_id: U256,
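
The `TODO(das): Add epoch param` marks where rotation would enter the subnet derivation. A deliberately simplified sketch of the function's overall shape: a pure function of the node id, so any peer can recompute another node's custody assignment. This is not the spec algorithm (which hashes `node_id + i` rather than using it directly); an epoch input here is what would make assignments rotate over time:

// Deliberately simplified sketch of custody-subnet derivation. NOT the
// spec/Lighthouse implementation; the real code hashes node_id + i where
// this placeholder uses the raw value.
fn custody_subnets_sketch(node_id: u64, custody_subnet_count: u64, subnet_count: u64) -> Vec<u64> {
    let mut subnets = Vec::new();
    let mut i = node_id;
    while (subnets.len() as u64) < custody_subnet_count.min(subnet_count) {
        let candidate = i % subnet_count; // the real code hashes here
        if !subnets.contains(&candidate) {
            subnets.push(candidate);
        }
        i = i.wrapping_add(1);
    }
    subnets
}

fn main() {
    // Deterministic: the same node id always yields the same subnets,
    // which is what lets other peers know which columns to request from us.
    assert_eq!(custody_subnets_sketch(42, 2, 32), custody_subnets_sketch(42, 2, 32));
}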
