Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion crates/client-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ impl Host {
auth,
Some(&module_host.info.subscriptions),
Some(&module_host),
auth.caller,
&mut header,
)
.await
Expand Down
3 changes: 3 additions & 0 deletions crates/core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use spacetimedb_table::table::ReadViaBsatnError;
use thiserror::Error;

use crate::client::ClientActorId;
use crate::host::module_host::ViewCallError;
use crate::host::scheduler::ScheduleError;
use spacetimedb_lib::buffer::DecodeError;
use spacetimedb_primitives::*;
Expand Down Expand Up @@ -147,6 +148,8 @@ pub enum DBError {
RestoreSnapshot(#[from] RestoreSnapshotError),
#[error(transparent)]
DurabilityGone(#[from] DurabilityExited),
#[error(transparent)]
View(#[from] ViewCallError),
}

impl DBError {
Expand Down
31 changes: 28 additions & 3 deletions crates/core/src/host/module_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,25 +39,27 @@ use spacetimedb_data_structures::error_stream::ErrorStream;
use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap};
use spacetimedb_datastore::error::DatastoreError;
use spacetimedb_datastore::execution_context::{ExecutionContext, ReducerContext, Workload, WorkloadType};
use spacetimedb_datastore::locking_tx_datastore::MutTxId;
use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics;
use spacetimedb_datastore::locking_tx_datastore::{MutTxId, TxId};
use spacetimedb_datastore::system_tables::{ST_CLIENT_ID, ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_SUB_ID};
use spacetimedb_datastore::traits::{IsolationLevel, Program, TxData};
use spacetimedb_durability::DurableOffset;
use spacetimedb_execution::pipelined::{PipelinedProject, ViewProject};
use spacetimedb_expr::expr::CollectViews;
use spacetimedb_lib::db::raw_def::v9::Lifecycle;
use spacetimedb_lib::identity::{AuthCtx, RequestId};
use spacetimedb_lib::metrics::ExecutionMetrics;
use spacetimedb_lib::ConnectionId;
use spacetimedb_lib::Timestamp;
use spacetimedb_primitives::{ProcedureId, TableId, ViewDatabaseId, ViewId};
use spacetimedb_primitives::{ArgId, ProcedureId, TableId, ViewDatabaseId, ViewId};
use spacetimedb_query::compile_subscription;
use spacetimedb_sats::{AlgebraicTypeRef, ProductValue};
use spacetimedb_schema::auto_migrate::{AutoMigrateError, MigrationPolicy};
use spacetimedb_schema::def::deserialize::ArgsSeed;
use spacetimedb_schema::def::{ModuleDef, ProcedureDef, ReducerDef, TableDef, ViewDef};
use spacetimedb_schema::schema::{Schema, TableSchema};
use spacetimedb_vm::relation::RelValue;
use std::collections::VecDeque;
use std::collections::{HashSet, VecDeque};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shouldn't use HashSet from libstd.

use std::fmt;
use std::future::Future;
use std::sync::atomic::AtomicBool;
Expand Down Expand Up @@ -1487,6 +1489,29 @@ impl ModuleHost {
.await?
}

/// Downgrade this mutable `tx` after:
/// 1. Collecting view ids from `view_collector` and
/// 2. Materializing them if necessary
pub async fn materialize_views_and_downgrade_tx(
&self,
mut tx: MutTxId,
view_collector: &impl CollectViews,
sender: Identity,
workload: Workload,
) -> Result<(TxData, TxMetrics, TxId), ViewCallError> {
use FunctionArgs::*;
let mut view_ids = HashSet::new();
view_collector.collect_views(&mut view_ids);
for view_id in view_ids {
let name = tx.lookup_st_view(view_id)?.view_name;
if !tx.is_view_materialized(view_id, ArgId::SENTINEL, sender)? {
tx = self.call_view(tx, &name, Nullary, sender, None).await?.tx;
}
tx.st_view_sub_update_or_insert_last_called(view_id, ArgId::SENTINEL, sender)?;
}
Ok(tx.commit_downgrade(workload))
}

pub async fn call_view(
&self,
tx: MutTxId,
Expand Down
59 changes: 15 additions & 44 deletions crates/core/src/sql/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use spacetimedb_datastore::traits::IsolationLevel;
use spacetimedb_expr::statement::Statement;
use spacetimedb_lib::identity::AuthCtx;
use spacetimedb_lib::metrics::ExecutionMetrics;
use spacetimedb_lib::Timestamp;
use spacetimedb_lib::{AlgebraicType, ProductType, ProductValue};
use spacetimedb_lib::{Identity, Timestamp};
use spacetimedb_query::{compile_sql_stmt, execute_dml_stmt, execute_select_stmt};
use spacetimedb_schema::relation::FieldName;
use spacetimedb_vm::eval::run_ast;
Expand Down Expand Up @@ -192,50 +192,26 @@ pub async fn run(
auth: AuthCtx,
subs: Option<&ModuleSubscriptions>,
module: Option<&ModuleHost>,
caller_identity: Identity,
head: &mut Vec<(Box<str>, AlgebraicType)>,
) -> Result<SqlResult, DBError> {
let module = module
.as_ref()
.ok_or_else(|| anyhow!("Cannot execute views without module context"))?;

let mut metrics = ExecutionMetrics::default();

// We parse the sql statement in a mutable transaction.
// If it turns out to be a query, we downgrade the tx.
let (mut tx, stmt) = db.with_auto_rollback(db.begin_mut_tx(IsolationLevel::Serializable, Workload::Sql), |tx| {
let (tx, stmt) = db.with_auto_rollback(db.begin_mut_tx(IsolationLevel::Serializable, Workload::Sql), |tx| {
compile_sql_stmt(sql_text, &SchemaViewer::new(tx, &auth), &auth)
})?;

let mut metrics = ExecutionMetrics::default();

for (view_name, args) in stmt.views() {
let (is_materialized, args) = tx
.is_materialized(view_name, args, caller_identity)
.map_err(|e| DBError::Other(anyhow!("Failed to check memoized view: {e}")))?;

// Skip if already memoized
if is_materialized {
continue;
}

let module = module
.as_ref()
.ok_or_else(|| anyhow!("Cannot execute view `{view_name}` without module context"))?;

let res = module
.call_view(
tx,
view_name,
crate::host::FunctionArgs::Bsatn(args),
caller_identity,
None,
)
.await
.map_err(|e| DBError::Other(anyhow!("Failed to execute view `{view_name}`: {e}")))?;

tx = res.tx;
}

match stmt {
Statement::Select(stmt) => {
// Up to this point, the tx has been read-only,
// and hence there are no deltas to process.
let (tx_data, tx_metrics_mut, tx) = tx.commit_downgrade(Workload::Sql);
// Materialize views and downgrade to a read-only transaction
let (tx_data, tx_metrics_mut, tx) = module
.materialize_views_and_downgrade_tx(tx, &stmt, auth.caller, Workload::Sql)
.await?;

let (tx_offset_send, tx_offset) = oneshot::channel();
// Release the tx on drop, so that we record metrics
Expand Down Expand Up @@ -397,7 +373,6 @@ pub(crate) mod tests {
AuthCtx::for_testing(),
Some(&subs),
None,
Identity::ZERO,
&mut vec![],
))
.map(|x| x.rows)
Expand Down Expand Up @@ -550,7 +525,7 @@ pub(crate) mod tests {
expected: impl IntoIterator<Item = ProductValue>,
) {
assert_eq!(
run(db, sql, *auth, None, None, Identity::ZERO, &mut vec![])
run(db, sql, *auth, None, None, &mut vec![])
.await
.unwrap()
.rows
Expand Down Expand Up @@ -1505,9 +1480,7 @@ pub(crate) mod tests {

let rt = db.runtime().expect("runtime should be there");

let run = |db, sql, auth, subs, mut tmp_vec| {
rt.block_on(run(db, sql, auth, subs, None, Identity::ZERO, &mut tmp_vec))
};
let run = |db, sql, auth, subs, mut tmp_vec| rt.block_on(run(db, sql, auth, subs, None, &mut tmp_vec));
// No row limit, both queries pass.
assert!(run(&db, "SELECT * FROM T", internal_auth, None, tmp_vec.clone()).is_ok());
assert!(run(&db, "SELECT * FROM T", external_auth, None, tmp_vec.clone()).is_ok());
Expand Down Expand Up @@ -1557,9 +1530,7 @@ pub(crate) mod tests {
let internal_auth = AuthCtx::new(server, server);

let tmp_vec = Vec::new();
let run = |db, sql, auth, subs, mut tmp_vec| async move {
run(db, sql, auth, subs, None, Identity::ZERO, &mut tmp_vec).await
};
let run = |db, sql, auth, subs, mut tmp_vec| async move { run(db, sql, auth, subs, None, &mut tmp_vec).await };

let check = |db, sql, auth, metrics: ExecutionMetrics| {
let result = rt.block_on(run(db, sql, auth, None, tmp_vec.clone()))?;
Expand Down
24 changes: 2 additions & 22 deletions crates/core/src/subscription/module_subscription_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2163,38 +2163,19 @@ mod tests {
auth,
Some(&subs),
None,
Identity::ZERO,
&mut vec![],
)
.await?;

// Client should receive insert
assert_tx_update_for_table(rx.recv(), t_id, &schema, [product![0_u8, 1_u8]], []).await;

run(
&db,
"UPDATE t SET y=2 WHERE x=0",
auth,
Some(&subs),
None,
Identity::ZERO,
&mut vec![],
)
.await?;
run(&db, "UPDATE t SET y=2 WHERE x=0", auth, Some(&subs), None, &mut vec![]).await?;

// Client should receive update
assert_tx_update_for_table(rx.recv(), t_id, &schema, [product![0_u8, 2_u8]], [product![0_u8, 1_u8]]).await;

run(
&db,
"DELETE FROM t WHERE x=0",
auth,
Some(&subs),
None,
Identity::ZERO,
&mut vec![],
)
.await?;
run(&db, "DELETE FROM t WHERE x=0", auth, Some(&subs), None, &mut vec![]).await?;

// Client should receive delete
assert_tx_update_for_table(rx.recv(), t_id, &schema, [], [product![0_u8, 2_u8]]).await;
Expand Down Expand Up @@ -3046,7 +3027,6 @@ mod tests {
auth,
Some(&subs),
None,
Identity::ZERO,
&mut vec![],
)
.await?;
Expand Down
62 changes: 60 additions & 2 deletions crates/expr/src/expr.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,28 @@
use std::sync::Arc;
use std::{collections::HashSet, sync::Arc};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto


use spacetimedb_lib::{query::Delta, AlgebraicType, AlgebraicValue};
use spacetimedb_primitives::TableId;
use spacetimedb_primitives::{TableId, ViewDatabaseId};
use spacetimedb_schema::schema::TableOrViewSchema;
use spacetimedb_sql_parser::ast::{BinOp, LogOp};

pub trait CollectViews {
fn collect_views(&self, views: &mut HashSet<ViewDatabaseId>);
}

impl<T: CollectViews> CollectViews for Arc<T> {
fn collect_views(&self, views: &mut HashSet<ViewDatabaseId>) {
self.as_ref().collect_views(views);
}
}

impl<T: CollectViews> CollectViews for Vec<T> {
fn collect_views(&self, views: &mut HashSet<ViewDatabaseId>) {
for item in self {
item.collect_views(views);
}
}
}

/// A projection is the root of any relational expression.
/// This type represents a projection that returns relvars.
///
Expand All @@ -25,6 +43,14 @@ pub enum ProjectName {
Some(RelExpr, Box<str>),
}

impl CollectViews for ProjectName {
fn collect_views(&self, views: &mut HashSet<ViewDatabaseId>) {
match self {
Self::None(expr) | Self::Some(expr, _) => expr.collect_views(views),
}
}
}

impl ProjectName {
/// Unwrap the outer projection, returning the inner expression
pub fn unwrap(self) -> RelExpr {
Expand Down Expand Up @@ -146,6 +172,26 @@ pub enum AggType {
Count,
}

impl CollectViews for ProjectList {
fn collect_views(&self, views: &mut HashSet<ViewDatabaseId>) {
match self {
Self::Limit(proj, _) => {
proj.collect_views(views);
}
Self::Name(exprs) => {
for expr in exprs {
expr.collect_views(views);
}
}
Self::List(exprs, _) | Self::Agg(exprs, ..) => {
for expr in exprs {
expr.collect_views(views);
}
}
}
}
}

impl ProjectList {
/// Does this expression project a single relvar?
/// If so, we return it's [`TableOrViewSchema`].
Expand Down Expand Up @@ -212,6 +258,18 @@ pub struct Relvar {
pub delta: Option<Delta>,
}

impl CollectViews for RelExpr {
fn collect_views(&self, views: &mut HashSet<ViewDatabaseId>) {
self.visit(&mut |expr| {
if let Self::RelVar(Relvar { schema, .. }) = expr {
if let Some(info) = &schema.view_info {
views.insert(info.view_id);
}
}
});
}
}

impl RelExpr {
/// Walk the expression tree and call `f` on each node
pub fn visit(&self, f: &mut impl FnMut(&Self)) {
Expand Down
8 changes: 0 additions & 8 deletions crates/expr/src/statement.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::sync::Arc;

use bytes::Bytes;
use spacetimedb_lib::{identity::AuthCtx, st_var::StVarValue, AlgebraicType, AlgebraicValue, ProductValue};
use spacetimedb_primitives::{ColId, TableId};
use spacetimedb_schema::schema::{ColumnSchema, TableOrViewSchema};
Expand Down Expand Up @@ -32,13 +31,6 @@ pub enum Statement {
DML(DML),
}

impl Statement {
pub fn views(&self) -> Vec<(&str, Bytes)> {
//TODO: implement view name extraction
vec![]
}
}

pub enum DML {
Insert(TableInsert),
Update(TableUpdate),
Expand Down
Loading
Loading