Skip to content
18 changes: 12 additions & 6 deletions crates/client-api-messages/src/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -622,13 +622,19 @@ pub struct TableUpdate<F: WebsocketFormat> {
pub updates: SmallVec<[F::QueryUpdate; 1]>,
}

/// Computed update for a single query, annotated with the number of matching rows.
pub struct SingleQueryUpdate<F: WebsocketFormat> {
pub update: F::QueryUpdate,
pub num_rows: u64,
}

impl<F: WebsocketFormat> TableUpdate<F> {
pub fn new(table_id: TableId, table_name: Box<str>, (update, num_rows): (F::QueryUpdate, u64)) -> Self {
pub fn new(table_id: TableId, table_name: Box<str>, update: SingleQueryUpdate<F>) -> Self {
Self {
table_id,
table_name,
num_rows,
updates: [update].into(),
num_rows: update.num_rows,
updates: [update.update].into(),
}
}

Expand All @@ -641,9 +647,9 @@ impl<F: WebsocketFormat> TableUpdate<F> {
}
}

pub fn push(&mut self, (update, num_rows): (F::QueryUpdate, u64)) {
self.updates.push(update);
self.num_rows += num_rows;
pub fn push(&mut self, update: SingleQueryUpdate<F>) {
self.updates.push(update.update);
self.num_rows += update.num_rows;
}

pub fn num_rows(&self) -> usize {
Expand Down
5 changes: 4 additions & 1 deletion crates/core/src/host/host_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::messages::control_db::{Database, HostType};
use crate::module_host_context::ModuleCreationContext;
use crate::replica_context::ReplicaContext;
use crate::subscription::module_subscription_actor::ModuleSubscriptions;
use crate::subscription::module_subscription_manager::SubscriptionManager;
use crate::util::{asyncify, spawn_rayon};
use crate::worker_metrics::WORKER_METRICS;
use anyhow::{anyhow, ensure, Context};
Expand Down Expand Up @@ -525,7 +526,9 @@ async fn make_replica_ctx(
relational_db: Arc<RelationalDB>,
) -> anyhow::Result<ReplicaContext> {
let logger = tokio::task::block_in_place(move || Arc::new(DatabaseLogger::open_today(path.module_logs())));
let subscriptions = <_>::default();
let subscriptions = Arc::new(parking_lot::RwLock::new(SubscriptionManager::for_database(
database.database_identity,
)));
let downgraded = Arc::downgrade(&subscriptions);
let subscriptions = ModuleSubscriptions::new(relational_db.clone(), subscriptions, database.owner_identity);

Expand Down
65 changes: 34 additions & 31 deletions crates/core/src/host/instance_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,38 +532,41 @@ mod test {
Ok(DatabaseLogger::open(path))
}

/// An `InstanceEnv` requires `ModuleSubscriptions`
fn subscription_actor(relational_db: Arc<RelationalDB>) -> ModuleSubscriptions {
ModuleSubscriptions::new(relational_db, <_>::default(), Identity::ZERO)
}

/// An `InstanceEnv` requires a `ReplicaContext`.
/// For our purposes this is just a wrapper for `RelationalDB`.
fn replica_ctx(relational_db: Arc<RelationalDB>) -> Result<ReplicaContext> {
Ok(ReplicaContext {
database: Database {
id: 0,
database_identity: Identity::ZERO,
owner_identity: Identity::ZERO,
host_type: HostType::Wasm,
initial_program: Hash::ZERO,
fn replica_ctx(relational_db: Arc<RelationalDB>) -> Result<(ReplicaContext, tokio::runtime::Runtime)> {
let (subs, runtime) = ModuleSubscriptions::for_test_new_runtime(relational_db.clone());
Ok((
ReplicaContext {
database: Database {
id: 0,
database_identity: Identity::ZERO,
owner_identity: Identity::ZERO,
host_type: HostType::Wasm,
initial_program: Hash::ZERO,
},
replica_id: 0,
logger: Arc::new(temp_logger()?),
subscriptions: subs,
relational_db,
},
replica_id: 0,
logger: Arc::new(temp_logger()?),
subscriptions: subscription_actor(relational_db.clone()),
relational_db,
})
runtime,
))
}

/// An `InstanceEnv` used for testing the database syscalls.
fn instance_env(db: Arc<RelationalDB>) -> Result<InstanceEnv> {
fn instance_env(db: Arc<RelationalDB>) -> Result<(InstanceEnv, tokio::runtime::Runtime)> {
let (scheduler, _) = Scheduler::open(db.clone());
Ok(InstanceEnv {
replica_ctx: Arc::new(replica_ctx(db)?),
scheduler,
tx: TxSlot::default(),
start_time: Timestamp::now(),
})
let (replica_context, runtime) = replica_ctx(db)?;
Ok((
InstanceEnv {
replica_ctx: Arc::new(replica_context),
scheduler,
tx: TxSlot::default(),
start_time: Timestamp::now(),
},
runtime,
))
}

/// An in-memory `RelationalDB` for testing.
Expand Down Expand Up @@ -662,7 +665,7 @@ mod test {
#[test]
fn table_scan_metrics() -> Result<()> {
let db = relational_db()?;
let env = instance_env(db.clone())?;
let (env, _runtime) = instance_env(db.clone())?;

let (table_id, _) = create_table_with_index(&db)?;

Expand Down Expand Up @@ -694,7 +697,7 @@ mod test {
#[test]
fn index_scan_metrics() -> Result<()> {
let db = relational_db()?;
let env = instance_env(db.clone())?;
let (env, _runtime) = instance_env(db.clone())?;

let (_, index_id) = create_table_with_index(&db)?;

Expand Down Expand Up @@ -746,7 +749,7 @@ mod test {
#[test]
fn insert_metrics() -> Result<()> {
let db = relational_db()?;
let env = instance_env(db.clone())?;
let (env, _runtime) = instance_env(db.clone())?;

let (table_id, _) = create_table_with_index(&db)?;

Expand Down Expand Up @@ -783,7 +786,7 @@ mod test {
#[test]
fn update_metrics() -> Result<()> {
let db = relational_db()?;
let env = instance_env(db.clone())?;
let (env, _runtime) = instance_env(db.clone())?;

let (table_id, index_id) = create_table_with_unique_index(&db)?;

Expand All @@ -810,7 +813,7 @@ mod test {
#[test]
fn delete_by_index_metrics() -> Result<()> {
let db = relational_db()?;
let env = instance_env(db.clone())?;
let (env, _runtime) = instance_env(db.clone())?;

let (_, index_id) = create_table_with_index(&db)?;

Expand Down Expand Up @@ -838,7 +841,7 @@ mod test {
#[test]
fn delete_by_value_metrics() -> Result<()> {
let db = relational_db()?;
let env = instance_env(db.clone())?;
let (env, _runtime) = instance_env(db.clone())?;

let (table_id, _) = create_table_with_index(&db)?;

Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/host/wasm_common/module_host_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ impl<T: WasmInstance> WasmModuleInstance<T> {
let (event, _) = match self
.info
.subscriptions
.commit_and_broadcast_event(client.as_deref(), event, tx)
.commit_and_broadcast_event(client, event, tx)
.unwrap()
{
Ok(ev) => ev,
Expand Down
7 changes: 4 additions & 3 deletions crates/core/src/sql/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,8 @@ pub fn translate_col(tx: &Tx, field: FieldName) -> Option<Box<str>> {

#[cfg(test)]
pub(crate) mod tests {
use std::sync::Arc;

use super::*;
use crate::db::datastore::system_tables::{
StRowLevelSecurityRow, StTableFields, ST_ROW_LEVEL_SECURITY_ID, ST_TABLE_ID, ST_TABLE_NAME,
Expand All @@ -317,20 +319,19 @@ pub(crate) mod tests {
use spacetimedb_primitives::{col_list, ColId, TableId};
use spacetimedb_sats::{product, AlgebraicType, ArrayValue, ProductType};
use spacetimedb_vm::eval::test_helpers::create_game_data;
use std::sync::Arc;

pub(crate) fn execute_for_testing(
db: &RelationalDB,
sql_text: &str,
q: Vec<CrudExpr>,
) -> Result<Vec<MemTable>, DBError> {
let subs = ModuleSubscriptions::new(Arc::new(db.clone()), <_>::default(), Identity::ZERO);
let (subs, _runtime) = ModuleSubscriptions::for_test_new_runtime(Arc::new(db.clone()));
execute_sql(db, sql_text, q, AuthCtx::for_testing(), Some(&subs))
}

/// Short-cut for simplify test execution
pub(crate) fn run_for_testing(db: &RelationalDB, sql_text: &str) -> Result<Vec<ProductValue>, DBError> {
let subs = ModuleSubscriptions::new(Arc::new(db.clone()), <_>::default(), Identity::ZERO);
let (subs, _runtime) = ModuleSubscriptions::for_test_new_runtime(Arc::new(db.clone()));
run(db, sql_text, AuthCtx::for_testing(), Some(&subs), &mut vec![]).map(|x| x.rows)
}

Expand Down
10 changes: 8 additions & 2 deletions crates/core/src/subscription/execution_unit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ use crate::host::module_host::{DatabaseTableUpdate, DatabaseTableUpdateRelValue,
use crate::messages::websocket::TableUpdate;
use crate::util::slow::SlowQueryLogger;
use crate::vm::{build_query, TxMode};
use spacetimedb_client_api_messages::websocket::{Compression, QueryUpdate, RowListLen as _, WebsocketFormat};
use spacetimedb_client_api_messages::websocket::{
Compression, QueryUpdate, RowListLen as _, SingleQueryUpdate, WebsocketFormat,
};
use spacetimedb_lib::db::error::AuthError;
use spacetimedb_lib::relation::DbTable;
use spacetimedb_lib::{Identity, ProductValue};
Expand Down Expand Up @@ -254,7 +256,11 @@ impl ExecutionUnit {
let deletes = F::List::default();
let qu = QueryUpdate { deletes, inserts };
let update = F::into_query_update(qu, compression);
TableUpdate::new(self.return_table(), self.return_name(), (update, num_rows))
TableUpdate::new(
self.return_table(),
self.return_name(),
SingleQueryUpdate { update, num_rows },
)
})
}

Expand Down
9 changes: 6 additions & 3 deletions crates/core/src/subscription/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use module_subscription_manager::Plan;
use prometheus::IntCounter;
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use spacetimedb_client_api_messages::websocket::{
ByteListLen, Compression, DatabaseUpdate, QueryUpdate, TableUpdate, WebsocketFormat,
ByteListLen, Compression, DatabaseUpdate, QueryUpdate, SingleQueryUpdate, TableUpdate, WebsocketFormat,
};
use spacetimedb_execution::{pipelined::PipelinedProject, Datastore, DeltaStore};
use spacetimedb_lib::{metrics::ExecutionMetrics, Identity};
Expand Down Expand Up @@ -151,7 +151,10 @@ where
// after we release the tx lock.
// There's no need to compress the inner table update too.
let update = F::into_query_update(qu, Compression::None);
(TableUpdate::new(table_id, table_name, (update, num_rows)), metrics)
(
TableUpdate::new(table_id, table_name, SingleQueryUpdate { update, num_rows }),
metrics,
)
})
}

Expand Down Expand Up @@ -180,7 +183,7 @@ where
.clone()
.optimize()
.map(|plan| (sql, PipelinedProject::from(plan)))
.and_then(|(_, plan)| collect_table_update(&[plan], table_id, table_name.into(), tx, update_type))
.and_then(|(_, plan)| collect_table_update(&[plan], table_id, (&**table_name).into(), tx, update_type))
.map_err(|err| DBError::WithSql {
sql: sql.into(),
error: Box::new(DBError::Other(err)),
Expand Down
Loading
Loading