Skip to content

Commit

Permalink
feature(collator): commit statistics
Browse files Browse the repository at this point in the history
  • Loading branch information
drmick authored and Rexagon committed Jan 23, 2025
1 parent d2b8d99 commit 07503b4
Show file tree
Hide file tree
Showing 7 changed files with 190 additions and 101 deletions.
6 changes: 6 additions & 0 deletions block-util/src/queue/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,12 @@ impl From<u8> for QueuePartition {
}
}

impl QueuePartition {
pub const fn all() -> [QueuePartition; 2] {
[QueuePartition::NormalPriority, QueuePartition::LowPriority]
}
}

impl QueueKey {
const SIZE_HINT: usize = 8 + 32;

Expand Down
14 changes: 11 additions & 3 deletions collator/src/internal_queue/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,10 +251,8 @@ where

fn commit_diff(&self, mc_top_blocks: &[(BlockIdShort, bool)]) -> Result<()> {
let mut shards_to_commit = FastHashMap::default();
#[cfg(FALSE)]
let mut gc_ranges = FastHashMap::default();

#[cfg(FALSE)]
for (block_id_short, top_shard_block_changed) in mc_top_blocks {
let mut diffs_to_commit = vec![];

Expand Down Expand Up @@ -301,7 +299,17 @@ where
}
}

self.uncommitted_state.commit_messages(&shards_to_commit)?;
let commit_ranges: Vec<QueueShardRange> = shards_to_commit
.into_iter()
.map(|(shard_ident, end_key)| QueueShardRange {
shard_ident,
from: QueueKey::default(),
to: end_key,
})
.collect();

self.uncommitted_state
.commit(QueuePartition::all().as_slice(), commit_ranges.as_slice())?;

let uncommitted_diffs_count: usize =
self.uncommitted_diffs.iter().map(|r| r.value().len()).sum();
Expand Down
46 changes: 27 additions & 19 deletions collator/src/internal_queue/state/uncommitted_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use everscale_types::models::{IntAddr, ShardIdent};
use everscale_types::prelude::Boc;
use serde::Serialize;
use tycho_block_util::queue::{QueueKey, QueuePartition};
use tycho_storage::model::StatKey;
use tycho_storage::model::{QueueRange, StatKey};
use tycho_storage::Storage;
use tycho_util::{FastHashMap, FastHashSet};
use weedb::rocksdb::WriteBatch;
Expand All @@ -18,7 +18,7 @@ use crate::internal_queue::state::state_iterator::{
ShardIteratorWithRange, StateIterator, StateIteratorImpl,
};
use crate::internal_queue::types::{
DiffStatistics, InternalMessageValue, QueueRange, QueueShardRange, QueueStatistics,
DiffStatistics, InternalMessageValue, QueueShardRange, QueueStatistics,
};

// CONFIG
Expand Down Expand Up @@ -70,14 +70,6 @@ pub trait UncommittedStateFactory<V: InternalMessageValue> {

#[trait_variant::make(UncommittedState: Send)]
pub trait LocalUncommittedState<V: InternalMessageValue> {
fn add_messages_with_statistics(
&self,
source: ShardIdent,
partition_router: &FastHashMap<IntAddr, QueuePartition>,
messages: &BTreeMap<QueueKey, Arc<V>>,
statistics: DiffStatistics,
) -> Result<()>;

fn iterator(
&self,
snapshot: &OwnedSnapshot,
Expand All @@ -86,8 +78,17 @@ pub trait LocalUncommittedState<V: InternalMessageValue> {
ranges: Vec<QueueShardRange>,
) -> Result<Box<dyn StateIterator<V>>>;

fn commit_messages(&self, ranges: &FastHashMap<ShardIdent, QueueKey>) -> Result<()>;
fn commit(&self, partitions: &[QueuePartition], ranges: &[QueueShardRange]) -> Result<()>;
fn truncate(&self) -> Result<()>;

fn add_messages_with_statistics(
&self,
source: ShardIdent,
partition_router: &FastHashMap<IntAddr, QueuePartition>,
messages: &BTreeMap<QueueKey, Arc<V>>,
statistics: DiffStatistics,
) -> Result<()>;

fn load_statistics(
&self,
result: &mut FastHashMap<IntAddr, u64>,
Expand Down Expand Up @@ -148,14 +149,6 @@ impl<V: InternalMessageValue> UncommittedState<V> for UncommittedStateStdImpl {
Ok(Box::new(iterator))
}

fn commit_messages(&self, ranges: &FastHashMap<ShardIdent, QueueKey>) -> Result<()> {
#[cfg(FALSE)]
let ranges = ranges.iter().map(|(shard, key)| (*shard, *key)).collect();
#[cfg(FALSE)]
self.storage.internal_queue_storage().commit(ranges);
Ok(())
}

fn truncate(&self) -> Result<()> {
self.storage
.internal_queue_storage()
Expand Down Expand Up @@ -184,6 +177,21 @@ impl<V: InternalMessageValue> UncommittedState<V> for UncommittedStateStdImpl {

Ok(())
}

fn commit(&self, partitions: &[QueuePartition], ranges: &[QueueShardRange]) -> Result<()> {
let mut queue_ranges = vec![];
for partition in partitions {
for range in ranges {
queue_ranges.push(QueueRange {
partition: *partition,
shard_ident: range.shard_ident,
from: range.from,
to: range.to,
});
}
}
self.storage.internal_queue_storage().commit(queue_ranges)
}
}

impl UncommittedStateStdImpl {
Expand Down
4 changes: 2 additions & 2 deletions collator/src/internal_queue/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ impl QueueStatistics {

pub fn apply_diff_statistics(&mut self, diff_statistics: DiffStatistics) {
let diff_statistics = diff_statistics.inner.statistics.clone();
for (_, values) in diff_statistics.iter() {
for values in diff_statistics.values() {
for value in values.iter() {
*self.statistics.entry(value.0.clone()).or_insert(0) += *value.1;
}
Expand Down Expand Up @@ -278,7 +278,7 @@ impl<V: InternalMessageValue> From<(QueueDiffWithMessages<V>, ShardIdent)> for D

let mut statistics = FastHashMap::default();

for (_, message) in diff.messages {
for message in diff.messages.values() {
let destination = message.destination();

let partition = diff
Expand Down
13 changes: 8 additions & 5 deletions collator/tests/internal_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ async fn test_queue() -> anyhow::Result<()> {

let queue: QueueImpl<UncommittedStateStdImpl, CommittedStateStdImpl, StoredObject> =
queue_factory.create();

// create first block with queue diff
let block = BlockIdShort {
shard: ShardIdent::new_full(0),
seqno: 0,
Expand Down Expand Up @@ -166,6 +168,7 @@ async fn test_queue() -> anyhow::Result<()> {

queue.commit_diff(&top_blocks)?;

// create second block with queue diff
let block2 = BlockIdShort {
shard: ShardIdent::new_full(1),
seqno: 1,
Expand Down Expand Up @@ -215,7 +218,7 @@ async fn test_queue() -> anyhow::Result<()> {
partition_router,
};

let statistics = (diff_with_messages.clone(), block.shard).into();
let statistics = (diff_with_messages.clone(), block2.shard).into();

queue.apply_diff(
diff_with_messages,
Expand Down Expand Up @@ -396,6 +399,10 @@ async fn test_statistics() -> anyhow::Result<()> {
statistics,
)?;

let top_blocks = vec![(block, true)];

queue.commit_diff(&top_blocks)?;

let partition = QueuePartition::NormalPriority;

let range = QueueShardRange {
Expand All @@ -408,10 +415,6 @@ async fn test_statistics() -> anyhow::Result<()> {

let stat = queue.load_statistics(partition, ranges)?;

for s in stat.statistics() {
println!("{:?}", s);
}

assert_eq!(*stat.statistics().iter().next().unwrap().1, 1);

let ranges = vec![range.clone(), range];
Expand Down
Loading

0 comments on commit 07503b4

Please sign in to comment.