Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Yieldable queues for pallet MessageQueue #13424

Merged
merged 15 commits into from
Feb 25, 2023
Prev Previous commit
Next Next commit
Use WeightMeter instead of weight return
Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>
  • Loading branch information
ggwpez committed Feb 21, 2023
commit 5359dfd5e729531db7a3453a2ac501d7dbea675d
9 changes: 5 additions & 4 deletions frame/message-queue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1145,13 +1145,14 @@ impl<T: Config> Pallet<T> {
page_index: PageIndex,
message_index: T::Size,
message: &[u8],
weight: &mut WeightMeter,
meter: &mut WeightMeter,
overweight_limit: Weight,
) -> MessageExecutionStatus {
let hash = T::Hashing::hash(message);
use ProcessMessageError::*;
let prev_consumed = meter.consumed;

match T::MessageProcessor::process_message(message, origin.clone(), weight.remaining()) {
match T::MessageProcessor::process_message(message, origin.clone(), meter) {
Err(Overweight(w)) if w.any_gt(overweight_limit) => {
// Permanently overweight.
Self::deposit_event(Event::<T>::OverweightEnqueued {
Expand All @@ -1176,9 +1177,9 @@ impl<T: Config> Pallet<T> {
Self::deposit_event(Event::<T>::ProcessingFailed { hash, origin, error });
MessageExecutionStatus::Unprocessable { permanent: true }
},
Ok((success, weight_used)) => {
Ok(success) => {
// Success
weight.defensive_saturating_accrue(weight_used);
let weight_used = meter.consumed.saturating_sub(prev_consumed);
Self::deposit_event(Event::<T>::Processed { hash, origin, weight_used, success });
MessageExecutionStatus::Processed
},
Expand Down
24 changes: 12 additions & 12 deletions frame/message-queue/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,8 @@ impl ProcessMessage for RecordingMessageProcessor {
fn process_message(
message: &[u8],
origin: Self::Origin,
weight_limit: Weight,
) -> Result<(bool, Weight), ProcessMessageError> {
meter: &mut WeightMeter,
) -> Result<bool, ProcessMessageError> {
processing_message(message, &origin)?;

let weight = if message.starts_with(&b"weight="[..]) {
Expand All @@ -188,15 +188,15 @@ impl ProcessMessage for RecordingMessageProcessor {
} else {
1
};
let weight = Weight::from_parts(weight, weight);
let required = Weight::from_parts(weight, weight);

if weight.all_lte(weight_limit) {
if meter.check_accrue(required) {
let mut m = MessagesProcessed::get();
m.push((message.to_vec(), origin));
MessagesProcessed::set(m);
Ok((true, weight))
Ok(true)
} else {
Err(ProcessMessageError::Overweight(weight))
Err(ProcessMessageError::Overweight(required))
}
}
}
Expand Down Expand Up @@ -238,19 +238,19 @@ impl ProcessMessage for CountingMessageProcessor {
fn process_message(
message: &[u8],
origin: Self::Origin,
weight_limit: Weight,
) -> Result<(bool, Weight), ProcessMessageError> {
meter: &mut WeightMeter,
) -> Result<bool, ProcessMessageError> {
if let Err(e) = processing_message(message, &origin) {
NumMessagesErrored::set(NumMessagesErrored::get() + 1);
return Err(e)
}
let weight = Weight::from_parts(1, 1);
let required = Weight::from_parts(1, 1);

if weight.all_lte(weight_limit) {
if meter.check_accrue(required) {
NumMessagesProcessed::set(NumMessagesProcessed::get() + 1);
Ok((true, weight))
Ok(true)
} else {
Err(ProcessMessageError::Overweight(weight))
Err(ProcessMessageError::Overweight(required))
}
}
}
Expand Down
17 changes: 8 additions & 9 deletions frame/message-queue/src/mock_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,15 @@ where

fn process_message(
_message: &[u8],
origin: Self::Origin,
weight_limit: Weight,
) -> Result<(bool, Weight), ProcessMessageError> {
let weight = Weight::from_parts(REQUIRED_WEIGHT, REQUIRED_WEIGHT);
log::warn!("Processing message from {:?}", origin);

if weight.all_lte(weight_limit) {
Ok((true, weight))
_origin: Self::Origin,
meter: &mut WeightMeter,
) -> Result<bool, ProcessMessageError> {
let required = Weight::from_parts(REQUIRED_WEIGHT, REQUIRED_WEIGHT);

if meter.check_accrue(required) {
Ok(true)
} else {
Err(ProcessMessageError::Overweight(weight))
Err(ProcessMessageError::Overweight(required))
}
}
}
Expand Down
10 changes: 6 additions & 4 deletions frame/support/src/traits/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use scale_info::TypeInfo;
use sp_core::{ConstU32, Get, TypedGet};
use sp_runtime::{traits::Convert, BoundedSlice, RuntimeDebug};
use sp_std::{fmt::Debug, marker::PhantomData, prelude::*};
use sp_weights::Weight;
use sp_weights::{Weight, WeightMeter};

/// Errors that can happen when attempting to process a message with
/// [`ProcessMessage::process_message()`].
Expand Down Expand Up @@ -52,12 +52,14 @@ pub trait ProcessMessage {
/// The transport from where a message originates.
type Origin: FullCodec + MaxEncodedLen + Clone + Eq + PartialEq + TypeInfo + Debug;

/// Process the given message, using no more than `weight_limit` in weight to do so.
/// Process the given message, using no more than the remaining `meter` weight to do so.
///
/// Returns whether the message was processed.
fn process_message(
message: &[u8],
origin: Self::Origin,
weight_limit: Weight,
) -> Result<(bool, Weight), ProcessMessageError>;
meter: &mut WeightMeter,
) -> Result<bool, ProcessMessageError>;
}

/// Errors that can happen when attempting to execute an overweight message with
Expand Down