Skip to content

Commit

Permalink
Use plain source_queue view when selecting nonces for delivery (parit…
Browse files Browse the repository at this point in the history
…ytech#1010)

* use plain source_queue view when selecting nonces for delivery

* Revert "use plain source_queue view when selecting nonces for delivery"

This reverts commit f1fdc3f.

* Revert "Revert "use plain source_queue view when selecting nonces for delivery""

This reverts commit ccefa5e.

* clippy

* fmt
  • Loading branch information
svyatonik authored Jun 21, 2021
1 parent fe757ac commit 85c2582
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 122 deletions.
102 changes: 75 additions & 27 deletions relays/messages/src/message_race_delivery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,20 @@ use crate::message_race_loop::{
MessageRace, NoncesRange, RaceState, RaceStrategy, SourceClient, SourceClientNonces, TargetClient,
TargetClientNonces,
};
use crate::message_race_strategy::BasicStrategy;
use crate::message_race_strategy::{BasicStrategy, SourceRangesQueue};
use crate::metrics::MessageLaneLoopMetrics;

use async_trait::async_trait;
use bp_messages::{MessageNonce, UnrewardedRelayersState, Weight};
use futures::stream::FusedStream;
use num_traits::{SaturatingAdd, Zero};
use relay_utils::FailedClient;
use std::{collections::VecDeque, marker::PhantomData, ops::RangeInclusive, time::Duration};
use std::{
collections::VecDeque,
marker::PhantomData,
ops::{Range, RangeInclusive},
time::Duration,
};

/// Run message delivery race.
pub async fn run<P: MessageLane>(
Expand Down Expand Up @@ -499,22 +504,25 @@ where
let lane_source_client = self.lane_source_client.clone();
let lane_target_client = self.lane_target_client.clone();

let maximal_source_queue_index = self.strategy.maximal_available_source_queue_index(race_state)?;
let previous_total_dispatch_weight = self.total_queued_dispatch_weight();
let selected_nonces = self
.strategy
.select_nonces_to_deliver_with_selector(race_state.clone(), |range| async {
select_nonces_for_delivery_transaction(
relayer_mode,
max_nonces,
max_messages_weight_in_single_batch,
max_messages_size_in_single_batch,
lane_source_client.clone(),
lane_target_client.clone(),
range,
)
.await
})
.await?;
let source_queue = self.strategy.source_queue();
let range_end = select_nonces_for_delivery_transaction(
relayer_mode,
max_nonces,
max_messages_weight_in_single_batch,
max_messages_size_in_single_batch,
lane_source_client.clone(),
lane_target_client.clone(),
source_queue,
0..maximal_source_queue_index + 1,
)
.await?;

let range_begin = source_queue[0].1.begin();
let selected_nonces = range_begin..=range_end;
self.strategy.remove_le_nonces_from_source_queue(range_end);

let new_total_dispatch_weight = self.total_queued_dispatch_weight();
let dispatch_weight = previous_total_dispatch_weight - new_total_dispatch_weight;

Expand All @@ -533,15 +541,21 @@ where
///
/// The function returns nonces that are NOT selected for current batch and will be
/// delivered later.
#[allow(clippy::too_many_arguments)]
async fn select_nonces_for_delivery_transaction<P: MessageLane>(
relayer_mode: RelayerMode,
max_messages_in_this_batch: MessageNonce,
max_messages_weight_in_single_batch: Weight,
max_messages_size_in_single_batch: u32,
lane_source_client: impl MessageLaneSourceClient<P>,
lane_target_client: impl MessageLaneTargetClient<P>,
ready_nonces: MessageDetailsMap<P::SourceChainBalance>,
) -> Option<MessageDetailsMap<P::SourceChainBalance>> {
nonces_queue: &SourceRangesQueue<
P::SourceHeaderHash,
P::SourceHeaderNumber,
MessageDetailsMap<P::SourceChainBalance>,
>,
nonces_queue_range: Range<usize>,
) -> Option<MessageNonce> {
let mut hard_selected_count = 0;
let mut soft_selected_count = 0;

Expand All @@ -563,7 +577,11 @@ async fn select_nonces_for_delivery_transaction<P: MessageLane>(
Zero::zero()
};

for (index, (nonce, details)) in ready_nonces.iter().enumerate() {
let all_ready_nonces = nonces_queue
.range(nonces_queue_range.clone())
.flat_map(|(_, ready_nonces)| ready_nonces.iter())
.enumerate();
for (index, (nonce, details)) in all_ready_nonces {
// Since we (hopefully) have some reserves in `max_messages_weight_in_single_batch`
// and `max_messages_size_in_single_batch`, we may still try to submit transaction
// with single message if message overflows these limits. The worst case would be if
Expand Down Expand Up @@ -671,23 +689,27 @@ async fn select_nonces_for_delivery_transaction<P: MessageLane>(
selected_count = new_selected_count;
}

let hard_selected_begin_nonce = nonces_queue[nonces_queue_range.start].1.begin();
if hard_selected_count != soft_selected_count {
let hard_selected_end_nonce = hard_selected_begin_nonce + hard_selected_count as MessageNonce - 1;
let soft_selected_begin_nonce = hard_selected_begin_nonce;
let soft_selected_end_nonce = soft_selected_begin_nonce + soft_selected_count as MessageNonce - 1;
log::warn!(
target: "bridge",
"Relayer may deliver nonces [{:?}; {:?}], but because of its strategy ({:?}) it has selected \
nonces [{:?}; {:?}].",
ready_nonces.keys().next(),
ready_nonces.keys().next().map(|begin| begin + (hard_selected_count as MessageNonce) - 1),
hard_selected_begin_nonce,
hard_selected_end_nonce,
relayer_mode,
ready_nonces.keys().next(),
ready_nonces.keys().next().map(|begin| begin + (soft_selected_count as MessageNonce) - 1),

soft_selected_begin_nonce,
soft_selected_end_nonce,
);

hard_selected_count = soft_selected_count;
}
if hard_selected_count != ready_nonces.len() {
Some(ready_nonces.into_iter().skip(hard_selected_count).collect())

if hard_selected_count != 0 {
Some(hard_selected_begin_nonce + hard_selected_count as MessageNonce - 1)
} else {
None
}
Expand Down Expand Up @@ -1127,4 +1149,30 @@ mod tests {
Some(((20..=23), proof_parameters(false, 4)))
);
}

#[async_std::test]
async fn relayer_uses_flattened_view_of_the_source_queue_to_select_nonces() {
// Real scenario that has happened on test deployments:
// 1) relayer witnessed M1 at block 1 => it has separate entry in the `source_queue`
// 2) relayer witnessed M2 at block 2 => it has separate entry in the `source_queue`
// 3) if block 2 is known to the target node, then both M1 and M2 are selected for single delivery,
// even though weight(M1+M2) > larger than largest allowed weight
//
// This was happening because selector (`select_nonces_for_delivery_transaction`) has been called
// for every `source_queue` entry separately without preserving any context.
let (mut state, mut strategy) = prepare_strategy();
let nonces = source_nonces(24..=25, 19, DEFAULT_REWARD - DELIVERY_TRANSACTION_COST);
strategy.strategy.source_nonces_updated(header_id(2), nonces);
strategy.max_unrewarded_relayer_entries_at_target = 100;
strategy.max_unconfirmed_nonces_at_target = 100;
strategy.max_messages_in_single_batch = 5;
strategy.max_messages_weight_in_single_batch = 100;
strategy.max_messages_size_in_single_batch = 100;
state.best_finalized_source_header_id_at_best_target = Some(header_id(2));

assert_eq!(
strategy.select_nonces_to_deliver(state).await,
Some(((20..=24), proof_parameters(false, 5)))
);
}
}
Loading

0 comments on commit 85c2582

Please sign in to comment.