Skip to content

Commit 657d86b

Browse files
actually execute views
1 parent b0a6942 commit 657d86b

File tree

9 files changed

+54
-90
lines changed

9 files changed

+54
-90
lines changed

crates/client-api/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,6 @@ impl Host {
9191
auth,
9292
Some(&module_host.info.subscriptions),
9393
Some(&module_host),
94-
auth.caller,
9594
&mut header,
9695
)
9796
.await

crates/core/src/error.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use spacetimedb_table::table::ReadViaBsatnError;
1515
use thiserror::Error;
1616

1717
use crate::client::ClientActorId;
18+
use crate::host::module_host::ViewCallError;
1819
use crate::host::scheduler::ScheduleError;
1920
use spacetimedb_lib::buffer::DecodeError;
2021
use spacetimedb_primitives::*;
@@ -147,6 +148,8 @@ pub enum DBError {
147148
RestoreSnapshot(#[from] RestoreSnapshotError),
148149
#[error(transparent)]
149150
DurabilityGone(#[from] DurabilityExited),
151+
#[error(transparent)]
152+
View(#[from] ViewCallError),
150153
}
151154

152155
impl DBError {

crates/core/src/host/module_host.rs

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,25 +39,27 @@ use spacetimedb_data_structures::error_stream::ErrorStream;
3939
use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap};
4040
use spacetimedb_datastore::error::DatastoreError;
4141
use spacetimedb_datastore::execution_context::{ExecutionContext, ReducerContext, Workload, WorkloadType};
42-
use spacetimedb_datastore::locking_tx_datastore::MutTxId;
42+
use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics;
43+
use spacetimedb_datastore::locking_tx_datastore::{MutTxId, TxId};
4344
use spacetimedb_datastore::system_tables::{ST_CLIENT_ID, ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_SUB_ID};
4445
use spacetimedb_datastore::traits::{IsolationLevel, Program, TxData};
4546
use spacetimedb_durability::DurableOffset;
4647
use spacetimedb_execution::pipelined::{PipelinedProject, ViewProject};
48+
use spacetimedb_expr::expr::CollectViews;
4749
use spacetimedb_lib::db::raw_def::v9::Lifecycle;
4850
use spacetimedb_lib::identity::{AuthCtx, RequestId};
4951
use spacetimedb_lib::metrics::ExecutionMetrics;
5052
use spacetimedb_lib::ConnectionId;
5153
use spacetimedb_lib::Timestamp;
52-
use spacetimedb_primitives::{ProcedureId, TableId, ViewDatabaseId, ViewId};
54+
use spacetimedb_primitives::{ArgId, ProcedureId, TableId, ViewDatabaseId, ViewId};
5355
use spacetimedb_query::compile_subscription;
5456
use spacetimedb_sats::{AlgebraicTypeRef, ProductValue};
5557
use spacetimedb_schema::auto_migrate::{AutoMigrateError, MigrationPolicy};
5658
use spacetimedb_schema::def::deserialize::ArgsSeed;
5759
use spacetimedb_schema::def::{ModuleDef, ProcedureDef, ReducerDef, TableDef, ViewDef};
5860
use spacetimedb_schema::schema::{Schema, TableSchema};
5961
use spacetimedb_vm::relation::RelValue;
60-
use std::collections::VecDeque;
62+
use std::collections::{HashSet, VecDeque};
6163
use std::fmt;
6264
use std::future::Future;
6365
use std::sync::atomic::AtomicBool;
@@ -1487,6 +1489,29 @@ impl ModuleHost {
14871489
.await?
14881490
}
14891491

1492+
/// Downgrade this mutable `tx` after:
1493+
/// 1. Collecting view ids from `view_collector` and
1494+
/// 2. Materializing them if necessary
1495+
pub async fn materialize_views_and_downgrade_tx(
1496+
&self,
1497+
mut tx: MutTxId,
1498+
view_collector: &impl CollectViews,
1499+
sender: Identity,
1500+
workload: Workload,
1501+
) -> Result<(TxData, TxMetrics, TxId), ViewCallError> {
1502+
use FunctionArgs::*;
1503+
let mut view_ids = HashSet::new();
1504+
view_collector.collect_views(&mut view_ids);
1505+
for view_id in view_ids {
1506+
let name = tx.lookup_st_view(view_id)?.view_name;
1507+
if !tx.is_view_materialized(view_id, ArgId::SENTINEL, sender)? {
1508+
tx = self.call_view(tx, &name, Nullary, sender, None).await?.tx;
1509+
}
1510+
tx.st_view_sub_update_or_insert_last_called(view_id, ArgId::SENTINEL, sender)?;
1511+
}
1512+
Ok(tx.commit_downgrade(workload))
1513+
}
1514+
14901515
pub async fn call_view(
14911516
&self,
14921517
tx: MutTxId,

crates/core/src/sql/execute.rs

Lines changed: 15 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ use spacetimedb_datastore::traits::IsolationLevel;
2020
use spacetimedb_expr::statement::Statement;
2121
use spacetimedb_lib::identity::AuthCtx;
2222
use spacetimedb_lib::metrics::ExecutionMetrics;
23+
use spacetimedb_lib::Timestamp;
2324
use spacetimedb_lib::{AlgebraicType, ProductType, ProductValue};
24-
use spacetimedb_lib::{Identity, Timestamp};
2525
use spacetimedb_query::{compile_sql_stmt, execute_dml_stmt, execute_select_stmt};
2626
use spacetimedb_schema::relation::FieldName;
2727
use spacetimedb_vm::eval::run_ast;
@@ -192,51 +192,26 @@ pub async fn run(
192192
auth: AuthCtx,
193193
subs: Option<&ModuleSubscriptions>,
194194
module: Option<&ModuleHost>,
195-
caller_identity: Identity,
196195
head: &mut Vec<(Box<str>, AlgebraicType)>,
197196
) -> Result<SqlResult, DBError> {
197+
let module = module
198+
.as_ref()
199+
.ok_or_else(|| anyhow!("Cannot execute views without module context"))?;
200+
201+
let mut metrics = ExecutionMetrics::default();
202+
198203
// We parse the sql statement in a mutable transaction.
199204
// If it turns out to be a query, we downgrade the tx.
200-
let (mut tx, stmt) = db.with_auto_rollback(db.begin_mut_tx(IsolationLevel::Serializable, Workload::Sql), |tx| {
205+
let (tx, stmt) = db.with_auto_rollback(db.begin_mut_tx(IsolationLevel::Serializable, Workload::Sql), |tx| {
201206
compile_sql_stmt(sql_text, &SchemaViewer::new(tx, &auth), &auth)
202207
})?;
203208

204-
let mut metrics = ExecutionMetrics::default();
205-
206-
for (view_name, args) in stmt.views() {
207-
let (is_materialized, args) = tx
208-
.is_materialized(view_name, args, caller_identity)
209-
.map_err(|e| DBError::Other(anyhow!("Failed to check memoized view: {e}")))?;
210-
211-
// Skip if already memoized
212-
if is_materialized {
213-
continue;
214-
}
215-
216-
let module = module
217-
.as_ref()
218-
.ok_or_else(|| anyhow!("Cannot execute view `{view_name}` without module context"))?;
219-
220-
let res = module
221-
.call_view(
222-
tx,
223-
view_name,
224-
crate::host::FunctionArgs::Bsatn(args),
225-
caller_identity,
226-
None,
227-
)
228-
.await
229-
.map_err(|e| DBError::Other(anyhow!("Failed to execute view `{view_name}`: {e}")))?;
230-
231-
tx = res.tx;
232-
}
233-
234209
match stmt {
235210
Statement::Select(stmt) => {
236-
// Materialize views before we downgrade to a read-only transaction
237-
tx.materialize_views(&stmt, auth.caller)?;
238-
239-
let (tx_data, tx_metrics_mut, tx) = tx.commit_downgrade(Workload::Sql);
211+
// Materialize views and downgrade to a read-only transaction
212+
let (tx_data, tx_metrics_mut, tx) = module
213+
.materialize_views_and_downgrade_tx(tx, &stmt, auth.caller, Workload::Sql)
214+
.await?;
240215

241216
let (tx_offset_send, tx_offset) = oneshot::channel();
242217
// Release the tx on drop, so that we record metrics
@@ -398,7 +373,6 @@ pub(crate) mod tests {
398373
AuthCtx::for_testing(),
399374
Some(&subs),
400375
None,
401-
Identity::ZERO,
402376
&mut vec![],
403377
))
404378
.map(|x| x.rows)
@@ -551,7 +525,7 @@ pub(crate) mod tests {
551525
expected: impl IntoIterator<Item = ProductValue>,
552526
) {
553527
assert_eq!(
554-
run(db, sql, *auth, None, None, Identity::ZERO, &mut vec![])
528+
run(db, sql, *auth, None, None, &mut vec![])
555529
.await
556530
.unwrap()
557531
.rows
@@ -1506,9 +1480,7 @@ pub(crate) mod tests {
15061480

15071481
let rt = db.runtime().expect("runtime should be there");
15081482

1509-
let run = |db, sql, auth, subs, mut tmp_vec| {
1510-
rt.block_on(run(db, sql, auth, subs, None, Identity::ZERO, &mut tmp_vec))
1511-
};
1483+
let run = |db, sql, auth, subs, mut tmp_vec| rt.block_on(run(db, sql, auth, subs, None, &mut tmp_vec));
15121484
// No row limit, both queries pass.
15131485
assert!(run(&db, "SELECT * FROM T", internal_auth, None, tmp_vec.clone()).is_ok());
15141486
assert!(run(&db, "SELECT * FROM T", external_auth, None, tmp_vec.clone()).is_ok());
@@ -1558,9 +1530,7 @@ pub(crate) mod tests {
15581530
let internal_auth = AuthCtx::new(server, server);
15591531

15601532
let tmp_vec = Vec::new();
1561-
let run = |db, sql, auth, subs, mut tmp_vec| async move {
1562-
run(db, sql, auth, subs, None, Identity::ZERO, &mut tmp_vec).await
1563-
};
1533+
let run = |db, sql, auth, subs, mut tmp_vec| async move { run(db, sql, auth, subs, None, &mut tmp_vec).await };
15641534

15651535
let check = |db, sql, auth, metrics: ExecutionMetrics| {
15661536
let result = rt.block_on(run(db, sql, auth, None, tmp_vec.clone()))?;

crates/core/src/subscription/module_subscription_actor.rs

Lines changed: 2 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2163,38 +2163,19 @@ mod tests {
21632163
auth,
21642164
Some(&subs),
21652165
None,
2166-
Identity::ZERO,
21672166
&mut vec![],
21682167
)
21692168
.await?;
21702169

21712170
// Client should receive insert
21722171
assert_tx_update_for_table(rx.recv(), t_id, &schema, [product![0_u8, 1_u8]], []).await;
21732172

2174-
run(
2175-
&db,
2176-
"UPDATE t SET y=2 WHERE x=0",
2177-
auth,
2178-
Some(&subs),
2179-
None,
2180-
Identity::ZERO,
2181-
&mut vec![],
2182-
)
2183-
.await?;
2173+
run(&db, "UPDATE t SET y=2 WHERE x=0", auth, Some(&subs), None, &mut vec![]).await?;
21842174

21852175
// Client should receive update
21862176
assert_tx_update_for_table(rx.recv(), t_id, &schema, [product![0_u8, 2_u8]], [product![0_u8, 1_u8]]).await;
21872177

2188-
run(
2189-
&db,
2190-
"DELETE FROM t WHERE x=0",
2191-
auth,
2192-
Some(&subs),
2193-
None,
2194-
Identity::ZERO,
2195-
&mut vec![],
2196-
)
2197-
.await?;
2178+
run(&db, "DELETE FROM t WHERE x=0", auth, Some(&subs), None, &mut vec![]).await?;
21982179

21992180
// Client should receive delete
22002181
assert_tx_update_for_table(rx.recv(), t_id, &schema, [], [product![0_u8, 2_u8]]).await;
@@ -3046,7 +3027,6 @@ mod tests {
30463027
auth,
30473028
Some(&subs),
30483029
None,
3049-
Identity::ZERO,
30503030
&mut vec![],
30513031
)
30523032
.await?;

crates/datastore/src/locking_tx_datastore/mut_tx.rs

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ use smallvec::SmallVec;
3535
use spacetimedb_data_structures::map::{IntMap, IntSet};
3636
use spacetimedb_durability::TxOffset;
3737
use spacetimedb_execution::{dml::MutDatastore, Datastore, DeltaStore, Row};
38-
use spacetimedb_expr::expr::CollectViews;
3938
use spacetimedb_lib::{bsatn::ToBsatn as _, db::raw_def::v9::RawSql, metrics::ExecutionMetrics, Timestamp};
4039
use spacetimedb_lib::{
4140
db::{auth::StAccess, raw_def::SEQUENCE_ALLOCATION_STEP},
@@ -66,7 +65,7 @@ use spacetimedb_table::{
6665
table_index::TableIndex,
6766
};
6867
use std::{
69-
collections::{HashMap, HashSet},
68+
collections::HashMap,
7069
sync::Arc,
7170
time::{Duration, Instant},
7271
};
@@ -1850,19 +1849,6 @@ impl<'a, I: Iterator<Item = RowRef<'a>>> Iterator for FilterDeleted<'a, I> {
18501849
}
18511850

18521851
impl MutTxId {
1853-
/// Materialize views for `sender`, collected from `view_collector`.
1854-
pub fn materialize_views(&mut self, view_collector: &impl CollectViews, sender: Identity) -> Result<()> {
1855-
let mut view_ids = HashSet::new();
1856-
view_collector.collect_views(&mut view_ids);
1857-
for view_id in view_ids {
1858-
if !self.is_view_materialized(view_id, ArgId::SENTINEL, sender)? {
1859-
// TODO: __call_view__
1860-
}
1861-
self.st_view_sub_update_or_insert_last_called(view_id, ArgId::SENTINEL, sender)?;
1862-
}
1863-
Ok(())
1864-
}
1865-
18661852
/// Does this caller have an entry for `view_id` in `st_view_sub`?
18671853
pub fn is_view_materialized(&self, view_id: ViewDatabaseId, arg_id: ArgId, sender: Identity) -> Result<bool> {
18681854
use StViewSubFields::*;

crates/expr/src/expr.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ impl CollectViews for RelExpr {
262262
fn collect_views(&self, views: &mut HashSet<ViewDatabaseId>) {
263263
self.visit(&mut |expr| {
264264
if let Self::RelVar(Relvar { schema, .. }) = expr {
265-
if let Some(info) = schema.view_info {
265+
if let Some(info) = &schema.view_info {
266266
views.insert(info.view_id);
267267
}
268268
}

crates/physical-plan/src/plan.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -256,17 +256,17 @@ impl CollectViews for PhysicalPlan {
256256
fn collect_views(&self, views: &mut HashSet<ViewDatabaseId>) {
257257
self.visit(&mut |plan| match plan {
258258
Self::TableScan(scan, _) => {
259-
if let Some(info) = scan.schema.view_info {
259+
if let Some(info) = &scan.schema.view_info {
260260
views.insert(info.view_id);
261261
}
262262
}
263263
Self::IxScan(scan, _) => {
264-
if let Some(info) = scan.schema.view_info {
264+
if let Some(info) = &scan.schema.view_info {
265265
views.insert(info.view_id);
266266
}
267267
}
268268
Self::IxJoin(join, _) => {
269-
if let Some(info) = join.rhs.view_info {
269+
if let Some(info) = &join.rhs.view_info {
270270
views.insert(info.view_id);
271271
}
272272
}

crates/schema/src/schema.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,7 @@ impl TableSchema {
273273
/// Will only be non-zero in the case of views.
274274
pub fn num_private_cols(&self) -> usize {
275275
self.view_info
276+
.as_ref()
276277
.map(|view_info| view_info.num_private_cols())
277278
.unwrap_or_default()
278279
}

0 commit comments

Comments
 (0)