Skip to content

Commit

Permalink
Reschedule wakeup
Browse files Browse the repository at this point in the history
Signed-off-by: Alexandru Gheorghe <[email protected]>
  • Loading branch information
alexggh committed Dec 20, 2024
1 parent ade1f75 commit 3eb2489
Show file tree
Hide file tree
Showing 2 changed files with 270 additions and 1 deletion.
25 changes: 24 additions & 1 deletion polkadot/node/core/approval-voting/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,16 @@ pub(crate) const LOG_TARGET: &str = "parachain::approval-voting";
// The max number of ticks we delay sending the approval after we are ready to issue the approval
const MAX_APPROVAL_COALESCE_WAIT_TICKS: Tick = 12;

// If the node restarted and the tranche has passed without the assignment
// being trigger, we won't trigger the assignment at restart because we don't have
// an wakeup schedule for it.,
// The solution, is to always schedule a wake up after the restart and let the
// process_wakeup decided if the assignment needs to be triggered.
// We need to have a delay after restart to give time to the node to catch up with
// messages and not trigger its assignment unnecessarily, because it hasn't seen
// the assignments from the other validators.
const RESTART_WAKEUP_DELAY: Tick = 12;

/// Configuration for the approval voting subsystem
#[derive(Debug, Clone)]
pub struct Config {
Expand Down Expand Up @@ -1732,7 +1742,20 @@ async fn distribution_messages_for_activation<Sender: SubsystemSender<RuntimeApi
match candidate_entry.approval_entry(&block_hash) {
Some(approval_entry) => {
match approval_entry.local_statements() {
(None, None) | (None, Some(_)) => {}, // second is impossible case.
(None, None) =>
if approval_entry
.our_assignment()
.map(|assignment| !assignment.triggered())
.unwrap_or(false)
{
actions.push(Action::ScheduleWakeup {
block_hash,
block_number: block_entry.block_number(),
candidate_hash: *candidate_hash,
tick: state.clock.tick_now() + RESTART_WAKEUP_DELAY,
})
},
(None, Some(_)) => {}, // second is impossible case.
(Some(assignment), None) => {
let claimed_core_indices =
get_core_indices_on_startup(&assignment.cert().kind, *core_index);
Expand Down
246 changes: 246 additions & 0 deletions polkadot/node/core/approval-voting/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5234,6 +5234,252 @@ fn subsystem_sends_assignment_approval_in_correct_order_on_approval_restart() {
});
}

// Test that if the subsystem missed the triggering of some tranches because it was not running
// it launches the missed assignements on restart.
#[test]
fn subsystem_launches_missed_assignments_on_restart() {
let test_tranche = 20;
let assignment_criteria = Box::new(MockAssignmentCriteria(
move || {
let mut assignments = HashMap::new();
let _ = assignments.insert(
CoreIndex(0),
approval_db::v2::OurAssignment {
cert: garbage_assignment_cert_v2(AssignmentCertKindV2::RelayVRFDelay {
core_index: CoreIndex(0),
}),
tranche: test_tranche,
validator_index: ValidatorIndex(0),
triggered: false,
}
.into(),
);

assignments
},
|_| Ok(0),
));
let config = HarnessConfigBuilder::default().assignment_criteria(assignment_criteria).build();
let store = config.backend();
let store_clone = config.backend();

test_harness(config, |test_harness| async move {
let TestHarness { mut virtual_overseer, clock, sync_oracle_handle } = test_harness;

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(rx)) => {
rx.send(Ok(0)).unwrap();
}
);

let block_hash = Hash::repeat_byte(0x01);
let fork_block_hash = Hash::repeat_byte(0x02);
let candidate_commitments = CandidateCommitments::default();
let mut candidate_receipt = dummy_candidate_receipt_v2(block_hash);
candidate_receipt.commitments_hash = candidate_commitments.hash();
let candidate_hash = candidate_receipt.hash();
let slot = Slot::from(1);
let (chain_builder, _session_info) = build_chain_with_two_blocks_with_one_candidate_each(
block_hash,
fork_block_hash,
slot,
sync_oracle_handle,
candidate_receipt,
)
.await;
chain_builder.build(&mut virtual_overseer).await;

assert!(!clock.inner.lock().current_wakeup_is(1));
clock.inner.lock().wakeup_all(1);

assert!(clock.inner.lock().current_wakeup_is(slot_to_tick(slot) + test_tranche as u64));
clock.inner.lock().wakeup_all(slot_to_tick(slot));

futures_timer::Delay::new(Duration::from_millis(200)).await;

clock.inner.lock().wakeup_all(slot_to_tick(slot + 2));

assert_eq!(clock.inner.lock().wakeups.len(), 0);

futures_timer::Delay::new(Duration::from_millis(200)).await;

let candidate_entry = store.load_candidate_entry(&candidate_hash).unwrap().unwrap();
let our_assignment =
candidate_entry.approval_entry(&block_hash).unwrap().our_assignment().unwrap();
assert!(!our_assignment.triggered());

// Assignment is not triggered because its tranches has not been reached.
virtual_overseer
});

// Restart a new approval voting subsystem with the same database and major syncing true until
// the last leaf.
let config = HarnessConfigBuilder::default().backend(store_clone).major_syncing(true).build();

test_harness(config, |test_harness| async move {
let TestHarness { mut virtual_overseer, clock, sync_oracle_handle } = test_harness;
let slot = Slot::from(1);
// 1. Set the clock to the to a tick past the tranche where the assignment should be
// triggered.
clock.inner.lock().set_tick(slot_to_tick(slot) + 2 * test_tranche as u64);
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(rx)) => {
rx.send(Ok(0)).unwrap();
}
);

let block_hash = Hash::repeat_byte(0x01);
let fork_block_hash = Hash::repeat_byte(0x02);
let candidate_commitments = CandidateCommitments::default();
let mut candidate_receipt = dummy_candidate_receipt_v2(block_hash);
candidate_receipt.commitments_hash = candidate_commitments.hash();
let (chain_builder, session_info) = build_chain_with_two_blocks_with_one_candidate_each(
block_hash,
fork_block_hash,
slot,
sync_oracle_handle,
candidate_receipt,
)
.await;

chain_builder.build(&mut virtual_overseer).await;

futures_timer::Delay::new(Duration::from_millis(2000)).await;

// On major syncing ending Approval voting should send all the necessary messages for a
// candidate to be approved.
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::ApprovalDistribution(ApprovalDistributionMessage::NewBlocks(
_,
)) => {
}
);

clock
.inner
.lock()
.wakeup_all(slot_to_tick(slot) + 2 * test_tranche as u64 + RESTART_WAKEUP_DELAY - 1);

// Subsystem should not send any messages because the assignment is not triggered yet.
assert!(overseer_recv(&mut virtual_overseer).timeout(TIMEOUT / 2).await.is_none());

// Set the clock to the tick where the assignment should be triggered.
clock
.inner
.lock()
.wakeup_all(slot_to_tick(slot) + 2 * test_tranche as u64 + RESTART_WAKEUP_DELAY);

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(
_,
RuntimeApiRequest::SessionInfo(_, si_tx),
)
) => {
si_tx.send(Ok(Some(session_info.clone()))).unwrap();
}
);

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(
_,
RuntimeApiRequest::SessionExecutorParams(_, si_tx),
)
) => {
// Make sure all SessionExecutorParams calls are not made for the leaf (but for its relay parent)
si_tx.send(Ok(Some(ExecutorParams::default()))).unwrap();
}
);

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(_, RuntimeApiRequest::NodeFeatures(_, si_tx), )
) => {
si_tx.send(Ok(NodeFeatures::EMPTY)).unwrap();
}
);

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeAssignment(
_,
_,
)) => {
}
);

// Guarantees the approval work has been relaunched.
recover_available_data(&mut virtual_overseer).await;
fetch_validation_code(&mut virtual_overseer).await;

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::CandidateValidation(CandidateValidationMessage::ValidateFromExhaustive {
exec_kind,
response_sender,
..
}) if exec_kind == PvfExecKind::Approval => {
response_sender.send(Ok(ValidationResult::Valid(Default::default(), Default::default())))
.unwrap();
}
);

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(_, RuntimeApiRequest::ApprovalVotingParams(_, sender))) => {
let _ = sender.send(Ok(ApprovalVotingParams {
max_approval_coalesce_count: 1,
}));
}
);

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeApproval(_))
);

clock
.inner
.lock()
.wakeup_all(slot_to_tick(slot) + 2 * test_tranche as u64 + RESTART_WAKEUP_DELAY);

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeAssignment(
_,
_,
)) => {
}
);

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(_, RuntimeApiRequest::ApprovalVotingParams(_, sender))) => {
let _ = sender.send(Ok(ApprovalVotingParams {
max_approval_coalesce_count: 1,
}));
}
);

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeApproval(_))
);

// Assert that there are no more messages being sent by the subsystem
assert!(overseer_recv(&mut virtual_overseer).timeout(TIMEOUT / 2).await.is_none());

virtual_overseer
});
}

// Test we correctly update the timer when we mark the beginning of gathering assignments.
#[test]
fn test_gathering_assignments_statements() {
Expand Down

0 comments on commit 3eb2489

Please sign in to comment.