Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FRAME] Use 'ready' pages in XCMP suspend logic #2393

Merged
merged 11 commits into from
Mar 5, 2024
6 changes: 3 additions & 3 deletions cumulus/pallets/xcmp-queue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,7 @@ impl<T: Config> Pallet<T> {
let QueueConfigData { drop_threshold, .. } = <QueueConfig<T>>::get();
let fp = T::XcmpQueue::footprint(sender);
// Assume that it will not fit into the current page:
let new_pages = fp.pages.saturating_add(1);
let new_pages = fp.ready_pages.saturating_add(1);
if new_pages > drop_threshold {
// This should not happen since the channel should have been suspended in
// [`on_queue_changed`].
Expand Down Expand Up @@ -663,12 +663,12 @@ impl<T: Config> OnQueueChanged<ParaId> for Pallet<T> {
let mut suspended_channels = <InboundXcmpSuspended<T>>::get();
let suspended = suspended_channels.contains(&para);

if suspended && fp.pages <= resume_threshold {
if suspended && fp.ready_pages <= resume_threshold {
Self::send_signal(para, ChannelSignal::Resume);

suspended_channels.remove(&para);
<InboundXcmpSuspended<T>>::put(suspended_channels);
} else if !suspended && fp.pages >= suspend_threshold {
} else if !suspended && fp.ready_pages >= suspend_threshold {
log::warn!("XCMP queue for sibling {:?} is full; suspending channel.", para);
Self::send_signal(para, ChannelSignal::Suspend);

Expand Down
1 change: 1 addition & 0 deletions cumulus/pallets/xcmp-queue/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ impl<T: OnQueueChanged<ParaId>> EnqueueMessage<ParaId> for EnqueueToLocalStorage
}
}
footprint.pages = footprint.storage.size as u32 / 16; // Number does not matter
footprint.ready_pages = footprint.pages;
footprint
}
}
Expand Down
6 changes: 5 additions & 1 deletion cumulus/pallets/xcmp-queue/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,11 @@ fn xcm_enqueueing_starts_dropping_on_overflow() {
// The drop threshold for pages is 48, the others numbers dont really matter:
assert_eq!(
<Test as Config>::XcmpQueue::footprint(1000.into()),
QueueFootprint { storage: Footprint { count: 256, size: 768 }, pages: 48 }
QueueFootprint {
storage: Footprint { count: 256, size: 768 },
pages: 48,
ready_pages: 48
}
);
})
}
Expand Down
5 changes: 5 additions & 0 deletions substrate/frame/message-queue/src/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,11 @@ fn process_some_messages(num_msgs: u32) {
ServiceWeight::set(Some(weight));
let consumed = next_block();

for origin in BookStateFor::<Test>::iter_keys() {
let fp = MessageQueue::footprint(origin);
assert_eq!(fp.pages, fp.ready_pages);
}

assert_eq!(consumed, weight, "\n{}", MessageQueue::debug_info());
assert_eq!(NumMessagesProcessed::take(), num_msgs as usize);
}
Expand Down
13 changes: 11 additions & 2 deletions substrate/frame/message-queue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,9 @@ use frame_support::{
defensive,
pallet_prelude::*,
traits::{
DefensiveTruncateFrom, EnqueueMessage, ExecuteOverweightError, Footprint, ProcessMessage,
ProcessMessageError, QueueFootprint, QueuePausedQuery, ServiceQueues,
DefensiveSaturating, DefensiveTruncateFrom, EnqueueMessage, ExecuteOverweightError,
Footprint, ProcessMessage, ProcessMessageError, QueueFootprint, QueuePausedQuery,
ServiceQueues,
},
BoundedSlice, CloneNoBound, DefaultNoBound,
};
Expand Down Expand Up @@ -427,6 +428,7 @@ impl<MessageOrigin> From<BookState<MessageOrigin>> for QueueFootprint {
fn from(book: BookState<MessageOrigin>) -> Self {
QueueFootprint {
pages: book.count,
ready_pages: book.end.defensive_saturating_sub(book.begin),
storage: Footprint { count: book.message_count, size: book.size },
}
}
Expand Down Expand Up @@ -1181,6 +1183,13 @@ impl<T: Config> Pallet<T> {
"Memory Corruption in Pages"
);

// Basic checks for each book
for book in BookStateFor::<T>::iter_values() {
let fp: QueueFootprint = book.into();

ensure!(fp.ready_pages <= fp.pages, "There can be no more ready than total pages");
}

// No state to check
if ServiceHead::<T>::get().is_none() {
return Ok(())
Expand Down
4 changes: 2 additions & 2 deletions substrate/frame/message-queue/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,6 @@ pub fn num_overweight_enqueued_events() -> u32 {
.count() as u32
}

pub fn fp(pages: u32, count: u64, size: u64) -> QueueFootprint {
QueueFootprint { storage: Footprint { count, size }, pages }
pub fn fp(pages: u32, ready_pages: u32, count: u64, size: u64) -> QueueFootprint {
QueueFootprint { storage: Footprint { count, size }, pages, ready_pages }
}
13 changes: 9 additions & 4 deletions substrate/frame/message-queue/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1064,28 +1064,33 @@ fn footprint_num_pages_works() {
MessageQueue::enqueue_message(msg("weight=2"), Here);
MessageQueue::enqueue_message(msg("weight=3"), Here);

assert_eq!(MessageQueue::footprint(Here), fp(2, 2, 16));
assert_eq!(MessageQueue::footprint(Here), fp(2, 2, 2, 16));

// Mark the messages as overweight.
assert_eq!(MessageQueue::service_queues(1.into_weight()), 0.into_weight());
assert_eq!(System::events().len(), 2);
// Overweight does not change the footprint.
assert_eq!(MessageQueue::footprint(Here), fp(2, 2, 16));
// `ready_pages` decreases but `page` count does not.
assert_eq!(MessageQueue::footprint(Here), fp(2, 0, 2, 16));

// Now execute the second message.
assert_eq!(
<MessageQueue as ServiceQueues>::execute_overweight(3.into_weight(), (Here, 1, 0))
.unwrap(),
3.into_weight()
);
assert_eq!(MessageQueue::footprint(Here), fp(1, 1, 8));
assert_eq!(MessageQueue::footprint(Here), fp(1, 0, 1, 8));
// And the first one:
assert_eq!(
<MessageQueue as ServiceQueues>::execute_overweight(2.into_weight(), (Here, 0, 0))
.unwrap(),
2.into_weight()
);
assert_eq!(MessageQueue::footprint(Here), Default::default());
assert_eq!(MessageQueue::footprint(Here), fp(0, 0, 0, 0));

// `ready_pages` and normal `pages` increases again:
MessageQueue::enqueue_message(msg("weight=3"), Here);
assert_eq!(MessageQueue::footprint(Here), fp(1, 1, 1, 8));
})
}

Expand Down
2 changes: 2 additions & 0 deletions substrate/frame/support/src/traits/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ impl<OverweightAddr> ServiceQueues for NoopServiceQueues<OverweightAddr> {
pub struct QueueFootprint {
/// The number of pages in the queue (including overweight pages).
pub pages: u32,
/// The number of pages that are ready (not yet processed and also not overweight).
pub ready_pages: u32,
/// The storage footprint of the queue (including overweight messages).
pub storage: Footprint,
}
Expand Down
Loading