Skip to content

Commit

Permalink
refactor(collator): refactor QueueDiff partition router storage
Browse files Browse the repository at this point in the history
  • Loading branch information
drmick authored and Rexagon committed Jan 23, 2025
1 parent c42c68c commit 8061ff1
Show file tree
Hide file tree
Showing 6 changed files with 192 additions and 72 deletions.
86 changes: 58 additions & 28 deletions block-util/src/queue/proto.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::BTreeMap;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;

use anyhow::bail;
Expand Down Expand Up @@ -33,7 +33,7 @@ pub struct QueueDiff {
/// List of message hashes (sorted ASC).
pub messages: Vec<HashBytes>,
/// Partition router
pub partition_router: BTreeMap<RouterDirection, BTreeMap<RouterAddr, QueuePartition>>,
pub partition_router: BTreeMap<RouterDirection, BTreeMap<QueuePartition, BTreeSet<RouterAddr>>>,
}

impl QueueDiff {
Expand Down Expand Up @@ -389,37 +389,46 @@ mod partition_router_list {
use super::*;

const MAX_DIRECTIONS: usize = 2;
const MAX_ENTRIES_PER_DIRECTION: usize = 1_000_000;
const MAX_PARTITIONS_PER_DIRECTION: QueuePartition = 255;
const MAX_ADDRS_PER_PARTITION: usize = 1_000_000;

pub fn size_hint(
items: &BTreeMap<RouterDirection, BTreeMap<RouterAddr, QueuePartition>>,
items: &BTreeMap<RouterDirection, BTreeMap<QueuePartition, BTreeSet<RouterAddr>>>,
) -> usize {
let mut size = 4;
for (direction, map) in items {
for (direction, partitions) in items {
size += direction.max_size_hint();
size += 4;
size += map.len() * (RouterAddr::SIZE_HINT + 1);
size += 4; // partitions count
for addrs in partitions.values() {
size += 1; // partition u8
size += 4; // addresses count
size += addrs.len() * RouterAddr::SIZE_HINT;
}
}
size
}

pub fn write<P: TlPacket>(
items: &BTreeMap<RouterDirection, BTreeMap<RouterAddr, QueuePartition>>,
items: &BTreeMap<RouterDirection, BTreeMap<QueuePartition, BTreeSet<RouterAddr>>>,
packet: &mut P,
) {
packet.write_u32(items.len() as u32);
for (direction, map) in items {
for (direction, partitions) in items {
direction.write_to(packet);
packet.write_u32(map.len() as u32);
for (addr, partition) in map {
addr.write_to(packet);
packet.write_raw_slice(&partition.to_le_bytes());
packet.write_u32(partitions.len() as u32);
for (partition, addrs) in partitions {
packet.write_raw_slice(&[*partition]);
packet.write_u32(addrs.len() as u32);
for addr in addrs {
addr.write_to(packet);
}
}
}
}

pub fn read(
data: &mut &[u8],
) -> TlResult<BTreeMap<RouterDirection, BTreeMap<RouterAddr, QueuePartition>>> {
) -> TlResult<BTreeMap<RouterDirection, BTreeMap<QueuePartition, BTreeSet<RouterAddr>>>> {
let len = u32::read_from(data)? as usize;
if len > MAX_DIRECTIONS {
return Err(tl_proto::TlError::InvalidData);
Expand All @@ -430,21 +439,31 @@ mod partition_router_list {
for _ in 0..len {
let direction = RouterDirection::read_from(data)?;

let inner_len = u32::read_from(data)? as usize;
if inner_len > MAX_ENTRIES_PER_DIRECTION {
let partitions_len = u32::read_from(data)? as usize;
if partitions_len > MAX_PARTITIONS_PER_DIRECTION as usize {
return Err(tl_proto::TlError::InvalidData);
}

let mut map = BTreeMap::new();
for _ in 0..inner_len {
let addr = RouterAddr::read_from(data)?;
let partition_byte = data[0];
let mut partitions = BTreeMap::new();
for _ in 0..partitions_len {
let partition = data[0];
*data = &data[1..];

map.insert(addr, partition_byte);
let addrs_len = u32::read_from(data)? as usize;
if addrs_len > MAX_ADDRS_PER_PARTITION {
return Err(tl_proto::TlError::InvalidData);
}

let mut addrs = BTreeSet::new();
for _ in 0..addrs_len {
let addr = RouterAddr::read_from(data)?;
addrs.insert(addr);
}

partitions.insert(partition, addrs);
}

result.insert(direction, map);
result.insert(direction, partitions);
}

Ok(result)
Expand Down Expand Up @@ -622,9 +641,15 @@ mod tests {
account: HashBytes::from([0x01; 32]),
};

partition_router.insert(addr1, 1);
let addr2 = RouterAddr {
workchain: 1,
account: HashBytes::from([0x02; 32]),
};

let mut partition_map = BTreeMap::new();
partition_map.insert(1, BTreeSet::from([addr1, addr2]));

let partition_router = BTreeMap::from([(RouterDirection::Dest, partition_router)]);
partition_router.insert(RouterDirection::Dest, partition_map);

let mut diff = QueueDiff {
hash: HashBytes::ZERO, // NOTE: Uninitialized
Expand Down Expand Up @@ -671,18 +696,23 @@ mod tests {
fn queue_state_binary_repr() {
let mut queue_diffs = Vec::<QueueDiff>::new();
for seqno in 1..=10 {
let prev_hash = queue_diffs.last().map(|diff| diff.hash).unwrap_or_default();

let mut partition_router = BTreeMap::new();

let addr1 = RouterAddr {
workchain: 0,
account: HashBytes::from([0x01; 32]),
};

let addr2 = RouterAddr {
workchain: 1,
account: HashBytes::from([0x02; 32]),
};

partition_router.insert(addr1, 1);
let mut partition_map = BTreeMap::new();
partition_map.insert(1, BTreeSet::from([addr1, addr2]));

let partition_router = BTreeMap::from([(RouterDirection::Dest, partition_router)]);
partition_router.insert(RouterDirection::Dest, partition_map);
let prev_hash = queue_diffs.last().map(|diff| diff.hash).unwrap_or_default();

let mut diff = QueueDiff {
hash: HashBytes::ZERO, // NOTE: Uninitialized
Expand Down
7 changes: 5 additions & 2 deletions collator/src/collator/tests/execution_manager_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl<V: InternalMessageValue + Default> MessageQueueAdapter<V> for MessageQueueA
unimplemented!()
}

fn commit_diff(&self, mc_top_blocks: Vec<(BlockIdShort, bool)>) -> Result<()> {
fn commit_diff(&self, _mc_top_blocks: Vec<(BlockIdShort, bool)>) -> Result<()> {
unimplemented!()
}

Expand Down Expand Up @@ -141,7 +141,10 @@ impl<V: InternalMessageValue + Default> MessageQueueAdapter<V> for MessageQueueA
unimplemented!()
}

fn get_diffs(&self, blocks: FastHashMap<ShardIdent, u32>) -> Vec<(ShardIdent, ShortQueueDiff)> {
fn get_diffs(
&self,
_blocks: FastHashMap<ShardIdent, u32>,
) -> Vec<(ShardIdent, ShortQueueDiff)> {
todo!()
}

Expand Down
92 changes: 81 additions & 11 deletions collator/src/internal_queue/types.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::cmp::{Ordering, Reverse};
use std::collections::{hash_map, BTreeMap, BinaryHeap};
use std::collections::{hash_map, BTreeMap, BTreeSet, BinaryHeap};
use std::sync::Arc;

use anyhow::{bail, Context, Result};
Expand Down Expand Up @@ -74,21 +74,31 @@ impl PartitionRouter {
}
}

impl From<BTreeMap<RouterDirection, BTreeMap<RouterAddr, QueuePartition>>> for PartitionRouter {
fn from(value: BTreeMap<RouterDirection, BTreeMap<RouterAddr, QueuePartition>>) -> Self {
impl From<BTreeMap<RouterDirection, BTreeMap<QueuePartition, BTreeSet<RouterAddr>>>>
for PartitionRouter
{
fn from(
value: BTreeMap<RouterDirection, BTreeMap<QueuePartition, BTreeSet<RouterAddr>>>,
) -> Self {
let mut router = FastHashMap::default();
let mut partitions = FastHashSet::default();

let mut full_partitions = FastHashSet::default();

for (direction, direction_router) in value {
partitions.extend(direction_router.values().cloned());
let r = direction_router
.into_iter()
.map(|(addr, partition)| (addr.to_int_addr(), partition))
.collect();
router.insert(direction, r);
let mut partitions = FastHashMap::default();
for (partition, addresses) in direction_router {
for address in &addresses {
partitions.insert(address.to_int_addr(), partition);
full_partitions.insert(partition);
}
}
router.insert(direction, partitions);
}

Self { router, partitions }
Self {
router,
partitions: full_partitions,
}
}
}

Expand Down Expand Up @@ -420,3 +430,63 @@ impl<V: InternalMessageValue> From<(&QueueDiffWithMessages<V>, ShardIdent)> for
}
}
}

#[cfg(test)]
mod tests {
use std::collections::{BTreeMap, BTreeSet};

use super::*;

#[test]
fn test_partition_router_from_btreemap() {
let addr1 = RouterAddr {
workchain: 0,
account: HashBytes([0x01; 32]),
};
let addr2 = RouterAddr {
workchain: 0,
account: HashBytes([0x02; 32]),
};
let addr3 = RouterAddr {
workchain: 1,
account: HashBytes([0x03; 32]),
};
let addr4 = RouterAddr {
workchain: 1,
account: HashBytes([0x04; 32]),
};

let mut dest_map = BTreeMap::new();
dest_map.insert(1, BTreeSet::from([addr1, addr2]));
dest_map.insert(2, BTreeSet::from([addr3]));

let mut src_map = BTreeMap::new();
src_map.insert(10, BTreeSet::from([addr4]));

let mut router_data = BTreeMap::new();
router_data.insert(RouterDirection::Dest, dest_map);
router_data.insert(RouterDirection::Src, src_map);

let partition_router = PartitionRouter::from(router_data);

{
let expected_partitions = [1, 2, 10].into_iter().collect::<FastHashSet<_>>();
assert_eq!(partition_router.partitions(), &expected_partitions);
}

{
// Dest
let dest_router = partition_router.router.get(&RouterDirection::Dest).unwrap();
// addr1 и addr2 -> partition 1
assert_eq!(*dest_router.get(&addr1.to_int_addr()).unwrap(), 1);
assert_eq!(*dest_router.get(&addr2.to_int_addr()).unwrap(), 1);
// addr3 -> partition 2
assert_eq!(*dest_router.get(&addr3.to_int_addr()).unwrap(), 2);

// Src
let src_router = partition_router.router.get(&RouterDirection::Src).unwrap();
// addr4 -> partition 10
assert_eq!(*src_router.get(&addr4.to_int_addr()).unwrap(), 10);
}
}
}
25 changes: 10 additions & 15 deletions collator/tests/internal_queue.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::BTreeMap;
use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;
use std::time::Duration;

Expand Down Expand Up @@ -967,25 +967,19 @@ fn test_queue_diff_with_messages_from_queue_diff_stuff() -> anyhow::Result<()> {
let mut messages = vec![message1_hash, message2_hash, message3_hash];
messages.sort_unstable();

let mut direction_router = BTreeMap::default();
let addr1 = RouterAddr::try_from(IntAddr::Std(StdAddr::new(0, HashBytes::ZERO)))?;
let addr2 = RouterAddr::try_from(IntAddr::Std(StdAddr::new(0, HashBytes::ZERO)))?;
let addr3 = RouterAddr::try_from(IntAddr::Std(StdAddr::new(0, HashBytes::ZERO)))?;

direction_router.insert(
RouterAddr::try_from(IntAddr::Std(StdAddr::new(0, HashBytes::ZERO)))?,
1,
);
direction_router.insert(
RouterAddr::try_from(IntAddr::Std(StdAddr::new(0, HashBytes::ZERO)))?,
2,
);
direction_router.insert(
RouterAddr::try_from(IntAddr::Std(StdAddr::new(0, HashBytes::ZERO)))?,
3,
);
let mut direction_router = BTreeMap::new();
direction_router.insert(1, BTreeSet::from([addr1]));
direction_router.insert(2, BTreeSet::from([addr2]));
direction_router.insert(3, BTreeSet::from([addr3]));

let mut partition_router = BTreeMap::new();

partition_router.insert(RouterDirection::Dest, direction_router);

// И теперь можно создать QueueDiff:
let diff = QueueDiff {
hash: HashBytes::ZERO,
prev_hash: HashBytes::from([0x33; 32]),
Expand All @@ -1012,6 +1006,7 @@ fn test_queue_diff_with_messages_from_queue_diff_stuff() -> anyhow::Result<()> {
messages,
partition_router,
};

let data = tl_proto::serialize(&diff);

let block_id = BlockId {
Expand Down
9 changes: 5 additions & 4 deletions network/src/network/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::net::{SocketAddr, ToSocketAddrs};
use std::path::Path;
use std::sync::{Arc, Weak};

use anyhow::{Context, Result};
#[cfg(target_os = "linux")]
use anyhow::Context;
use anyhow::Result;
use everscale_crypto::ed25519;
use tokio::sync::{broadcast, mpsc, oneshot};

Expand Down Expand Up @@ -409,7 +410,7 @@ impl MaxBufferSize {
let proc_path = std::env::var("MOCK_PROC_PATH").unwrap_or_else(|_| "/proc".to_string());
#[cfg(not(any(feature = "test", test)))]
let proc_path = "/proc";
let proc_path = Path::new(&proc_path).join("sys/net/core");
let proc_path = std::path::Path::new(&proc_path).join("sys/net/core");

let read_and_parse = |file_name: &str| -> Result<Option<usize>> {
let path = proc_path.join(file_name);
Expand Down Expand Up @@ -658,7 +659,7 @@ mod tests {

#[test]
fn socket_size_works() {
if Path::new("/proc").exists() {
if std::path::Path::new("/proc").exists() {
let socket_size = MaxBufferSize::read()
.unwrap()
.expect("socket size not found");
Expand Down
Loading

0 comments on commit 8061ff1

Please sign in to comment.