Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions crates/core/src/db/db_metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,16 @@ metrics_group!(
#[labels(db: Identity, reducer: str)]
pub reducer_abi_time_usec: IntCounterVec,

#[name = spacetime_num_delta_queries_evaluated]
#[help = "The total number of times we performed incremental evaluation of a query"]
#[labels(db: Identity)]
pub delta_queries_evaluated: IntCounterVec,

#[name = spacetime_num_delta_queries_matched]
#[help = "The total number of times incremental evaluation resulted in a subscription update"]
#[labels(db: Identity)]
pub delta_queries_matched: IntCounterVec,

#[name = spacetime_subscription_connections]
#[help = "Number of connections with active subscriptions"]
#[labels(database_identity: Identity)]
Expand Down
2 changes: 2 additions & 0 deletions crates/core/src/subscription/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub fn eval_delta<'a, Tx: Datastore + DeltaStore>(
metrics: &mut ExecutionMetrics,
plan: &SubscriptionPlan,
) -> Result<Option<UpdatesRelValue<'a>>> {
metrics.delta_queries_evaluated += 1;
let mut inserts = vec![];
let mut deletes = vec![];

Expand All @@ -38,5 +39,6 @@ pub fn eval_delta<'a, Tx: Datastore + DeltaStore>(
return Ok(None);
}

metrics.delta_queries_matched += 1;
Ok(Some(UpdatesRelValue { inserts, deletes }))
}
62 changes: 42 additions & 20 deletions crates/core/src/subscription/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,48 @@ pub mod tx;

/// Update the global system metrics with transaction-level execution metrics
pub(crate) fn record_exec_metrics(workload: &WorkloadType, db: &Identity, metrics: ExecutionMetrics) {
DB_METRICS
.rdb_num_index_seeks
.with_label_values(workload, db)
.inc_by(metrics.index_seeks as u64);
DB_METRICS
.rdb_num_rows_scanned
.with_label_values(workload, db)
.inc_by(metrics.rows_scanned as u64);
DB_METRICS
.rdb_num_bytes_scanned
.with_label_values(workload, db)
.inc_by(metrics.bytes_scanned as u64);
DB_METRICS
.rdb_num_bytes_written
.with_label_values(workload, db)
.inc_by(metrics.bytes_written as u64);
WORKER_METRICS
.bytes_sent_to_clients
.with_label_values(workload, db)
.inc_by(metrics.bytes_sent_to_clients as u64);
if metrics.index_seeks > 0 {
DB_METRICS
.rdb_num_index_seeks
.with_label_values(workload, db)
.inc_by(metrics.index_seeks as u64);
}
if metrics.rows_scanned > 0 {
DB_METRICS
.rdb_num_rows_scanned
.with_label_values(workload, db)
.inc_by(metrics.rows_scanned as u64);
}
if metrics.bytes_scanned > 0 {
DB_METRICS
.rdb_num_bytes_scanned
.with_label_values(workload, db)
.inc_by(metrics.bytes_scanned as u64);
}
if metrics.bytes_written > 0 {
DB_METRICS
.rdb_num_bytes_written
.with_label_values(workload, db)
.inc_by(metrics.bytes_written as u64);
}
if metrics.bytes_sent_to_clients > 0 {
WORKER_METRICS
.bytes_sent_to_clients
.with_label_values(workload, db)
.inc_by(metrics.bytes_sent_to_clients as u64);
}
if metrics.delta_queries_matched > 0 {
DB_METRICS
.delta_queries_matched
.with_label_values(db)
.inc_by(metrics.delta_queries_matched);
}
if metrics.delta_queries_evaluated > 0 {
DB_METRICS
.delta_queries_evaluated
.with_label_values(db)
.inc_by(metrics.delta_queries_evaluated);
}
}

/// Execute a subscription query
Expand Down
12 changes: 12 additions & 0 deletions crates/lib/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ pub struct ExecutionMetrics {
pub rows_deleted: u64,
/// How many rows were updated?
pub rows_updated: u64,
/// How many subscription updates did we execute?
pub delta_queries_evaluated: u64,
/// How many subscriptions had some updates?
pub delta_queries_matched: u64,
}

impl ExecutionMetrics {
Expand All @@ -60,6 +64,8 @@ impl ExecutionMetrics {
rows_inserted,
rows_deleted,
rows_updated,
delta_queries_evaluated,
delta_queries_matched,
}: ExecutionMetrics,
) {
self.index_seeks += index_seeks;
Expand All @@ -70,6 +76,8 @@ impl ExecutionMetrics {
self.rows_inserted += rows_inserted;
self.rows_deleted += rows_deleted;
self.rows_updated += rows_updated;
self.delta_queries_evaluated += delta_queries_evaluated;
self.delta_queries_matched += delta_queries_matched;
}
}

Expand All @@ -90,6 +98,8 @@ mod tests {
rows_inserted: 1,
rows_deleted: 1,
rows_updated: 1,
delta_queries_evaluated: 2,
delta_queries_matched: 3,
});

assert_eq!(a.index_seeks, 1);
Expand All @@ -99,5 +109,7 @@ mod tests {
assert_eq!(a.bytes_sent_to_clients, 1);
assert_eq!(a.rows_inserted, 1);
assert_eq!(a.rows_deleted, 1);
assert_eq!(a.delta_queries_evaluated, 2);
assert_eq!(a.delta_queries_matched, 3);
}
}
Loading