Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

126 changes: 122 additions & 4 deletions crates/core/src/sql/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,13 @@ pub fn run(

// Evaluate the query
let rows = execute_select_stmt(stmt, &DeltaTx::from(&*tx), &mut metrics, |plan| {
check_row_limit(&plan, db, &tx, |plan, tx| estimate_rows_scanned(tx, plan), &auth)?;
check_row_limit(
&plan,
db,
&tx,
|plan, tx| plan.plan_iter().map(|plan| estimate_rows_scanned(tx, plan)).sum(),
&auth,
)?;
Ok(plan)
})?;

Expand Down Expand Up @@ -289,18 +295,22 @@ pub fn translate_col(tx: &Tx, field: FieldName) -> Option<Box<str>> {
#[cfg(test)]
pub(crate) mod tests {
use super::*;
use crate::db::datastore::system_tables::{StTableFields, ST_TABLE_ID, ST_TABLE_NAME};
use crate::db::datastore::system_tables::{
StRowLevelSecurityRow, StTableFields, ST_ROW_LEVEL_SECURITY_ID, ST_TABLE_ID, ST_TABLE_NAME,
};
use crate::db::relational_db::tests_utils::{insert, TestDB};
use crate::subscription::module_subscription_manager::SubscriptionManager;
use crate::vm::tests::create_table_with_rows;
use parking_lot::RwLock;
use pretty_assertions::assert_eq;
use spacetimedb_lib::bsatn::ToBsatn;
use spacetimedb_lib::db::auth::{StAccess, StTableType};
use spacetimedb_lib::error::{ResultTest, TestError};
use spacetimedb_lib::relation::Header;
use spacetimedb_lib::{AlgebraicValue, Identity};
use spacetimedb_primitives::{col_list, ColId};
use spacetimedb_sats::{product, AlgebraicType, ArrayValue, ProductType};
use spacetimedb_primitives::{col_list, ColId, TableId};
use spacetimedb_sats::{product, u256, AlgebraicType, ArrayValue, ProductType};
use spacetimedb_schema::schema::{ColumnSchema, TableSchema};
use spacetimedb_vm::eval::test_helpers::create_game_data;
use std::sync::Arc;

Expand Down Expand Up @@ -356,6 +366,17 @@ pub(crate) mod tests {
Ok((stdb, MemTable::new(header, schema.table_access, rows)))
}

/// Manually insert RLS rules into the RLS system table
///
/// Writes one `StRowLevelSecurityRow` per `(table_id, sql)` pair directly into
/// the `ST_ROW_LEVEL_SECURITY_ID` system table, all inside a single
/// auto-commit transaction.
///
/// NOTE(review): this bypasses whatever higher-level RLS registration path
/// production code uses — the rule text is not parsed or validated here, so
/// callers must supply well-formed filter SQL. Intended for test setup only.
fn insert_rls_rules(db: &RelationalDB, rules: impl IntoIterator<Item = (TableId, &'static str)>) -> ResultTest<()> {
db.with_auto_commit(Workload::ForTests, |tx| {
// `.into()` converts the `&'static str` rule text into the owned string
// type that `StRowLevelSecurityRow::sql` expects.
for (table_id, sql) in rules.into_iter().map(|(table_id, sql)| (table_id, sql.into())) {
// System tables take raw BSATN-encoded rows, hence `to_bsatn_vec`.
let row = ProductValue::from(StRowLevelSecurityRow { table_id, sql });
db.insert(tx, ST_ROW_LEVEL_SECURITY_ID, &row.to_bsatn_vec()?)?;
}
Ok(())
})
}

#[test]
fn test_select_star() -> ResultTest<()> {
let (db, input) = create_data(1)?;
Expand Down Expand Up @@ -425,6 +446,103 @@ pub(crate) mod tests {
Ok(())
}

#[test]
// End-to-end check that row-level security (RLS) rules filter query results
// per caller identity: each sender should only see their own `users` row and
// the `sales` rows whose `customer` column matches their identity.
fn test_rls_rules() -> ResultTest<()> {
let db = TestDB::in_memory()?;

// Helper: create a public user table with the given `(name, type)` columns.
// `TableId::SENTINEL` is passed as a placeholder; the datastore assigns the
// real id, which `create_table` returns.
let create_table = |table_name: &str, column_names_and_types: Vec<(&str, AlgebraicType)>| {
db.with_auto_commit(Workload::ForTests, |tx| {
db.create_table(
tx,
TableSchema::new(
TableId::SENTINEL,
table_name.into(),
column_names_and_types
.into_iter()
.enumerate()
.map(|(i, (name, col_type))| ColumnSchema {
table_id: TableId::SENTINEL,
col_pos: i.into(),
col_name: name.into(),
col_type,
})
.collect(),
// No indexes, constraints, or sequences — plain heap table.
vec![],
vec![],
vec![],
StTableType::User,
StAccess::Public,
None,
None,
),
)
})
};

// Helper: BSATN-encode and insert each row into `table_id` in one commit.
let insert_into = |table_id, rows: Vec<ProductValue>| {
db.with_auto_commit(Workload::ForTests, |tx| -> ResultTest<()> {
for row in rows {
db.insert(tx, table_id, &row.to_bsatn_vec()?)?;
}
Ok(())
})
};

// Schema: `users(identity)` and `sales(order_id, product_id, date, customer)`
// where `sales.customer` references a user identity.
let users_table_id = create_table("users", vec![("identity", AlgebraicType::identity())])?;
let sales_table_id = create_table(
"sales",
vec![
("order_id", AlgebraicType::U64),
("product_id", AlgebraicType::U64),
("date", AlgebraicType::U64),
("customer", AlgebraicType::identity()),
],
)?;

// Two distinct caller identities; each owns two of the four sales rows.
let id_1 = Identity::from_u256(u256::new(1));
let id_2 = Identity::from_u256(u256::new(2));

insert_into(users_table_id, vec![product![id_1], product![id_2]])?;
insert_into(
sales_table_id,
vec![
product![1u64, 1u64, 1u64, id_1],
product![2u64, 1u64, 2u64, id_2],
product![3u64, 2u64, 3u64, id_1],
product![4u64, 2u64, 4u64, id_2],
],
)?;

// Rule 1: a caller sees only the `users` row matching `:sender`.
// Rule 2: `sales` visibility is derived by joining through `users`, so the
// users rule transitively restricts sales to the caller's own orders.
insert_rls_rules(
&db,
[
(users_table_id, "select * from users where identity = :sender"),
(
sales_table_id,
"select s.* from users u join sales s on u.identity = s.customer",
),
],
)?;

// NOTE(review): assuming `AuthCtx::new(owner, caller)` — the second argument
// is the caller bound to `:sender`; confirm the argument order against AuthCtx.
let auth_for_id_1 = AuthCtx::new(Identity::ZERO, id_1);
let auth_for_id_2 = AuthCtx::new(Identity::ZERO, id_2);

// Shadows the module-level `run`: executes `sql` under `user`'s auth context
// and keeps only the returned rows.
let run = |sql, user| run(&db, sql, user, None, &mut vec![]).map(|sql_result| sql_result.rows);

// Each sender sees exactly their own users row...
assert_eq!(run("select * from users", auth_for_id_1)?, vec![product![id_1]]);
assert_eq!(run("select * from users", auth_for_id_2)?, vec![product![id_2]]);
// ...and only the sales rows whose `customer` equals their identity.
assert_eq!(
run("select * from sales", auth_for_id_1)?,
vec![product![1u64, 1u64, 1u64, id_1], product![3u64, 2u64, 3u64, id_1]]
);
assert_eq!(
run("select * from sales", auth_for_id_2)?,
vec![product![2u64, 1u64, 2u64, id_2], product![4u64, 2u64, 4u64, id_2]]
);

Ok(())
}

#[test]
fn test_select_star_table() -> ResultTest<()> {
let (db, input) = create_data(1)?;
Expand Down
80 changes: 47 additions & 33 deletions crates/execution/src/pipelined.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,23 @@ use crate::{Datastore, DeltaStore, Row, Tuple};
/// which is not considered performance critical.
/// Hence this operator is not particularly optimized.
pub enum ProjectListExecutor {
Name(PipelinedProject),
List(PipelinedExecutor, Vec<TupleField>),
Name(Vec<PipelinedProject>),
List(Vec<PipelinedExecutor>, Vec<TupleField>),
Limit(Box<ProjectListExecutor>, u64),
Agg(PipelinedExecutor, AggType),
Agg(Vec<PipelinedExecutor>, AggType),
}

impl From<ProjectListPlan> for ProjectListExecutor {
fn from(plan: ProjectListPlan) -> Self {
match plan {
ProjectListPlan::Name(plan) => Self::Name(plan.into()),
ProjectListPlan::List(plan, fields) => Self::List(plan.into(), fields),
ProjectListPlan::Name(plan) => Self::Name(plan.into_iter().map(PipelinedProject::from).collect()),
ProjectListPlan::List(plan, fields) => {
Self::List(plan.into_iter().map(PipelinedExecutor::from).collect(), fields)
}
ProjectListPlan::Limit(plan, n) => Self::Limit(Box::new((*plan).into()), n),
ProjectListPlan::Agg(plan, AggType::Count) => Self::Agg(plan.into(), AggType::Count),
ProjectListPlan::Agg(plan, AggType::Count) => {
Self::Agg(plan.into_iter().map(PipelinedExecutor::from).collect(), AggType::Count)
}
}
}
}
Expand All @@ -48,21 +52,25 @@ impl ProjectListExecutor {
let mut n = 0;
let mut bytes_scanned = 0;
match self {
Self::Name(plan) => {
plan.execute(tx, metrics, &mut |row| {
n += 1;
let row = row.to_product_value();
bytes_scanned += row.size_of();
f(row)
})?;
Self::Name(plans) => {
for plan in plans {
plan.execute(tx, metrics, &mut |row| {
n += 1;
let row = row.to_product_value();
bytes_scanned += row.size_of();
f(row)
})?;
}
}
Self::List(plan, fields) => {
plan.execute(tx, metrics, &mut |t| {
n += 1;
let row = ProductValue::from_iter(fields.iter().map(|field| t.project(field)));
bytes_scanned += row.size_of();
f(row)
})?;
Self::List(plans, fields) => {
for plan in plans {
plan.execute(tx, metrics, &mut |t| {
n += 1;
let row = ProductValue::from_iter(fields.iter().map(|field| t.project(field)));
bytes_scanned += row.size_of();
f(row)
})?;
}
}
Self::Limit(plan, limit) => {
plan.execute(tx, metrics, &mut |row| {
Expand All @@ -73,19 +81,25 @@ impl ProjectListExecutor {
Ok(())
})?;
}
// TODO: This is a hack that needs to be removed.
// We check if this is a COUNT on a physical table,
// and if so, we retrieve the count from table metadata.
// It's a valid optimization but one that should be done by the optimizer.
// There should be no optimizations performed during execution.
Self::Agg(PipelinedExecutor::TableScan(table_scan), AggType::Count) => {
f(product![tx.table_or_err(table_scan.table)?.num_rows()])?;
}
Self::Agg(plan, AggType::Count) => {
plan.execute(tx, metrics, &mut |_| {
n += 1;
Ok(())
})?;
Self::Agg(plans, AggType::Count) => {
for plan in plans {
match plan {
// TODO: This is a hack that needs to be removed.
// We check if this is a COUNT on a physical table,
// and if so, we retrieve the count from table metadata.
// It's a valid optimization but one that should be done by the optimizer.
// There should be no optimizations performed during execution.
PipelinedExecutor::TableScan(table_scan) => {
n += tx.table_or_err(table_scan.table)?.num_rows() as usize;
}
_ => {
plan.execute(tx, metrics, &mut |_| {
n += 1;
Ok(())
})?;
}
}
}
f(product![n as u64])?;
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ spacetimedb-sql-parser.workspace = true

[dev-dependencies]
pretty_assertions.workspace = true
spacetimedb = { workspace = true, features = ["unstable"] }
spacetimedb-lib.workspace = true
25 changes: 6 additions & 19 deletions crates/expr/src/check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use std::collections::HashMap;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;

use crate::expr::LeftDeepJoin;
use crate::expr::{Expr, ProjectList, ProjectName, Relvar};
use crate::{expr::LeftDeepJoin, statement::Statement};
use spacetimedb_lib::identity::AuthCtx;
use spacetimedb_lib::AlgebraicType;
use spacetimedb_primitives::TableId;
Expand All @@ -17,7 +17,7 @@ use spacetimedb_sql_parser::{
use super::{
errors::{DuplicateName, TypingError, Unresolved, Unsupported},
expr::RelExpr,
type_expr, type_proj, type_select, StatementCtx, StatementSource,
type_expr, type_proj, type_select,
};

/// The result of type checking and name resolution
Expand Down Expand Up @@ -164,27 +164,14 @@ pub fn parse_and_type_sub(sql: &str, tx: &impl SchemaView, auth: &AuthCtx) -> Ty
expect_table_type(SubChecker::type_ast(ast, tx)?).map(|plan| (plan, has_param))
}

/// Type check a subscription query
pub fn type_subscription(ast: SqlSelect, tx: &impl SchemaView) -> TypingResult<ProjectName> {
expect_table_type(SubChecker::type_ast(ast, tx)?)
}

/// Parse and type check a *subscription* query into a `StatementCtx`
pub fn compile_sql_sub<'a>(sql: &'a str, tx: &impl SchemaView, auth: &AuthCtx) -> TypingResult<StatementCtx<'a>> {
let (plan, _) = parse_and_type_sub(sql, tx, auth)?;
Ok(StatementCtx {
statement: Statement::Select(ProjectList::Name(plan)),
sql,
source: StatementSource::Subscription,
})
}

/// Returns an error if the input type is not a table type or relvar
fn expect_table_type(expr: ProjectList) -> TypingResult<ProjectName> {
match expr {
ProjectList::Name(proj) => Ok(proj),
// Note, this is called before we do any RLS resolution.
// Hence this length should always be 1.
ProjectList::Name(mut proj) if proj.len() == 1 => Ok(proj.pop().unwrap()),
ProjectList::Limit(input, _) => expect_table_type(*input),
ProjectList::List(..) | ProjectList::Agg(..) => Err(Unsupported::ReturnType.into()),
ProjectList::Name(..) | ProjectList::List(..) | ProjectList::Agg(..) => Err(Unsupported::ReturnType.into()),
}
}

Expand Down
Loading
Loading