Skip to content

Commit

Permalink
Keep multiple latest confirmed nonces at source in messages relay (pa…
Browse files Browse the repository at this point in the history
…ritytech#719)

* keep multiple latest confirmed nonces at source in messages relay

* post-merge fix
  • Loading branch information
svyatonik authored and serban300 committed Apr 10, 2024
1 parent c879742 commit 949187c
Show file tree
Hide file tree
Showing 4 changed files with 275 additions and 82 deletions.
171 changes: 142 additions & 29 deletions bridges/relays/messages-relay/src/message_race_delivery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,12 @@ use async_trait::async_trait;
use bp_message_lane::{MessageNonce, UnrewardedRelayersState, Weight};
use futures::stream::FusedStream;
use relay_utils::FailedClient;
use std::{collections::BTreeMap, marker::PhantomData, ops::RangeInclusive, time::Duration};
use std::{
collections::{BTreeMap, VecDeque},
marker::PhantomData,
ops::RangeInclusive,
time::Duration,
};

/// Run message delivery race.
pub async fn run<P: MessageLane>(
Expand Down Expand Up @@ -61,7 +66,7 @@ pub async fn run<P: MessageLane>(
max_messages_in_single_batch: params.max_messages_in_single_batch,
max_messages_weight_in_single_batch: params.max_messages_weight_in_single_batch,
max_messages_size_in_single_batch: params.max_messages_size_in_single_batch,
latest_confirmed_nonce_at_source: None,
latest_confirmed_nonces_at_source: VecDeque::new(),
target_nonces: None,
strategy: BasicStrategy::new(),
},
Expand Down Expand Up @@ -164,14 +169,17 @@ where
async fn nonces(
&self,
at_block: TargetHeaderIdOf<P>,
update_metrics: bool,
) -> Result<(TargetHeaderIdOf<P>, TargetClientNonces<DeliveryRaceTargetNoncesData>), Self::Error> {
let (at_block, latest_received_nonce) = self.client.latest_received_nonce(at_block).await?;
let (at_block, latest_confirmed_nonce) = self.client.latest_confirmed_received_nonce(at_block).await?;
let (at_block, unrewarded_relayers) = self.client.unrewarded_relayers_state(at_block).await?;

if let Some(metrics_msg) = self.metrics_msg.as_ref() {
metrics_msg.update_target_latest_received_nonce::<P>(latest_received_nonce);
metrics_msg.update_target_latest_confirmed_nonce::<P>(latest_confirmed_nonce);
if update_metrics {
if let Some(metrics_msg) = self.metrics_msg.as_ref() {
metrics_msg.update_target_latest_received_nonce::<P>(latest_received_nonce);
metrics_msg.update_target_latest_confirmed_nonce::<P>(latest_confirmed_nonce);
}
}

Ok((
Expand Down Expand Up @@ -221,8 +229,8 @@ struct MessageDeliveryStrategy<P: MessageLane> {
max_messages_weight_in_single_batch: Weight,
/// Maximal messages size in the single delivery transaction.
max_messages_size_in_single_batch: usize,
/// Latest confirmed nonce at the source client.
latest_confirmed_nonce_at_source: Option<MessageNonce>,
/// Latest confirmed nonces at the source client + the header id where we have first met this nonce.
latest_confirmed_nonces_at_source: VecDeque<(SourceHeaderIdOf<P>, MessageNonce)>,
/// Target nonces from the source client.
target_nonces: Option<TargetClientNonces<DeliveryRaceTargetNoncesData>>,
/// Basic delivery strategy.
Expand Down Expand Up @@ -259,8 +267,8 @@ impl<P: MessageLane> std::fmt::Debug for MessageDeliveryStrategy<P> {
&self.max_messages_size_in_single_batch,
)
.field(
"latest_confirmed_noncs_at_source",
&self.latest_confirmed_nonce_at_source,
"latest_confirmed_nonces_at_source",
&self.latest_confirmed_nonces_at_source,
)
.field("target_nonces", &self.target_nonces)
.field("strategy", &self.strategy)
Expand Down Expand Up @@ -292,17 +300,64 @@ impl<P: MessageLane> RaceStrategy<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>, P::M
at_block: SourceHeaderIdOf<P>,
nonces: SourceClientNonces<Self::SourceNoncesRange>,
) {
self.latest_confirmed_nonce_at_source = nonces.confirmed_nonce;
if let Some(confirmed_nonce) = nonces.confirmed_nonce {
let is_confirmed_nonce_updated = self
.latest_confirmed_nonces_at_source
.back()
.map(|(_, prev_nonce)| *prev_nonce != confirmed_nonce)
.unwrap_or(true);
if is_confirmed_nonce_updated {
self.latest_confirmed_nonces_at_source
.push_back((at_block.clone(), confirmed_nonce));
}
}
self.strategy.source_nonces_updated(at_block, nonces)
}

fn target_nonces_updated(
fn best_target_nonces_updated(
&mut self,
nonces: TargetClientNonces<DeliveryRaceTargetNoncesData>,
race_state: &mut RaceState<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>, P::MessagesProof>,
) {
self.target_nonces = Some(nonces.clone());
self.strategy.target_nonces_updated(
// best target nonces must always be ge than finalized target nonces
let mut target_nonces = self.target_nonces.take().unwrap_or_else(|| nonces.clone());
target_nonces.nonces_data = nonces.nonces_data.clone();
target_nonces.latest_nonce = std::cmp::max(target_nonces.latest_nonce, nonces.latest_nonce);
self.target_nonces = Some(target_nonces);

self.strategy.best_target_nonces_updated(
TargetClientNonces {
latest_nonce: nonces.latest_nonce,
nonces_data: (),
},
race_state,
)
}

fn finalized_target_nonces_updated(
&mut self,
nonces: TargetClientNonces<DeliveryRaceTargetNoncesData>,
race_state: &mut RaceState<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>, P::MessagesProof>,
) {
if let Some(ref best_finalized_source_header_id_at_best_target) =
race_state.best_finalized_source_header_id_at_best_target
{
let oldest_header_number_to_keep = best_finalized_source_header_id_at_best_target.0;
while self
.latest_confirmed_nonces_at_source
.front()
.map(|(id, _)| id.0 < oldest_header_number_to_keep)
.unwrap_or(false)
{
self.latest_confirmed_nonces_at_source.pop_front();
}
}

if let Some(ref mut target_nonces) = self.target_nonces {
target_nonces.latest_nonce = std::cmp::max(target_nonces.latest_nonce, nonces.latest_nonce);
}

self.strategy.finalized_target_nonces_updated(
TargetClientNonces {
latest_nonce: nonces.latest_nonce,
nonces_data: (),
Expand All @@ -315,7 +370,14 @@ impl<P: MessageLane> RaceStrategy<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>, P::M
&mut self,
race_state: &RaceState<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>, P::MessagesProof>,
) -> Option<(RangeInclusive<MessageNonce>, Self::ProofParameters)> {
let latest_confirmed_nonce_at_source = self.latest_confirmed_nonce_at_source?;
let best_finalized_source_header_id_at_best_target =
race_state.best_finalized_source_header_id_at_best_target.clone()?;
let latest_confirmed_nonce_at_source = self
.latest_confirmed_nonces_at_source
.iter()
.take_while(|(id, _)| id.0 <= best_finalized_source_header_id_at_best_target.0)
.last()
.map(|(_, nonce)| *nonce)?;
let target_nonces = self.target_nonces.as_ref()?;

// There's additional condition in the message delivery race: target would reject messages
Expand Down Expand Up @@ -509,6 +571,7 @@ mod tests {
best_finalized_source_header_id_at_source: Some(header_id(1)),
best_finalized_source_header_id_at_best_target: Some(header_id(1)),
best_target_header_id: Some(header_id(1)),
best_finalized_target_header_id: Some(header_id(1)),
nonces_to_submit: None,
nonces_submitted: None,
};
Expand All @@ -519,7 +582,7 @@ mod tests {
max_messages_in_single_batch: 4,
max_messages_weight_in_single_batch: 4,
max_messages_size_in_single_batch: 4,
latest_confirmed_nonce_at_source: Some(19),
latest_confirmed_nonces_at_source: vec![(header_id(1), 19)].into_iter().collect(),
target_nonces: Some(TargetClientNonces {
latest_nonce: 19,
nonces_data: DeliveryRaceTargetNoncesData {
Expand Down Expand Up @@ -548,13 +611,17 @@ mod tests {
confirmed_nonce: Some(19),
},
);
race_strategy.strategy.target_nonces_updated(
TargetClientNonces {
latest_nonce: 19,
nonces_data: (),
},
&mut race_state,
);

let target_nonces = TargetClientNonces {
latest_nonce: 19,
nonces_data: (),
};
race_strategy
.strategy
.best_target_nonces_updated(target_nonces.clone(), &mut race_state);
race_strategy
.strategy
.finalized_target_nonces_updated(target_nonces, &mut race_state);

(race_state, race_strategy)
}
Expand Down Expand Up @@ -611,8 +678,12 @@ mod tests {

// if there are already `max_unconfirmed_nonces_at_target` messages on target,
// we need to wait until confirmations will be delivered by receiving race
strategy.latest_confirmed_nonce_at_source =
Some(strategy.target_nonces.as_ref().unwrap().latest_nonce - strategy.max_unconfirmed_nonces_at_target);
strategy.latest_confirmed_nonces_at_source = vec![(
header_id(1),
strategy.target_nonces.as_ref().unwrap().latest_nonce - strategy.max_unconfirmed_nonces_at_target,
)]
.into_iter()
.collect();
assert_eq!(strategy.select_nonces_to_deliver(&state), None);
}

Expand All @@ -622,7 +693,7 @@ mod tests {

// if there are new confirmed nonces on source, we want to relay this information
// to target to prune rewards queue
let prev_confirmed_nonce_at_source = strategy.latest_confirmed_nonce_at_source.unwrap();
let prev_confirmed_nonce_at_source = strategy.latest_confirmed_nonces_at_source.back().unwrap().1;
strategy.target_nonces.as_mut().unwrap().nonces_data.confirmed_nonce = prev_confirmed_nonce_at_source - 1;
assert_eq!(
strategy.select_nonces_to_deliver(&state),
Expand Down Expand Up @@ -650,7 +721,7 @@ mod tests {

// if there are already `max_unrewarded_relayer_entries_at_target` entries at target,
// we need to prove at least `messages_in_oldest_entry` rewards
let prev_confirmed_nonce_at_source = strategy.latest_confirmed_nonce_at_source.unwrap();
let prev_confirmed_nonce_at_source = strategy.latest_confirmed_nonces_at_source.back().unwrap().1;
{
let mut nonces_data = &mut strategy.target_nonces.as_mut().unwrap().nonces_data;
nonces_data.confirmed_nonce = prev_confirmed_nonce_at_source - 1;
Expand All @@ -667,7 +738,7 @@ mod tests {

// if there are already `max_unrewarded_relayer_entries_at_target` entries at target,
// we need to prove at least `messages_in_oldest_entry` rewards
let prev_confirmed_nonce_at_source = strategy.latest_confirmed_nonce_at_source.unwrap();
let prev_confirmed_nonce_at_source = strategy.latest_confirmed_nonces_at_source.back().unwrap().1;
{
let mut nonces_data = &mut strategy.target_nonces.as_mut().unwrap().nonces_data;
nonces_data.confirmed_nonce = prev_confirmed_nonce_at_source - 3;
Expand Down Expand Up @@ -747,12 +818,54 @@ mod tests {

// 1 delivery confirmation from target to source is still missing, so we may only
// relay 3 new messages
let prev_confirmed_nonce_at_source = strategy.latest_confirmed_nonce_at_source.unwrap();
strategy.latest_confirmed_nonce_at_source = Some(prev_confirmed_nonce_at_source - 1);
let prev_confirmed_nonce_at_source = strategy.latest_confirmed_nonces_at_source.back().unwrap().1;
strategy.latest_confirmed_nonces_at_source = vec![(header_id(1), prev_confirmed_nonce_at_source - 1)]
.into_iter()
.collect();
strategy.target_nonces.as_mut().unwrap().nonces_data.confirmed_nonce = prev_confirmed_nonce_at_source - 1;
assert_eq!(
strategy.select_nonces_to_deliver(&state),
Some(((20..=22), proof_parameters(false, 3)))
);
}

#[test]
fn message_delivery_strategy_waits_for_confirmed_nonce_header_to_appear_on_target() {
// 1 delivery confirmation from target to source is still missing, so we may deliver
// reward confirmation with our message delivery transaction. But the problem is that
// the reward has been paid at header 2 && this header is still unknown to target node.
//
// => so we can't deliver more than 3 messages
let (mut state, mut strategy) = prepare_strategy();
let prev_confirmed_nonce_at_source = strategy.latest_confirmed_nonces_at_source.back().unwrap().1;
strategy.latest_confirmed_nonces_at_source = vec![
(header_id(1), prev_confirmed_nonce_at_source - 1),
(header_id(2), prev_confirmed_nonce_at_source),
]
.into_iter()
.collect();
strategy.target_nonces.as_mut().unwrap().nonces_data.confirmed_nonce = prev_confirmed_nonce_at_source - 1;
state.best_finalized_source_header_id_at_best_target = Some(header_id(1));
assert_eq!(
strategy.select_nonces_to_deliver(&state),
Some(((20..=22), proof_parameters(false, 3)))
);

// the same situation, but the header 2 is known to the target node, so we may deliver reward confirmation
let (mut state, mut strategy) = prepare_strategy();
let prev_confirmed_nonce_at_source = strategy.latest_confirmed_nonces_at_source.back().unwrap().1;
strategy.latest_confirmed_nonces_at_source = vec![
(header_id(1), prev_confirmed_nonce_at_source - 1),
(header_id(2), prev_confirmed_nonce_at_source),
]
.into_iter()
.collect();
strategy.target_nonces.as_mut().unwrap().nonces_data.confirmed_nonce = prev_confirmed_nonce_at_source - 1;
state.best_finalized_source_header_id_at_source = Some(header_id(2));
state.best_finalized_source_header_id_at_best_target = Some(header_id(2));
assert_eq!(
strategy.select_nonces_to_deliver(&state),
Some(((20..=23), proof_parameters(true, 4)))
);
}
}
Loading

0 comments on commit 949187c

Please sign in to comment.