Skip to content

Commit

Permalink
Fix algorithmic complexity of on-demand scheduler with regards to num…
Browse files Browse the repository at this point in the history
…ber of cores. (paritytech#3190)

We witnessed really poor performance on Rococo, where we ended up with
50 on-demand cores. This was due to the fact that for each core the full
queue was processed. With this change full queue processing will happen
way less often (most of the time complexity is O(1) or O(log(n))) and if
it happens then only for one core (in expectation).

Also spot price is now updated before each order to ensure economic back
pressure.


TODO:

- [x] Implement
- [x] Basic tests
- [x] Add more tests (see todos)
- [x] Run benchmark to confirm better performance, first results suggest
> 100x faster.
- [x] Write migrations
- [x] Bump scale-info version and remove patch in Cargo.toml
- [x] Write PR docs: on-demand performance improved, more on-demand
cores are now non problematic anymore. If need by also the max queue
size can be increased again. (Maybe not to 10k)

Optional: Performance can be improved even more, if we called
`pop_assignment_for_core()`, before calling `report_processed` (Avoid
needless affinity drops). The effect gets smaller the larger the claim
queue and I would only go for it, if it does not add complexity to the
scheduler.

---------

Co-authored-by: eskimor <[email protected]>
Co-authored-by: antonva <[email protected]>
Co-authored-by: command-bot <>
Co-authored-by: Anton Vilhelm Ásgeirsson <[email protected]>
Co-authored-by: ordian <[email protected]>
  • Loading branch information
5 people authored and dharjeezy committed Mar 24, 2024
1 parent a04fcfd commit 8d3e74a
Show file tree
Hide file tree
Showing 13 changed files with 1,046 additions and 546 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion polkadot/primitives/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ pub use v6::{
ValidationCodeHash, ValidatorId, ValidatorIndex, ValidatorSignature, ValidityAttestation,
ValidityError, ASSIGNMENT_KEY_TYPE_ID, LEGACY_MIN_BACKING_VOTES, LOWEST_PUBLIC_ID,
MAX_CODE_SIZE, MAX_HEAD_DATA_SIZE, MAX_POV_SIZE, MIN_CODE_SIZE,
ON_DEMAND_DEFAULT_QUEUE_MAX_SIZE, PARACHAINS_INHERENT_IDENTIFIER, PARACHAIN_KEY_TYPE_ID,
ON_DEMAND_DEFAULT_QUEUE_MAX_SIZE, ON_DEMAND_MAX_QUEUE_MAX_SIZE, PARACHAINS_INHERENT_IDENTIFIER,
PARACHAIN_KEY_TYPE_ID,
};

#[cfg(feature = "std")]
Expand Down
7 changes: 7 additions & 0 deletions polkadot/primitives/src/v6/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,13 @@ pub const MAX_POV_SIZE: u32 = 5 * 1024 * 1024;
/// Can be adjusted in configuration.
pub const ON_DEMAND_DEFAULT_QUEUE_MAX_SIZE: u32 = 10_000;

/// Maximum for maximum queue size.
///
/// Setting `on_demand_queue_max_size` to a value higher than this is unsound. This is more a
/// theoretical limit, just below enough what the target type supports, so comparisons are possible
/// even with indices that are overflowing the underyling type.
pub const ON_DEMAND_MAX_QUEUE_MAX_SIZE: u32 = 1_000_000_000;

/// Backing votes threshold used from the host prior to runtime API version 6 and from the runtime
/// prior to v9 configuration migration.
pub const LEGACY_MIN_BACKING_VOTES: u32 = 2;
Expand Down
2 changes: 1 addition & 1 deletion polkadot/runtime/parachains/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ bitvec = { version = "1.0.0", default-features = false, features = ["alloc"] }
parity-scale-codec = { version = "3.6.1", default-features = false, features = ["derive", "max-encoded-len"] }
log = { workspace = true }
rustc-hex = { version = "2.1.0", default-features = false }
scale-info = { version = "2.10.0", default-features = false, features = ["derive"] }
scale-info = { version = "2.11.0", default-features = false, features = ["derive"] }
serde = { features = ["alloc", "derive"], workspace = true }
derive_more = "0.99.17"
bitflags = "1.3.2"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,7 @@ mod benchmarks {
let para_id = ParaId::from(111u32);
init_parathread::<T>(para_id);
T::Currency::make_free_balance_be(&caller, BalanceOf::<T>::max_value());
let order = EnqueuedOrder::new(para_id);

for _ in 0..s {
Pallet::<T>::add_on_demand_order(order.clone(), QueuePushDirection::Back).unwrap();
}
Pallet::<T>::populate_queue(para_id, s);

#[extrinsic_call]
_(RawOrigin::Signed(caller.into()), BalanceOf::<T>::max_value(), para_id)
Expand All @@ -87,11 +83,8 @@ mod benchmarks {
let para_id = ParaId::from(111u32);
init_parathread::<T>(para_id);
T::Currency::make_free_balance_be(&caller, BalanceOf::<T>::max_value());
let order = EnqueuedOrder::new(para_id);

for _ in 0..s {
Pallet::<T>::add_on_demand_order(order.clone(), QueuePushDirection::Back).unwrap();
}
Pallet::<T>::populate_queue(para_id, s);

#[extrinsic_call]
_(RawOrigin::Signed(caller.into()), BalanceOf::<T>::max_value(), para_id)
Expand Down
181 changes: 181 additions & 0 deletions polkadot/runtime/parachains/src/assigner_on_demand/migration.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

//! A module that is responsible for migration of storage.
use super::*;
use frame_support::{
migrations::VersionedMigration, pallet_prelude::ValueQuery, storage_alias,
traits::OnRuntimeUpgrade, weights::Weight,
};

mod v0 {
use super::*;
use sp_std::collections::vec_deque::VecDeque;

#[derive(Encode, Decode, TypeInfo, Debug, PartialEq, Clone)]
pub(super) struct EnqueuedOrder {
pub para_id: ParaId,
}

/// Keeps track of the multiplier used to calculate the current spot price for the on demand
/// assigner.
/// NOTE: Ignoring the `OnEmpty` field for the migration.
#[storage_alias]
pub(super) type SpotTraffic<T: Config> = StorageValue<Pallet<T>, FixedU128, ValueQuery>;

/// The order storage entry. Uses a VecDeque to be able to push to the front of the
/// queue from the scheduler on session boundaries.
/// NOTE: Ignoring the `OnEmpty` field for the migration.
#[storage_alias]
pub(super) type OnDemandQueue<T: Config> =
StorageValue<Pallet<T>, VecDeque<EnqueuedOrder>, ValueQuery>;
}

mod v1 {
use super::*;

use crate::assigner_on_demand::LOG_TARGET;

/// Migration to V1
pub struct UncheckedMigrateToV1<T>(sp_std::marker::PhantomData<T>);
impl<T: Config> OnRuntimeUpgrade for UncheckedMigrateToV1<T> {
fn on_runtime_upgrade() -> Weight {
let mut weight: Weight = Weight::zero();

// Migrate the current traffic value
let config = <configuration::Pallet<T>>::config();
QueueStatus::<T>::mutate(|mut queue_status| {
Pallet::<T>::update_spot_traffic(&config, &mut queue_status);

let v0_queue = v0::OnDemandQueue::<T>::take();
// Process the v0 queue into v1.
v0_queue.into_iter().for_each(|enqueued_order| {
// Readding the old orders will use the new systems.
Pallet::<T>::add_on_demand_order(
queue_status,
enqueued_order.para_id,
QueuePushDirection::Back,
);
});
});

// Remove the old storage.
v0::OnDemandQueue::<T>::kill(); // 1 write
v0::SpotTraffic::<T>::kill(); // 1 write

// Config read
weight.saturating_accrue(T::DbWeight::get().reads(1));
// QueueStatus read write (update_spot_traffic)
weight.saturating_accrue(T::DbWeight::get().reads_writes(1, 1));
// Kill x 2
weight.saturating_accrue(T::DbWeight::get().writes(2));

log::info!(target: LOG_TARGET, "Migrated on demand assigner storage to v1");
weight
}

#[cfg(feature = "try-runtime")]
fn pre_upgrade() -> Result<Vec<u8>, sp_runtime::TryRuntimeError> {
let n: u32 = v0::OnDemandQueue::<T>::get().len() as u32;

log::info!(
target: LOG_TARGET,
"Number of orders waiting in the queue before: {n}",
);

Ok(n.encode())
}

#[cfg(feature = "try-runtime")]
fn post_upgrade(state: Vec<u8>) -> Result<(), sp_runtime::TryRuntimeError> {
log::info!(target: LOG_TARGET, "Running post_upgrade()");

ensure!(
v0::OnDemandQueue::<T>::get().is_empty(),
"OnDemandQueue should be empty after the migration"
);

let expected_len = u32::decode(&mut &state[..]).unwrap();
let queue_status_size = QueueStatus::<T>::get().size();
ensure!(
expected_len == queue_status_size,
"Number of orders should be the same before and after migration"
);

let n_affinity_entries: u32 =
AffinityEntries::<T>::iter().map(|(_index, heap)| heap.len() as u32).sum();
let n_para_id_affinity: u32 = ParaIdAffinity::<T>::iter()
.map(|(_para_id, affinity)| affinity.count as u32)
.sum();
ensure!(
n_para_id_affinity == n_affinity_entries,
"Number of affinity entries should be the same as the counts in ParaIdAffinity"
);

Ok(())
}
}
}

/// Migrate `V0` to `V1` of the storage format.
pub type MigrateV0ToV1<T> = VersionedMigration<
0,
1,
v1::UncheckedMigrateToV1<T>,
Pallet<T>,
<T as frame_system::Config>::DbWeight,
>;

#[cfg(test)]
mod tests {
use super::{v0, v1, OnRuntimeUpgrade, Weight};
use crate::mock::{new_test_ext, MockGenesisConfig, OnDemandAssigner, Test};
use primitives::Id as ParaId;

#[test]
fn migration_to_v1_preserves_queue_ordering() {
new_test_ext(MockGenesisConfig::default()).execute_with(|| {
// Place orders for paraids 1..5
for i in 1..=5 {
v0::OnDemandQueue::<Test>::mutate(|queue| {
queue.push_back(v0::EnqueuedOrder { para_id: ParaId::new(i) })
});
}

// Queue has 5 orders
let old_queue = v0::OnDemandQueue::<Test>::get();
assert_eq!(old_queue.len(), 5);
// New queue has 0 orders
assert_eq!(OnDemandAssigner::get_queue_status().size(), 0);

// For tests, db weight is zero.
assert_eq!(
<v1::UncheckedMigrateToV1<Test> as OnRuntimeUpgrade>::on_runtime_upgrade(),
Weight::zero()
);

// New queue has 5 orders
assert_eq!(OnDemandAssigner::get_queue_status().size(), 5);

// Compare each entry from the old queue with the entry in the new queue.
old_queue.iter().zip(OnDemandAssigner::get_free_entries().iter()).for_each(
|(old_enq, new_enq)| {
assert_eq!(old_enq.para_id, new_enq.para_id);
},
);
});
}
}
Loading

0 comments on commit 8d3e74a

Please sign in to comment.