Skip to content

Commit 664577c

Browse files
Materialize views for subscriptions
1 parent b0a6942 commit 664577c

File tree

6 files changed

+286
-44
lines changed

6 files changed

+286
-44
lines changed

crates/core/src/host/module_host.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1118,7 +1118,7 @@ impl ModuleHost {
11181118
// Decrement the number of subscribers for each view this caller is subscribed to
11191119
let dec_view_subscribers = |tx: &mut MutTxId| {
11201120
if drop_view_subscribers {
1121-
if let Err(err) = tx.dec_st_view_subscribers(caller_identity) {
1121+
if let Err(err) = tx.st_view_dec_subscriber_count_for_sender(caller_identity) {
11221122
log::error!("`call_identity_disconnected`: failed to delete client view data: {err}");
11231123
}
11241124
}

crates/core/src/subscription/module_subscription_actor.rs

Lines changed: 129 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,10 @@ use spacetimedb_datastore::db_metrics::DB_METRICS;
3333
use spacetimedb_datastore::execution_context::{Workload, WorkloadType};
3434
use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics;
3535
use spacetimedb_datastore::locking_tx_datastore::{MutTxId, TxId};
36-
use spacetimedb_datastore::traits::TxData;
36+
use spacetimedb_datastore::traits::{IsolationLevel, TxData};
3737
use spacetimedb_durability::TxOffset;
3838
use spacetimedb_execution::pipelined::{PipelinedProject, ViewProject};
39+
use spacetimedb_expr::expr::CollectViews;
3940
use spacetimedb_lib::identity::AuthCtx;
4041
use spacetimedb_lib::metrics::ExecutionMetrics;
4142
use spacetimedb_lib::Identity;
@@ -400,7 +401,7 @@ impl ModuleSubscriptions {
400401
let hash = QueryHash::from_string(&sql, auth.caller, false);
401402
let hash_with_param = QueryHash::from_string(&sql, auth.caller, true);
402403

403-
let (tx, tx_offset) = self.begin_tx(Workload::Subscribe);
404+
let (mut_tx, _) = self.begin_mut_tx(Workload::Subscribe);
404405

405406
let existing_query = {
406407
let guard = self.subscriptions.read();
@@ -410,7 +411,7 @@ impl ModuleSubscriptions {
410411
let query = return_on_err_with_sql!(
411412
existing_query.map(Ok).unwrap_or_else(|| compile_query_with_hashes(
412413
&auth,
413-
&tx,
414+
&*mut_tx,
414415
&sql,
415416
hash,
416417
hash_with_param
@@ -420,6 +421,9 @@ impl ModuleSubscriptions {
420421
send_err_msg
421422
);
422423

424+
let mut_tx = ScopeGuard::<MutTxId, _>::into_inner(mut_tx);
425+
let (tx, tx_offset) = self.materialize_views_and_downgrade_tx(mut_tx, &query, auth.caller)?;
426+
423427
let (table_rows, metrics) = return_on_err_with_sql!(
424428
self.evaluate_initial_subscription(sender.clone(), query.clone(), &tx, &auth, TableUpdateType::Subscribe),
425429
query.sql(),
@@ -501,8 +505,10 @@ impl ModuleSubscriptions {
501505
return Ok(None);
502506
};
503507

504-
let (tx, tx_offset) = self.begin_tx(Workload::Unsubscribe);
505508
let auth = AuthCtx::new(self.owner_identity, sender.id.identity);
509+
510+
let (tx, tx_offset) = self.drop_views(query, auth.caller)?;
511+
506512
let (table_rows, metrics) = return_on_err_with_sql!(
507513
self.evaluate_initial_subscription(sender.clone(), query.clone(), &tx, &auth, TableUpdateType::Unsubscribe),
508514
query.sql(),
@@ -558,11 +564,12 @@ impl ModuleSubscriptions {
558564
)
559565
};
560566

567+
let auth = AuthCtx::new(self.owner_identity, sender.id.identity);
561568
let database_identity = self.relational_db.database_identity();
562569
let subscription_metrics = SubscriptionMetrics::new(&database_identity, &WorkloadType::Unsubscribe);
563570

564571
// Always lock the db before the subscription lock to avoid deadlocks.
565-
let (tx, tx_offset) = self.begin_tx(Workload::Unsubscribe);
572+
let (mut_tx, _) = self.begin_mut_tx(Workload::Unsubscribe);
566573

567574
let removed_queries = {
568575
let _compile_timer = subscription_metrics.compilation_time.start_timer();
@@ -580,12 +587,15 @@ impl ModuleSubscriptions {
580587
)
581588
};
582589

590+
let mut_tx = ScopeGuard::<MutTxId, _>::into_inner(mut_tx);
591+
let (tx, tx_offset) = self.drop_views_and_downgrade_tx(mut_tx, &removed_queries, auth.caller)?;
592+
583593
let (update, metrics) = return_on_err!(
584594
self.evaluate_queries(
585595
sender.clone(),
586596
&removed_queries,
587597
&tx,
588-
&AuthCtx::new(self.owner_identity, sender.id.identity),
598+
&auth,
589599
TableUpdateType::Unsubscribe,
590600
),
591601
send_err_msg,
@@ -637,7 +647,7 @@ impl ModuleSubscriptions {
637647
queries: &[Box<str>],
638648
num_queries: usize,
639649
metrics: &SubscriptionMetrics,
640-
) -> Result<(Vec<Arc<Plan>>, AuthCtx, TxId, HistogramTimer), DBError> {
650+
) -> Result<(Vec<Arc<Plan>>, AuthCtx, MutTxId, HistogramTimer), DBError> {
641651
let mut subscribe_to_all_tables = false;
642652
let mut plans = Vec::with_capacity(num_queries);
643653
let mut query_hashes = Vec::with_capacity(num_queries);
@@ -656,7 +666,7 @@ impl ModuleSubscriptions {
656666
let auth = AuthCtx::new(self.owner_identity, sender);
657667

658668
// We always get the db lock before the subscription lock to avoid deadlocks.
659-
let (tx, _tx_offset) = self.begin_tx(Workload::Subscribe);
669+
let (mut_tx, _tx_offset) = self.begin_mut_tx(Workload::Subscribe);
660670

661671
let compile_timer = metrics.compilation_time.start_timer();
662672

@@ -669,9 +679,14 @@ impl ModuleSubscriptions {
669679

670680
if subscribe_to_all_tables {
671681
plans.extend(
672-
super::subscription::get_all(&self.relational_db, &tx, &auth)?
673-
.into_iter()
674-
.map(Arc::new),
682+
super::subscription::get_all(
683+
|relational_db, tx| relational_db.get_all_tables_mut(tx).map(|schemas| schemas.into_iter()),
684+
&self.relational_db,
685+
&*mut_tx,
686+
&auth,
687+
)?
688+
.into_iter()
689+
.map(Arc::new),
675690
);
676691
}
677692

@@ -684,7 +699,7 @@ impl ModuleSubscriptions {
684699
plans.push(unit);
685700
} else {
686701
plans.push(Arc::new(
687-
compile_query_with_hashes(&auth, &tx, sql, hash, hash_with_param).map_err(|err| {
702+
compile_query_with_hashes(&auth, &*mut_tx, sql, hash, hash_with_param).map_err(|err| {
688703
DBError::WithSql {
689704
error: Box::new(DBError::Other(err.into())),
690705
sql: sql.into(),
@@ -698,7 +713,7 @@ impl ModuleSubscriptions {
698713
// How many queries in this subscription are not cached?
699714
metrics.num_new_queries_subscribed.inc_by(new_queries);
700715

701-
Ok((plans, auth, scopeguard::ScopeGuard::into_inner(tx), compile_timer))
716+
Ok((plans, auth, ScopeGuard::<MutTxId, _>::into_inner(mut_tx), compile_timer))
702717
}
703718

704719
/// Send a message to a client connection.
@@ -763,7 +778,7 @@ impl ModuleSubscriptions {
763778
// How many queries make up this subscription?
764779
subscription_metrics.num_queries_subscribed.inc_by(num_queries as _);
765780

766-
let (queries, auth, tx, compile_timer) = return_on_err!(
781+
let (queries, auth, mut_tx, compile_timer) = return_on_err!(
767782
self.compile_queries(
768783
sender.id.identity,
769784
&request.query_strings,
@@ -773,7 +788,7 @@ impl ModuleSubscriptions {
773788
send_err_msg,
774789
None
775790
);
776-
let (tx, tx_offset) = self.guard_tx(tx, <_>::default());
791+
let (mut_tx, _) = self.guard_mut_tx(mut_tx, <_>::default());
777792

778793
// We minimize locking so that other clients can add subscriptions concurrently.
779794
// We are protected from race conditions with broadcasts, because we have the db lock,
@@ -793,6 +808,9 @@ impl ModuleSubscriptions {
793808
// Record how long it took to compile the subscription
794809
drop(compile_timer);
795810

811+
let mut_tx = ScopeGuard::<MutTxId, _>::into_inner(mut_tx);
812+
let (tx, tx_offset) = self.materialize_views_and_downgrade_tx(mut_tx, &queries, auth.caller)?;
813+
796814
let Ok((update, metrics)) =
797815
self.evaluate_queries(sender.clone(), &queries, &tx, &auth, TableUpdateType::Subscribe)
798816
else {
@@ -859,13 +877,14 @@ impl ModuleSubscriptions {
859877
// How many queries make up this subscription?
860878
subscription_metrics.num_queries_subscribed.inc_by(num_queries as _);
861879

862-
let (queries, auth, tx, compile_timer) = self.compile_queries(
880+
let (queries, auth, mut_tx, compile_timer) = self.compile_queries(
863881
sender.id.identity,
864882
&subscription.query_strings,
865883
num_queries,
866884
&subscription_metrics,
867885
)?;
868-
let (tx, tx_offset) = self.guard_tx(tx, <_>::default());
886+
887+
let (tx, tx_offset) = self.materialize_views_and_downgrade_tx(mut_tx, &queries, auth.caller)?;
869888

870889
check_row_limit(
871890
&queries,
@@ -1015,13 +1034,61 @@ impl ModuleSubscriptions {
10151034
}))
10161035
}
10171036

1018-
/// Helper that starts a new read transaction, and guards it using
1019-
/// [`Self::guard_tx`] with the default configuration.
1020-
fn begin_tx(&self, workload: Workload) -> (ScopeGuard<TxId, impl FnOnce(TxId) + '_>, TransactionOffset) {
1021-
self.guard_tx(self.relational_db.begin_tx(workload), <_>::default())
1037+
/// Collect view ids from `view_collector` and decrement their subscriber count.
1038+
fn drop_views(
1039+
&self,
1040+
view_collector: &impl CollectViews,
1041+
sender: Identity,
1042+
) -> Result<(TxGuard<impl FnOnce(TxId) + '_>, TransactionOffset), DBError> {
1043+
use IsolationLevel::*;
1044+
use Workload::*;
1045+
let mut tx = self.relational_db.begin_mut_tx(Serializable, Unsubscribe);
1046+
tx.dec_subscriber_count_for_views(view_collector, sender)?;
1047+
let (tx_data, tx_metrics_mut, tx) = tx.commit_downgrade(Unsubscribe);
1048+
let opts = GuardTxOptions::from_mut(tx_data, tx_metrics_mut);
1049+
Ok(self.guard_tx(tx, opts))
1050+
}
1051+
1052+
/// Downgrade this mutable `tx` after:
1053+
/// 1. Collecting view ids from `view_collector` and
1054+
/// 2. Decrementing their subscriber count
1055+
fn drop_views_and_downgrade_tx(
1056+
&self,
1057+
mut tx: MutTxId,
1058+
view_collector: &impl CollectViews,
1059+
sender: Identity,
1060+
) -> Result<(TxGuard<impl FnOnce(TxId) + '_>, TransactionOffset), DBError> {
1061+
tx.dec_subscriber_count_for_views(view_collector, sender)?;
1062+
let (tx_data, tx_metrics_mut, tx) = tx.commit_downgrade(Workload::Subscribe);
1063+
let opts = GuardTxOptions::from_mut(tx_data, tx_metrics_mut);
1064+
Ok(self.guard_tx(tx, opts))
1065+
}
1066+
1067+
/// Downgrade this mutable `tx` after:
1068+
/// 1. Collecting view ids from `view_collector` and
1069+
/// 2. Materializing them if necessary
1070+
fn materialize_views_and_downgrade_tx(
1071+
&self,
1072+
mut tx: MutTxId,
1073+
view_collector: &impl CollectViews,
1074+
sender: Identity,
1075+
) -> Result<(TxGuard<impl FnOnce(TxId) + '_>, TransactionOffset), DBError> {
1076+
tx.materialize_views(view_collector, sender)?;
1077+
let (tx_data, tx_metrics_mut, tx) = tx.commit_downgrade(Workload::Subscribe);
1078+
let opts = GuardTxOptions::from_mut(tx_data, tx_metrics_mut);
1079+
Ok(self.guard_tx(tx, opts))
1080+
}
1081+
1082+
/// Helper that starts a new mutable transaction, and guards it using
1083+
/// [`Self::guard_mut_tx`] with the default configuration.
1084+
fn begin_mut_tx(&self, workload: Workload) -> (MutTxGuard<impl FnOnce(MutTxId) + '_>, TransactionOffset) {
1085+
self.guard_mut_tx(
1086+
self.relational_db.begin_mut_tx(IsolationLevel::Serializable, workload),
1087+
<_>::default(),
1088+
)
10221089
}
10231090

1024-
/// Helper wrapping `tx` in a scopegard, with a configurable drop fn.
1091+
/// Helper wrapping a [`TxId`] in a scopegard, with a configurable drop fn.
10251092
///
10261093
/// By default, `tx` is released when the returned [`ScopeGuard`] is dropped,
10271094
/// and reports the transaction metrics via [`RelationalDB::report_tx_metrics`].
@@ -1036,27 +1103,42 @@ impl ModuleSubscriptions {
10361103
/// If another receiver of the transaction offset is needed, its sending
10371104
/// side can be passed in as `extra_tx_offset_sender`. It will be sent the
10381105
/// offset as well.
1039-
fn guard_tx(
1040-
&self,
1041-
tx: TxId,
1042-
GuardTxOptions {
1043-
extra_tx_offset_sender,
1044-
tx_data,
1045-
tx_metrics_mut,
1046-
}: GuardTxOptions,
1047-
) -> (ScopeGuard<TxId, impl FnOnce(TxId) + '_>, TransactionOffset) {
1106+
fn guard_tx(&self, tx: TxId, opts: GuardTxOptions) -> (TxGuard<impl FnOnce(TxId) + '_>, TransactionOffset) {
10481107
let (offset_tx, offset_rx) = oneshot::channel();
10491108
let guard = scopeguard::guard(tx, |tx| {
10501109
let (tx_offset, tx_metrics, reducer) = self.relational_db.release_tx(tx);
10511110
log::trace!("read tx released with offset {tx_offset}");
10521111
let _ = offset_tx.send(tx_offset);
1053-
if let Some(extra) = extra_tx_offset_sender {
1112+
if let Some(extra) = opts.extra_tx_offset_sender {
10541113
let _ = extra.send(tx_offset);
10551114
}
10561115
self.relational_db
1057-
.report_tx_metrics(reducer, tx_data, tx_metrics_mut, Some(tx_metrics));
1116+
.report_tx_metrics(reducer, opts.tx_data, opts.tx_metrics_mut, Some(tx_metrics));
10581117
});
1118+
(guard, offset_rx)
1119+
}
10591120

1121+
/// The same as [`Self::guard_tx`] but for mutable transactions.
1122+
///
1123+
/// By default, `tx` is committed when the returned [`ScopeGuard`] is dropped,
1124+
/// and reports the transaction metrics via [`RelationalDB::report_tx_metrics`].
1125+
fn guard_mut_tx(
1126+
&self,
1127+
tx: MutTxId,
1128+
opts: GuardTxOptions,
1129+
) -> (MutTxGuard<impl FnOnce(MutTxId) + '_>, TransactionOffset) {
1130+
let (offset_tx, offset_rx) = oneshot::channel();
1131+
let guard = scopeguard::guard(tx, |tx| {
1132+
if let Ok(Some((tx_offset, tx_data, tx_metrics_mut, reducer))) = self.relational_db.commit_tx(tx) {
1133+
log::trace!("mutable tx committed with offset {tx_offset}");
1134+
let _ = offset_tx.send(tx_offset);
1135+
if let Some(extra) = opts.extra_tx_offset_sender {
1136+
let _ = extra.send(tx_offset);
1137+
}
1138+
self.relational_db
1139+
.report_tx_metrics(reducer, Some(Arc::new(tx_data)), Some(tx_metrics_mut), None);
1140+
}
1141+
});
10601142
(guard, offset_rx)
10611143
}
10621144
}
@@ -1084,10 +1166,24 @@ impl GuardTxOptions {
10841166
tx_metrics_mut: tx_metrics_mut.into(),
10851167
}
10861168
}
1169+
1170+
fn from_mut(tx_data: TxData, tx_metrics_mut: TxMetrics) -> Self {
1171+
Self {
1172+
extra_tx_offset_sender: None,
1173+
tx_data: Some(Arc::new(tx_data)),
1174+
tx_metrics_mut: tx_metrics_mut.into(),
1175+
}
1176+
}
10871177
}
10881178

10891179
pub struct WriteConflict;
10901180

1181+
/// A [`ScopeGuard`] for [`TxId`]
1182+
type TxGuard<F> = ScopeGuard<TxId, F>;
1183+
1184+
/// A [`ScopeGuard`] for [`MutTxId`]
1185+
type MutTxGuard<F> = ScopeGuard<MutTxId, F>;
1186+
10911187
#[cfg(test)]
10921188
mod tests {
10931189
use super::{AssertTxFn, ModuleSubscriptions};

crates/core/src/subscription/module_subscription_manager.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,10 @@ use spacetimedb_client_api_messages::websocket::{
2222
use spacetimedb_data_structures::map::{Entry, IntMap};
2323
use spacetimedb_datastore::locking_tx_datastore::state_view::StateView;
2424
use spacetimedb_durability::TxOffset;
25+
use spacetimedb_expr::expr::CollectViews;
2526
use spacetimedb_lib::metrics::ExecutionMetrics;
2627
use spacetimedb_lib::{AlgebraicValue, ConnectionId, Identity, ProductValue};
27-
use spacetimedb_primitives::{ColId, IndexId, TableId};
28+
use spacetimedb_primitives::{ColId, IndexId, TableId, ViewDatabaseId};
2829
use spacetimedb_subscription::{JoinEdge, SubscriptionPlan, TableName};
2930
use std::collections::BTreeMap;
3031
use std::fmt::Debug;
@@ -53,6 +54,14 @@ pub struct Plan {
5354
plans: Vec<SubscriptionPlan>,
5455
}
5556

57+
impl CollectViews for Plan {
58+
fn collect_views(&self, views: &mut std::collections::HashSet<ViewDatabaseId>) {
59+
for plan in &self.plans {
60+
plan.collect_views(views);
61+
}
62+
}
63+
}
64+
5665
impl Plan {
5766
/// Create a new subscription plan to be cached
5867
pub fn new(plans: Vec<SubscriptionPlan>, hash: QueryHash, text: String) -> Self {

crates/core/src/subscription/query.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ use crate::sql::compiler::compile_sql;
55
use crate::subscription::subscription::SupportedQuery;
66
use once_cell::sync::Lazy;
77
use regex::Regex;
8+
use spacetimedb_datastore::locking_tx_datastore::state_view::StateView;
9+
use spacetimedb_execution::Datastore;
810
use spacetimedb_lib::identity::AuthCtx;
911
use spacetimedb_subscription::SubscriptionPlan;
1012
use spacetimedb_vm::expr::{self, Crud, CrudExpr, QueryExpr};
@@ -93,7 +95,7 @@ pub fn compile_read_only_query(auth: &AuthCtx, tx: &Tx, input: &str) -> Result<P
9395

9496
/// Compile a string into a single read-only query.
9597
/// This returns an error if the string has multiple queries or mutations.
96-
pub fn compile_query_with_hashes(
98+
pub fn compile_query_with_hashes<Tx: Datastore + StateView>(
9799
auth: &AuthCtx,
98100
tx: &Tx,
99101
input: &str,

0 commit comments

Comments
 (0)