Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion crates/client-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ impl Host {
auth,
Some(&module_host.info.subscriptions),
Some(&module_host),
auth.caller,
&mut header,
)
.await
Expand Down
32 changes: 18 additions & 14 deletions crates/core/src/client/client_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -865,10 +865,11 @@ impl ClientConnection {
) -> Result<Option<ExecutionMetrics>, DBError> {
let me = self.clone();
self.module()
.on_module_thread("subscribe_single", move || {
me.module()
.subscriptions()
.add_single_subscription(me.sender, subscription, timer, None)
.on_module_thread_async("subscribe_single", async move || {
let host = me.module();
host.subscriptions()
.add_single_subscription(Some(&host), me.sender, subscription, timer, None)
.await
})
.await?
}
Expand All @@ -890,10 +891,11 @@ impl ClientConnection {
) -> Result<Option<ExecutionMetrics>, DBError> {
let me = self.clone();
self.module()
.on_module_thread("subscribe_multi", move || {
me.module()
.subscriptions()
.add_multi_subscription(me.sender, request, timer, None)
.on_module_thread_async("subscribe_multi", async move || {
let host = me.module();
host.subscriptions()
.add_multi_subscription(Some(&host), me.sender, request, timer, None)
.await
})
.await?
}
Expand All @@ -915,12 +917,14 @@ impl ClientConnection {

pub async fn subscribe(&self, subscription: Subscribe, timer: Instant) -> Result<ExecutionMetrics, DBError> {
let me = self.clone();
asyncify(move || {
me.module()
.subscriptions()
.add_legacy_subscriber(me.sender, subscription, timer, None)
})
.await
self.module()
.on_module_thread_async("subscribe", async move || {
let host = me.module();
host.subscriptions()
.add_legacy_subscriber(Some(&host), me.sender, subscription, timer, None)
.await
})
.await?
}

pub async fn one_off_query_json(
Expand Down
3 changes: 3 additions & 0 deletions crates/core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use spacetimedb_table::table::ReadViaBsatnError;
use thiserror::Error;

use crate::client::ClientActorId;
use crate::host::module_host::ViewCallError;
use crate::host::scheduler::ScheduleError;
use spacetimedb_lib::buffer::DecodeError;
use spacetimedb_primitives::*;
Expand Down Expand Up @@ -147,6 +148,8 @@ pub enum DBError {
RestoreSnapshot(#[from] RestoreSnapshotError),
#[error(transparent)]
DurabilityGone(#[from] DurabilityExited),
#[error(transparent)]
View(#[from] ViewCallError),
}

impl DBError {
Expand Down
40 changes: 37 additions & 3 deletions crates/core/src/host/module_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,21 @@ use spacetimedb_datastore::system_tables::{ST_CLIENT_ID, ST_CONNECTION_CREDENTIA
use spacetimedb_datastore::traits::{IsolationLevel, Program, TxData};
use spacetimedb_durability::DurableOffset;
use spacetimedb_execution::pipelined::{PipelinedProject, ViewProject};
use spacetimedb_expr::expr::CollectViews;
use spacetimedb_lib::db::raw_def::v9::Lifecycle;
use spacetimedb_lib::identity::{AuthCtx, RequestId};
use spacetimedb_lib::metrics::ExecutionMetrics;
use spacetimedb_lib::ConnectionId;
use spacetimedb_lib::Timestamp;
use spacetimedb_primitives::{ProcedureId, TableId, ViewDatabaseId, ViewId};
use spacetimedb_primitives::{ArgId, ProcedureId, TableId, ViewDatabaseId, ViewId};
use spacetimedb_query::compile_subscription;
use spacetimedb_sats::{AlgebraicTypeRef, ProductValue};
use spacetimedb_schema::auto_migrate::{AutoMigrateError, MigrationPolicy};
use spacetimedb_schema::def::deserialize::ArgsSeed;
use spacetimedb_schema::def::{ModuleDef, ProcedureDef, ReducerDef, TableDef, ViewDef};
use spacetimedb_schema::schema::{Schema, TableSchema};
use spacetimedb_vm::relation::RelValue;
use std::collections::VecDeque;
use std::collections::{HashSet, VecDeque};
use std::fmt;
use std::future::Future;
use std::sync::atomic::AtomicBool;
Expand Down Expand Up @@ -1118,7 +1119,7 @@ impl ModuleHost {
// Decrement the number of subscribers for each view this caller is subscribed to
let dec_view_subscribers = |tx: &mut MutTxId| {
if drop_view_subscribers {
if let Err(err) = tx.dec_st_view_subscribers(caller_identity) {
if let Err(err) = tx.unsubscribe_views(caller_identity) {
log::error!("`call_identity_disconnected`: failed to delete client view data: {err}");
}
}
Expand Down Expand Up @@ -1487,6 +1488,39 @@ impl ModuleHost {
.await?
}

/// Materializes the views return by the `view_collector`, if not already materialized,
/// and updates `st_view_sub` accordingly.
///
/// Passing [`Workload::Sql`] will update `st_view_sub.last_called`.
/// Passing [`Workload::Subscribe`] will also increment `st_view_sub.num_subscribers`,
/// in addition to updating `st_view_sub.last_called`.
pub async fn materialize_views(
&self,
mut tx: MutTxId,
view_collector: &impl CollectViews,
sender: Identity,
workload: Workload,
) -> Result<MutTxId, ViewCallError> {
use FunctionArgs::*;
let mut view_ids = HashSet::new();
view_collector.collect_views(&mut view_ids);
for view_id in view_ids {
let name = tx.lookup_st_view(view_id)?.view_name;
if !tx.is_view_materialized(view_id, ArgId::SENTINEL, sender)? {
tx = self.call_view(tx, &name, Nullary, sender, None).await?.tx;
}
// If this is a sql call, we only update this view's "last called" timestamp
if let Workload::Sql = workload {
tx.update_view_timestamp(view_id, ArgId::SENTINEL, sender)?;
}
// If this is a subscribe call, we also increment this view's subscriber count
if let Workload::Subscribe = workload {
tx.subscribe_view(view_id, ArgId::SENTINEL, sender)?;
}
}
Ok(tx)
}

pub async fn call_view(
&self,
tx: MutTxId,
Expand Down
52 changes: 11 additions & 41 deletions crates/core/src/sql/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use spacetimedb_datastore::traits::IsolationLevel;
use spacetimedb_expr::statement::Statement;
use spacetimedb_lib::identity::AuthCtx;
use spacetimedb_lib::metrics::ExecutionMetrics;
use spacetimedb_lib::Timestamp;
use spacetimedb_lib::{AlgebraicType, ProductType, ProductValue};
use spacetimedb_lib::{Identity, Timestamp};
use spacetimedb_query::{compile_sql_stmt, execute_dml_stmt, execute_select_stmt};
use spacetimedb_schema::relation::FieldName;
use spacetimedb_vm::eval::run_ast;
Expand Down Expand Up @@ -192,49 +192,24 @@ pub async fn run(
auth: AuthCtx,
subs: Option<&ModuleSubscriptions>,
module: Option<&ModuleHost>,
caller_identity: Identity,
head: &mut Vec<(Box<str>, AlgebraicType)>,
) -> Result<SqlResult, DBError> {
// We parse the sql statement in a mutable transaction.
// If it turns out to be a query, we downgrade the tx.
let (mut tx, stmt) = db.with_auto_rollback(db.begin_mut_tx(IsolationLevel::Serializable, Workload::Sql), |tx| {
let (tx, stmt) = db.with_auto_rollback(db.begin_mut_tx(IsolationLevel::Serializable, Workload::Sql), |tx| {
compile_sql_stmt(sql_text, &SchemaViewer::new(tx, &auth), &auth)
})?;

let mut metrics = ExecutionMetrics::default();

for (view_name, args) in stmt.views() {
let (is_materialized, args) = tx
.is_materialized(view_name, args, caller_identity)
.map_err(|e| DBError::Other(anyhow!("Failed to check memoized view: {e}")))?;

// Skip if already memoized
if is_materialized {
continue;
}

let module = module
.as_ref()
.ok_or_else(|| anyhow!("Cannot execute view `{view_name}` without module context"))?;

let res = module
.call_view(
tx,
view_name,
crate::host::FunctionArgs::Bsatn(args),
caller_identity,
None,
)
.await
.map_err(|e| DBError::Other(anyhow!("Failed to execute view `{view_name}`: {e}")))?;

tx = res.tx;
}

match stmt {
Statement::Select(stmt) => {
// Up to this point, the tx has been read-only,
// and hence there are no deltas to process.
// Materialize views and downgrade to a read-only transaction
let tx = match module {
Some(module) => module.materialize_views(tx, &stmt, auth.caller, Workload::Sql).await?,
None => tx,
};

let (tx_data, tx_metrics_mut, tx) = tx.commit_downgrade(Workload::Sql);

let (tx_offset_send, tx_offset) = oneshot::channel();
Expand Down Expand Up @@ -397,7 +372,6 @@ pub(crate) mod tests {
AuthCtx::for_testing(),
Some(&subs),
None,
Identity::ZERO,
&mut vec![],
))
.map(|x| x.rows)
Expand Down Expand Up @@ -550,7 +524,7 @@ pub(crate) mod tests {
expected: impl IntoIterator<Item = ProductValue>,
) {
assert_eq!(
run(db, sql, *auth, None, None, Identity::ZERO, &mut vec![])
run(db, sql, *auth, None, None, &mut vec![])
.await
.unwrap()
.rows
Expand Down Expand Up @@ -1505,9 +1479,7 @@ pub(crate) mod tests {

let rt = db.runtime().expect("runtime should be there");

let run = |db, sql, auth, subs, mut tmp_vec| {
rt.block_on(run(db, sql, auth, subs, None, Identity::ZERO, &mut tmp_vec))
};
let run = |db, sql, auth, subs, mut tmp_vec| rt.block_on(run(db, sql, auth, subs, None, &mut tmp_vec));
// No row limit, both queries pass.
assert!(run(&db, "SELECT * FROM T", internal_auth, None, tmp_vec.clone()).is_ok());
assert!(run(&db, "SELECT * FROM T", external_auth, None, tmp_vec.clone()).is_ok());
Expand Down Expand Up @@ -1557,9 +1529,7 @@ pub(crate) mod tests {
let internal_auth = AuthCtx::new(server, server);

let tmp_vec = Vec::new();
let run = |db, sql, auth, subs, mut tmp_vec| async move {
run(db, sql, auth, subs, None, Identity::ZERO, &mut tmp_vec).await
};
let run = |db, sql, auth, subs, mut tmp_vec| async move { run(db, sql, auth, subs, None, &mut tmp_vec).await };

let check = |db, sql, auth, metrics: ExecutionMetrics| {
let result = rt.block_on(run(db, sql, auth, None, tmp_vec.clone()))?;
Expand Down
Loading
Loading