Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[bridges] Prune messages from confirmation tx body, not from the on_idle #4944

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 60 additions & 35 deletions bridges/modules/messages/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ use bp_runtime::{
};
use codec::{Decode, Encode, MaxEncodedLen};
use frame_support::{dispatch::PostDispatchInfo, ensure, fail, traits::Get, DefaultNoBound};
use sp_runtime::traits::UniqueSaturatedFrom;
use sp_std::{marker::PhantomData, prelude::*};

mod inbound_lane;
Expand Down Expand Up @@ -153,40 +152,6 @@ pub mod pallet {
type OperatingModeStorage = PalletOperatingMode<T, I>;
}

#[pallet::hooks]
impl<T: Config<I>, I: 'static> Hooks<BlockNumberFor<T>> for Pallet<T, I>
where
u32: TryFrom<BlockNumberFor<T>>,
{
fn on_idle(_block: BlockNumberFor<T>, remaining_weight: Weight) -> Weight {
// we'll need at least to read outbound lane state, kill a message and update lane state
let db_weight = T::DbWeight::get();
if !remaining_weight.all_gte(db_weight.reads_writes(1, 2)) {
return Weight::zero()
}

// messages from lane with index `i` in `ActiveOutboundLanes` are pruned when
// `System::block_number() % lanes.len() == i`. Otherwise we need to read lane states on
// every block, wasting the whole `remaining_weight` for nothing and causing starvation
// of the last lane pruning
let active_lanes = T::ActiveOutboundLanes::get();
let active_lanes_len = (active_lanes.len() as u32).into();
let active_lane_index = u32::unique_saturated_from(
frame_system::Pallet::<T>::block_number() % active_lanes_len,
);
let active_lane_id = active_lanes[active_lane_index as usize];

// first db read - outbound lane state
let mut active_lane = outbound_lane::<T, I>(active_lane_id);
let mut used_weight = db_weight.reads(1);
// and here we'll have writes
used_weight += active_lane.prune_messages(db_weight, remaining_weight - used_weight);

// we already checked we have enough `remaining_weight` to cover this `used_weight`
used_weight
}
}

#[pallet::call]
impl<T: Config<I>, I: 'static> Pallet<T, I> {
/// Change `PalletOwner`.
Expand Down Expand Up @@ -610,6 +575,14 @@ pub mod pallet {
}
}

#[pallet::hooks]
impl<T: Config<I>, I: 'static> Hooks<BlockNumberFor<T>> for Pallet<T, I> {
#[cfg(feature = "try-runtime")]
fn try_state(_n: BlockNumberFor<T>) -> Result<(), sp_runtime::TryRuntimeError> {
Self::do_try_state()
}
}

impl<T: Config<I>, I: 'static> Pallet<T, I> {
/// Get stored data of the outbound message with given nonce.
pub fn outbound_message_data(lane: LaneId, nonce: MessageNonce) -> Option<MessagePayload> {
Expand Down Expand Up @@ -644,6 +617,58 @@ pub mod pallet {
}
}

#[cfg(any(feature = "try-runtime", test))]
impl<T: Config<I>, I: 'static> Pallet<T, I> {
/// Ensure the correctness of the state of this pallet.
pub fn do_try_state() -> Result<(), sp_runtime::TryRuntimeError> {
Self::do_try_state_for_outbound_lanes()
}

/// Ensure the correctness of the state of outbound lanes.
pub fn do_try_state_for_outbound_lanes() -> Result<(), sp_runtime::TryRuntimeError> {
use sp_runtime::traits::One;
use sp_std::vec::Vec;

// collect unpruned lanes
let mut unpruned_lanes = Vec::new();
for (lane_id, lane_data) in OutboundLanes::<T, I>::iter() {
let Some(expected_last_prunned_nonce) =
lane_data.oldest_unpruned_nonce.checked_sub(One::one())
else {
continue;
};

// collect message_nonces that were supposed to be pruned
let mut unpruned_message_nonces = Vec::new();
const MAX_MESSAGES_ITERATION: u64 = 16;
let start_nonce =
expected_last_prunned_nonce.checked_sub(MAX_MESSAGES_ITERATION).unwrap_or(0);
for current_nonce in start_nonce..=expected_last_prunned_nonce {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not familiar with try_state(). Why do we use MAX_MESSAGES_ITERATION and not iterate all the way ?

Otherwise replacing the loop wit ha for looks good.

Copy link
Contributor Author

@bkontur bkontur Jul 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not familiar with try_state(). Why do we use MAX_MESSAGES_ITERATION and not iterate all the way ?

Because, on every runtime upgrade it would be more and more waste of time, e.g.

  • first time it would run from e.g. message with nonce 1000 to 0
  • another runtime upgrade (when more messages happen) it would run e.g. 3000 to 0 (we previously checked interval 1000 to 0)
  • another runtime upgrade (when more messages happen) it would run e.g. 6000 to 0 (we previously checked interval 3000 to 0)
  • and so on

Yes, that's why I updated description that maybe oldest_unpruned_nonce and do_try_state_for_outbound_lanes are not really needed.

Slava wanted to create migration, that remove all messages < oldest_unpruned_nonce, but I think his concert was valid only if we have already more than one lane, because on_idle he picked different/random lanes to prune depending on ActiveOutboundLanes::count, which is for fellows 1 :). I think MBM migration is too much for this and also for standard migration, we would need to handle consumed weights and store last pruned block somewhere else, because oldest_unpruned_nonce is incremented on delivery now.

But as I am thinking about it again now, I think we should really remove attribute lane.oldest_unpruned_nonce, because we prune on delivery and this attribute does not make sense. So, the removal of this attribute could cause storage version change and migration, which would clean everything :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually, MBM migration and removed oldest_unpruned_nonce, would do the job :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't use oldest_unpruned_nonce for relayer, anyway we can leave also without these migration, the worst can happen that we will have just a few messages in the storage, which we don't need. We add monitoring or do some sanity checks with seaching storage manually after upgrade, and when few messages left, we could remove them with set_prefix / kill_storage.

// check a message for current_nonce
if OutboundMessages::<T, I>::contains_key(MessageKey {
lane_id,
nonce: current_nonce,
}) {
unpruned_message_nonces.push(current_nonce);
}
}

if !unpruned_message_nonces.is_empty() {
log::warn!(
target: LOG_TARGET,
"do_try_state_for_outbound_lanes for lane_id: {lane_id:?} with lane_data: {lane_data:?} found unpruned_message_nonces: {unpruned_message_nonces:?}",
);
unpruned_lanes.push((lane_id, lane_data, unpruned_message_nonces));
}
}

// ensure messages before `oldest_unpruned_nonce` are really pruned.
ensure!(unpruned_lanes.is_empty(), "Found unpruned lanes!");

Ok(())
}
}

/// Get-parameter that returns number of active outbound lanes that the pallet maintains.
pub struct MaybeOutboundLanesCount<T, I>(PhantomData<(T, I)>);

Expand Down
106 changes: 13 additions & 93 deletions bridges/modules/messages/src/outbound_lane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,9 @@ use bp_messages::{
ChainWithMessages, DeliveredMessages, LaneId, MessageNonce, OutboundLaneData, UnrewardedRelayer,
};
use codec::{Decode, Encode};
use frame_support::{
traits::Get,
weights::{RuntimeDbWeight, Weight},
BoundedVec, PalletError,
};
use frame_support::{traits::Get, BoundedVec, PalletError};
use scale_info::TypeInfo;
use sp_runtime::{traits::Zero, RuntimeDebug};
use sp_runtime::RuntimeDebug;
use sp_std::{collections::vec_deque::VecDeque, marker::PhantomData};

/// Outbound lane storage.
Expand Down Expand Up @@ -143,41 +139,17 @@ impl<S: OutboundLaneStorage> OutboundLane<S> {

ensure_unrewarded_relayers_are_correct(confirmed_messages.end, relayers)?;

// prune all confirmed messages
for nonce in confirmed_messages.begin..=confirmed_messages.end {
self.storage.remove_message(&nonce);
}

data.latest_received_nonce = confirmed_messages.end;
data.oldest_unpruned_nonce = data.latest_received_nonce.saturating_add(1);
self.storage.set_data(data);

Ok(Some(confirmed_messages))
}

/// Prune at most `max_messages_to_prune` already received messages.
///
/// Returns weight, consumed by messages pruning and lane state update.
pub fn prune_messages(
&mut self,
db_weight: RuntimeDbWeight,
mut remaining_weight: Weight,
) -> Weight {
let write_weight = db_weight.writes(1);
let two_writes_weight = write_weight + write_weight;
let mut spent_weight = Weight::zero();
let mut data = self.storage.data();
while remaining_weight.all_gte(two_writes_weight) &&
data.oldest_unpruned_nonce <= data.latest_received_nonce
{
self.storage.remove_message(&data.oldest_unpruned_nonce);

spent_weight += write_weight;
remaining_weight -= write_weight;
data.oldest_unpruned_nonce += 1;
}

if !spent_weight.is_zero() {
spent_weight += write_weight;
self.storage.set_data(data);
}

spent_weight
}
}

/// Verifies unrewarded relayers vec.
Expand Down Expand Up @@ -221,7 +193,6 @@ mod tests {
REGULAR_PAYLOAD, TEST_LANE_ID,
},
};
use frame_support::weights::constants::RocksDbWeight;
use sp_std::ops::RangeInclusive;

fn unrewarded_relayers(
Expand Down Expand Up @@ -281,7 +252,7 @@ mod tests {
);
assert_eq!(lane.storage.data().latest_generated_nonce, 3);
assert_eq!(lane.storage.data().latest_received_nonce, 3);
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 1);
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 4);
});
}

Expand All @@ -302,15 +273,15 @@ mod tests {
);
assert_eq!(lane.storage.data().latest_generated_nonce, 3);
assert_eq!(lane.storage.data().latest_received_nonce, 2);
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 1);
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 3);

assert_eq!(
lane.confirm_delivery(3, 3, &unrewarded_relayers(3..=3)),
Ok(Some(delivered_messages(3..=3))),
);
assert_eq!(lane.storage.data().latest_generated_nonce, 3);
assert_eq!(lane.storage.data().latest_received_nonce, 3);
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 1);
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 4);
});
}

Expand All @@ -331,12 +302,12 @@ mod tests {
assert_eq!(lane.confirm_delivery(3, 3, &unrewarded_relayers(1..=3)), Ok(None),);
assert_eq!(lane.storage.data().latest_generated_nonce, 3);
assert_eq!(lane.storage.data().latest_received_nonce, 3);
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 1);
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 4);

assert_eq!(lane.confirm_delivery(1, 2, &unrewarded_relayers(1..=1)), Ok(None),);
assert_eq!(lane.storage.data().latest_generated_nonce, 3);
assert_eq!(lane.storage.data().latest_received_nonce, 3);
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 1);
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 4);
});
}

Expand Down Expand Up @@ -394,57 +365,6 @@ mod tests {
);
}

#[test]
fn prune_messages_works() {
run_test(|| {
let mut lane = outbound_lane::<TestRuntime, _>(TEST_LANE_ID);
// when lane is empty, nothing is pruned
assert_eq!(
lane.prune_messages(RocksDbWeight::get(), RocksDbWeight::get().writes(101)),
Weight::zero()
);
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 1);
// when nothing is confirmed, nothing is pruned
lane.send_message(outbound_message_data(REGULAR_PAYLOAD));
lane.send_message(outbound_message_data(REGULAR_PAYLOAD));
lane.send_message(outbound_message_data(REGULAR_PAYLOAD));
assert!(lane.storage.message(&1).is_some());
assert!(lane.storage.message(&2).is_some());
assert!(lane.storage.message(&3).is_some());
assert_eq!(
lane.prune_messages(RocksDbWeight::get(), RocksDbWeight::get().writes(101)),
Weight::zero()
);
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 1);
// after confirmation, some messages are received
assert_eq!(
lane.confirm_delivery(2, 2, &unrewarded_relayers(1..=2)),
Ok(Some(delivered_messages(1..=2))),
);
assert_eq!(
lane.prune_messages(RocksDbWeight::get(), RocksDbWeight::get().writes(101)),
RocksDbWeight::get().writes(3),
);
assert!(lane.storage.message(&1).is_none());
assert!(lane.storage.message(&2).is_none());
assert!(lane.storage.message(&3).is_some());
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 3);
// after last message is confirmed, everything is pruned
assert_eq!(
lane.confirm_delivery(1, 3, &unrewarded_relayers(3..=3)),
Ok(Some(delivered_messages(3..=3))),
);
assert_eq!(
lane.prune_messages(RocksDbWeight::get(), RocksDbWeight::get().writes(101)),
RocksDbWeight::get().writes(2),
);
assert!(lane.storage.message(&1).is_none());
assert!(lane.storage.message(&2).is_none());
assert!(lane.storage.message(&3).is_none());
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 4);
});
}

#[test]
fn confirm_delivery_detects_when_more_than_expected_messages_are_confirmed() {
run_test(|| {
Expand Down
Loading
Loading