diff --git a/substrate/frame/message-queue/Cargo.toml b/substrate/frame/message-queue/Cargo.toml index 148848f2bf011..614e5885b4b29 100644 --- a/substrate/frame/message-queue/Cargo.toml +++ b/substrate/frame/message-queue/Cargo.toml @@ -13,6 +13,7 @@ codec = { package = "parity-scale-codec", version = "3.6.1", default-features = scale-info = { version = "2.10.0", default-features = false, features = ["derive"] } serde = { version = "1.0.193", optional = true, features = ["derive"] } log = { version = "0.4.17", default-features = false } +environmental = { version = "1.1.4", default-features = false } sp-core = { path = "../../primitives/core", default-features = false } sp-io = { path = "../../primitives/io", default-features = false } @@ -34,6 +35,7 @@ rand_distr = "0.4.3" default = ["std"] std = [ "codec/std", + "environmental/std", "frame-benchmarking?/std", "frame-support/std", "frame-system/std", diff --git a/substrate/frame/message-queue/src/benchmarking.rs b/substrate/frame/message-queue/src/benchmarking.rs index eedaaebeca944..7e99bc0585845 100644 --- a/substrate/frame/message-queue/src/benchmarking.rs +++ b/substrate/frame/message-queue/src/benchmarking.rs @@ -25,6 +25,7 @@ use super::{mock_helpers::*, Pallet as MessageQueue, *}; use frame_benchmarking::v2::*; use frame_support::traits::Get; use frame_system::RawOrigin; +use sp_io::hashing::blake2_256; use sp_std::prelude::*; #[benchmarks( @@ -142,7 +143,7 @@ mod benchmarks { // Check that it was processed. assert_last_event::( Event::Processed { - id: sp_io::hashing::blake2_256(&msg), + id: blake2_256(&msg).into(), origin: 0.into(), weight_used: 1.into_weight(), success: true, @@ -227,7 +228,7 @@ mod benchmarks { assert_last_event::( Event::Processed { - id: sp_io::hashing::blake2_256(&((msgs - 1) as u32).encode()), + id: blake2_256(&((msgs - 1) as u32).encode()).into(), origin: 0.into(), weight_used: Weight::from_parts(1, 1), success: true, @@ -264,7 +265,7 @@ mod benchmarks { assert_last_event::( Event::Processed { - id: sp_io::hashing::blake2_256(&((msgs - 1) as u32).encode()), + id: blake2_256(&((msgs - 1) as u32).encode()).into(), origin: 0.into(), weight_used: Weight::from_parts(1, 1), success: true, diff --git a/substrate/frame/message-queue/src/integration_test.rs b/substrate/frame/message-queue/src/integration_test.rs index 53dc204ab9c07..fee5d24213538 100644 --- a/substrate/frame/message-queue/src/integration_test.rs +++ b/substrate/frame/message-queue/src/integration_test.rs @@ -29,8 +29,8 @@ use crate::{ mock::{ - build_and_execute, CountingMessageProcessor, IntoWeight, MockedWeightInfo, - NumMessagesProcessed, YieldingQueues, + build_and_execute, gen_seed, Callback, CountingMessageProcessor, IntoWeight, + MessagesProcessed, MockedWeightInfo, NumMessagesProcessed, YieldingQueues, }, mock_helpers::MessageOrigin, *, @@ -120,13 +120,13 @@ impl Config for Test { /// Processing all remaining 28639 messages /// ``` #[test] -#[ignore] // Only run in the CI. +#[ignore] // Only run in the CI, otherwise its too slow. fn stress_test_enqueue_and_service() { let blocks = 20; let max_queues = 10_000; let max_messages_per_queue = 10_000; let max_msg_len = MaxMessageLenOf::::get(); - let mut rng = StdRng::seed_from_u64(43); + let mut rng = StdRng::seed_from_u64(gen_seed()); build_and_execute::(|| { let mut msgs_remaining = 0; @@ -148,6 +148,74 @@ fn stress_test_enqueue_and_service() { }); } +/// Very similar to `stress_test_enqueue_and_service`, but enqueues messages while processing them. +#[test] +#[ignore] // Only run in the CI, otherwise its too slow. +fn stress_test_recursive() { + let blocks = 20; + let mut rng = StdRng::seed_from_u64(gen_seed()); + + // We need to use thread-locals since the callback cannot capture anything. + parameter_types! { + pub static TotalEnqueued: u32 = 0; + pub static Enqueued: u32 = 0; + pub static Called: u32 = 0; + } + + Called::take(); + Enqueued::take(); + TotalEnqueued::take(); + + Callback::set(Box::new(|_, _| { + let mut rng = StdRng::seed_from_u64(Enqueued::get() as u64); + let max_queues = 1_000; + let max_messages_per_queue = 1_000; + let max_msg_len = MaxMessageLenOf::::get(); + + // Instead of directly enqueueing, we enqueue inside a `service` call. + let enqueued = enqueue_messages(max_queues, max_messages_per_queue, max_msg_len, &mut rng); + TotalEnqueued::set(TotalEnqueued::get() + enqueued); + Enqueued::set(Enqueued::get() + enqueued); + Called::set(Called::get() + 1); + })); + + build_and_execute::(|| { + let mut msgs_remaining = 0; + for b in 0..blocks { + log::info!("Block #{}", b); + MessageQueue::enqueue_message( + BoundedSlice::defensive_truncate_from(format!("callback={b}").as_bytes()), + b.into(), + ); + + msgs_remaining += Enqueued::take() + 1; + // Pick a fraction of all messages currently in queue and process them. + let processed = rng.gen_range(1..=msgs_remaining); + log::info!("Processing {} of all messages {}", processed, msgs_remaining); + process_some_messages(processed); // This also advances the block. + msgs_remaining -= processed; + TotalEnqueued::set(TotalEnqueued::get() - processed + 1); + MessageQueue::do_try_state().unwrap(); + } + while Called::get() < blocks { + msgs_remaining += Enqueued::take(); + // Pick a fraction of all messages currently in queue and process them. + let processed = rng.gen_range(1..=msgs_remaining); + log::info!("Processing {} of all messages {}", processed, msgs_remaining); + process_some_messages(processed); // This also advances the block. + msgs_remaining -= processed; + TotalEnqueued::set(TotalEnqueued::get() - processed); + MessageQueue::do_try_state().unwrap(); + } + + let msgs_remaining = TotalEnqueued::take(); + log::info!("Processing all remaining {} messages", msgs_remaining); + process_all_messages(msgs_remaining); + assert_eq!(Called::get(), blocks); + post_conditions(); + }); +} + /// Simulates heavy usage of the suspension logic via `Yield`. /// /// # Example output @@ -164,14 +232,14 @@ fn stress_test_enqueue_and_service() { /// Processing all remaining 430 messages /// ``` #[test] -#[ignore] // Only run in the CI. +#[ignore] // Only run in the CI, otherwise its too slow. fn stress_test_queue_suspension() { let blocks = 20; let max_queues = 10_000; let max_messages_per_queue = 10_000; let (max_suspend_per_block, max_resume_per_block) = (100, 50); let max_msg_len = MaxMessageLenOf::::get(); - let mut rng = StdRng::seed_from_u64(43); + let mut rng = StdRng::seed_from_u64(gen_seed()); build_and_execute::(|| { let mut suspended = BTreeSet::::new(); @@ -300,6 +368,7 @@ fn process_all_messages(expected: u32) { assert_eq!(consumed, Weight::from_all(expected as u64)); assert_eq!(NumMessagesProcessed::take(), expected as usize); + MessagesProcessed::take(); } /// Returns the weight consumed by `MessageQueue::on_initialize()`. @@ -327,5 +396,6 @@ fn post_conditions() { assert!(ServiceHead::::get().is_none()); // This still works fine. assert_eq!(MessageQueue::service_queues(Weight::MAX), Weight::zero(), "Nothing left"); + MessageQueue::do_try_state().unwrap(); next_block(); } diff --git a/substrate/frame/message-queue/src/lib.rs b/substrate/frame/message-queue/src/lib.rs index 12d289478b37c..07eb004198534 100644 --- a/substrate/frame/message-queue/src/lib.rs +++ b/substrate/frame/message-queue/src/lib.rs @@ -49,9 +49,21 @@ //! **Message Execution** //! //! Executing a message is offloaded to the [`Config::MessageProcessor`] which contains the actual -//! logic of how to handle the message since they are blobs. A message can be temporarily or -//! permanently overweight. The pallet will perpetually try to execute a temporarily overweight -//! message. A permanently overweight message is skipped and must be executed manually. +//! logic of how to handle the message since they are blobs. Storage changes are not rolled back on +//! error. +//! +//! A failed message can be temporarily or permanently overweight. The pallet will perpetually try +//! to execute a temporarily overweight message. A permanently overweight message is skipped and +//! must be executed manually. +//! +//! **Reentrancy** +//! +//! This pallet has two entry points for executing (possibly recursive) logic; +//! [`Pallet::service_queues`] and [`Pallet::execute_overweight`]. Both entry points are guarded by +//! the same mutex to error on reentrancy. The only functions that are explicitly **allowed** to be +//! called by a message processor are: [`Pallet::enqueue_message`] and +//! [`Pallet::enqueue_messages`]. All other functions are forbidden and error with +//! [`Error::RecursiveDisallowed`]. //! //! **Pagination** //! @@ -146,6 +158,7 @@ //! which is the default state for a message after being enqueued. //! - `knitting`/`unknitting`: The means of adding or removing a `Queue` from the `ReadyRing`. //! - `MEL`: The Max Encoded Length of a type, see [`codec::MaxEncodedLen`]. +//! - `Reentrance`: To enter an execution context again before it has completed. //! //! # Properties //! @@ -180,6 +193,7 @@ //! expensive. Currently this is archived by having one queue per para-chain/thread, which keeps the //! number of queues within `O(n)` and should be "good enough". +#![deny(missing_docs)] #![cfg_attr(not(feature = "std"), no_std)] mod benchmarking; @@ -194,8 +208,8 @@ use frame_support::{ defensive, pallet_prelude::*, traits::{ - DefensiveTruncateFrom, EnqueueMessage, ExecuteOverweightError, Footprint, ProcessMessage, - ProcessMessageError, QueueFootprint, QueuePausedQuery, ServiceQueues, + Defensive, DefensiveTruncateFrom, EnqueueMessage, ExecuteOverweightError, Footprint, + ProcessMessage, ProcessMessageError, QueueFootprint, QueuePausedQuery, ServiceQueues, }, BoundedSlice, CloneNoBound, DefaultNoBound, }; @@ -203,6 +217,7 @@ use frame_system::pallet_prelude::*; pub use pallet::*; use scale_info::TypeInfo; use sp_arithmetic::traits::{BaseArithmetic, Unsigned}; +use sp_core::{defer, H256}; use sp_runtime::{ traits::{One, Zero}, SaturatedConversion, Saturating, @@ -460,6 +475,10 @@ pub mod pallet { /// Processor for a message. /// + /// Storage changes are not rolled back on error. + /// + /// # Benchmarking + /// /// Must be set to [`mock_helpers::NoopMessageProcessor`] for benchmarking. /// Other message processors that consumes exactly (1, 1) weight for any give message will /// work as well. Otherwise the benchmarking will also measure the weight of the message @@ -516,18 +535,51 @@ pub mod pallet { #[pallet::generate_deposit(pub(super) fn deposit_event)] pub enum Event { /// Message discarded due to an error in the `MessageProcessor` (usually a format error). - ProcessingFailed { id: [u8; 32], origin: MessageOriginOf, error: ProcessMessageError }, + ProcessingFailed { + /// The `blake2_256` hash of the message. + id: H256, + /// The queue of the message. + origin: MessageOriginOf, + /// The error that occurred. + /// + /// This error is pretty opaque. More fine-grained errors need to be emitted as events + /// by the `MessageProcessor`. + error: ProcessMessageError, + }, /// Message is processed. - Processed { id: [u8; 32], origin: MessageOriginOf, weight_used: Weight, success: bool }, + Processed { + /// The `blake2_256` hash of the message. + id: H256, + /// The queue of the message. + origin: MessageOriginOf, + /// How much weight was used to process the message. + weight_used: Weight, + /// Whether the message was processed. + /// + /// Note that this does not mean that the underlying `MessageProcessor` was internally + /// successful. It *solely* means that the MQ pallet will treat this as a success + /// condition and discard the message. Any internal error needs to be emitted as events + /// by the `MessageProcessor`. + success: bool, + }, /// Message placed in overweight queue. OverweightEnqueued { + /// The `blake2_256` hash of the message. id: [u8; 32], + /// The queue of the message. origin: MessageOriginOf, + /// The page of the message. page_index: PageIndex, + /// The index of the message within the page. message_index: T::Size, }, /// This page was reaped. - PageReaped { origin: MessageOriginOf, index: PageIndex }, + PageReaped { + /// The queue of the page. + origin: MessageOriginOf, + /// The index of the page. + index: PageIndex, + }, } #[pallet::error] @@ -554,6 +606,8 @@ pub mod pallet { /// /// This can change at any time and may resolve in the future by re-trying. QueuePaused, + /// Another call is in progress and needs to finish before this call can happen. + RecursiveDisallowed, } /// The index of the first and last (non-empty) pages. @@ -868,6 +922,21 @@ impl Pallet { page_index: PageIndex, index: T::Size, weight_limit: Weight, + ) -> Result> { + match with_service_mutex(|| { + Self::do_execute_overweight_inner(origin, page_index, index, weight_limit) + }) { + Err(()) => Err(Error::::RecursiveDisallowed), + Ok(x) => x, + } + } + + /// Same as `do_execute_overweight` but must be called while holding the `service_mutex`. + fn do_execute_overweight_inner( + origin: MessageOriginOf, + page_index: PageIndex, + index: T::Size, + weight_limit: Weight, ) -> Result> { let mut book_state = BookStateFor::::get(&origin); ensure!(!T::QueuePausedQuery::is_paused(&origin), Error::::QueuePaused); @@ -924,6 +993,14 @@ impl Pallet { /// Remove a stale page or one which has no more messages remaining to be processed. fn do_reap_page(origin: &MessageOriginOf, page_index: PageIndex) -> DispatchResult { + match with_service_mutex(|| Self::do_reap_page_inner(origin, page_index)) { + Err(()) => Err(Error::::RecursiveDisallowed.into()), + Ok(x) => x, + } + } + + /// Same as `do_reap_page` but must be called while holding the `service_mutex`. + fn do_reap_page_inner(origin: &MessageOriginOf, page_index: PageIndex) -> DispatchResult { let mut book_state = BookStateFor::::get(origin); // definitely not reapable if the page's index is no less than the `begin`ning of ready // pages. @@ -1112,6 +1189,7 @@ impl Pallet { weight: &mut WeightMeter, overweight_limit: Weight, ) -> ItemExecutionStatus { + use MessageExecutionStatus::*; // This ugly pre-checking is needed for the invariant // "we never bail if a page became complete". if page.is_complete() { @@ -1125,16 +1203,31 @@ impl Pallet { Some(m) => m, None => return ItemExecutionStatus::NoItem, }[..]; + let payload_len = payload.len() as u64; - use MessageExecutionStatus::*; - let is_processed = match Self::process_message_payload( + // Store these for the case that `process_message_payload` is recursive. + Pages::::insert(origin, page_index, &*page); + BookStateFor::::insert(origin, &*book_state); + + let res = Self::process_message_payload( origin.clone(), page_index, page.first_index, payload, weight, overweight_limit, - ) { + ); + + // And restore them afterwards to see the changes of a recursive call. + *book_state = BookStateFor::::get(origin); + if let Some(new_page) = Pages::::get(origin, page_index) { + *page = new_page; + } else { + defensive!("page must exist since we just inserted it and recursive calls are not allowed to remove anything"); + return ItemExecutionStatus::NoItem + }; + + let is_processed = match res { InsufficientWeight => return ItemExecutionStatus::Bailed, Unprocessable { permanent: false } => return ItemExecutionStatus::NoProgress, Processed | Unprocessable { permanent: true } => true, @@ -1143,7 +1236,7 @@ impl Pallet { if is_processed { book_state.message_count.saturating_dec(); - book_state.size.saturating_reduce(payload.len() as u64); + book_state.size.saturating_reduce(payload_len as u64); } page.skip_first(is_processed); ItemExecutionStatus::Executed(is_processed) @@ -1168,7 +1261,7 @@ impl Pallet { /// * `remaining_size` > 0 /// * `first` <= `last` /// * Every page can be decoded into peek_* functions - #[cfg(any(test, feature = "try-runtime"))] + #[cfg(any(test, feature = "try-runtime", feature = "std"))] pub fn do_try_state() -> Result<(), sp_runtime::TryRuntimeError> { // Checking memory corruption for BookStateFor ensure!( @@ -1181,13 +1274,17 @@ impl Pallet { "Memory Corruption in Pages" ); - // No state to check - if ServiceHead::::get().is_none() { - return Ok(()) + // Basic checks for each book + for book in BookStateFor::::iter_values() { + ensure!(book.end >= book.begin, "Invariant"); + ensure!(book.end < 1 << 30, "Likely overflow or corruption"); + ensure!(book.message_count < 1 << 30, "Likely overflow or corruption"); + ensure!(book.size < 1 << 30, "Likely overflow or corruption"); + ensure!(book.count < 1 << 30, "Likely overflow or corruption"); } //loop around this origin - let starting_origin = ServiceHead::::get().unwrap(); + let Some(starting_origin) = ServiceHead::::get() else { return Ok(()) }; while let Some(head) = Self::bump_service_head(&mut WeightMeter::new()) { ensure!( @@ -1220,7 +1317,7 @@ impl Pallet { for page_index in head_book_state.begin..head_book_state.end { let page = Pages::::get(&head, page_index).unwrap(); let remaining_messages = page.remaining; - let mut counted_remaining_messages = 0; + let mut counted_remaining_messages: u32 = 0; ensure!( remaining_messages > 0.into(), "These must be some messages that have not been processed yet!" @@ -1237,7 +1334,7 @@ impl Pallet { } ensure!( - remaining_messages == counted_remaining_messages.into(), + remaining_messages.into() == counted_remaining_messages, "Memory Corruption" ); } @@ -1312,10 +1409,9 @@ impl Pallet { meter: &mut WeightMeter, overweight_limit: Weight, ) -> MessageExecutionStatus { - let hash = sp_io::hashing::blake2_256(message); + let mut id = sp_io::hashing::blake2_256(message); use ProcessMessageError::*; let prev_consumed = meter.consumed(); - let mut id = hash; match T::MessageProcessor::process_message(message, origin.clone(), meter, &mut id) { Err(Overweight(w)) if w.any_gt(overweight_limit) => { @@ -1339,19 +1435,44 @@ impl Pallet { }, Err(error @ BadFormat | error @ Corrupt | error @ Unsupported) => { // Permanent error - drop - Self::deposit_event(Event::::ProcessingFailed { id, origin, error }); + Self::deposit_event(Event::::ProcessingFailed { id: id.into(), origin, error }); MessageExecutionStatus::Unprocessable { permanent: true } }, Ok(success) => { // Success let weight_used = meter.consumed().saturating_sub(prev_consumed); - Self::deposit_event(Event::::Processed { id, origin, weight_used, success }); + Self::deposit_event(Event::::Processed { + id: id.into(), + origin, + weight_used, + success, + }); MessageExecutionStatus::Processed }, } } } +/// Run a closure that errors on re-entrance. Meant to be used by anything that services queues. +pub(crate) fn with_service_mutex R, R>(f: F) -> Result { + // Holds the singelton token instance. + environmental::environmental!(token: Option<()>); + + token::using_once(&mut Some(()), || { + // The first `ok_or` should always be `Ok` since we are inside a `using_once`. + let hold = token::with(|t| t.take()).ok_or(()).defensive()?.ok_or(())?; + + // Put the token back when we're done. + defer! { + token::with(|t| { + *t = Some(hold); + }); + } + + Ok(f()) + }) +} + /// Provides a [`sp_core::Get`] to access the `MEL` of a [`codec::MaxEncodedLen`] type. pub struct MaxEncodedLenOf(sp_std::marker::PhantomData); impl Get for MaxEncodedLenOf { @@ -1407,35 +1528,40 @@ impl ServiceQueues for Pallet { Weight::zero() }); - let mut next = match Self::bump_service_head(&mut weight) { - Some(h) => h, - None => return weight.consumed(), - }; - // The last queue that did not make any progress. - // The loop aborts as soon as it arrives at this queue again without making any progress - // on other queues in between. - let mut last_no_progress = None; - - loop { - let (progressed, n) = Self::service_queue(next.clone(), &mut weight, max_weight); - next = match n { - Some(n) => - if !progressed { - if last_no_progress == Some(n.clone()) { - break - } - if last_no_progress.is_none() { - last_no_progress = Some(next.clone()) - } - n - } else { - last_no_progress = None; - n - }, - None => break, + match with_service_mutex(|| { + let mut next = match Self::bump_service_head(&mut weight) { + Some(h) => h, + None => return weight.consumed(), + }; + // The last queue that did not make any progress. + // The loop aborts as soon as it arrives at this queue again without making any progress + // on other queues in between. + let mut last_no_progress = None; + + loop { + let (progressed, n) = Self::service_queue(next.clone(), &mut weight, max_weight); + next = match n { + Some(n) => + if !progressed { + if last_no_progress == Some(n.clone()) { + break + } + if last_no_progress.is_none() { + last_no_progress = Some(next.clone()) + } + n + } else { + last_no_progress = None; + n + }, + None => break, + } } + weight.consumed() + }) { + Err(()) => weight.consumed(), + Ok(w) => w, } - weight.consumed() } /// Execute a single overweight message. @@ -1463,6 +1589,7 @@ impl ServiceQueues for Pallet { Error::::QueuePaused => ExecuteOverweightError::QueuePaused, Error::::NoPage | Error::::NoMessage | Error::::Queued => ExecuteOverweightError::NotFound, + Error::::RecursiveDisallowed => ExecuteOverweightError::RecursiveDisallowed, _ => ExecuteOverweightError::Other, }, ) diff --git a/substrate/frame/message-queue/src/mock.rs b/substrate/frame/message-queue/src/mock.rs index 97246900597ea..89c6e86251096 100644 --- a/substrate/frame/message-queue/src/mock.rs +++ b/substrate/frame/message-queue/src/mock.rs @@ -108,7 +108,10 @@ impl MockedWeightInfo { impl crate::weights::WeightInfo for MockedWeightInfo { fn reap_page() -> Weight { - WeightForCall::get().get("reap_page").copied().unwrap_or_default() + WeightForCall::get() + .get("reap_page") + .copied() + .unwrap_or(DefaultWeightForCall::get()) } fn execute_overweight_page_updated() -> Weight { WeightForCall::get() @@ -207,6 +210,10 @@ impl ProcessMessage for RecordingMessageProcessor { let required = Weight::from_parts(weight, weight); if meter.try_consume(required).is_ok() { + if let Some(p) = message.strip_prefix(&b"callback="[..]) { + let s = String::from_utf8(p.to_vec()).expect("Need valid UTF8"); + Callback::get()(&origin, s.parse().expect("Expected an u32")); + } let mut m = MessagesProcessed::get(); m.push((message.to_vec(), origin)); MessagesProcessed::set(m); @@ -217,6 +224,10 @@ impl ProcessMessage for RecordingMessageProcessor { } } +parameter_types! { + pub static Callback: Box = Box::new(|_, _| {}); +} + /// Processed a mocked message. Messages that end with `badformat`, `corrupt`, `unsupported` or /// `yield` will fail with an error respectively. fn processing_message(msg: &[u8], origin: &MessageOrigin) -> Result<(), ProcessMessageError> { @@ -264,6 +275,10 @@ impl ProcessMessage for CountingMessageProcessor { let required = Weight::from_parts(1, 1); if meter.try_consume(required).is_ok() { + if let Some(p) = message.strip_prefix(&b"callback="[..]) { + let s = String::from_utf8(p.to_vec()).expect("Need valid UTF8"); + Callback::get()(&origin, s.parse().expect("Expected an u32")); + } NumMessagesProcessed::set(NumMessagesProcessed::get() + 1); Ok(true) } else { @@ -372,3 +387,16 @@ pub fn num_overweight_enqueued_events() -> u32 { pub fn fp(pages: u32, count: u64, size: u64) -> QueueFootprint { QueueFootprint { storage: Footprint { count, size }, pages } } + +/// A random seed that can be overwritten with `MQ_SEED`. +pub fn gen_seed() -> u64 { + use rand::Rng; + let seed = if let Ok(seed) = std::env::var("MQ_SEED") { + seed.parse().expect("Need valid u64 as MQ_SEED env variable") + } else { + rand::thread_rng().gen::() + }; + + println!("Using seed: {}", seed); + seed +} diff --git a/substrate/frame/message-queue/src/mock_helpers.rs b/substrate/frame/message-queue/src/mock_helpers.rs index f6109c127be12..28395e27cdd2a 100644 --- a/substrate/frame/message-queue/src/mock_helpers.rs +++ b/substrate/frame/message-queue/src/mock_helpers.rs @@ -15,6 +15,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#![allow(missing_docs)] + //! Std setup helpers for testing and benchmarking. //! //! Cannot be put into mock.rs since benchmarks require no-std and mock.rs is std. @@ -88,10 +90,12 @@ pub fn page(msg: &[u8]) -> PageOf { PageOf::::from_message::(msg.try_into().unwrap()) } +/// Create a book with a single message of one byte. pub fn single_page_book() -> BookStateOf { BookState { begin: 0, end: 1, count: 1, message_count: 1, size: 1, ..Default::default() } } +/// Create an empty book. pub fn empty_book() -> BookStateOf { BookState { begin: 0, end: 1, count: 1, ..Default::default() } } diff --git a/substrate/frame/message-queue/src/tests.rs b/substrate/frame/message-queue/src/tests.rs index d94ad581ea0d5..9198e65e2f9c0 100644 --- a/substrate/frame/message-queue/src/tests.rs +++ b/substrate/frame/message-queue/src/tests.rs @@ -181,7 +181,7 @@ fn service_queues_failing_messages_works() { assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight()); assert_last_event::( Event::ProcessingFailed { - id: blake2_256(b"badformat"), + id: blake2_256(b"badformat").into(), origin: MessageOrigin::Here, error: ProcessMessageError::BadFormat, } @@ -190,7 +190,7 @@ fn service_queues_failing_messages_works() { assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight()); assert_last_event::( Event::ProcessingFailed { - id: blake2_256(b"corrupt"), + id: blake2_256(b"corrupt").into(), origin: MessageOrigin::Here, error: ProcessMessageError::Corrupt, } @@ -199,7 +199,7 @@ fn service_queues_failing_messages_works() { assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight()); assert_last_event::( Event::ProcessingFailed { - id: blake2_256(b"unsupported"), + id: blake2_256(b"unsupported").into(), origin: MessageOrigin::Here, error: ProcessMessageError::Unsupported, } @@ -1264,7 +1264,7 @@ fn permanently_overweight_limit_is_valid_basic() { RuntimeEvent::MessageQueue(Event::Processed { origin: Here, weight_used: 200.into(), - id: blake2_256(m.as_bytes()), + id: blake2_256(m.as_bytes()).into(), success: true, }) ); @@ -1321,7 +1321,7 @@ fn permanently_overweight_limit_is_valid_fuzzy() { RuntimeEvent::MessageQueue(Event::Processed { origin: Here, weight_used: 200.into(), - id: blake2_256(m.as_bytes()), + id: blake2_256(m.as_bytes()).into(), success: true, }) ); @@ -1592,7 +1592,7 @@ fn execute_overweight_respects_suspension() { assert_last_event::( Event::Processed { - id: blake2_256(b"weight=5"), + id: blake2_256(b"weight=5").into(), origin, weight_used: 5.into_weight(), success: true, @@ -1619,7 +1619,7 @@ fn service_queue_suspension_ready_ring_works() { MessageQueue::service_queues(Weight::MAX); assert_last_event::( Event::Processed { - id: blake2_256(b"weight=5"), + id: blake2_256(b"weight=5").into(), origin, weight_used: 5.into_weight(), success: true, @@ -1662,3 +1662,174 @@ fn integrity_test_checks_service_weight() { } }); } + +/// Test for . +#[test] +fn regression_issue_2319() { + build_and_execute::(|| { + Callback::set(Box::new(|_, _| { + MessageQueue::enqueue_message(mock_helpers::msg("anothermessage"), There); + })); + + use MessageOrigin::*; + MessageQueue::enqueue_message(msg("callback=0"), Here); + + // while servicing queue Here, "anothermessage" of origin There is enqueued in + // "firstmessage"'s process_message + assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight()); + assert_eq!(MessagesProcessed::take(), vec![(b"callback=0".to_vec(), Here)]); + + assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight()); + // It used to fail here but got fixed. + assert_eq!(MessagesProcessed::take(), vec![(b"anothermessage".to_vec(), There)]); + }); +} + +/// Enqueueing a message from within `service_queues` works. +#[test] +fn recursive_enqueue_works() { + build_and_execute::(|| { + Callback::set(Box::new(|o, i| match i { + 0 => { + MessageQueue::enqueue_message(msg(&format!("callback={}", 1)), *o); + }, + 1 => { + for _ in 0..100 { + MessageQueue::enqueue_message(msg(&format!("callback={}", 2)), *o); + } + for i in 0..100 { + MessageQueue::enqueue_message(msg(&format!("callback={}", 3)), i.into()); + } + }, + 2 | 3 => { + MessageQueue::enqueue_message(msg(&format!("callback={}", 4)), *o); + }, + 4 => (), + _ => unreachable!(), + })); + + MessageQueue::enqueue_message(msg("callback=0"), MessageOrigin::Here); + + for _ in 0..402 { + assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight()); + } + assert_eq!(MessageQueue::service_queues(Weight::MAX), Weight::zero()); + + assert_eq!(MessagesProcessed::take().len(), 402); + }); +} + +/// Calling `service_queues` from within `service_queues` is forbidden. +#[test] +fn recursive_service_is_forbidden() { + use MessageOrigin::*; + build_and_execute::(|| { + Callback::set(Box::new(|_, _| { + MessageQueue::enqueue_message(msg("m1"), There); + // This call will fail since it is recursive. But it will not mess up the state. + assert_storage_noop!(MessageQueue::service_queues(10.into_weight())); + MessageQueue::enqueue_message(msg("m2"), There); + })); + + for _ in 0..5 { + MessageQueue::enqueue_message(msg("callback=0"), Here); + MessageQueue::service_queues(3.into_weight()); + + // All three messages are correctly processed. + assert_eq!( + MessagesProcessed::take(), + vec![ + (b"callback=0".to_vec(), Here), + (b"m1".to_vec(), There), + (b"m2".to_vec(), There) + ] + ); + } + }); +} + +/// Calling `service_queues` from within `service_queues` is forbidden. +#[test] +fn recursive_overweight_while_service_is_forbidden() { + use MessageOrigin::*; + build_and_execute::(|| { + Callback::set(Box::new(|_, _| { + // Check that the message was permanently overweight. + assert_last_event::( + Event::OverweightEnqueued { + id: blake2_256(b"weight=10"), + origin: There, + message_index: 0, + page_index: 0, + } + .into(), + ); + // This call will fail since it is recursive. But it will not mess up the state. + assert_noop!( + ::execute_overweight( + 10.into_weight(), + (There, 0, 0) + ), + ExecuteOverweightError::RecursiveDisallowed + ); + })); + + MessageQueue::enqueue_message(msg("weight=10"), There); + MessageQueue::enqueue_message(msg("callback=0"), Here); + + // Mark it as permanently overweight. + MessageQueue::service_queues(5.into_weight()); + assert_ok!(::execute_overweight( + 10.into_weight(), + (There, 0, 0) + )); + }); +} + +/// Calling `reap_page` from within `service_queues` is forbidden. +#[test] +fn recursive_reap_page_is_forbidden() { + use MessageOrigin::*; + build_and_execute::(|| { + Callback::set(Box::new(|_, _| { + // This call will fail since it is recursive. But it will not mess up the state. + assert_noop!(MessageQueue::do_reap_page(&Here, 0), Error::::RecursiveDisallowed); + })); + + // Create 10 pages more than the stale limit. + let n = (MaxStale::get() + 10) as usize; + for _ in 0..n { + MessageQueue::enqueue_message(msg("weight=2"), Here); + } + + // Mark all pages as stale since their message is permanently overweight. + MessageQueue::service_queues(1.into_weight()); + assert_ok!(MessageQueue::do_reap_page(&Here, 0)); + + assert_last_event::(Event::PageReaped { origin: Here, index: 0 }.into()); + }); +} + +#[test] +fn with_service_mutex_works() { + let mut called = 0; + with_service_mutex(|| called = 1).unwrap(); + assert_eq!(called, 1); + + // The outer one is fine but the inner one errors. + with_service_mutex(|| with_service_mutex(|| unreachable!())) + .unwrap() + .unwrap_err(); + with_service_mutex(|| with_service_mutex(|| unreachable!()).unwrap_err()).unwrap(); + with_service_mutex(|| { + with_service_mutex(|| unreachable!()).unwrap_err(); + with_service_mutex(|| unreachable!()).unwrap_err(); + called = 2; + }) + .unwrap(); + assert_eq!(called, 2); + + // Still works. + with_service_mutex(|| called = 3).unwrap(); + assert_eq!(called, 3); +} diff --git a/substrate/frame/support/src/traits/messages.rs b/substrate/frame/support/src/traits/messages.rs index 58815b107c829..995ac4f717911 100644 --- a/substrate/frame/support/src/traits/messages.rs +++ b/substrate/frame/support/src/traits/messages.rs @@ -82,6 +82,8 @@ pub enum ExecuteOverweightError { QueuePaused, /// An unspecified error. Other, + /// Another call is currently ongoing and prevents this call from executing. + RecursiveDisallowed, } /// Can service queues and execute overweight messages.