Skip to content

Commit 8168623

Browse files
Materialize views for sql http calls
1 parent 75c6e67 commit 8168623

File tree

10 files changed

+133
-18
lines changed

10 files changed

+133
-18
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/core/src/host/module_host.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ use spacetimedb_lib::identity::{AuthCtx, RequestId};
4949
use spacetimedb_lib::metrics::ExecutionMetrics;
5050
use spacetimedb_lib::ConnectionId;
5151
use spacetimedb_lib::Timestamp;
52-
use spacetimedb_primitives::{ProcedureId, TableId, ViewDatabaseId, ViewId};
52+
use spacetimedb_primitives::{ProcedureId, TableId, ViewDatabaseId};
5353
use spacetimedb_query::compile_subscription;
5454
use spacetimedb_sats::{AlgebraicTypeRef, ProductValue};
5555
use spacetimedb_schema::auto_migrate::{AutoMigrateError, MigrationPolicy};
@@ -541,7 +541,7 @@ pub struct CallViewParams {
541541
pub timestamp: Timestamp,
542542
pub caller_identity: Identity,
543543
pub caller_connection_id: Option<ConnectionId>,
544-
pub view_id: ViewId,
544+
pub view_id: ViewDatabaseId,
545545
pub view_db_id: ViewDatabaseId,
546546
pub args: ArgsTuple,
547547

@@ -1531,7 +1531,7 @@ impl ModuleHost {
15311531
async fn call_view_inner(
15321532
&self,
15331533
tx: MutTxId,
1534-
view_id: ViewId,
1534+
view_id: ViewDatabaseId,
15351535
view_def: &ViewDef,
15361536
args: ArgsTuple,
15371537
caller_identity: Identity,

crates/core/src/host/wasm_common/module_host_actor.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ use spacetimedb_lib::db::raw_def::v9::Lifecycle;
44
use spacetimedb_lib::de::DeserializeSeed as _;
55
use spacetimedb_primitives::ProcedureId;
66
use spacetimedb_primitives::ViewDatabaseId;
7-
use spacetimedb_primitives::ViewId;
87
use spacetimedb_schema::auto_migrate::{MigratePlan, MigrationPolicy, MigrationPolicyError};
98
use std::future::Future;
109
use std::sync::Arc;
@@ -992,7 +991,7 @@ fn commit_and_broadcast_event(
992991
/// Describes a view call in a cheaply shareable way.
993992
#[derive(Clone, Debug)]
994993
pub struct ViewOp<'a> {
995-
pub id: ViewId,
994+
pub id: ViewDatabaseId,
996995
pub db_id: ViewDatabaseId,
997996
pub name: &'a str,
998997
pub args: &'a ArgsTuple,
@@ -1003,7 +1002,7 @@ pub struct ViewOp<'a> {
10031002
/// Describes an anonymous view call in a cheaply shareable way.
10041003
#[derive(Clone, Debug)]
10051004
pub struct AnonymousViewOp<'a> {
1006-
pub id: ViewId,
1005+
pub id: ViewDatabaseId,
10071006
pub db_id: ViewDatabaseId,
10081007
pub name: &'a str,
10091008
pub args: &'a ArgsTuple,

crates/core/src/sql/execute.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -233,8 +233,9 @@ pub async fn run(
233233

234234
match stmt {
235235
Statement::Select(stmt) => {
236-
// Up to this point, the tx has been read-only,
237-
// and hence there are no deltas to process.
236+
// Materialize views before we downgrade to a read-only transaction
237+
tx.materialize_views(&stmt, auth.caller)?;
238+
238239
let (tx_data, tx_metrics_mut, tx) = tx.commit_downgrade(Workload::Sql);
239240

240241
let (tx_offset_send, tx_offset) = oneshot::channel();

crates/datastore/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ spacetimedb-schema.workspace = true
1919
spacetimedb-table.workspace = true
2020
spacetimedb-snapshot.workspace = true
2121
spacetimedb-execution.workspace = true
22+
spacetimedb-expr.workspace = true
2223

2324
anyhow = { workspace = true, features = ["backtrace"] }
2425
bytes.workspace = true

crates/datastore/src/locking_tx_datastore/mut_tx.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ use smallvec::SmallVec;
3535
use spacetimedb_data_structures::map::{IntMap, IntSet};
3636
use spacetimedb_durability::TxOffset;
3737
use spacetimedb_execution::{dml::MutDatastore, Datastore, DeltaStore, Row};
38+
use spacetimedb_expr::expr::CollectViews;
3839
use spacetimedb_lib::{bsatn::ToBsatn as _, db::raw_def::v9::RawSql, metrics::ExecutionMetrics, Timestamp};
3940
use spacetimedb_lib::{
4041
db::{auth::StAccess, raw_def::SEQUENCE_ALLOCATION_STEP},
@@ -65,7 +66,7 @@ use spacetimedb_table::{
6566
table_index::TableIndex,
6667
};
6768
use std::{
68-
collections::HashMap,
69+
collections::{HashMap, HashSet},
6970
sync::Arc,
7071
time::{Duration, Instant},
7172
};
@@ -1849,6 +1850,19 @@ impl<'a, I: Iterator<Item = RowRef<'a>>> Iterator for FilterDeleted<'a, I> {
18491850
}
18501851

18511852
impl MutTxId {
1853+
/// Materialize views for `sender`, collected from `view_collector`.
1854+
pub fn materialize_views(&mut self, view_collector: &impl CollectViews, sender: Identity) -> Result<()> {
1855+
let mut view_ids = HashSet::new();
1856+
view_collector.collect_views(&mut view_ids);
1857+
for view_id in view_ids {
1858+
if !self.is_view_materialized(view_id, ArgId::SENTINEL, sender)? {
1859+
// TODO: __call_view__
1860+
}
1861+
self.st_view_sub_update_or_insert_last_called(view_id, ArgId::SENTINEL, sender)?;
1862+
}
1863+
Ok(())
1864+
}
1865+
18521866
/// Does this caller have an entry for `view_id` in `st_view_sub`?
18531867
pub fn is_view_materialized(&self, view_id: ViewDatabaseId, arg_id: ArgId, sender: Identity) -> Result<bool> {
18541868
use StViewSubFields::*;

crates/expr/src/expr.rs

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,28 @@
1-
use std::sync::Arc;
1+
use std::{collections::HashSet, sync::Arc};
22

33
use spacetimedb_lib::{query::Delta, AlgebraicType, AlgebraicValue};
4-
use spacetimedb_primitives::TableId;
4+
use spacetimedb_primitives::{TableId, ViewDatabaseId};
55
use spacetimedb_schema::schema::TableOrViewSchema;
66
use spacetimedb_sql_parser::ast::{BinOp, LogOp};
77

8+
pub trait CollectViews {
9+
fn collect_views(&self, views: &mut HashSet<ViewDatabaseId>);
10+
}
11+
12+
impl<T: CollectViews> CollectViews for Arc<T> {
13+
fn collect_views(&self, views: &mut HashSet<ViewDatabaseId>) {
14+
self.as_ref().collect_views(views);
15+
}
16+
}
17+
18+
impl<T: CollectViews> CollectViews for Vec<T> {
19+
fn collect_views(&self, views: &mut HashSet<ViewDatabaseId>) {
20+
for item in self {
21+
item.collect_views(views);
22+
}
23+
}
24+
}
25+
826
/// A projection is the root of any relational expression.
927
/// This type represents a projection that returns relvars.
1028
///
@@ -25,6 +43,14 @@ pub enum ProjectName {
2543
Some(RelExpr, Box<str>),
2644
}
2745

46+
impl CollectViews for ProjectName {
47+
fn collect_views(&self, views: &mut HashSet<ViewDatabaseId>) {
48+
match self {
49+
Self::None(expr) | Self::Some(expr, _) => expr.collect_views(views),
50+
}
51+
}
52+
}
53+
2854
impl ProjectName {
2955
/// Unwrap the outer projection, returning the inner expression
3056
pub fn unwrap(self) -> RelExpr {
@@ -146,6 +172,26 @@ pub enum AggType {
146172
Count,
147173
}
148174

175+
impl CollectViews for ProjectList {
176+
fn collect_views(&self, views: &mut HashSet<ViewDatabaseId>) {
177+
match self {
178+
Self::Limit(proj, _) => {
179+
proj.collect_views(views);
180+
}
181+
Self::Name(exprs) => {
182+
for expr in exprs {
183+
expr.collect_views(views);
184+
}
185+
}
186+
Self::List(exprs, _) | Self::Agg(exprs, ..) => {
187+
for expr in exprs {
188+
expr.collect_views(views);
189+
}
190+
}
191+
}
192+
}
193+
}
194+
149195
impl ProjectList {
150196
/// Does this expression project a single relvar?
151197
/// If so, we return it's [`TableOrViewSchema`].
@@ -212,6 +258,18 @@ pub struct Relvar {
212258
pub delta: Option<Delta>,
213259
}
214260

261+
impl CollectViews for RelExpr {
262+
fn collect_views(&self, views: &mut HashSet<ViewDatabaseId>) {
263+
self.visit(&mut |expr| {
264+
if let Self::RelVar(Relvar { schema, .. }) = expr {
265+
if let Some(info) = schema.view_info {
266+
views.insert(info.view_id);
267+
}
268+
}
269+
});
270+
}
271+
}
272+
215273
impl RelExpr {
216274
/// Walk the expression tree and call `f` on each node
217275
pub fn visit(&self, f: &mut impl FnMut(&Self)) {

crates/physical-plan/src/plan.rs

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,19 @@
11
use std::{
22
borrow::Cow,
3+
collections::HashSet,
34
ops::{Bound, Deref, DerefMut},
45
sync::Arc,
56
};
67

78
use anyhow::{bail, Result};
89
use derive_more::From;
910
use either::Either;
10-
use spacetimedb_expr::{expr::AggType, StatementSource};
11+
use spacetimedb_expr::{
12+
expr::{AggType, CollectViews},
13+
StatementSource,
14+
};
1115
use spacetimedb_lib::{identity::AuthCtx, query::Delta, sats::size_of::SizeOf, AlgebraicValue, ProductValue};
12-
use spacetimedb_primitives::{ColId, ColSet, IndexId, TableId};
16+
use spacetimedb_primitives::{ColId, ColSet, IndexId, TableId, ViewDatabaseId};
1317
use spacetimedb_schema::schema::{IndexSchema, TableSchema};
1418
use spacetimedb_sql_parser::ast::{BinOp, LogOp};
1519
use spacetimedb_table::table::RowRef;
@@ -68,6 +72,14 @@ impl DerefMut for ProjectPlan {
6872
}
6973
}
7074

75+
impl CollectViews for ProjectPlan {
76+
fn collect_views(&self, views: &mut HashSet<ViewDatabaseId>) {
77+
match self {
78+
Self::None(plan) | Self::Name(plan, ..) => plan.collect_views(views),
79+
}
80+
}
81+
}
82+
7183
impl ProjectPlan {
7284
pub fn optimize(self, auth: &AuthCtx) -> Result<Self> {
7385
match self {
@@ -240,6 +252,29 @@ pub enum PhysicalPlan {
240252
Filter(Box<PhysicalPlan>, PhysicalExpr),
241253
}
242254

255+
impl CollectViews for PhysicalPlan {
256+
fn collect_views(&self, views: &mut HashSet<ViewDatabaseId>) {
257+
self.visit(&mut |plan| match plan {
258+
Self::TableScan(scan, _) => {
259+
if let Some(info) = scan.schema.view_info {
260+
views.insert(info.view_id);
261+
}
262+
}
263+
Self::IxScan(scan, _) => {
264+
if let Some(info) = scan.schema.view_info {
265+
views.insert(info.view_id);
266+
}
267+
}
268+
Self::IxJoin(join, _) => {
269+
if let Some(info) = join.rhs.view_info {
270+
views.insert(info.view_id);
271+
}
272+
}
273+
_ => {}
274+
});
275+
}
276+
}
277+
243278
impl PhysicalPlan {
244279
/// Walks the plan tree and calls `f` on every op
245280
pub fn visit(&self, f: &mut impl FnMut(&Self)) {

crates/schema/src/def.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use spacetimedb_lib::db::raw_def::v9::{
3737
RawUniqueConstraintDataV9, RawViewDefV9, TableAccess, TableType,
3838
};
3939
use spacetimedb_lib::{ProductType, RawModuleDef};
40-
use spacetimedb_primitives::{ColId, ColList, ColOrCols, ColSet, ProcedureId, ReducerId, TableId, ViewId};
40+
use spacetimedb_primitives::{ColId, ColList, ColOrCols, ColSet, ProcedureId, ReducerId, TableId, ViewDatabaseId};
4141
use spacetimedb_sats::{AlgebraicType, AlgebraicValue};
4242
use spacetimedb_sats::{AlgebraicTypeRef, Typespace};
4343

@@ -246,7 +246,7 @@ impl ModuleDef {
246246
}
247247

248248
/// Convenience method to look up a view, possibly by a string, returning its id as well.
249-
pub fn view_full<K: ?Sized + Hash + Equivalent<Identifier>>(&self, name: &K) -> Option<(ViewId, &ViewDef)> {
249+
pub fn view_full<K: ?Sized + Hash + Equivalent<Identifier>>(&self, name: &K) -> Option<(ViewDatabaseId, &ViewDef)> {
250250
// If the string IS a valid identifier, we can just look it up.
251251
self.views.get_full(name).map(|(idx, _, def)| (idx.into(), def))
252252
}
@@ -296,7 +296,7 @@ impl ModuleDef {
296296
}
297297

298298
/// Look up a view by its id, panicking if it doesn't exist.
299-
pub fn view_by_id(&self, id: ViewId) -> &ViewDef {
299+
pub fn view_by_id(&self, id: ViewDatabaseId) -> &ViewDef {
300300
&self.views[id.idx()]
301301
}
302302

crates/subscription/src/lib.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@ use spacetimedb_execution::{
66
},
77
Datastore, DeltaStore, Row,
88
};
9-
use spacetimedb_expr::check::SchemaView;
9+
use spacetimedb_expr::{check::SchemaView, expr::CollectViews};
1010
use spacetimedb_lib::{identity::AuthCtx, metrics::ExecutionMetrics, query::Delta, AlgebraicValue};
1111
use spacetimedb_physical_plan::plan::{IxJoin, IxScan, Label, PhysicalPlan, ProjectPlan, Sarg, TableScan, TupleField};
12-
use spacetimedb_primitives::{ColId, ColList, IndexId, TableId};
12+
use spacetimedb_primitives::{ColId, ColList, IndexId, TableId, ViewDatabaseId};
1313
use spacetimedb_query::compile_subscription;
1414
use std::sync::Arc;
1515
use std::{collections::HashSet, ops::RangeBounds};
@@ -363,6 +363,12 @@ pub struct SubscriptionPlan {
363363
plan_opt: ProjectPlan,
364364
}
365365

366+
impl CollectViews for SubscriptionPlan {
367+
fn collect_views(&self, views: &mut HashSet<ViewDatabaseId>) {
368+
self.plan_opt.collect_views(views);
369+
}
370+
}
371+
366372
impl SubscriptionPlan {
367373
/// Is this a plan for a join?
368374
pub fn is_join(&self) -> bool {

0 commit comments

Comments
 (0)