1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions monad-raptorcast/Cargo.toml
@@ -35,6 +35,7 @@ rand_chacha = { workspace = true }
thiserror = { workspace = true }
tracing = { workspace = true }
tokio = { workspace = true }
alloy-primitives.workspace = true

[dev-dependencies]
monad-testutil = { workspace = true }
273 changes: 221 additions & 52 deletions monad-raptorcast/src/udp.rs
@@ -20,6 +20,7 @@ use std::{
    ops::Range,
};

use alloy_primitives::U256;
use bitvec::prelude::*;
use bytes::{Bytes, BytesMut};
use itertools::Itertools;
@@ -35,7 +36,7 @@ use monad_dataplane::RecvUdpMsg;
use monad_merkle::{MerkleHash, MerkleProof, MerkleTree};
use monad_raptor::{ManagedDecoder, SOURCE_SYMBOLS_MIN};
use monad_types::{Epoch, NodeId, Stake};
-use rand::seq::SliceRandom;
+use rand::{rngs::StdRng, seq::SliceRandom, SeedableRng};
use tracing::warn;

use crate::{
@@ -463,6 +464,170 @@ where
    true
}

// Determine how many UDP datagrams (packets) we need to send out. Each
// datagram can only effectively transport `data_size` (~1220) bytes out of
// a total of `app_message_len` bytes (~18% total overhead).
// We also take a redundancy factor into account.
// In the case of Broadcast, this function is called with `num_copies` as an
// additional multiplier.
// This function can optionally be called with an assignment of the number of
// initial chunks per validator (`ic`). In that case, the number of packets is
// computed as the sum of all initial chunks plus one rounding chunk per
// validator.
// If the number of packets would exceed MAX_NUM_PACKETS, this function returns 0.
fn compute_num_packets(
    app_message_len: u32,
    data_size: u16,
    redundancy: Redundancy,
    num_copies: u32,
    ic: &Option<Vec<u64>>,
) -> usize {
    let mut num_packets: usize = (app_message_len as usize)
        .div_ceil(usize::from(data_size))
        .max(SOURCE_SYMBOLS_MIN);
    // amplify by redundancy factor
    num_packets = redundancy
        .scale(num_packets * num_copies as usize)
        .expect("redundancy-scaled num_packets doesn't fit in usize");

    // Round up the number of chunks sent to each validator for v2v RaptorCast.
    // Only do this if we have initial chunks (`ic`) provided.
    if let Some(ic) = ic {
        // Calculate num_packets as the sum of all initial chunks + 1 rounding
        // chunk per validator
        let rounded_up_num_packets =
            ic.iter().map(|&value| value as usize).sum::<usize>() + ic.len();
        if rounded_up_num_packets > num_packets + ic.len() {
            tracing::warn!(
                app_message_len,
                rounded_up_num_packets,
                num_packets,
                "Rounded up num packets is larger than expected. This indicates a bug."
            );
        }
        // The rounded-up number of packets should be at least the non-rounded-up number
        if num_packets <= rounded_up_num_packets {
            num_packets = rounded_up_num_packets;
        } else {
            tracing::warn!(
                app_message_len,
                rounded_up_num_packets,
                num_packets,
                "Rounded up num packets is smaller than non-rounded-up value, using non-rounded-up value"
            );
        }
    }
    if num_packets > MAX_NUM_PACKETS {
        tracing::warn!(
            num_packets,
            MAX_NUM_PACKETS,
            "exceeded maximum number of packets in a message, dropping message",
        );
        num_packets = 0;
    }
    num_packets
}
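
// Hypothetical worked example of the arithmetic above (illustration only:
// SOURCE_SYMBOLS_MIN and Redundancy are simplified to plain integers here;
// the real types live in monad-raptor and this module):
#[cfg(test)]
mod packet_count_sketch {
    #[test]
    fn worked_example() {
        let app_message_len: usize = 100_000; // ~100 KB proposal (illustrative)
        let data_size: usize = 1220; // usable payload bytes per datagram
        let source_symbols_min: usize = 12; // placeholder for SOURCE_SYMBOLS_MIN
        let redundancy: usize = 2; // assume a plain 2x multiplier

        // ceil(100_000 / 1220) = 82 source symbols, at least SOURCE_SYMBOLS_MIN
        let base = app_message_len.div_ceil(data_size).max(source_symbols_min);
        assert_eq!(base, 82);
        // amplified by the redundancy factor
        assert_eq!(base * redundancy, 164);
    }
}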

// Return the vector `ic`, which contains, for each validator, the number of
// initial chunks assigned to that validator. The number is the stake-weighted
// obligation of the validator, rounded down. In addition to these initial
// chunks, each validator will receive one additional rounding chunk, so in
// total each validator will receive `ic[i] + 1` chunks.
// The order of validators in the return vector is the same as in the
// `validator_stake_map`.
fn chunk_assignment<ST: CertificateSignatureRecoverable>(
    app_message_len: u32,
    data_size: u16,
    redundancy: Redundancy,
    leader: NodeId<CertificateSignaturePubKey<ST>>,
    validator_stake_map: Vec<(NodeId<CertificateSignaturePubKey<ST>>, Stake)>,
) -> Vec<u64> {
    // First, compute the number of packets without taking rounding into account
    let num_packets_without_rounding = compute_num_packets(
        app_message_len,
        data_size,
        redundancy,
        1, // Only needed for Broadcast, not RaptorCast
        &None,
    );
    // compute_num_packets returns 0 if MAX_NUM_PACKETS is exceeded; in that case, return a zero vector.
    if num_packets_without_rounding == 0 {
        return vec![0u64; validator_stake_map.len()];
    }
    let validator_stake_map_without_leader = validator_stake_map
        .iter()
        .filter(|(i, _)| leader != *i)
        .cloned()
        .collect::<Vec<_>>();
    // Compute total weight
    let total_weight: U256 = validator_stake_map_without_leader
        .iter()
        .map(|(_, stake)| -> U256 { stake.0 })
        .sum();

    let num_receivers = validator_stake_map_without_leader.len();

    // Compute obligations: num_packets_without_rounding * stake / total_weight for each validator.
    // The obligations are scaled by `unit_bias` to avoid floating point arithmetic on the large
    // values of stake and total_weight. Only at the very end do we convert to a float.
    // In the future, we might want to avoid floats altogether.
    let unit_bias: f64 = 10_000_000.0;
[Review comment — Copilot AI, Aug 27, 2025]
The magic number 10_000_000.0 should be defined as a named constant with documentation
explaining its purpose in avoiding floating point precision issues.
Suggested change:
-    let unit_bias: f64 = 10_000_000.0;
+    let unit_bias: f64 = STAKE_UNIT_BIAS;

[Review comment — @xinyuan-dev (Contributor), Sep 18, 2025]
Is this bias necessary? stake / total_stake is a fractional number between 0 and 1. In IEEE
floating point representation, there are roughly as many representable real numbers between
0 and 1 as between 1 and infinity (source), so there should be enough precision if
stake / total_stake is precise enough.

    let obligations: Vec<f64> = validator_stake_map_without_leader
        .iter()
        .map(|(_, stake)| {
            let scaled_stake = stake.0 * U256::from(unit_bias);
            let normalized_stake = scaled_stake.div_ceil(total_weight);
            let total_obligation: U256 =
                normalized_stake * U256::from(num_packets_without_rounding);
            (total_obligation.to::<u64>() as f64) / unit_bias
        })
        .collect();

    // Compute ic (initial chunks) for each validator. Since there is an
    // additional rounding chunk sent to each validator, we round down here.
    let ic: Vec<u64> = obligations.iter().map(|o| (o.floor() as u64)).collect();

    tracing::trace!(
        ?leader,
        num_packets_without_rounding,
        data_size,
        app_message_len,
        total_weight = total_weight.to_string(),
        num_receivers,
        obligations = ?obligations.iter().map(|value| value.to_string()).collect::<Vec<_>>(),
        ic = ?ic.iter().map(|value| value.to_string()).collect::<Vec<_>>(),
        validator_stake_map_without_leader = ?validator_stake_map_without_leader
            .iter()
            .map(|(node_id, stake)| (node_id, stake.0.to_string()))
            .collect::<Vec<_>>(),
        "Assigning initial chunks to validators",
    );

    ic
}
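
// Hypothetical worked example of the assignment above, with u128 standing in
// for alloy's U256 and small illustrative stakes; the final assertion checks
// the reviewer's point that, for values comfortably within f64 range, the
// biased path agrees with a direct floating-point ratio:
#[cfg(test)]
mod chunk_assignment_sketch {
    #[test]
    fn worked_example() {
        let num_packets: u128 = 100;
        let stakes: [u128; 3] = [50, 30, 20];
        let total: u128 = stakes.iter().sum();
        let unit_bias: u128 = 10_000_000;

        let ic: Vec<u64> = stakes
            .iter()
            .map(|stake| {
                // mirrors chunk_assignment: div_ceil against the total weight,
                // then floor after unscaling
                let normalized = (stake * unit_bias).div_ceil(total);
                ((normalized * num_packets) as f64 / unit_bias as f64).floor() as u64
            })
            .collect();
        // Each validator later receives ic[i] + 1 chunks (the rounding chunk).
        assert_eq!(ic, vec![50, 30, 20]);

        // Direct-ratio comparison for the precision question raised in review:
        let direct = (stakes[0] as f64 / total as f64) * num_packets as f64;
        assert!((direct - ic[0] as f64).abs() < 1e-6);
    }
}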

// Shuffle the validator stake map for chunk assignment. This uses a deterministic
// seed: in the future, the leader and all validators will be required to compute
// the shuffling in the same way (for features not yet implemented).
// Eventually, this should use a well-known shuffling algorithm to allow for easy
// implementation in other languages, e.g., MT19937 plus a Fisher-Yates shuffle.
fn shuffle_validator_num_chunks<ST: CertificateSignatureRecoverable>(
    map_validator_num_chunks: HashMap<NodeId<CertificateSignaturePubKey<ST>>, usize>,
    seed: HexBytes<20>,
) -> Vec<(NodeId<CertificateSignaturePubKey<ST>>, usize)> {
    // We first bring the validators into a defined order
    let mut validator_num_chunks: Vec<_> = map_validator_num_chunks.into_iter().collect();
    validator_num_chunks.sort_by_key(|(node_id, _)| *node_id);

    let mut padded_seed = [0u8; 32];
    padded_seed[..20].copy_from_slice(&seed.0);
    let mut rng = StdRng::from_seed(padded_seed);

    validator_num_chunks.shuffle(&mut rng);
    validator_num_chunks
}
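
// Sketch of the determinism property relied on above (illustration only):
// two parties that sort identically and pad the same 20-byte seed the same
// way derive the same permutation. Note that StdRng's algorithm is only
// stable within a rand major version, hence the MT19937 note above.
#[cfg(test)]
mod shuffle_sketch {
    use rand::{rngs::StdRng, seq::SliceRandom, SeedableRng};

    fn shuffle(mut items: Vec<u32>, seed20: [u8; 20]) -> Vec<u32> {
        items.sort_unstable(); // bring items into a defined order first
        let mut padded = [0u8; 32];
        padded[..20].copy_from_slice(&seed20);
        items.shuffle(&mut StdRng::from_seed(padded));
        items
    }

    #[test]
    fn same_seed_same_order() {
        let seed = [7u8; 20];
        // Same seed and same input set => same output order on every call.
        assert_eq!(shuffle(vec![3, 1, 2], seed), shuffle(vec![2, 3, 1], seed));
    }
}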

/// Stuff to include:
/// - 65 bytes => Signature of sender over hash(rest of message up to merkle proof, concatenated with merkle root)
/// - 2 bytes => Version: bumped on protocol updates
@@ -611,38 +776,46 @@ where
        BuildTarget::Raptorcast(_) | BuildTarget::FullNodeRaptorCast(_)
    );

-    // Determine how many UDP datagrams (packets) we need to send out. Each
-    // datagram can only effectively transport `data_size` (~1220) bytes out of
-    // a total of `app_message_len` bytes (~18% total overhead).
-    let num_packets: usize = {
-        let mut num_packets: usize = (app_message_len as usize)
-            .div_ceil(usize::from(data_size))
-            .max(SOURCE_SYMBOLS_MIN);
-        // amplify by redundancy factor
-        num_packets = redundancy
-            .scale(num_packets)
-            .expect("redundancy-scaled num_packets doesn't fit in usize");
-
-        if let BuildTarget::Broadcast(nodes) = &build_target {
-            num_packets = num_packets
-                .checked_mul(nodes.len())
-                .expect("num_packets doesn't fit in usize")
-        }
-
-        if num_packets > MAX_NUM_PACKETS {
-            tracing::warn!(
-                ?build_target,
-                ?known_addresses,
-                num_packets,
-                MAX_NUM_PACKETS,
-                "exceeded maximum number of packets in a message, dropping message",
-            );
-            return Vec::new();
-        }
-
-        num_packets
-    };
+    // Compute the number of chunks per validator in Validator-to-Validator RaptorCast.
+    let mut ic: Option<Vec<u64>> = None;
+    let mut map_validator_num_chunks: HashMap<NodeId<CertificateSignaturePubKey<ST>>, usize> =
+        HashMap::new();
+    if let BuildTarget::Raptorcast(epoch_validators) = &build_target {
+        if !epoch_validators.is_empty() {
+            let validator_stake_map: Vec<_> = epoch_validators
+                .iter()
+                .map(|(node_id, validator)| (*node_id, validator))
+                .collect();
+            let new_ic = chunk_assignment::<ST>(
+                app_message_len,
+                data_size,
+                redundancy,
+                self_id,
+                validator_stake_map,
+            );
+            // Number of chunks is a validator's initial chunks plus one rounding chunk
+            for (i, (node_id, _stake)) in epoch_validators.iter().enumerate() {
+                let num_assigned_chunks = new_ic[i] as usize + 1;
+                map_validator_num_chunks.insert(*node_id, num_assigned_chunks);
+            }
+            ic = Some(new_ic);
+        }
+    }
+
+    // Number of copies we send for the message. Only relevant for broadcast.
+    let num_copies: u32 = if let BuildTarget::Broadcast(ref nodes) = build_target {
+        nodes.len() as u32
+    } else {
+        1
+    };
+
+    // Compute the number of packets we need to send out.
+    // compute_num_packets returns 0 if the number of packets would exceed MAX_NUM_PACKETS.
+    let num_packets =
+        compute_num_packets(app_message_len, data_size, redundancy, num_copies, &ic);
+    if num_packets == 0 {
+        return Vec::new();
+    }

    // Create a long flat message, concatenating the (future) UDP bodies of all
    // datagrams. This includes everything except Ethernet, IP and UDP headers.
    let mut message = BytesMut::zeroed(segment_size as usize * num_packets);
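    // Layout sketch (illustration only): packet i occupies bytes
    // [i * segment_size, (i + 1) * segment_size) of this flat buffer, so a
    // validator assigned chunks [start_idx, end_idx) maps to one contiguous
    // byte range suitable for a single GSO send; e.g. with segment_size = 1280,
    // chunks 3..5 map to bytes 3840..6400.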
@@ -748,53 +921,50 @@ where

            assert!(!epoch_validators.is_empty());

-            // generate chunks if epoch validators is not empty
-            // FIXME should self be included in total_stake?
-            let total_stake: Stake = epoch_validators.total_stake();
-
-            if total_stake == Stake::ZERO {
+            if map_validator_num_chunks.is_empty() {
                tracing::warn!(
                    ?self_id,
-                    "RaptorCast build_message got zero total stake, not sending message"
+                    "RaptorCast map_validator_num_chunks is empty, cannot generate chunks"
                );
                return Vec::new();
            }

-            let mut running_stake = Stake::ZERO;
+            let mut start_idx = 0;
            let mut chunk_idx = 0_u16;
-            let mut validator_set: Vec<_> = epoch_validators.iter().collect();
-            // Group shuffling so chunks for small proposals aren't always assigned
-            // to the same nodes, until researchers come up with something better.
-            validator_set.shuffle(&mut rand::thread_rng());
-            for (node_id, stake) in validator_set {
-                let start_idx: usize =
-                    (num_packets as f64 * (running_stake / total_stake)) as usize;
-                running_stake += stake;
-                let end_idx: usize = (num_packets as f64 * (running_stake / total_stake)) as usize;
-
-                if start_idx == end_idx {
-                    continue;
-                }
+            // Shuffle the validator_num_chunks to avoid always assigning the same nodes
+            let validator_num_chunks: Vec<(NodeId<CertificateSignaturePubKey<ST>>, usize)> =
+                shuffle_validator_num_chunks::<ST>(map_validator_num_chunks, app_message_hash);
+
+            let mut trace_string = String::new();
+            for (node_id, num_chunks) in validator_num_chunks.iter() {
+                let end_idx: usize = start_idx + num_chunks;
                if let Some(addr) = known_addresses.get(node_id) {
                    outbound_gso_idx.push((
                        *addr,
                        start_idx * segment_size as usize..end_idx * segment_size as usize,
                    ));
+                    trace_string.push_str(&format!("{:?}: {} chunks, ", node_id, num_chunks));
[Review comment — Contributor]
We probably do not want the tracing code to run unconditionally. How about putting a trace
statement here instead?
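
// One possible response to the comment above (hypothetical sketch; assumes the
// `tracing::enabled!` macro, available in tracing >= 0.1.32), building the
// string only when the TRACE level is actually active:
//
//     if tracing::enabled!(tracing::Level::TRACE) {
//         trace_string.push_str(&format!("{:?}: {} chunks, ", node_id, num_chunks));
//     }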

                } else {
                    tracing::warn!(
                        ?node_id,
                        "RaptorCast build_message Raptorcast not sending message, address unknown"
                    )
                }

-                let node_hash = compute_hash(node_id);
                for (chunk_symbol_id, chunk_data) in chunk_datas[start_idx..end_idx].iter_mut() {
                    // populate chunk_recipient
-                    chunk_data[0..20].copy_from_slice(node_hash.as_slice());
+                    chunk_data[0..20].copy_from_slice(&compute_hash(node_id).0);
                    *chunk_symbol_id = Some(chunk_idx);
                    chunk_idx += 1;
                }
+                start_idx = end_idx;
            }
+            tracing::trace!(
+                ?self_id,
+                app_message_hash = ?app_message_hash,
+                app_message_len,
+                trace_string = ?trace_string,
+                "Assigning chunks to validators"
+            );
        }
        BuildTarget::FullNodeRaptorCast(group) => {
            assert!(is_broadcast && is_raptor_broadcast);
@@ -813,8 +983,7 @@
            let total_peers = group.size_excl_self();
            let mut pp = 0;
            let mut chunk_idx = 0_u16;
-            // Group shuffling so chunks for small proposals aren't always assigned
-            // to the same nodes, until researchers come up with something better.
+            // TODO: shuffling needed?
            for node_id in group.iter_skip_self_and_author(&self_id, rand::random::<usize>()) {
                let start_idx: usize = num_packets * pp / total_peers;
                pp += 1;