-
Couldn't load subscription status.
- Fork 290
Feature: round up RaptorCast chunk assignments #2215
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,6 +20,7 @@ use std::{ | |
| ops::Range, | ||
| }; | ||
|
|
||
| use alloy_primitives::U256; | ||
| use bitvec::prelude::*; | ||
| use bytes::{Bytes, BytesMut}; | ||
| use itertools::Itertools; | ||
|
|
@@ -35,7 +36,7 @@ use monad_dataplane::RecvUdpMsg; | |
| use monad_merkle::{MerkleHash, MerkleProof, MerkleTree}; | ||
| use monad_raptor::{ManagedDecoder, SOURCE_SYMBOLS_MIN}; | ||
| use monad_types::{Epoch, NodeId, Stake}; | ||
| use rand::seq::SliceRandom; | ||
| use rand::{rngs::StdRng, seq::SliceRandom, SeedableRng}; | ||
| use tracing::warn; | ||
|
|
||
| use crate::{ | ||
|
|
@@ -463,6 +464,170 @@ where | |
| true | ||
| } | ||
|
|
||
| // Determine how many UDP datagrams (packets) we need to send out. Each | ||
| // datagram can only effectively transport `data_size` (~1220) bytes out of | ||
| // a total of `app_message_len` bytes (~18% total overhead). | ||
| // We also take into account a redundancy factor. | ||
| // In the case of Broadcast, this function is called with `num_copies` as an | ||
| // additional factor. | ||
| // This function can optionally be called with an assignment of the number of | ||
| // initial chunks per validator (`ic`), which is rounded up. In that case, the | ||
| // number of packets is computed as the sum of all initial chunks plus one. | ||
| // If the number of packets exceeds MAX_NUM_PACKETS, this function returns 0. | ||
| fn compute_num_packets( | ||
| app_message_len: u32, | ||
| data_size: u16, | ||
| redundancy: Redundancy, | ||
| num_copies: u32, | ||
| ic: &Option<Vec<u64>>, | ||
| ) -> usize { | ||
| let mut num_packets: usize = (app_message_len as usize) | ||
| .div_ceil(usize::from(data_size)) | ||
| .max(SOURCE_SYMBOLS_MIN); | ||
| // amplify by redundancy factor | ||
| num_packets = redundancy | ||
| .scale(num_packets * num_copies as usize) | ||
| .expect("redundancy-scaled num_packets doesn't fit in usize"); | ||
|
|
||
| // Round up the number of chunks sent to each validator for v2v RaptorCast. | ||
| // Only do this if we have initial chunks (`ic`) provided. | ||
| if let Some(ic) = ic { | ||
| // Calculate num_packets as the sum of all initial chunks + 1 rounding chunk | ||
| // per validator | ||
| let rounded_up_num_packets = | ||
| ic.iter().map(|&value| value as usize).sum::<usize>() + ic.len(); | ||
| if rounded_up_num_packets > num_packets + ic.len() { | ||
| tracing::warn!( | ||
| app_message_len, | ||
| rounded_up_num_packets, | ||
| num_packets, | ||
| "Rounded up num packets is larger than expected. This indicates a bug." | ||
| ); | ||
| } | ||
| // The rounded-up number of packets should be larger than the non-rounded-up number | ||
| if num_packets <= rounded_up_num_packets { | ||
| num_packets = rounded_up_num_packets; | ||
| } else { | ||
| tracing::warn!( | ||
| app_message_len, | ||
| rounded_up_num_packets, | ||
| num_packets, | ||
| "Rounded up num packets is smaller than non-rounded-up value, using non-rounded-up value" | ||
| ); | ||
| } | ||
| } | ||
| if num_packets > MAX_NUM_PACKETS { | ||
| tracing::warn!( | ||
| // ?build_target, | ||
| // ?known_addresses, | ||
| num_packets, | ||
| MAX_NUM_PACKETS, | ||
| "exceeded maximum number of packets in a message, dropping message", | ||
| ); | ||
| num_packets = 0; | ||
| } | ||
| num_packets | ||
| } | ||
|
|
||
| // Return the vector `ic`, which, for each validator, contains the number of | ||
| // initial chunks that a given validator is assigned. The number is the stake- | ||
| // weighted obligation of the validator, rounded down. In addition to these | ||
| // initial chunks, each validator will receive one additional rounding chunk. | ||
| // In total, each validator will receive `ic[i] + 1` chunks. | ||
| // The order of validators in the return vector is the same as in the | ||
| // `validator_stake_map`. | ||
| fn chunk_assignment<ST: CertificateSignatureRecoverable>( | ||
| app_message_len: u32, | ||
| data_size: u16, | ||
| redundancy: Redundancy, | ||
| leader: NodeId<CertificateSignaturePubKey<ST>>, | ||
| validator_stake_map: Vec<(NodeId<CertificateSignaturePubKey<ST>>, Stake)>, | ||
| ) -> Vec<u64> { | ||
| // First, compute the number of packets without taking rounding into account | ||
| let num_packets_without_rounding = compute_num_packets( | ||
| app_message_len, | ||
| data_size, | ||
| redundancy, | ||
| 1, // Only needed for Broadcast, not RaptorCast | ||
| &None, | ||
| ); | ||
| // compute_num_packets returns 0 if MAX_NUM_PACKETS exceeded, in that case, return zero vector. | ||
| if num_packets_without_rounding == 0 { | ||
| return vec![0u64; validator_stake_map.len()]; | ||
| } | ||
| let validator_stake_map_without_leader = validator_stake_map | ||
| .iter() | ||
| .filter(|(i, _)| leader != *i) | ||
| .cloned() | ||
| .collect::<Vec<_>>(); | ||
| // Compute total weight | ||
| let total_weight: U256 = validator_stake_map_without_leader | ||
| .iter() | ||
| .map(|(_, stake)| -> U256 { stake.0 }) | ||
| .sum(); | ||
|
|
||
| let num_receivers = validator_stake_map_without_leader.len(); | ||
|
|
||
| // Compute obligations: num_packets_without_rounding * stake / total_weight for each validator. | ||
| // The obligations are scaled by the basis points (bps) to avoid floating point arithmetic on | ||
| // the large values of stake and total_weight. Only at the very end we convert to a float. | ||
| // In the future, we might want to avoid floats alltogether. | ||
| let unit_bias: f64 = 10_000_000.0; | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this bias necessary? |
||
| let obligations: Vec<f64> = validator_stake_map_without_leader | ||
| .iter() | ||
| .map(|(_, stake)| { | ||
| let scaled_stake = stake.0 * U256::from(unit_bias); | ||
| let normalized_stake = scaled_stake.div_ceil(total_weight); | ||
| let total_obligation: U256 = | ||
| normalized_stake * U256::from(num_packets_without_rounding); | ||
| (total_obligation.to::<u64>() as f64) / unit_bias | ||
| }) | ||
| .collect(); | ||
|
|
||
| // Compute ic (initial chunks) for each validator. Since there is an | ||
| // additional rounding chunk sent to each valdiator, we round down here. | ||
| let ic: Vec<u64> = obligations.iter().map(|o| (o.floor() as u64)).collect(); | ||
|
|
||
| tracing::trace!( | ||
| ?leader, | ||
| num_packets_without_rounding, | ||
| data_size, | ||
| app_message_len, | ||
| total_weight = total_weight.to_string(), | ||
| num_receivers, | ||
| obligations =? obligations.iter().map(|value| value.to_string()).collect::<Vec<_>>(), | ||
| ic =? ic.iter().map(|value| value.to_string()).collect::<Vec<_>>(), | ||
| validator_stake_map_without_leader =? validator_stake_map_without_leader | ||
| .iter() | ||
| .map(|(node_id, stake)| (node_id, stake.0.to_string())) | ||
| .collect::<Vec<_>>(), | ||
| "Assigning initial chunks to validators", | ||
| ); | ||
|
|
||
| ic | ||
| } | ||
|
|
||
| // Shuffle the validator stake map for chunk assignment. This uses a deterministic | ||
| // seed, as in the future, it will be required that the leader and all validators | ||
| // compute the shuffling in the same way (for features not yet implemented). | ||
| // In the future, this should be done using known shuffling algorithm to allow for easy | ||
| // implementation in other languages, e.g., using Mt19937 and Fisher Yates shuffle. | ||
| fn shuffle_validator_num_chunks<ST: CertificateSignatureRecoverable>( | ||
| map_validator_num_chunks: HashMap<NodeId<CertificateSignaturePubKey<ST>>, usize>, | ||
| seed: HexBytes<20>, | ||
| ) -> Vec<(NodeId<CertificateSignaturePubKey<ST>>, usize)> { | ||
| // We first bring the validators into a defined order | ||
| let mut validator_num_chunks: Vec<_> = map_validator_num_chunks.into_iter().collect(); | ||
| validator_num_chunks.sort_by_key(|(node_id, _)| *node_id); | ||
|
|
||
| let mut padded_seed = [0u8; 32]; | ||
| padded_seed[..20].copy_from_slice(&seed.0); | ||
| let mut rng = StdRng::from_seed(padded_seed); | ||
|
|
||
| validator_num_chunks.shuffle(&mut rng); | ||
| validator_num_chunks | ||
| } | ||
|
|
||
| /// Stuff to include: | ||
| /// - 65 bytes => Signature of sender over hash(rest of message up to merkle proof, concatenated with merkle root) | ||
| /// - 2 bytes => Version: bumped on protocol updates | ||
|
|
@@ -611,38 +776,46 @@ where | |
| BuildTarget::Raptorcast(_) | BuildTarget::FullNodeRaptorCast(_) | ||
| ); | ||
|
|
||
| // Determine how many UDP datagrams (packets) we need to send out. Each | ||
| // datagram can only effectively transport `data_size` (~1220) bytes out of | ||
| // a total of `app_message_len` bytes (~18% total overhead). | ||
| let num_packets: usize = { | ||
| let mut num_packets: usize = (app_message_len as usize) | ||
| .div_ceil(usize::from(data_size)) | ||
| .max(SOURCE_SYMBOLS_MIN); | ||
| // amplify by redundancy factor | ||
| num_packets = redundancy | ||
| .scale(num_packets) | ||
| .expect("redundancy-scaled num_packets doesn't fit in usize"); | ||
|
|
||
| if let BuildTarget::Broadcast(nodes) = &build_target { | ||
| num_packets = num_packets | ||
| .checked_mul(nodes.len()) | ||
| .expect("num_packets doesn't fit in usize") | ||
| } | ||
|
|
||
| if num_packets > MAX_NUM_PACKETS { | ||
| tracing::warn!( | ||
| ?build_target, | ||
| ?known_addresses, | ||
| num_packets, | ||
| MAX_NUM_PACKETS, | ||
| "exceeded maximum number of packets in a message, dropping message", | ||
| // Compute the number of chunks per validator in Validator-to-Validator RaptorCast. | ||
| let mut ic: Option<Vec<u64>> = None; | ||
| let mut map_validator_num_chunks: HashMap<NodeId<CertificateSignaturePubKey<ST>>, usize> = | ||
| HashMap::new(); | ||
| if let BuildTarget::Raptorcast(epoch_validators) = &build_target { | ||
| if !epoch_validators.is_empty() { | ||
| let validator_stake_map: Vec<_> = epoch_validators | ||
| .iter() | ||
| .map(|(node_id, validator)| (*node_id, validator)) | ||
| .collect(); | ||
| let new_ic = chunk_assignment::<ST>( | ||
| app_message_len, | ||
| data_size, | ||
| redundancy, | ||
| self_id, | ||
| validator_stake_map, | ||
| ); | ||
| return Vec::new(); | ||
| // Number of chunks is a validator's initial chunks plus one rounding chunk | ||
| for (i, (node_id, _stake)) in epoch_validators.iter().enumerate() { | ||
| let num_assigned_chunks = new_ic[i] as usize + 1; | ||
| map_validator_num_chunks.insert(*node_id, num_assigned_chunks); | ||
| } | ||
| ic = Some(new_ic); | ||
| } | ||
| } | ||
|
|
||
| num_packets | ||
| // Number of copies we send for the message. Only relevant for broadcast. | ||
| let num_copies: u32 = if let BuildTarget::Broadcast(ref nodes) = build_target { | ||
| nodes.len() as u32 | ||
| } else { | ||
| 1 | ||
| }; | ||
|
|
||
| // Compute the number of packets we need to send out. | ||
| // If the number of packets exceeds MAX_NUM_PACKETS, this function returns 0. | ||
| let num_packets = compute_num_packets(app_message_len, data_size, redundancy, num_copies, &ic); | ||
| if num_packets == 0 { | ||
| return Vec::new(); | ||
| } | ||
|
|
||
| // Create a long flat message, concatenating the (future) UDP bodies of all | ||
| // datagrams. This includes everything except Ethernet, IP and UDP headers. | ||
| let mut message = BytesMut::zeroed(segment_size as usize * num_packets); | ||
|
|
@@ -748,53 +921,50 @@ where | |
|
|
||
| assert!(!epoch_validators.is_empty()); | ||
|
|
||
| // generate chunks if epoch validators is not empty | ||
| // FIXME should self be included in total_stake? | ||
| let total_stake: Stake = epoch_validators.total_stake(); | ||
|
|
||
| if total_stake == Stake::ZERO { | ||
| if map_validator_num_chunks.is_empty() { | ||
| tracing::warn!( | ||
| ?self_id, | ||
| "RaptorCast build_message got zero total stake, not sending message" | ||
| "RaptorCast map_validator_num_chunks is empty, cannot generate chunks" | ||
| ); | ||
| return Vec::new(); | ||
| } | ||
|
|
||
| let mut running_stake = Stake::ZERO; | ||
| let mut start_idx = 0; | ||
| let mut chunk_idx = 0_u16; | ||
| let mut validator_set: Vec<_> = epoch_validators.iter().collect(); | ||
| // Group shuffling so chunks for small proposals aren't always assigned | ||
| // to the same nodes, until researchers come up with something better. | ||
| validator_set.shuffle(&mut rand::thread_rng()); | ||
| for (node_id, stake) in validator_set { | ||
| let start_idx: usize = | ||
| (num_packets as f64 * (running_stake / total_stake)) as usize; | ||
| running_stake += stake; | ||
| let end_idx: usize = (num_packets as f64 * (running_stake / total_stake)) as usize; | ||
| // Shuffle the validator_num_chunks to avoid always assigning the same nodes | ||
| let validator_num_chunks: Vec<(NodeId<CertificateSignaturePubKey<ST>>, usize)> = | ||
| shuffle_validator_num_chunks::<ST>(map_validator_num_chunks, app_message_hash); | ||
|
|
||
| if start_idx == end_idx { | ||
| continue; | ||
| } | ||
| let mut trace_string = String::new(); | ||
| for (node_id, num_chunks) in validator_num_chunks.iter() { | ||
| let end_idx: usize = start_idx + num_chunks; | ||
| if let Some(addr) = known_addresses.get(node_id) { | ||
| outbound_gso_idx.push(( | ||
| *addr, | ||
| start_idx * segment_size as usize..end_idx * segment_size as usize, | ||
| )); | ||
| trace_string.push_str(&format!("{:?}: {} chunks, ", node_id, num_chunks)); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we probably do not want to leave have the tracing code run conditionlessly. how about put a |
||
| } else { | ||
| tracing::warn!( | ||
| ?node_id, | ||
| "RaptorCast build_message Raptorcast not sending message, address unknown" | ||
| ) | ||
| } | ||
|
|
||
| let node_hash = compute_hash(node_id); | ||
| for (chunk_symbol_id, chunk_data) in chunk_datas[start_idx..end_idx].iter_mut() { | ||
| // populate chunk_recipient | ||
| chunk_data[0..20].copy_from_slice(node_hash.as_slice()); | ||
| chunk_data[0..20].copy_from_slice(&compute_hash(node_id).0); | ||
| *chunk_symbol_id = Some(chunk_idx); | ||
| chunk_idx += 1; | ||
| } | ||
| start_idx = end_idx; | ||
| } | ||
| tracing::trace!( | ||
| ?self_id, | ||
| app_message_hash =? app_message_hash, | ||
| app_message_len, | ||
| trace_string =? trace_string, | ||
| "Assigning chunks to validators" | ||
| ); | ||
| } | ||
| BuildTarget::FullNodeRaptorCast(group) => { | ||
| assert!(is_broadcast && is_raptor_broadcast); | ||
|
|
@@ -813,8 +983,7 @@ where | |
| let total_peers = group.size_excl_self(); | ||
| let mut pp = 0; | ||
| let mut chunk_idx = 0_u16; | ||
| // Group shuffling so chunks for small proposals aren't always assigned | ||
| // to the same nodes, until researchers come up with something better. | ||
| // TODO: shuffling needed? | ||
| for node_id in group.iter_skip_self_and_author(&self_id, rand::random::<usize>()) { | ||
| let start_idx: usize = num_packets * pp / total_peers; | ||
| pp += 1; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The magic number 10_000_000.0 should be defined as a named constant with documentation explaining its purpose in avoiding floating point precision issues.