Skip to content

Commit

Permalink
Refactor AllForksSync::add_source (#2142)
Browse files Browse the repository at this point in the history
* Tweak and rename variables

* Add new structs and a new enum

* Return an enum from add_source

* Adjust code to compile

* Only add the source later

* Also handle OldBestBlock separately

* Rename to prepare_add_source

* Oops

* Early return if the best block is old

* Update all.rs with the changes

* Pass the source user data only when inserting

* Documentation and tweaks

* Combine match blocks

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
tomaka and mergify[bot] authored Mar 17, 2022
1 parent d482e89 commit b4fe650
Show file tree
Hide file tree
Showing 2 changed files with 185 additions and 56 deletions.
50 changes: 34 additions & 16 deletions src/sync/all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,14 +372,23 @@ impl<TRq, TSrc, TBl> AllSync<TRq, TSrc, TBl> {
let outer_source_id_entry = self.shared.sources.vacant_entry();
let outer_source_id = SourceId(outer_source_id_entry.key());

let source_id = all_forks.add_source(
AllForksSourceExtra {
user_data,
outer_source_id,
},
best_block_number,
best_block_hash,
);
let source_user_data = AllForksSourceExtra {
user_data,
outer_source_id,
};

let source_id =
match all_forks.prepare_add_source(best_block_number, best_block_hash) {
all_forks::AddSource::BestBlockAlreadyVerified(b)
| all_forks::AddSource::BestBlockPendingVerification(b) => {
b.add_source(source_user_data)
}
all_forks::AddSource::OldBestBlock(b) => b.add_source(source_user_data),
all_forks::AddSource::UnknownBestBlock(b) => {
b.add_source_and_insert_block(source_user_data)
}
};

outer_source_id_entry.insert(SourceMapping::AllForks(source_id));

self.inner = AllSyncInner::AllForks(all_forks);
Expand Down Expand Up @@ -2275,14 +2284,23 @@ impl<TRq> Shared<TRq> {
.all(|(_, s)| matches!(s, SourceMapping::GrandpaWarpSync(_))));

for source in grandpa.sources {
let updated_source_id = all_forks.add_source(
AllForksSourceExtra {
user_data: source.user_data,
outer_source_id: source.outer_source_id,
},
source.best_block_number,
source.best_block_hash,
);
let source_user_data = AllForksSourceExtra {
user_data: source.user_data,
outer_source_id: source.outer_source_id,
};

let updated_source_id = match all_forks
.prepare_add_source(source.best_block_number, source.best_block_hash)
{
all_forks::AddSource::BestBlockAlreadyVerified(b)
| all_forks::AddSource::BestBlockPendingVerification(b) => {
b.add_source(source_user_data)
}
all_forks::AddSource::OldBestBlock(b) => b.add_source(source_user_data),
all_forks::AddSource::UnknownBestBlock(b) => {
b.add_source_and_insert_block(source_user_data)
}
};

self.sources[source.outer_source_id.0] = SourceMapping::AllForks(updated_source_id);
}
Expand Down
191 changes: 151 additions & 40 deletions src/sync/all_forks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ struct PendingBlock {
header: Option<header::Header>,
// TODO: add body: Option<Vec<Vec<u8>>>, when adding full node support
justifications: Vec<([u8; 4], Vec<u8>)>,
// TODO: add user_data as soon as block announces and add_source APIs support passing a user data
// TODO: add user_data as soon as block announces and prepare_add_source APIs support passing a user data
}

struct Block<TBl> {
Expand Down Expand Up @@ -281,62 +281,59 @@ impl<TBl, TRq, TSrc> AllForksSync<TBl, TRq, TSrc> {
self.chain.iter_ancestry_order()
}

/// Inform the [`AllForksSync`] of a new potential source of blocks.
/// Starts the process of inserting a new source in the [`AllForksSync`].
///
/// The `user_data` parameter is opaque and decided entirely by the user. It can later be
/// retrieved using the `Index` trait implementation of this container.
///
/// Returns the newly-created source entry, plus optionally a request that should be started
/// towards this source.
pub fn add_source(
/// This function doesn't modify the state machine, but only looks at the current state of the
/// block referenced by `best_block_number` and `best_block_hash`. It returns an enum that
/// allows performing the actual insertion.
pub fn prepare_add_source(
&mut self,
user_data: TSrc,
best_block_number: u64,
best_block_hash: [u8; 32],
) -> SourceId {
let source_id = self
.inner
.blocks
.add_source(user_data, best_block_number, best_block_hash);
) -> AddSource<TBl, TRq, TSrc> {
if best_block_number <= self.chain.finalized_block_header().number {
return AddSource::OldBestBlock(AddSourceOldBlock {
inner: self,
best_block_hash,
best_block_number,
});
}

let needs_verification = best_block_number > self.chain.finalized_block_header().number
&& self
.chain
.non_finalized_block_by_hash(&best_block_hash)
.is_none();
let is_in_disjoints_list = self
let best_block_already_verified = self
.chain
.non_finalized_block_by_hash(&best_block_hash)
.is_some();
let best_block_in_disjoints_list = self
.inner
.blocks
.contains_unverified_block(best_block_number, &best_block_hash);
debug_assert!(!(!needs_verification && is_in_disjoints_list));

if needs_verification && !is_in_disjoints_list {
self.inner.blocks.insert_unverified_block(
match (best_block_already_verified, best_block_in_disjoints_list) {
(false, false) => AddSource::UnknownBestBlock(AddSourceUnknown {
inner: self,
best_block_hash,
best_block_number,
}),
(true, false) => AddSource::BestBlockAlreadyVerified(AddSourceKnown {
inner: self,
best_block_hash,
pending_blocks::UnverifiedBlockState::HeightHashKnown,
PendingBlock {
header: None,
justifications: Vec::new(),
},
);

if self.inner.banned_blocks.contains(&best_block_hash) {
self.inner
.blocks
.mark_unverified_block_as_bad(best_block_number, &best_block_hash);
}
best_block_number,
}),
(false, true) => AddSource::BestBlockPendingVerification(AddSourceKnown {
inner: self,
best_block_hash,
best_block_number,
}),
(true, true) => unreachable!(),
}

source_id
}

/// Removes the source from the [`AllForksSync`].
///
/// Removing the source implicitly cancels the request that is associated to it (if any).
///
/// Returns the user data that was originally passed to [`AllForksSync::add_source`], plus
/// an `Option`.
/// Returns the user data that was originally passed when inserting the source, plus an
/// `Option`.
/// If this `Option` is `Some`, it contains a request that must be started towards the source
/// indicated by the [`SourceId`].
///
Expand Down Expand Up @@ -404,7 +401,7 @@ impl<TBl, TRq, TSrc> AllForksSync<TBl, TRq, TSrc> {
/// Returns the current best block of the given source.
///
/// This corresponds either the latest call to [`AllForksSync::block_announce`] where
/// `is_best` was `true`, or to the parameter passed to [`AllForksSync::add_source`].
/// `is_best` was `true`, or to the parameter passed to [`AllForksSync::prepare_add_source`].
///
/// # Panic
///
Expand Down Expand Up @@ -1360,6 +1357,120 @@ pub enum AncestrySearchResponseError {
TooOld,
}

/// Outcome of calling [`AllForksSync::prepare_add_source`].
#[must_use]
pub enum AddSource<'a, TBl, TRq, TSrc> {
/// The best block of the source is older or equal to the local latest finalized block. This
/// block isn't tracked by the state machine.
OldBestBlock(AddSourceOldBlock<'a, TBl, TRq, TSrc>),

/// The best block of the source has already been verified by this state machine.
BestBlockAlreadyVerified(AddSourceKnown<'a, TBl, TRq, TSrc>),

/// The best block of the source is already known to this state machine but hasn't been
/// verified yet.
BestBlockPendingVerification(AddSourceKnown<'a, TBl, TRq, TSrc>),

/// The best block of the source isn't in this state machine yet and needs to be inserted.
UnknownBestBlock(AddSourceUnknown<'a, TBl, TRq, TSrc>),
}

/// See [`AddSource`] and [`AllForksSync::prepare_add_source`].
#[must_use]
pub struct AddSourceOldBlock<'a, TBl, TRq, TSrc> {
inner: &'a mut AllForksSync<TBl, TRq, TSrc>,
best_block_number: u64,
best_block_hash: [u8; 32],
}

impl<'a, TBl, TRq, TSrc> AddSourceOldBlock<'a, TBl, TRq, TSrc> {
/// Inserts a new source in the state machine.
///
/// Returns the newly-allocated identifier for that source.
///
/// The `user_data` parameter is opaque and decided entirely by the user. It can later be
/// retrieved using the `Index` trait implementation of the [`AllForksSync`].
pub fn add_source(self, source_user_data: TSrc) -> SourceId {
self.inner.inner.blocks.add_source(
source_user_data,
self.best_block_number,
self.best_block_hash,
)
}
}

/// See [`AddSource`] and [`AllForksSync::prepare_add_source`].
#[must_use]
pub struct AddSourceKnown<'a, TBl, TRq, TSrc> {
inner: &'a mut AllForksSync<TBl, TRq, TSrc>,
best_block_number: u64,
best_block_hash: [u8; 32],
}

impl<'a, TBl, TRq, TSrc> AddSourceKnown<'a, TBl, TRq, TSrc> {
/// Inserts a new source in the state machine.
///
/// Returns the newly-allocated identifier for that source.
///
/// The `user_data` parameter is opaque and decided entirely by the user. It can later be
/// retrieved using the `Index` trait implementation of the [`AllForksSync`].
pub fn add_source(self, source_user_data: TSrc) -> SourceId {
self.inner.inner.blocks.add_source(
source_user_data,
self.best_block_number,
self.best_block_hash,
)
}
}

/// See [`AddSource`] and [`AllForksSync::prepare_add_source`].
#[must_use]
pub struct AddSourceUnknown<'a, TBl, TRq, TSrc> {
inner: &'a mut AllForksSync<TBl, TRq, TSrc>,
best_block_number: u64,
best_block_hash: [u8; 32],
}

impl<'a, TBl, TRq, TSrc> AddSourceUnknown<'a, TBl, TRq, TSrc> {
/// Inserts a new source in the state machine, plus the best block of that source.
///
/// Returns the newly-allocated identifier for that source.
///
/// The `user_data` parameter is opaque and decided entirely by the user. It can later be
/// retrieved using the `Index` trait implementation of the [`AllForksSync`].
pub fn add_source_and_insert_block(self, source_user_data: TSrc) -> SourceId {
let source_id = self.inner.inner.blocks.add_source(
source_user_data,
self.best_block_number,
self.best_block_hash,
);

self.inner.inner.blocks.insert_unverified_block(
self.best_block_number,
self.best_block_hash,
pending_blocks::UnverifiedBlockState::HeightHashKnown,
PendingBlock {
header: None,
justifications: Vec::new(),
},
);

if self
.inner
.inner
.banned_blocks
.contains(&self.best_block_hash)
{
self.inner
.inner
.blocks
.mark_unverified_block_as_bad(self.best_block_number, &self.best_block_hash);
}

source_id
}
}

/// Header verification to be performed.
///
/// Internally holds the [`AllForksSync`].
Expand Down

0 comments on commit b4fe650

Please sign in to comment.