Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/bench/benches/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ fn eval(c: &mut Criterion) {
let (plans, table_id, table_name, _) = compile_subscription(sql, schema_viewer, &auth).unwrap();
let plans = plans
.into_iter()
.map(|plan| plan.optimize().unwrap())
.map(|plan| plan.optimize(&auth).unwrap())
.map(PipelinedProject::from)
.collect::<Vec<_>>();
let tx = DeltaTx::from(&tx);
Expand Down
51 changes: 51 additions & 0 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2088,6 +2088,57 @@ pub mod tests_utils {
Ok((gen_cols, row_ref))
}

/// Allocate a backing table in the datastore for a view
pub fn create_view_for_test(
db: &RelationalDB,
name: &str,
schema: &[(&str, AlgebraicType)],
is_anonymous: bool,
) -> Result<TableId, DBError> {
let mut builder = RawModuleDefV9Builder::new();

// Add the view's product type to the typespace
let type_ref = builder.add_algebraic_type(
[],
name,
AlgebraicType::Product(ProductType::from_iter(schema.iter().cloned())),
true,
);

builder.add_view(
name,
true,
is_anonymous,
ProductType::unit(),
AlgebraicType::array(AlgebraicType::Ref(type_ref)),
);

let module_def: ModuleDef = builder.finish().try_into()?;
let view_def: &ViewDef = module_def.view(name).expect("view not found");

// Allocate a backing table and return its table id
db.with_auto_commit(Workload::Internal, |tx| db.create_view(tx, &module_def, view_def))
.map(|(_, table_id)| table_id)
}

/// Insert a row into a view's backing table
pub fn insert_into_view<'a>(
db: &'a RelationalDB,
tx: &'a mut MutTx,
table_id: TableId,
sender: Option<Identity>,
row: ProductValue,
) -> Result<RowRef<'a>, DBError> {
let meta_cols = match sender {
Some(identity) => vec![identity.into()],
None => vec![],
};
let cols = meta_cols.into_iter().chain(row.elements);
let row = ProductValue::from_iter(cols);
db.insert(tx, table_id, &to_vec(&row).unwrap())
.map(|(_, row_ref, _)| row_ref)
}

/// An in-memory commitlog used for tests that want to replay a known history.
pub struct TestHistory(commitlog::commitlog::Generic<commitlog::repo::Memory, Txdata>);

Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/estimation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ mod tests {
.map(|(plans, ..)| plans)
.expect("failed to compile sql query")
.into_iter()
.map(|plan| plan.optimize().expect("failed to optimize sql query"))
.map(|plan| plan.optimize(&auth).expect("failed to optimize sql query"))
.map(|plan| row_estimate(&tx, &plan))
.sum()
}
Expand Down
25 changes: 22 additions & 3 deletions crates/core/src/host/module_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ use crate::module_host_context::ModuleCreationContext;
use crate::replica_context::ReplicaContext;
use crate::sql::ast::SchemaViewer;
use crate::sql::parser::RowLevelExpr;
use crate::subscription::execute_plan;
use crate::subscription::module_subscription_actor::ModuleSubscriptions;
use crate::subscription::tx::DeltaTx;
use crate::subscription::websocket_building::BuildableWebsocketFormat;
use crate::subscription::{execute_plan, execute_plan_for_view};
use crate::util::jobs::{SingleCoreExecutor, WeakSingleCoreExecutor};
use crate::vm::check_row_limit;
use crate::worker_metrics::WORKER_METRICS;
Expand All @@ -41,7 +41,7 @@ use spacetimedb_datastore::locking_tx_datastore::MutTxId;
use spacetimedb_datastore::system_tables::{ST_CLIENT_ID, ST_CONNECTION_CREDENTIALS_ID};
use spacetimedb_datastore::traits::{IsolationLevel, Program, TxData};
use spacetimedb_durability::DurableOffset;
use spacetimedb_execution::pipelined::PipelinedProject;
use spacetimedb_execution::pipelined::{PipelinedProject, ViewProject};
use spacetimedb_lib::db::raw_def::v9::Lifecycle;
use spacetimedb_lib::identity::{AuthCtx, RequestId};
use spacetimedb_lib::metrics::ExecutionMetrics;
Expand Down Expand Up @@ -1495,7 +1495,7 @@ impl ModuleHost {
// Optimize each fragment
let optimized = plans
.into_iter()
.map(|plan| plan.optimize())
.map(|plan| plan.optimize(&auth))
.collect::<Result<Vec<_>, _>>()?;

check_row_limit(
Expand All @@ -1507,12 +1507,31 @@ impl ModuleHost {
&auth,
)?;

let return_table = || optimized.first().and_then(|plan| plan.return_table());

let returns_view_table = optimized.first().is_some_and(|plan| plan.returns_view_table());
let num_cols = return_table().map(|schema| schema.num_cols()).unwrap_or_default();
let num_private_cols = return_table()
.map(|schema| schema.num_private_cols())
.unwrap_or_default();

let optimized = optimized
.into_iter()
// Convert into something we can execute
.map(PipelinedProject::from)
.collect::<Vec<_>>();

if returns_view_table && num_private_cols > 0 {
let optimized = optimized
.into_iter()
.map(|plan| ViewProject::new(plan, num_cols, num_private_cols))
.collect::<Vec<_>>();
// Execute the union and return the results
return execute_plan_for_view::<_, F>(&optimized, &DeltaTx::from(&*tx))
.map(|(rows, _, metrics)| (OneOffTable { table_name, rows }, metrics))
.context("One-off queries are not allowed to modify the database");
}

// Execute the union and return the results
execute_plan::<_, F>(&optimized, &DeltaTx::from(&*tx))
.map(|(rows, _, metrics)| (OneOffTable { table_name, rows }, metrics))
Expand Down
158 changes: 155 additions & 3 deletions crates/core/src/sql/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ pub fn run(
});

// Evaluate the query
let rows = execute_select_stmt(stmt, &DeltaTx::from(&*tx), &mut metrics, |plan| {
let rows = execute_select_stmt(&auth, stmt, &DeltaTx::from(&*tx), &mut metrics, |plan| {
check_row_limit(
&[&plan],
db,
Expand All @@ -254,7 +254,7 @@ pub fn run(
}

// Evaluate the mutation
let (mut tx, _) = db.with_auto_rollback(tx, |tx| execute_dml_stmt(stmt, tx, &mut metrics))?;
let (mut tx, _) = db.with_auto_rollback(tx, |tx| execute_dml_stmt(&auth, stmt, tx, &mut metrics))?;

// Update transaction metrics
tx.metrics.merge(metrics);
Expand Down Expand Up @@ -332,7 +332,7 @@ pub(crate) mod tests {
use std::sync::Arc;

use super::*;
use crate::db::relational_db::tests_utils::{begin_tx, insert, with_auto_commit, TestDB};
use crate::db::relational_db::tests_utils::{self, begin_tx, insert, with_auto_commit, TestDB};
use crate::vm::tests::create_table_with_rows;
use itertools::Itertools;
use pretty_assertions::assert_eq;
Expand Down Expand Up @@ -904,6 +904,158 @@ pub(crate) mod tests {
Ok(())
}

#[test]
fn test_view() -> anyhow::Result<()> {
let db = TestDB::in_memory()?;

let schema = [("a", AlgebraicType::U8), ("b", AlgebraicType::U8)];
let table_id = tests_utils::create_view_for_test(&db, "my_view", &schema, false)?;

with_auto_commit(&db, |tx| -> Result<_, DBError> {
tests_utils::insert_into_view(&db, tx, table_id, Some(identity_from_u8(1)), product![0u8, 1u8])?;
tests_utils::insert_into_view(&db, tx, table_id, Some(identity_from_u8(2)), product![0u8, 2u8])?;
Ok(())
})?;

let id = identity_from_u8(2);
let auth = AuthCtx::new(Identity::ZERO, id);

assert_query_results(&db, "select * from my_view", &auth, [product![0u8, 2u8]]);

Ok(())
}

#[test]
fn test_anonymous_view() -> anyhow::Result<()> {
let db = TestDB::in_memory()?;

let schema = [("a", AlgebraicType::U8), ("b", AlgebraicType::U8)];
let table_id = tests_utils::create_view_for_test(&db, "my_view", &schema, true)?;

with_auto_commit(&db, |tx| -> Result<_, DBError> {
tests_utils::insert_into_view(&db, tx, table_id, None, product![0u8, 1u8])?;
tests_utils::insert_into_view(&db, tx, table_id, None, product![0u8, 2u8])?;
Ok(())
})?;

let id = identity_from_u8(1);
let auth = AuthCtx::new(Identity::ZERO, id);

assert_query_results(&db, "select b from my_view", &auth, [product![1u8], product![2u8]]);

Ok(())
}

#[test]
fn test_view_join_table() -> anyhow::Result<()> {
let db = TestDB::in_memory()?;

let schema = [("a", AlgebraicType::U8), ("b", AlgebraicType::U8)];
let v_id = tests_utils::create_view_for_test(&db, "v", &schema, false)?;

let schema = [("c", AlgebraicType::U8), ("d", AlgebraicType::U8)];
let t_id = db.create_table_for_test("t", &schema, &[0.into()])?;

with_auto_commit(&db, |tx| -> Result<_, DBError> {
db.insert(tx, t_id, &product![0u8, 3u8].to_bsatn_vec().unwrap())?;
db.insert(tx, t_id, &product![1u8, 4u8].to_bsatn_vec().unwrap())?;
tests_utils::insert_into_view(&db, tx, v_id, Some(identity_from_u8(1)), product![0u8, 1u8])?;
tests_utils::insert_into_view(&db, tx, v_id, Some(identity_from_u8(2)), product![1u8, 2u8])?;
Ok(())
})?;

let id = identity_from_u8(2);
let auth = AuthCtx::new(Identity::ZERO, id);

assert_query_results(
&db,
"select t.* from v join t on v.a = t.c",
&auth,
[product![1u8, 4u8]],
);
assert_query_results(
&db,
"select v.* from v join t on v.a = t.c",
&auth,
[product![1u8, 2u8]],
);
assert_query_results(
&db,
"select v.* from v join t where v.a = t.c",
&auth,
[product![1u8, 2u8]],
);
assert_query_results(
&db,
"select v.b as b, t.d as d from v join t on v.a = t.c",
&auth,
[product![2u8, 4u8]],
);
assert_query_results(
&db,
"select v.b as b, t.d as d from v join t where v.a = t.c",
&auth,
[product![2u8, 4u8]],
);

Ok(())
}

#[test]
fn test_view_join_view() -> anyhow::Result<()> {
let db = TestDB::in_memory()?;

let schema = [("a", AlgebraicType::U8), ("b", AlgebraicType::U8)];
let u_id = tests_utils::create_view_for_test(&db, "u", &schema, false)?;

let schema = [("c", AlgebraicType::U8), ("d", AlgebraicType::U8)];
let v_id = tests_utils::create_view_for_test(&db, "v", &schema, false)?;

with_auto_commit(&db, |tx| -> Result<_, DBError> {
tests_utils::insert_into_view(&db, tx, u_id, Some(identity_from_u8(1)), product![0u8, 1u8])?;
tests_utils::insert_into_view(&db, tx, u_id, Some(identity_from_u8(2)), product![1u8, 2u8])?;
tests_utils::insert_into_view(&db, tx, v_id, Some(identity_from_u8(1)), product![0u8, 3u8])?;
tests_utils::insert_into_view(&db, tx, v_id, Some(identity_from_u8(2)), product![1u8, 4u8])?;
Ok(())
})?;

let id = identity_from_u8(2);
let auth = AuthCtx::new(Identity::ZERO, id);

assert_query_results(
&db,
"select u.* from u join v on u.a = v.c",
&auth,
[product![1u8, 2u8]],
);
assert_query_results(
&db,
"select v.* from u join v on u.a = v.c",
&auth,
[product![1u8, 4u8]],
);
assert_query_results(
&db,
"select v.* from u join v where u.a = v.c",
&auth,
[product![1u8, 4u8]],
);
assert_query_results(
&db,
"select u.b as b, v.d as d from u join v on u.a = v.c",
&auth,
[product![2u8, 4u8]],
);
assert_query_results(
&db,
"select u.b as b, v.d as d from u join v where u.a = v.c",
&auth,
[product![2u8, 4u8]],
);

Ok(())
}

#[test]
fn test_select_star_table() -> ResultTest<()> {
let (db, input) = create_data(1)?;
Expand Down
Loading
Loading