diff --git a/bridges/relays/client-substrate/src/sync_header.rs b/bridges/relays/client-substrate/src/sync_header.rs index ed3de6289ce..e45e6b4197a 100644 --- a/bridges/relays/client-substrate/src/sync_header.rs +++ b/bridges/relays/client-substrate/src/sync_header.rs @@ -44,7 +44,11 @@ impl
From
for SyncHeader
{ } } -impl FinalitySourceHeader for SyncHeader
{ +impl FinalitySourceHeader for SyncHeader
{ + fn hash(&self) -> Header::Hash { + self.0.hash() + } + fn number(&self) -> Header::Number { *self.0.number() } diff --git a/bridges/relays/finality/src/finality_loop.rs b/bridges/relays/finality/src/finality_loop.rs index 320b44d310f..c29a5d5fec2 100644 --- a/bridges/relays/finality/src/finality_loop.rs +++ b/bridges/relays/finality/src/finality_loop.rs @@ -29,7 +29,7 @@ use futures::{select, Future, FutureExt, Stream, StreamExt}; use num_traits::{One, Saturating}; use relay_utils::{ metrics::MetricsParams, relay_loop::Client as RelayClient, retry_backoff, FailedClient, - MaybeConnectionError, + HeaderId, MaybeConnectionError, }; use std::{ pin::Pin, @@ -87,7 +87,9 @@ pub trait SourceClient: RelayClient { #[async_trait] pub trait TargetClient: RelayClient { /// Get best finalized source block number. - async fn best_finalized_source_block_number(&self) -> Result; + async fn best_finalized_source_block_id( + &self, + ) -> Result, Self::Error>; /// Submit header finality proof. async fn submit_finality_proof( @@ -114,7 +116,11 @@ pub async fn run( let exit_signal = exit_signal.shared(); relay_utils::relay_loop(source_client, target_client) .with_metrics(metrics_params) - .loop_metric(SyncLoopMetrics::new(Some(&metrics_prefix::

()))?)? + .loop_metric(SyncLoopMetrics::new( + Some(&metrics_prefix::

()), + "source", + "source_at_target", + )?)? .expose() .await? .run(metrics_prefix::

(), move |source_client, target_client, metrics| { @@ -169,7 +175,7 @@ where /// Information about transaction that we have submitted. #[derive(Debug, Clone)] -struct Transaction { +pub(crate) struct Transaction { /// Time when we have submitted this transaction. pub time: Instant, /// The number of the header we have submitted. @@ -181,7 +187,7 @@ pub(crate) struct RestartableFinalityProofsStream { /// Flag that the stream needs to be restarted. pub(crate) needs_restart: bool, /// The stream itself. - stream: Pin>, + pub(crate) stream: Pin>, } #[cfg(test)] @@ -192,15 +198,16 @@ impl From for RestartableFinalityProofsStream { } /// Finality synchronization loop state. -struct FinalityLoopState<'a, P: FinalitySyncPipeline, FinalityProofsStream> { +pub(crate) struct FinalityLoopState<'a, P: FinalitySyncPipeline, FinalityProofsStream> { /// Synchronization loop progress. - progress: &'a mut (Instant, Option), + pub(crate) progress: &'a mut (Instant, Option), /// Finality proofs stream. - finality_proofs_stream: &'a mut RestartableFinalityProofsStream, + pub(crate) finality_proofs_stream: + &'a mut RestartableFinalityProofsStream, /// Recent finality proofs that we have read from the stream. - recent_finality_proofs: &'a mut FinalityProofs

, + pub(crate) recent_finality_proofs: &'a mut FinalityProofs

, /// Last transaction that we have submitted to the target node. - last_transaction: Option>, + pub(crate) last_transaction: Option>, } async fn run_until_connection_lost( @@ -280,7 +287,7 @@ async fn run_until_connection_lost( } } -async fn run_loop_iteration( +pub(crate) async fn run_loop_iteration( source_client: &SC, target_client: &TC, state: FinalityLoopState<'_, P, SC::FinalityProofsStream>, @@ -295,13 +302,31 @@ where // read best source headers ids from source and target nodes let best_number_at_source = source_client.best_finalized_block_number().await.map_err(Error::Source)?; - let best_number_at_target = target_client - .best_finalized_source_block_number() + let best_id_at_target = + target_client.best_finalized_source_block_id().await.map_err(Error::Target)?; + let best_number_at_target = best_id_at_target.0; + + let different_hash_at_source = ensure_same_fork::(&best_id_at_target, source_client) .await - .map_err(Error::Target)?; + .map_err(Error::Source)?; + let using_same_fork = different_hash_at_source.is_none(); + if let Some(ref different_hash_at_source) = different_hash_at_source { + log::error!( + target: "bridge", + "Source node ({}) and pallet at target node ({}) have different headers at the same height {:?}: \ + at-source {:?} vs at-target {:?}", + P::SOURCE_NAME, + P::TARGET_NAME, + best_number_at_target, + different_hash_at_source, + best_id_at_target.1, + ); + } + if let Some(ref metrics_sync) = *metrics_sync { metrics_sync.update_best_block_at_source(best_number_at_source); metrics_sync.update_best_block_at_target(best_number_at_target); + metrics_sync.update_using_same_fork(using_same_fork); } *state.progress = print_sync_progress::

(*state.progress, best_number_at_source, best_number_at_target); @@ -427,6 +452,22 @@ where Ok(selected_finality_proof) } +/// Ensures that both clients are on the same fork. +/// +/// Returns `Some(_)` with header has at the source client if headers are different. +async fn ensure_same_fork>( + best_id_at_target: &HeaderId, + source_client: &SC, +) -> Result, SC::Error> { + let header_at_source = source_client.header_and_finality_proof(best_id_at_target.0).await?.0; + let header_hash_at_source = header_at_source.hash(); + Ok(if best_id_at_target.1 == header_hash_at_source { + None + } else { + Some(header_hash_at_source) + }) +} + /// Finality proof that has been selected by the `read_missing_headers` function. pub(crate) enum SelectedFinalityProof { /// Mandatory header and its proof has been selected. We shall submit proof for this header. diff --git a/bridges/relays/finality/src/finality_loop_tests.rs b/bridges/relays/finality/src/finality_loop_tests.rs index 915b7ee6766..478d8e1be65 100644 --- a/bridges/relays/finality/src/finality_loop_tests.rs +++ b/bridges/relays/finality/src/finality_loop_tests.rs @@ -20,10 +20,12 @@ use crate::{ finality_loop::{ - prune_recent_finality_proofs, read_finality_proofs_from_stream, run, - select_better_recent_finality_proof, select_header_to_submit, FinalityProofs, - FinalitySyncParams, RestartableFinalityProofsStream, SourceClient, TargetClient, + prune_recent_finality_proofs, read_finality_proofs_from_stream, run, run_loop_iteration, + select_better_recent_finality_proof, select_header_to_submit, FinalityLoopState, + FinalityProofs, FinalitySyncParams, RestartableFinalityProofsStream, SourceClient, + TargetClient, }, + sync_loop_metrics::SyncLoopMetrics, FinalityProof, FinalitySyncPipeline, SourceHeader, }; @@ -31,12 +33,18 @@ use async_trait::async_trait; use futures::{FutureExt, Stream, StreamExt}; use parking_lot::Mutex; use relay_utils::{ - metrics::MetricsParams, relay_loop::Client as RelayClient, MaybeConnectionError, + metrics::MetricsParams, relay_loop::Client as RelayClient, HeaderId, MaybeConnectionError, +}; +use std::{ + collections::HashMap, + pin::Pin, + sync::Arc, + time::{Duration, Instant}, }; -use std::{collections::HashMap, pin::Pin, sync::Arc, time::Duration}; type IsMandatory = bool; type TestNumber = u64; +type TestHash = u64; #[derive(Debug, Clone)] enum TestError { @@ -56,16 +64,20 @@ impl FinalitySyncPipeline for TestFinalitySyncPipeline { const SOURCE_NAME: &'static str = "TestSource"; const TARGET_NAME: &'static str = "TestTarget"; - type Hash = u64; + type Hash = TestHash; type Number = TestNumber; type Header = TestSourceHeader; type FinalityProof = TestFinalityProof; } #[derive(Debug, Clone, PartialEq)] -struct TestSourceHeader(IsMandatory, TestNumber); +struct TestSourceHeader(IsMandatory, TestNumber, TestHash); + +impl SourceHeader for TestSourceHeader { + fn hash(&self) -> TestHash { + self.2 + } -impl SourceHeader for TestSourceHeader { fn number(&self) -> TestNumber { self.1 } @@ -90,7 +102,7 @@ struct ClientsData { source_headers: HashMap)>, source_proofs: Vec, - target_best_block_number: TestNumber, + target_best_block_id: HeaderId, target_headers: Vec<(TestSourceHeader, TestFinalityProof)>, } @@ -152,10 +164,12 @@ impl RelayClient for TestTargetClient { #[async_trait] impl TargetClient for TestTargetClient { - async fn best_finalized_source_block_number(&self) -> Result { + async fn best_finalized_source_block_id( + &self, + ) -> Result, TestError> { let mut data = self.data.lock(); (self.on_method_call)(&mut *data); - Ok(data.target_best_block_number) + Ok(data.target_best_block_id) } async fn submit_finality_proof( @@ -165,7 +179,7 @@ impl TargetClient for TestTargetClient { ) -> Result<(), TestError> { let mut data = self.data.lock(); (self.on_method_call)(&mut *data); - data.target_best_block_number = header.number(); + data.target_best_block_id = HeaderId(header.number(), header.hash()); data.target_headers.push((header, proof)); Ok(()) } @@ -187,7 +201,7 @@ fn prepare_test_clients( source_headers, source_proofs: vec![TestFinalityProof(12), TestFinalityProof(14)], - target_best_block_number: 5, + target_best_block_id: HeaderId(5, 5), target_headers: vec![], })); ( @@ -199,6 +213,15 @@ fn prepare_test_clients( ) } +fn test_sync_params() -> FinalitySyncParams { + FinalitySyncParams { + tick: Duration::from_secs(0), + recent_finality_proofs_limit: 1024, + stall_timeout: Duration::from_secs(1), + only_mandatory_headers: false, + } +} + fn run_sync_loop( state_function: impl Fn(&mut ClientsData) -> bool + Send + Sync + 'static, ) -> ClientsData { @@ -207,21 +230,17 @@ fn run_sync_loop( exit_sender, state_function, vec![ - (6, (TestSourceHeader(false, 6), None)), - (7, (TestSourceHeader(false, 7), Some(TestFinalityProof(7)))), - (8, (TestSourceHeader(true, 8), Some(TestFinalityProof(8)))), - (9, (TestSourceHeader(false, 9), Some(TestFinalityProof(9)))), - (10, (TestSourceHeader(false, 10), None)), + (5, (TestSourceHeader(false, 5, 5), None)), + (6, (TestSourceHeader(false, 6, 6), None)), + (7, (TestSourceHeader(false, 7, 7), Some(TestFinalityProof(7)))), + (8, (TestSourceHeader(true, 8, 8), Some(TestFinalityProof(8)))), + (9, (TestSourceHeader(false, 9, 9), Some(TestFinalityProof(9)))), + (10, (TestSourceHeader(false, 10, 10), None)), ] .into_iter() .collect(), ); - let sync_params = FinalitySyncParams { - tick: Duration::from_secs(0), - recent_finality_proofs_limit: 1024, - stall_timeout: Duration::from_secs(1), - only_mandatory_headers: false, - }; + let sync_params = test_sync_params(); let clients_data = source_client.data.clone(); let _ = async_std::task::block_on(run( @@ -246,38 +265,38 @@ fn finality_sync_loop_works() { // // once this ^^^ is done, we generate more blocks && read proof for blocks 12 and 14 from // the stream - if data.target_best_block_number == 9 { + if data.target_best_block_id.0 == 9 { data.source_best_block_number = 14; - data.source_headers.insert(11, (TestSourceHeader(false, 11), None)); + data.source_headers.insert(11, (TestSourceHeader(false, 11, 11), None)); data.source_headers - .insert(12, (TestSourceHeader(false, 12), Some(TestFinalityProof(12)))); - data.source_headers.insert(13, (TestSourceHeader(false, 13), None)); + .insert(12, (TestSourceHeader(false, 12, 12), Some(TestFinalityProof(12)))); + data.source_headers.insert(13, (TestSourceHeader(false, 13, 13), None)); data.source_headers - .insert(14, (TestSourceHeader(false, 14), Some(TestFinalityProof(14)))); + .insert(14, (TestSourceHeader(false, 14, 14), Some(TestFinalityProof(14)))); } // once this ^^^ is done, we generate more blocks && read persistent proof for block 16 - if data.target_best_block_number == 14 { + if data.target_best_block_id.0 == 14 { data.source_best_block_number = 17; - data.source_headers.insert(15, (TestSourceHeader(false, 15), None)); + data.source_headers.insert(15, (TestSourceHeader(false, 15, 15), None)); data.source_headers - .insert(16, (TestSourceHeader(false, 16), Some(TestFinalityProof(16)))); - data.source_headers.insert(17, (TestSourceHeader(false, 17), None)); + .insert(16, (TestSourceHeader(false, 16, 16), Some(TestFinalityProof(16)))); + data.source_headers.insert(17, (TestSourceHeader(false, 17, 17), None)); } - data.target_best_block_number == 16 + data.target_best_block_id.0 == 16 }); assert_eq!( client_data.target_headers, vec![ // before adding 11..14: finality proof for mandatory header#8 - (TestSourceHeader(true, 8), TestFinalityProof(8)), + (TestSourceHeader(true, 8, 8), TestFinalityProof(8)), // before adding 11..14: persistent finality proof for non-mandatory header#9 - (TestSourceHeader(false, 9), TestFinalityProof(9)), + (TestSourceHeader(false, 9, 9), TestFinalityProof(9)), // after adding 11..14: ephemeral finality proof for non-mandatory header#14 - (TestSourceHeader(false, 14), TestFinalityProof(14)), + (TestSourceHeader(false, 14, 14), TestFinalityProof(14)), // after adding 15..17: persistent finality proof for non-mandatory header#16 - (TestSourceHeader(false, 16), TestFinalityProof(16)), + (TestSourceHeader(false, 16, 16), TestFinalityProof(16)), ], ); } @@ -291,11 +310,11 @@ fn run_only_mandatory_headers_mode_test( exit_sender, |_| false, vec![ - (6, (TestSourceHeader(false, 6), Some(TestFinalityProof(6)))), - (7, (TestSourceHeader(false, 7), Some(TestFinalityProof(7)))), - (8, (TestSourceHeader(has_mandatory_headers, 8), Some(TestFinalityProof(8)))), - (9, (TestSourceHeader(false, 9), Some(TestFinalityProof(9)))), - (10, (TestSourceHeader(false, 10), Some(TestFinalityProof(10)))), + (6, (TestSourceHeader(false, 6, 6), Some(TestFinalityProof(6)))), + (7, (TestSourceHeader(false, 7, 7), Some(TestFinalityProof(7)))), + (8, (TestSourceHeader(has_mandatory_headers, 8, 8), Some(TestFinalityProof(8)))), + (9, (TestSourceHeader(false, 9, 9), Some(TestFinalityProof(9)))), + (10, (TestSourceHeader(false, 10, 10), Some(TestFinalityProof(10)))), ] .into_iter() .collect(), @@ -322,7 +341,7 @@ fn select_header_to_submit_skips_non_mandatory_headers_when_only_mandatory_heade assert_eq!(run_only_mandatory_headers_mode_test(true, false), None); assert_eq!( run_only_mandatory_headers_mode_test(false, false), - Some((TestSourceHeader(false, 10), TestFinalityProof(10))), + Some((TestSourceHeader(false, 10, 10), TestFinalityProof(10))), ); } @@ -330,11 +349,11 @@ fn select_header_to_submit_skips_non_mandatory_headers_when_only_mandatory_heade fn select_header_to_submit_selects_mandatory_headers_when_only_mandatory_headers_are_required() { assert_eq!( run_only_mandatory_headers_mode_test(true, true), - Some((TestSourceHeader(true, 8), TestFinalityProof(8))), + Some((TestSourceHeader(true, 8, 8), TestFinalityProof(8))), ); assert_eq!( run_only_mandatory_headers_mode_test(false, true), - Some((TestSourceHeader(true, 8), TestFinalityProof(8))), + Some((TestSourceHeader(true, 8, 8), TestFinalityProof(8))), ); } @@ -345,63 +364,74 @@ fn select_better_recent_finality_proof_works() { select_better_recent_finality_proof::( &[(5, TestFinalityProof(5))], &mut vec![], - Some((TestSourceHeader(false, 2), TestFinalityProof(2))), + Some((TestSourceHeader(false, 2, 2), TestFinalityProof(2))), ), - Some((TestSourceHeader(false, 2), TestFinalityProof(2))), + Some((TestSourceHeader(false, 2, 2), TestFinalityProof(2))), ); // if there are no recent finality proofs, nothing is changed assert_eq!( select_better_recent_finality_proof::( &[], - &mut vec![TestSourceHeader(false, 5)], - Some((TestSourceHeader(false, 2), TestFinalityProof(2))), + &mut vec![TestSourceHeader(false, 5, 5)], + Some((TestSourceHeader(false, 2, 2), TestFinalityProof(2))), ), - Some((TestSourceHeader(false, 2), TestFinalityProof(2))), + Some((TestSourceHeader(false, 2, 2), TestFinalityProof(2))), ); // if there's no intersection between recent finality proofs and unjustified headers, nothing is // changed - let mut unjustified_headers = vec![TestSourceHeader(false, 9), TestSourceHeader(false, 10)]; + let mut unjustified_headers = + vec![TestSourceHeader(false, 9, 9), TestSourceHeader(false, 10, 10)]; assert_eq!( select_better_recent_finality_proof::( &[(1, TestFinalityProof(1)), (4, TestFinalityProof(4))], &mut unjustified_headers, - Some((TestSourceHeader(false, 2), TestFinalityProof(2))), + Some((TestSourceHeader(false, 2, 2), TestFinalityProof(2))), ), - Some((TestSourceHeader(false, 2), TestFinalityProof(2))), + Some((TestSourceHeader(false, 2, 2), TestFinalityProof(2))), ); // if there's intersection between recent finality proofs and unjustified headers, but there are // no proofs in this intersection, nothing is changed - let mut unjustified_headers = - vec![TestSourceHeader(false, 8), TestSourceHeader(false, 9), TestSourceHeader(false, 10)]; + let mut unjustified_headers = vec![ + TestSourceHeader(false, 8, 8), + TestSourceHeader(false, 9, 9), + TestSourceHeader(false, 10, 10), + ]; assert_eq!( select_better_recent_finality_proof::( &[(7, TestFinalityProof(7)), (11, TestFinalityProof(11))], &mut unjustified_headers, - Some((TestSourceHeader(false, 2), TestFinalityProof(2))), + Some((TestSourceHeader(false, 2, 2), TestFinalityProof(2))), ), - Some((TestSourceHeader(false, 2), TestFinalityProof(2))), + Some((TestSourceHeader(false, 2, 2), TestFinalityProof(2))), ); assert_eq!( unjustified_headers, - vec![TestSourceHeader(false, 8), TestSourceHeader(false, 9), TestSourceHeader(false, 10)] + vec![ + TestSourceHeader(false, 8, 8), + TestSourceHeader(false, 9, 9), + TestSourceHeader(false, 10, 10) + ] ); // if there's intersection between recent finality proofs and unjustified headers and there's // a proof in this intersection: // - this better (last from intersection) proof is selected; // - 'obsolete' unjustified headers are pruned. - let mut unjustified_headers = - vec![TestSourceHeader(false, 8), TestSourceHeader(false, 9), TestSourceHeader(false, 10)]; + let mut unjustified_headers = vec![ + TestSourceHeader(false, 8, 8), + TestSourceHeader(false, 9, 9), + TestSourceHeader(false, 10, 10), + ]; assert_eq!( select_better_recent_finality_proof::( &[(7, TestFinalityProof(7)), (9, TestFinalityProof(9))], &mut unjustified_headers, - Some((TestSourceHeader(false, 2), TestFinalityProof(2))), + Some((TestSourceHeader(false, 2, 2), TestFinalityProof(2))), ), - Some((TestSourceHeader(false, 9), TestFinalityProof(9))), + Some((TestSourceHeader(false, 9, 9), TestFinalityProof(9))), ); } @@ -475,3 +505,45 @@ fn prune_recent_finality_proofs_works() { prune_recent_finality_proofs::(20, &mut recent_finality_proofs, 2); assert_eq!(&original_recent_finality_proofs[5..], recent_finality_proofs,); } + +#[test] +fn different_forks_at_source_and_at_target_are_detected() { + let (exit_sender, _exit_receiver) = futures::channel::mpsc::unbounded(); + let (source_client, target_client) = prepare_test_clients( + exit_sender, + |_| false, + vec![ + (5, (TestSourceHeader(false, 5, 42), None)), + (6, (TestSourceHeader(false, 6, 6), None)), + (7, (TestSourceHeader(false, 7, 7), None)), + (8, (TestSourceHeader(false, 8, 8), None)), + (9, (TestSourceHeader(false, 9, 9), None)), + (10, (TestSourceHeader(false, 10, 10), None)), + ] + .into_iter() + .collect(), + ); + + let mut progress = (Instant::now(), None); + let mut finality_proofs_stream = RestartableFinalityProofsStream { + needs_restart: false, + stream: Box::pin(futures::stream::iter(vec![]).boxed()), + }; + let mut recent_finality_proofs = Vec::new(); + let metrics_sync = SyncLoopMetrics::new(None, "source", "target").unwrap(); + async_std::task::block_on(run_loop_iteration::( + &source_client, + &target_client, + FinalityLoopState { + progress: &mut progress, + finality_proofs_stream: &mut finality_proofs_stream, + recent_finality_proofs: &mut recent_finality_proofs, + last_transaction: None, + }, + &test_sync_params(), + &Some(metrics_sync.clone()), + )) + .unwrap(); + + assert!(!metrics_sync.is_using_same_fork()); +} diff --git a/bridges/relays/finality/src/lib.rs b/bridges/relays/finality/src/lib.rs index 6421d13b787..49be64ff74d 100644 --- a/bridges/relays/finality/src/lib.rs +++ b/bridges/relays/finality/src/lib.rs @@ -19,8 +19,9 @@ //! are still submitted to the target node, but are treated as auxiliary data as we are not trying //! to submit all source headers to the target node. -pub use crate::finality_loop::{ - metrics_prefix, run, FinalitySyncParams, SourceClient, TargetClient, +pub use crate::{ + finality_loop::{metrics_prefix, run, FinalitySyncParams, SourceClient, TargetClient}, + sync_loop_metrics::SyncLoopMetrics, }; use bp_header_chain::FinalityProof; @@ -42,13 +43,15 @@ pub trait FinalitySyncPipeline: 'static + Clone + Debug + Send + Sync { /// Headers we're syncing are identified by this number. type Number: relay_utils::BlockNumberBase; /// Type of header that we're syncing. - type Header: SourceHeader; + type Header: SourceHeader; /// Finality proof type. type FinalityProof: FinalityProof; } /// Header that we're receiving from source node. -pub trait SourceHeader: Clone + Debug + PartialEq + Send + Sync { +pub trait SourceHeader: Clone + Debug + PartialEq + Send + Sync { + /// Returns hash of header. + fn hash(&self) -> Hash; /// Returns number of header. fn number(&self) -> Number; /// Returns true if this header needs to be submitted to target node. diff --git a/bridges/relays/finality/src/sync_loop_metrics.rs b/bridges/relays/finality/src/sync_loop_metrics.rs index 1f65dac17c0..a003a47d890 100644 --- a/bridges/relays/finality/src/sync_loop_metrics.rs +++ b/bridges/relays/finality/src/sync_loop_metrics.rs @@ -16,49 +16,71 @@ //! Metrics for headers synchronization relay loop. -use relay_utils::metrics::{ - metric_name, register, GaugeVec, Metric, Opts, PrometheusError, Registry, U64, -}; +use relay_utils::metrics::{metric_name, register, IntGauge, Metric, PrometheusError, Registry}; /// Headers sync metrics. #[derive(Clone)] pub struct SyncLoopMetrics { - /// Best syncing headers at "source" and "target" nodes. - best_block_numbers: GaugeVec, + /// Best syncing header at the source. + best_source_block_number: IntGauge, + /// Best syncing header at the target. + best_target_block_number: IntGauge, + /// Flag that has `0` value when best source headers at the source node and at-target-chain + /// are matching and `1` otherwise. + using_different_forks: IntGauge, } impl SyncLoopMetrics { /// Create and register headers loop metrics. - pub fn new(prefix: Option<&str>) -> Result { + pub fn new( + prefix: Option<&str>, + at_source_chain_label: &str, + at_target_chain_label: &str, + ) -> Result { Ok(SyncLoopMetrics { - best_block_numbers: GaugeVec::new( - Opts::new( - metric_name(prefix, "best_block_numbers"), - "Best block numbers on source and target nodes", - ), - &["node"], + best_source_block_number: IntGauge::new( + metric_name(prefix, &format!("best_{}_block_number", at_source_chain_label)), + format!("Best block number at the {}", at_source_chain_label), + )?, + best_target_block_number: IntGauge::new( + metric_name(prefix, &format!("best_{}_block_number", at_target_chain_label)), + format!("Best block number at the {}", at_target_chain_label), + )?, + using_different_forks: IntGauge::new( + metric_name(prefix, &format!("is_{}_and_{}_using_different_forks", at_source_chain_label, at_target_chain_label)), + "Whether the best finalized source block at target node is different (value 1) from the \ + corresponding block at the source node", )?, }) } + /// Returns current value of the using-same-fork flag. + #[cfg(test)] + pub(crate) fn is_using_same_fork(&self) -> bool { + self.using_different_forks.get() == 0 + } + /// Update best block number at source. pub fn update_best_block_at_source>(&self, source_best_number: Number) { - self.best_block_numbers - .with_label_values(&["source"]) - .set(source_best_number.into()); + self.best_source_block_number.set(source_best_number.into()); } /// Update best block number at target. pub fn update_best_block_at_target>(&self, target_best_number: Number) { - self.best_block_numbers - .with_label_values(&["target"]) - .set(target_best_number.into()); + self.best_target_block_number.set(target_best_number.into()); + } + + /// Update using-same-fork flag. + pub fn update_using_same_fork(&self, using_same_fork: bool) { + self.using_different_forks.set(if using_same_fork { 0 } else { 1 }) } } impl Metric for SyncLoopMetrics { fn register(&self, registry: &Registry) -> Result<(), PrometheusError> { - register(self.best_block_numbers.clone(), registry)?; + register(self.best_source_block_number.clone(), registry)?; + register(self.best_target_block_number.clone(), registry)?; + register(self.using_different_forks.clone(), registry)?; Ok(()) } } diff --git a/bridges/relays/lib-substrate-relay/src/finality_target.rs b/bridges/relays/lib-substrate-relay/src/finality_target.rs index e7a7487ae2c..b7bc90cb4de 100644 --- a/bridges/relays/lib-substrate-relay/src/finality_target.rs +++ b/bridges/relays/lib-substrate-relay/src/finality_target.rs @@ -30,8 +30,8 @@ use bp_header_chain::{justification::GrandpaJustification, storage_keys::is_halt use codec::Encode; use finality_relay::TargetClient; use relay_substrate_client::{ - AccountIdOf, AccountKeyPairOf, BlockNumberOf, Chain, ChainWithGrandpa, Client, Error, HashOf, - HeaderOf, SignParam, SyncHeader, TransactionEra, TransactionSignScheme, UnsignedTransaction, + AccountIdOf, AccountKeyPairOf, Chain, ChainWithGrandpa, Client, Error, HeaderIdOf, HeaderOf, + SignParam, SyncHeader, TransactionEra, TransactionSignScheme, UnsignedTransaction, }; use relay_utils::relay_loop::Client as RelayClient; use sp_core::{Bytes, Pair}; @@ -90,23 +90,20 @@ where AccountIdOf: From< as Pair>::Public>, P::TransactionSignScheme: TransactionSignScheme, { - async fn best_finalized_source_block_number( - &self, - ) -> Result, Error> { + async fn best_finalized_source_block_id(&self) -> Result, Error> { // we can't continue to relay finality if target node is out of sync, because // it may have already received (some of) headers that we're going to relay self.client.ensure_synced().await?; // we can't relay finality if GRANDPA pallet at target chain is halted self.ensure_pallet_active().await?; - Ok(crate::messages_source::read_client_state::< - P::TargetChain, - HashOf, - BlockNumberOf, - >(&self.client, P::SourceChain::BEST_FINALIZED_HEADER_ID_METHOD) + Ok(crate::messages_source::read_client_state::( + &self.client, + None, + P::SourceChain::BEST_FINALIZED_HEADER_ID_METHOD, + ) .await? - .best_finalized_peer_at_best_self - .0) + .best_finalized_peer_at_best_self) } async fn submit_finality_proof( diff --git a/bridges/relays/lib-substrate-relay/src/messages_lane.rs b/bridges/relays/lib-substrate-relay/src/messages_lane.rs index a88b9441cd1..2da434c8c7c 100644 --- a/bridges/relays/lib-substrate-relay/src/messages_lane.rs +++ b/bridges/relays/lib-substrate-relay/src/messages_lane.rs @@ -234,13 +234,15 @@ where }, }, SubstrateMessagesSource::

::new( - source_client, + source_client.clone(), + target_client.clone(), params.lane_id, params.source_transaction_params, params.target_to_source_headers_relay, ), SubstrateMessagesTarget::

::new( target_client, + source_client, params.lane_id, relayer_id_at_source, params.target_transaction_params, diff --git a/bridges/relays/lib-substrate-relay/src/messages_source.rs b/bridges/relays/lib-substrate-relay/src/messages_source.rs index 96e19beba25..39d15866267 100644 --- a/bridges/relays/lib-substrate-relay/src/messages_source.rs +++ b/bridges/relays/lib-substrate-relay/src/messages_source.rs @@ -46,7 +46,7 @@ use messages_relay::{ }; use num_traits::{Bounded, Zero}; use relay_substrate_client::{ - AccountIdOf, AccountKeyPairOf, BalanceOf, Chain, ChainWithMessages, Client, + AccountIdOf, AccountKeyPairOf, BalanceOf, BlockNumberOf, Chain, ChainWithMessages, Client, Error as SubstrateError, HashOf, HeaderIdOf, IndexOf, SignParam, TransactionEra, TransactionSignScheme, UnsignedTransaction, }; @@ -62,7 +62,8 @@ pub type SubstrateMessagesProof = (Weight, FromBridgedChainMessagesProof { - client: Client, + source_client: Client, + target_client: Client, lane_id: LaneId, transaction_params: TransactionParams>, target_to_source_headers_relay: Option>, @@ -71,13 +72,15 @@ pub struct SubstrateMessagesSource { impl SubstrateMessagesSource

{ /// Create new Substrate headers source. pub fn new( - client: Client, + source_client: Client, + target_client: Client, lane_id: LaneId, transaction_params: TransactionParams>, target_to_source_headers_relay: Option>, ) -> Self { SubstrateMessagesSource { - client, + source_client, + target_client, lane_id, transaction_params, target_to_source_headers_relay, @@ -89,7 +92,7 @@ impl SubstrateMessagesSource

{ &self, id: SourceHeaderIdOf>, ) -> Result, SubstrateError> { - self.client + self.source_client .storage_value( outbound_lane_data_key( P::TargetChain::WITH_CHAIN_MESSAGES_PALLET_NAME, @@ -102,14 +105,15 @@ impl SubstrateMessagesSource

{ /// Ensure that the messages pallet at source chain is active. async fn ensure_pallet_active(&self) -> Result<(), SubstrateError> { - ensure_messages_pallet_active::(&self.client).await + ensure_messages_pallet_active::(&self.source_client).await } } impl Clone for SubstrateMessagesSource

{ fn clone(&self) -> Self { Self { - client: self.client.clone(), + source_client: self.source_client.clone(), + target_client: self.target_client.clone(), lane_id: self.lane_id, transaction_params: self.transaction_params.clone(), target_to_source_headers_relay: self.target_to_source_headers_relay.clone(), @@ -122,7 +126,8 @@ impl RelayClient for SubstrateMessagesSource

{ type Error = SubstrateError; async fn reconnect(&mut self) -> Result<(), SubstrateError> { - self.client.reconnect().await + self.source_client.reconnect().await?; + self.target_client.reconnect().await } } @@ -136,15 +141,15 @@ where async fn state(&self) -> Result>, SubstrateError> { // we can't continue to deliver confirmations if source node is out of sync, because // it may have already received confirmations that we're going to deliver - self.client.ensure_synced().await?; + self.source_client.ensure_synced().await?; // we can't relay confirmations if messages pallet at source chain is halted self.ensure_pallet_active().await?; - read_client_state::< - _, - as MessageLane>::TargetHeaderHash, - as MessageLane>::TargetHeaderNumber, - >(&self.client, P::TargetChain::BEST_FINALIZED_HEADER_ID_METHOD) + read_client_state( + &self.source_client, + Some(&self.target_client), + P::TargetChain::BEST_FINALIZED_HEADER_ID_METHOD, + ) .await } @@ -183,7 +188,7 @@ where SubstrateError, > { let encoded_response = self - .client + .source_client .state_call( P::TargetChain::TO_CHAIN_MESSAGE_DETAILS_METHOD.into(), Bytes((self.lane_id, nonces.start(), nonces.end()).encode()), @@ -230,7 +235,12 @@ where )); } - let proof = self.client.prove_storage(storage_keys, id.1).await?.iter_nodes().collect(); + let proof = self + .source_client + .prove_storage(storage_keys, id.1) + .await? + .iter_nodes() + .collect(); let proof = FromBridgedChainMessagesProof { bridged_header_hash: id.1, storage_proof: proof, @@ -246,10 +256,11 @@ where _generated_at_block: TargetHeaderIdOf>, proof: as MessageLane>::MessagesReceivingProof, ) -> Result<(), SubstrateError> { - let genesis_hash = *self.client.genesis_hash(); + let genesis_hash = *self.source_client.genesis_hash(); let transaction_params = self.transaction_params.clone(); - let (spec_version, transaction_version) = self.client.simple_runtime_version().await?; - self.client + let (spec_version, transaction_version) = + self.source_client.simple_runtime_version().await?; + self.source_client .submit_signed_extrinsic( self.transaction_params.signer.public().into(), move |best_block_id, transaction_nonce| { @@ -278,7 +289,7 @@ where async fn estimate_confirmation_transaction( &self, ) -> as MessageLane>::SourceChainBalance { - let runtime_version = match self.client.runtime_version().await { + let runtime_version = match self.source_client.runtime_version().await { Ok(v) => v, Err(_) => return BalanceOf::::max_value(), }; @@ -286,14 +297,14 @@ where let dummy_tx = make_messages_delivery_proof_transaction::

( runtime_version.spec_version, runtime_version.transaction_version, - self.client.genesis_hash(), + self.source_client.genesis_hash(), &self.transaction_params, HeaderId(Default::default(), Default::default()), Zero::zero(), prepare_dummy_messages_delivery_proof::(), false, )?; - self.client + self.source_client .estimate_extrinsic_fee(dummy_tx) .await .map(|fee| fee.inclusion_fee()) @@ -385,19 +396,19 @@ fn prepare_dummy_messages_delivery_proof( /// This function assumes that the chain that is followed by the `self_client` has /// bridge GRANDPA pallet deployed and it provides `best_finalized_header_id_method_name` /// runtime API to read the best finalized Bridged chain header. -pub async fn read_client_state( +/// +/// If `peer_client` is `None`, the value of `actual_best_finalized_peer_at_best_self` will +/// always match the `best_finalized_peer_at_best_self`. +pub async fn read_client_state( self_client: &Client, + peer_client: Option<&Client>, best_finalized_header_id_method_name: &str, -) -> Result< - ClientState, HeaderId>, - SubstrateError, -> +) -> Result, HeaderIdOf>, SubstrateError> where SelfChain: Chain, SelfChain::Header: DeserializeOwned, SelfChain::Index: DeserializeOwned, - BridgedHeaderHash: Decode, - BridgedHeaderNumber: Decode, + PeerChain: Chain, { // let's read our state first: we need best finalized header hash on **this** chain let self_best_finalized_header_hash = self_client.best_finalized_header_hash().await?; @@ -419,16 +430,27 @@ where Some(self_best_hash), ) .await?; - let decoded_best_finalized_peer_on_self: (BridgedHeaderNumber, BridgedHeaderHash) = + let decoded_best_finalized_peer_on_self: (BlockNumberOf, HashOf) = Decode::decode(&mut &encoded_best_finalized_peer_on_self.0[..]) .map_err(SubstrateError::ResponseParseFailed)?; let peer_on_self_best_finalized_id = HeaderId(decoded_best_finalized_peer_on_self.0, decoded_best_finalized_peer_on_self.1); + // read actual header, matching the `peer_on_self_best_finalized_id` from the peer chain + let actual_peer_on_self_best_finalized_id = match peer_client { + Some(peer_client) => { + let actual_peer_on_self_best_finalized = + peer_client.header_by_number(peer_on_self_best_finalized_id.0).await?; + HeaderId(peer_on_self_best_finalized_id.0, actual_peer_on_self_best_finalized.hash()) + }, + None => peer_on_self_best_finalized_id.clone(), + }; + Ok(ClientState { best_self: self_best_id, best_finalized_self: self_best_finalized_id, best_finalized_peer_at_best_self: peer_on_self_best_finalized_id, + actual_best_finalized_peer_at_best_self: actual_peer_on_self_best_finalized_id, }) } diff --git a/bridges/relays/lib-substrate-relay/src/messages_target.rs b/bridges/relays/lib-substrate-relay/src/messages_target.rs index 72267cbc09a..0815559f215 100644 --- a/bridges/relays/lib-substrate-relay/src/messages_target.rs +++ b/bridges/relays/lib-substrate-relay/src/messages_target.rs @@ -57,7 +57,8 @@ pub type SubstrateMessagesDeliveryProof = /// Substrate client as Substrate messages target. pub struct SubstrateMessagesTarget { - client: Client, + target_client: Client, + source_client: Client, lane_id: LaneId, relayer_id_at_source: AccountIdOf, transaction_params: TransactionParams>, @@ -68,7 +69,8 @@ pub struct SubstrateMessagesTarget { impl SubstrateMessagesTarget

{ /// Create new Substrate headers target. pub fn new( - client: Client, + target_client: Client, + source_client: Client, lane_id: LaneId, relayer_id_at_source: AccountIdOf, transaction_params: TransactionParams>, @@ -76,7 +78,8 @@ impl SubstrateMessagesTarget

{ source_to_target_headers_relay: Option>, ) -> Self { SubstrateMessagesTarget { - client, + target_client, + source_client, lane_id, relayer_id_at_source, transaction_params, @@ -90,7 +93,7 @@ impl SubstrateMessagesTarget

{ &self, id: TargetHeaderIdOf>, ) -> Result>>, SubstrateError> { - self.client + self.target_client .storage_value( inbound_lane_data_key( P::SourceChain::WITH_CHAIN_MESSAGES_PALLET_NAME, @@ -103,14 +106,15 @@ impl SubstrateMessagesTarget

{ /// Ensure that the messages pallet at target chain is active. async fn ensure_pallet_active(&self) -> Result<(), SubstrateError> { - ensure_messages_pallet_active::(&self.client).await + ensure_messages_pallet_active::(&self.target_client).await } } impl Clone for SubstrateMessagesTarget

{ fn clone(&self) -> Self { Self { - client: self.client.clone(), + target_client: self.target_client.clone(), + source_client: self.source_client.clone(), lane_id: self.lane_id, relayer_id_at_source: self.relayer_id_at_source.clone(), transaction_params: self.transaction_params.clone(), @@ -125,7 +129,8 @@ impl RelayClient for SubstrateMessagesTarget

{ type Error = SubstrateError; async fn reconnect(&mut self) -> Result<(), SubstrateError> { - self.client.reconnect().await + self.target_client.reconnect().await?; + self.source_client.reconnect().await } } @@ -140,15 +145,15 @@ where async fn state(&self) -> Result>, SubstrateError> { // we can't continue to deliver messages if target node is out of sync, because // it may have already received (some of) messages that we're going to deliver - self.client.ensure_synced().await?; + self.target_client.ensure_synced().await?; // we can't relay messages if messages pallet at target chain is halted self.ensure_pallet_active().await?; - read_client_state::< - _, - as MessageLane>::SourceHeaderHash, - as MessageLane>::SourceHeaderNumber, - >(&self.client, P::SourceChain::BEST_FINALIZED_HEADER_ID_METHOD) + read_client_state( + &self.target_client, + Some(&self.source_client), + P::SourceChain::BEST_FINALIZED_HEADER_ID_METHOD, + ) .await } @@ -184,7 +189,7 @@ where ) -> Result<(TargetHeaderIdOf>, UnrewardedRelayersState), SubstrateError> { let encoded_response = self - .client + .target_client .state_call( P::SourceChain::FROM_CHAIN_UNREWARDED_RELAYERS_STATE.into(), Bytes(self.lane_id.encode()), @@ -213,7 +218,7 @@ where &self.lane_id, ); let proof = self - .client + .target_client .prove_storage(vec![inbound_data_key], id.1) .await? .iter_nodes() @@ -232,12 +237,13 @@ where nonces: RangeInclusive, proof: as MessageLane>::MessagesProof, ) -> Result, SubstrateError> { - let genesis_hash = *self.client.genesis_hash(); + let genesis_hash = *self.target_client.genesis_hash(); let transaction_params = self.transaction_params.clone(); let relayer_id_at_source = self.relayer_id_at_source.clone(); let nonces_clone = nonces.clone(); - let (spec_version, transaction_version) = self.client.simple_runtime_version().await?; - self.client + let (spec_version, transaction_version) = + self.target_client.simple_runtime_version().await?; + self.target_client .submit_signed_extrinsic( self.transaction_params.signer.public().into(), move |best_block_id, transaction_nonce| { @@ -281,12 +287,13 @@ where )) })?; - let (spec_version, transaction_version) = self.client.simple_runtime_version().await?; + let (spec_version, transaction_version) = + self.target_client.simple_runtime_version().await?; // Prepare 'dummy' delivery transaction - we only care about its length and dispatch weight. let delivery_tx = make_messages_delivery_transaction::

( spec_version, transaction_version, - self.client.genesis_hash(), + self.target_client.genesis_hash(), &self.transaction_params, HeaderId(Default::default(), Default::default()), Zero::zero(), @@ -299,7 +306,7 @@ where ), false, )?; - let delivery_tx_fee = self.client.estimate_extrinsic_fee(delivery_tx).await?; + let delivery_tx_fee = self.target_client.estimate_extrinsic_fee(delivery_tx).await?; let inclusion_fee_in_target_tokens = delivery_tx_fee.inclusion_fee(); // The pre-dispatch cost of delivery transaction includes additional fee to cover dispatch @@ -321,12 +328,13 @@ where let expected_refund_in_target_tokens = if total_prepaid_nonces != 0 { const WEIGHT_DIFFERENCE: Weight = 100; - let (spec_version, transaction_version) = self.client.simple_runtime_version().await?; + let (spec_version, transaction_version) = + self.target_client.simple_runtime_version().await?; let larger_dispatch_weight = total_dispatch_weight.saturating_add(WEIGHT_DIFFERENCE); let dummy_tx = make_messages_delivery_transaction::

( spec_version, transaction_version, - self.client.genesis_hash(), + self.target_client.genesis_hash(), &self.transaction_params, HeaderId(Default::default(), Default::default()), Zero::zero(), @@ -339,7 +347,8 @@ where ), false, )?; - let larger_delivery_tx_fee = self.client.estimate_extrinsic_fee(dummy_tx).await?; + let larger_delivery_tx_fee = + self.target_client.estimate_extrinsic_fee(dummy_tx).await?; compute_prepaid_messages_refund::( total_prepaid_nonces, diff --git a/bridges/relays/lib-substrate-relay/src/on_demand_headers.rs b/bridges/relays/lib-substrate-relay/src/on_demand_headers.rs index 9a6a062d7d8..c1401a28a6d 100644 --- a/bridges/relays/lib-substrate-relay/src/on_demand_headers.rs +++ b/bridges/relays/lib-substrate-relay/src/on_demand_headers.rs @@ -382,16 +382,20 @@ where From< as sp_core::Pair>::Public>, P::TransactionSignScheme: TransactionSignScheme, { - finality_target.best_finalized_source_block_number().await.map_err(|error| { - log::error!( - target: "bridge", - "Failed to read best finalized source header from target in {} relay: {:?}", - relay_task_name, - error, - ); + finality_target + .best_finalized_source_block_id() + .await + .map_err(|error| { + log::error!( + target: "bridge", + "Failed to read best finalized source header from target in {} relay: {:?}", + relay_task_name, + error, + ); - error - }) + error + }) + .map(|id| id.0) } /// Read first mandatory header in given inclusive range. diff --git a/bridges/relays/messages/Cargo.toml b/bridges/relays/messages/Cargo.toml index b11f00b957a..3b3551114a6 100644 --- a/bridges/relays/messages/Cargo.toml +++ b/bridges/relays/messages/Cargo.toml @@ -18,6 +18,7 @@ parking_lot = "0.11.0" bp-messages = { path = "../../primitives/messages" } bp-runtime = { path = "../../primitives/runtime" } +finality-relay = { path = "../finality" } relay-utils = { path = "../utils" } sp-arithmetic = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/bridges/relays/messages/src/message_lane_loop.rs b/bridges/relays/messages/src/message_lane_loop.rs index 6cdb2b1aa5a..c1778d5d11e 100644 --- a/bridges/relays/messages/src/message_lane_loop.rs +++ b/bridges/relays/messages/src/message_lane_loop.rs @@ -233,6 +233,9 @@ pub struct ClientState { /// Best finalized header id of the peer chain read at the best block of this chain (at /// `best_finalized_self`). pub best_finalized_peer_at_best_self: PeerHeaderId, + /// Header id of the peer chain with the number, matching the + /// `best_finalized_peer_at_best_self`. + pub actual_best_finalized_peer_at_best_self: PeerHeaderId, } /// State of source client in one-way message lane. @@ -843,12 +846,14 @@ pub(crate) mod tests { best_self: HeaderId(0, 0), best_finalized_self: HeaderId(0, 0), best_finalized_peer_at_best_self: HeaderId(0, 0), + actual_best_finalized_peer_at_best_self: HeaderId(0, 0), }, source_latest_generated_nonce: 1, target_state: ClientState { best_self: HeaderId(0, 0), best_finalized_self: HeaderId(0, 0), best_finalized_peer_at_best_self: HeaderId(0, 0), + actual_best_finalized_peer_at_best_self: HeaderId(0, 0), }, target_latest_received_nonce: 0, ..Default::default() @@ -888,12 +893,14 @@ pub(crate) mod tests { best_self: HeaderId(10, 10), best_finalized_self: HeaderId(10, 10), best_finalized_peer_at_best_self: HeaderId(0, 0), + actual_best_finalized_peer_at_best_self: HeaderId(0, 0), }, source_latest_generated_nonce: 10, target_state: ClientState { best_self: HeaderId(0, 0), best_finalized_self: HeaderId(0, 0), best_finalized_peer_at_best_self: HeaderId(0, 0), + actual_best_finalized_peer_at_best_self: HeaderId(0, 0), }, target_latest_received_nonce: 0, ..Default::default() diff --git a/bridges/relays/messages/src/metrics.rs b/bridges/relays/messages/src/metrics.rs index eac2f703692..4decb7e092e 100644 --- a/bridges/relays/messages/src/metrics.rs +++ b/bridges/relays/messages/src/metrics.rs @@ -22,6 +22,7 @@ use crate::{ }; use bp_messages::MessageNonce; +use finality_relay::SyncLoopMetrics; use relay_utils::metrics::{ metric_name, register, GaugeVec, Metric, Opts, PrometheusError, Registry, U64, }; @@ -31,8 +32,10 @@ use relay_utils::metrics::{ /// Cloning only clones references. #[derive(Clone)] pub struct MessageLaneLoopMetrics { + /// Best finalized block numbers - "source", "source_at_target", "target_at_source". + source_to_target_finality_metrics: SyncLoopMetrics, /// Best finalized block numbers - "source", "target", "source_at_target", "target_at_source". - best_block_numbers: GaugeVec, + target_to_source_finality_metrics: SyncLoopMetrics, /// Lane state nonces: "source_latest_generated", "source_latest_confirmed", /// "target_latest_received", "target_latest_confirmed". lane_state_nonces: GaugeVec, @@ -42,12 +45,15 @@ impl MessageLaneLoopMetrics { /// Create and register messages loop metrics. pub fn new(prefix: Option<&str>) -> Result { Ok(MessageLaneLoopMetrics { - best_block_numbers: GaugeVec::new( - Opts::new( - metric_name(prefix, "best_block_numbers"), - "Best finalized block numbers", - ), - &["type"], + source_to_target_finality_metrics: SyncLoopMetrics::new( + prefix, + "source", + "source_at_target", + )?, + target_to_source_finality_metrics: SyncLoopMetrics::new( + prefix, + "target", + "target_at_source", )?, lane_state_nonces: GaugeVec::new( Opts::new(metric_name(prefix, "lane_state_nonces"), "Nonces of the lane state"), @@ -58,22 +64,28 @@ impl MessageLaneLoopMetrics { /// Update source client state metrics. pub fn update_source_state(&self, source_client_state: SourceClientState

) { - self.best_block_numbers - .with_label_values(&["source"]) - .set(source_client_state.best_self.0.into()); - self.best_block_numbers - .with_label_values(&["target_at_source"]) - .set(source_client_state.best_finalized_peer_at_best_self.0.into()); + self.source_to_target_finality_metrics + .update_best_block_at_source(source_client_state.best_self.0.into()); + self.target_to_source_finality_metrics.update_best_block_at_target( + source_client_state.best_finalized_peer_at_best_self.0.into(), + ); + self.target_to_source_finality_metrics.update_using_same_fork( + source_client_state.best_finalized_peer_at_best_self.1 == + source_client_state.actual_best_finalized_peer_at_best_self.1, + ); } /// Update target client state metrics. pub fn update_target_state(&self, target_client_state: TargetClientState

) { - self.best_block_numbers - .with_label_values(&["target"]) - .set(target_client_state.best_self.0.into()); - self.best_block_numbers - .with_label_values(&["source_at_target"]) - .set(target_client_state.best_finalized_peer_at_best_self.0.into()); + self.target_to_source_finality_metrics + .update_best_block_at_source(target_client_state.best_self.0.into()); + self.source_to_target_finality_metrics.update_best_block_at_target( + target_client_state.best_finalized_peer_at_best_self.0.into(), + ); + self.source_to_target_finality_metrics.update_using_same_fork( + target_client_state.best_finalized_peer_at_best_self.1 == + target_client_state.actual_best_finalized_peer_at_best_self.1, + ); } /// Update latest generated nonce at source. @@ -119,7 +131,8 @@ impl MessageLaneLoopMetrics { impl Metric for MessageLaneLoopMetrics { fn register(&self, registry: &Registry) -> Result<(), PrometheusError> { - register(self.best_block_numbers.clone(), registry)?; + self.source_to_target_finality_metrics.register(registry)?; + self.target_to_source_finality_metrics.register(registry)?; register(self.lane_state_nonces.clone(), registry)?; Ok(()) } diff --git a/bridges/relays/utils/src/metrics.rs b/bridges/relays/utils/src/metrics.rs index 805fe70bfe8..084f72e7950 100644 --- a/bridges/relays/utils/src/metrics.rs +++ b/bridges/relays/utils/src/metrics.rs @@ -18,7 +18,7 @@ pub use float_json_value::FloatJsonValueMetric; pub use global::GlobalMetrics; pub use substrate_prometheus_endpoint::{ prometheus::core::{Atomic, Collector}, - register, Counter, CounterVec, Gauge, GaugeVec, Opts, PrometheusError, Registry, F64, U64, + register, Counter, CounterVec, Gauge, GaugeVec, Opts, PrometheusError, Registry, F64, I64, U64, }; use async_std::sync::{Arc, RwLock}; @@ -30,6 +30,8 @@ mod global; /// Shared reference to `f64` value that is updated by the metric. pub type F64SharedRef = Arc>>; +/// Int gauge metric type. +pub type IntGauge = Gauge; /// Unparsed address that needs to be used to expose Prometheus metrics. #[derive(Debug, Clone)]