Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 0 additions & 95 deletions crates/core/src/db/datastore/system_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
//! - Use [`st_fields_enum`] to define its column enum.
//! - Register its schema in [`system_module_def`], making sure to call `validate_system_table` at the end of the function.

use crate::db::relational_db::RelationalDB;
use crate::error::DBError;
use spacetimedb_lib::db::auth::{StAccess, StTableType};
use spacetimedb_lib::db::raw_def::v9::{btree, RawSql};
Expand All @@ -21,7 +20,6 @@ use spacetimedb_lib::ser::Serialize;
use spacetimedb_lib::st_var::StVarValue;
use spacetimedb_lib::{ConnectionId, Identity, ProductValue, SpacetimeType};
use spacetimedb_primitives::*;
use spacetimedb_sats::algebraic_type::fmt::fmt_algebraic_type;
use spacetimedb_sats::algebraic_value::ser::value_serialize;
use spacetimedb_sats::hash::Hash;
use spacetimedb_sats::product_value::InvalidFieldError;
Expand All @@ -34,16 +32,11 @@ use spacetimedb_schema::schema::{
TableSchema,
};
use spacetimedb_table::table::RowRef;
use spacetimedb_vm::errors::{ErrorType, ErrorVm};
use spacetimedb_vm::ops::parse;
use std::cell::RefCell;
use std::str::FromStr;
use strum::Display;
use v9::{RawModuleDefV9Builder, TableType};

use super::locking_tx_datastore::tx::TxId;
use super::locking_tx_datastore::MutTxId;

/// The static ID of the table that defines tables
pub(crate) const ST_TABLE_ID: TableId = TableId(1);
/// The static ID of the table that defines columns
Expand Down Expand Up @@ -950,81 +943,6 @@ impl TryFrom<RowRef<'_>> for StClientRow {
}
}

/// A handle for reading system variables from `st_var`
pub struct StVarTable;

impl StVarTable {
/// Read the value of [ST_VARNAME_ROW_LIMIT] from `st_var`
pub fn row_limit(db: &RelationalDB, tx: &TxId) -> Result<Option<u64>, DBError> {
let data = Self::read_var(db, tx, StVarName::RowLimit);

if let Some(StVarValue::U64(limit)) = data? {
return Ok(Some(limit));
}
Ok(None)
}

/// Read the value of [ST_VARNAME_SLOW_QRY] from `st_var`
pub fn query_limit(db: &RelationalDB, tx: &TxId) -> Result<Option<u64>, DBError> {
if let Some(StVarValue::U64(ms)) = Self::read_var(db, tx, StVarName::SlowQryThreshold)? {
return Ok(Some(ms));
}
Ok(None)
}

/// Read the value of [ST_VARNAME_SLOW_SUB] from `st_var`
pub fn sub_limit(db: &RelationalDB, tx: &TxId) -> Result<Option<u64>, DBError> {
if let Some(StVarValue::U64(ms)) = Self::read_var(db, tx, StVarName::SlowSubThreshold)? {
return Ok(Some(ms));
}
Ok(None)
}

/// Read the value of [ST_VARNAME_SLOW_INC] from `st_var`
pub fn incr_limit(db: &RelationalDB, tx: &TxId) -> Result<Option<u64>, DBError> {
if let Some(StVarValue::U64(ms)) = Self::read_var(db, tx, StVarName::SlowIncThreshold)? {
return Ok(Some(ms));
}
Ok(None)
}

/// Read the value of a system variable from `st_var`
pub fn read_var(db: &RelationalDB, tx: &TxId, name: StVarName) -> Result<Option<StVarValue>, DBError> {
if let Some(row_ref) = db
.iter_by_col_eq(tx, ST_VAR_ID, StVarFields::Name.col_id(), &name.into())?
.next()
{
return Ok(Some(StVarRow::try_from(row_ref)?.value));
}
Ok(None)
}

/// Update the value of a system variable in `st_var`
pub fn write_var(db: &RelationalDB, tx: &mut MutTxId, name: StVarName, literal: &str) -> Result<(), DBError> {
let value = Self::parse_var(name, literal)?;
if let Some(row_ref) = db
.iter_by_col_eq_mut(tx, ST_VAR_ID, StVarFields::Name.col_id(), &name.into())?
.next()
{
db.delete(tx, ST_VAR_ID, [row_ref.pointer()]);
}
tx.insert_via_serialize_bsatn(ST_VAR_ID, &StVarRow { name, value })?;
Ok(())
}

/// Parse the literal representation of a system variable
fn parse_var(name: StVarName, literal: &str) -> Result<StVarValue, DBError> {
StVarValue::try_from_primitive(parse::parse(literal, &name.type_of())?).map_err(|v| {
ErrorVm::Type(ErrorType::Parse {
value: literal.to_string(),
ty: fmt_algebraic_type(&name.type_of()).to_string(),
err: format!("error parsing value: {:?}", v),
})
.into()
})
}
}

/// System table [ST_VAR_NAME]
///
/// | name | value |
Expand Down Expand Up @@ -1210,19 +1128,6 @@ fn to_product_value<T: Serialize>(value: &T) -> ProductValue {
#[cfg(test)]
mod tests {
use super::*;
use crate::db::relational_db::tests_utils::{with_auto_commit, with_read_only, TestDB};

#[test]
fn test_system_variables() {
let db = TestDB::durable().expect("failed to create db");
let _ = with_auto_commit(&db, |tx| StVarTable::write_var(&db, tx, StVarName::RowLimit, "5"));
assert_eq!(
5,
with_read_only(&db, |tx| StVarTable::row_limit(&db, tx))
.expect("failed to read from st_var")
.expect("row_limit does not exist")
);
}

#[test]
fn test_sequences_within_reserved_range() {
Expand Down
94 changes: 92 additions & 2 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use super::datastore::locking_tx_datastore::datastore::TxMetrics;
use super::datastore::locking_tx_datastore::state_view::{
IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, IterTx, StateView,
};
use super::datastore::system_tables::ST_MODULE_ID;
use super::datastore::system_tables::{StFields, StVarFields, StVarName, StVarRow, ST_MODULE_ID, ST_VAR_ID};
use super::datastore::traits::{
InsertFlags, IsolationLevel, Metadata, MutTx as _, MutTxDatastore, Program, RowTypeForTable, Tx as _, TxDatastore,
UpdateFlags,
Expand Down Expand Up @@ -32,10 +32,12 @@ use spacetimedb_commitlog as commitlog;
use spacetimedb_durability::{self as durability, TxOffset};
use spacetimedb_lib::db::auth::StAccess;
use spacetimedb_lib::db::raw_def::v9::{btree, RawModuleDefV9Builder, RawSql};
use spacetimedb_lib::st_var::StVarValue;
use spacetimedb_lib::ConnectionId;
use spacetimedb_lib::Identity;
use spacetimedb_paths::server::{CommitLogDir, ReplicaDir, SnapshotsPath};
use spacetimedb_primitives::*;
use spacetimedb_sats::algebraic_type::fmt::fmt_algebraic_type;
use spacetimedb_sats::{AlgebraicType, AlgebraicValue, ProductType, ProductValue};
use spacetimedb_schema::def::{ModuleDef, TableDef};
use spacetimedb_schema::schema::{IndexSchema, RowLevelSecuritySchema, Schema, SequenceSchema, TableSchema};
Expand All @@ -44,6 +46,8 @@ use spacetimedb_table::indexes::RowPointer;
use spacetimedb_table::page_pool::PagePool;
use spacetimedb_table::table::RowRef;
use spacetimedb_table::MemoryUsage;
use spacetimedb_vm::errors::{ErrorType, ErrorVm};
use spacetimedb_vm::ops::parse;
use std::borrow::Cow;
use std::collections::HashSet;
use std::fmt;
Expand Down Expand Up @@ -1388,6 +1392,78 @@ impl RelationalDB {
pub fn report(&self, reducer: &str, metrics: &TxMetrics, tx_data: Option<&TxData>) {
metrics.report(tx_data, reducer, |wl: WorkloadType| self.exec_counters_for(wl));
}

/// Read the value of [ST_VARNAME_ROW_LIMIT] from `st_var`
pub(crate) fn row_limit(&self, tx: &Tx) -> Result<Option<u64>, DBError> {
let data = self.read_var(tx, StVarName::RowLimit);

if let Some(StVarValue::U64(limit)) = data? {
return Ok(Some(limit));
}
Ok(None)
}

/// Read the value of [ST_VARNAME_SLOW_QRY] from `st_var`
pub(crate) fn query_limit(&self, tx: &Tx) -> Result<Option<u64>, DBError> {
if let Some(StVarValue::U64(ms)) = self.read_var(tx, StVarName::SlowQryThreshold)? {
return Ok(Some(ms));
}
Ok(None)
}

/// Read the value of [ST_VARNAME_SLOW_SUB] from `st_var`
#[allow(dead_code)]
pub(crate) fn sub_limit(&self, tx: &Tx) -> Result<Option<u64>, DBError> {
if let Some(StVarValue::U64(ms)) = self.read_var(tx, StVarName::SlowSubThreshold)? {
return Ok(Some(ms));
}
Ok(None)
}

/// Read the value of [ST_VARNAME_SLOW_INC] from `st_var`
#[allow(dead_code)]
pub(crate) fn incr_limit(&self, tx: &Tx) -> Result<Option<u64>, DBError> {
if let Some(StVarValue::U64(ms)) = self.read_var(tx, StVarName::SlowIncThreshold)? {
return Ok(Some(ms));
}
Ok(None)
}

/// Read the value of a system variable from `st_var`
pub(crate) fn read_var(&self, tx: &Tx, name: StVarName) -> Result<Option<StVarValue>, DBError> {
if let Some(row_ref) = self
.iter_by_col_eq(tx, ST_VAR_ID, StVarFields::Name.col_id(), &name.into())?
.next()
{
return Ok(Some(StVarRow::try_from(row_ref)?.value));
}
Ok(None)
}

/// Update the value of a system variable in `st_var`
pub(crate) fn write_var(&self, tx: &mut MutTx, name: StVarName, literal: &str) -> Result<(), DBError> {
let value = Self::parse_var(name, literal)?;
if let Some(row_ref) = self
.iter_by_col_eq_mut(tx, ST_VAR_ID, StVarFields::Name.col_id(), &name.into())?
.next()
{
self.delete(tx, ST_VAR_ID, [row_ref.pointer()]);
}
tx.insert_via_serialize_bsatn(ST_VAR_ID, &StVarRow { name, value })?;
Ok(())
}

/// Parse the literal representation of a system variable
fn parse_var(name: StVarName, literal: &str) -> Result<StVarValue, DBError> {
StVarValue::try_from_primitive(parse::parse(literal, &name.type_of())?).map_err(|v| {
ErrorVm::Type(ErrorType::Parse {
value: literal.to_string(),
ty: fmt_algebraic_type(&name.type_of()).to_string(),
err: format!("error parsing value: {:?}", v),
})
.into()
})
}
}

#[allow(unused)]
Expand Down Expand Up @@ -1948,7 +2024,9 @@ mod tests {
system_tables, StConstraintRow, StIndexRow, StSequenceRow, StTableRow, ST_CONSTRAINT_ID, ST_INDEX_ID,
ST_SEQUENCE_ID, ST_TABLE_ID,
};
use crate::db::relational_db::tests_utils::{begin_tx, insert, make_snapshot, TestDB};
use crate::db::relational_db::tests_utils::{
begin_tx, insert, make_snapshot, with_auto_commit, with_read_only, TestDB,
};
use crate::error::IndexError;
use crate::execution_context::ReducerContext;
use anyhow::bail;
Expand Down Expand Up @@ -2032,6 +2110,18 @@ mod tests {
Ok(())
}

#[test]
fn test_system_variables() {
let db = TestDB::durable().expect("failed to create db");
let _ = with_auto_commit(&db, |tx| db.write_var(tx, StVarName::RowLimit, "5"));
assert_eq!(
5,
with_read_only(&db, |tx| db.row_limit(tx))
.expect("failed to read from st_var")
.expect("row_limit does not exist")
);
}

#[test]
fn test_open_twice() -> ResultTest<()> {
let stdb = TestDB::durable()?;
Expand Down
3 changes: 1 addition & 2 deletions crates/core/src/sql/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::time::Duration;

use super::ast::SchemaViewer;
use crate::db::datastore::locking_tx_datastore::state_view::StateView;
use crate::db::datastore::system_tables::StVarTable;
use crate::db::datastore::traits::IsolationLevel;
use crate::db::relational_db::{RelationalDB, Tx};
use crate::energy::EnergyQuanta;
Expand Down Expand Up @@ -72,7 +71,7 @@ fn execute(
updates: &mut Vec<DatabaseTableUpdate>,
) -> Result<Vec<MemTable>, DBError> {
let slow_query_threshold = if let TxMode::Tx(tx) = p.tx {
StVarTable::query_limit(p.db, tx)?.map(Duration::from_millis)
p.db.query_limit(tx)?.map(Duration::from_millis)
} else {
None
};
Expand Down
8 changes: 4 additions & 4 deletions crates/core/src/vm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use crate::db::datastore::locking_tx_datastore::state_view::IterByColRangeMutTx;
use crate::db::datastore::locking_tx_datastore::tx::TxId;
use crate::db::datastore::locking_tx_datastore::IterByColRangeTx;
use crate::db::datastore::system_tables::{st_var_schema, StVarName, StVarRow, StVarTable};
use crate::db::datastore::system_tables::{st_var_schema, StVarName, StVarRow};
use crate::db::relational_db::{MutTx, RelationalDB, Tx};
use crate::error::DBError;
use crate::estimation;
Expand Down Expand Up @@ -467,7 +467,7 @@ pub fn check_row_limit<Query>(
auth: &AuthCtx,
) -> Result<(), DBError> {
if auth.caller != auth.owner {
if let Some(limit) = StVarTable::row_limit(db, tx)? {
if let Some(limit) = db.row_limit(tx)? {
let mut estimate: u64 = 0;
for query in queries {
estimate = estimate.saturating_add(row_est(query, tx));
Expand Down Expand Up @@ -603,15 +603,15 @@ impl<'db, 'tx> DbProgram<'db, 'tx> {

fn _set_var(&mut self, name: String, literal: String) -> Result<Code, ErrorVm> {
let tx = self.tx.unwrap_mut();
StVarTable::write_var(self.db, tx, StVarName::from_str(&name)?, &literal)?;
self.db.write_var(tx, StVarName::from_str(&name)?, &literal)?;
Ok(Code::Pass(None))
}

fn _read_var(&self, name: String) -> Result<Code, ErrorVm> {
fn read_key_into_table(env: &DbProgram, name: &str) -> Result<MemTable, ErrorVm> {
if let TxMode::Tx(tx) = &env.tx {
let name = StVarName::from_str(name)?;
if let Some(value) = StVarTable::read_var(env.db, tx, name)? {
if let Some(value) = env.db.read_var(tx, name)? {
return Ok(MemTable::from_iter(
Arc::new(st_var_schema().into()),
[ProductValue::from(StVarRow { name, value })],
Expand Down
Loading