Skip to content

Commit

Permalink
feature(collator): int queue statistics metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
drmick authored and Rexagon committed Jan 23, 2025
1 parent 8061ff1 commit 3fc67c4
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 9 deletions.
2 changes: 2 additions & 0 deletions collator/src/internal_queue/state/commited_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ impl<V: InternalMessageValue> CommittedState<V> for CommittedStateStdImpl {
partition: QueuePartition,
ranges: &[QueueShardRange],
) -> Result<()> {
let _histogram =
HistogramGuard::begin("tycho_internal_queue_committed_statistics_load_time");
for range in ranges {
self.storage
.internal_queue_storage()
Expand Down
3 changes: 3 additions & 0 deletions collator/src/internal_queue/state/uncommitted_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use everscale_types::models::{IntAddr, ShardIdent};
use tycho_block_util::queue::{QueueKey, QueuePartition, RouterAddr};
use tycho_storage::model::{QueueRange, StatKey};
use tycho_storage::Storage;
use tycho_util::metrics::HistogramGuard;
use tycho_util::{FastHashMap, FastHashSet};
use weedb::rocksdb::WriteBatch;
use weedb::OwnedSnapshot;
Expand Down Expand Up @@ -184,6 +185,8 @@ impl<V: InternalMessageValue> UncommittedState<V> for UncommittedStateStdImpl {
partition: QueuePartition,
ranges: &[QueueShardRange],
) -> Result<()> {
let _histogram =
HistogramGuard::begin("tycho_internal_queue_uncommitted_statistics_load_time");
for range in ranges {
self.storage
.internal_queue_storage()
Expand Down
6 changes: 6 additions & 0 deletions scripts/gen-dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -1278,6 +1278,12 @@ def collator_queue_metrics() -> RowPanel:
create_heatmap_panel(
"tycho_internal_queue_uncommited_state_iterator_create_time", "Uncommitted iterator init time"
),
create_heatmap_panel(
"tycho_internal_queue_uncommitted_statistics_load_time", "Uncommited statistics load time"
),
create_heatmap_panel(
"tycho_internal_queue_committed_statistics_load_time", "Committed statistics load time"
),
create_heatmap_panel(
"tycho_internal_queue_snapshot_time", "Snapshot time"
),
Expand Down
15 changes: 6 additions & 9 deletions storage/src/store/internal_queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ impl InternalQueueStorage {
count: u64,
) -> Result<()> {
let cf = self.db.internal_messages_statistics_uncommitted.cf();
self.insert_statistics(batch, &cf, key, count)
Self::insert_statistics(batch, &cf, key, count)
}

pub fn insert_statistics_committed(
Expand All @@ -42,7 +42,7 @@ impl InternalQueueStorage {
count: u64,
) -> Result<()> {
let cf = self.db.internal_messages_statistics_committed.cf();
self.insert_statistics(batch, &cf, key, count)
Self::insert_statistics(batch, &cf, key, count)
}

pub fn collect_committed_stats_in_range(
Expand All @@ -63,7 +63,7 @@ impl InternalQueueStorage {

let mut iter = self.db.rocksdb().raw_iterator_cf_opt(&cf, read_config);

self.collect_dest_counts_in_range(&mut iter, shard_ident, partition, from, to, result)
Self::collect_dest_counts_in_range(&mut iter, shard_ident, partition, from, to, result)
}

pub fn collect_uncommitted_stats_in_range(
Expand All @@ -84,11 +84,10 @@ impl InternalQueueStorage {

let mut iter = self.db.rocksdb().raw_iterator_cf_opt(&cf, read_config);

self.collect_dest_counts_in_range(&mut iter, shard_ident, partition, from, to, result)
Self::collect_dest_counts_in_range(&mut iter, shard_ident, partition, from, to, result)
}

fn collect_dest_counts_in_range(
&self,
iter: &mut DBRawIterator<'_>,
shard_ident: ShardIdent,
partition: QueuePartition,
Expand Down Expand Up @@ -392,14 +391,14 @@ impl InternalQueueStorage {
let (start_stat_key, end_stat_key, start_msg_key, end_msg_key) =
Self::build_range_keys(range);

self.delete_range(
Self::delete_range(
&mut batch,
&self.db.internal_messages_statistics_committed.cf(),
&start_stat_key,
&end_stat_key,
);

self.delete_range(
Self::delete_range(
&mut batch,
&self.db.shards_internal_messages.cf(),
&start_msg_key,
Expand Down Expand Up @@ -464,7 +463,6 @@ impl InternalQueueStorage {
}

fn delete_range(
&self,
batch: &mut WriteBatch,
cf: &BoundedCfHandle<'_>,
start_key: &[u8],
Expand Down Expand Up @@ -580,7 +578,6 @@ impl InternalQueueStorage {
}

fn insert_statistics(
&self,
batch: &mut WriteBatchWithTransaction<false>,
cf: &BoundedCfHandle<'_>,
key: &StatKey,
Expand Down

0 comments on commit 3fc67c4

Please sign in to comment.