diff --git a/bridges/relays/client-substrate/Cargo.toml b/bridges/relays/client-substrate/Cargo.toml index 6a1173581e7a..2eb07fdcde46 100644 --- a/bridges/relays/client-substrate/Cargo.toml +++ b/bridges/relays/client-substrate/Cargo.toml @@ -22,7 +22,6 @@ thiserror = "1.0.26" bp-header-chain = { path = "../../primitives/header-chain" } bp-runtime = { path = "../../primitives/runtime" } finality-relay = { path = "../finality" } -headers-relay = { path = "../headers" } relay-utils = { path = "../utils" } # Substrate Dependencies diff --git a/bridges/relays/client-substrate/src/headers_source.rs b/bridges/relays/client-substrate/src/headers_source.rs deleted file mode 100644 index e3839bf2c8ba..000000000000 --- a/bridges/relays/client-substrate/src/headers_source.rs +++ /dev/null @@ -1,108 +0,0 @@ -// Copyright 2019-2021 Parity Technologies (UK) Ltd. -// This file is part of Parity Bridges Common. - -// Parity Bridges Common is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Parity Bridges Common is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Parity Bridges Common. If not, see . - -//! Default generic implementation of headers source for basic Substrate client. - -use crate::{ - chain::{BlockWithJustification, Chain}, - client::Client, - error::Error, -}; - -use async_trait::async_trait; -use headers_relay::{ - sync_loop::SourceClient, - sync_types::{HeaderIdOf, HeadersSyncPipeline, QueuedHeader, SourceHeader}, -}; -use relay_utils::relay_loop::Client as RelayClient; -use sp_runtime::{traits::Header as HeaderT, EncodedJustification}; -use std::marker::PhantomData; - -/// Substrate node as headers source. -pub struct HeadersSource { - client: Client, - _phantom: PhantomData

, -} - -impl HeadersSource { - /// Create new headers source using given client. - pub fn new(client: Client) -> Self { - HeadersSource { client, _phantom: Default::default() } - } -} - -impl Clone for HeadersSource { - fn clone(&self) -> Self { - HeadersSource { client: self.client.clone(), _phantom: Default::default() } - } -} - -#[async_trait] -impl RelayClient for HeadersSource { - type Error = Error; - - async fn reconnect(&mut self) -> Result<(), Error> { - self.client.reconnect().await - } -} - -#[async_trait] -impl SourceClient

for HeadersSource -where - C: Chain, - C::BlockNumber: relay_utils::BlockNumberBase, - C::Header: Into, - P: HeadersSyncPipeline< - Extra = (), - Completion = EncodedJustification, - Hash = C::Hash, - Number = C::BlockNumber, - >, - P::Header: SourceHeader, -{ - async fn best_block_number(&self) -> Result { - // we **CAN** continue to relay headers if source node is out of sync, because - // target node may be missing headers that are already available at the source - Ok(*self.client.best_header().await?.number()) - } - - async fn header_by_hash(&self, hash: P::Hash) -> Result { - self.client.header_by_hash(hash).await.map(Into::into).map_err(Into::into) - } - - async fn header_by_number(&self, number: P::Number) -> Result { - self.client.header_by_number(number).await.map(Into::into).map_err(Into::into) - } - - async fn header_completion( - &self, - id: HeaderIdOf

, - ) -> Result<(HeaderIdOf

, Option), Error> { - let hash = id.1; - let signed_block = self.client.get_block(Some(hash)).await?; - let grandpa_justification = signed_block.justification().cloned(); - - Ok((id, grandpa_justification)) - } - - async fn header_extra( - &self, - id: HeaderIdOf

, - _header: QueuedHeader

, - ) -> Result<(HeaderIdOf

, ()), Error> { - Ok((id, ())) - } -} diff --git a/bridges/relays/client-substrate/src/lib.rs b/bridges/relays/client-substrate/src/lib.rs index 1f6606ea287c..51ddf852b9b6 100644 --- a/bridges/relays/client-substrate/src/lib.rs +++ b/bridges/relays/client-substrate/src/lib.rs @@ -26,7 +26,6 @@ mod sync_header; pub mod finality_source; pub mod guard; -pub mod headers_source; pub mod metrics; use std::time::Duration; diff --git a/bridges/relays/client-substrate/src/sync_header.rs b/bridges/relays/client-substrate/src/sync_header.rs index 0b74dee690f2..ed3de6289ce0 100644 --- a/bridges/relays/client-substrate/src/sync_header.rs +++ b/bridges/relays/client-substrate/src/sync_header.rs @@ -16,13 +16,10 @@ use bp_header_chain::find_grandpa_authorities_scheduled_change; use finality_relay::SourceHeader as FinalitySourceHeader; -use headers_relay::sync_types::SourceHeader; -use num_traits::{CheckedSub, One}; -use relay_utils::HeaderId; use sp_runtime::traits::Header as HeaderT; /// Generic wrapper for `sp_runtime::traits::Header` based headers, that -/// implements `headers_relay::sync_types::SourceHeader` and may be used in headers sync directly. +/// implements `finality_relay::SourceHeader` and may be used in headers sync directly. #[derive(Clone, Debug, PartialEq)] pub struct SyncHeader

(Header); @@ -47,21 +44,6 @@ impl
From
for SyncHeader
{ } } -impl SourceHeader for SyncHeader
{ - fn id(&self) -> HeaderId { - relay_utils::HeaderId(*self.0.number(), self.hash()) - } - - fn parent_id(&self) -> HeaderId { - relay_utils::HeaderId( - self.number() - .checked_sub(&One::one()) - .expect("should never be called for genesis header"), - *self.parent_hash(), - ) - } -} - impl FinalitySourceHeader for SyncHeader
{ fn number(&self) -> Header::Number { *self.0.number() diff --git a/bridges/relays/finality/Cargo.toml b/bridges/relays/finality/Cargo.toml index 944da9837ffc..645ac10775ba 100644 --- a/bridges/relays/finality/Cargo.toml +++ b/bridges/relays/finality/Cargo.toml @@ -12,7 +12,6 @@ async-trait = "0.1.40" backoff = "0.2" bp-header-chain = { path = "../../primitives/header-chain" } futures = "0.3.5" -headers-relay = { path = "../headers" } log = "0.4.11" num-traits = "0.2" relay-utils = { path = "../utils" } diff --git a/bridges/relays/finality/src/finality_loop.rs b/bridges/relays/finality/src/finality_loop.rs index 191d18383793..8fad11bf370c 100644 --- a/bridges/relays/finality/src/finality_loop.rs +++ b/bridges/relays/finality/src/finality_loop.rs @@ -19,12 +19,13 @@ //! is the mandatory headers, which we always submit to the target node. For such headers, we //! assume that the persistent proof either exists, or will eventually become available. -use crate::{FinalityProof, FinalitySyncPipeline, SourceHeader}; +use crate::{ + sync_loop_metrics::SyncLoopMetrics, FinalityProof, FinalitySyncPipeline, SourceHeader, +}; use async_trait::async_trait; use backoff::backoff::Backoff; use futures::{select, Future, FutureExt, Stream, StreamExt}; -use headers_relay::sync_loop_metrics::SyncLoopMetrics; use num_traits::{One, Saturating}; use relay_utils::{ metrics::{GlobalMetrics, MetricsParams}, diff --git a/bridges/relays/finality/src/lib.rs b/bridges/relays/finality/src/lib.rs index 78ef33f1b376..6421d13b787c 100644 --- a/bridges/relays/finality/src/lib.rs +++ b/bridges/relays/finality/src/lib.rs @@ -28,6 +28,7 @@ use std::fmt::Debug; mod finality_loop; mod finality_loop_tests; +mod sync_loop_metrics; /// Finality proofs synchronization pipeline. pub trait FinalitySyncPipeline: 'static + Clone + Debug + Send + Sync { diff --git a/bridges/relays/headers/src/sync_loop_metrics.rs b/bridges/relays/finality/src/sync_loop_metrics.rs similarity index 53% rename from bridges/relays/headers/src/sync_loop_metrics.rs rename to bridges/relays/finality/src/sync_loop_metrics.rs index 1c558c25de9d..1e4d910926ec 100644 --- a/bridges/relays/headers/src/sync_loop_metrics.rs +++ b/bridges/relays/finality/src/sync_loop_metrics.rs @@ -16,12 +16,6 @@ //! Metrics for headers synchronization relay loop. -use crate::{ - sync::HeadersSync, - sync_types::{HeaderStatus, HeadersSyncPipeline}, -}; - -use num_traits::Zero; use relay_utils::metrics::{metric_name, register, GaugeVec, Opts, PrometheusError, Registry, U64}; /// Headers sync metrics. @@ -29,8 +23,6 @@ use relay_utils::metrics::{metric_name, register, GaugeVec, Opts, PrometheusErro pub struct SyncLoopMetrics { /// Best syncing headers at "source" and "target" nodes. best_block_numbers: GaugeVec, - /// Number of headers in given states (see `HeaderStatus`). - blocks_in_state: GaugeVec, } impl SyncLoopMetrics { @@ -47,16 +39,6 @@ impl SyncLoopMetrics { )?, registry, )?, - blocks_in_state: register( - GaugeVec::new( - Opts::new( - metric_name(prefix, "blocks_in_state"), - "Number of blocks in given state", - ), - &["state"], - )?, - registry, - )?, }) } } @@ -75,37 +57,4 @@ impl SyncLoopMetrics { .with_label_values(&["target"]) .set(target_best_number.into()); } - - /// Update metrics. - pub fn update(&self, sync: &HeadersSync

) { - let headers = sync.headers(); - let source_best_number = sync.source_best_number().unwrap_or_else(Zero::zero); - let target_best_number = - sync.target_best_header().map(|id| id.0).unwrap_or_else(Zero::zero); - - self.update_best_block_at_source(source_best_number); - self.update_best_block_at_target(target_best_number); - - self.blocks_in_state - .with_label_values(&["maybe_orphan"]) - .set(headers.headers_in_status(HeaderStatus::MaybeOrphan) as _); - self.blocks_in_state - .with_label_values(&["orphan"]) - .set(headers.headers_in_status(HeaderStatus::Orphan) as _); - self.blocks_in_state - .with_label_values(&["maybe_extra"]) - .set(headers.headers_in_status(HeaderStatus::MaybeExtra) as _); - self.blocks_in_state - .with_label_values(&["extra"]) - .set(headers.headers_in_status(HeaderStatus::Extra) as _); - self.blocks_in_state - .with_label_values(&["ready"]) - .set(headers.headers_in_status(HeaderStatus::Ready) as _); - self.blocks_in_state - .with_label_values(&["incomplete"]) - .set(headers.headers_in_status(HeaderStatus::Incomplete) as _); - self.blocks_in_state - .with_label_values(&["submitted"]) - .set(headers.headers_in_status(HeaderStatus::Submitted) as _); - } } diff --git a/bridges/relays/headers/Cargo.toml b/bridges/relays/headers/Cargo.toml deleted file mode 100644 index 31d3166a9978..000000000000 --- a/bridges/relays/headers/Cargo.toml +++ /dev/null @@ -1,17 +0,0 @@ -[package] -name = "headers-relay" -version = "0.1.0" -authors = ["Parity Technologies "] -edition = "2018" -license = "GPL-3.0-or-later WITH Classpath-exception-2.0" - -[dependencies] -async-std = "1.6.5" -async-trait = "0.1.40" -backoff = "0.2" -futures = "0.3.5" -linked-hash-map = "0.5.3" -log = "0.4.11" -num-traits = "0.2" -parking_lot = "0.11.0" -relay-utils = { path = "../utils" } diff --git a/bridges/relays/headers/src/headers.rs b/bridges/relays/headers/src/headers.rs deleted file mode 100644 index 8d67c1cf4857..000000000000 --- a/bridges/relays/headers/src/headers.rs +++ /dev/null @@ -1,1703 +0,0 @@ -// Copyright 2019-2021 Parity Technologies (UK) Ltd. -// This file is part of Parity Bridges Common. - -// Parity Bridges Common is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Parity Bridges Common is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Parity Bridges Common. If not, see . - -//! Headers queue - the intermediate buffer that is filled when headers are read -//! from the source chain. Headers are removed from the queue once they become -//! known to the target chain. Inside, there are several sub-queues, where headers -//! may stay until source/target chain state isn't updated. When a header reaches the -//! `ready` sub-queue, it may be submitted to the target chain. - -use crate::sync_types::{ - HeaderIdOf, HeaderStatus, HeadersSyncPipeline, QueuedHeader, SourceHeader, -}; - -use linked_hash_map::LinkedHashMap; -use num_traits::{One, Zero}; -use relay_utils::HeaderId; -use std::{ - collections::{ - btree_map::Entry as BTreeMapEntry, hash_map::Entry as HashMapEntry, BTreeMap, HashMap, - HashSet, - }, - time::{Duration, Instant}, -}; - -type HeadersQueue

= BTreeMap< -

::Number, - HashMap<

::Hash, QueuedHeader

>, ->; -type SyncedChildren

= BTreeMap< -

::Number, - HashMap<

::Hash, HashSet>>, ->; -type KnownHeaders

= BTreeMap< -

::Number, - HashMap<

::Hash, HeaderStatus>, ->; - -/// We're trying to fetch completion data for single header at this interval. -const RETRY_FETCH_COMPLETION_INTERVAL: Duration = Duration::from_secs(20); - -/// Headers queue. -#[derive(Debug)] -pub struct QueuedHeaders { - /// Headers that are received from source node, but we (native sync code) have - /// never seen their parents. So we need to check if we can/should submit this header. - maybe_orphan: HeadersQueue

, - /// Headers that are received from source node, and we (native sync code) have - /// checked that Substrate runtime doesn't know their parents. So we need to submit parents - /// first. - orphan: HeadersQueue

, - /// Headers that are ready to be submitted to target node, but we need to check - /// whether submission requires extra data to be provided. - maybe_extra: HeadersQueue

, - /// Headers that are ready to be submitted to target node, but we need to retrieve - /// extra data first. - extra: HeadersQueue

, - /// Headers that are ready to be submitted to target node. - ready: HeadersQueue

, - /// Headers that are ready to be submitted to target node, but their ancestor is incomplete. - /// Thus we're waiting for these ancestors to be completed first. - /// Note that the incomplete header itself is synced and it isn't in this queue. - incomplete: HeadersQueue

, - /// Headers that are (we believe) currently submitted to target node by our, - /// not-yet mined transactions. - submitted: HeadersQueue

, - /// Synced headers children. We need it to support case when header is synced, but some of - /// its parents are incomplete. - synced_children: SyncedChildren

, - /// Pointers to all headers that we ever seen and we believe we can touch in the future. - known_headers: KnownHeaders

, - /// Headers that are waiting for completion data from source node. Mapped (and auto-sorted - /// by) to the last fetch time. - incomplete_headers: LinkedHashMap, Option>, - /// Headers that are waiting to be completed at target node. Auto-sorted by insertion time. - completion_data: LinkedHashMap, P::Completion>, - /// Best synced block number. - best_synced_number: P::Number, - /// Pruned blocks border. We do not store or accept any blocks with number less than - /// this number. - prune_border: P::Number, -} - -impl Default for QueuedHeaders

{ - fn default() -> Self { - QueuedHeaders { - maybe_orphan: HeadersQueue::new(), - orphan: HeadersQueue::new(), - maybe_extra: HeadersQueue::new(), - extra: HeadersQueue::new(), - ready: HeadersQueue::new(), - incomplete: HeadersQueue::new(), - submitted: HeadersQueue::new(), - synced_children: SyncedChildren::

::new(), - known_headers: KnownHeaders::

::new(), - incomplete_headers: LinkedHashMap::new(), - completion_data: LinkedHashMap::new(), - best_synced_number: Zero::zero(), - prune_border: Zero::zero(), - } - } -} - -impl QueuedHeaders

{ - /// Returns prune border. - #[cfg(test)] - pub fn prune_border(&self) -> P::Number { - self.prune_border - } - - /// Returns number of headers that are currently in given queue. - pub fn headers_in_status(&self, status: HeaderStatus) -> usize { - match status { - HeaderStatus::Unknown | HeaderStatus::Synced => 0, - HeaderStatus::MaybeOrphan => - self.maybe_orphan.values().fold(0, |total, headers| total + headers.len()), - HeaderStatus::Orphan => - self.orphan.values().fold(0, |total, headers| total + headers.len()), - HeaderStatus::MaybeExtra => - self.maybe_extra.values().fold(0, |total, headers| total + headers.len()), - HeaderStatus::Extra => - self.extra.values().fold(0, |total, headers| total + headers.len()), - HeaderStatus::Ready => - self.ready.values().fold(0, |total, headers| total + headers.len()), - HeaderStatus::Incomplete => - self.incomplete.values().fold(0, |total, headers| total + headers.len()), - HeaderStatus::Submitted => - self.submitted.values().fold(0, |total, headers| total + headers.len()), - } - } - - /// Returns number of headers that are currently in the queue. - pub fn total_headers(&self) -> usize { - self.maybe_orphan.values().fold(0, |total, headers| total + headers.len()) + - self.orphan.values().fold(0, |total, headers| total + headers.len()) + - self.maybe_extra.values().fold(0, |total, headers| total + headers.len()) + - self.extra.values().fold(0, |total, headers| total + headers.len()) + - self.ready.values().fold(0, |total, headers| total + headers.len()) + - self.incomplete.values().fold(0, |total, headers| total + headers.len()) - } - - /// Returns number of best block in the queue. - pub fn best_queued_number(&self) -> P::Number { - std::cmp::max( - self.maybe_orphan.keys().next_back().cloned().unwrap_or_else(Zero::zero), - std::cmp::max( - self.orphan.keys().next_back().cloned().unwrap_or_else(Zero::zero), - std::cmp::max( - self.maybe_extra.keys().next_back().cloned().unwrap_or_else(Zero::zero), - std::cmp::max( - self.extra.keys().next_back().cloned().unwrap_or_else(Zero::zero), - std::cmp::max( - self.ready.keys().next_back().cloned().unwrap_or_else(Zero::zero), - std::cmp::max( - self.incomplete - .keys() - .next_back() - .cloned() - .unwrap_or_else(Zero::zero), - self.submitted - .keys() - .next_back() - .cloned() - .unwrap_or_else(Zero::zero), - ), - ), - ), - ), - ), - ) - } - - /// Returns number of best synced block we have ever seen. It is either less - /// than `best_queued_number()`, or points to last synced block if queue is empty. - pub fn best_synced_number(&self) -> P::Number { - self.best_synced_number - } - - /// Returns synchronization status of the header. - pub fn status(&self, id: &HeaderIdOf

) -> HeaderStatus { - self.known_headers - .get(&id.0) - .and_then(|x| x.get(&id.1)) - .cloned() - .unwrap_or(HeaderStatus::Unknown) - } - - /// Get the oldest header from given queue. - pub fn header(&self, status: HeaderStatus) -> Option<&QueuedHeader

> { - match status { - HeaderStatus::Unknown | HeaderStatus::Synced => None, - HeaderStatus::MaybeOrphan => oldest_header(&self.maybe_orphan), - HeaderStatus::Orphan => oldest_header(&self.orphan), - HeaderStatus::MaybeExtra => oldest_header(&self.maybe_extra), - HeaderStatus::Extra => oldest_header(&self.extra), - HeaderStatus::Ready => oldest_header(&self.ready), - HeaderStatus::Incomplete => oldest_header(&self.incomplete), - HeaderStatus::Submitted => oldest_header(&self.submitted), - } - } - - /// Get the oldest headers from given queue until functor will return false. - pub fn headers( - &self, - status: HeaderStatus, - f: impl FnMut(&QueuedHeader

) -> bool, - ) -> Option>> { - match status { - HeaderStatus::Unknown | HeaderStatus::Synced => None, - HeaderStatus::MaybeOrphan => oldest_headers(&self.maybe_orphan, f), - HeaderStatus::Orphan => oldest_headers(&self.orphan, f), - HeaderStatus::MaybeExtra => oldest_headers(&self.maybe_extra, f), - HeaderStatus::Extra => oldest_headers(&self.extra, f), - HeaderStatus::Ready => oldest_headers(&self.ready, f), - HeaderStatus::Incomplete => oldest_headers(&self.incomplete, f), - HeaderStatus::Submitted => oldest_headers(&self.submitted, f), - } - } - - /// Appends new header, received from the source node, to the queue. - pub fn header_response(&mut self, header: P::Header) { - let id = header.id(); - let status = self.status(&id); - if status != HeaderStatus::Unknown { - log::debug!( - target: "bridge", - "Ignoring new {} header: {:?}. Status is {:?}.", - P::SOURCE_NAME, - id, - status, - ); - return - } - - if id.0 < self.prune_border { - log::debug!( - target: "bridge", - "Ignoring ancient new {} header: {:?}.", - P::SOURCE_NAME, - id, - ); - return - } - - let parent_id = header.parent_id(); - let parent_status = self.status(&parent_id); - let header = QueuedHeader::new(header); - - let status = match parent_status { - HeaderStatus::Unknown | HeaderStatus::MaybeOrphan => { - insert_header(&mut self.maybe_orphan, id, header); - HeaderStatus::MaybeOrphan - }, - HeaderStatus::Orphan => { - insert_header(&mut self.orphan, id, header); - HeaderStatus::Orphan - }, - HeaderStatus::MaybeExtra | - HeaderStatus::Extra | - HeaderStatus::Ready | - HeaderStatus::Incomplete | - HeaderStatus::Submitted | - HeaderStatus::Synced => { - insert_header(&mut self.maybe_extra, id, header); - HeaderStatus::MaybeExtra - }, - }; - - self.known_headers.entry(id.0).or_default().insert(id.1, status); - log::debug!( - target: "bridge", - "Queueing new {} header: {:?}. Queue: {:?}.", - P::SOURCE_NAME, - id, - status, - ); - } - - /// Receive the best header from the target node. - pub fn target_best_header_response(&mut self, id: &HeaderIdOf

) { - self.header_synced(id) - } - - /// Receive target node response for MaybeOrphan request. - pub fn maybe_orphan_response(&mut self, id: &HeaderIdOf

, response: bool) { - if !response { - move_header_descendants::

( - &mut [&mut self.maybe_orphan], - &mut self.orphan, - &mut self.known_headers, - HeaderStatus::Orphan, - id, - ); - return - } - - move_header_descendants::

( - &mut [&mut self.maybe_orphan, &mut self.orphan], - &mut self.maybe_extra, - &mut self.known_headers, - HeaderStatus::MaybeExtra, - id, - ); - } - - /// Receive target node response for MaybeExtra request. - pub fn maybe_extra_response(&mut self, id: &HeaderIdOf

, response: bool) { - let (destination_status, destination_queue) = if response { - (HeaderStatus::Extra, &mut self.extra) - } else if self.is_parent_incomplete(id) { - (HeaderStatus::Incomplete, &mut self.incomplete) - } else { - (HeaderStatus::Ready, &mut self.ready) - }; - - move_header( - &mut self.maybe_extra, - destination_queue, - &mut self.known_headers, - destination_status, - id, - |header| header, - ); - } - - /// Receive extra from source node. - pub fn extra_response(&mut self, id: &HeaderIdOf

, extra: P::Extra) { - let (destination_status, destination_queue) = if self.is_parent_incomplete(id) { - (HeaderStatus::Incomplete, &mut self.incomplete) - } else { - (HeaderStatus::Ready, &mut self.ready) - }; - - // move header itself from extra to ready queue - move_header( - &mut self.extra, - destination_queue, - &mut self.known_headers, - destination_status, - id, - |header| header.set_extra(extra), - ); - } - - /// Receive completion response from source node. - pub fn completion_response(&mut self, id: &HeaderIdOf

, completion: Option) { - let completion = match completion { - Some(completion) => completion, - None => { - log::debug!( - target: "bridge", - "{} Node is still missing completion data for header: {:?}. Will retry later.", - P::SOURCE_NAME, - id, - ); - - return - }, - }; - - // do not remove from `incomplete_headers` here, because otherwise we'll miss - // completion 'notification' - // this could lead to duplicate completion retrieval (if completion transaction isn't mined - // for too long) - // - // instead, we're moving entry to the end of the queue, so that completion data won't be - // refetched instantly - if self.incomplete_headers.remove(id).is_some() { - log::debug!( - target: "bridge", - "Received completion data from {} for header: {:?}", - P::SOURCE_NAME, - id, - ); - - self.completion_data.insert(*id, completion); - self.incomplete_headers.insert(*id, Some(Instant::now())); - } - } - - /// When header is submitted to target node. - pub fn headers_submitted(&mut self, ids: Vec>) { - for id in ids { - move_header( - &mut self.ready, - &mut self.submitted, - &mut self.known_headers, - HeaderStatus::Submitted, - &id, - |header| header, - ); - } - } - - /// When header completion data is sent to target node. - pub fn header_completed(&mut self, id: &HeaderIdOf

) { - if self.completion_data.remove(id).is_some() { - log::debug!( - target: "bridge", - "Sent completion data to {} for header: {:?}", - P::TARGET_NAME, - id, - ); - - // transaction can be dropped by target chain nodes => it would never be mined - // - // in current implementation the sync loop would wait for some time && if best - // **source** header won't change on **target** node, then the sync will be restarted - // => we'll resubmit the same completion data again (the same is true for submitted - // headers) - // - // the other option would be to track emitted transactions at least on target node, - // but it won't give us 100% guarantee anyway - // - // => we're just dropping completion data just after it has been submitted - } - } - - /// Marks given headers incomplete. - pub fn add_incomplete_headers( - &mut self, - make_header_incomplete: bool, - new_incomplete_headers: Vec>, - ) { - for new_incomplete_header in new_incomplete_headers { - if make_header_incomplete { - self.header_synced(&new_incomplete_header); - } - - let move_origins = - select_synced_children::

(&self.synced_children, &new_incomplete_header); - let move_origins = - move_origins.into_iter().chain(std::iter::once(new_incomplete_header)); - for move_origin in move_origins { - move_header_descendants::

( - &mut [&mut self.ready, &mut self.submitted], - &mut self.incomplete, - &mut self.known_headers, - HeaderStatus::Incomplete, - &move_origin, - ); - } - - if make_header_incomplete { - log::debug!( - target: "bridge", - "Scheduling completion data retrieval for header: {:?}", - new_incomplete_header, - ); - - self.incomplete_headers.insert(new_incomplete_header, None); - } - } - } - - /// When incomplete headers ids are received from target node. - pub fn incomplete_headers_response(&mut self, ids: HashSet>) { - // all new incomplete headers are marked Synced and all their descendants - // are moved from Ready/Submitted to Incomplete queue - let new_incomplete_headers = ids - .iter() - .filter(|id| { - !self.incomplete_headers.contains_key(id) && !self.completion_data.contains_key(id) - }) - .cloned() - .collect::>(); - self.add_incomplete_headers(true, new_incomplete_headers); - - // for all headers that were incompleted previously, but now are completed, we move - // all descendants from incomplete to ready - let just_completed_headers = self - .incomplete_headers - .keys() - .chain(self.completion_data.keys()) - .filter(|id| !ids.contains(id)) - .cloned() - .collect::>(); - for just_completed_header in just_completed_headers { - // sub2eth rejects H if H.Parent is incomplete - // sub2sub allows 'syncing' headers like that - // => let's check if there are some synced children of just completed header - let move_origins = - select_synced_children::

(&self.synced_children, &just_completed_header); - let move_origins = - move_origins.into_iter().chain(std::iter::once(just_completed_header)); - for move_origin in move_origins { - move_header_descendants::

( - &mut [&mut self.incomplete], - &mut self.ready, - &mut self.known_headers, - HeaderStatus::Ready, - &move_origin, - ); - } - - log::debug!( - target: "bridge", - "Completion data is no longer required for header: {:?}", - just_completed_header, - ); - - self.incomplete_headers.remove(&just_completed_header); - self.completion_data.remove(&just_completed_header); - } - } - - /// Returns true if given header requires completion data. - pub fn requires_completion_data(&self, id: &HeaderIdOf

) -> bool { - self.incomplete_headers.contains_key(id) - } - - /// Returns id of the header for which we want to fetch completion data. - pub fn incomplete_header(&mut self) -> Option> { - queued_incomplete_header(&mut self.incomplete_headers, |last_fetch_time| { - let retry = match *last_fetch_time { - Some(last_fetch_time) => - last_fetch_time.elapsed() > RETRY_FETCH_COMPLETION_INTERVAL, - None => true, - }; - - if retry { - *last_fetch_time = Some(Instant::now()); - } - - retry - }) - .map(|(id, _)| id) - } - - /// Returns header completion data to upload to target node. - pub fn header_to_complete(&mut self) -> Option<(HeaderIdOf

, &P::Completion)> { - queued_incomplete_header(&mut self.completion_data, |_| true) - } - - /// Prune and never accept headers before this block. - pub fn prune(&mut self, prune_border: P::Number) { - if prune_border <= self.prune_border { - return - } - - prune_queue(&mut self.maybe_orphan, prune_border); - prune_queue(&mut self.orphan, prune_border); - prune_queue(&mut self.maybe_extra, prune_border); - prune_queue(&mut self.extra, prune_border); - prune_queue(&mut self.ready, prune_border); - prune_queue(&mut self.submitted, prune_border); - prune_queue(&mut self.incomplete, prune_border); - self.synced_children = self.synced_children.split_off(&prune_border); - prune_known_headers::

(&mut self.known_headers, prune_border); - self.prune_border = prune_border; - } - - /// Forgets all ever known headers. - pub fn clear(&mut self) { - self.maybe_orphan.clear(); - self.orphan.clear(); - self.maybe_extra.clear(); - self.extra.clear(); - self.ready.clear(); - self.incomplete.clear(); - self.submitted.clear(); - self.synced_children.clear(); - self.known_headers.clear(); - self.best_synced_number = Zero::zero(); - self.prune_border = Zero::zero(); - } - - /// Returns true if parent of this header is either incomplete or waiting for - /// its own incomplete ancestor to be completed. - fn is_parent_incomplete(&self, id: &HeaderIdOf

) -> bool { - let status = self.status(id); - let header = match status { - HeaderStatus::MaybeOrphan => header(&self.maybe_orphan, id), - HeaderStatus::Orphan => header(&self.orphan, id), - HeaderStatus::MaybeExtra => header(&self.maybe_extra, id), - HeaderStatus::Extra => header(&self.extra, id), - HeaderStatus::Ready => header(&self.ready, id), - HeaderStatus::Incomplete => header(&self.incomplete, id), - HeaderStatus::Submitted => header(&self.submitted, id), - HeaderStatus::Unknown => return false, - HeaderStatus::Synced => return false, - }; - - match header { - Some(header) => { - let parent_id = header.header().parent_id(); - self.incomplete_headers.contains_key(&parent_id) || - self.completion_data.contains_key(&parent_id) || - self.status(&parent_id) == HeaderStatus::Incomplete - }, - None => false, - } - } - - /// When we receive new Synced header from target node. - fn header_synced(&mut self, id: &HeaderIdOf

) { - // update best synced block number - self.best_synced_number = std::cmp::max(self.best_synced_number, id.0); - - // all ancestors of this header are now synced => let's remove them from - // queues - let mut current = *id; - let mut id_processed = false; - let mut previous_current = None; - loop { - let header = match self.status(¤t) { - HeaderStatus::Unknown => break, - HeaderStatus::MaybeOrphan => remove_header(&mut self.maybe_orphan, ¤t), - HeaderStatus::Orphan => remove_header(&mut self.orphan, ¤t), - HeaderStatus::MaybeExtra => remove_header(&mut self.maybe_extra, ¤t), - HeaderStatus::Extra => remove_header(&mut self.extra, ¤t), - HeaderStatus::Ready => remove_header(&mut self.ready, ¤t), - HeaderStatus::Incomplete => remove_header(&mut self.incomplete, ¤t), - HeaderStatus::Submitted => remove_header(&mut self.submitted, ¤t), - HeaderStatus::Synced => break, - } - .expect("header has a given status; given queue has the header; qed"); - - // remember ids of all the children of the current header - let synced_children_entry = - self.synced_children.entry(current.0).or_default().entry(current.1).or_default(); - let all_queues = [ - &self.maybe_orphan, - &self.orphan, - &self.maybe_extra, - &self.extra, - &self.ready, - &self.incomplete, - &self.submitted, - ]; - for queue in &all_queues { - let children_from_queue = queue - .get(&(current.0 + One::one())) - .map(|potential_children| { - potential_children - .values() - .filter(|potential_child| { - potential_child.header().parent_id() == current - }) - .map(|child| child.id()) - .collect::>() - }) - .unwrap_or_default(); - synced_children_entry.extend(children_from_queue); - } - if let Some(previous_current) = previous_current { - synced_children_entry.insert(previous_current); - } - - set_header_status::

(&mut self.known_headers, ¤t, HeaderStatus::Synced); - - previous_current = Some(current); - current = header.parent_id(); - id_processed = true; - } - - // remember that the header itself is synced - // (condition is here to avoid duplicate log messages) - if !id_processed { - set_header_status::

(&mut self.known_headers, id, HeaderStatus::Synced); - } - - // now let's move all descendants from maybe_orphan && orphan queues to - // maybe_extra queue - move_header_descendants::

( - &mut [&mut self.maybe_orphan, &mut self.orphan], - &mut self.maybe_extra, - &mut self.known_headers, - HeaderStatus::MaybeExtra, - id, - ); - } -} - -/// Insert header to the queue. -fn insert_header( - queue: &mut HeadersQueue

, - id: HeaderIdOf

, - header: QueuedHeader

, -) { - queue.entry(id.0).or_default().insert(id.1, header); -} - -/// Remove header from the queue. -fn remove_header( - queue: &mut HeadersQueue

, - id: &HeaderIdOf

, -) -> Option> { - let mut headers_at = match queue.entry(id.0) { - BTreeMapEntry::Occupied(headers_at) => headers_at, - BTreeMapEntry::Vacant(_) => return None, - }; - - let header = headers_at.get_mut().remove(&id.1); - if headers_at.get().is_empty() { - headers_at.remove(); - } - header -} - -/// Get header from the queue. -fn header<'a, P: HeadersSyncPipeline>( - queue: &'a HeadersQueue

, - id: &HeaderIdOf

, -) -> Option<&'a QueuedHeader

> { - queue.get(&id.0).and_then(|by_hash| by_hash.get(&id.1)) -} - -/// Move header from source to destination queue. -/// -/// Returns ID of parent header, if header has been moved, or None otherwise. -fn move_header( - source_queue: &mut HeadersQueue

, - destination_queue: &mut HeadersQueue

, - known_headers: &mut KnownHeaders

, - destination_status: HeaderStatus, - id: &HeaderIdOf

, - prepare: impl FnOnce(QueuedHeader

) -> QueuedHeader

, -) -> Option> { - let header = match remove_header(source_queue, id) { - Some(header) => prepare(header), - None => return None, - }; - - let parent_id = header.header().parent_id(); - destination_queue.entry(id.0).or_default().insert(id.1, header); - set_header_status::

(known_headers, id, destination_status); - - Some(parent_id) -} - -/// Move all descendant headers from the source to destination queue. -fn move_header_descendants( - source_queues: &mut [&mut HeadersQueue

], - destination_queue: &mut HeadersQueue

, - known_headers: &mut KnownHeaders

, - destination_status: HeaderStatus, - id: &HeaderIdOf

, -) { - let mut current_number = id.0 + One::one(); - let mut current_parents = HashSet::new(); - current_parents.insert(id.1); - - while !current_parents.is_empty() { - let mut next_parents = HashSet::new(); - for source_queue in source_queues.iter_mut() { - let mut source_entry = match source_queue.entry(current_number) { - BTreeMapEntry::Occupied(source_entry) => source_entry, - BTreeMapEntry::Vacant(_) => continue, - }; - - let mut headers_to_move = Vec::new(); - let children_at_number = source_entry.get().keys().cloned().collect::>(); - for key in children_at_number { - let entry = match source_entry.get_mut().entry(key) { - HashMapEntry::Occupied(entry) => entry, - HashMapEntry::Vacant(_) => unreachable!("iterating existing keys; qed"), - }; - - if current_parents.contains(&entry.get().header().parent_id().1) { - let header_to_move = entry.remove(); - let header_to_move_id = header_to_move.id(); - headers_to_move.push((header_to_move_id, header_to_move)); - set_header_status::

(known_headers, &header_to_move_id, destination_status); - } - } - - if source_entry.get().is_empty() { - source_entry.remove(); - } - - next_parents.extend(headers_to_move.iter().map(|(id, _)| id.1)); - - destination_queue - .entry(current_number) - .or_default() - .extend(headers_to_move.into_iter().map(|(id, h)| (id.1, h))) - } - - current_number = current_number + One::one(); - std::mem::swap(&mut current_parents, &mut next_parents); - } -} - -/// Selects (recursive) all synced children of given header. -fn select_synced_children( - synced_children: &SyncedChildren

, - id: &HeaderIdOf

, -) -> Vec> { - let mut result = Vec::new(); - let mut current_parents = HashSet::new(); - current_parents.insert(*id); - - while !current_parents.is_empty() { - let mut next_parents = HashSet::new(); - for current_parent in ¤t_parents { - let current_parent_synced_children = synced_children - .get(¤t_parent.0) - .and_then(|by_number_entry| by_number_entry.get(¤t_parent.1)); - if let Some(current_parent_synced_children) = current_parent_synced_children { - for current_parent_synced_child in current_parent_synced_children { - result.push(*current_parent_synced_child); - next_parents.insert(*current_parent_synced_child); - } - } - } - - let _ = std::mem::replace(&mut current_parents, next_parents); - } - - result -} - -/// Return oldest header from the queue. -fn oldest_header(queue: &HeadersQueue

) -> Option<&QueuedHeader

> { - queue.values().flat_map(|h| h.values()).next() -} - -/// Return oldest headers from the queue until functor will return false. -fn oldest_headers( - queue: &HeadersQueue

, - mut f: impl FnMut(&QueuedHeader

) -> bool, -) -> Option>> { - let result = queue.values().flat_map(|h| h.values()).take_while(|h| f(h)).collect::>(); - if result.is_empty() { - None - } else { - Some(result) - } -} - -/// Forget all headers with number less than given. -fn prune_queue(queue: &mut HeadersQueue

, prune_border: P::Number) { - *queue = queue.split_off(&prune_border); -} - -/// Forget all known headers with number less than given. -fn prune_known_headers( - known_headers: &mut KnownHeaders

, - prune_border: P::Number, -) { - let new_known_headers = known_headers.split_off(&prune_border); - for (pruned_number, pruned_headers) in &*known_headers { - for pruned_hash in pruned_headers.keys() { - log::debug!(target: "bridge", "Pruning header {:?}.", HeaderId(*pruned_number, *pruned_hash)); - } - } - *known_headers = new_known_headers; -} - -/// Change header status. -fn set_header_status( - known_headers: &mut KnownHeaders

, - id: &HeaderIdOf

, - status: HeaderStatus, -) { - log::debug!( - target: "bridge", - "{} header {:?} is now {:?}", - P::SOURCE_NAME, - id, - status, - ); - *known_headers.entry(id.0).or_default().entry(id.1).or_insert(status) = status; -} - -/// Returns queued incomplete header with maximal elapsed time since last update. -fn queued_incomplete_header( - map: &mut LinkedHashMap, - filter: impl FnMut(&mut T) -> bool, -) -> Option<(Id, &T)> { - // TODO (#84): headers that have been just appended to the end of the queue would have to wait - // until all previous headers will be retried - - let retry_old_header = map - .front() - .map(|(key, _)| key.clone()) - .and_then(|key| map.get_mut(&key).map(filter)) - .unwrap_or(false); - if retry_old_header { - let (header_key, header) = - map.pop_front().expect("we have checked that front() exists; qed"); - map.insert(header_key, header); - return map.back().map(|(id, data)| (id.clone(), data)) - } - - None -} - -#[cfg(test)] -pub(crate) mod tests { - use super::*; - use crate::{ - sync_loop_tests::{ - TestHash, TestHeader, TestHeaderId, TestHeadersSyncPipeline, TestNumber, - }, - sync_types::QueuedHeader, - }; - - pub(crate) fn header(number: TestNumber) -> QueuedHeader { - QueuedHeader::new(TestHeader { number, hash: hash(number), parent_hash: hash(number - 1) }) - } - - pub(crate) fn hash(number: TestNumber) -> TestHash { - number - } - - pub(crate) fn id(number: TestNumber) -> TestHeaderId { - HeaderId(number, hash(number)) - } - - #[test] - fn total_headers_works() { - // total headers just sums up number of headers in every queue - let mut queue = QueuedHeaders::::default(); - queue - .maybe_orphan - .entry(1) - .or_default() - .insert(hash(1), QueuedHeader::::new(Default::default())); - queue - .maybe_orphan - .entry(1) - .or_default() - .insert(hash(2), QueuedHeader::::new(Default::default())); - queue - .maybe_orphan - .entry(2) - .or_default() - .insert(hash(3), QueuedHeader::::new(Default::default())); - queue - .orphan - .entry(3) - .or_default() - .insert(hash(4), QueuedHeader::::new(Default::default())); - queue - .maybe_extra - .entry(4) - .or_default() - .insert(hash(5), QueuedHeader::::new(Default::default())); - queue - .ready - .entry(5) - .or_default() - .insert(hash(6), QueuedHeader::::new(Default::default())); - queue - .incomplete - .entry(6) - .or_default() - .insert(hash(7), QueuedHeader::::new(Default::default())); - assert_eq!(queue.total_headers(), 7); - } - - #[test] - fn best_queued_number_works() { - // initially there are headers in MaybeOrphan queue only - let mut queue = QueuedHeaders::::default(); - queue - .maybe_orphan - .entry(1) - .or_default() - .insert(hash(1), QueuedHeader::::new(Default::default())); - queue - .maybe_orphan - .entry(1) - .or_default() - .insert(hash(2), QueuedHeader::::new(Default::default())); - queue - .maybe_orphan - .entry(3) - .or_default() - .insert(hash(3), QueuedHeader::::new(Default::default())); - assert_eq!(queue.best_queued_number(), 3); - // and then there's better header in Orphan - queue - .orphan - .entry(10) - .or_default() - .insert(hash(10), QueuedHeader::::new(Default::default())); - assert_eq!(queue.best_queued_number(), 10); - // and then there's better header in MaybeExtra - queue - .maybe_extra - .entry(20) - .or_default() - .insert(hash(20), QueuedHeader::::new(Default::default())); - assert_eq!(queue.best_queued_number(), 20); - // and then there's better header in Ready - queue - .ready - .entry(30) - .or_default() - .insert(hash(30), QueuedHeader::::new(Default::default())); - assert_eq!(queue.best_queued_number(), 30); - // and then there's better header in MaybeOrphan again - queue - .maybe_orphan - .entry(40) - .or_default() - .insert(hash(40), QueuedHeader::::new(Default::default())); - assert_eq!(queue.best_queued_number(), 40); - // and then there's some header in Incomplete - queue - .incomplete - .entry(50) - .or_default() - .insert(hash(50), QueuedHeader::::new(Default::default())); - assert_eq!(queue.best_queued_number(), 50); - } - - #[test] - fn status_works() { - // all headers are unknown initially - let mut queue = QueuedHeaders::::default(); - assert_eq!(queue.status(&id(10)), HeaderStatus::Unknown); - // and status is read from the KnownHeaders - queue.known_headers.entry(10).or_default().insert(hash(10), HeaderStatus::Ready); - assert_eq!(queue.status(&id(10)), HeaderStatus::Ready); - } - - #[test] - fn header_works() { - // initially we have oldest header #10 - let mut queue = QueuedHeaders::::default(); - queue.maybe_orphan.entry(10).or_default().insert(hash(1), header(100)); - assert_eq!(queue.header(HeaderStatus::MaybeOrphan).unwrap().header().hash, hash(100)); - // inserting #20 changes nothing - queue.maybe_orphan.entry(20).or_default().insert(hash(1), header(101)); - assert_eq!(queue.header(HeaderStatus::MaybeOrphan).unwrap().header().hash, hash(100)); - // inserting #5 makes it oldest - queue.maybe_orphan.entry(5).or_default().insert(hash(1), header(102)); - assert_eq!(queue.header(HeaderStatus::MaybeOrphan).unwrap().header().hash, hash(102)); - } - - #[test] - fn header_response_works() { - // when parent is Synced, we insert to MaybeExtra - let mut queue = QueuedHeaders::::default(); - queue - .known_headers - .entry(100) - .or_default() - .insert(hash(100), HeaderStatus::Synced); - queue.header_response(header(101).header().clone()); - assert_eq!(queue.status(&id(101)), HeaderStatus::MaybeExtra); - - // when parent is Ready, we insert to MaybeExtra - let mut queue = QueuedHeaders::::default(); - queue - .known_headers - .entry(100) - .or_default() - .insert(hash(100), HeaderStatus::Ready); - queue.header_response(header(101).header().clone()); - assert_eq!(queue.status(&id(101)), HeaderStatus::MaybeExtra); - - // when parent is Receipts, we insert to MaybeExtra - let mut queue = QueuedHeaders::::default(); - queue - .known_headers - .entry(100) - .or_default() - .insert(hash(100), HeaderStatus::Extra); - queue.header_response(header(101).header().clone()); - assert_eq!(queue.status(&id(101)), HeaderStatus::MaybeExtra); - - // when parent is MaybeExtra, we insert to MaybeExtra - let mut queue = QueuedHeaders::::default(); - queue - .known_headers - .entry(100) - .or_default() - .insert(hash(100), HeaderStatus::MaybeExtra); - queue.header_response(header(101).header().clone()); - assert_eq!(queue.status(&id(101)), HeaderStatus::MaybeExtra); - - // when parent is Orphan, we insert to Orphan - let mut queue = QueuedHeaders::::default(); - queue - .known_headers - .entry(100) - .or_default() - .insert(hash(100), HeaderStatus::Orphan); - queue.header_response(header(101).header().clone()); - assert_eq!(queue.status(&id(101)), HeaderStatus::Orphan); - - // when parent is MaybeOrphan, we insert to MaybeOrphan - let mut queue = QueuedHeaders::::default(); - queue - .known_headers - .entry(100) - .or_default() - .insert(hash(100), HeaderStatus::MaybeOrphan); - queue.header_response(header(101).header().clone()); - assert_eq!(queue.status(&id(101)), HeaderStatus::MaybeOrphan); - - // when parent is unknown, we insert to MaybeOrphan - let mut queue = QueuedHeaders::::default(); - queue.header_response(header(101).header().clone()); - assert_eq!(queue.status(&id(101)), HeaderStatus::MaybeOrphan); - } - - #[test] - fn ancestors_are_synced_on_substrate_best_header_response() { - // let's say someone else has submitted transaction to bridge that changes - // its best block to #100. At this time we have: - // #100 in MaybeOrphan - // #99 in Orphan - // #98 in MaybeExtra - // #97 in Receipts - // #96 in Ready - let mut queue = QueuedHeaders::::default(); - queue - .known_headers - .entry(100) - .or_default() - .insert(hash(100), HeaderStatus::MaybeOrphan); - queue.maybe_orphan.entry(100).or_default().insert(hash(100), header(100)); - queue - .known_headers - .entry(99) - .or_default() - .insert(hash(99), HeaderStatus::Orphan); - queue.orphan.entry(99).or_default().insert(hash(99), header(99)); - queue - .known_headers - .entry(98) - .or_default() - .insert(hash(98), HeaderStatus::MaybeExtra); - queue.maybe_extra.entry(98).or_default().insert(hash(98), header(98)); - queue.known_headers.entry(97).or_default().insert(hash(97), HeaderStatus::Extra); - queue.extra.entry(97).or_default().insert(hash(97), header(97)); - queue.known_headers.entry(96).or_default().insert(hash(96), HeaderStatus::Ready); - queue.ready.entry(96).or_default().insert(hash(96), header(96)); - queue.target_best_header_response(&id(100)); - - // then the #100 and all ancestors of #100 (#96..#99) are treated as synced - assert!(queue.maybe_orphan.is_empty()); - assert!(queue.orphan.is_empty()); - assert!(queue.maybe_extra.is_empty()); - assert!(queue.extra.is_empty()); - assert!(queue.ready.is_empty()); - assert_eq!(queue.known_headers.len(), 5); - assert!(queue - .known_headers - .values() - .all(|s| s.values().all(|s| *s == HeaderStatus::Synced))); - - // children of synced headers are stored - assert_eq!( - vec![id(97)], - queue.synced_children[&96][&hash(96)].iter().cloned().collect::>() - ); - assert_eq!( - vec![id(98)], - queue.synced_children[&97][&hash(97)].iter().cloned().collect::>() - ); - assert_eq!( - vec![id(99)], - queue.synced_children[&98][&hash(98)].iter().cloned().collect::>() - ); - assert_eq!( - vec![id(100)], - queue.synced_children[&99][&hash(99)].iter().cloned().collect::>() - ); - assert_eq!(0, queue.synced_children[&100][&hash(100)].len()); - } - - #[test] - fn descendants_are_moved_on_substrate_best_header_response() { - // let's say someone else has submitted transaction to bridge that changes - // its best block to #100. At this time we have: - // #101 in Orphan - // #102 in MaybeOrphan - // #103 in Orphan - let mut queue = QueuedHeaders::::default(); - queue - .known_headers - .entry(101) - .or_default() - .insert(hash(101), HeaderStatus::Orphan); - queue.orphan.entry(101).or_default().insert(hash(101), header(101)); - queue - .known_headers - .entry(102) - .or_default() - .insert(hash(102), HeaderStatus::MaybeOrphan); - queue.maybe_orphan.entry(102).or_default().insert(hash(102), header(102)); - queue - .known_headers - .entry(103) - .or_default() - .insert(hash(103), HeaderStatus::Orphan); - queue.orphan.entry(103).or_default().insert(hash(103), header(103)); - queue.target_best_header_response(&id(100)); - - // all descendants are moved to MaybeExtra - assert!(queue.maybe_orphan.is_empty()); - assert!(queue.orphan.is_empty()); - assert_eq!(queue.maybe_extra.len(), 3); - assert_eq!(queue.known_headers[&101][&hash(101)], HeaderStatus::MaybeExtra); - assert_eq!(queue.known_headers[&102][&hash(102)], HeaderStatus::MaybeExtra); - assert_eq!(queue.known_headers[&103][&hash(103)], HeaderStatus::MaybeExtra); - } - - #[test] - fn positive_maybe_orphan_response_works() { - // let's say we have: - // #100 in MaybeOrphan - // #101 in Orphan - // #102 in MaybeOrphan - // and we have asked for MaybeOrphan status of #100.parent (i.e. #99) - // and the response is: YES, #99 is known to the Substrate runtime - let mut queue = QueuedHeaders::::default(); - queue - .known_headers - .entry(100) - .or_default() - .insert(hash(100), HeaderStatus::MaybeOrphan); - queue.maybe_orphan.entry(100).or_default().insert(hash(100), header(100)); - queue - .known_headers - .entry(101) - .or_default() - .insert(hash(101), HeaderStatus::Orphan); - queue.orphan.entry(101).or_default().insert(hash(101), header(101)); - queue - .known_headers - .entry(102) - .or_default() - .insert(hash(102), HeaderStatus::MaybeOrphan); - queue.maybe_orphan.entry(102).or_default().insert(hash(102), header(102)); - queue.maybe_orphan_response(&id(99), true); - - // then all headers (#100..#103) are moved to the MaybeExtra queue - assert!(queue.orphan.is_empty()); - assert!(queue.maybe_orphan.is_empty()); - assert_eq!(queue.maybe_extra.len(), 3); - assert_eq!(queue.known_headers[&100][&hash(100)], HeaderStatus::MaybeExtra); - assert_eq!(queue.known_headers[&101][&hash(101)], HeaderStatus::MaybeExtra); - assert_eq!(queue.known_headers[&102][&hash(102)], HeaderStatus::MaybeExtra); - } - - #[test] - fn negative_maybe_orphan_response_works() { - // let's say we have: - // #100 in MaybeOrphan - // #101 in MaybeOrphan - // and we have asked for MaybeOrphan status of #100.parent (i.e. #99) - // and the response is: NO, #99 is NOT known to the Substrate runtime - let mut queue = QueuedHeaders::::default(); - queue - .known_headers - .entry(100) - .or_default() - .insert(hash(100), HeaderStatus::MaybeOrphan); - queue.maybe_orphan.entry(100).or_default().insert(hash(100), header(100)); - queue - .known_headers - .entry(101) - .or_default() - .insert(hash(101), HeaderStatus::MaybeOrphan); - queue.maybe_orphan.entry(101).or_default().insert(hash(101), header(101)); - queue.maybe_orphan_response(&id(99), false); - - // then all headers (#100..#101) are moved to the Orphan queue - assert!(queue.maybe_orphan.is_empty()); - assert_eq!(queue.orphan.len(), 2); - assert_eq!(queue.known_headers[&100][&hash(100)], HeaderStatus::Orphan); - assert_eq!(queue.known_headers[&101][&hash(101)], HeaderStatus::Orphan); - } - - #[test] - fn positive_maybe_extra_response_works() { - let mut queue = QueuedHeaders::::default(); - queue - .known_headers - .entry(100) - .or_default() - .insert(hash(100), HeaderStatus::MaybeExtra); - queue.maybe_extra.entry(100).or_default().insert(hash(100), header(100)); - queue.maybe_extra_response(&id(100), true); - assert!(queue.maybe_extra.is_empty()); - assert_eq!(queue.extra.len(), 1); - assert_eq!(queue.known_headers[&100][&hash(100)], HeaderStatus::Extra); - } - - #[test] - fn negative_maybe_extra_response_works() { - // when parent header is complete - let mut queue = QueuedHeaders::::default(); - queue - .known_headers - .entry(100) - .or_default() - .insert(hash(100), HeaderStatus::MaybeExtra); - queue.maybe_extra.entry(100).or_default().insert(hash(100), header(100)); - queue.maybe_extra_response(&id(100), false); - assert!(queue.maybe_extra.is_empty()); - assert_eq!(queue.ready.len(), 1); - assert_eq!(queue.known_headers[&100][&hash(100)], HeaderStatus::Ready); - - // when parent header is incomplete - queue.incomplete_headers.insert(id(200), None); - queue - .known_headers - .entry(201) - .or_default() - .insert(hash(201), HeaderStatus::MaybeExtra); - queue.maybe_extra.entry(201).or_default().insert(hash(201), header(201)); - queue.maybe_extra_response(&id(201), false); - assert!(queue.maybe_extra.is_empty()); - assert_eq!(queue.incomplete.len(), 1); - assert_eq!(queue.known_headers[&201][&hash(201)], HeaderStatus::Incomplete); - } - - #[test] - fn receipts_response_works() { - // when parent header is complete - let mut queue = QueuedHeaders::::default(); - queue - .known_headers - .entry(100) - .or_default() - .insert(hash(100), HeaderStatus::Extra); - queue.extra.entry(100).or_default().insert(hash(100), header(100)); - queue.extra_response(&id(100), 100_100); - assert!(queue.extra.is_empty()); - assert_eq!(queue.ready.len(), 1); - assert_eq!(queue.known_headers[&100][&hash(100)], HeaderStatus::Ready); - - // when parent header is incomplete - queue.incomplete_headers.insert(id(200), None); - queue - .known_headers - .entry(201) - .or_default() - .insert(hash(201), HeaderStatus::Extra); - queue.extra.entry(201).or_default().insert(hash(201), header(201)); - queue.extra_response(&id(201), 201_201); - assert!(queue.extra.is_empty()); - assert_eq!(queue.incomplete.len(), 1); - assert_eq!(queue.known_headers[&201][&hash(201)], HeaderStatus::Incomplete); - } - - #[test] - fn header_submitted_works() { - let mut queue = QueuedHeaders::::default(); - queue - .known_headers - .entry(100) - .or_default() - .insert(hash(100), HeaderStatus::Ready); - queue.ready.entry(100).or_default().insert(hash(100), header(100)); - queue.headers_submitted(vec![id(100)]); - assert!(queue.ready.is_empty()); - assert_eq!(queue.known_headers[&100][&hash(100)], HeaderStatus::Submitted); - } - - #[test] - fn incomplete_header_works() { - let mut queue = QueuedHeaders::::default(); - - // nothing to complete if queue is empty - assert_eq!(queue.incomplete_header(), None); - - // when there's new header to complete => ask for completion data - queue.incomplete_headers.insert(id(100), None); - assert_eq!(queue.incomplete_header(), Some(id(100))); - - // we have just asked for completion data => nothing to request - assert_eq!(queue.incomplete_header(), None); - - // enough time have passed => ask again - queue.incomplete_headers.clear(); - queue.incomplete_headers.insert( - id(100), - Some( - Instant::now() - RETRY_FETCH_COMPLETION_INTERVAL - RETRY_FETCH_COMPLETION_INTERVAL, - ), - ); - assert_eq!(queue.incomplete_header(), Some(id(100))); - } - - #[test] - fn completion_response_works() { - let mut queue = QueuedHeaders::::default(); - queue.incomplete_headers.insert(id(100), None); - queue.incomplete_headers.insert(id(200), Some(Instant::now())); - queue.incomplete_headers.insert(id(300), Some(Instant::now())); - - // when header isn't incompete, nothing changes - queue.completion_response(&id(400), None); - assert_eq!(queue.incomplete_headers.len(), 3); - assert_eq!(queue.completion_data.len(), 0); - assert_eq!(queue.header_to_complete(), None); - - // when response is None, nothing changes - queue.completion_response(&id(100), None); - assert_eq!(queue.incomplete_headers.len(), 3); - assert_eq!(queue.completion_data.len(), 0); - assert_eq!(queue.header_to_complete(), None); - - // when response is Some, we're scheduling completion - queue.completion_response(&id(200), Some(200_200)); - assert_eq!(queue.completion_data.len(), 1); - assert!(queue.completion_data.contains_key(&id(200))); - assert_eq!(queue.header_to_complete(), Some((id(200), &200_200))); - assert_eq!( - queue.incomplete_headers.keys().collect::>(), - vec![&id(100), &id(300), &id(200)], - ); - } - - #[test] - fn header_completed_works() { - let mut queue = QueuedHeaders::::default(); - queue.completion_data.insert(id(100), 100_100); - - // when unknown header is completed - queue.header_completed(&id(200)); - assert_eq!(queue.completion_data.len(), 1); - - // when known header is completed - queue.header_completed(&id(100)); - assert_eq!(queue.completion_data.len(), 0); - } - - #[test] - fn incomplete_headers_response_works() { - let mut queue = QueuedHeaders::::default(); - - // when we have already submitted #101 and #102 is ready - queue - .known_headers - .entry(101) - .or_default() - .insert(hash(101), HeaderStatus::Submitted); - queue.submitted.entry(101).or_default().insert(hash(101), header(101)); - queue - .known_headers - .entry(102) - .or_default() - .insert(hash(102), HeaderStatus::Ready); - queue.submitted.entry(102).or_default().insert(hash(102), header(102)); - - // AND now we know that the #100 is incomplete - queue.incomplete_headers_response(vec![id(100)].into_iter().collect()); - - // => #101 and #102 are moved to the Incomplete and #100 is now synced - assert_eq!(queue.status(&id(100)), HeaderStatus::Synced); - assert_eq!(queue.status(&id(101)), HeaderStatus::Incomplete); - assert_eq!(queue.status(&id(102)), HeaderStatus::Incomplete); - assert_eq!(queue.submitted.len(), 0); - assert_eq!(queue.ready.len(), 0); - assert!(queue.incomplete.entry(101).or_default().contains_key(&hash(101))); - assert!(queue.incomplete.entry(102).or_default().contains_key(&hash(102))); - assert!(queue.incomplete_headers.contains_key(&id(100))); - assert!(queue.completion_data.is_empty()); - - // and then header #100 is no longer incomplete - queue.incomplete_headers_response(vec![].into_iter().collect()); - - // => #101 and #102 are moved to the Ready queue and #100 if now forgotten - assert_eq!(queue.status(&id(100)), HeaderStatus::Synced); - assert_eq!(queue.status(&id(101)), HeaderStatus::Ready); - assert_eq!(queue.status(&id(102)), HeaderStatus::Ready); - assert_eq!(queue.incomplete.len(), 0); - assert_eq!(queue.submitted.len(), 0); - assert!(queue.ready.entry(101).or_default().contains_key(&hash(101))); - assert!(queue.ready.entry(102).or_default().contains_key(&hash(102))); - assert!(queue.incomplete_headers.is_empty()); - assert!(queue.completion_data.is_empty()); - } - - #[test] - fn is_parent_incomplete_works() { - let mut queue = QueuedHeaders::::default(); - - // when we do not know header itself - assert!(!queue.is_parent_incomplete(&id(50))); - - // when we do not know parent - queue - .known_headers - .entry(100) - .or_default() - .insert(hash(100), HeaderStatus::Incomplete); - queue.incomplete.entry(100).or_default().insert(hash(100), header(100)); - assert!(!queue.is_parent_incomplete(&id(100))); - - // when parent is inside incomplete queue (i.e. some other ancestor is actually incomplete) - queue - .known_headers - .entry(101) - .or_default() - .insert(hash(101), HeaderStatus::Submitted); - queue.submitted.entry(101).or_default().insert(hash(101), header(101)); - assert!(queue.is_parent_incomplete(&id(101))); - - // when parent is the incomplete header and we do not have completion data - queue.incomplete_headers.insert(id(199), None); - queue - .known_headers - .entry(200) - .or_default() - .insert(hash(200), HeaderStatus::Submitted); - queue.submitted.entry(200).or_default().insert(hash(200), header(200)); - assert!(queue.is_parent_incomplete(&id(200))); - - // when parent is the incomplete header and we have completion data - queue.completion_data.insert(id(299), 299_299); - queue - .known_headers - .entry(300) - .or_default() - .insert(hash(300), HeaderStatus::Submitted); - queue.submitted.entry(300).or_default().insert(hash(300), header(300)); - assert!(queue.is_parent_incomplete(&id(300))); - } - - #[test] - fn prune_works() { - let mut queue = QueuedHeaders::::default(); - queue - .known_headers - .entry(105) - .or_default() - .insert(hash(105), HeaderStatus::Incomplete); - queue.incomplete.entry(105).or_default().insert(hash(105), header(105)); - queue - .known_headers - .entry(104) - .or_default() - .insert(hash(104), HeaderStatus::MaybeOrphan); - queue.maybe_orphan.entry(104).or_default().insert(hash(104), header(104)); - queue - .known_headers - .entry(103) - .or_default() - .insert(hash(103), HeaderStatus::Orphan); - queue.orphan.entry(103).or_default().insert(hash(103), header(103)); - queue - .known_headers - .entry(102) - .or_default() - .insert(hash(102), HeaderStatus::MaybeExtra); - queue.maybe_extra.entry(102).or_default().insert(hash(102), header(102)); - queue - .known_headers - .entry(101) - .or_default() - .insert(hash(101), HeaderStatus::Extra); - queue.extra.entry(101).or_default().insert(hash(101), header(101)); - queue - .known_headers - .entry(100) - .or_default() - .insert(hash(100), HeaderStatus::Ready); - queue.ready.entry(100).or_default().insert(hash(100), header(100)); - queue - .synced_children - .entry(100) - .or_default() - .insert(hash(100), vec![id(101)].into_iter().collect()); - queue - .synced_children - .entry(102) - .or_default() - .insert(hash(102), vec![id(102)].into_iter().collect()); - - queue.prune(102); - - assert_eq!(queue.ready.len(), 0); - assert_eq!(queue.extra.len(), 0); - assert_eq!(queue.maybe_extra.len(), 1); - assert_eq!(queue.orphan.len(), 1); - assert_eq!(queue.maybe_orphan.len(), 1); - assert_eq!(queue.incomplete.len(), 1); - assert_eq!(queue.synced_children.len(), 1); - assert_eq!(queue.known_headers.len(), 4); - - queue.prune(110); - - assert_eq!(queue.ready.len(), 0); - assert_eq!(queue.extra.len(), 0); - assert_eq!(queue.maybe_extra.len(), 0); - assert_eq!(queue.orphan.len(), 0); - assert_eq!(queue.maybe_orphan.len(), 0); - assert_eq!(queue.incomplete.len(), 0); - assert_eq!(queue.synced_children.len(), 0); - assert_eq!(queue.known_headers.len(), 0); - - queue.header_response(header(109).header().clone()); - assert_eq!(queue.known_headers.len(), 0); - - queue.header_response(header(110).header().clone()); - assert_eq!(queue.known_headers.len(), 1); - } - - #[test] - fn incomplete_headers_are_still_incomplete_after_advance() { - let mut queue = QueuedHeaders::::default(); - - // relay#1 knows that header#100 is incomplete && it has headers 101..104 in incomplete - // queue - queue.incomplete_headers.insert(id(100), None); - queue.incomplete.entry(101).or_default().insert(hash(101), header(101)); - queue.incomplete.entry(102).or_default().insert(hash(102), header(102)); - queue.incomplete.entry(103).or_default().insert(hash(103), header(103)); - queue.incomplete.entry(104).or_default().insert(hash(104), header(104)); - queue - .known_headers - .entry(100) - .or_default() - .insert(hash(100), HeaderStatus::Synced); - queue - .known_headers - .entry(101) - .or_default() - .insert(hash(101), HeaderStatus::Incomplete); - queue - .known_headers - .entry(102) - .or_default() - .insert(hash(102), HeaderStatus::Incomplete); - queue - .known_headers - .entry(103) - .or_default() - .insert(hash(103), HeaderStatus::Incomplete); - queue - .known_headers - .entry(104) - .or_default() - .insert(hash(104), HeaderStatus::Incomplete); - - // let's say relay#2 completes header#100 and then submits header#101+header#102 and it - // turns out that header#102 is also incomplete - queue.incomplete_headers_response(vec![id(102)].into_iter().collect()); - - // then the header#103 and the header#104 must have Incomplete status - assert_eq!(queue.status(&id(101)), HeaderStatus::Synced); - assert_eq!(queue.status(&id(102)), HeaderStatus::Synced); - assert_eq!(queue.status(&id(103)), HeaderStatus::Incomplete); - assert_eq!(queue.status(&id(104)), HeaderStatus::Incomplete); - } - - #[test] - fn incomplete_headers_response_moves_synced_headers() { - let mut queue = QueuedHeaders::::default(); - - // we have submitted two headers - 100 and 101. 102 is ready - queue.submitted.entry(100).or_default().insert(hash(100), header(100)); - queue.submitted.entry(101).or_default().insert(hash(101), header(101)); - queue.ready.entry(102).or_default().insert(hash(102), header(102)); - queue - .known_headers - .entry(100) - .or_default() - .insert(hash(100), HeaderStatus::Submitted); - queue - .known_headers - .entry(101) - .or_default() - .insert(hash(101), HeaderStatus::Submitted); - queue - .known_headers - .entry(102) - .or_default() - .insert(hash(102), HeaderStatus::Ready); - - // both headers are accepted - queue.target_best_header_response(&id(101)); - - // but header 100 is incomplete - queue.incomplete_headers_response(vec![id(100)].into_iter().collect()); - assert_eq!(queue.status(&id(100)), HeaderStatus::Synced); - assert_eq!(queue.status(&id(101)), HeaderStatus::Synced); - assert_eq!(queue.status(&id(102)), HeaderStatus::Incomplete); - assert!(queue.incomplete_headers.contains_key(&id(100))); - assert!(queue.incomplete[&102].contains_key(&hash(102))); - - // when header 100 is completed, 101 is synced and 102 is ready - queue.incomplete_headers_response(HashSet::new()); - assert_eq!(queue.status(&id(100)), HeaderStatus::Synced); - assert_eq!(queue.status(&id(101)), HeaderStatus::Synced); - assert_eq!(queue.status(&id(102)), HeaderStatus::Ready); - assert!(queue.ready[&102].contains_key(&hash(102))); - } -} diff --git a/bridges/relays/headers/src/lib.rs b/bridges/relays/headers/src/lib.rs deleted file mode 100644 index 8946355921f0..000000000000 --- a/bridges/relays/headers/src/lib.rs +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright 2019-2021 Parity Technologies (UK) Ltd. -// This file is part of Parity Bridges Common. - -// Parity Bridges Common is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Parity Bridges Common is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Parity Bridges Common. If not, see . - -//! Relaying source chain headers to target chain. This module provides entrypoint -//! that starts reading new headers from source chain and submit these headers as -//! module/contract transactions to the target chain. Pallet/contract on the target -//! chain is a light-client of the source chain. All other trustless bridge -//! applications are built using this light-client, so running headers-relay is -//! essential for running all other bridge applications. - -// required for futures::select! -#![recursion_limit = "1024"] -#![warn(missing_docs)] - -pub mod headers; -pub mod sync; -pub mod sync_loop; -pub mod sync_loop_metrics; -pub mod sync_loop_tests; -pub mod sync_types; diff --git a/bridges/relays/headers/src/sync.rs b/bridges/relays/headers/src/sync.rs deleted file mode 100644 index 012b63f0dc59..000000000000 --- a/bridges/relays/headers/src/sync.rs +++ /dev/null @@ -1,529 +0,0 @@ -// Copyright 2019-2021 Parity Technologies (UK) Ltd. -// This file is part of Parity Bridges Common. - -// Parity Bridges Common is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Parity Bridges Common is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Parity Bridges Common. If not, see . - -//! Headers synchronization context. This structure wraps headers queue and is -//! able to choose: which headers to read from the source chain? Which headers -//! to submit to the target chain? The context makes decisions basing on parameters -//! passed using `HeadersSyncParams` structure. - -use crate::{ - headers::QueuedHeaders, - sync_types::{HeaderIdOf, HeaderStatus, HeadersSyncPipeline, QueuedHeader}, -}; -use num_traits::{One, Saturating, Zero}; - -/// Common sync params. -#[derive(Debug, Clone)] -pub struct HeadersSyncParams { - /// Maximal number of ethereum headers to pre-download. - pub max_future_headers_to_download: usize, - /// Maximal number of active (we believe) submit header transactions. - pub max_headers_in_submitted_status: usize, - /// Maximal number of headers in single submit request. - pub max_headers_in_single_submit: usize, - /// Maximal total headers size in single submit request. - pub max_headers_size_in_single_submit: usize, - /// We only may store and accept (from Ethereum node) headers that have - /// number >= than "best_substrate_header.number" - "prune_depth". - pub prune_depth: u32, - /// Target transactions mode. - pub target_tx_mode: TargetTransactionMode, -} - -/// Target transaction mode. -#[derive(Debug, PartialEq, Clone)] -pub enum TargetTransactionMode { - /// Submit new headers using signed transactions. - Signed, - /// Submit new headers using unsigned transactions. - Unsigned, - /// Submit new headers using signed transactions, but only when we - /// believe that sync has stalled. - Backup, -} - -/// Headers synchronization context. -#[derive(Debug)] -pub struct HeadersSync { - /// Synchronization parameters. - params: HeadersSyncParams, - /// The best header number known to source node. - source_best_number: Option, - /// The best header known to target node. - target_best_header: Option>, - /// Headers queue. - headers: QueuedHeaders

, - /// Pause headers submission. - pause_submit: bool, -} - -impl HeadersSync

{ - /// Creates new headers synchronizer. - pub fn new(params: HeadersSyncParams) -> Self { - HeadersSync { - headers: QueuedHeaders::default(), - params, - source_best_number: None, - target_best_header: None, - pause_submit: false, - } - } - - /// Return best header number known to source node. - pub fn source_best_number(&self) -> Option { - self.source_best_number - } - - /// The best header known to target node. - pub fn target_best_header(&self) -> Option> { - self.target_best_header - } - - /// Returns true if we have synced almost all known headers. - pub fn is_almost_synced(&self) -> bool { - match self.source_best_number { - Some(source_best_number) => self - .target_best_header - .map(|best| source_best_number.saturating_sub(best.0) < 4.into()) - .unwrap_or(false), - None => true, - } - } - - /// Returns synchronization status. - pub fn status(&self) -> (&Option>, &Option) { - (&self.target_best_header, &self.source_best_number) - } - - /// Returns reference to the headers queue. - pub fn headers(&self) -> &QueuedHeaders

{ - &self.headers - } - - /// Returns mutable reference to the headers queue. - pub fn headers_mut(&mut self) -> &mut QueuedHeaders

{ - &mut self.headers - } - - /// Select header that needs to be downloaded from the source node. - pub fn select_new_header_to_download(&self) -> Option { - // if we haven't received best header from source node yet, there's nothing we can download - let source_best_number = self.source_best_number?; - - // if we haven't received known best header from target node yet, there's nothing we can - // download - let target_best_header = self.target_best_header.as_ref()?; - - // if there's too many headers in the queue, stop downloading - let in_memory_headers = self.headers.total_headers(); - if in_memory_headers >= self.params.max_future_headers_to_download { - return None - } - - // if queue is empty and best header on target is > than best header on source, - // then we shoud reorganization - let best_queued_number = self.headers.best_queued_number(); - if best_queued_number.is_zero() && source_best_number < target_best_header.0 { - return Some(source_best_number) - } - - // we assume that there were no reorganizations if we have already downloaded best header - let best_downloaded_number = std::cmp::max( - std::cmp::max(best_queued_number, self.headers.best_synced_number()), - target_best_header.0, - ); - if best_downloaded_number >= source_best_number { - return None - } - - // download new header - Some(best_downloaded_number + One::one()) - } - - /// Select orphan header to download. - pub fn select_orphan_header_to_download(&self) -> Option<&QueuedHeader

> { - let orphan_header = self.headers.header(HeaderStatus::Orphan)?; - - // we consider header orphan until we'll find it ancestor that is known to the target node - // => we may get orphan header while we ask target node whether it knows its parent - // => let's avoid fetching duplicate headers - let parent_id = orphan_header.parent_id(); - if self.headers.status(&parent_id) != HeaderStatus::Unknown { - return None - } - - Some(orphan_header) - } - - /// Select headers that need to be submitted to the target node. - pub fn select_headers_to_submit(&self, stalled: bool) -> Option>> { - // maybe we have paused new headers submit? - if self.pause_submit { - return None - } - - // if we operate in backup mode, we only submit headers when sync has stalled - if self.params.target_tx_mode == TargetTransactionMode::Backup && !stalled { - return None - } - - let headers_in_submit_status = self.headers.headers_in_status(HeaderStatus::Submitted); - let headers_to_submit_count = self - .params - .max_headers_in_submitted_status - .checked_sub(headers_in_submit_status)?; - - let mut total_size = 0; - let mut total_headers = 0; - self.headers.headers(HeaderStatus::Ready, |header| { - if total_headers == headers_to_submit_count { - return false - } - if total_headers == self.params.max_headers_in_single_submit { - return false - } - - let encoded_size = P::estimate_size(header); - if total_headers != 0 && - total_size + encoded_size > self.params.max_headers_size_in_single_submit - { - return false - } - - total_size += encoded_size; - total_headers += 1; - - true - }) - } - - /// Receive new target header number from the source node. - pub fn source_best_header_number_response(&mut self, best_header_number: P::Number) { - log::debug!( - target: "bridge", - "Received best header number from {} node: {}", - P::SOURCE_NAME, - best_header_number, - ); - self.source_best_number = Some(best_header_number); - } - - /// Receive new best header from the target node. - /// Returns true if it is different from the previous block known to us. - pub fn target_best_header_response(&mut self, best_header: HeaderIdOf

) -> bool { - log::debug!( - target: "bridge", - "Received best known header from {}: {:?}", - P::TARGET_NAME, - best_header, - ); - - // early return if it is still the same - if self.target_best_header == Some(best_header) { - return false - } - - // remember that this header is now known to the Substrate runtime - self.headers.target_best_header_response(&best_header); - - // prune ancient headers - self.headers.prune(best_header.0.saturating_sub(self.params.prune_depth.into())); - - // finally remember the best header itself - self.target_best_header = Some(best_header); - - // we are ready to submit headers again - if self.pause_submit { - log::debug!( - target: "bridge", - "Ready to submit {} headers to {} node again!", - P::SOURCE_NAME, - P::TARGET_NAME, - ); - - self.pause_submit = false; - } - - true - } - - /// Pause headers submit until best header will be updated on target node. - pub fn pause_submit(&mut self) { - log::debug!( - target: "bridge", - "Stopping submitting {} headers to {} node. Waiting for {} submitted headers to be accepted", - P::SOURCE_NAME, - P::TARGET_NAME, - self.headers.headers_in_status(HeaderStatus::Submitted), - ); - - self.pause_submit = true; - } - - /// Restart synchronization. - pub fn restart(&mut self) { - self.source_best_number = None; - self.target_best_header = None; - self.headers.clear(); - self.pause_submit = false; - } -} - -#[cfg(test)] -pub mod tests { - use super::*; - use crate::{ - headers::tests::{header, id}, - sync_loop_tests::{TestHash, TestHeadersSyncPipeline, TestNumber}, - sync_types::HeaderStatus, - }; - use relay_utils::HeaderId; - - fn side_hash(number: TestNumber) -> TestHash { - 1000 + number - } - - pub fn default_sync_params() -> HeadersSyncParams { - HeadersSyncParams { - max_future_headers_to_download: 128, - max_headers_in_submitted_status: 128, - max_headers_in_single_submit: 32, - max_headers_size_in_single_submit: 131_072, - prune_depth: 4096, - target_tx_mode: TargetTransactionMode::Signed, - } - } - - #[test] - fn select_new_header_to_download_works() { - let mut eth_sync = HeadersSync::::new(default_sync_params()); - - // both best && target headers are unknown - assert_eq!(eth_sync.select_new_header_to_download(), None); - - // best header is known, target header is unknown - eth_sync.target_best_header = Some(HeaderId(0, Default::default())); - assert_eq!(eth_sync.select_new_header_to_download(), None); - - // target header is known, best header is unknown - eth_sync.target_best_header = None; - eth_sync.source_best_number = Some(100); - assert_eq!(eth_sync.select_new_header_to_download(), None); - - // when our best block has the same number as the target - eth_sync.target_best_header = Some(HeaderId(100, Default::default())); - assert_eq!(eth_sync.select_new_header_to_download(), None); - - // when we actually need a new header - eth_sync.source_best_number = Some(101); - assert_eq!(eth_sync.select_new_header_to_download(), Some(101)); - - // when we have to reorganize to longer fork - eth_sync.source_best_number = Some(100); - eth_sync.target_best_header = Some(HeaderId(200, Default::default())); - assert_eq!(eth_sync.select_new_header_to_download(), Some(100)); - - // when there are too many headers scheduled for submitting - for i in 1..1000 { - eth_sync.headers.header_response(header(i).header().clone()); - } - assert_eq!(eth_sync.select_new_header_to_download(), None); - } - - #[test] - fn select_new_header_to_download_works_with_empty_queue() { - let mut eth_sync = HeadersSync::::new(default_sync_params()); - eth_sync.source_best_header_number_response(100); - - // when queue is not empty => everything goes as usually - eth_sync.target_best_header_response(header(10).id()); - eth_sync.headers_mut().header_response(header(11).header().clone()); - eth_sync.headers_mut().maybe_extra_response(&header(11).id(), false); - assert_eq!(eth_sync.select_new_header_to_download(), Some(12)); - - // but then queue is drained - eth_sync.headers_mut().target_best_header_response(&header(11).id()); - - // even though it's empty, we know that header#11 is synced - assert_eq!(eth_sync.headers().best_queued_number(), 0); - assert_eq!(eth_sync.headers().best_synced_number(), 11); - assert_eq!(eth_sync.select_new_header_to_download(), Some(12)); - } - - #[test] - fn sync_without_reorgs_works() { - let mut eth_sync = HeadersSync::new(default_sync_params()); - eth_sync.params.max_headers_in_submitted_status = 1; - - // ethereum reports best header #102 - eth_sync.source_best_header_number_response(102); - - // substrate reports that it is at block #100 - eth_sync.target_best_header_response(id(100)); - - // block #101 is downloaded first - assert_eq!(eth_sync.select_new_header_to_download(), Some(101)); - eth_sync.headers.header_response(header(101).header().clone()); - - // now header #101 is ready to be submitted - assert_eq!(eth_sync.headers.header(HeaderStatus::MaybeExtra), Some(&header(101))); - eth_sync.headers.maybe_extra_response(&id(101), false); - assert_eq!(eth_sync.headers.header(HeaderStatus::Ready), Some(&header(101))); - assert_eq!(eth_sync.select_headers_to_submit(false), Some(vec![&header(101)])); - - // and header #102 is ready to be downloaded - assert_eq!(eth_sync.select_new_header_to_download(), Some(102)); - eth_sync.headers.header_response(header(102).header().clone()); - - // receive submission confirmation - eth_sync.headers.headers_submitted(vec![id(101)]); - - // we have nothing to submit because previous header hasn't been confirmed yet - // (and we allow max 1 submit transaction in the wild) - assert_eq!(eth_sync.headers.header(HeaderStatus::MaybeExtra), Some(&header(102))); - eth_sync.headers.maybe_extra_response(&id(102), false); - assert_eq!(eth_sync.headers.header(HeaderStatus::Ready), Some(&header(102))); - assert_eq!(eth_sync.select_headers_to_submit(false), None); - - // substrate reports that it has imported block #101 - eth_sync.target_best_header_response(id(101)); - - // and we are ready to submit #102 - assert_eq!(eth_sync.select_headers_to_submit(false), Some(vec![&header(102)])); - eth_sync.headers.headers_submitted(vec![id(102)]); - - // substrate reports that it has imported block #102 - eth_sync.target_best_header_response(id(102)); - - // and we have nothing to download - assert_eq!(eth_sync.select_new_header_to_download(), None); - } - - #[test] - fn sync_with_orphan_headers_work() { - let mut eth_sync = HeadersSync::new(default_sync_params()); - - // ethereum reports best header #102 - eth_sync.source_best_header_number_response(102); - - // substrate reports that it is at block #100, but it isn't part of best chain - eth_sync.target_best_header_response(HeaderId(100, side_hash(100))); - - // block #101 is downloaded first - assert_eq!(eth_sync.select_new_header_to_download(), Some(101)); - eth_sync.headers.header_response(header(101).header().clone()); - - // we can't submit header #101, because its parent status is unknown - assert_eq!(eth_sync.select_headers_to_submit(false), None); - - // instead we are trying to determine status of its parent (#100) - assert_eq!(eth_sync.headers.header(HeaderStatus::MaybeOrphan), Some(&header(101))); - - // and the status is still unknown - eth_sync.headers.maybe_orphan_response(&id(100), false); - - // so we consider #101 orphaned now && will download its parent - #100 - assert_eq!(eth_sync.headers.header(HeaderStatus::Orphan), Some(&header(101))); - eth_sync.headers.header_response(header(100).header().clone()); - - // #101 is now Orphan and #100 is MaybeOrphan => we do not want to retrieve - // header #100 again - assert_eq!(eth_sync.headers.header(HeaderStatus::Orphan), Some(&header(101))); - assert_eq!(eth_sync.select_orphan_header_to_download(), None); - - // we can't submit header #100, because its parent status is unknown - assert_eq!(eth_sync.select_headers_to_submit(false), None); - - // instead we are trying to determine status of its parent (#99) - assert_eq!(eth_sync.headers.header(HeaderStatus::MaybeOrphan), Some(&header(100))); - - // and the status is known, so we move previously orphaned #100 and #101 to ready queue - eth_sync.headers.maybe_orphan_response(&id(99), true); - - // and we are ready to submit #100 - assert_eq!(eth_sync.headers.header(HeaderStatus::MaybeExtra), Some(&header(100))); - eth_sync.headers.maybe_extra_response(&id(100), false); - assert_eq!(eth_sync.select_headers_to_submit(false), Some(vec![&header(100)])); - eth_sync.headers.headers_submitted(vec![id(100)]); - - // and we are ready to submit #101 - assert_eq!(eth_sync.headers.header(HeaderStatus::MaybeExtra), Some(&header(101))); - eth_sync.headers.maybe_extra_response(&id(101), false); - assert_eq!(eth_sync.select_headers_to_submit(false), Some(vec![&header(101)])); - eth_sync.headers.headers_submitted(vec![id(101)]); - } - - #[test] - fn pruning_happens_on_target_best_header_response() { - let mut eth_sync = HeadersSync::::new(default_sync_params()); - eth_sync.params.prune_depth = 50; - eth_sync.target_best_header_response(id(100)); - assert_eq!(eth_sync.headers.prune_border(), 50); - } - - #[test] - fn only_submitting_headers_in_backup_mode_when_stalled() { - let mut eth_sync = HeadersSync::new(default_sync_params()); - eth_sync.params.target_tx_mode = TargetTransactionMode::Backup; - - // ethereum reports best header #102 - eth_sync.source_best_header_number_response(102); - - // substrate reports that it is at block #100 - eth_sync.target_best_header_response(id(100)); - - // block #101 is downloaded first - eth_sync.headers.header_response(header(101).header().clone()); - eth_sync.headers.maybe_extra_response(&id(101), false); - - // ensure that headers are not submitted when sync is not stalled - assert_eq!(eth_sync.select_headers_to_submit(false), None); - - // ensure that headers are not submitted when sync is stalled - assert_eq!(eth_sync.select_headers_to_submit(true), Some(vec![&header(101)])); - } - - #[test] - fn does_not_select_new_headers_to_submit_when_submit_is_paused() { - let mut eth_sync = HeadersSync::new(default_sync_params()); - eth_sync.params.max_headers_in_submitted_status = 1; - - // ethereum reports best header #102 and substrate is at #100 - eth_sync.source_best_header_number_response(102); - eth_sync.target_best_header_response(id(100)); - - // let's prepare #101 and #102 for submitting - eth_sync.headers.header_response(header(101).header().clone()); - eth_sync.headers.maybe_extra_response(&id(101), false); - eth_sync.headers.header_response(header(102).header().clone()); - eth_sync.headers.maybe_extra_response(&id(102), false); - - // when submit is not paused, we're ready to submit #101 - assert_eq!(eth_sync.select_headers_to_submit(false), Some(vec![&header(101)])); - - // when submit is paused, we're not ready to submit anything - eth_sync.pause_submit(); - assert_eq!(eth_sync.select_headers_to_submit(false), None); - - // if best header on substrate node isn't updated, we still not submitting anything - eth_sync.target_best_header_response(id(100)); - assert_eq!(eth_sync.select_headers_to_submit(false), None); - - // but after it is actually updated, we are ready to submit - eth_sync.target_best_header_response(id(101)); - assert_eq!(eth_sync.select_headers_to_submit(false), Some(vec![&header(102)])); - } -} diff --git a/bridges/relays/headers/src/sync_loop.rs b/bridges/relays/headers/src/sync_loop.rs deleted file mode 100644 index da8d23dc39de..000000000000 --- a/bridges/relays/headers/src/sync_loop.rs +++ /dev/null @@ -1,654 +0,0 @@ -// Copyright 2019-2021 Parity Technologies (UK) Ltd. -// This file is part of Parity Bridges Common. - -// Parity Bridges Common is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Parity Bridges Common is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Parity Bridges Common. If not, see . - -//! Entrypoint for running headers synchronization loop. - -use crate::{ - sync::{HeadersSync, HeadersSyncParams}, - sync_loop_metrics::SyncLoopMetrics, - sync_types::{HeaderIdOf, HeaderStatus, HeadersSyncPipeline, QueuedHeader, SubmittedHeaders}, -}; - -use async_trait::async_trait; -use futures::{future::FutureExt, stream::StreamExt}; -use num_traits::{Saturating, Zero}; -use relay_utils::{ - format_ids, interval, - metrics::{GlobalMetrics, MetricsParams}, - process_future_result, - relay_loop::Client as RelayClient, - retry_backoff, FailedClient, MaybeConnectionError, StringifiedMaybeConnectionError, -}; -use std::{ - collections::HashSet, - future::Future, - time::{Duration, Instant}, -}; - -/// When we submit headers to target node, but see no updates of best -/// source block known to target node during STALL_SYNC_TIMEOUT seconds, -/// we consider that our headers are rejected because there has been reorganization in target chain. -/// This reorganization could invalidate our knowledge about sync process (i.e. we have asked if -/// HeaderA is known to target, but then reorganization happened and the answer is different -/// now) => we need to reset sync. -/// The other option is to receive **EVERY** best target header and check if it is -/// direct child of previous best header. But: (1) subscription doesn't guarantee that -/// the subscriber will receive every best header (2) reorganization won't always lead to sync -/// stall and restart is a heavy operation (we forget all in-memory headers). -const STALL_SYNC_TIMEOUT: Duration = Duration::from_secs(5 * 60); -/// Delay after we have seen update of best source header at target node, -/// for us to treat sync stalled. ONLY when relay operates in backup mode. -const BACKUP_STALL_SYNC_TIMEOUT: Duration = Duration::from_secs(10 * 60); -/// Interval between calling sync maintain procedure. -const MAINTAIN_INTERVAL: Duration = Duration::from_secs(30); - -/// Source client trait. -#[async_trait] -pub trait SourceClient: RelayClient { - /// Get best block number. - async fn best_block_number(&self) -> Result; - - /// Get header by hash. - async fn header_by_hash(&self, hash: P::Hash) -> Result; - - /// Get canonical header by number. - async fn header_by_number(&self, number: P::Number) -> Result; - - /// Get completion data by header hash. - async fn header_completion( - &self, - id: HeaderIdOf

, - ) -> Result<(HeaderIdOf

, Option), Self::Error>; - - /// Get extra data by header hash. - async fn header_extra( - &self, - id: HeaderIdOf

, - header: QueuedHeader

, - ) -> Result<(HeaderIdOf

, P::Extra), Self::Error>; -} - -/// Target client trait. -#[async_trait] -pub trait TargetClient: RelayClient { - /// Returns ID of the best header known to the target node. - async fn best_header_id(&self) -> Result, Self::Error>; - - /// Returns true if header is known to the target node. - async fn is_known_header( - &self, - id: HeaderIdOf

, - ) -> Result<(HeaderIdOf

, bool), Self::Error>; - - /// Submit headers. - async fn submit_headers( - &self, - headers: Vec>, - ) -> SubmittedHeaders, Self::Error>; - - /// Returns ID of headers that require to be 'completed' before children can be submitted. - async fn incomplete_headers_ids(&self) -> Result>, Self::Error>; - - /// Submit completion data for header. - async fn complete_header( - &self, - id: HeaderIdOf

, - completion: P::Completion, - ) -> Result, Self::Error>; - - /// Returns true if header requires extra data to be submitted. - async fn requires_extra( - &self, - header: QueuedHeader

, - ) -> Result<(HeaderIdOf

, bool), Self::Error>; -} - -/// Synchronization maintain procedure. -#[async_trait] -pub trait SyncMaintain: 'static + Clone + Send + Sync { - /// Run custom maintain procedures. This is guaranteed to be called when both source and target - /// clients are unoccupied. - async fn maintain(&self, _sync: &mut HeadersSync

) {} -} - -impl SyncMaintain

for () {} - -/// Return prefix that will be used by default to expose Prometheus metrics of the finality proofs -/// sync loop. -pub fn metrics_prefix() -> String { - format!("{}_to_{}_Sync", P::SOURCE_NAME, P::TARGET_NAME) -} - -/// Run headers synchronization. -#[allow(clippy::too_many_arguments)] -pub async fn run>( - source_client: impl SourceClient

, - source_tick: Duration, - target_client: TC, - target_tick: Duration, - sync_maintain: impl SyncMaintain

, - sync_params: HeadersSyncParams, - metrics_params: MetricsParams, - exit_signal: impl Future + 'static + Send, -) -> Result<(), relay_utils::Error> { - let exit_signal = exit_signal.shared(); - relay_utils::relay_loop(source_client, target_client) - .with_metrics(Some(metrics_prefix::

()), metrics_params) - .loop_metric(SyncLoopMetrics::new)? - .standalone_metric(GlobalMetrics::new)? - .expose() - .await? - .run(metrics_prefix::

(), move |source_client, target_client, metrics| { - run_until_connection_lost( - source_client, - source_tick, - target_client, - target_tick, - sync_maintain.clone(), - sync_params.clone(), - metrics, - exit_signal.clone(), - ) - }) - .await -} - -/// Run headers synchronization. -#[allow(clippy::too_many_arguments)] -async fn run_until_connection_lost>( - source_client: impl SourceClient

, - source_tick: Duration, - target_client: TC, - target_tick: Duration, - sync_maintain: impl SyncMaintain

, - sync_params: HeadersSyncParams, - metrics_sync: Option, - exit_signal: impl Future + Send, -) -> Result<(), FailedClient> { - let mut progress_context = (Instant::now(), None, None); - - let mut sync = HeadersSync::

::new(sync_params); - let mut stall_countdown = None; - let mut last_update_time = Instant::now(); - - let mut source_retry_backoff = retry_backoff(); - let mut source_client_is_online = false; - let mut source_best_block_number_required = false; - let source_best_block_number_future = source_client.best_block_number().fuse(); - let source_new_header_future = futures::future::Fuse::terminated(); - let source_orphan_header_future = futures::future::Fuse::terminated(); - let source_extra_future = futures::future::Fuse::terminated(); - let source_completion_future = futures::future::Fuse::terminated(); - let source_go_offline_future = futures::future::Fuse::terminated(); - let source_tick_stream = interval(source_tick).fuse(); - - let mut target_retry_backoff = retry_backoff(); - let mut target_client_is_online = false; - let mut target_best_block_required = false; - let mut target_incomplete_headers_required = true; - let target_best_block_future = target_client.best_header_id().fuse(); - let target_incomplete_headers_future = futures::future::Fuse::terminated(); - let target_extra_check_future = futures::future::Fuse::terminated(); - let target_existence_status_future = futures::future::Fuse::terminated(); - let target_submit_header_future = futures::future::Fuse::terminated(); - let target_complete_header_future = futures::future::Fuse::terminated(); - let target_go_offline_future = futures::future::Fuse::terminated(); - let target_tick_stream = interval(target_tick).fuse(); - - let mut maintain_required = false; - let maintain_stream = interval(MAINTAIN_INTERVAL).fuse(); - - let exit_signal = exit_signal.fuse(); - - futures::pin_mut!( - source_best_block_number_future, - source_new_header_future, - source_orphan_header_future, - source_extra_future, - source_completion_future, - source_go_offline_future, - source_tick_stream, - target_best_block_future, - target_incomplete_headers_future, - target_extra_check_future, - target_existence_status_future, - target_submit_header_future, - target_complete_header_future, - target_go_offline_future, - target_tick_stream, - maintain_stream, - exit_signal - ); - - loop { - futures::select! { - source_best_block_number = source_best_block_number_future => { - source_best_block_number_required = false; - - source_client_is_online = process_future_result( - source_best_block_number, - &mut source_retry_backoff, - |source_best_block_number| sync.source_best_header_number_response(source_best_block_number), - &mut source_go_offline_future, - async_std::task::sleep, - || format!("Error retrieving best header number from {}", P::SOURCE_NAME), - ).fail_if_connection_error(FailedClient::Source)?; - }, - source_new_header = source_new_header_future => { - source_client_is_online = process_future_result( - source_new_header, - &mut source_retry_backoff, - |source_new_header| sync.headers_mut().header_response(source_new_header), - &mut source_go_offline_future, - async_std::task::sleep, - || format!("Error retrieving header from {} node", P::SOURCE_NAME), - ).fail_if_connection_error(FailedClient::Source)?; - }, - source_orphan_header = source_orphan_header_future => { - source_client_is_online = process_future_result( - source_orphan_header, - &mut source_retry_backoff, - |source_orphan_header| sync.headers_mut().header_response(source_orphan_header), - &mut source_go_offline_future, - async_std::task::sleep, - || format!("Error retrieving orphan header from {} node", P::SOURCE_NAME), - ).fail_if_connection_error(FailedClient::Source)?; - }, - source_extra = source_extra_future => { - source_client_is_online = process_future_result( - source_extra, - &mut source_retry_backoff, - |(header, extra)| sync.headers_mut().extra_response(&header, extra), - &mut source_go_offline_future, - async_std::task::sleep, - || format!("Error retrieving extra data from {} node", P::SOURCE_NAME), - ).fail_if_connection_error(FailedClient::Source)?; - }, - source_completion = source_completion_future => { - source_client_is_online = process_future_result( - source_completion, - &mut source_retry_backoff, - |(header, completion)| sync.headers_mut().completion_response(&header, completion), - &mut source_go_offline_future, - async_std::task::sleep, - || format!("Error retrieving completion data from {} node", P::SOURCE_NAME), - ).fail_if_connection_error(FailedClient::Source)?; - }, - _ = source_go_offline_future => { - source_client_is_online = true; - }, - _ = source_tick_stream.next() => { - if sync.is_almost_synced() { - source_best_block_number_required = true; - } - }, - target_best_block = target_best_block_future => { - target_best_block_required = false; - - target_client_is_online = process_future_result( - target_best_block, - &mut target_retry_backoff, - |target_best_block| { - let head_updated = sync.target_best_header_response(target_best_block); - if head_updated { - last_update_time = Instant::now(); - } - match head_updated { - // IF head is updated AND there are still our transactions: - // => restart stall countdown timer - true if sync.headers().headers_in_status(HeaderStatus::Submitted) != 0 => - stall_countdown = Some(Instant::now()), - // IF head is updated AND there are no our transactions: - // => stop stall countdown timer - true => stall_countdown = None, - // IF head is not updated AND stall countdown is not yet completed - // => do nothing - false if stall_countdown - .map(|stall_countdown| stall_countdown.elapsed() < STALL_SYNC_TIMEOUT) - .unwrap_or(true) - => (), - // IF head is not updated AND stall countdown has completed - // => restart sync - false => { - log::info!( - target: "bridge", - "Sync has stalled. Restarting {} headers synchronization.", - P::SOURCE_NAME, - ); - stall_countdown = None; - sync.restart(); - }, - } - }, - &mut target_go_offline_future, - async_std::task::sleep, - || format!("Error retrieving best known {} header from {} node", P::SOURCE_NAME, P::TARGET_NAME), - ).fail_if_connection_error(FailedClient::Target)?; - }, - incomplete_headers_ids = target_incomplete_headers_future => { - target_incomplete_headers_required = false; - - target_client_is_online = process_future_result( - incomplete_headers_ids, - &mut target_retry_backoff, - |incomplete_headers_ids| sync.headers_mut().incomplete_headers_response(incomplete_headers_ids), - &mut target_go_offline_future, - async_std::task::sleep, - || format!("Error retrieving incomplete headers from {} node", P::TARGET_NAME), - ).fail_if_connection_error(FailedClient::Target)?; - }, - target_existence_status = target_existence_status_future => { - target_client_is_online = process_future_result( - target_existence_status, - &mut target_retry_backoff, - |(target_header, target_existence_status)| sync - .headers_mut() - .maybe_orphan_response(&target_header, target_existence_status), - &mut target_go_offline_future, - async_std::task::sleep, - || format!("Error retrieving existence status from {} node", P::TARGET_NAME), - ).fail_if_connection_error(FailedClient::Target)?; - }, - submitted_headers = target_submit_header_future => { - // following line helps Rust understand the type of `submitted_headers` :/ - let submitted_headers: SubmittedHeaders, TC::Error> = submitted_headers; - let submitted_headers_str = format!("{}", submitted_headers); - let all_headers_rejected = submitted_headers.submitted.is_empty() - && submitted_headers.incomplete.is_empty(); - let has_submitted_headers = sync.headers().headers_in_status(HeaderStatus::Submitted) != 0; - - let maybe_fatal_error = match submitted_headers.fatal_error { - Some(fatal_error) => Err(StringifiedMaybeConnectionError::new( - fatal_error.is_connection_error(), - format!("{:?}", fatal_error), - )), - None if all_headers_rejected && !has_submitted_headers => - Err(StringifiedMaybeConnectionError::new(false, "All headers were rejected".into())), - None => Ok(()), - }; - - let no_fatal_error = maybe_fatal_error.is_ok(); - target_client_is_online = process_future_result( - maybe_fatal_error, - &mut target_retry_backoff, - |_| {}, - &mut target_go_offline_future, - async_std::task::sleep, - || format!("Error submitting headers to {} node", P::TARGET_NAME), - ).fail_if_connection_error(FailedClient::Target)?; - - log::debug!(target: "bridge", "Header submit result: {}", submitted_headers_str); - - sync.headers_mut().headers_submitted(submitted_headers.submitted); - sync.headers_mut().add_incomplete_headers(false, submitted_headers.incomplete); - - // when there's no fatal error, but node has rejected all our headers we may - // want to pause until our submitted headers will be accepted - if no_fatal_error && all_headers_rejected && has_submitted_headers { - sync.pause_submit(); - } - }, - target_complete_header_result = target_complete_header_future => { - target_client_is_online = process_future_result( - target_complete_header_result, - &mut target_retry_backoff, - |completed_header| sync.headers_mut().header_completed(&completed_header), - &mut target_go_offline_future, - async_std::task::sleep, - || format!("Error completing headers at {}", P::TARGET_NAME), - ).fail_if_connection_error(FailedClient::Target)?; - }, - target_extra_check_result = target_extra_check_future => { - target_client_is_online = process_future_result( - target_extra_check_result, - &mut target_retry_backoff, - |(header, extra_check_result)| sync - .headers_mut() - .maybe_extra_response(&header, extra_check_result), - &mut target_go_offline_future, - async_std::task::sleep, - || format!("Error retrieving receipts requirement from {} node", P::TARGET_NAME), - ).fail_if_connection_error(FailedClient::Target)?; - }, - _ = target_go_offline_future => { - target_client_is_online = true; - }, - _ = target_tick_stream.next() => { - target_best_block_required = true; - target_incomplete_headers_required = true; - }, - - _ = maintain_stream.next() => { - maintain_required = true; - }, - _ = exit_signal => { - return Ok(()); - } - } - - // update metrics - if let Some(ref metrics_sync) = metrics_sync { - metrics_sync.update(&sync); - } - - // print progress - progress_context = print_sync_progress(progress_context, &sync); - - // run maintain procedures - if maintain_required && source_client_is_online && target_client_is_online { - log::debug!(target: "bridge", "Maintaining headers sync loop"); - maintain_required = false; - sync_maintain.maintain(&mut sync).await; - } - - // If the target client is accepting requests we update the requests that - // we want it to run - if !maintain_required && target_client_is_online { - // NOTE: Is is important to reset this so that we only have one - // request being processed by the client at a time. This prevents - // race conditions like receiving two transactions with the same - // nonce from the client. - target_client_is_online = false; - - // The following is how we prioritize requests: - // - // 1. Get best block - // - Stops us from downloading or submitting new blocks - // - Only called rarely - // - // 2. Get incomplete headers - // - Stops us from submitting new blocks - // - Only called rarely - // - // 3. Get complete headers - // - Stops us from submitting new blocks - // - // 4. Check if we need extra data from source - // - Stops us from downloading or submitting new blocks - // - // 5. Check existence of header - // - Stops us from submitting new blocks - // - // 6. Submit header - - if target_best_block_required { - log::debug!(target: "bridge", "Asking {} about best block", P::TARGET_NAME); - target_best_block_future.set(target_client.best_header_id().fuse()); - } else if target_incomplete_headers_required { - log::debug!(target: "bridge", "Asking {} about incomplete headers", P::TARGET_NAME); - target_incomplete_headers_future.set(target_client.incomplete_headers_ids().fuse()); - } else if let Some((id, completion)) = sync.headers_mut().header_to_complete() { - log::debug!( - target: "bridge", - "Going to complete header: {:?}", - id, - ); - - target_complete_header_future - .set(target_client.complete_header(id, completion.clone()).fuse()); - } else if let Some(header) = sync.headers().header(HeaderStatus::MaybeExtra) { - log::debug!( - target: "bridge", - "Checking if header submission requires extra: {:?}", - header.id(), - ); - - target_extra_check_future.set(target_client.requires_extra(header.clone()).fuse()); - } else if let Some(header) = sync.headers().header(HeaderStatus::MaybeOrphan) { - // for MaybeOrphan we actually ask for parent' header existence - let parent_id = header.parent_id(); - - log::debug!( - target: "bridge", - "Asking {} node for existence of: {:?}", - P::TARGET_NAME, - parent_id, - ); - - target_existence_status_future.set(target_client.is_known_header(parent_id).fuse()); - } else if let Some(headers) = sync - .select_headers_to_submit(last_update_time.elapsed() > BACKUP_STALL_SYNC_TIMEOUT) - { - log::debug!( - target: "bridge", - "Submitting {} header(s) to {} node: {:?}", - headers.len(), - P::TARGET_NAME, - format_ids(headers.iter().map(|header| header.id())), - ); - - let headers = headers.into_iter().cloned().collect(); - target_submit_header_future.set(target_client.submit_headers(headers).fuse()); - - // remember that we have submitted some headers - if stall_countdown.is_none() { - stall_countdown = Some(Instant::now()); - } - } else { - target_client_is_online = true; - } - } - - // If the source client is accepting requests we update the requests that - // we want it to run - if !maintain_required && source_client_is_online { - // NOTE: Is is important to reset this so that we only have one - // request being processed by the client at a time. This prevents - // race conditions like receiving two transactions with the same - // nonce from the client. - source_client_is_online = false; - - // The following is how we prioritize requests: - // - // 1. Get best block - // - Stops us from downloading or submitting new blocks - // - Only called rarely - // - // 2. Download completion data - // - Stops us from submitting new blocks - // - // 3. Download extra data - // - Stops us from submitting new blocks - // - // 4. Download missing headers - // - Stops us from downloading or submitting new blocks - // - // 5. Downloading new headers - - if source_best_block_number_required { - log::debug!(target: "bridge", "Asking {} node about best block number", P::SOURCE_NAME); - source_best_block_number_future.set(source_client.best_block_number().fuse()); - } else if let Some(id) = sync.headers_mut().incomplete_header() { - log::debug!( - target: "bridge", - "Retrieving completion data for header: {:?}", - id, - ); - source_completion_future.set(source_client.header_completion(id).fuse()); - } else if let Some(header) = sync.headers().header(HeaderStatus::Extra) { - let id = header.id(); - log::debug!( - target: "bridge", - "Retrieving extra data for header: {:?}", - id, - ); - source_extra_future.set(source_client.header_extra(id, header.clone()).fuse()); - } else if let Some(header) = sync.select_orphan_header_to_download() { - // for Orphan we actually ask for parent' header - let parent_id = header.parent_id(); - - // if we have end up with orphan header#0, then we are misconfigured - if parent_id.0.is_zero() { - log::error!( - target: "bridge", - "Misconfiguration. Genesis {} header is considered orphan by {} node", - P::SOURCE_NAME, - P::TARGET_NAME, - ); - return Ok(()) - } - - log::debug!( - target: "bridge", - "Going to download orphan header from {} node: {:?}", - P::SOURCE_NAME, - parent_id, - ); - - source_orphan_header_future.set(source_client.header_by_hash(parent_id.1).fuse()); - } else if let Some(id) = sync.select_new_header_to_download() { - log::debug!( - target: "bridge", - "Going to download new header from {} node: {:?}", - P::SOURCE_NAME, - id, - ); - - source_new_header_future.set(source_client.header_by_number(id).fuse()); - } else { - source_client_is_online = true; - } - } - } -} - -/// Print synchronization progress. -fn print_sync_progress( - progress_context: (Instant, Option, Option), - eth_sync: &HeadersSync

, -) -> (Instant, Option, Option) { - let (prev_time, prev_best_header, prev_target_header) = progress_context; - let now_time = Instant::now(); - let (now_best_header, now_target_header) = eth_sync.status(); - - let need_update = now_time - prev_time > Duration::from_secs(10) || - match (prev_best_header, now_best_header) { - (Some(prev_best_header), Some(now_best_header)) => - now_best_header.0.saturating_sub(prev_best_header) > 10.into(), - _ => false, - }; - if !need_update { - return (prev_time, prev_best_header, prev_target_header) - } - - log::info!( - target: "bridge", - "Synced {:?} of {:?} headers", - now_best_header.map(|id| id.0), - now_target_header, - ); - (now_time, (*now_best_header).map(|id| id.0), *now_target_header) -} diff --git a/bridges/relays/headers/src/sync_loop_tests.rs b/bridges/relays/headers/src/sync_loop_tests.rs deleted file mode 100644 index f100998ca83f..000000000000 --- a/bridges/relays/headers/src/sync_loop_tests.rs +++ /dev/null @@ -1,602 +0,0 @@ -// Copyright 2019-2021 Parity Technologies (UK) Ltd. -// This file is part of Parity Bridges Common. - -// Parity Bridges Common is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Parity Bridges Common is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Parity Bridges Common. If not, see . - -#![cfg(test)] - -use crate::{ - sync_loop::{run, SourceClient, TargetClient}, - sync_types::{HeadersSyncPipeline, QueuedHeader, SourceHeader, SubmittedHeaders}, -}; - -use async_trait::async_trait; -use backoff::backoff::Backoff; -use futures::{future::FutureExt, stream::StreamExt}; -use parking_lot::Mutex; -use relay_utils::{ - metrics::MetricsParams, process_future_result, relay_loop::Client as RelayClient, - retry_backoff, HeaderId, MaybeConnectionError, -}; -use std::{ - collections::{HashMap, HashSet}, - sync::Arc, - time::Duration, -}; - -pub type TestNumber = u64; -pub type TestHash = u64; -pub type TestHeaderId = HeaderId; -pub type TestExtra = u64; -pub type TestCompletion = u64; -pub type TestQueuedHeader = QueuedHeader; - -#[derive(Default, Debug, Clone, PartialEq)] -pub struct TestHeader { - pub hash: TestHash, - pub number: TestNumber, - pub parent_hash: TestHash, -} - -impl SourceHeader for TestHeader { - fn id(&self) -> TestHeaderId { - HeaderId(self.number, self.hash) - } - - fn parent_id(&self) -> TestHeaderId { - HeaderId(self.number - 1, self.parent_hash) - } -} - -#[derive(Debug, Clone)] -struct TestError(bool); - -impl MaybeConnectionError for TestError { - fn is_connection_error(&self) -> bool { - self.0 - } -} - -#[derive(Debug, Clone, Copy, PartialEq)] -pub struct TestHeadersSyncPipeline; - -impl HeadersSyncPipeline for TestHeadersSyncPipeline { - const SOURCE_NAME: &'static str = "Source"; - const TARGET_NAME: &'static str = "Target"; - - type Hash = TestHash; - type Number = TestNumber; - type Header = TestHeader; - type Extra = TestExtra; - type Completion = TestCompletion; - - fn estimate_size(_: &TestQueuedHeader) -> usize { - 0 - } -} - -enum SourceMethod { - BestBlockNumber, - HeaderByHash(TestHash), - HeaderByNumber(TestNumber), - HeaderCompletion(TestHeaderId), - HeaderExtra(TestHeaderId, TestQueuedHeader), -} - -#[derive(Clone)] -struct Source { - data: Arc>, - on_method_call: Arc, -} - -struct SourceData { - best_block_number: Result, - header_by_hash: HashMap, - header_by_number: HashMap, - provides_completion: bool, - provides_extra: bool, -} - -impl Source { - pub fn new( - best_block_id: TestHeaderId, - headers: Vec<(bool, TestHeader)>, - on_method_call: impl Fn(SourceMethod, &mut SourceData) + Send + Sync + 'static, - ) -> Self { - Source { - data: Arc::new(Mutex::new(SourceData { - best_block_number: Ok(best_block_id.0), - header_by_hash: headers - .iter() - .map(|(_, header)| (header.hash, header.clone())) - .collect(), - header_by_number: headers - .iter() - .filter_map(|(is_canonical, header)| { - if *is_canonical { - Some((header.hash, header.clone())) - } else { - None - } - }) - .collect(), - provides_completion: true, - provides_extra: true, - })), - on_method_call: Arc::new(on_method_call), - } - } -} - -#[async_trait] -impl RelayClient for Source { - type Error = TestError; - - async fn reconnect(&mut self) -> Result<(), TestError> { - unimplemented!() - } -} - -#[async_trait] -impl SourceClient for Source { - async fn best_block_number(&self) -> Result { - let mut data = self.data.lock(); - (self.on_method_call)(SourceMethod::BestBlockNumber, &mut *data); - data.best_block_number.clone() - } - - async fn header_by_hash(&self, hash: TestHash) -> Result { - let mut data = self.data.lock(); - (self.on_method_call)(SourceMethod::HeaderByHash(hash), &mut *data); - data.header_by_hash.get(&hash).cloned().ok_or(TestError(false)) - } - - async fn header_by_number(&self, number: TestNumber) -> Result { - let mut data = self.data.lock(); - (self.on_method_call)(SourceMethod::HeaderByNumber(number), &mut *data); - data.header_by_number.get(&number).cloned().ok_or(TestError(false)) - } - - async fn header_completion( - &self, - id: TestHeaderId, - ) -> Result<(TestHeaderId, Option), TestError> { - let mut data = self.data.lock(); - (self.on_method_call)(SourceMethod::HeaderCompletion(id), &mut *data); - if data.provides_completion { - Ok((id, Some(test_completion(id)))) - } else { - Ok((id, None)) - } - } - - async fn header_extra( - &self, - id: TestHeaderId, - header: TestQueuedHeader, - ) -> Result<(TestHeaderId, TestExtra), TestError> { - let mut data = self.data.lock(); - (self.on_method_call)(SourceMethod::HeaderExtra(id, header), &mut *data); - if data.provides_extra { - Ok((id, test_extra(id))) - } else { - Err(TestError(false)) - } - } -} - -enum TargetMethod { - BestHeaderId, - IsKnownHeader(TestHeaderId), - SubmitHeaders(Vec), - IncompleteHeadersIds, - CompleteHeader(TestHeaderId, TestCompletion), - RequiresExtra(TestQueuedHeader), -} - -#[derive(Clone)] -struct Target { - data: Arc>, - on_method_call: Arc, -} - -struct TargetData { - best_header_id: Result, - is_known_header_by_hash: HashMap, - submitted_headers: HashMap, - submit_headers_result: Option>, - completed_headers: HashMap, - requires_completion: bool, - requires_extra: bool, -} - -impl Target { - pub fn new( - best_header_id: TestHeaderId, - headers: Vec, - on_method_call: impl Fn(TargetMethod, &mut TargetData) + Send + Sync + 'static, - ) -> Self { - Target { - data: Arc::new(Mutex::new(TargetData { - best_header_id: Ok(best_header_id), - is_known_header_by_hash: headers.iter().map(|header| (header.1, true)).collect(), - submitted_headers: HashMap::new(), - submit_headers_result: None, - completed_headers: HashMap::new(), - requires_completion: false, - requires_extra: false, - })), - on_method_call: Arc::new(on_method_call), - } - } -} - -#[async_trait] -impl RelayClient for Target { - type Error = TestError; - - async fn reconnect(&mut self) -> Result<(), TestError> { - unimplemented!() - } -} - -#[async_trait] -impl TargetClient for Target { - async fn best_header_id(&self) -> Result { - let mut data = self.data.lock(); - (self.on_method_call)(TargetMethod::BestHeaderId, &mut *data); - data.best_header_id.clone() - } - - async fn is_known_header(&self, id: TestHeaderId) -> Result<(TestHeaderId, bool), TestError> { - let mut data = self.data.lock(); - (self.on_method_call)(TargetMethod::IsKnownHeader(id), &mut *data); - data.is_known_header_by_hash - .get(&id.1) - .cloned() - .map(|is_known_header| Ok((id, is_known_header))) - .unwrap_or(Ok((id, false))) - } - - async fn submit_headers( - &self, - headers: Vec, - ) -> SubmittedHeaders { - let mut data = self.data.lock(); - (self.on_method_call)(TargetMethod::SubmitHeaders(headers.clone()), &mut *data); - data.submitted_headers - .extend(headers.iter().map(|header| (header.id().1, header.clone()))); - data.submit_headers_result.take().expect("test must accept headers") - } - - async fn incomplete_headers_ids(&self) -> Result, TestError> { - let mut data = self.data.lock(); - (self.on_method_call)(TargetMethod::IncompleteHeadersIds, &mut *data); - if data.requires_completion { - Ok(data - .submitted_headers - .iter() - .filter(|(hash, _)| !data.completed_headers.contains_key(hash)) - .map(|(_, header)| header.id()) - .collect()) - } else { - Ok(HashSet::new()) - } - } - - async fn complete_header( - &self, - id: TestHeaderId, - completion: TestCompletion, - ) -> Result { - let mut data = self.data.lock(); - (self.on_method_call)(TargetMethod::CompleteHeader(id, completion), &mut *data); - data.completed_headers.insert(id.1, completion); - Ok(id) - } - - async fn requires_extra( - &self, - header: TestQueuedHeader, - ) -> Result<(TestHeaderId, bool), TestError> { - let mut data = self.data.lock(); - (self.on_method_call)(TargetMethod::RequiresExtra(header.clone()), &mut *data); - if data.requires_extra { - Ok((header.id(), true)) - } else { - Ok((header.id(), false)) - } - } -} - -fn test_tick() -> Duration { - // in ideal world that should have been Duration::from_millis(0), because we do not want - // to sleep in tests at all, but that could lead to `select! {}` always waking on tick - // => not doing actual job - Duration::from_millis(10) -} - -fn test_id(number: TestNumber) -> TestHeaderId { - HeaderId(number, number) -} - -fn test_header(number: TestNumber) -> TestHeader { - let id = test_id(number); - TestHeader { - hash: id.1, - number: id.0, - parent_hash: if number == 0 { TestHash::default() } else { test_id(number - 1).1 }, - } -} - -fn test_forked_id(number: TestNumber, forked_from: TestNumber) -> TestHeaderId { - const FORK_OFFSET: TestNumber = 1000; - - if number == forked_from { - HeaderId(number, number) - } else { - HeaderId(number, FORK_OFFSET + number) - } -} - -fn test_forked_header(number: TestNumber, forked_from: TestNumber) -> TestHeader { - let id = test_forked_id(number, forked_from); - TestHeader { - hash: id.1, - number: id.0, - parent_hash: if number == 0 { - TestHash::default() - } else { - test_forked_id(number - 1, forked_from).1 - }, - } -} - -fn test_completion(id: TestHeaderId) -> TestCompletion { - id.0 -} - -fn test_extra(id: TestHeaderId) -> TestExtra { - id.0 -} - -fn source_reject_completion(method: &SourceMethod) { - if let SourceMethod::HeaderCompletion(_) = method { - unreachable!("HeaderCompletion request is not expected") - } -} - -fn source_reject_extra(method: &SourceMethod) { - if let SourceMethod::HeaderExtra(_, _) = method { - unreachable!("HeaderExtra request is not expected") - } -} - -fn target_accept_all_headers(method: &TargetMethod, data: &mut TargetData, requires_extra: bool) { - if let TargetMethod::SubmitHeaders(ref submitted) = method { - assert_eq!(submitted.iter().all(|header| header.extra().is_some()), requires_extra,); - - data.submit_headers_result = Some(SubmittedHeaders { - submitted: submitted.iter().map(|header| header.id()).collect(), - ..Default::default() - }); - } -} - -fn target_signal_exit_when_header_submitted( - method: &TargetMethod, - header_id: TestHeaderId, - exit_signal: &futures::channel::mpsc::UnboundedSender<()>, -) { - if let TargetMethod::SubmitHeaders(ref submitted) = method { - if submitted.iter().any(|header| header.id() == header_id) { - exit_signal.unbounded_send(()).unwrap(); - } - } -} - -fn target_signal_exit_when_header_completed( - method: &TargetMethod, - header_id: TestHeaderId, - exit_signal: &futures::channel::mpsc::UnboundedSender<()>, -) { - if let TargetMethod::CompleteHeader(completed_id, _) = method { - if *completed_id == header_id { - exit_signal.unbounded_send(()).unwrap(); - } - } -} - -fn run_backoff_test(result: Result<(), TestError>) -> (Duration, Duration) { - let mut backoff = retry_backoff(); - - // no randomness in tests (otherwise intervals may overlap => asserts are failing) - backoff.randomization_factor = 0f64; - - // increase backoff's current interval - let interval1 = backoff.next_backoff().unwrap(); - let interval2 = backoff.next_backoff().unwrap(); - assert!(interval2 > interval1); - - // successful future result leads to backoff's reset - let go_offline_future = futures::future::Fuse::terminated(); - futures::pin_mut!(go_offline_future); - - process_future_result( - result, - &mut backoff, - |_| {}, - &mut go_offline_future, - async_std::task::sleep, - || "Test error".into(), - ); - - (interval2, backoff.next_backoff().unwrap()) -} - -#[test] -fn process_future_result_resets_backoff_on_success() { - let (interval2, interval_after_reset) = run_backoff_test(Ok(())); - assert!(interval2 > interval_after_reset); -} - -#[test] -fn process_future_result_resets_backoff_on_connection_error() { - let (interval2, interval_after_reset) = run_backoff_test(Err(TestError(true))); - assert!(interval2 > interval_after_reset); -} - -#[test] -fn process_future_result_does_not_reset_backoff_on_non_connection_error() { - let (interval2, interval_after_reset) = run_backoff_test(Err(TestError(false))); - assert!(interval2 < interval_after_reset); -} - -struct SyncLoopTestParams { - best_source_header: TestHeader, - headers_on_source: Vec<(bool, TestHeader)>, - best_target_header: TestHeader, - headers_on_target: Vec, - target_requires_extra: bool, - target_requires_completion: bool, - stop_at: TestHeaderId, -} - -fn run_sync_loop_test(params: SyncLoopTestParams) { - let (exit_sender, exit_receiver) = futures::channel::mpsc::unbounded(); - let target_requires_extra = params.target_requires_extra; - let target_requires_completion = params.target_requires_completion; - let stop_at = params.stop_at; - let source = - Source::new(params.best_source_header.id(), params.headers_on_source, move |method, _| { - if !target_requires_extra { - source_reject_extra(&method); - } - if !target_requires_completion { - source_reject_completion(&method); - } - }); - let target = Target::new( - params.best_target_header.id(), - params.headers_on_target.into_iter().map(|header| header.id()).collect(), - move |method, data| { - target_accept_all_headers(&method, data, target_requires_extra); - if target_requires_completion { - target_signal_exit_when_header_completed(&method, stop_at, &exit_sender); - } else { - target_signal_exit_when_header_submitted(&method, stop_at, &exit_sender); - } - }, - ); - target.data.lock().requires_extra = target_requires_extra; - target.data.lock().requires_completion = target_requires_completion; - - let _ = async_std::task::block_on(run( - source, - test_tick(), - target, - test_tick(), - (), - crate::sync::tests::default_sync_params(), - MetricsParams::disabled(), - exit_receiver.into_future().map(|(_, _)| ()), - )); -} - -#[test] -fn sync_loop_is_able_to_synchronize_single_header() { - run_sync_loop_test(SyncLoopTestParams { - best_source_header: test_header(1), - headers_on_source: vec![(true, test_header(1))], - best_target_header: test_header(0), - headers_on_target: vec![test_header(0)], - target_requires_extra: false, - target_requires_completion: false, - stop_at: test_id(1), - }); -} - -#[test] -fn sync_loop_is_able_to_synchronize_single_header_with_extra() { - run_sync_loop_test(SyncLoopTestParams { - best_source_header: test_header(1), - headers_on_source: vec![(true, test_header(1))], - best_target_header: test_header(0), - headers_on_target: vec![test_header(0)], - target_requires_extra: true, - target_requires_completion: false, - stop_at: test_id(1), - }); -} - -#[test] -fn sync_loop_is_able_to_synchronize_single_header_with_completion() { - run_sync_loop_test(SyncLoopTestParams { - best_source_header: test_header(1), - headers_on_source: vec![(true, test_header(1))], - best_target_header: test_header(0), - headers_on_target: vec![test_header(0)], - target_requires_extra: false, - target_requires_completion: true, - stop_at: test_id(1), - }); -} - -#[test] -fn sync_loop_is_able_to_reorganize_from_shorter_fork() { - run_sync_loop_test(SyncLoopTestParams { - best_source_header: test_header(3), - headers_on_source: vec![ - (true, test_header(1)), - (true, test_header(2)), - (true, test_header(3)), - (false, test_forked_header(1, 0)), - (false, test_forked_header(2, 0)), - ], - best_target_header: test_forked_header(2, 0), - headers_on_target: vec![test_header(0), test_forked_header(1, 0), test_forked_header(2, 0)], - target_requires_extra: false, - target_requires_completion: false, - stop_at: test_id(3), - }); -} - -#[test] -fn sync_loop_is_able_to_reorganize_from_longer_fork() { - run_sync_loop_test(SyncLoopTestParams { - best_source_header: test_header(3), - headers_on_source: vec![ - (true, test_header(1)), - (true, test_header(2)), - (true, test_header(3)), - (false, test_forked_header(1, 0)), - (false, test_forked_header(2, 0)), - (false, test_forked_header(3, 0)), - (false, test_forked_header(4, 0)), - (false, test_forked_header(5, 0)), - ], - best_target_header: test_forked_header(5, 0), - headers_on_target: vec![ - test_header(0), - test_forked_header(1, 0), - test_forked_header(2, 0), - test_forked_header(3, 0), - test_forked_header(4, 0), - test_forked_header(5, 0), - ], - target_requires_extra: false, - target_requires_completion: false, - stop_at: test_id(3), - }); -} diff --git a/bridges/relays/headers/src/sync_types.rs b/bridges/relays/headers/src/sync_types.rs deleted file mode 100644 index 8d93e8bf49fb..000000000000 --- a/bridges/relays/headers/src/sync_types.rs +++ /dev/null @@ -1,193 +0,0 @@ -// Copyright 2019-2021 Parity Technologies (UK) Ltd. -// This file is part of Parity Bridges Common. - -// Parity Bridges Common is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Parity Bridges Common is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Parity Bridges Common. If not, see . - -//! Types that are used by headers synchronization components. - -use relay_utils::{format_ids, HeaderId}; -use std::{ops::Deref, sync::Arc}; - -/// Ethereum header synchronization status. -#[derive(Debug, Clone, Copy, PartialEq)] -pub enum HeaderStatus { - /// Header is unknown. - Unknown, - /// Header is in MaybeOrphan queue. - MaybeOrphan, - /// Header is in Orphan queue. - Orphan, - /// Header is in MaybeExtra queue. - MaybeExtra, - /// Header is in Extra queue. - Extra, - /// Header is in Ready queue. - Ready, - /// Header is in Incomplete queue. - Incomplete, - /// Header has been recently submitted to the target node. - Submitted, - /// Header is known to the target node. - Synced, -} - -/// Headers synchronization pipeline. -pub trait HeadersSyncPipeline: 'static + Clone + Send + Sync { - /// Name of the headers source. - const SOURCE_NAME: &'static str; - /// Name of the headers target. - const TARGET_NAME: &'static str; - - /// Headers we're syncing are identified by this hash. - type Hash: Eq - + Clone - + Copy - + Send - + Sync - + std::fmt::Debug - + std::fmt::Display - + std::hash::Hash; - /// Headers we're syncing are identified by this number. - type Number: relay_utils::BlockNumberBase; - /// Type of header that we're syncing. - type Header: SourceHeader; - /// Type of extra data for the header that we're receiving from the source node: - /// 1) extra data is required for some headers; - /// 2) target node may answer if it'll require extra data before header is submitted; - /// 3) extra data available since the header creation time; - /// 4) header and extra data are submitted in single transaction. - /// - /// Example: Ethereum transactions receipts. - type Extra: Clone + Send + Sync + PartialEq + std::fmt::Debug; - /// Type of data required to 'complete' header that we're receiving from the source node: - /// 1) completion data is required for some headers; - /// 2) target node can't answer if it'll require completion data before header is accepted; - /// 3) completion data may be generated after header generation; - /// 4) header and completion data are submitted in separate transactions. - /// - /// Example: Substrate GRANDPA justifications. - type Completion: Clone + Send + Sync + std::fmt::Debug; - - /// Function used to estimate size of target-encoded header. - fn estimate_size(source: &QueuedHeader) -> usize; -} - -/// A HeaderId for `HeaderSyncPipeline`. -pub type HeaderIdOf

= - HeaderId<

::Hash,

::Number>; - -/// Header that we're receiving from source node. -pub trait SourceHeader: Clone + std::fmt::Debug + PartialEq + Send + Sync { - /// Returns ID of header. - fn id(&self) -> HeaderId; - /// Returns ID of parent header. - /// - /// Panics if called for genesis header. - fn parent_id(&self) -> HeaderId; -} - -/// Header how it's stored in the synchronization queue. -#[derive(Clone, Debug, PartialEq)] -pub struct QueuedHeader(Arc>); - -impl QueuedHeader

{ - /// Creates new queued header. - pub fn new(header: P::Header) -> Self { - QueuedHeader(Arc::new(QueuedHeaderData { header, extra: None })) - } - - /// Set associated extra data. - pub fn set_extra(self, extra: P::Extra) -> Self { - QueuedHeader(Arc::new(QueuedHeaderData { - header: Arc::try_unwrap(self.0) - .map(|data| data.header) - .unwrap_or_else(|data| data.header.clone()), - extra: Some(extra), - })) - } -} - -impl Deref for QueuedHeader

{ - type Target = QueuedHeaderData

; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -/// Header how it's stored in the synchronization queue. -#[derive(Clone, Debug, Default, PartialEq)] -pub struct QueuedHeaderData { - header: P::Header, - extra: Option, -} - -impl QueuedHeader

{ - /// Returns ID of header. - pub fn id(&self) -> HeaderId { - self.header.id() - } - - /// Returns ID of parent header. - pub fn parent_id(&self) -> HeaderId { - self.header.parent_id() - } - - /// Returns reference to header. - pub fn header(&self) -> &P::Header { - &self.header - } - - /// Returns reference to associated extra data. - pub fn extra(&self) -> &Option { - &self.extra - } -} - -/// Headers submission result. -#[derive(Debug)] -#[cfg_attr(test, derive(PartialEq))] -pub struct SubmittedHeaders { - /// IDs of headers that have been submitted to target node. - pub submitted: Vec, - /// IDs of incomplete headers. These headers were submitted (so this id is also in `submitted` - /// vec), but all descendants are not. - pub incomplete: Vec, - /// IDs of ignored headers that we have decided not to submit (they are either rejected by - /// target node immediately, or their descendants of incomplete headers). - pub rejected: Vec, - /// Fatal target node error, if it has occurred during submission. - pub fatal_error: Option, -} - -impl Default for SubmittedHeaders { - fn default() -> Self { - SubmittedHeaders { - submitted: Vec::new(), - incomplete: Vec::new(), - rejected: Vec::new(), - fatal_error: None, - } - } -} - -impl std::fmt::Display for SubmittedHeaders { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - let submitted = format_ids(self.submitted.iter()); - let incomplete = format_ids(self.incomplete.iter()); - let rejected = format_ids(self.rejected.iter()); - - write!(f, "Submitted: {}, Incomplete: {}, Rejected: {}", submitted, incomplete, rejected) - } -}