Skip to content

Commit

Permalink
Avoid processing redundant RPC blocks (sigp#4179)
Browse files Browse the repository at this point in the history
## Proposed Changes

We already make some attempts to avoid processing RPC blocks when a block from the same proposer is already being processed through gossip. This PR strengthens that guarantee by using the existing cache for `observed_block_producers` to inform whether an RPC block's processing should be delayed.
  • Loading branch information
michaelsproul authored and Woodpile37 committed Jan 6, 2024
1 parent 6ee1408 commit a5f8bb1
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 3 deletions.
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ pub struct BeaconChain<T: BeaconChainTypes> {
/// in recent epochs.
pub(crate) observed_sync_aggregators: RwLock<ObservedSyncAggregators<T::EthSpec>>,
/// Maintains a record of which validators have proposed blocks for each slot.
pub(crate) observed_block_producers: RwLock<ObservedBlockProducers<T::EthSpec>>,
pub observed_block_producers: RwLock<ObservedBlockProducers<T::EthSpec>>,
/// Maintains a record of which validators have submitted voluntary exits.
pub(crate) observed_voluntary_exits: Mutex<ObservedOperations<SignedVoluntaryExit, T::EthSpec>>,
/// Maintains a record of which validators we've seen proposer slashings for.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub const QUEUED_ATTESTATION_DELAY: Duration = Duration::from_secs(12);
pub const QUEUED_LIGHT_CLIENT_UPDATE_DELAY: Duration = Duration::from_secs(12);

/// For how long to queue rpc blocks before sending them back for reprocessing.
pub const QUEUED_RPC_BLOCK_DELAY: Duration = Duration::from_secs(3);
pub const QUEUED_RPC_BLOCK_DELAY: Duration = Duration::from_secs(4);

/// Set an arbitrary upper-bound on the number of queued blocks to avoid DoS attacks. The fact that
/// we signature-verify blocks before putting them in the queue *should* protect against this, but
Expand Down Expand Up @@ -520,7 +520,7 @@ impl<T: BeaconChainTypes> ReprocessQueue<T> {
return;
}

// Queue the block for 1/4th of a slot
// Queue the block for 1/3rd of a slot
self.rpc_block_delay_queue
.insert(rpc_block, QUEUED_RPC_BLOCK_DELAY);
}
Expand Down
46 changes: 46 additions & 0 deletions beacon_node/network/src/beacon_processor/worker/sync_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,52 @@ impl<T: BeaconChainTypes> Worker<T> {
return;
}
};
// Check if a block from this proposer is already known. If so, defer processing until later
// to avoid wasting time processing duplicates.
let proposal_already_known = self
.chain
.observed_block_producers
.read()
.proposer_has_been_observed(block.message())
.map_err(|e| {
error!(
self.log,
"Failed to check observed proposers";
"error" => ?e,
"source" => "rpc",
"block_root" => %block_root
);
})
.unwrap_or(true);
if proposal_already_known {
debug!(
self.log,
"Delaying processing of duplicate RPC block";
"block_root" => ?block_root,
"proposer" => block.message().proposer_index(),
"slot" => block.slot()
);

// Send message to work reprocess queue to retry the block
let reprocess_msg = ReprocessQueueMessage::RpcBlock(QueuedRpcBlock {
block_root,
block: block.clone(),
process_type,
seen_timestamp,
should_process: true,
});

if reprocess_tx.try_send(reprocess_msg).is_err() {
error!(
self.log,
"Failed to inform block import";
"source" => "rpc",
"block_root" => %block_root
);
}
return;
}

let slot = block.slot();
let parent_root = block.message().parent_root();
let result = self
Expand Down

0 comments on commit a5f8bb1

Please sign in to comment.