Skip to content

Commit

Permalink
Using-same-fork metric for finality and complex relay (paritytech#1327)
Browse files Browse the repository at this point in the history
* using_same_fork metric in finality relay

* support `using_different_forks` in messages relay

* added dashboards and alerts

* lockfile
  • Loading branch information
svyatonik authored Feb 24, 2022
1 parent 6403ccf commit 681d2b4
Show file tree
Hide file tree
Showing 14 changed files with 397 additions and 198 deletions.
6 changes: 5 additions & 1 deletion bridges/relays/client-substrate/src/sync_header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,11 @@ impl<Header> From<Header> for SyncHeader<Header> {
}
}

impl<Header: HeaderT> FinalitySourceHeader<Header::Number> for SyncHeader<Header> {
impl<Header: HeaderT> FinalitySourceHeader<Header::Hash, Header::Number> for SyncHeader<Header> {
fn hash(&self) -> Header::Hash {
self.0.hash()
}

fn number(&self) -> Header::Number {
*self.0.number()
}
Expand Down
69 changes: 55 additions & 14 deletions bridges/relays/finality/src/finality_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use futures::{select, Future, FutureExt, Stream, StreamExt};
use num_traits::{One, Saturating};
use relay_utils::{
metrics::MetricsParams, relay_loop::Client as RelayClient, retry_backoff, FailedClient,
MaybeConnectionError,
HeaderId, MaybeConnectionError,
};
use std::{
pin::Pin,
Expand Down Expand Up @@ -87,7 +87,9 @@ pub trait SourceClient<P: FinalitySyncPipeline>: RelayClient {
#[async_trait]
pub trait TargetClient<P: FinalitySyncPipeline>: RelayClient {
/// Get best finalized source block number.
async fn best_finalized_source_block_number(&self) -> Result<P::Number, Self::Error>;
async fn best_finalized_source_block_id(
&self,
) -> Result<HeaderId<P::Hash, P::Number>, Self::Error>;

/// Submit header finality proof.
async fn submit_finality_proof(
Expand All @@ -114,7 +116,11 @@ pub async fn run<P: FinalitySyncPipeline>(
let exit_signal = exit_signal.shared();
relay_utils::relay_loop(source_client, target_client)
.with_metrics(metrics_params)
.loop_metric(SyncLoopMetrics::new(Some(&metrics_prefix::<P>()))?)?
.loop_metric(SyncLoopMetrics::new(
Some(&metrics_prefix::<P>()),
"source",
"source_at_target",
)?)?
.expose()
.await?
.run(metrics_prefix::<P>(), move |source_client, target_client, metrics| {
Expand Down Expand Up @@ -169,7 +175,7 @@ where

/// Information about transaction that we have submitted.
#[derive(Debug, Clone)]
struct Transaction<Number> {
pub(crate) struct Transaction<Number> {
/// Time when we have submitted this transaction.
pub time: Instant,
/// The number of the header we have submitted.
Expand All @@ -181,7 +187,7 @@ pub(crate) struct RestartableFinalityProofsStream<S> {
/// Flag that the stream needs to be restarted.
pub(crate) needs_restart: bool,
/// The stream itself.
stream: Pin<Box<S>>,
pub(crate) stream: Pin<Box<S>>,
}

#[cfg(test)]
Expand All @@ -192,15 +198,16 @@ impl<S> From<S> for RestartableFinalityProofsStream<S> {
}

/// Finality synchronization loop state.
struct FinalityLoopState<'a, P: FinalitySyncPipeline, FinalityProofsStream> {
pub(crate) struct FinalityLoopState<'a, P: FinalitySyncPipeline, FinalityProofsStream> {
/// Synchronization loop progress.
progress: &'a mut (Instant, Option<P::Number>),
pub(crate) progress: &'a mut (Instant, Option<P::Number>),
/// Finality proofs stream.
finality_proofs_stream: &'a mut RestartableFinalityProofsStream<FinalityProofsStream>,
pub(crate) finality_proofs_stream:
&'a mut RestartableFinalityProofsStream<FinalityProofsStream>,
/// Recent finality proofs that we have read from the stream.
recent_finality_proofs: &'a mut FinalityProofs<P>,
pub(crate) recent_finality_proofs: &'a mut FinalityProofs<P>,
/// Last transaction that we have submitted to the target node.
last_transaction: Option<Transaction<P::Number>>,
pub(crate) last_transaction: Option<Transaction<P::Number>>,
}

async fn run_until_connection_lost<P: FinalitySyncPipeline>(
Expand Down Expand Up @@ -280,7 +287,7 @@ async fn run_until_connection_lost<P: FinalitySyncPipeline>(
}
}

async fn run_loop_iteration<P, SC, TC>(
pub(crate) async fn run_loop_iteration<P, SC, TC>(
source_client: &SC,
target_client: &TC,
state: FinalityLoopState<'_, P, SC::FinalityProofsStream>,
Expand All @@ -295,13 +302,31 @@ where
// read best source headers ids from source and target nodes
let best_number_at_source =
source_client.best_finalized_block_number().await.map_err(Error::Source)?;
let best_number_at_target = target_client
.best_finalized_source_block_number()
let best_id_at_target =
target_client.best_finalized_source_block_id().await.map_err(Error::Target)?;
let best_number_at_target = best_id_at_target.0;

let different_hash_at_source = ensure_same_fork::<P, _>(&best_id_at_target, source_client)
.await
.map_err(Error::Target)?;
.map_err(Error::Source)?;
let using_same_fork = different_hash_at_source.is_none();
if let Some(ref different_hash_at_source) = different_hash_at_source {
log::error!(
target: "bridge",
"Source node ({}) and pallet at target node ({}) have different headers at the same height {:?}: \
at-source {:?} vs at-target {:?}",
P::SOURCE_NAME,
P::TARGET_NAME,
best_number_at_target,
different_hash_at_source,
best_id_at_target.1,
);
}

if let Some(ref metrics_sync) = *metrics_sync {
metrics_sync.update_best_block_at_source(best_number_at_source);
metrics_sync.update_best_block_at_target(best_number_at_target);
metrics_sync.update_using_same_fork(using_same_fork);
}
*state.progress =
print_sync_progress::<P>(*state.progress, best_number_at_source, best_number_at_target);
Expand Down Expand Up @@ -427,6 +452,22 @@ where
Ok(selected_finality_proof)
}

/// Ensures that both clients are on the same fork.
///
/// Returns `Some(_)` with header has at the source client if headers are different.
async fn ensure_same_fork<P: FinalitySyncPipeline, SC: SourceClient<P>>(
best_id_at_target: &HeaderId<P::Hash, P::Number>,
source_client: &SC,
) -> Result<Option<P::Hash>, SC::Error> {
let header_at_source = source_client.header_and_finality_proof(best_id_at_target.0).await?.0;
let header_hash_at_source = header_at_source.hash();
Ok(if best_id_at_target.1 == header_hash_at_source {
None
} else {
Some(header_hash_at_source)
})
}

/// Finality proof that has been selected by the `read_missing_headers` function.
pub(crate) enum SelectedFinalityProof<Header, FinalityProof> {
/// Mandatory header and its proof has been selected. We shall submit proof for this header.
Expand Down
Loading

0 comments on commit 681d2b4

Please sign in to comment.