Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix algorithmic complexity of on-demand scheduler with regards to number of cores. #3190

Merged
merged 35 commits into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
fd41678
max -> min
Jan 25, 2024
9ee7aa1
Reduce default queue size on-demand
Jan 25, 2024
0b3c03d
Introduce max max queue size.
Feb 1, 2024
edad3a4
Use max max queue size.
Feb 1, 2024
ee74614
Implementation complete
Feb 1, 2024
16afcfa
Revert min/max fix.
Feb 1, 2024
c750146
Fixes + EncodeableBinaryHeap.
Feb 2, 2024
b0edbcf
Remove EncodeableBinaryHeap
Feb 2, 2024
cac2752
Patch scale-info for now.
Feb 2, 2024
a9b13f7
Revert default value.
Feb 2, 2024
f9ea4e4
Fixes + tests.
Feb 2, 2024
457b225
Bring back copyright.
Feb 2, 2024
ce7c8e8
Fix benchmark.
Feb 2, 2024
1bab42e
binary heap got merged
Feb 2, 2024
c21540d
Calculate on demand traffic on idle blocks
antonva Feb 9, 2024
999b7bb
Add migration for on demand provider
antonva Mar 4, 2024
4df8590
Merge branch 'master' into rk-on-demand-perf-proper-fix
antonva Mar 4, 2024
ecbae81
Readd missing export
antonva Mar 7, 2024
079c78f
Add storage version to on demand pallet
antonva Mar 7, 2024
ee91d8b
Merge branch 'master' into rk-on-demand-perf-proper-fix
antonva Mar 7, 2024
5308f3a
Address comments, add new scale-info
antonva Mar 13, 2024
2253a14
Merge branch 'master' into rk-on-demand-perf-proper-fix
antonva Mar 14, 2024
1ae8b7b
Bump scale-info version again
antonva Mar 14, 2024
3e60c09
Add test to migration to preserve queue ordering
antonva Mar 14, 2024
2dd2512
Merge branch 'master' of https://github.com/paritytech/polkadot-sdk i…
Mar 18, 2024
600fd5e
".git/.scripts/commands/bench/bench.sh" --subcommand=pallet --runtime…
Mar 18, 2024
ee29cc2
Fix post_upgrade in migration
antonva Mar 18, 2024
b344bc6
Add prdoc
antonva Mar 18, 2024
76e4e76
Remove unused on-demand max size import
antonva Mar 18, 2024
ab51045
Remove unused mut from test
antonva Mar 18, 2024
9a705cb
Remove benchmark todo
antonva Mar 18, 2024
f2ad19f
Simplify PartialOrd for EnqueuedOrder
antonva Mar 18, 2024
70f468e
".git/.scripts/commands/bench/bench.sh" --subcommand=pallet --runtime…
Mar 18, 2024
d8f1543
Address nits
antonva Mar 20, 2024
be3a3c1
Type sums in post migration
antonva Mar 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions polkadot/primitives/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,9 @@ pub use v6::{
UpgradeRestriction, UpwardMessage, ValidDisputeStatementKind, ValidationCode,
ValidationCodeHash, ValidatorId, ValidatorIndex, ValidatorSignature, ValidityAttestation,
ValidityError, ASSIGNMENT_KEY_TYPE_ID, LEGACY_MIN_BACKING_VOTES, LOWEST_PUBLIC_ID,
MAX_CODE_SIZE, MAX_HEAD_DATA_SIZE, MAX_POV_SIZE, ON_DEMAND_DEFAULT_QUEUE_MAX_SIZE,
ON_DEMAND_MAX_QUEUE_MAX_SIZE, PARACHAINS_INHERENT_IDENTIFIER, PARACHAIN_KEY_TYPE_ID,
MAX_CODE_SIZE, MAX_HEAD_DATA_SIZE, MAX_POV_SIZE, MIN_CODE_SIZE,
ON_DEMAND_DEFAULT_QUEUE_MAX_SIZE, ON_DEMAND_MAX_QUEUE_MAX_SIZE, PARACHAINS_INHERENT_IDENTIFIER,
PARACHAIN_KEY_TYPE_ID,
};

#[cfg(feature = "std")]
Expand Down
143 changes: 143 additions & 0 deletions polkadot/runtime/parachains/src/assigner_on_demand/migration.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

//! A module that is responsible for migration of storage.
use super::*;
use frame_support::{
migrations::VersionedMigration, pallet_prelude::ValueQuery, storage_alias,
traits::OnRuntimeUpgrade, weights::Weight,
};

mod v0 {
use super::*;
use sp_std::collections::vec_deque::VecDeque;

#[derive(Encode, Decode, TypeInfo, Debug, PartialEq, Clone)]
pub(super) struct EnqueuedOrder {
pub para_id: ParaId,
}

/// Keeps track of the multiplier used to calculate the current spot price for the on demand
/// assigner.
/// NOTE: Ignoring the `OnEmpty` field for the migration.
#[storage_alias]
pub(super) type SpotTraffic<T: Config> = StorageValue<Pallet<T>, FixedU128, ValueQuery>;

/// The order storage entry. Uses a VecDeque to be able to push to the front of the
/// queue from the scheduler on session boundaries.
/// NOTE: Ignoring the `OnEmpty` field for the migration.
#[storage_alias]
pub(super) type OnDemandQueue<T: Config> =
StorageValue<Pallet<T>, VecDeque<EnqueuedOrder>, ValueQuery>;
}

mod v1 {
use super::*;

use crate::assigner_on_demand::LOG_TARGET;

/// Migration to V1
pub struct UncheckedMigrateToV1<T>(sp_std::marker::PhantomData<T>);
impl<T: Config> OnRuntimeUpgrade for UncheckedMigrateToV1<T> {
fn on_runtime_upgrade() -> Weight {
let mut weight: Weight = Weight::zero();

// Migrate the current traffic value
let config = <configuration::Pallet<T>>::config();
QueueStatus::<T>::mutate(|mut queue_status| {
Pallet::<T>::update_spot_traffic(&config, &mut queue_status);

let v0_queue = v0::OnDemandQueue::<T>::take();
// Process the v0 queue into v1.
v0_queue.into_iter().for_each(|enqueued_order| {
// Readding the old orders will use the new systems.
Pallet::<T>::add_on_demand_order(
queue_status,
enqueued_order.para_id,
QueuePushDirection::Front,
antonva marked this conversation as resolved.
Show resolved Hide resolved
);
});
});

// Remove the old storage.
v0::OnDemandQueue::<T>::kill(); // 1 write
v0::SpotTraffic::<T>::kill(); // 1 write

// Config read
weight.saturating_accrue(T::DbWeight::get().reads(1));
// QueueStatus read write (update_spot_traffic)
weight.saturating_accrue(T::DbWeight::get().reads_writes(1, 1));
// Kill x 2
weight.saturating_accrue(T::DbWeight::get().writes(2));

log::info!(target: LOG_TARGET, "Migrated on demand assigner storage to v1");
weight
}

#[cfg(feature = "try-runtime")]
fn pre_upgrade() -> Result<Vec<u8>, sp_runtime::TryRuntimeError> {
let n: u32 = v0::OnDemandQueue::<T>::get().len() as u32;

log::info!(
target: LOG_TARGET,
"Number of orders waiting in the queue before: {n}",
);

Ok(n.encode())
}

#[cfg(feature = "try-runtime")]
fn post_upgrade(state: Vec<u8>) -> Result<(), sp_runtime::TryRuntimeError> {
log::info!(target: LOG_TARGET, "Running post_upgrade()");

ensure!(
v0::OnDemandQueue::<T>::get().is_empty(),
"OnDemandQueue should be empty after the migration"
);

let expected_len = u32::decode(&mut &state[..]).unwrap();
let n_affinity_entries = AffinityEntries::<T>::iter()
.map(|(_index, heap)| heap.len() as u32)
.fold(0, |acc, x| acc + x);
let n_free_entries = FreeEntries::<T>::get().len() as u32;
let n_orders = n_affinity_entries + n_free_entries;
antonva marked this conversation as resolved.
Show resolved Hide resolved
// TODO: Ensure that affinity is intact.
ensure!(
expected_len == n_orders,
"Number of orders should be the same before and after migration."
);

let n_para_id_affinity = ParaIdAffinity::<T>::iter()
.map(|(_para_id, affinity)| affinity.count as u32)
.fold(0, |acc, x| acc + x);
ensure!(
n_para_id_affinity == n_affinity_entries,
antonva marked this conversation as resolved.
Show resolved Hide resolved
"Number of affinity entries should be the same as the counts in ParaIdAffinity"
);

Ok(())
}
}
}

/// Migrate `V0` to `V1` of the storage format.
pub type MigrateV0ToV1<T> = VersionedMigration<
0,
1,
v1::UncheckedMigrateToV1<T>,
Pallet<T>,
<T as frame_system::Config>::DbWeight,
>;
91 changes: 73 additions & 18 deletions polkadot/runtime/parachains/src/assigner_on_demand/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
//! intent can be made explicit.

mod benchmarking;
pub mod migration;
mod mock_helpers;

extern crate alloc;
Expand Down Expand Up @@ -126,10 +127,21 @@ struct QueueStatusType {
/// heap is roughly bounded in the number of on demand cores:
///
/// For a single core, elements will always be processed in order. With each core added, a
/// level of of out of order execution is added.
/// level of out of order execution is added.
freed_indices: BinaryHeap<ReverseQueueIndex>,
}

impl Default for QueueStatusType {
fn default() -> QueueStatusType {
QueueStatusType {
traffic: FixedU128::default(),
next_index: QueueIndex(0),
smallest_index: QueueIndex(0),
freed_indices: BinaryHeap::new(),
}
}
}

impl QueueStatusType {
/// How many orders are queued in total?
///
Expand Down Expand Up @@ -206,15 +218,15 @@ enum SpotTrafficCalculationErr {
Division,
}

// Type used for priority indices.
/// Type used for priority indices.
// NOTE: The `Ord` implementation for this type is unsound in the general case.
// Do not use it for anything but it's intended purpose.
#[derive(Encode, Decode, TypeInfo, Debug, PartialEq, Clone, Eq, Copy)]
struct QueueIndex(u32);

/// QueueIndex with reverse ordering.
///
/// Same as `Reverse(QueueIndex)`, but with all the needed traits implemented.
///
/// TODO: Add basic ordering tests.
#[derive(Encode, Decode, TypeInfo, Debug, PartialEq, Clone, Eq, Copy)]
struct ReverseQueueIndex(u32);

Expand Down Expand Up @@ -294,8 +306,11 @@ pub mod pallet {

use super::*;

const STORAGE_VERSION: StorageVersion = StorageVersion::new(1);

#[pallet::pallet]
#[pallet::without_storage_info]
#[pallet::storage_version(STORAGE_VERSION)]
pub struct Pallet<T>(_);

#[pallet::config]
Expand All @@ -317,12 +332,7 @@ pub mod pallet {
/// Creates an empty queue status for an empty queue with initial traffic value.
#[pallet::type_value]
pub(super) fn QueueStatusOnEmpty<T: Config>() -> QueueStatusType {
QueueStatusType {
traffic: T::TrafficDefaultValue::get(),
next_index: QueueIndex(0),
smallest_index: QueueIndex(0),
freed_indices: BinaryHeap::new(),
}
QueueStatusType { traffic: T::TrafficDefaultValue::get(), ..Default::default() }
}

#[pallet::type_value]
Expand Down Expand Up @@ -376,6 +386,21 @@ pub mod pallet {
SpotPriceHigherThanMaxAmount,
}

#[pallet::hooks]
impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
fn on_initialize(_now: BlockNumberFor<T>) -> Weight {
let config = <configuration::Pallet<T>>::config();
// We need to update the spot traffic on block initialize in order to account for idle
// blocks.
QueueStatus::<T>::mutate(|queue_status| {
Self::update_spot_traffic(&config, queue_status);
});

// 2 reads in config and queuestatus, at maximum 1 write to queuestatus.
T::DbWeight::get().reads_writes(2, 1)
}
}

#[pallet::call]
impl<T: Config> Pallet<T> {
/// Create a single on demand core order.
Expand Down Expand Up @@ -478,8 +503,6 @@ where

let assignment = entry.map(|e| Assignment::Pool { para_id: e.para_id, core_index }).ok()?;

// TODO: Test for invariant: If affinity count was zero before (is 1 now) then the entry
// must have come from free_entries.
Pallet::<T>::increase_affinity(assignment.para_id(), core_index);
Some(assignment)
}
Expand Down Expand Up @@ -541,8 +564,9 @@ where
let traffic = queue_status.traffic;

// Calculate spot price
let spot_price: BalanceOf<T> = traffic
.saturating_mul_int(config.on_demand_base_fee.saturated_into::<BalanceOf<T>>());
let spot_price: BalanceOf<T> = traffic.saturating_mul_int(
config.scheduler_params.on_demand_base_fee.saturated_into::<BalanceOf<T>>(),
);

// Is the current price higher than `max_amount`
ensure!(spot_price.le(&max_amount), Error::<T>::SpotPriceHigherThanMaxAmount);
Expand All @@ -555,7 +579,10 @@ where
existence_requirement,
)?;

ensure!(queue_status.size() < config.on_demand_queue_max_size, Error::<T>::QueueFull);
ensure!(
queue_status.size() < config.scheduler_params.on_demand_queue_max_size,
Error::<T>::QueueFull
);
Pallet::<T>::add_on_demand_order(queue_status, para_id, QueuePushDirection::Back);
Ok(())
})
Expand All @@ -569,10 +596,10 @@ where
let old_traffic = queue_status.traffic;
match Self::calculate_spot_traffic(
old_traffic,
config.on_demand_queue_max_size,
config.scheduler_params.on_demand_queue_max_size,
queue_status.size(),
config.on_demand_target_queue_utilization,
config.on_demand_fee_variability,
config.scheduler_params.on_demand_target_queue_utilization,
config.scheduler_params.on_demand_fee_variability,
) {
Ok(new_traffic) => {
// Only update storage on change
Expand Down Expand Up @@ -764,6 +791,19 @@ where
fn get_affinity_map(para_id: ParaId) -> Option<CoreAffinityCount> {
ParaIdAffinity::<T>::get(para_id)
}

/// Getter for the affinity entries.
#[cfg(test)]
fn get_affinity_entries(core_index: CoreIndex) -> BinaryHeap<EnqueuedOrder> {
AffinityEntries::<T>::get(core_index)
}

/// Getter for the free entries.
#[cfg(test)]
fn get_free_entries() -> BinaryHeap<EnqueuedOrder> {
FreeEntries::<T>::get()
}

#[cfg(feature = "runtime-benchmarks")]
pub fn populate_queue(para_id: ParaId, num: u32) {
QueueStatus::<T>::mutate(|queue_status| {
Expand All @@ -772,4 +812,19 @@ where
}
});
}

#[cfg(test)]
fn set_queue_status(new_status: QueueStatusType) {
QueueStatus::<T>::set(new_status);
}

#[cfg(test)]
fn get_queue_status() -> QueueStatusType {
QueueStatus::<T>::get()
}

#[cfg(test)]
fn get_traffic_default_value() -> FixedU128 {
<T as Config>::TrafficDefaultValue::get()
}
}
Loading
Loading