
Fix stalling dispute coordinator. (#7125)
* Fix stalling dispute coordinator.

* Initialization.

---------

Co-authored-by: eskimor <[email protected]>
eskimor and eskimor authored Apr 25, 2023
1 parent a5d59c3 commit a539c75
Showing 1 changed file with 69 additions and 33 deletions.
102 changes: 69 additions & 33 deletions node/core/dispute-coordinator/src/initialized.rs
Original file line number Diff line number Diff line change
@@ -16,7 +16,10 @@

//! Dispute coordinator subsystem in initialized state (after first active leaf is received).
-use std::{collections::BTreeMap, sync::Arc};
+use std::{
+    collections::{BTreeMap, VecDeque},
+    sync::Arc,
+};

use futures::{
channel::{mpsc, oneshot},
@@ -65,6 +68,12 @@ use super::{
OverlayedBackend,
};

+/// How many blocks we import votes from per leaf update.
+///
+/// Since vote import is relatively slow, we have to limit the maximum amount of work we do on leaf
+/// updates (and especially on startup) so the dispute coordinator won't be considered stalling.
+const CHAIN_IMPORT_MAX_BATCH_SIZE: usize = 8;
+
// Initial data for `dispute-coordinator`. It is provided only at first start.
pub struct InitialData {
pub participations: Vec<(ParticipationPriority, ParticipationRequest)>,
@@ -89,6 +98,17 @@ pub(crate) struct Initialized {
participation: Participation,
scraper: ChainScraper,
participation_receiver: WorkerMessageReceiver,
+/// Backlog of on-chain votes still to be imported.
+///
+/// For some reason importing votes is relatively slow; with a large finality lag (~50 blocks) we
+/// will be too slow importing all votes from unfinalized chains on startup (dispute-coordinator
+/// gets killed because of unresponsiveness).
+///
+/// https://github.com/paritytech/polkadot/issues/6912
+///
+/// To resolve this, we limit the number of votes imported at once to
+/// `CHAIN_IMPORT_MAX_BATCH_SIZE` and put the rest here for later processing.
+chain_import_backlog: VecDeque<ScrapedOnChainVotes>,
metrics: Metrics,
}

@@ -117,6 +137,7 @@ impl Initialized {
scraper,
participation,
participation_receiver,
+chain_import_backlog: VecDeque::new(),
metrics,
}
}
@@ -168,24 +189,16 @@ impl Initialized {
}

let mut overlay_db = OverlayedBackend::new(backend);
-for votes in on_chain_votes {
-    let _ = self
-        .process_on_chain_votes(
-            ctx,
-            &mut overlay_db,
-            votes,
-            clock.now(),
-            first_leaf.hash,
-        )
-        .await
-        .map_err(|error| {
-            gum::warn!(
-                target: LOG_TARGET,
-                ?error,
-                "Skipping scraping block due to error",
-            );
-        });
-}
+
+self.process_chain_import_backlog(
+    ctx,
+    &mut overlay_db,
+    on_chain_votes,
+    clock.now(),
+    first_leaf.hash,
+)
+.await;
+
if !overlay_db.is_empty() {
let ops = overlay_db.into_write_ops();
backend.write(ops)?;
@@ -344,26 +357,49 @@ impl Initialized {
scraped_updates.on_chain_votes.len()
);

-// The `runtime-api` subsystem has an internal queue which serializes the execution,
-// so there is no point in running these in parallel
-for votes in scraped_updates.on_chain_votes {
-    let _ = self
-        .process_on_chain_votes(ctx, overlay_db, votes, now, new_leaf.hash)
-        .await
-        .map_err(|error| {
-            gum::warn!(
-                target: LOG_TARGET,
-                ?error,
-                "Skipping scraping block due to error",
-            );
-        });
-}
+self.process_chain_import_backlog(
+    ctx,
+    overlay_db,
+    scraped_updates.on_chain_votes,
+    now,
+    new_leaf.hash,
+)
+.await;
}

gum::trace!(target: LOG_TARGET, timestamp = now, "Done processing ActiveLeavesUpdate");
Ok(())
}

+/// Process one batch of our `chain_import_backlog`.
+///
+/// `new_votes` will be appended beforehand.
+async fn process_chain_import_backlog<Context>(
+    &mut self,
+    ctx: &mut Context,
+    overlay_db: &mut OverlayedBackend<'_, impl Backend>,
+    new_votes: Vec<ScrapedOnChainVotes>,
+    now: u64,
+    block_hash: Hash,
+) {
+    let mut chain_import_backlog = std::mem::take(&mut self.chain_import_backlog);
+    chain_import_backlog.extend(new_votes);
+    let import_range =
+        0..std::cmp::min(CHAIN_IMPORT_MAX_BATCH_SIZE, chain_import_backlog.len());
+    // The `runtime-api` subsystem has an internal queue which serializes the execution,
+    // so there is no point in running these in parallel
+    for votes in chain_import_backlog.drain(import_range) {
+        let res = self.process_on_chain_votes(ctx, overlay_db, votes, now, block_hash).await;
+        match res {
+            Ok(()) => {},
+            Err(error) => {
+                gum::warn!(target: LOG_TARGET, ?error, "Skipping scraping block due to error",);
+            },
+        };
+    }
+    self.chain_import_backlog = chain_import_backlog;
+}
+
/// Scrapes on-chain votes (backing votes and concluded disputes) for an active leaf of the
/// relay chain.
async fn process_on_chain_votes<Context>(
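For readers skimming the diff, below is a minimal, self-contained sketch of the batching pattern this commit introduces: newly scraped votes are appended to a queue, and each leaf update drains at most a fixed-size batch, so no single activation blocks the subsystem for long. The names `Coordinator`, `process_backlog`, and `MAX_BATCH`, and the `u32` items, are illustrative stand-ins, not the dispute coordinator's actual types or API.

// Illustrative sketch of the bounded-backlog pattern (not the real subsystem code).
use std::collections::VecDeque;

const MAX_BATCH: usize = 3; // stand-in for CHAIN_IMPORT_MAX_BATCH_SIZE

struct Coordinator {
    backlog: VecDeque<u32>, // stand-in for VecDeque<ScrapedOnChainVotes>
}

impl Coordinator {
    fn new() -> Self {
        Self { backlog: VecDeque::new() }
    }

    /// Append `new_items` to the backlog, then process at most `MAX_BATCH` of them.
    fn process_backlog(&mut self, new_items: Vec<u32>) {
        let mut backlog = std::mem::take(&mut self.backlog);
        backlog.extend(new_items);
        let batch = 0..MAX_BATCH.min(backlog.len());
        for item in backlog.drain(batch) {
            // The real code calls `process_on_chain_votes` here and logs any error.
            println!("processing item {item}");
        }
        // Whatever was not drained stays queued for the next leaf update.
        self.backlog = backlog;
    }
}

fn main() {
    let mut coordinator = Coordinator::new();
    // A burst of 7 items arrives; only 3 are handled now, 4 stay queued.
    coordinator.process_backlog((1..=7).collect());
    assert_eq!(coordinator.backlog.len(), 4);
    // The next activation keeps draining the queue even with no new items.
    coordinator.process_backlog(Vec::new());
    assert_eq!(coordinator.backlog.len(), 1);
}

With the real `CHAIN_IMPORT_MAX_BATCH_SIZE` of 8 and, roughly, one `ScrapedOnChainVotes` entry per unfinalized block, a ~50 block finality lag would be worked off over about seven leaf updates instead of in one long-running activation.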
