This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Fix stalling dispute coordinator. #7125

Merged · 2 commits · Apr 25, 2023
102 changes: 69 additions & 33 deletions node/core/dispute-coordinator/src/initialized.rs
@@ -16,7 +16,10 @@

//! Dispute coordinator subsystem in initialized state (after first active leaf is received).

use std::{collections::BTreeMap, sync::Arc};
use std::{
collections::{BTreeMap, VecDeque},
sync::Arc,
};

use futures::{
channel::{mpsc, oneshot},
@@ -65,6 +68,12 @@ use super::{
OverlayedBackend,
};

/// How many blocks we import votes from per leaf update.
///
/// Since vote import is relatively slow, we have to limit the maximum amount of work we do on leaf
/// updates (and especially on startup) so the dispute coordinator won't be considered stalling.
const CHAIN_IMPORT_MAX_BATCH_SIZE: usize = 8;
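
// Editor's sketch (not part of this diff): illustrative arithmetic for the batch
// bound. With an assumed finality lag of ~50 unfinalized blocks on startup, a
// batch size of 8 spreads vote import over ceil(50 / 8) = 7 leaf updates instead
// of doing all the work in a single update.
#[cfg(test)]
#[test]
fn chain_import_batching_spreads_work_over_leaf_updates() {
	let finality_lag: usize = 50; // assumed example value
	let updates_needed =
		(finality_lag + CHAIN_IMPORT_MAX_BATCH_SIZE - 1) / CHAIN_IMPORT_MAX_BATCH_SIZE;
	assert_eq!(updates_needed, 7);
}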

// Initial data for `dispute-coordinator`. It is provided only at first start.
pub struct InitialData {
pub participations: Vec<(ParticipationPriority, ParticipationRequest)>,
@@ -89,6 +98,17 @@ pub(crate) struct Initialized {
participation: Participation,
scraper: ChainScraper,
participation_receiver: WorkerMessageReceiver,
/// Backlog of votes from the chain that still need to be imported.
///
/// Importing votes is relatively slow for some reason; with a large finality lag (~50
/// blocks) we would be too slow importing all votes from unfinalized chains on startup
/// (the dispute-coordinator would get killed because of unresponsiveness).
///
/// https://github.com/paritytech/polkadot/issues/6912
///
/// To resolve this, we limit the number of votes imported at once to
/// `CHAIN_IMPORT_MAX_BATCH_SIZE` and put the rest here for later processing.
chain_import_backlog: VecDeque<ScrapedOnChainVotes>,
metrics: Metrics,
}
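
// Editor's sketch (not part of this diff): `VecDeque` keeps the backlog FIFO,
// so votes scraped from older blocks are imported before newer ones, and
// `extend` appends fresh scrapes at the back.
#[cfg(test)]
#[test]
fn backlog_is_processed_in_fifo_order() {
	let mut backlog: VecDeque<u32> = VecDeque::new();
	backlog.extend(vec![1, 2]); // older scrapes
	backlog.extend(vec![3]); // a newer scrape lands behind them
	let drained: Vec<u32> = backlog.drain(0..2).collect();
	assert_eq!(drained, vec![1, 2]); // oldest first
	assert_eq!(backlog.pop_front(), Some(3));
}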

@@ -117,6 +137,7 @@ impl Initialized {
scraper,
participation,
participation_receiver,
chain_import_backlog: VecDeque::new(),
metrics,
}
}
@@ -168,24 +189,16 @@ impl Initialized {
}

let mut overlay_db = OverlayedBackend::new(backend);
for votes in on_chain_votes {
let _ = self
.process_on_chain_votes(
ctx,
&mut overlay_db,
votes,
clock.now(),
first_leaf.hash,
)
.await
.map_err(|error| {
gum::warn!(
target: LOG_TARGET,
?error,
"Skipping scraping block due to error",
);
});
}

self.process_chain_import_backlog(
ctx,
&mut overlay_db,
on_chain_votes,
clock.now(),
first_leaf.hash,
)
.await;

if !overlay_db.is_empty() {
let ops = overlay_db.into_write_ops();
backend.write(ops)?;
@@ -344,26 +357,49 @@ impl Initialized {
scraped_updates.on_chain_votes.len()
);

// The `runtime-api` subsystem has an internal queue which serializes the execution,
// so there is no point in running these in parallel
for votes in scraped_updates.on_chain_votes {
let _ = self
.process_on_chain_votes(ctx, overlay_db, votes, now, new_leaf.hash)
.await
.map_err(|error| {
gum::warn!(
target: LOG_TARGET,
?error,
"Skipping scraping block due to error",
);
});
}
self.process_chain_import_backlog(
ctx,
overlay_db,
scraped_updates.on_chain_votes,
now,
new_leaf.hash,
)
.await;
}

gum::trace!(target: LOG_TARGET, timestamp = now, "Done processing ActiveLeavesUpdate");
Ok(())
}

/// Process one batch of our `chain_import_backlog`.
///
/// `new_votes` will be appended beforehand.
async fn process_chain_import_backlog<Context>(
&mut self,
ctx: &mut Context,
overlay_db: &mut OverlayedBackend<'_, impl Backend>,
new_votes: Vec<ScrapedOnChainVotes>,
now: u64,
block_hash: Hash,
) {
let mut chain_import_backlog = std::mem::take(&mut self.chain_import_backlog);
chain_import_backlog.extend(new_votes);
let import_range =
0..std::cmp::min(CHAIN_IMPORT_MAX_BATCH_SIZE, chain_import_backlog.len());
// The `runtime-api` subsystem has an internal queue which serializes the execution,
// so there is no point in running these in parallel
for votes in chain_import_backlog.drain(import_range) {
let res = self.process_on_chain_votes(ctx, overlay_db, votes, now, block_hash).await;
match res {
Ok(()) => {},
Err(error) => {
gum::warn!(target: LOG_TARGET, ?error, "Skipping scraping block due to error");
},
};
}
self.chain_import_backlog = chain_import_backlog;
}
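
// Editor's sketch (not part of this diff): the take/extend/drain/put-back
// pattern above, boiled down to a self-contained helper with `u32` standing in
// for `ScrapedOnChainVotes`. Each call processes at most
// `CHAIN_IMPORT_MAX_BATCH_SIZE` items and keeps the remainder for the next
// leaf update.
#[cfg(test)]
fn process_batch(backlog: &mut VecDeque<u32>, new_votes: Vec<u32>) -> Vec<u32> {
	let mut queue = std::mem::take(backlog);
	queue.extend(new_votes);
	let range = 0..std::cmp::min(CHAIN_IMPORT_MAX_BATCH_SIZE, queue.len());
	let batch: Vec<u32> = queue.drain(range).collect();
	*backlog = queue; // anything not drained waits for the next call
	batch
}

#[cfg(test)]
#[test]
fn backlog_is_bounded_per_call() {
	let mut backlog = VecDeque::new();
	let first = process_batch(&mut backlog, (0..20).collect());
	assert_eq!(first.len(), CHAIN_IMPORT_MAX_BATCH_SIZE); // only 8 processed now
	assert_eq!(backlog.len(), 12); // the rest is kept for later leaf updates
}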

/// Scrapes on-chain votes (backing votes and concluded disputes) for an active leaf of the
/// relay chain.
async fn process_on_chain_votes<Context>(