sc-consensus-beefy: restart voter on pallet reset (#14821)
When a pallet-beefy consensus reset is detected, simply reinitialize the worker
and continue, instead of bringing down the task (and possibly the node).

Signed-off-by: Adrian Catangiu <[email protected]>
acatangiu authored Aug 25, 2023
1 parent 8dac0ab commit b8854b5
Showing 7 changed files with 130 additions and 102 deletions.
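The heart of the change is in client/consensus/beefy/src/lib.rs: `start_beefy_gadget` now wraps worker construction in a `loop`, and only the recoverable `ConsensusReset` error triggers another iteration. Below is a minimal, self-contained sketch of that restart pattern, not the actual gadget code; `Worker`, `WorkerError` and `run_gadget` are illustrative names only.

```rust
// Sketch of the restart-on-recoverable-error pattern; all names here are
// illustrative, not items from sc-consensus-beefy.
#[derive(Debug)]
enum WorkerError {
    /// Recoverable: the on-chain pallet was reset, so rebuild the voter state.
    ConsensusReset,
    /// Fatal: one of the input streams terminated.
    StreamTerminated,
}

struct Worker;

impl Worker {
    /// Should never finish successfully; only ever returns the error that stopped it.
    async fn run(self) -> WorkerError {
        WorkerError::ConsensusReset
    }
}

async fn run_gadget() {
    loop {
        // Re-create all per-run state (gossip engine, voter state, ...) here.
        let worker = Worker;
        match worker.run().await {
            // Recoverable: reinitialize and keep the task (and the node) alive.
            WorkerError::ConsensusReset => continue,
            // Everything else brings the task down.
            err => {
                eprintln!("terminating BEEFY gadget: {:?}", err);
                return
            },
        }
    }
}
```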
2 changes: 1 addition & 1 deletion client/consensus/beefy/src/communication/peers.rs
@@ -24,7 +24,7 @@ use std::collections::{HashMap, VecDeque};

/// Report specifying a reputation change for a given peer.
#[derive(Debug, PartialEq)]
pub(crate) struct PeerReport {
pub struct PeerReport {
pub who: PeerId,
pub cost_benefit: ReputationChange,
}
@@ -18,7 +18,7 @@
use codec::DecodeAll;
use futures::{channel::oneshot, StreamExt};
use log::{debug, error, trace};
use log::{debug, trace};
use sc_client_api::BlockBackend;
use sc_network::{
config as netconfig, config::RequestResponseConfig, types::ProtocolName, PeerId,
@@ -182,7 +182,9 @@ where
}

/// Run [`BeefyJustifsRequestHandler`].
pub async fn run(mut self) {
///
/// Should never end, returns `Error` otherwise.
pub async fn run(&mut self) -> Error {
trace!(target: BEEFY_SYNC_LOG_TARGET, "🥩 Running BeefyJustifsRequestHandler");

while let Ok(request) = self
Expand Down Expand Up @@ -215,9 +217,6 @@ where
},
}
}
error!(
target: crate::LOG_TARGET,
"🥩 On-demand requests receiver stream terminated, closing worker."
);
Error::RequestsReceiverStreamClosed
}
}
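Because `BeefyJustifsRequestHandler::run` now borrows `self` (`&mut self`) and returns the terminating `Error` instead of consuming `self` and only logging, the gadget can call it again on the same handler after a voter restart. A rough sketch of that shape, with simplified stand-in types rather than the real ones:

```rust
// Sketch only: a long-running handler whose `run` can be re-entered after a
// restart because it borrows `self` instead of consuming it. Stand-in types.
#[derive(Debug)]
enum HandlerError {
    RequestsReceiverStreamClosed,
}

struct RequestHandler {
    // Request receiver, client handle, etc. elided; a counter stands in for them.
    remaining: u32,
}

impl RequestHandler {
    /// Should never end; returns the error that stopped the loop otherwise.
    async fn run(&mut self) -> HandlerError {
        loop {
            match self.next_request().await {
                Some(_request) => { /* decode, look up justification, respond */ },
                // Incoming stream closed: surface a typed error to the caller.
                None => return HandlerError::RequestsReceiverStreamClosed,
            }
        }
    }

    async fn next_request(&mut self) -> Option<u32> {
        // Placeholder request source so the sketch is self-contained.
        if self.remaining == 0 {
            None
        } else {
            self.remaining -= 1;
            Some(self.remaining)
        }
    }
}
```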
@@ -75,7 +75,7 @@ pub struct JustificationRequest<B: Block> {
}

#[derive(Debug, thiserror::Error)]
pub(crate) enum Error {
pub enum Error {
#[error(transparent)]
Client(#[from] sp_blockchain::Error),

@@ -102,4 +102,7 @@ pub(crate) enum Error {

#[error("Internal error while getting response.")]
ResponseError,

#[error("On-demand requests receiver stream terminated.")]
RequestsReceiverStreamClosed,
}
12 changes: 11 additions & 1 deletion client/consensus/beefy/src/error.rs
@@ -34,8 +34,18 @@ pub enum Error {
Signature(String),
#[error("Session uninitialized")]
UninitSession,
#[error("pallet-beefy was reset, please restart voter")]
#[error("pallet-beefy was reset")]
ConsensusReset,
#[error("Block import stream terminated")]
BlockImportStreamTerminated,
#[error("Gossip Engine terminated")]
GossipEngineTerminated,
#[error("Finality proofs gossiping stream terminated")]
FinalityProofGossipStreamTerminated,
#[error("Finality stream terminated")]
FinalityStreamTerminated,
#[error("Votes gossiping stream terminated")]
VotesGossipStreamTerminated,
}

#[cfg(test)]
154 changes: 88 additions & 66 deletions client/consensus/beefy/src/lib.rs
@@ -221,7 +221,7 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
B: Block,
BE: Backend<B>,
C: Client<B, BE> + BlockBackend<B>,
P: PayloadProvider<B>,
P: PayloadProvider<B> + Clone,
R: ProvideRuntimeApi<B>,
R::Api: BeefyApi<B, AuthorityId> + MmrApi<B, MmrRootHash, NumberFor<B>>,
N: GossipNetwork<B> + NetworkRequest + Send + Sync + 'static,
@@ -237,7 +237,7 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
min_block_delta,
prometheus_registry,
links,
on_demand_justifications_handler,
mut on_demand_justifications_handler,
} = beefy_params;

let BeefyNetworkParams {
@@ -248,83 +248,105 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
..
} = network_params;

let known_peers = Arc::new(Mutex::new(KnownPeers::new()));
// Default votes filter is to discard everything.
// Validator is updated later with correct starting round and set id.
let (gossip_validator, gossip_report_stream) =
communication::gossip::GossipValidator::new(known_peers.clone());
let gossip_validator = Arc::new(gossip_validator);
let mut gossip_engine = GossipEngine::new(
network.clone(),
sync.clone(),
gossip_protocol_name,
gossip_validator.clone(),
None,
);
let metrics = register_metrics(prometheus_registry.clone());

// The `GossipValidator` adds and removes known peers based on valid votes and network events.
let on_demand_justifications = OnDemandJustificationsEngine::new(
network.clone(),
justifications_protocol_name,
known_peers,
prometheus_registry.clone(),
);

// Subscribe to finality notifications and justifications before waiting for runtime pallet and
// reuse the streams, so we don't miss notifications while waiting for pallet to be available.
let mut finality_notifications = client.finality_notification_stream().fuse();
let block_import_justif = links.from_block_import_justif_stream.subscribe(100_000).fuse();

// Wait for BEEFY pallet to be active before starting voter.
let persisted_state =
match wait_for_runtime_pallet(&*runtime, &mut gossip_engine, &mut finality_notifications)
.await
.and_then(|(beefy_genesis, best_grandpa)| {
load_or_init_voter_state(
&*backend,
&*runtime,
beefy_genesis,
best_grandpa,
min_block_delta,
)
}) {
let mut block_import_justif = links.from_block_import_justif_stream.subscribe(100_000).fuse();

// We re-create and re-run the worker in this loop in order to quickly reinit and resume after
// select recoverable errors.
loop {
let known_peers = Arc::new(Mutex::new(KnownPeers::new()));
// Default votes filter is to discard everything.
// Validator is updated later with correct starting round and set id.
let (gossip_validator, gossip_report_stream) =
communication::gossip::GossipValidator::new(known_peers.clone());
let gossip_validator = Arc::new(gossip_validator);
let mut gossip_engine = GossipEngine::new(
network.clone(),
sync.clone(),
gossip_protocol_name.clone(),
gossip_validator.clone(),
None,
);

// The `GossipValidator` adds and removes known peers based on valid votes and network
// events.
let on_demand_justifications = OnDemandJustificationsEngine::new(
network.clone(),
justifications_protocol_name.clone(),
known_peers,
prometheus_registry.clone(),
);

// Wait for BEEFY pallet to be active before starting voter.
let persisted_state = match wait_for_runtime_pallet(
&*runtime,
&mut gossip_engine,
&mut finality_notifications,
)
.await
.and_then(|(beefy_genesis, best_grandpa)| {
load_or_init_voter_state(
&*backend,
&*runtime,
beefy_genesis,
best_grandpa,
min_block_delta,
)
}) {
Ok(state) => state,
Err(e) => {
error!(target: LOG_TARGET, "Error: {:?}. Terminating.", e);
return
},
};
// Update the gossip validator with the right starting round and set id.
if let Err(e) = persisted_state
.gossip_filter_config()
.map(|f| gossip_validator.update_filter(f))
{
error!(target: LOG_TARGET, "Error: {:?}. Terminating.", e);
return
}
// Update the gossip validator with the right starting round and set id.
if let Err(e) = persisted_state
.gossip_filter_config()
.map(|f| gossip_validator.update_filter(f))
{
error!(target: LOG_TARGET, "Error: {:?}. Terminating.", e);
return
}

let worker = worker::BeefyWorker {
backend,
payload_provider,
runtime,
sync,
key_store: key_store.into(),
gossip_engine,
gossip_validator,
gossip_report_stream,
on_demand_justifications,
links,
metrics,
pending_justifications: BTreeMap::new(),
persisted_state,
};
let worker = worker::BeefyWorker {
backend: backend.clone(),
payload_provider: payload_provider.clone(),
runtime: runtime.clone(),
sync: sync.clone(),
key_store: key_store.clone().into(),
gossip_engine,
gossip_validator,
gossip_report_stream,
on_demand_justifications,
links: links.clone(),
metrics: metrics.clone(),
pending_justifications: BTreeMap::new(),
persisted_state,
};

futures::future::select(
Box::pin(worker.run(block_import_justif, finality_notifications)),
Box::pin(on_demand_justifications_handler.run()),
)
.await;
match futures::future::select(
Box::pin(worker.run(&mut block_import_justif, &mut finality_notifications)),
Box::pin(on_demand_justifications_handler.run()),
)
.await
{
// On `ConsensusReset` error, just reinit and restart voter.
futures::future::Either::Left((error::Error::ConsensusReset, _)) => {
error!(target: LOG_TARGET, "🥩 Error: {:?}. Restarting voter.", error::Error::ConsensusReset);
continue
},
// On other errors, bring down / finish the task.
futures::future::Either::Left((worker_err, _)) =>
error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", worker_err),
futures::future::Either::Right((odj_handler_err, _)) =>
error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", odj_handler_err),
};
return
}
}
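The `futures::future::select` result is an `Either` value whose side says which future finished first and carries that future's error; only a voter-side `ConsensusReset` leads to `continue`. A compact sketch of that classification step, using an illustrative error type and futures rather than the real sc-consensus-beefy items:

```rust
// Sketch of the select-and-classify step; the futures and the error type are
// illustrative, not the actual sc-consensus-beefy items.
use futures::future::{self, Either};

#[derive(Debug)]
enum Error {
    ConsensusReset,
    Fatal,
}

async fn voter() -> Error {
    Error::ConsensusReset
}

async fn on_demand_justifications_handler() -> Error {
    Error::Fatal
}

/// Returns `true` if the caller should reinitialize and run another round.
async fn one_round() -> bool {
    match future::select(Box::pin(voter()), Box::pin(on_demand_justifications_handler())).await {
        // Only the voter's `ConsensusReset` is recoverable.
        Either::Left((Error::ConsensusReset, _)) => true,
        // Any other error, from either side, terminates the task.
        Either::Left((_, _)) | Either::Right((_, _)) => false,
    }
}
```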

fn load_or_init_voter_state<B, BE, R>(
42 changes: 15 additions & 27 deletions client/consensus/beefy/src/worker.rs
@@ -447,11 +447,7 @@ where
.ok()
.flatten()
.filter(|genesis| *genesis == self.persisted_state.pallet_genesis)
.ok_or_else(|| {
let err = Error::ConsensusReset;
error!(target: LOG_TARGET, "🥩 Error: {}", err);
err
})?;
.ok_or(Error::ConsensusReset)?;

if *header.number() > self.best_grandpa_block() {
// update best GRANDPA finalized block we have seen
@@ -795,11 +791,12 @@ where
/// Main loop for BEEFY worker.
///
/// Run the main async loop which is driven by finality notifications and gossiped votes.
/// Should never end, returns `Error` otherwise.
pub(crate) async fn run(
mut self,
mut block_import_justif: Fuse<NotificationReceiver<BeefyVersionedFinalityProof<B>>>,
mut finality_notifications: Fuse<FinalityNotifications<B>>,
) {
block_import_justif: &mut Fuse<NotificationReceiver<BeefyVersionedFinalityProof<B>>>,
finality_notifications: &mut Fuse<FinalityNotifications<B>>,
) -> Error {
info!(
target: LOG_TARGET,
"🥩 run BEEFY worker, best grandpa: #{:?}.",
@@ -848,17 +845,17 @@ where
// Use `select_biased!` to prioritize order below.
// Process finality notifications first since these drive the voter.
notification = finality_notifications.next() => {
if notification.and_then(|notif| {
self.handle_finality_notification(&notif).ok()
}).is_none() {
error!(target: LOG_TARGET, "🥩 Finality stream terminated, closing worker.");
return;
if let Some(notif) = notification {
if let Err(err) = self.handle_finality_notification(&notif) {
return err;
}
} else {
return Error::FinalityStreamTerminated;
}
},
// Make sure to pump gossip engine.
_ = gossip_engine => {
error!(target: LOG_TARGET, "🥩 Gossip engine has terminated, closing worker.");
return;
return Error::GossipEngineTerminated;
},
// Process incoming justifications as these can make some in-flight votes obsolete.
response_info = self.on_demand_justifications.next().fuse() => {
@@ -881,8 +878,7 @@ where
debug!(target: LOG_TARGET, "🥩 {}", err);
}
} else {
error!(target: LOG_TARGET, "🥩 Block import stream terminated, closing worker.");
return;
return Error::BlockImportStreamTerminated;
}
},
justif = gossip_proofs.next() => {
@@ -892,11 +888,7 @@ where
debug!(target: LOG_TARGET, "🥩 {}", err);
}
} else {
error!(
target: LOG_TARGET,
"🥩 Finality proofs gossiping stream terminated, closing worker."
);
return;
return Error::FinalityProofGossipStreamTerminated;
}
},
// Finally process incoming votes.
@@ -907,11 +899,7 @@ where
debug!(target: LOG_TARGET, "🥩 {}", err);
}
} else {
error!(
target: LOG_TARGET,
"🥩 Votes gossiping stream terminated, closing worker."
);
return;
return Error::VotesGossipStreamTerminated;
}
},
// Process peer reports.
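In worker.rs the main `select_biased!` loop now returns a typed `Error` variant when one of its input streams terminates, instead of logging and returning `()`; the caller decides what is recoverable. A reduced sketch of that shape (two streams instead of five, stand-in types, not the actual worker code):

```rust
// Sketch of a select_biased! loop that surfaces stream termination as a
// typed error to its caller. Names and stream types are illustrative.
use futures::{channel::mpsc, select_biased, StreamExt};

#[derive(Debug)]
enum Error {
    FinalityStreamTerminated,
    VotesGossipStreamTerminated,
}

async fn worker_loop(
    finality: &mut mpsc::UnboundedReceiver<u64>,
    votes: &mut mpsc::UnboundedReceiver<u64>,
) -> Error {
    loop {
        select_biased! {
            // Finality notifications drive the voter, so they are polled first.
            notif = finality.next() => {
                match notif {
                    Some(_block) => { /* handle finality notification */ },
                    None => return Error::FinalityStreamTerminated,
                }
            },
            vote = votes.next() => {
                match vote {
                    Some(_vote) => { /* handle incoming vote */ },
                    None => return Error::VotesGossipStreamTerminated,
                }
            },
        }
    }
}
```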
6 changes: 6 additions & 0 deletions primitives/consensus/beefy/src/mmr.rs
@@ -162,6 +162,12 @@ mod mmr_root_provider {
_phantom: PhantomData<B>,
}

impl<B, R> Clone for MmrRootProvider<B, R> {
fn clone(&self) -> Self {
Self { runtime: self.runtime.clone(), _phantom: PhantomData }
}
}

impl<B, R> MmrRootProvider<B, R>
where
B: Block,
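The hand-written `Clone` impl (rather than `#[derive(Clone)]`) is what lets `MmrRootProvider` meet the new `P: PayloadProvider<B> + Clone` bound: deriving would add `B: Clone` and `R: Clone` bounds even though only the shared runtime handle actually needs cloning. A stand-alone illustration of that pattern with simplified types:

```rust
// Illustration of why `Clone` is implemented by hand: `#[derive(Clone)]`
// would require `B: Clone` and `R: Clone`, while the manual impl only clones
// the shared handle. Types are simplified stand-ins, not the real provider.
use std::{marker::PhantomData, sync::Arc};

struct Provider<B, R> {
    runtime: Arc<R>,
    _phantom: PhantomData<B>,
}

// No `B: Clone` or `R: Clone` bounds needed: cloning an `Arc` is always cheap.
impl<B, R> Clone for Provider<B, R> {
    fn clone(&self) -> Self {
        Self { runtime: self.runtime.clone(), _phantom: PhantomData }
    }
}
```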
