Commit
Avoid spamming blocks by range request until there's enough peers on all custody subnets (sigp#6004)

Squashed commit of the following:

commit 0d97026
Author: Jimmy Chen <[email protected]>
Date:   Thu Jun 27 13:51:33 2024 +1000

    Add check to cover a case where batch is not processed while waiting for custody peers to become available.

commit 9f82497
Author: Jimmy Chen <[email protected]>
Date:   Thu Jun 27 12:57:06 2024 +1000

    Add custody peer check before mutating `BatchInfo` to avoid inconsistent state.

commit edc584a
Author: Jimmy Chen <[email protected]>
Date:   Thu Jun 27 10:32:19 2024 +1000

    Only start requesting batches when there are good peers across all custody columns to avoid spamming block requests.
jimmygchen committed Jun 27, 2024
1 parent ea3d9d3 commit 10690eb
Showing 8 changed files with 74 additions and 41 deletions.
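The thrust of the change, per the commit messages above: range sync should not create blocks-by-range batches until every one of the node's custody column subnets has at least one good peer, otherwise the node spams block requests whose data columns it cannot fetch. A minimal, self-contained sketch of that gate (illustrative types and names only; the real implementation is `SyncingChain::good_peers_on_custody_subnets` in the chain.rs diff below):

use std::collections::HashMap;

// Illustrative stand-ins for DataColumnSubnetId and PeerId; not the Lighthouse types.
type SubnetId = u64;
type PeerId = u64;

/// Sketch of the new gate: only start a blocks-by-range batch once every custody
/// subnet has at least one good peer to serve its data columns.
fn can_start_batch(
    custody_subnets: &[SubnetId],
    good_peers_by_subnet: &HashMap<SubnetId, Vec<PeerId>>,
) -> bool {
    custody_subnets.iter().all(|subnet| {
        good_peers_by_subnet
            .get(subnet)
            .map_or(false, |peers| !peers.is_empty())
    })
}

fn main() {
    let custody_subnets = vec![0, 3, 7];
    let mut peers: HashMap<SubnetId, Vec<PeerId>> = HashMap::new();
    peers.insert(0, vec![11]);
    peers.insert(3, vec![12, 13]);
    // Subnet 7 has no good peers yet, so no batch is created and sync keeps waiting.
    assert!(!can_start_batch(&custody_subnets, &peers));
    peers.insert(7, vec![14]);
    assert!(can_start_batch(&custody_subnets, &peers));
}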
2 changes: 1 addition & 1 deletion beacon_node/http_api/src/publish_blocks.rs
@@ -273,7 +273,7 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlockConten
}

if let Some(gossip_verified_data_columns) = gossip_verified_data_columns {
let custody_columns_indices = network_globals.custody_columns(block.epoch(), &chain.spec);
let custody_columns_indices = network_globals.custody_columns(&chain.spec);

let custody_columns = gossip_verified_data_columns
.into_iter()
17 changes: 12 additions & 5 deletions beacon_node/lighthouse_network/src/types/globals.rs
@@ -8,7 +8,7 @@ use crate::{Enr, GossipTopic, Multiaddr, PeerId};
use parking_lot::RwLock;
use std::collections::HashSet;
use types::data_column_sidecar::ColumnIndex;
use types::{ChainSpec, DataColumnSubnetId, Epoch, EthSpec};
use types::{ChainSpec, DataColumnSubnetId, EthSpec};

pub struct NetworkGlobals<E: EthSpec> {
/// The current local ENR.
@@ -112,14 +112,22 @@ impl<E: EthSpec> NetworkGlobals<E> {
}

/// Compute custody data columns the node is assigned to custody.
pub fn custody_columns(&self, _epoch: Epoch, spec: &ChainSpec) -> Vec<ColumnIndex> {
pub fn custody_columns(&self, spec: &ChainSpec) -> Vec<ColumnIndex> {
let enr = self.local_enr();
let node_id = enr.node_id().raw().into();
let custody_subnet_count = enr.custody_subnet_count::<E>(spec);
DataColumnSubnetId::compute_custody_columns::<E>(node_id, custody_subnet_count, spec)
.collect()
}

/// Compute custody data column subnets the node is assigned to custody.
pub fn custody_subnets(&self, spec: &ChainSpec) -> impl Iterator<Item = DataColumnSubnetId> {
let enr = self.local_enr();
let node_id = enr.node_id().raw().into();
let custody_subnet_count = enr.custody_subnet_count::<E>(spec);
DataColumnSubnetId::compute_custody_subnets::<E>(node_id, custody_subnet_count, spec)
}

/// TESTING ONLY. Build a dummy NetworkGlobals instance.
pub fn new_test_globals(trusted_peers: Vec<PeerId>, log: &slog::Logger) -> NetworkGlobals<E> {
use crate::CombinedKeyExt;
@@ -143,7 +151,7 @@ impl<E: EthSpec> NetworkGlobals<E> {
#[cfg(test)]
mod test {
use crate::NetworkGlobals;
use types::{Epoch, EthSpec, MainnetEthSpec as E};
use types::{EthSpec, MainnetEthSpec as E};

#[test]
fn test_custody_count_default() {
@@ -153,8 +161,7 @@ mod test {
/ spec.data_column_sidecar_subnet_count
* spec.custody_requirement;
let globals = NetworkGlobals::<E>::new_test_globals(vec![], &log);
let any_epoch = Epoch::new(0);
let columns = globals.custody_columns(any_epoch, &spec);
let columns = globals.custody_columns(&spec);
assert_eq!(
columns.len(),
default_custody_requirement_column_count as usize
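The new `custody_subnets` helper mirrors the existing `custody_columns`: both derive the node's custody assignment from the local ENR node ID and the advertised custody subnet count, so the columns returned by one should be exactly the columns carried by the subnets returned by the other. A hedged consistency sketch of that relationship (the `columns_for_subnet` helper and its `subnet_count * j + subnet_id` layout are illustrative assumptions, not the Lighthouse API):

use std::collections::BTreeSet;

// Hypothetical helper, not the Lighthouse API: the column indices carried by one
// custody subnet, assuming a spec-style layout `column = subnet_count * j + subnet_id`.
fn columns_for_subnet(subnet_id: u64, subnet_count: u64, columns_per_subnet: u64) -> Vec<u64> {
    (0..columns_per_subnet)
        .map(|j| subnet_count * j + subnet_id)
        .collect()
}

// Sketch of the invariant: the node's custody columns are exactly the union of the
// columns carried by its custody subnets.
fn custody_columns_match_subnets(
    custody_subnets: &[u64],
    custody_columns: &[u64],
    subnet_count: u64,
    columns_per_subnet: u64,
) -> bool {
    let from_subnets: BTreeSet<u64> = custody_subnets
        .iter()
        .flat_map(|&s| columns_for_subnet(s, subnet_count, columns_per_subnet))
        .collect();
    let direct: BTreeSet<u64> = custody_columns.iter().copied().collect();
    from_subnets == direct
}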
8 changes: 4 additions & 4 deletions beacon_node/network/src/sync/manager.rs
@@ -728,10 +728,10 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
}
SyncMessage::SampleBlock(block_root, block_slot) => {
debug!(self.log, "Received SampleBlock message"; "block_root" => %block_root);
if let Some((requester, result)) =
self.sampling
.on_new_sample_request(block_root, block_slot, &mut self.network)
debug!(self.log, "Received SampleBlock message"; "block_root" => %block_root, "block_slot" => block_slot);
if let Some((requester, result)) = self
.sampling
.on_new_sample_request(block_root, &mut self.network)
{
self.on_sampling_result(requester, result)
}
16 changes: 5 additions & 11 deletions beacon_node/network/src/sync/network_context.rs
@@ -38,7 +38,7 @@ use std::time::Duration;
use tokio::sync::mpsc;
use types::blob_sidecar::FixedBlobSidecarList;
use types::{
BlobSidecar, ColumnIndex, DataColumnSidecar, DataColumnSubnetId, Epoch, EthSpec, Hash256,
BlobSidecar, ColumnIndex, DataColumnSidecar, DataColumnSubnetId, EthSpec, Hash256,
SignedBeaconBlock, Slot,
};

@@ -239,7 +239,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
}

// TODO(das): epoch argument left here in case custody rotation is implemented
pub fn get_custodial_peers(&self, _epoch: Epoch, column_index: ColumnIndex) -> Vec<PeerId> {
pub fn get_custodial_peers(&self, column_index: ColumnIndex) -> Vec<PeerId> {
let mut peer_ids = vec![];

for (peer_id, peer_info) in self.network_globals().peers.read().connected_peers() {
@@ -357,12 +357,10 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {

let expects_custody_columns = if matches!(batch_type, ByRangeRequestType::BlocksAndColumns)
{
let custody_indexes = self
.network_globals()
.custody_columns(epoch, &self.chain.spec);
let custody_indexes = self.network_globals().custody_columns(&self.chain.spec);

for column_index in &custody_indexes {
let custody_peer_ids = self.get_custodial_peers(epoch, *column_index);
let custody_peer_ids = self.get_custodial_peers(*column_index);
let Some(custody_peer) = custody_peer_ids.first().cloned() else {
// TODO(das): this will be pretty bad UX. To improve we should:
// - Attempt to fetch custody requests first, before requesting blocks
@@ -682,11 +680,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
.imported_custody_column_indexes(&block_root)
.unwrap_or_default();

// TODO(das): figure out how to pass block.slot if we end up doing rotation
let block_epoch = Epoch::new(0);
let custody_indexes_duty = self
.network_globals()
.custody_columns(block_epoch, &self.chain.spec);
let custody_indexes_duty = self.network_globals().custody_columns(&self.chain.spec);

// Include only the blob indexes not yet imported (received through gossip)
let custody_indexes_to_fetch = custody_indexes_duty
7 changes: 2 additions & 5 deletions beacon_node/network/src/sync/network_context/custody.rs
@@ -13,7 +13,7 @@ use slog::{debug, warn};
use std::time::Duration;
use std::{collections::HashMap, marker::PhantomData, sync::Arc};
use types::EthSpec;
use types::{data_column_sidecar::ColumnIndex, DataColumnSidecar, Epoch, Hash256};
use types::{data_column_sidecar::ColumnIndex, DataColumnSidecar, Hash256};

use super::{LookupRequestResult, PeerGroup, ReqId, RpcResponseResult, SyncNetworkContext};

@@ -34,7 +34,6 @@ type DataColumnSidecarVec<E> = Vec<Arc<DataColumnSidecar<E>>>;

pub struct ActiveCustodyRequest<T: BeaconChainTypes> {
block_root: Hash256,
block_epoch: Epoch,
custody_id: CustodyId,
/// List of column indices this request needs to download to complete successfully
column_requests: FnvHashMap<ColumnIndex, ColumnRequest<T::EthSpec>>,
@@ -80,8 +79,6 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
) -> Self {
Self {
block_root,
// TODO(das): use actual epoch if there's rotation
block_epoch: Epoch::new(0),
custody_id,
column_requests: HashMap::from_iter(
column_indices
@@ -248,7 +245,7 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {

// TODO: When is a fork and only a subset of your peers know about a block, we should only
// query the peers on that fork. Should this case be handled? How to handle it?
let custodial_peers = cx.get_custodial_peers(self.block_epoch, *column_index);
let custodial_peers = cx.get_custodial_peers(*column_index);

// TODO(das): cache this computation in a OneCell or similar to prevent having to
// run it every loop
47 changes: 46 additions & 1 deletion beacon_node/network/src/sync/range_sync/chain.rs
@@ -7,7 +7,7 @@ use crate::sync::{
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::BeaconChainTypes;
use fnv::FnvHashMap;
use lighthouse_network::{PeerAction, PeerId};
use lighthouse_network::{PeerAction, PeerId, Subnet};
use rand::seq::SliceRandom;
use slog::{crit, debug, o, warn};
use std::collections::{btree_map::Entry, BTreeMap, HashSet};
@@ -396,6 +396,9 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
self.request_batches(network)?;
}
}
} else if !self.good_peers_on_custody_subnets(self.processing_target, network) {
// If there are no good custody peers for this epoch, the batch won't be created
return Ok(KeepChain);
} else {
return Err(RemoveChain::WrongChainState(format!(
"Batch not found for current processing target {}",
@@ -884,6 +887,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
) -> ProcessingResult {
if let Some(batch) = self.batches.get_mut(&batch_id) {
let (request, batch_type) = batch.to_blocks_by_range_request();

match network.block_components_by_range_request(
peer,
batch_type,
@@ -994,6 +998,14 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
// check if we have the batch for our optimistic start. If not, request it first.
// We wait for this batch before requesting any other batches.
if let Some(epoch) = self.optimistic_start {
if !self.good_peers_on_custody_subnets(epoch, network) {
debug!(
self.log,
"Waiting for peers to be available on custody column subnets"
);
return Ok(KeepChain);
}

if let Entry::Vacant(entry) = self.batches.entry(epoch) {
if let Some(peer) = idle_peers.pop() {
let batch_type = network.batch_type(epoch);
@@ -1018,6 +1030,30 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
Ok(KeepChain)
}

/// Checks all custody column subnets for peers. Returns `true` if there is at least one peer in
/// every custody column subnet.
fn good_peers_on_custody_subnets(&self, epoch: Epoch, network: &SyncNetworkContext<T>) -> bool {
if network.chain.spec.is_peer_das_enabled_for_epoch(epoch) {
// Require peers on all custody column subnets before sending batches
let peers_on_all_custody_subnets = network
.network_globals()
.custody_subnets(&network.chain.spec)
.all(|subnet_id| {
let peer_count = network
.network_globals()
.peers
.read()
.good_peers_on_subnet(Subnet::DataColumn(subnet_id))
.count();
debug!(self.log, "Peers found on custody subnet"; "subnet_id" => ?subnet_id, "peer_count" => peer_count);
peer_count > 0
});
peers_on_all_custody_subnets
} else {
true
}
}

/// Creates the next required batch from the chain. If there are no more batches required,
/// `false` is returned.
fn include_next_batch(&mut self, network: &mut SyncNetworkContext<T>) -> Option<BatchId> {
@@ -1048,6 +1084,15 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
return None;
}

// don't send batch requests until we have peers on custody subnets
if !self.good_peers_on_custody_subnets(self.to_be_downloaded, network) {
debug!(
self.log,
"Waiting for peers to be available on custody column subnets"
);
return None;
}

let batch_id = self.to_be_downloaded;
// this batch could have been included already being an optimistic batch
match self.batches.entry(batch_id) {
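In `SyncingChain` this gate is consulted at three points: before treating a missing processing-target batch as a wrong chain state, before creating the optimistic-start batch, and before including the next batch to download in `include_next_batch`. Its behavior reduces to a small truth table, sketched below as a standalone function (`gate_passes` is illustrative, not a Lighthouse API):

/// Illustrative reduction of `good_peers_on_custody_subnets`: for epochs before
/// PeerDAS is enabled the gate always passes, since no data columns are requested;
/// afterwards, every custody subnet must have at least one good peer.
fn gate_passes(peer_das_enabled_for_epoch: bool, good_peers_per_custody_subnet: &[usize]) -> bool {
    if !peer_das_enabled_for_epoch {
        return true;
    }
    good_peers_per_custody_subnet.iter().all(|&count| count > 0)
}

fn main() {
    assert!(gate_passes(false, &[])); // pre-PeerDAS epoch: never hold back batches
    assert!(gate_passes(true, &[2, 1, 3])); // every custody subnet has a peer
    assert!(!gate_passes(true, &[2, 0, 3])); // one empty subnet defers the batch
}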
17 changes: 4 additions & 13 deletions beacon_node/network/src/sync/sampling.rs
@@ -10,7 +10,7 @@ use std::{
collections::hash_map::Entry, collections::HashMap, marker::PhantomData, sync::Arc,
time::Duration,
};
use types::{data_column_sidecar::ColumnIndex, ChainSpec, DataColumnSidecar, Hash256, Slot};
use types::{data_column_sidecar::ColumnIndex, ChainSpec, DataColumnSidecar, Hash256};

pub type SamplingResult = Result<(), SamplingError>;

@@ -57,15 +57,13 @@ impl<T: BeaconChainTypes> Sampling<T> {
pub fn on_new_sample_request(
&mut self,
block_root: Hash256,
block_slot: Slot,
cx: &mut SyncNetworkContext<T>,
) -> Option<(SamplingRequester, SamplingResult)> {
let id = SamplingRequester::ImportedBlock(block_root);

let request = match self.requests.entry(id) {
Entry::Vacant(e) => e.insert(ActiveSamplingRequest::new(
block_root,
block_slot,
id,
&self.sampling_config,
self.log.clone(),
@@ -163,7 +161,6 @@ impl<T: BeaconChainTypes> Sampling<T> {

pub struct ActiveSamplingRequest<T: BeaconChainTypes> {
block_root: Hash256,
block_slot: Slot,
requester_id: SamplingRequester,
column_requests: FnvHashMap<ColumnIndex, ActiveColumnSampleRequest>,
column_shuffle: Vec<ColumnIndex>,
@@ -198,7 +195,6 @@ pub enum SamplingConfig {
impl<T: BeaconChainTypes> ActiveSamplingRequest<T> {
fn new(
block_root: Hash256,
block_slot: Slot,
requester_id: SamplingRequester,
sampling_config: &SamplingConfig,
log: slog::Logger,
@@ -212,7 +208,6 @@ impl<T: BeaconChainTypes> ActiveSamplingRequest<T> {

Self {
block_root,
block_slot,
requester_id,
column_requests: <_>::default(),
column_shuffle,
@@ -401,7 +396,7 @@ impl<T: BeaconChainTypes> ActiveSamplingRequest<T> {
.entry(column_index)
.or_insert(ActiveColumnSampleRequest::new(column_index));

if request.request(self.block_root, self.block_slot, self.requester_id, cx)? {
if request.request(self.block_root, self.requester_id, cx)? {
sent_requests += 1
}
}
Expand All @@ -427,7 +422,7 @@ mod request {
use beacon_chain::BeaconChainTypes;
use lighthouse_network::PeerId;
use std::collections::HashSet;
use types::{data_column_sidecar::ColumnIndex, EthSpec, Hash256, Slot};
use types::{data_column_sidecar::ColumnIndex, Hash256};

pub(crate) struct ActiveColumnSampleRequest {
column_index: ColumnIndex,
@@ -478,7 +473,6 @@ mod request {
pub(crate) fn request<T: BeaconChainTypes>(
&mut self,
block_root: Hash256,
block_slot: Slot,
requester: SamplingRequester,
cx: &mut SyncNetworkContext<T>,
) -> Result<bool, SamplingError> {
@@ -490,10 +484,7 @@ mod request {

// TODO: When is a fork and only a subset of your peers know about a block, sampling should only
// be queried on the peers on that fork. Should this case be handled? How to handle it?
let peer_ids = cx.get_custodial_peers(
block_slot.epoch(<T::EthSpec as EthSpec>::slots_per_epoch()),
self.column_index,
);
let peer_ids = cx.get_custodial_peers(self.column_index);

// TODO(das) randomize custodial peer and avoid failing peers
if let Some(peer_id) = peer_ids.first().cloned() {
1 change: 0 additions & 1 deletion consensus/types/src/data_column_subnet_id.rs
@@ -36,7 +36,6 @@ impl DataColumnSubnetId {
}

/// Compute required subnets to subscribe to given the node id.
/// TODO(das): Add epoch param
#[allow(clippy::arithmetic_side_effects)]
pub fn compute_custody_subnets<E: EthSpec>(
node_id: U256,
