Skip to content

Commit

Permalink
[FRAME] Message Queue use proper overweight limit (paritytech#1873)
Browse files Browse the repository at this point in the history
Changes:
- Use a sensible limit for the overweight-cutoff of a single messages
instead of the full configured `ServiceWeight`.
- Add/Update tests

---------

Signed-off-by: Oliver Tale-Yazdi <[email protected]>
  • Loading branch information
ggwpez authored Oct 19, 2023
1 parent 2cd005f commit cecb553
Show file tree
Hide file tree
Showing 4 changed files with 267 additions and 22 deletions.
17 changes: 9 additions & 8 deletions substrate/frame/message-queue/src/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@

//! Stress tests pallet-message-queue. Defines its own runtime config to use larger constants for
//! `HeapSize` and `MaxStale`.
//!
//! The tests in this file are ignored by default, since they are quite slow. You can run them
//! manually like this:
//!
//! ```sh
//! RUST_LOG=info cargo test -p pallet-message-queue --profile testnet -- --ignored
//! ```
#![cfg(test)]

Expand Down Expand Up @@ -96,9 +103,6 @@ impl Config for Test {

/// Simulates heavy usage by enqueueing and processing large amounts of messages.
///
/// Best to run with `RUST_LOG=info RUSTFLAGS='-Cdebug-assertions=y' cargo test -r -p
/// pallet-message-queue -- --ignored`.
///
/// # Example output
///
/// ```pre
Expand All @@ -121,7 +125,7 @@ fn stress_test_enqueue_and_service() {
let max_queues = 10_000;
let max_messages_per_queue = 10_000;
let max_msg_len = MaxMessageLenOf::<Test>::get();
let mut rng = StdRng::seed_from_u64(42);
let mut rng = StdRng::seed_from_u64(43);

build_and_execute::<Test>(|| {
let mut msgs_remaining = 0;
Expand All @@ -145,9 +149,6 @@ fn stress_test_enqueue_and_service() {

/// Simulates heavy usage of the suspension logic via `Yield`.
///
/// Best to run with `RUST_LOG=info RUSTFLAGS='-Cdebug-assertions=y' cargo test -r -p
/// pallet-message-queue -- --ignored`.
///
/// # Example output
///
/// ```pre
Expand All @@ -169,7 +170,7 @@ fn stress_test_queue_suspension() {
let max_messages_per_queue = 10_000;
let (max_suspend_per_block, max_resume_per_block) = (100, 50);
let max_msg_len = MaxMessageLenOf::<Test>::get();
let mut rng = StdRng::seed_from_u64(41);
let mut rng = StdRng::seed_from_u64(43);

build_and_execute::<Test>(|| {
let mut suspended = BTreeSet::<u32>::new();
Expand Down
54 changes: 50 additions & 4 deletions substrate/frame/message-queue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -584,8 +584,9 @@ pub mod pallet {
}

/// Check all compile-time assumptions about [`crate::Config`].
#[cfg(test)]
fn integrity_test() {
assert!(!MaxMessageLenOf::<T>::get().is_zero(), "HeapSize too low");
Self::do_integrity_test().expect("Pallet config is valid; qed")
}
}

Expand Down Expand Up @@ -759,6 +760,47 @@ impl<T: Config> Pallet<T> {
}
}

/// The maximal weight that a single message can consume.
///
/// Any message using more than this will be marked as permanently overweight and not
/// automatically re-attempted. Returns `None` if the servicing of a message cannot begin.
/// `Some(0)` means that only messages with no weight may be served.
fn max_message_weight(limit: Weight) -> Option<Weight> {
limit.checked_sub(&Self::single_msg_overhead())
}

/// The overhead of servicing a single message.
fn single_msg_overhead() -> Weight {
T::WeightInfo::bump_service_head()
.saturating_add(T::WeightInfo::service_queue_base())
.saturating_add(
T::WeightInfo::service_page_base_completion()
.max(T::WeightInfo::service_page_base_no_completion()),
)
.saturating_add(T::WeightInfo::service_page_item())
.saturating_add(T::WeightInfo::ready_ring_unknit())
}

/// Checks invariants of the pallet config.
///
/// The results of this can only be relied upon if the config values are set to constants.
#[cfg(test)]
fn do_integrity_test() -> Result<(), String> {
ensure!(!MaxMessageLenOf::<T>::get().is_zero(), "HeapSize too low");

if let Some(service) = T::ServiceWeight::get() {
if Self::max_message_weight(service).is_none() {
return Err(format!(
"ServiceWeight too low: {}. Must be at least {}",
service,
Self::single_msg_overhead(),
))
}
}

Ok(())
}

fn do_enqueue_message(
origin: &MessageOriginOf<T>,
message: BoundedSlice<u8, MaxMessageLenOf<T>>,
Expand Down Expand Up @@ -1360,10 +1402,14 @@ impl<T: Config> ServiceQueues for Pallet<T> {
type OverweightMessageAddress = (MessageOriginOf<T>, PageIndex, T::Size);

fn service_queues(weight_limit: Weight) -> Weight {
// The maximum weight that processing a single message may take.
let overweight_limit = weight_limit;
let mut weight = WeightMeter::with_limit(weight_limit);

// Get the maximum weight that processing a single message may take:
let max_weight = Self::max_message_weight(weight_limit).unwrap_or_else(|| {
defensive!("Not enough weight to service a single message.");
Weight::zero()
});

let mut next = match Self::bump_service_head(&mut weight) {
Some(h) => h,
None => return weight.consumed(),
Expand All @@ -1374,7 +1420,7 @@ impl<T: Config> ServiceQueues for Pallet<T> {
let mut last_no_progress = None;

loop {
let (progressed, n) = Self::service_queue(next.clone(), &mut weight, overweight_limit);
let (progressed, n) = Self::service_queue(next.clone(), &mut weight, max_weight);
next = match n {
Some(n) =>
if !progressed {
Expand Down
36 changes: 26 additions & 10 deletions substrate/frame/message-queue/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl frame_system::Config for Test {
parameter_types! {
pub const HeapSize: u32 = 24;
pub const MaxStale: u32 = 2;
pub const ServiceWeight: Option<Weight> = Some(Weight::from_parts(10, 10));
pub const ServiceWeight: Option<Weight> = Some(Weight::from_parts(100, 100));
}
impl Config for Test {
type RuntimeEvent = RuntimeEvent;
Expand All @@ -91,6 +91,7 @@ pub struct MockedWeightInfo;
parameter_types! {
/// Storage for `MockedWeightInfo`, do not use directly.
pub static WeightForCall: BTreeMap<String, Weight> = Default::default();
pub static DefaultWeightForCall: Weight = Weight::zero();
}

/// Set the return value for a function from the `WeightInfo` trait.
Expand All @@ -111,40 +112,55 @@ impl crate::weights::WeightInfo for MockedWeightInfo {
WeightForCall::get()
.get("execute_overweight_page_updated")
.copied()
.unwrap_or_default()
.unwrap_or(DefaultWeightForCall::get())
}
fn execute_overweight_page_removed() -> Weight {
WeightForCall::get()
.get("execute_overweight_page_removed")
.copied()
.unwrap_or_default()
.unwrap_or(DefaultWeightForCall::get())
}
fn service_page_base_completion() -> Weight {
WeightForCall::get()
.get("service_page_base_completion")
.copied()
.unwrap_or_default()
.unwrap_or(DefaultWeightForCall::get())
}
fn service_page_base_no_completion() -> Weight {
WeightForCall::get()
.get("service_page_base_no_completion")
.copied()
.unwrap_or_default()
.unwrap_or(DefaultWeightForCall::get())
}
fn service_queue_base() -> Weight {
WeightForCall::get().get("service_queue_base").copied().unwrap_or_default()
WeightForCall::get()
.get("service_queue_base")
.copied()
.unwrap_or(DefaultWeightForCall::get())
}
fn bump_service_head() -> Weight {
WeightForCall::get().get("bump_service_head").copied().unwrap_or_default()
WeightForCall::get()
.get("bump_service_head")
.copied()
.unwrap_or(DefaultWeightForCall::get())
}
fn service_page_item() -> Weight {
WeightForCall::get().get("service_page_item").copied().unwrap_or_default()
WeightForCall::get()
.get("service_page_item")
.copied()
.unwrap_or(DefaultWeightForCall::get())
}
fn ready_ring_knit() -> Weight {
WeightForCall::get().get("ready_ring_knit").copied().unwrap_or_default()
WeightForCall::get()
.get("ready_ring_knit")
.copied()
.unwrap_or(DefaultWeightForCall::get())
}
fn ready_ring_unknit() -> Weight {
WeightForCall::get().get("ready_ring_unknit").copied().unwrap_or_default()
WeightForCall::get()
.get("ready_ring_unknit")
.copied()
.unwrap_or(DefaultWeightForCall::get())
}
}

Expand Down
Loading

0 comments on commit cecb553

Please sign in to comment.