Skip to content

Commit ec98c15

Browse files
committed
assign rounding chunks to raptorcast validators
1 parent f0d050a commit ec98c15

File tree

6 files changed

+193
-37
lines changed

6 files changed

+193
-37
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

monad-raptorcast/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ futures-util = { workspace = true }
4949
insta = { workspace = true }
5050
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "sync"] }
5151
tracing-subscriber = { workspace = true, features = ["env-filter"] }
52+
rand_distr = { workspace = true }
5253

5354
[[bench]]
5455
name = "raptor_bench"

monad-raptorcast/src/packet/assembler.rs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,10 @@ use super::{
3030
assigner::{ChunkAssignment, ChunkOrder},
3131
BuildError, Collector, PeerAddrLookup, Result, UdpMessage,
3232
};
33-
use crate::{util::Redundancy, SIGNATURE_SIZE};
33+
use crate::{
34+
util::{compute_app_message_hash, compute_hash, Redundancy},
35+
SIGNATURE_SIZE,
36+
};
3437

3538
#[derive(Default, Clone, Copy, PartialEq, Eq)]
3639
pub enum AssembleMode {
@@ -156,7 +159,7 @@ impl<PT: PubKey> PartialEq for RecipientInner<PT> {
156159

157160
impl<PT: PubKey> Recipient<PT> {
158161
pub fn new(node_id: NodeId<PT>) -> Self {
159-
let node_hash = crate::util::compute_hash(&node_id).0;
162+
let node_hash = compute_hash(&node_id).0;
160163
let addr = OnceCell::new();
161164
let inner = RecipientInner {
162165
node_id,
@@ -618,8 +621,8 @@ pub(crate) fn build_header(
618621

619622
let (cursor_app_message_hash, cursor) =
620623
cursor.split_at_mut_checked(20).expect("header too short");
621-
let app_message_hash = calc_full_hash(app_message);
622-
cursor_app_message_hash.copy_from_slice(&app_message_hash[..20]);
624+
let app_message_hash = compute_app_message_hash(app_message).0;
625+
cursor_app_message_hash.copy_from_slice(&app_message_hash);
623626

624627
let (cursor_app_message_len, cursor) =
625628
cursor.split_at_mut_checked(4).expect("header too short");
@@ -645,12 +648,6 @@ where
645648
}
646649
}
647650

648-
fn calc_full_hash(bytes: &[u8]) -> Hash {
649-
let mut hasher = HasherType::new();
650-
hasher.update(bytes);
651-
hasher.hash()
652-
}
653-
654651
impl AssembleMode {
655652
pub fn expected_chunk_order(self) -> Option<ChunkOrder> {
656653
match self {

monad-raptorcast/src/packet/assigner.rs

Lines changed: 149 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,14 @@
1616
use std::{collections::HashMap, ops::Range};
1717

1818
use bytes::BytesMut;
19-
use monad_crypto::certificate_signature::PubKey;
19+
use monad_crypto::certificate_signature::{
20+
CertificateSignaturePubKey, CertificateSignatureRecoverable, PubKey,
21+
};
2022
use monad_types::{NodeId, Stake};
23+
use rand::{rngs::StdRng, seq::SliceRandom as _, SeedableRng as _};
2124

2225
use super::{BuildError, Chunk, PacketLayout, Recipient, Result};
26+
use crate::util::compute_app_message_hash;
2327

2428
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
2529
pub enum ChunkOrder {
@@ -384,6 +388,10 @@ pub(crate) struct Partitioned<PT: PubKey> {
384388
}
385389

386390
impl<PT: PubKey> Partitioned<PT> {
391+
// This assigner is only used for full-node raptorcast, which is
392+
// based on homogeneous peers. RaptorCast between validators has
393+
// switched to StakeBasedWithRC assigner.
394+
#[cfg_attr(not(test), expect(unused))]
387395
pub fn from_validator_set(validator_set: Vec<(NodeId<PT>, Stake)>) -> Self {
388396
let mut total_stake = Stake::ZERO;
389397
let weighted_nodes = validator_set
@@ -456,16 +464,47 @@ pub(crate) struct StakeBasedWithRC<PT: PubKey> {
456464
}
457465

458466
impl<PT: PubKey> StakeBasedWithRC<PT> {
459-
#[cfg_attr(not(test), expect(unused))]
467+
pub fn seed_from_app_message(app_message: &[u8]) -> [u8; 32] {
468+
let mut padded_seed = [0u8; 32];
469+
let app_message_hash = compute_app_message_hash(app_message);
470+
padded_seed[..20].copy_from_slice(&app_message_hash.0);
471+
padded_seed
472+
}
473+
474+
// Shuffle the validator stake map for chunk assignment. This uses
475+
// a deterministic seed, as in the future, it will be required
476+
// that the leader and all validators compute the shuffling in the
477+
// same way (for features not yet implemented). In the future,
478+
// this should be done using known shuffling algorithm to allow
479+
// for easy implementation in other languages, e.g., using Mt19937
480+
// and Fisher Yates shuffle.
481+
pub fn shuffle_validators<ST>(
482+
view: &crate::util::ValidatorsView<ST>,
483+
seed: [u8; 32],
484+
) -> Vec<(NodeId<CertificateSignaturePubKey<ST>>, Stake)>
485+
where
486+
ST: CertificateSignatureRecoverable,
487+
{
488+
let mut validator_set = view
489+
.iter()
490+
.map(|(node_id, stake)| (*node_id, stake))
491+
.collect::<std::collections::BinaryHeap<_>>()
492+
.into_sorted_vec();
493+
let mut rng = StdRng::from_seed(seed);
494+
validator_set.shuffle(&mut rng);
495+
validator_set
496+
}
497+
460498
pub fn from_validator_set(validator_set: Vec<(NodeId<PT>, Stake)>) -> Self {
461499
let mut total_stake = Stake::ZERO;
462-
let validator_set = validator_set
500+
let validator_set: Vec<_> = validator_set
463501
.into_iter()
464502
.map(|(nid, stake)| {
465503
total_stake += stake;
466504
(Recipient::new(nid), stake)
467505
})
468506
.collect();
507+
469508
Self {
470509
validator_set,
471510
total_stake,
@@ -538,15 +577,19 @@ fn split_off_chunks_into<PT: PubKey>(
538577

539578
#[cfg(test)]
540579
mod tests {
541-
use std::{collections::HashMap, ops::Range};
580+
use std::{
581+
collections::{BTreeSet, HashMap},
582+
ops::Range,
583+
};
542584

543585
use alloy_primitives::U256;
544586
use itertools::Itertools as _;
545587
use monad_crypto::certificate_signature::CertificateSignaturePubKey;
546588
use monad_secp::SecpSignature;
547589
use monad_testutil::signing::get_key;
548590
use monad_types::{NodeId, Stake};
549-
use rand::{seq::SliceRandom, Rng as _};
591+
use rand::{seq::SliceRandom, Rng};
592+
use rand_distr::{Distribution, Normal};
550593

551594
use super::{ChunkAssignment, ChunkOrder, Partitioned, StakeBasedWithRC};
552595
use crate::{
@@ -600,12 +643,20 @@ mod tests {
600643
fn rand_validator_set(rng: &mut impl rand::Rng, max_n: usize) -> Vec<(NodeId<PT>, Stake)> {
601644
let n: usize = rng.gen_range(1..max_n);
602645
let mut validator_set = Vec::with_capacity(n);
646+
647+
let mon = U256::from(1_000_000_000_000_000_000u64);
648+
let min_stake = mon * U256::from(100_000); // approximated
649+
let mean = f64::from(min_stake * U256::from(100)); // estimated
650+
let std_dev = f64::from(mon * U256::from(500_000)); // estimated
651+
let stake_distr = Normal::new(mean, std_dev).unwrap();
652+
603653
loop {
604654
let mut total_stake = Stake::ZERO;
605655

606656
for i in 1..=n {
607-
// divide by n to ensure the sum never overflows.
608-
let stake = Stake::from(rng.gen::<U256>() / U256::from(n));
657+
let stake = stake_distr.sample(rng).max(f64::from(min_stake));
658+
let stake = Stake::from(u256_from_f64_lossy(stake));
659+
609660
// NOTE: we don't forbid individual stake to be zero,
610661
// as long as total stake is non-zero. we do this to
611662
// test the robustness of assignment algorithm.
@@ -638,7 +689,7 @@ mod tests {
638689
fn test_replicated_assignment() {
639690
let rng = &mut rand::thread_rng();
640691

641-
for _ in 0..1000 {
692+
for _ in 0..100 {
642693
let node_set = rand_node_set(rng, 2000);
643694
let assigner = Replicated::from_broadcast(node_set);
644695
let num_symbols = rng.gen_range(0..10);
@@ -661,7 +712,7 @@ mod tests {
661712
fn test_partitioned_assignment() {
662713
let rng = &mut rand::thread_rng();
663714

664-
for _ in 0..300 {
715+
for _ in 0..30 {
665716
let validator_set = rand_validator_set(rng, 2000);
666717
let assigner = Partitioned::from_validator_set(validator_set);
667718
let num_symbols = rng.gen_range(0..1000);
@@ -722,4 +773,93 @@ mod tests {
722773
);
723774
}
724775
}
776+
777+
/// Compares the stake-based assignment with rounding chunks (`StakeBasedWithRC`)
/// against the plain `Partitioned` assignment on random validator sets.
///
/// Checked properties:
/// - the RC assignment never produces fewer chunks in total, and the surplus
///   is bounded by the validator-count bound;
/// - both assignments cover a contiguous chunk id range starting at 0;
/// - every validator present in the plain assignment is present in the RC
///   assignment with at least as many chunks.
#[test]
fn test_stake_with_rc_properties() {
    let rng = &mut rand::thread_rng();
    for _ in 0..50 {
        // `rand_validator_set` samples its size from `1..max_n`, so `max_n`
        // must be at least 2; a value of 1 makes that range empty and
        // `gen_range` panics. The generated set has fewer than
        // `n_validators` entries, so `n_validators` is an upper bound on
        // the real validator count.
        let n_validators = rng.gen_range(2..2000);
        let validator_set = rand_validator_set(rng, n_validators);
        let assigner = Partitioned::from_validator_set(validator_set.clone());
        let assigner_rc = StakeBasedWithRC::from_validator_set(validator_set);

        // estimated from at most 2MB data, 1400-byte segments, 3x redundancy
        let num_symbols = rng.gen_range(1..5000);
        let assignment = assigner
            .assign_chunks(num_symbols, None)
            .expect("should assign successfully");
        let assignment_rc = assigner_rc
            .assign_chunks(num_symbols, None)
            .expect("should assign successfully");

        // assignment with rc must produce at least the same number of chunks as without rc
        assert!(assignment_rc.total_chunks() >= assignment.total_chunks());
        // the difference in total chunks must not exceed number of validators
        assert!(assignment_rc.total_chunks() - assignment.total_chunks() <= n_validators);

        let chunk_ids: BTreeSet<_> = assignment
            .assignments
            .iter()
            .flat_map(|slice| slice.chunk_id_range.clone())
            .collect();
        let chunk_ids_rc: BTreeSet<_> = assignment_rc
            .assignments
            .iter()
            .flat_map(|slice| slice.chunk_id_range.clone())
            .collect();

        // both assignments must be continuous from 0 to total_chunks - 1
        assert_eq!(chunk_ids.len(), assignment.total_chunks());
        assert_eq!(chunk_ids.first().cloned(), Some(0));
        assert_eq!(
            chunk_ids.last().cloned(),
            Some(assignment.total_chunks() - 1)
        );

        assert_eq!(chunk_ids_rc.len(), assignment_rc.total_chunks());
        assert_eq!(chunk_ids_rc.first().cloned(), Some(0));
        assert_eq!(
            chunk_ids_rc.last().cloned(),
            Some(assignment_rc.total_chunks() - 1)
        );

        let validator_to_chunks: HashMap<_, _> = assignment
            .assignments
            .iter()
            .map(|slice| (slice.recipient.node_hash(), slice.chunk_id_range.len()))
            .collect();
        let validator_to_chunks_rc: HashMap<_, _> = assignment_rc
            .assignments
            .iter()
            .map(|slice| (slice.recipient.node_hash(), slice.chunk_id_range.len()))
            .collect();

        for (validator, num_chunks) in validator_to_chunks {
            let num_chunks_rc = validator_to_chunks_rc.get(validator);
            // each validator that exists in the non-rc assignment must
            // also exist in the rc assignment
            assert!(num_chunks_rc.is_some());
            // each validator must get at least as many chunks in rc assignment
            assert!(*num_chunks_rc.unwrap() >= num_chunks);
        }
    }
}
847+
848+
// Ported from alloy_primitives::U256::from_f64_lossy from a newer version
849+
fn u256_from_f64_lossy(value: f64) -> U256 {
850+
if value >= 1.0 {
851+
let bits = value.to_bits();
852+
let exponent = ((bits >> 52) & 0x7ff) - 1023;
853+
let mantissa = (bits & 0x0f_ffff_ffff_ffff) | 0x10_0000_0000_0000;
854+
if exponent <= 52 {
855+
U256::from(mantissa >> (52 - exponent))
856+
} else if exponent >= 256 {
857+
U256::MAX
858+
} else {
859+
U256::from(mantissa) << U256::from(exponent - 52)
860+
}
861+
} else {
862+
U256::ZERO
863+
}
864+
}
725865
}

monad-raptorcast/src/packet/builder.rs

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use monad_crypto::certificate_signature::{
2121
};
2222
use monad_dataplane::udp::DEFAULT_SEGMENT_SIZE;
2323
use monad_types::NodeId;
24-
use rand::{seq::SliceRandom as _, Rng};
24+
use rand::Rng;
2525

2626
use super::{
2727
assembler::{self, build_header, AssembleMode, BroadcastType, PacketLayout},
@@ -34,7 +34,7 @@ use crate::{
3434
MAX_MERKLE_TREE_DEPTH, MAX_NUM_PACKETS, MAX_REDUNDANCY, MAX_SEGMENT_LENGTH,
3535
MIN_CHUNK_LENGTH, MIN_MERKLE_TREE_DEPTH,
3636
},
37-
util::{self, BuildTarget, Redundancy},
37+
util::{unix_ts_ms_now, BuildTarget, Redundancy},
3838
};
3939

4040
pub const DEFAULT_MERKLE_TREE_DEPTH: u8 = 6;
@@ -285,7 +285,7 @@ where
285285
fn unwrap_unix_ts_ms(&self) -> Result<u64> {
286286
let unix_ts_ms = match self.base.unix_ts_ms {
287287
TimestampMode::Fixed(ts) => ts,
288-
TimestampMode::RealTime => util::unix_ts_ms_now(),
288+
TimestampMode::RealTime => unix_ts_ms_now(),
289289
};
290290
Ok(unix_ts_ms)
291291
}
@@ -383,34 +383,41 @@ where
383383
Ok(header_buf)
384384
}
385385

386-
fn choose_assigner(
386+
fn make_assigner(
387387
build_target: &BuildTarget<ST>,
388388
self_node_id: &NodeId<CertificateSignaturePubKey<ST>>,
389+
app_message: &[u8],
389390
rng: &mut impl Rng,
390391
) -> Box<dyn ChunkAssigner<CertificateSignaturePubKey<ST>>>
391392
where
392393
ST: CertificateSignatureRecoverable,
393394
{
395+
use assigner::{Partitioned, Replicated, StakeBasedWithRC};
394396
match build_target {
395-
BuildTarget::PointToPoint(to) => Box::new(assigner::Replicated::from_unicast(**to)),
396-
BuildTarget::Broadcast(nodes) => Box::new(assigner::Replicated::from_broadcast(
397-
nodes.iter().copied().collect(),
398-
)),
397+
BuildTarget::PointToPoint(to) => Box::new(Replicated::from_unicast(**to)),
398+
BuildTarget::Broadcast(nodes) => {
399+
let assigner = Replicated::from_broadcast(nodes.iter().copied().collect());
400+
Box::new(assigner)
401+
}
399402
BuildTarget::Raptorcast(validators) => {
400-
let mut validator_set: Vec<_> = validators
401-
.iter()
402-
.map(|(node_id, stake)| (*node_id, stake))
403-
.collect();
404-
validator_set.shuffle(rng);
405-
Box::new(assigner::Partitioned::from_validator_set(validator_set))
403+
let seed =
404+
StakeBasedWithRC::<CertificateSignaturePubKey<ST>>::seed_from_app_message(
405+
app_message,
406+
);
407+
let sorted_validators =
408+
StakeBasedWithRC::<CertificateSignaturePubKey<ST>>::shuffle_validators::<ST>(
409+
validators, seed,
410+
);
411+
let assigner = StakeBasedWithRC::from_validator_set(sorted_validators);
412+
Box::new(assigner)
406413
}
407414
BuildTarget::FullNodeRaptorCast(group) => {
408415
let seed = rng.gen::<usize>();
409416
let nodes = group
410417
.iter_skip_self_and_author(self_node_id, seed)
411418
.copied()
412419
.collect();
413-
Box::new(assigner::Partitioned::from_homogeneous_peers(nodes))
420+
Box::new(Partitioned::from_homogeneous_peers(nodes))
414421
}
415422
}
416423
}
@@ -433,7 +440,7 @@ where
433440
// select chunk assignment algorithm based on build target
434441
let rng = &mut rand::thread_rng();
435442
let self_node_id = NodeId::new(self.base.key.as_ref().pubkey());
436-
let assigner = Self::choose_assigner(build_target, &self_node_id, rng);
443+
let assigner = Self::make_assigner(build_target, &self_node_id, app_message, rng);
437444

438445
// calculate the number of symbols needed for assignment
439446
let app_message_len = self.checked_message_len(app_message.len())?;

0 commit comments

Comments
 (0)