Skip to content

Commit 69029e7

Browse files
authored
Store SubscriptionMetrics for Update, Subscribe, Unsubscribe in ModuleSubscriptions (#3821)
# Description of Changes This showed up in flamegraphs, in particular for `Update`, so let's cache the `SubscriptionMetrics`s. # API and ABI breaking changes None # Expected complexity level and risk 1 # Testing Covered by existing tests.
1 parent 8b2bf03 commit 69029e7

File tree

1 file changed

+41
-25
lines changed

1 file changed

+41
-25
lines changed

crates/core/src/subscription/module_subscription_actor.rs

Lines changed: 41 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ use spacetimedb_lib::identity::AuthCtx;
4242
use spacetimedb_lib::metrics::ExecutionMetrics;
4343
use spacetimedb_lib::Identity;
4444
use spacetimedb_primitives::ArgId;
45+
use spacetimedb_table::static_assert_size;
4546
use std::collections::HashSet;
4647
use std::{sync::Arc, time::Instant};
4748
use tokio::sync::oneshot;
@@ -56,10 +57,11 @@ pub struct ModuleSubscriptions {
5657
subscriptions: Subscriptions,
5758
broadcast_queue: BroadcastQueue,
5859
stats: Arc<SubscriptionGauges>,
60+
metrics: Arc<SubscriptionMetricsForWorkloads>,
5961
}
6062

6163
#[derive(Debug, Clone)]
62-
pub struct SubscriptionGauges {
64+
struct SubscriptionGauges {
6365
db_identity: Identity,
6466
num_queries: IntGauge,
6567
num_connections: IntGauge,
@@ -111,17 +113,36 @@ impl SubscriptionGauges {
111113
}
112114
}
113115

114-
pub struct SubscriptionMetrics {
115-
pub lock_waiters: IntGauge,
116-
pub lock_wait_time: Histogram,
117-
pub compilation_time: Histogram,
118-
pub num_queries_subscribed: IntCounter,
119-
pub num_new_queries_subscribed: IntCounter,
120-
pub num_queries_evaluated: IntCounter,
116+
struct SubscriptionMetricsForWorkloads {
117+
update: SubscriptionMetrics,
118+
subscribe: SubscriptionMetrics,
119+
unsubscribe: SubscriptionMetrics,
121120
}
122121

122+
impl SubscriptionMetricsForWorkloads {
123+
fn new(db: &Identity) -> Self {
124+
Self {
125+
update: SubscriptionMetrics::new(db, WorkloadType::Update),
126+
subscribe: SubscriptionMetrics::new(db, WorkloadType::Subscribe),
127+
unsubscribe: SubscriptionMetrics::new(db, WorkloadType::Unsubscribe),
128+
}
129+
}
130+
}
131+
132+
struct SubscriptionMetrics {
133+
lock_waiters: IntGauge,
134+
lock_wait_time: Histogram,
135+
compilation_time: Histogram,
136+
num_queries_subscribed: IntCounter,
137+
num_new_queries_subscribed: IntCounter,
138+
num_queries_evaluated: IntCounter,
139+
}
140+
141+
static_assert_size!(SubscriptionMetrics, 48);
142+
123143
impl SubscriptionMetrics {
124-
pub fn new(db: &Identity, workload: &WorkloadType) -> Self {
144+
fn new(db: &Identity, workload: WorkloadType) -> Self {
145+
let workload = &workload;
125146
Self {
126147
lock_waiters: DB_METRICS.subscription_lock_waiters.with_label_values(db, workload),
127148
lock_wait_time: DB_METRICS.subscription_lock_wait_time.with_label_values(db, workload),
@@ -218,12 +239,14 @@ impl ModuleSubscriptions {
218239
) -> Self {
219240
let db = &relational_db.database_identity();
220241
let stats = Arc::new(SubscriptionGauges::new(db));
242+
let metrics = Arc::new(SubscriptionMetricsForWorkloads::new(db));
221243

222244
Self {
223245
relational_db,
224246
subscriptions,
225247
broadcast_queue,
226248
stats,
249+
metrics,
227250
}
228251
}
229252

@@ -579,8 +602,7 @@ impl ModuleSubscriptions {
579602
)
580603
};
581604

582-
let database_identity = self.relational_db.database_identity();
583-
let subscription_metrics = SubscriptionMetrics::new(&database_identity, &WorkloadType::Unsubscribe);
605+
let subscription_metrics = &self.metrics.unsubscribe;
584606

585607
// Always lock the db before the subscription lock to avoid deadlocks.
586608
let (mut_tx, _) = self.begin_mut_tx(Workload::Unsubscribe);
@@ -780,12 +802,9 @@ impl ModuleSubscriptions {
780802
);
781803
};
782804

783-
let num_queries = request.query_strings.len();
784-
785-
let database_identity = self.relational_db.database_identity();
786-
let subscription_metrics = SubscriptionMetrics::new(&database_identity, &WorkloadType::Subscribe);
787-
788805
// How many queries make up this subscription?
806+
let subscription_metrics = &self.metrics.subscribe;
807+
let num_queries = request.query_strings.len();
789808
subscription_metrics.num_queries_subscribed.inc_by(num_queries as _);
790809

791810
let (queries, auth, mut_tx, compile_timer) = return_on_err!(
@@ -794,7 +813,7 @@ impl ModuleSubscriptions {
794813
auth,
795814
&request.query_strings,
796815
num_queries,
797-
&subscription_metrics
816+
subscription_metrics
798817
),
799818
send_err_msg,
800819
None
@@ -885,19 +904,17 @@ impl ModuleSubscriptions {
885904
timer: Instant,
886905
_assert: Option<AssertTxFn>,
887906
) -> Result<ExecutionMetrics, DBError> {
888-
let num_queries = subscription.query_strings.len();
889-
let database_identity = self.relational_db.database_identity();
890-
let subscription_metrics = SubscriptionMetrics::new(&database_identity, &WorkloadType::Subscribe);
891-
892907
// How many queries make up this subscription?
908+
let subscription_metrics = &self.metrics.subscribe;
909+
let num_queries = subscription.query_strings.len();
893910
subscription_metrics.num_queries_subscribed.inc_by(num_queries as _);
894911

895912
let (queries, auth, mut_tx, compile_timer) = self.compile_queries(
896913
sender.id.identity,
897914
auth,
898915
&subscription.query_strings,
899916
num_queries,
900-
&subscription_metrics,
917+
subscription_metrics,
901918
)?;
902919

903920
let (tx, tx_offset) = self
@@ -929,7 +946,7 @@ impl ModuleSubscriptions {
929946
)?,
930947
};
931948

932-
record_query_metrics(&database_identity, query_metrics);
949+
record_query_metrics(&self.relational_db.database_identity(), query_metrics);
933950

934951
// It acquires the subscription lock after `eval`, allowing `add_subscription` to run concurrently.
935952
// This also makes it possible for `broadcast_event` to get scheduled before the subsequent part here
@@ -994,8 +1011,7 @@ impl ModuleSubscriptions {
9941011
mut event: ModuleEvent,
9951012
tx: MutTx,
9961013
) -> Result<CommitAndBroadcastEventResult, DBError> {
997-
let database_identity = self.relational_db.database_identity();
998-
let subscription_metrics = SubscriptionMetrics::new(&database_identity, &WorkloadType::Update);
1014+
let subscription_metrics = &self.metrics.update;
9991015

10001016
// Take a read lock on `subscriptions` before committing tx
10011017
// else it can result in subscriber receiving duplicate updates.

0 commit comments

Comments
 (0)