Skip to content

Commit

Permalink
[Runtime] Bound XCMP queue (#3952)
Browse files Browse the repository at this point in the history
Re-applying #2302 after increasing the `MaxPageSize`.  

Remove `without_storage_info` from the XCMP queue pallet. Part of
#323

Changes:
- Limit the number of messages and signals a HRMP channel can have at
most.
- Limit the number of HRML channels.

A No-OP migration is put in place to ensure that all `BoundedVec`s still
decode and not truncate after upgrade. The storage version is thereby
bumped to 5 to have our tooling remind us to deploy that migration.

## Integration

If you see this error in your try-runtime-cli:  
```pre
Max message size for channel is too large. This means that the V5 migration can be front-run and an
attacker could place a large message just right before the migration to make other messages un-decodable.
Please either increase `MaxPageSize` or decrease the `max_message_size` for this channel. Channel max:
102400, MaxPageSize: 65535
```

Then increase the `MaxPageSize` of the `cumulus_pallet_xcmp_queue` to
something like this:
```rust
type MaxPageSize = ConstU32<{ 103 * 1024 }>;
```

There is currently no easy way for on-chain governance to adjust the
HRMP max message size of all channels, but it could be done:
#3145.

---------

Signed-off-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>
Co-authored-by: Francisco Aguirre <franciscoaguirreperez@gmail.com>
  • Loading branch information
ggwpez and franciscoaguirre authored May 16, 2024
1 parent 6487ac1 commit 4adfa37
Show file tree
Hide file tree
Showing 26 changed files with 407 additions and 67 deletions.
9 changes: 8 additions & 1 deletion cumulus/pallets/parachain-system/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
use codec::{Decode, Encode};
use cumulus_primitives_core::{
relay_chain, AbridgedHostConfiguration, ChannelInfo, ChannelStatus, CollationInfo,
GetChannelInfo, InboundDownwardMessage, InboundHrmpMessage, MessageSendError,
GetChannelInfo, InboundDownwardMessage, InboundHrmpMessage, ListChannelInfos, MessageSendError,
OutboundHrmpMessage, ParaId, PersistedValidationData, UpwardMessage, UpwardMessageSender,
XcmpMessageHandler, XcmpMessageSource,
};
Expand Down Expand Up @@ -1022,6 +1022,13 @@ impl<T: Config> FeeTracker for Pallet<T> {
}
}

impl<T: Config> ListChannelInfos for Pallet<T> {
fn outgoing_channels() -> Vec<ParaId> {
let Some(state) = RelevantMessagingState::<T>::get() else { return Vec::new() };
state.egress_channels.into_iter().map(|(id, _)| id).collect()
}
}

impl<T: Config> GetChannelInfo for Pallet<T> {
fn get_channel_status(id: ParaId) -> ChannelStatus {
// Note, that we are using `relevant_messaging_state` which may be from the previous
Expand Down
2 changes: 1 addition & 1 deletion cumulus/pallets/parachain-system/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ impl pallet_message_queue::Config for Test {
type Size = u32;
type QueueChangeHandler = ();
type QueuePausedQuery = ();
type HeapSize = sp_core::ConstU32<{ 64 * 1024 }>;
type HeapSize = sp_core::ConstU32<{ 103 * 1024 }>;
type MaxStale = sp_core::ConstU32<8>;
type ServiceWeight = MaxWeight;
type IdleMaxServiceWeight = ();
Expand Down
120 changes: 88 additions & 32 deletions cumulus/pallets/xcmp-queue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,15 @@ pub mod weights;
pub use weights::WeightInfo;

use bounded_collections::BoundedBTreeSet;
use codec::{Decode, DecodeLimit, Encode};
use codec::{Decode, DecodeLimit, Encode, MaxEncodedLen};
use cumulus_primitives_core::{
relay_chain::BlockNumber as RelayBlockNumber, ChannelStatus, GetChannelInfo, MessageSendError,
ParaId, XcmpMessageFormat, XcmpMessageHandler, XcmpMessageSource,
};

use frame_support::{
defensive, defensive_assert,
traits::{EnqueueMessage, EnsureOrigin, Get, QueueFootprint, QueuePausedQuery},
traits::{Defensive, EnqueueMessage, EnsureOrigin, Get, QueueFootprint, QueuePausedQuery},
weights::{Weight, WeightMeter},
BoundedVec,
};
Expand All @@ -68,7 +68,7 @@ use polkadot_runtime_common::xcm_sender::PriceForMessageDelivery;
use polkadot_runtime_parachains::FeeTracker;
use scale_info::TypeInfo;
use sp_core::MAX_POSSIBLE_ALLOCATION;
use sp_runtime::{FixedU128, RuntimeDebug, Saturating};
use sp_runtime::{FixedU128, RuntimeDebug, Saturating, WeakBoundedVec};
use sp_std::prelude::*;
use xcm::{latest::prelude::*, VersionedLocation, VersionedXcm, WrapVersion, MAX_XCM_DECODE_DEPTH};
use xcm_builder::InspectMessageQueues;
Expand Down Expand Up @@ -106,7 +106,6 @@ pub mod pallet {

#[pallet::pallet]
#[pallet::storage_version(migration::STORAGE_VERSION)]
#[pallet::without_storage_info]
pub struct Pallet<T>(_);

#[pallet::config]
Expand All @@ -133,6 +132,25 @@ pub mod pallet {
#[pallet::constant]
type MaxInboundSuspended: Get<u32>;

/// Maximal number of outbound XCMP channels that can have messages queued at the same time.
///
/// If this is reached, then no further messages can be sent to channels that do not yet
/// have a message queued. This should be set to the expected maximum of outbound channels
/// which is determined by [`Self::ChannelInfo`]. It is important to set this large enough,
/// since otherwise the congestion control protocol will not work as intended and messages
/// may be dropped. This value increases the PoV and should therefore not be picked too
/// high. Governance needs to pay attention to not open more channels than this value.
#[pallet::constant]
type MaxActiveOutboundChannels: Get<u32>;

/// The maximal page size for HRMP message pages.
///
/// A lower limit can be set dynamically, but this is the hard-limit for the PoV worst case
/// benchmarking. The limit for the size of a message is slightly below this, since some
/// overhead is incurred for encoding the format.
#[pallet::constant]
type MaxPageSize: Get<u32>;

/// The origin that is allowed to resume or suspend the XCMP queue.
type ControllerOrigin: EnsureOrigin<Self::RuntimeOrigin>;

Expand Down Expand Up @@ -277,6 +295,10 @@ pub mod pallet {
AlreadySuspended,
/// The execution is already resumed.
AlreadyResumed,
/// There are too many active outbound channels.
TooManyActiveOutboundChannels,
/// The message is too big.
TooBig,
}

/// The suspended inbound XCMP channels. All others are not suspended.
Expand All @@ -298,19 +320,28 @@ pub mod pallet {
/// case of the need to send a high-priority signal message this block.
/// The bool is true if there is a signal message waiting to be sent.
#[pallet::storage]
pub(super) type OutboundXcmpStatus<T: Config> =
StorageValue<_, Vec<OutboundChannelDetails>, ValueQuery>;
pub(super) type OutboundXcmpStatus<T: Config> = StorageValue<
_,
BoundedVec<OutboundChannelDetails, T::MaxActiveOutboundChannels>,
ValueQuery,
>;

// The new way of doing it:
/// The messages outbound in a given XCMP channel.
#[pallet::storage]
pub(super) type OutboundXcmpMessages<T: Config> =
StorageDoubleMap<_, Blake2_128Concat, ParaId, Twox64Concat, u16, Vec<u8>, ValueQuery>;
pub(super) type OutboundXcmpMessages<T: Config> = StorageDoubleMap<
_,
Blake2_128Concat,
ParaId,
Twox64Concat,
u16,
WeakBoundedVec<u8, T::MaxPageSize>,
ValueQuery,
>;

/// Any signal messages waiting to be sent.
#[pallet::storage]
pub(super) type SignalMessages<T: Config> =
StorageMap<_, Blake2_128Concat, ParaId, Vec<u8>, ValueQuery>;
StorageMap<_, Blake2_128Concat, ParaId, WeakBoundedVec<u8, T::MaxPageSize>, ValueQuery>;

/// The configuration which controls the dynamics of the outbound queue.
#[pallet::storage]
Expand All @@ -332,15 +363,14 @@ pub mod pallet {
StorageMap<_, Twox64Concat, ParaId, FixedU128, ValueQuery, InitialFactor>;
}

#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo)]
#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo, MaxEncodedLen)]
pub enum OutboundState {
Ok,
Suspended,
}

/// Struct containing detailed information about the outbound channel.
#[derive(Clone, Eq, PartialEq, Encode, Decode, TypeInfo)]
#[cfg_attr(feature = "std", derive(Debug))]
#[derive(Clone, Eq, PartialEq, Encode, Decode, TypeInfo, RuntimeDebug, MaxEncodedLen)]
pub struct OutboundChannelDetails {
/// The `ParaId` of the parachain that this channel is connected with.
recipient: ParaId,
Expand Down Expand Up @@ -376,7 +406,7 @@ impl OutboundChannelDetails {
}
}

#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo)]
#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo, MaxEncodedLen)]
pub struct QueueConfigData {
/// The number of pages which must be in the queue for the other side to be told to suspend
/// their sending.
Expand Down Expand Up @@ -479,7 +509,10 @@ impl<T: Config> Pallet<T> {
{
details
} else {
all_channels.push(OutboundChannelDetails::new(recipient));
all_channels.try_push(OutboundChannelDetails::new(recipient)).map_err(|e| {
log::error!("Failed to activate HRMP channel: {:?}", e);
MessageSendError::TooManyChannels
})?;
all_channels
.last_mut()
.expect("can't be empty; a new element was just pushed; qed")
Expand All @@ -504,7 +537,9 @@ impl<T: Config> Pallet<T> {
if page.len() + encoded_fragment.len() > max_message_size {
return None
}
page.extend_from_slice(&encoded_fragment[..]);
for frag in encoded_fragment.iter() {
page.try_push(*frag).ok()?;
}
Some(page.len())
},
)
Expand All @@ -522,7 +557,10 @@ impl<T: Config> Pallet<T> {
new_page.extend_from_slice(&encoded_fragment[..]);
let last_page_size = new_page.len();
let number_of_pages = (channel_details.last_index - channel_details.first_index) as u32;
<OutboundXcmpMessages<T>>::insert(recipient, page_index, new_page);
let bounded_page = BoundedVec::<u8, T::MaxPageSize>::try_from(new_page)
.map_err(|_| MessageSendError::TooBig)?;
let bounded_page = WeakBoundedVec::force_from(bounded_page.into_inner(), None);
<OutboundXcmpMessages<T>>::insert(recipient, page_index, bounded_page);
<OutboundXcmpStatus<T>>::put(all_channels);
(number_of_pages, last_page_size)
};
Expand All @@ -544,17 +582,24 @@ impl<T: Config> Pallet<T> {

/// Sends a signal to the `dest` chain over XCMP. This is guaranteed to be dispatched on this
/// block.
fn send_signal(dest: ParaId, signal: ChannelSignal) {
fn send_signal(dest: ParaId, signal: ChannelSignal) -> Result<(), Error<T>> {
let mut s = <OutboundXcmpStatus<T>>::get();
if let Some(details) = s.iter_mut().find(|item| item.recipient == dest) {
details.signals_exist = true;
} else {
s.push(OutboundChannelDetails::new(dest).with_signals());
s.try_push(OutboundChannelDetails::new(dest).with_signals())
.map_err(|_| Error::<T>::TooManyActiveOutboundChannels)?;
}
<SignalMessages<T>>::mutate(dest, |page| {
*page = (XcmpMessageFormat::Signals, signal).encode();
});

let page = BoundedVec::<u8, T::MaxPageSize>::try_from(
(XcmpMessageFormat::Signals, signal).encode(),
)
.map_err(|_| Error::<T>::TooBig)?;
let page = WeakBoundedVec::force_from(page.into_inner(), None);

<SignalMessages<T>>::insert(dest, page);
<OutboundXcmpStatus<T>>::put(s);
Ok(())
}

fn suspend_channel(target: ParaId) {
Expand All @@ -564,7 +609,9 @@ impl<T: Config> Pallet<T> {
defensive_assert!(ok, "WARNING: Attempt to suspend channel that was not Ok.");
details.state = OutboundState::Suspended;
} else {
s.push(OutboundChannelDetails::new(target).with_suspended_state());
if s.try_push(OutboundChannelDetails::new(target).with_suspended_state()).is_err() {
defensive!("Cannot pause channel; too many outbound channels");
}
}
});
}
Expand Down Expand Up @@ -665,18 +712,25 @@ impl<T: Config> OnQueueChanged<ParaId> for Pallet<T> {
let suspended = suspended_channels.contains(&para);

if suspended && fp.ready_pages <= resume_threshold {
Self::send_signal(para, ChannelSignal::Resume);

suspended_channels.remove(&para);
<InboundXcmpSuspended<T>>::put(suspended_channels);
if let Err(err) = Self::send_signal(para, ChannelSignal::Resume) {
log::error!("defensive: Could not send resumption signal to inbound channel of sibling {:?}: {:?}; channel remains suspended.", para, err);
} else {
suspended_channels.remove(&para);
<InboundXcmpSuspended<T>>::put(suspended_channels);
}
} else if !suspended && fp.ready_pages >= suspend_threshold {
log::warn!("XCMP queue for sibling {:?} is full; suspending channel.", para);
Self::send_signal(para, ChannelSignal::Suspend);

if let Err(err) = suspended_channels.try_insert(para) {
if let Err(err) = Self::send_signal(para, ChannelSignal::Suspend) {
// It will retry if `drop_threshold` is not reached, but it could be too late.
log::error!(
"defensive: Could not send suspension signal; future messages may be dropped: {:?}", err
);
} else if let Err(err) = suspended_channels.try_insert(para) {
log::error!("Too many channels suspended; cannot suspend sibling {:?}: {:?}; further messages may be dropped.", para, err);
} else {
<InboundXcmpSuspended<T>>::put(suspended_channels);
}
<InboundXcmpSuspended<T>>::put(suspended_channels);
}
}
}
Expand Down Expand Up @@ -843,7 +897,7 @@ impl<T: Config> XcmpMessageSource for Pallet<T> {
// since it's so unlikely then for now we just drop it.
defensive!("WARNING: oversize message in queue - dropping");
} else {
result.push((para_id, page));
result.push((para_id, page.into_inner()));
}

let max_total_size = match T::ChannelInfo::get_channel_info(para_id) {
Expand Down Expand Up @@ -891,7 +945,9 @@ impl<T: Config> XcmpMessageSource for Pallet<T> {
let pruned = old_statuses_len - statuses.len();
// removing an item from status implies a message being sent, so the result messages must
// be no less than the pruned channels.
statuses.rotate_left(result.len().saturating_sub(pruned));
let _ = statuses.try_rotate_left(result.len().saturating_sub(pruned)).defensive_proof(
"Could not store HRMP channels config. Some HRMP channels may be broken.",
);

<OutboundXcmpStatus<T>>::put(statuses);

Expand Down
4 changes: 3 additions & 1 deletion cumulus/pallets/xcmp-queue/src/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

//! A module that is responsible for migration of storage.

pub mod v5;

use crate::{Config, OverweightIndex, Pallet, QueueConfig, QueueConfigData, DEFAULT_POV_SIZE};
use cumulus_primitives_core::XcmpMessageFormat;
use frame_support::{
Expand All @@ -25,7 +27,7 @@ use frame_support::{
};

/// The in-code storage version.
pub const STORAGE_VERSION: StorageVersion = StorageVersion::new(4);
pub const STORAGE_VERSION: StorageVersion = StorageVersion::new(5);

pub const LOG: &str = "runtime::xcmp-queue-migration";

Expand Down
Loading

0 comments on commit 4adfa37

Please sign in to comment.