Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

pallet-message-queue: add queue pausing #14318

Merged
merged 3 commits into from
Jun 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bin/node/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1180,6 +1180,7 @@ impl pallet_message_queue::Config for Runtime {
type MessageProcessor = pallet_message_queue::mock_helpers::NoopMessageProcessor<u32>;
type Size = u32;
type QueueChangeHandler = ();
type QueuePausedQuery = ();
type HeapSize = ConstU32<{ 64 * 1024 }>;
type MaxStale = ConstU32<128>;
type ServiceWeight = MessageQueueServiceWeight;
Expand Down
7 changes: 4 additions & 3 deletions frame/message-queue/src/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
use crate::{
mock::{
new_test_ext, CountingMessageProcessor, IntoWeight, MockedWeightInfo, NumMessagesProcessed,
SuspendedQueues,
YieldingQueues,
},
mock_helpers::MessageOrigin,
*,
Expand Down Expand Up @@ -96,6 +96,7 @@ impl Config for Test {
type MessageProcessor = CountingMessageProcessor;
type Size = u32;
type QueueChangeHandler = ();
type QueuePausedQuery = ();
type HeapSize = HeapSize;
type MaxStale = MaxStale;
type ServiceWeight = ServiceWeight;
Expand Down Expand Up @@ -207,7 +208,7 @@ fn stress_test_queue_suspension() {
to_resume,
per_queue.len()
);
SuspendedQueues::set(suspended.iter().map(|q| MessageOrigin::Everywhere(*q)).collect());
YieldingQueues::set(suspended.iter().map(|q| MessageOrigin::Everywhere(*q)).collect());

// Pick a fraction of all messages currently in queue and process them.
let resumed_messages =
Expand All @@ -229,7 +230,7 @@ fn stress_test_queue_suspension() {
process_all_messages(resumed_messages);
msgs_remaining -= resumed_messages;

let resumed = SuspendedQueues::take();
let resumed = YieldingQueues::take();
log::info!("Resumed all {} suspended queues", resumed.len());
log::info!("Processing all remaining {} messages", msgs_remaining);
process_all_messages(msgs_remaining);
Expand Down
25 changes: 23 additions & 2 deletions frame/message-queue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ use frame_support::{
pallet_prelude::*,
traits::{
DefensiveTruncateFrom, EnqueueMessage, ExecuteOverweightError, Footprint, ProcessMessage,
ProcessMessageError, ServiceQueues,
ProcessMessageError, QueuePausedQuery, ServiceQueues,
},
BoundedSlice, CloneNoBound, DefaultNoBound,
};
Expand Down Expand Up @@ -473,6 +473,13 @@ pub mod pallet {
/// removed.
type QueueChangeHandler: OnQueueChanged<<Self::MessageProcessor as ProcessMessage>::Origin>;

/// Queried by the pallet to check whether a queue can be serviced.
///
/// This also applies to manual servicing via `execute_overweight` and `service_queues`. The
/// value of this is only polled once before servicing the queue. This means that changes to
/// it that happen *within* the servicing will not be reflected.
type QueuePausedQuery: QueuePausedQuery<<Self::MessageProcessor as ProcessMessage>::Origin>;
ggwpez marked this conversation as resolved.
Show resolved Hide resolved

/// The size of the page; this implies the maximum message size which can be sent.
///
/// A good value depends on the expected message sizes, their weights, the weight that is
Expand Down Expand Up @@ -534,6 +541,10 @@ pub mod pallet {
/// Such errors are expected, but not guaranteed, to resolve themselves eventually through
/// retrying.
TemporarilyUnprocessable,
/// The queue is paused and no message can be executed from it.
///
/// This can change at any time and may resolve in the future by re-trying.
QueuePaused,
}

/// The index of the first and last (non-empty) pages.
Expand Down Expand Up @@ -803,6 +814,8 @@ impl<T: Config> Pallet<T> {
weight_limit: Weight,
) -> Result<Weight, Error<T>> {
let mut book_state = BookStateFor::<T>::get(&origin);
ensure!(!T::QueuePausedQuery::is_paused(&origin), Error::<T>::QueuePaused);

let mut page = Pages::<T>::get(&origin, page_index).ok_or(Error::<T>::NoPage)?;
let (pos, is_processed, payload) =
page.peek_index(index.into() as usize).ok_or(Error::<T>::NoMessage)?;
Expand Down Expand Up @@ -943,6 +956,10 @@ impl<T: Config> Pallet<T> {

let mut book_state = BookStateFor::<T>::get(&origin);
let mut total_processed = 0;
if T::QueuePausedQuery::is_paused(&origin) {
let next_ready = book_state.ready_neighbours.as_ref().map(|x| x.next.clone());
return (false, next_ready)
}

while book_state.end > book_state.begin {
let (processed, status) =
Expand Down Expand Up @@ -1284,7 +1301,11 @@ impl<T: Config> ServiceQueues for Pallet<T> {
Pallet::<T>::do_execute_overweight(message_origin, page, index, weight.remaining()).map_err(
|e| match e {
Error::<T>::InsufficientWeight => ExecuteOverweightError::InsufficientWeight,
_ => ExecuteOverweightError::NotFound,
Error::<T>::AlreadyProcessed => ExecuteOverweightError::AlreadyProcessed,
Error::<T>::QueuePaused => ExecuteOverweightError::QueuePaused,
Error::<T>::NoPage | Error::<T>::NoMessage | Error::<T>::Queued =>
ExecuteOverweightError::NotFound,
_ => ExecuteOverweightError::Other,
},
)
}
Expand Down
23 changes: 21 additions & 2 deletions frame/message-queue/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ impl Config for Test {
type MessageProcessor = RecordingMessageProcessor;
type Size = u32;
type QueueChangeHandler = RecordingQueueChangeHandler;
type QueuePausedQuery = MockedQueuePauser;
type HeapSize = HeapSize;
type MaxStale = MaxStale;
type ServiceWeight = ServiceWeight;
Expand Down Expand Up @@ -154,7 +155,8 @@ impl crate::weights::WeightInfo for MockedWeightInfo {

parameter_types! {
pub static MessagesProcessed: Vec<(Vec<u8>, MessageOrigin)> = vec![];
pub static SuspendedQueues: Vec<MessageOrigin> = vec![];
/// Queues that should return `Yield` upon being processed.
pub static YieldingQueues: Vec<MessageOrigin> = vec![];
}

/// A message processor which records all processed messages into [`MessagesProcessed`].
Expand Down Expand Up @@ -205,7 +207,7 @@ impl ProcessMessage for RecordingMessageProcessor {
/// Processed a mocked message. Messages that end with `badformat`, `corrupt`, `unsupported` or
/// `yield` will fail with an error respectively.
fn processing_message(msg: &[u8], origin: &MessageOrigin) -> Result<(), ProcessMessageError> {
if SuspendedQueues::get().contains(&origin) {
if YieldingQueues::get().contains(&origin) {
return Err(ProcessMessageError::Yield)
}

Expand Down Expand Up @@ -270,6 +272,17 @@ impl OnQueueChanged<MessageOrigin> for RecordingQueueChangeHandler {
}
}

parameter_types! {
pub static PausedQueues: Vec<MessageOrigin> = vec![];
}

pub struct MockedQueuePauser;
impl QueuePausedQuery<MessageOrigin> for MockedQueuePauser {
fn is_paused(id: &MessageOrigin) -> bool {
PausedQueues::get().contains(id)
}
}

/// Create new test externalities.
///
/// Is generic since it is used by the unit test, integration tests and benchmarks.
Expand All @@ -287,6 +300,12 @@ where
ext
}

/// Run this closure in test externalities.
pub fn test_closure<R>(f: impl FnOnce() -> R) -> R {
let mut ext = new_test_ext::<Test>();
ext.execute_with(f)
}

/// Set the weight of a specific weight function.
pub fn set_weight(name: &str, w: Weight) {
MockedWeightInfo::set_weight::<Test>(name, w);
Expand Down
6 changes: 3 additions & 3 deletions frame/message-queue/src/mock_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,11 @@ pub fn page<T: Config>(msg: &[u8]) -> PageOf<T> {
}

pub fn single_page_book<T: Config>() -> BookStateOf<T> {
BookState { begin: 0, end: 1, count: 1, ready_neighbours: None, message_count: 0, size: 0 }
BookState { begin: 0, end: 1, count: 1, ..Default::default() }
}

pub fn empty_book<T: Config>() -> BookStateOf<T> {
BookState { begin: 0, end: 1, count: 1, ready_neighbours: None, message_count: 0, size: 0 }
BookState { begin: 0, end: 1, count: 1, ..Default::default() }
}

/// Returns a full page of messages with their index as payload and the number of messages.
Expand All @@ -118,9 +118,9 @@ pub fn book_for<T: Config>(page: &PageOf<T>) -> BookStateOf<T> {
count: 1,
begin: 0,
end: 1,
ready_neighbours: None,
message_count: page.remaining.into() as u64,
size: page.remaining_size.into() as u64,
..Default::default()
}
}

Expand Down
Loading