Skip to content

Commit f5d3bcd

Browse files
Add view handling to query engine and planner (#3578)
# Description of Changes This patch does the following: 1. Expands views as part of query planning. Views are always assumed to be materialized by the query planner, however a view's backing table may have private columns such as the `sender` column. The query planner needs to filter by this column in order to select the rows pertaining to a particular caller. 2. Plumbs `AuthCtx` through the query optimizer. This is needed in order to implement (1). 3. Adds a new operator for views to the query engine that drops a view's private columns # API and ABI breaking changes None # Expected complexity level and risk 2.5 # Testing - [x] SQL http tests - [ ] Subscription tests - [ ] One off query tests
1 parent 18af6c4 commit f5d3bcd

File tree

12 files changed

+690
-75
lines changed

12 files changed

+690
-75
lines changed

crates/bench/benches/subscription.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ fn eval(c: &mut Criterion) {
128128
let (plans, table_id, table_name, _) = compile_subscription(sql, schema_viewer, &auth).unwrap();
129129
let plans = plans
130130
.into_iter()
131-
.map(|plan| plan.optimize().unwrap())
131+
.map(|plan| plan.optimize(&auth).unwrap())
132132
.map(PipelinedProject::from)
133133
.collect::<Vec<_>>();
134134
let tx = DeltaTx::from(&tx);

crates/core/src/db/relational_db.rs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2088,6 +2088,57 @@ pub mod tests_utils {
20882088
Ok((gen_cols, row_ref))
20892089
}
20902090

2091+
/// Allocate a backing table in the datastore for a view
2092+
pub fn create_view_for_test(
2093+
db: &RelationalDB,
2094+
name: &str,
2095+
schema: &[(&str, AlgebraicType)],
2096+
is_anonymous: bool,
2097+
) -> Result<TableId, DBError> {
2098+
let mut builder = RawModuleDefV9Builder::new();
2099+
2100+
// Add the view's product type to the typespace
2101+
let type_ref = builder.add_algebraic_type(
2102+
[],
2103+
name,
2104+
AlgebraicType::Product(ProductType::from_iter(schema.iter().cloned())),
2105+
true,
2106+
);
2107+
2108+
builder.add_view(
2109+
name,
2110+
true,
2111+
is_anonymous,
2112+
ProductType::unit(),
2113+
AlgebraicType::array(AlgebraicType::Ref(type_ref)),
2114+
);
2115+
2116+
let module_def: ModuleDef = builder.finish().try_into()?;
2117+
let view_def: &ViewDef = module_def.view(name).expect("view not found");
2118+
2119+
// Allocate a backing table and return its table id
2120+
db.with_auto_commit(Workload::Internal, |tx| db.create_view(tx, &module_def, view_def))
2121+
.map(|(_, table_id)| table_id)
2122+
}
2123+
2124+
/// Insert a row into a view's backing table
2125+
pub fn insert_into_view<'a>(
2126+
db: &'a RelationalDB,
2127+
tx: &'a mut MutTx,
2128+
table_id: TableId,
2129+
sender: Option<Identity>,
2130+
row: ProductValue,
2131+
) -> Result<RowRef<'a>, DBError> {
2132+
let meta_cols = match sender {
2133+
Some(identity) => vec![identity.into()],
2134+
None => vec![],
2135+
};
2136+
let cols = meta_cols.into_iter().chain(row.elements);
2137+
let row = ProductValue::from_iter(cols);
2138+
db.insert(tx, table_id, &to_vec(&row).unwrap())
2139+
.map(|(_, row_ref, _)| row_ref)
2140+
}
2141+
20912142
/// An in-memory commitlog used for tests that want to replay a known history.
20922143
pub struct TestHistory(commitlog::commitlog::Generic<commitlog::repo::Memory, Txdata>);
20932144

crates/core/src/estimation.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ mod tests {
198198
.map(|(plans, ..)| plans)
199199
.expect("failed to compile sql query")
200200
.into_iter()
201-
.map(|plan| plan.optimize().expect("failed to optimize sql query"))
201+
.map(|plan| plan.optimize(&auth).expect("failed to optimize sql query"))
202202
.map(|plan| row_estimate(&tx, &plan))
203203
.sum()
204204
}

crates/core/src/host/module_host.rs

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@ use crate::module_host_context::ModuleCreationContext;
1717
use crate::replica_context::ReplicaContext;
1818
use crate::sql::ast::SchemaViewer;
1919
use crate::sql::parser::RowLevelExpr;
20-
use crate::subscription::execute_plan;
2120
use crate::subscription::module_subscription_actor::ModuleSubscriptions;
2221
use crate::subscription::tx::DeltaTx;
2322
use crate::subscription::websocket_building::BuildableWebsocketFormat;
23+
use crate::subscription::{execute_plan, execute_plan_for_view};
2424
use crate::util::jobs::{SingleCoreExecutor, WeakSingleCoreExecutor};
2525
use crate::vm::check_row_limit;
2626
use crate::worker_metrics::WORKER_METRICS;
@@ -41,7 +41,7 @@ use spacetimedb_datastore::locking_tx_datastore::MutTxId;
4141
use spacetimedb_datastore::system_tables::{ST_CLIENT_ID, ST_CONNECTION_CREDENTIALS_ID};
4242
use spacetimedb_datastore::traits::{IsolationLevel, Program, TxData};
4343
use spacetimedb_durability::DurableOffset;
44-
use spacetimedb_execution::pipelined::PipelinedProject;
44+
use spacetimedb_execution::pipelined::{PipelinedProject, ViewProject};
4545
use spacetimedb_lib::db::raw_def::v9::Lifecycle;
4646
use spacetimedb_lib::identity::{AuthCtx, RequestId};
4747
use spacetimedb_lib::metrics::ExecutionMetrics;
@@ -1495,7 +1495,7 @@ impl ModuleHost {
14951495
// Optimize each fragment
14961496
let optimized = plans
14971497
.into_iter()
1498-
.map(|plan| plan.optimize())
1498+
.map(|plan| plan.optimize(&auth))
14991499
.collect::<Result<Vec<_>, _>>()?;
15001500

15011501
check_row_limit(
@@ -1507,12 +1507,31 @@ impl ModuleHost {
15071507
&auth,
15081508
)?;
15091509

1510+
let return_table = || optimized.first().and_then(|plan| plan.return_table());
1511+
1512+
let returns_view_table = optimized.first().is_some_and(|plan| plan.returns_view_table());
1513+
let num_cols = return_table().map(|schema| schema.num_cols()).unwrap_or_default();
1514+
let num_private_cols = return_table()
1515+
.map(|schema| schema.num_private_cols())
1516+
.unwrap_or_default();
1517+
15101518
let optimized = optimized
15111519
.into_iter()
15121520
// Convert into something we can execute
15131521
.map(PipelinedProject::from)
15141522
.collect::<Vec<_>>();
15151523

1524+
if returns_view_table && num_private_cols > 0 {
1525+
let optimized = optimized
1526+
.into_iter()
1527+
.map(|plan| ViewProject::new(plan, num_cols, num_private_cols))
1528+
.collect::<Vec<_>>();
1529+
// Execute the union and return the results
1530+
return execute_plan_for_view::<_, F>(&optimized, &DeltaTx::from(&*tx))
1531+
.map(|(rows, _, metrics)| (OneOffTable { table_name, rows }, metrics))
1532+
.context("One-off queries are not allowed to modify the database");
1533+
}
1534+
15161535
// Execute the union and return the results
15171536
execute_plan::<_, F>(&optimized, &DeltaTx::from(&*tx))
15181537
.map(|(rows, _, metrics)| (OneOffTable { table_name, rows }, metrics))

crates/core/src/sql/execute.rs

Lines changed: 155 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ pub fn run(
227227
});
228228

229229
// Evaluate the query
230-
let rows = execute_select_stmt(stmt, &DeltaTx::from(&*tx), &mut metrics, |plan| {
230+
let rows = execute_select_stmt(&auth, stmt, &DeltaTx::from(&*tx), &mut metrics, |plan| {
231231
check_row_limit(
232232
&[&plan],
233233
db,
@@ -254,7 +254,7 @@ pub fn run(
254254
}
255255

256256
// Evaluate the mutation
257-
let (mut tx, _) = db.with_auto_rollback(tx, |tx| execute_dml_stmt(stmt, tx, &mut metrics))?;
257+
let (mut tx, _) = db.with_auto_rollback(tx, |tx| execute_dml_stmt(&auth, stmt, tx, &mut metrics))?;
258258

259259
// Update transaction metrics
260260
tx.metrics.merge(metrics);
@@ -332,7 +332,7 @@ pub(crate) mod tests {
332332
use std::sync::Arc;
333333

334334
use super::*;
335-
use crate::db::relational_db::tests_utils::{begin_tx, insert, with_auto_commit, TestDB};
335+
use crate::db::relational_db::tests_utils::{self, begin_tx, insert, with_auto_commit, TestDB};
336336
use crate::vm::tests::create_table_with_rows;
337337
use itertools::Itertools;
338338
use pretty_assertions::assert_eq;
@@ -904,6 +904,158 @@ pub(crate) mod tests {
904904
Ok(())
905905
}
906906

907+
#[test]
908+
fn test_view() -> anyhow::Result<()> {
909+
let db = TestDB::in_memory()?;
910+
911+
let schema = [("a", AlgebraicType::U8), ("b", AlgebraicType::U8)];
912+
let table_id = tests_utils::create_view_for_test(&db, "my_view", &schema, false)?;
913+
914+
with_auto_commit(&db, |tx| -> Result<_, DBError> {
915+
tests_utils::insert_into_view(&db, tx, table_id, Some(identity_from_u8(1)), product![0u8, 1u8])?;
916+
tests_utils::insert_into_view(&db, tx, table_id, Some(identity_from_u8(2)), product![0u8, 2u8])?;
917+
Ok(())
918+
})?;
919+
920+
let id = identity_from_u8(2);
921+
let auth = AuthCtx::new(Identity::ZERO, id);
922+
923+
assert_query_results(&db, "select * from my_view", &auth, [product![0u8, 2u8]]);
924+
925+
Ok(())
926+
}
927+
928+
#[test]
929+
fn test_anonymous_view() -> anyhow::Result<()> {
930+
let db = TestDB::in_memory()?;
931+
932+
let schema = [("a", AlgebraicType::U8), ("b", AlgebraicType::U8)];
933+
let table_id = tests_utils::create_view_for_test(&db, "my_view", &schema, true)?;
934+
935+
with_auto_commit(&db, |tx| -> Result<_, DBError> {
936+
tests_utils::insert_into_view(&db, tx, table_id, None, product![0u8, 1u8])?;
937+
tests_utils::insert_into_view(&db, tx, table_id, None, product![0u8, 2u8])?;
938+
Ok(())
939+
})?;
940+
941+
let id = identity_from_u8(1);
942+
let auth = AuthCtx::new(Identity::ZERO, id);
943+
944+
assert_query_results(&db, "select b from my_view", &auth, [product![1u8], product![2u8]]);
945+
946+
Ok(())
947+
}
948+
949+
#[test]
950+
fn test_view_join_table() -> anyhow::Result<()> {
951+
let db = TestDB::in_memory()?;
952+
953+
let schema = [("a", AlgebraicType::U8), ("b", AlgebraicType::U8)];
954+
let v_id = tests_utils::create_view_for_test(&db, "v", &schema, false)?;
955+
956+
let schema = [("c", AlgebraicType::U8), ("d", AlgebraicType::U8)];
957+
let t_id = db.create_table_for_test("t", &schema, &[0.into()])?;
958+
959+
with_auto_commit(&db, |tx| -> Result<_, DBError> {
960+
db.insert(tx, t_id, &product![0u8, 3u8].to_bsatn_vec().unwrap())?;
961+
db.insert(tx, t_id, &product![1u8, 4u8].to_bsatn_vec().unwrap())?;
962+
tests_utils::insert_into_view(&db, tx, v_id, Some(identity_from_u8(1)), product![0u8, 1u8])?;
963+
tests_utils::insert_into_view(&db, tx, v_id, Some(identity_from_u8(2)), product![1u8, 2u8])?;
964+
Ok(())
965+
})?;
966+
967+
let id = identity_from_u8(2);
968+
let auth = AuthCtx::new(Identity::ZERO, id);
969+
970+
assert_query_results(
971+
&db,
972+
"select t.* from v join t on v.a = t.c",
973+
&auth,
974+
[product![1u8, 4u8]],
975+
);
976+
assert_query_results(
977+
&db,
978+
"select v.* from v join t on v.a = t.c",
979+
&auth,
980+
[product![1u8, 2u8]],
981+
);
982+
assert_query_results(
983+
&db,
984+
"select v.* from v join t where v.a = t.c",
985+
&auth,
986+
[product![1u8, 2u8]],
987+
);
988+
assert_query_results(
989+
&db,
990+
"select v.b as b, t.d as d from v join t on v.a = t.c",
991+
&auth,
992+
[product![2u8, 4u8]],
993+
);
994+
assert_query_results(
995+
&db,
996+
"select v.b as b, t.d as d from v join t where v.a = t.c",
997+
&auth,
998+
[product![2u8, 4u8]],
999+
);
1000+
1001+
Ok(())
1002+
}
1003+
1004+
#[test]
1005+
fn test_view_join_view() -> anyhow::Result<()> {
1006+
let db = TestDB::in_memory()?;
1007+
1008+
let schema = [("a", AlgebraicType::U8), ("b", AlgebraicType::U8)];
1009+
let u_id = tests_utils::create_view_for_test(&db, "u", &schema, false)?;
1010+
1011+
let schema = [("c", AlgebraicType::U8), ("d", AlgebraicType::U8)];
1012+
let v_id = tests_utils::create_view_for_test(&db, "v", &schema, false)?;
1013+
1014+
with_auto_commit(&db, |tx| -> Result<_, DBError> {
1015+
tests_utils::insert_into_view(&db, tx, u_id, Some(identity_from_u8(1)), product![0u8, 1u8])?;
1016+
tests_utils::insert_into_view(&db, tx, u_id, Some(identity_from_u8(2)), product![1u8, 2u8])?;
1017+
tests_utils::insert_into_view(&db, tx, v_id, Some(identity_from_u8(1)), product![0u8, 3u8])?;
1018+
tests_utils::insert_into_view(&db, tx, v_id, Some(identity_from_u8(2)), product![1u8, 4u8])?;
1019+
Ok(())
1020+
})?;
1021+
1022+
let id = identity_from_u8(2);
1023+
let auth = AuthCtx::new(Identity::ZERO, id);
1024+
1025+
assert_query_results(
1026+
&db,
1027+
"select u.* from u join v on u.a = v.c",
1028+
&auth,
1029+
[product![1u8, 2u8]],
1030+
);
1031+
assert_query_results(
1032+
&db,
1033+
"select v.* from u join v on u.a = v.c",
1034+
&auth,
1035+
[product![1u8, 4u8]],
1036+
);
1037+
assert_query_results(
1038+
&db,
1039+
"select v.* from u join v where u.a = v.c",
1040+
&auth,
1041+
[product![1u8, 4u8]],
1042+
);
1043+
assert_query_results(
1044+
&db,
1045+
"select u.b as b, v.d as d from u join v on u.a = v.c",
1046+
&auth,
1047+
[product![2u8, 4u8]],
1048+
);
1049+
assert_query_results(
1050+
&db,
1051+
"select u.b as b, v.d as d from u join v where u.a = v.c",
1052+
&auth,
1053+
[product![2u8, 4u8]],
1054+
);
1055+
1056+
Ok(())
1057+
}
1058+
9071059
#[test]
9081060
fn test_select_star_table() -> ResultTest<()> {
9091061
let (db, input) = create_data(1)?;

0 commit comments

Comments
 (0)