10 changes: 10 additions & 0 deletions crates/core/src/db/db_metrics/mod.rs
@@ -126,6 +126,16 @@ metrics_group!(
#[labels(db: Identity)]
pub delta_queries_matched: IntCounterVec,

#[name = spacetime_num_duplicate_rows_evaluated]
#[help = "The number of times we evaluate the same row in a subscription update"]
#[labels(db: Identity)]
pub duplicate_rows_evaluated: IntCounterVec,

#[name = spacetime_num_duplicate_rows_sent]
#[help = "The number of duplicate rows we send in a subscription update"]
#[labels(db: Identity)]
pub duplicate_rows_sent: IntCounterVec,

#[name = spacetime_subscription_connections]
#[help = "Number of connections with active subscriptions"]
#[labels(database_identity: Identity)]
86 changes: 78 additions & 8 deletions crates/core/src/subscription/delta.rs
@@ -1,7 +1,9 @@
use anyhow::Result;
use hashbrown::HashMap;
use spacetimedb_execution::{Datastore, DeltaStore};
use spacetimedb_lib::metrics::ExecutionMetrics;
use spacetimedb_subscription::SubscriptionPlan;
use spacetimedb_vm::relation::RelValue;

use crate::host::module_host::UpdatesRelValue;

@@ -21,24 +23,92 @@ pub fn eval_delta<'a, Tx: Datastore + DeltaStore>(
plan: &SubscriptionPlan,
) -> Result<Option<UpdatesRelValue<'a>>> {
metrics.delta_queries_evaluated += 1;

let mut inserts = vec![];
let mut deletes = vec![];

plan.for_each_insert(tx, metrics, &mut |row| {
inserts.push(row.into());
Ok(())
})?;
let mut duplicate_rows_evaluated = 0;
let mut duplicate_rows_sent = 0;

if !plan.is_join() {
// Single table plans will never return redundant rows,
// so there's no need to track row counts.
plan.for_each_insert(tx, metrics, &mut |row| {
inserts.push(row.into());
Ok(())
})?;

plan.for_each_delete(tx, metrics, &mut |row| {
deletes.push(row.into());
Ok(())
})?;
} else {
// Query plans for joins may return redundant rows.
// We track row counts to avoid sending them to clients.
let mut insert_counts = HashMap::new();
let mut delete_counts = HashMap::new();

plan.for_each_insert(tx, metrics, &mut |row| {
let n = insert_counts.entry(row).or_default();
if *n > 0 {
duplicate_rows_evaluated += 1;
}
*n += 1;
Ok(())
})?;

plan.for_each_delete(tx, metrics, &mut |row| {
deletes.push(row.into());
Ok(())
})?;
plan.for_each_delete(tx, metrics, &mut |row| {
match insert_counts.get_mut(&row) {
// We have not seen an insert for this row.
// If we have seen a delete, increment the metric.
// Always increment the delete_count.
None => {
let n = delete_counts.entry(row).or_default();
if *n > 0 {
duplicate_rows_evaluated += 1;
}
*n += 1;
}
// We have already seen an insert for this row.
// This is a duplicate, so increment the metric.
//
// There are no more inserts for this row,
// so increment the delete_count as well.
Some(0) => {
duplicate_rows_evaluated += 1;
*delete_counts.entry(row).or_default() += 1;
}
// We have already seen an insert for this row.
// This is a duplicate, so increment the metric.
//
// There are still more inserts for this row,
// so don't increment the delete_count.
Some(n) => {
duplicate_rows_evaluated += 1;
*n -= 1;
}
}
Ok(())
})?;

for (row, n) in insert_counts.into_iter().filter(|(_, n)| *n > 0) {
duplicate_rows_sent += n as u64 - 1;
inserts.extend(std::iter::repeat_n(row, n).map(RelValue::from));
}
for (row, n) in delete_counts.into_iter().filter(|(_, n)| *n > 0) {
duplicate_rows_sent += n as u64 - 1;
deletes.extend(std::iter::repeat_n(row, n).map(RelValue::from));
}
}

// Return `None` for empty updates
if inserts.is_empty() && deletes.is_empty() {
return Ok(None);
}

metrics.delta_queries_matched += 1;
metrics.duplicate_rows_evaluated += duplicate_rows_evaluated;
metrics.duplicate_rows_sent += duplicate_rows_sent;

Ok(Some(UpdatesRelValue { inserts, deletes }))
}
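For intuition, here is a self-contained sketch of the cancellation scheme above. It is illustrative only: plain `u32` keys stand in for `Row<'a>`, slices stand in for the plan callbacks, and the `dedup` helper is a made-up name, not part of this PR — but the counting rules are the ones `eval_delta` applies for join plans.

```rust
use std::collections::HashMap;

/// Toy version of the join branch of `eval_delta`: cancel matching
/// insert/delete pairs, count re-evaluations of already-seen rows, and count
/// the redundant copies that still have to be sent.
fn dedup(inserts: &[u32], deletes: &[u32]) -> (Vec<u32>, Vec<u32>, u64, u64) {
    let mut duplicate_rows_evaluated = 0u64;
    let mut duplicate_rows_sent = 0u64;
    let mut insert_counts: HashMap<u32, usize> = HashMap::new();
    let mut delete_counts: HashMap<u32, usize> = HashMap::new();

    for &row in inserts {
        let n = insert_counts.entry(row).or_default();
        if *n > 0 {
            duplicate_rows_evaluated += 1;
        }
        *n += 1;
    }
    for &row in deletes {
        match insert_counts.get_mut(&row) {
            // Never inserted: record a delete (a repeat counts as a duplicate).
            None => {
                let n = delete_counts.entry(row).or_default();
                if *n > 0 {
                    duplicate_rows_evaluated += 1;
                }
                *n += 1;
            }
            // All inserts for this row already cancelled: it becomes a delete.
            Some(0) => {
                duplicate_rows_evaluated += 1;
                *delete_counts.entry(row).or_default() += 1;
            }
            // Cancels one outstanding insert.
            Some(n) => {
                duplicate_rows_evaluated += 1;
                *n -= 1;
            }
        }
    }

    let mut out_inserts = Vec::new();
    let mut out_deletes = Vec::new();
    for (row, n) in insert_counts.into_iter().filter(|(_, n)| *n > 0) {
        duplicate_rows_sent += n as u64 - 1;
        out_inserts.extend(std::iter::repeat_n(row, n));
    }
    for (row, n) in delete_counts.into_iter().filter(|(_, n)| *n > 0) {
        duplicate_rows_sent += n as u64 - 1;
        out_deletes.extend(std::iter::repeat_n(row, n));
    }
    (out_inserts, out_deletes, duplicate_rows_evaluated, duplicate_rows_sent)
}

fn main() {
    // Row 7 comes out of the insert side of a join three times and out of the
    // delete side once: one insert/delete pair cancels, two inserts remain.
    let (ins, del, evaluated, sent) = dedup(&[7, 7, 7], &[7]);
    assert_eq!(ins, vec![7, 7]);
    assert!(del.is_empty());
    assert_eq!(evaluated, 3); // two repeated inserts + one delete of a seen row
    assert_eq!(sent, 1); // one redundant copy of row 7 still reaches clients
}
```

The single-table branch skips all of this bookkeeping because, as the comment in the diff notes, such plans can never produce the same row twice.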
12 changes: 12 additions & 0 deletions crates/core/src/subscription/mod.rs
@@ -66,6 +66,18 @@ pub(crate) fn record_exec_metrics(workload: &WorkloadType, db: &Identity, metric
.with_label_values(db)
.inc_by(metrics.delta_queries_evaluated);
}
if metrics.duplicate_rows_evaluated > 0 {
DB_METRICS
.duplicate_rows_evaluated
.with_label_values(db)
.inc_by(metrics.duplicate_rows_evaluated);
}
if metrics.duplicate_rows_sent > 0 {
DB_METRICS
.duplicate_rows_sent
.with_label_values(db)
.inc_by(metrics.duplicate_rows_sent);
}
}

/// Execute a subscription query
2 changes: 1 addition & 1 deletion crates/core/src/subscription/module_subscription_actor.rs
@@ -1788,7 +1788,7 @@ mod tests {

// We should only have evaluated a single query
assert_eq!(metrics.delta_queries_evaluated, 1);
assert_eq!(metrics.delta_queries_matched, 1);
assert_eq!(metrics.delta_queries_matched, 0);

// Insert a new row into `v`
let metrics = commit_tx(&db, &subs, [], [(v_id, product![2u64, 6u64, 6u64])])?;
16 changes: 14 additions & 2 deletions crates/execution/src/lib.rs
@@ -1,4 +1,7 @@
use std::ops::RangeBounds;
use std::{
hash::{Hash, Hasher},
ops::RangeBounds,
};

use anyhow::{anyhow, Result};
use iter::PlanIter;
@@ -95,12 +98,21 @@ pub trait DeltaStore {
}
}

#[derive(Clone)]
#[derive(Clone, PartialEq, Eq)]
pub enum Row<'a> {
Ptr(RowRef<'a>),
Ref(&'a ProductValue),
}

impl Hash for Row<'_> {
fn hash<H: Hasher>(&self, state: &mut H) {
match self {
Self::Ptr(x) => x.hash(state),
Self::Ref(x) => x.hash(state),
}
}
}

impl Row<'_> {
pub fn to_product_value(&self) -> ProductValue {
match self {
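A brief aside on why the hand-written `Hash` impl above is sufficient: delegating to whichever variant is present is what lets `Row<'_>` serve as the key of the `insert_counts`/`delete_counts` maps in `eval_delta`. The only contract a `HashMap` key needs is that equal values hash equally, and since the derived `PartialEq` only ever equates values of the same variant, per-variant delegation upholds it. The sketch below shows the same pattern with a made-up `ToyRow` type, not the real `RowRef`/`ProductValue`.

```rust
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

// Hypothetical two-variant row type mirroring `Row<'a>`: one owned payload,
// one borrowed payload.
#[derive(Clone, PartialEq, Eq)]
enum ToyRow<'a> {
    Owned(Vec<u64>),
    Borrowed(&'a [u64]),
}

// Same shape as the impl in the diff: hash whatever the variant holds.
impl Hash for ToyRow<'_> {
    fn hash<H: Hasher>(&self, state: &mut H) {
        match self {
            Self::Owned(x) => x.hash(state),
            Self::Borrowed(x) => x.hash(state),
        }
    }
}

fn main() {
    let borrowed: &[u64] = &[3, 4];
    let mut counts: HashMap<ToyRow<'_>, usize> = HashMap::new();
    *counts.entry(ToyRow::Owned(vec![1, 2])).or_default() += 1;
    *counts.entry(ToyRow::Owned(vec![1, 2])).or_default() += 1;
    *counts.entry(ToyRow::Borrowed(borrowed)).or_default() += 1;
    assert_eq!(counts[&ToyRow::Owned(vec![1, 2])], 2);
    assert_eq!(counts[&ToyRow::Borrowed(borrowed)], 1);
}
```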
12 changes: 12 additions & 0 deletions crates/lib/src/metrics.rs
@@ -50,6 +50,10 @@ pub struct ExecutionMetrics {
pub delta_queries_evaluated: u64,
/// How many subscriptions had some updates?
pub delta_queries_matched: u64,
/// How many times do we evaluate the same row in a subscription update?
pub duplicate_rows_evaluated: u64,
/// How many duplicate rows do we send in a subscription update?
pub duplicate_rows_sent: u64,
}

impl ExecutionMetrics {
@@ -66,6 +70,8 @@ impl ExecutionMetrics {
rows_updated,
delta_queries_evaluated,
delta_queries_matched,
duplicate_rows_evaluated,
duplicate_rows_sent,
}: ExecutionMetrics,
) {
self.index_seeks += index_seeks;
@@ -78,6 +84,8 @@
self.rows_updated += rows_updated;
self.delta_queries_evaluated += delta_queries_evaluated;
self.delta_queries_matched += delta_queries_matched;
self.duplicate_rows_evaluated += duplicate_rows_evaluated;
self.duplicate_rows_sent += duplicate_rows_sent;
}
}

@@ -100,6 +108,8 @@ mod tests {
rows_updated: 1,
delta_queries_evaluated: 2,
delta_queries_matched: 3,
duplicate_rows_evaluated: 4,
duplicate_rows_sent: 2,
});

assert_eq!(a.index_seeks, 1);
@@ -111,5 +121,7 @@
assert_eq!(a.rows_deleted, 1);
assert_eq!(a.delta_queries_evaluated, 2);
assert_eq!(a.delta_queries_matched, 3);
assert_eq!(a.duplicate_rows_evaluated, 4);
assert_eq!(a.duplicate_rows_sent, 2);
}
}
5 changes: 5 additions & 0 deletions crates/subscription/src/lib.rs
@@ -230,6 +230,11 @@ pub struct SubscriptionPlan {
}

impl SubscriptionPlan {
/// Is this a plan for a join?
pub fn is_join(&self) -> bool {
self.fragments.insert_plans.len() > 1 && self.fragments.delete_plans.len() > 1
}

/// To which table does this plan subscribe?
pub fn subscribed_table_id(&self) -> TableId {
self.return_id