Skip to content

Commit 27f1a77

Browse files
group id enum
1 parent 9dd6943 commit 27f1a77

File tree

12 files changed

+116
-118
lines changed

12 files changed

+116
-118
lines changed

monad-raptorcast/benches/encode_bench.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,12 @@ use monad_crypto::certificate_signature::{CertificateSignature, CertificateSigna
2222
use monad_dataplane::udp::DEFAULT_SEGMENT_SIZE;
2323
use monad_raptorcast::{
2424
packet,
25+
udp::GroupId,
2526
util::{BuildTarget, EpochValidators, Redundancy},
2627
};
2728
use monad_secp::SecpSignature;
2829
use monad_testutil::signing::get_key;
29-
use monad_types::{NodeId, Stake};
30+
use monad_types::{Epoch, NodeId, Stake};
3031

3132
const NUM_NODES: usize = 100;
3233

@@ -63,8 +64,8 @@ pub fn bench_build_messages(c: &mut Criterion, name: &str, message_size: usize,
6364
DEFAULT_SEGMENT_SIZE, // segment_size
6465
message.clone(),
6566
Redundancy::from_u8(2),
66-
0, // epoch_no
67-
0, // unix_ts_ms
67+
GroupId::Primary(Epoch(0)), // epoch_no
68+
0, // unix_ts_ms
6869
build_target.clone(),
6970
&known_addrs,
7071
);

monad-raptorcast/benches/raptor_bench.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,11 @@ use monad_dataplane::udp::DEFAULT_SEGMENT_SIZE;
2424
use monad_raptor::ManagedDecoder;
2525
use monad_raptorcast::{
2626
packet::build_messages,
27-
udp::{parse_message, MAX_REDUNDANCY, SIGNATURE_CACHE_SIZE},
27+
udp::{parse_message, GroupId, MAX_REDUNDANCY, SIGNATURE_CACHE_SIZE},
2828
util::{BuildTarget, EpochValidators, Redundancy},
2929
};
3030
use monad_secp::{KeyPair, SecpSignature};
31-
use monad_types::{NodeId, Stake};
31+
use monad_types::{Epoch, NodeId, Stake};
3232

3333
#[allow(clippy::useless_vec)]
3434
pub fn criterion_benchmark(c: &mut Criterion) {
@@ -71,8 +71,8 @@ pub fn criterion_benchmark(c: &mut Criterion) {
7171
DEFAULT_SEGMENT_SIZE, // segment_size
7272
message.clone(),
7373
Redundancy::from_u8(2),
74-
0, // epoch_no
75-
0, // unix_ts_ms
74+
GroupId::Primary(Epoch(0)), // epoch_no
75+
0, // unix_ts_ms
7676
BuildTarget::Raptorcast(epoch_validators),
7777
&known_addresses,
7878
);
@@ -112,8 +112,8 @@ pub fn criterion_benchmark(c: &mut Criterion) {
112112
DEFAULT_SEGMENT_SIZE, // segment_size
113113
message.clone(),
114114
Redundancy::from_u8(2),
115-
0, // epoch_no
116-
0, // unix_ts_ms
115+
GroupId::Primary(Epoch(0)), // epoch_no
116+
0, // unix_ts_ms
117117
BuildTarget::Raptorcast(epoch_validators),
118118
&known_addresses,
119119
)

monad-raptorcast/src/decoding.rs

Lines changed: 18 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -436,19 +436,13 @@ pub struct PruneConfig {
436436
pub struct DecodingContext<'a, PT: PubKey> {
437437
validator_set: Option<&'a ValidatorSet<PT>>,
438438
unix_ts_now: UnixTimestamp,
439-
current_epoch: Epoch,
440439
}
441440

442441
impl<'a, PT: PubKey> DecodingContext<'a, PT> {
443-
pub fn new(
444-
validator_set: Option<&'a ValidatorSet<PT>>,
445-
unix_ts_now: UnixTimestamp,
446-
current_epoch: Epoch,
447-
) -> Self {
442+
pub fn new(validator_set: Option<&'a ValidatorSet<PT>>, unix_ts_now: UnixTimestamp) -> Self {
448443
Self {
449444
validator_set,
450445
unix_ts_now,
451-
current_epoch,
452446
}
453447
}
454448
}
@@ -806,15 +800,11 @@ impl<PT: PubKey> AuthorIndex<PT> {
806800
.prune_config
807801
.max_unix_ts_ms_delta
808802
.and_then(|delta| context.unix_ts_now.checked_sub(delta));
809-
let epoch_threshold: Option<Epoch> = self
810-
.prune_config
811-
.max_epoch_delta
812-
.and_then(|delta| context.current_epoch.checked_sub(delta));
813803

814804
let mut evicted_keys = PrunedKeys::empty();
815805

816806
// we first try only pruning expired keys
817-
let expired_keys = author_index.prune_expired(unix_ts_threshold, epoch_threshold);
807+
let expired_keys = author_index.prune_expired(unix_ts_threshold);
818808
evicted_keys.extend(expired_keys);
819809

820810
// if still over quota, compact the cache to fit under quota
@@ -847,18 +837,14 @@ impl<PT: PubKey> AuthorIndex<PT> {
847837
.prune_config
848838
.max_unix_ts_ms_delta
849839
.and_then(|delta| context.unix_ts_now.checked_sub(delta));
850-
let epoch_threshold: Option<Epoch> = self
851-
.prune_config
852-
.max_epoch_delta
853-
.and_then(|delta| context.current_epoch.checked_sub(delta));
854840

855841
let mut authors_to_drop = vec![];
856842
let mut total_slots = 0;
857843

858844
let mut pruned_keys = PrunedKeys::empty();
859845
for (author, author_index) in &mut self.per_author_index {
860846
total_slots += author_index.len();
861-
pruned_keys.extend(author_index.prune_expired(unix_ts_threshold, epoch_threshold));
847+
pruned_keys.extend(author_index.prune_expired(unix_ts_threshold));
862848

863849
if author_index.is_empty() {
864850
authors_to_drop.push(*author);
@@ -1053,11 +1039,7 @@ impl PerAuthorIndex {
10531039
}
10541040

10551041
// Remove expired entries.
1056-
pub fn prune_expired(
1057-
&mut self,
1058-
unix_ts_threshold: Option<UnixTimestamp>,
1059-
epoch_threshold: Option<Epoch>,
1060-
) -> PrunedKeys {
1042+
pub fn prune_expired(&mut self, unix_ts_threshold: Option<UnixTimestamp>) -> PrunedKeys {
10611043
let mut evicted_keys = PrunedKeys::empty();
10621044
// first, we prune all expired keys
10631045
if let Some(threshold) = unix_ts_threshold {
@@ -1551,7 +1533,7 @@ mod test {
15511533
use rand::seq::SliceRandom as _;
15521534

15531535
use super::*;
1554-
use crate::util::BroadcastMode;
1536+
use crate::{udp::GroupId, util::BroadcastMode};
15551537
type PT = monad_crypto::NopPubKey;
15561538

15571539
const EPOCH: Epoch = Epoch(1);
@@ -1620,7 +1602,7 @@ mod test {
16201602
// these fields are never touched in this module
16211603
recipient_hash: HexBytes([0; 20]),
16221604
message: Bytes::new(),
1623-
group_id: EPOCH.0,
1605+
group_id: GroupId::Primary(EPOCH),
16241606
unix_ts_ms,
16251607
};
16261608
messages.push(message);
@@ -1633,7 +1615,7 @@ mod test {
16331615
let app_message = Bytes::from(vec![1u8; APP_MESSAGE_LEN]);
16341616
let author = node_id(0);
16351617
let symbols = make_symbols(&app_message, author, UNIX_TS_MS);
1636-
let context = DecodingContext::new(None, UNIX_TS_MS, EPOCH);
1618+
let context = DecodingContext::new(None, UNIX_TS_MS);
16371619

16381620
for n in 0..MIN_DECODABLE_SYMBOLS {
16391621
let mut cache = make_cache(10, 10, 10);
@@ -1710,7 +1692,7 @@ mod test {
17101692
// single slot per tier is enough
17111693
let mut cache = make_cache(1, 1, 1);
17121694

1713-
let context = DecodingContext::new(Some(&validator_set), UNIX_TS_MS, EPOCH);
1695+
let context = DecodingContext::new(Some(&validator_set), UNIX_TS_MS);
17141696
let res = try_decode_all(&mut cache, &context, all_symbols.iter())
17151697
.expect("Decoding should succeed");
17161698

@@ -1723,7 +1705,7 @@ mod test {
17231705
let app_message = Bytes::from(vec![1u8; APP_MESSAGE_LEN]);
17241706
let author = node_id(0);
17251707
let symbols = make_symbols(&app_message, author, UNIX_TS_MS);
1726-
let context = DecodingContext::new(None, UNIX_TS_MS, EPOCH);
1708+
let context = DecodingContext::new(None, UNIX_TS_MS);
17271709
let mut cache = make_cache(10, 10, 10);
17281710

17291711
// Decode a message completely.
@@ -1746,7 +1728,7 @@ mod test {
17461728
let app_message = Bytes::from(vec![1u8; APP_MESSAGE_LEN]);
17471729
let author = node_id(0);
17481730
let symbols = make_symbols(&app_message, author, old_ts);
1749-
let context = DecodingContext::new(None, old_ts, EPOCH);
1731+
let context = DecodingContext::new(None, old_ts);
17501732

17511733
// Insert an old message.
17521734
let _ = cache.try_decode(&symbols[0], &context);
@@ -1757,7 +1739,7 @@ mod test {
17571739
let new_app_message = Bytes::from(vec![2u8; APP_MESSAGE_LEN]);
17581740
let new_author = node_id(i);
17591741
let new_symbols = make_symbols(&new_app_message, new_author, UNIX_TS_MS);
1760-
let new_context = DecodingContext::new(None, UNIX_TS_MS, EPOCH);
1742+
let new_context = DecodingContext::new(None, UNIX_TS_MS);
17611743
let _ = cache.try_decode(&new_symbols[0], &new_context);
17621744
assert!(cache.consistency_breaches().is_empty());
17631745
}
@@ -1801,7 +1783,7 @@ mod test {
18011783
config.validator_tier.min_slots_per_validator = Some(2);
18021784

18031785
let mut cache = DecoderCache::new(config);
1804-
let context = DecodingContext::new(Some(&validator_set), UNIX_TS_MS, EPOCH);
1786+
let context = DecodingContext::new(Some(&validator_set), UNIX_TS_MS);
18051787
let res = try_decode_all(&mut cache, &context, all_symbols_part_1.iter())
18061788
.expect("Decoding should succeed");
18071789
assert!(cache.consistency_breaches().is_empty());
@@ -1883,7 +1865,7 @@ mod test {
18831865
}
18841866

18851867
let mut cache = DecoderCache::new(config);
1886-
let context = DecodingContext::new(Some(&validator_set), UNIX_TS_MS, EPOCH);
1868+
let context = DecodingContext::new(Some(&validator_set), UNIX_TS_MS);
18871869
let res = try_decode_all(&mut cache, &context, all_symbols_part_1.iter())
18881870
.expect("Decoding should succeed");
18891871
assert!(cache.consistency_breaches().is_empty());
@@ -1930,7 +1912,7 @@ mod test {
19301912
config.p2p_tier.min_slots_per_author = 2; // each author gets at least 2 slots
19311913

19321914
let mut cache = DecoderCache::new(config);
1933-
let context = DecodingContext::new(None, UNIX_TS_MS, EPOCH);
1915+
let context = DecodingContext::new(None, UNIX_TS_MS);
19341916
let res = try_decode_all(&mut cache, &context, all_symbols_part_1.iter())
19351917
.expect("Decoding should succeed");
19361918
assert!(cache.consistency_breaches().is_empty());
@@ -1996,7 +1978,7 @@ mod test {
19961978
}
19971979

19981980
let mut cache = DecoderCache::new(config);
1999-
let context = DecodingContext::new(None, UNIX_TS_MS, EPOCH);
1981+
let context = DecodingContext::new(None, UNIX_TS_MS);
20001982
let res = try_decode_all(&mut cache, &context, all_symbols_part_1.iter())
20011983
.expect("Decoding should succeed");
20021984
assert!(cache.consistency_breaches().is_empty());
@@ -2022,7 +2004,7 @@ mod test {
20222004
let app_message = Bytes::from(vec![1u8; APP_MESSAGE_LEN]);
20232005
let author = node_id(0);
20242006
let symbols = make_symbols(&app_message, author, UNIX_TS_MS);
2025-
let context = DecodingContext::new(None, UNIX_TS_MS, EPOCH);
2007+
let context = DecodingContext::new(None, UNIX_TS_MS);
20262008

20272009
// Insert a valid symbol first.
20282010
let _ = cache.try_decode(&symbols[0], &context);
@@ -2056,7 +2038,7 @@ mod test {
20562038
config.p2p_tier.min_slots_per_author = 2;
20572039

20582040
let mut cache = DecoderCache::new(config);
2059-
let context = DecodingContext::new(None, UNIX_TS_MS, EPOCH);
2041+
let context = DecodingContext::new(None, UNIX_TS_MS);
20602042

20612043
// Fill the cache.
20622044
let app_message0 = Bytes::from(vec![0u8; APP_MESSAGE_LEN]);
@@ -2095,7 +2077,7 @@ mod test {
20952077
config.p2p_tier.min_slots_per_author = 5;
20962078

20972079
let mut cache = DecoderCache::new(config);
2098-
let context = DecodingContext::new(None, UNIX_TS_MS, EPOCH);
2080+
let context = DecodingContext::new(None, UNIX_TS_MS);
20992081

21002082
// take a single symbol for a given message
21012083
let partial_symbol = |msg: u8, ts: UnixTimestamp| {

monad-raptorcast/src/lib.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ use monad_validator::signature_collection::SignatureCollection;
5858
use raptorcast_secondary::{group_message::FullNodesGroupMessage, SecondaryRaptorCastModeConfig};
5959
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
6060
use tracing::{debug, debug_span, error, trace, warn};
61+
use udp::GroupId;
6162
use util::{BuildTarget, EpochValidators, FullNodes, Group, ReBroadcastGroupMap, Redundancy};
6263

6364
pub mod config;
@@ -152,7 +153,7 @@ where
152153
let message_builder =
153154
OwnedMessageBuilder::new(config.shared_key.clone(), peer_discovery_driver.clone())
154155
.segment_size(segment_size_for_mtu(config.mtu))
155-
.group_id(current_epoch)
156+
.group_id(GroupId::Primary(current_epoch))
156157
.redundancy(redundancy);
157158

158159
Self {
@@ -323,7 +324,7 @@ where
323324
if let Some(rc_chunks) = self
324325
.message_builder
325326
.prepare()
326-
.group_id(epoch)
327+
.group_id(GroupId::Primary(epoch))
327328
.build_unicast_msg(&outbound_message, &build_target)
328329
{
329330
self.dataplane_writer
@@ -473,7 +474,7 @@ where
473474
}
474475

475476
self.current_epoch = epoch;
476-
self.message_builder.set_group_id(epoch);
477+
self.message_builder.set_group_id(GroupId::Primary(epoch));
477478

478479
while let Some(entry) = self.epoch_validators.first_entry() {
479480
if *entry.key() + Epoch(1) < self.current_epoch {
@@ -577,7 +578,7 @@ where
577578
if let Some(rc_chunks) = self
578579
.message_builder
579580
.prepare_with_peer_lookup(&node_addrs)
580-
.group_id(epoch)
581+
.group_id(GroupId::Primary(epoch))
581582
.build_unicast_msg(&outbound_message, &build_target)
582583
{
583584
self.dataplane_writer.udp_write_unicast(rc_chunks);
@@ -709,7 +710,6 @@ where
709710
let decoded_app_messages = {
710711
// FIXME: pass dataplane as arg to handle_message
711712
this.udp_state.handle_message(
712-
this.current_epoch,
713713
&this.rebroadcast_map, // contains the NodeIds for all the RC participants for each epoch
714714
&this.epoch_validators,
715715
|targets, payload, bcast_stride| {

monad-raptorcast/src/packet/assembler.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use super::{
3030
assigner::{ChunkAssignment, ChunkOrder},
3131
BuildError, Collector, PeerAddrLookup, Result, UdpMessage,
3232
};
33-
use crate::{util::Redundancy, SIGNATURE_SIZE};
33+
use crate::{udp::GroupId, util::Redundancy, SIGNATURE_SIZE};
3434

3535
#[derive(Default, Clone, Copy, PartialEq, Eq)]
3636
pub enum AssembleMode {
@@ -577,7 +577,7 @@ pub(crate) fn build_header(
577577
version: u16,
578578
broadcast_type: BroadcastType,
579579
merkle_tree_depth: u8,
580-
group_id: u64,
580+
group_id: GroupId,
581581
unix_ts_ms: u64,
582582
app_message: &[u8],
583583
) -> Result<Bytes> {
@@ -610,8 +610,9 @@ pub(crate) fn build_header(
610610
broadcast_byte |= merkle_tree_depth & 0b0000_1111;
611611
cursor_broadcast_merkle_depth[0] = broadcast_byte;
612612

613-
let (cursor_epoch_no, cursor) = cursor.split_at_mut_checked(8).expect("header too short");
614-
cursor_epoch_no.copy_from_slice(&group_id.to_le_bytes());
613+
let group_id: u64 = group_id.into();
614+
let (cursor_group_id, cursor) = cursor.split_at_mut_checked(8).expect("header too short");
615+
cursor_group_id.copy_from_slice(&group_id.to_le_bytes());
615616

616617
let (cursor_unix_ts_ms, cursor) = cursor.split_at_mut_checked(8).expect("header too short");
617618
cursor_unix_ts_ms.copy_from_slice(&unix_ts_ms.to_le_bytes());

0 commit comments

Comments
 (0)