Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 80 additions & 9 deletions crates/core/src/db/datastore/locking_tx_datastore/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ use super::{
tx::TxId,
tx_state::TxState,
};
use crate::db::datastore::locking_tx_datastore::state_view::{IterByColRangeMutTx, IterMutTx, IterTx};
use crate::db::datastore::{
locking_tx_datastore::state_view::{IterByColRangeMutTx, IterMutTx, IterTx},
traits::{InsertFlags, UpdateFlags},
};
use crate::execution_context::Workload;
use crate::{
db::{
Expand Down Expand Up @@ -576,9 +579,9 @@ impl MutTxDatastore for Locking {
tx: &'a mut Self::MutTx,
table_id: TableId,
row: &[u8],
) -> Result<(ColList, RowRef<'a>)> {
let (gens, row_ref) = tx.insert::<true>(table_id, row)?;
Ok((gens, row_ref.collapse()))
) -> Result<(ColList, RowRef<'a>, InsertFlags)> {
let (gens, row_ref, insert_flags) = tx.insert::<true>(table_id, row)?;
Ok((gens, row_ref.collapse(), insert_flags))
}

fn update_mut_tx<'a>(
Expand All @@ -587,7 +590,7 @@ impl MutTxDatastore for Locking {
table_id: TableId,
index_id: IndexId,
row: &[u8],
) -> Result<(ColList, RowRef<'a>)> {
) -> Result<(ColList, RowRef<'a>, UpdateFlags)> {
tx.update(table_id, index_id, row)
}

Expand Down Expand Up @@ -1005,12 +1008,13 @@ mod tests {
use pretty_assertions::{assert_eq, assert_matches};
use spacetimedb_lib::db::auth::{StAccess, StTableType};
use spacetimedb_lib::error::ResultTest;
use spacetimedb_lib::resolved_type_via_v9;
use spacetimedb_lib::{resolved_type_via_v9, ScheduleAt};
use spacetimedb_primitives::{col_list, ColId, ScheduleId};
use spacetimedb_sats::algebraic_value::ser::value_serialize;
use spacetimedb_sats::{product, AlgebraicType, GroundSpacetimeType};
use spacetimedb_schema::def::{BTreeAlgorithm, ConstraintData, IndexAlgorithm, UniqueConstraintData};
use spacetimedb_schema::schema::{
ColumnSchema, ConstraintSchema, IndexSchema, RowLevelSecuritySchema, SequenceSchema,
ColumnSchema, ConstraintSchema, IndexSchema, RowLevelSecuritySchema, ScheduleSchema, SequenceSchema,
};
use spacetimedb_table::table::UniqueConstraintViolation;

Expand Down Expand Up @@ -1409,7 +1413,7 @@ mod tests {
row: &ProductValue,
) -> Result<(AlgebraicValue, RowRef<'a>)> {
let row = to_vec(&row).unwrap();
let (gen_cols, row_ref) = datastore.insert_mut_tx(tx, table_id, &row)?;
let (gen_cols, row_ref, _) = datastore.insert_mut_tx(tx, table_id, &row)?;
let gen_cols = row_ref.project(&gen_cols)?;
Ok((gen_cols, row_ref))
}
Expand All @@ -1422,7 +1426,7 @@ mod tests {
row: &ProductValue,
) -> Result<(AlgebraicValue, RowRef<'a>)> {
let row = to_vec(&row).unwrap();
let (gen_cols, row_ref) = datastore.update_mut_tx(tx, table_id, index_id, &row)?;
let (gen_cols, row_ref, _) = datastore.update_mut_tx(tx, table_id, index_id, &row)?;
let gen_cols = row_ref.project(&gen_cols)?;
Ok((gen_cols, row_ref))
}
Expand Down Expand Up @@ -2501,6 +2505,73 @@ mod tests {
Ok(())
}

#[test]
fn test_scheduled_table_insert_and_update() -> ResultTest<()> {
let table_id = TableId::SENTINEL;
// Build the minimal schema that is a valid scheduler table.
let schema = TableSchema::new(
table_id,
"Foo".into(),
vec![
ColumnSchema {
table_id,
col_pos: 0.into(),
col_name: "id".into(),
col_type: AlgebraicType::U64,
},
ColumnSchema {
table_id,
col_pos: 1.into(),
col_name: "at".into(),
col_type: ScheduleAt::get_type(),
},
],
vec![IndexSchema {
table_id,
index_id: IndexId::SENTINEL,
index_name: "id_idx".into(),
index_algorithm: IndexAlgorithm::BTree(BTreeAlgorithm { columns: 0.into() }),
}],
vec![ConstraintSchema {
table_id,
constraint_id: ConstraintId::SENTINEL,
constraint_name: "id_unique".into(),
data: ConstraintData::Unique(UniqueConstraintData { columns: 0.into() }),
}],
vec![],
StTableType::User,
StAccess::Public,
Some(ScheduleSchema {
table_id,
schedule_id: ScheduleId::SENTINEL,
schedule_name: "schedule".into(),
reducer_name: "reducer".into(),
at_column: 1.into(),
}),
Some(0.into()),
);

// Create the table.
let datastore = get_datastore()?;
let mut tx = begin_mut_tx(&datastore);
let table_id = datastore.create_table_mut_tx(&mut tx, schema)?;
let index_id = datastore
.index_id_from_name_mut_tx(&tx, "id_idx")?
.expect("there should be an index with this name");

// Make us a row and insert + identity update.
let row = &product![24u64, value_serialize(&ScheduleAt::Interval(42))];
let row = &to_vec(row).unwrap();
let (_, _, insert_flags) = datastore.insert_mut_tx(&mut tx, table_id, row)?;
let (_, _, update_flags) = datastore.update_mut_tx(&mut tx, table_id, index_id, row)?;

// The whole point of the test.
assert!(insert_flags.is_scheduler_table);
assert!(update_flags.is_scheduler_table);

Ok(())
}

#[test]
fn test_row_level_security() -> ResultTest<()> {
let (_, mut tx, table_id) = setup_table()?;
Expand Down
42 changes: 31 additions & 11 deletions crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,22 @@ use super::{
tx_state::{DeleteTable, IndexIdMap, TxState},
SharedMutexGuard, SharedWriteGuard,
};
use crate::db::datastore::locking_tx_datastore::committed_state::CommittedIndexIterWithDeletedMutTx;
use crate::db::datastore::locking_tx_datastore::state_view::{
IndexSeekIterIdWithDeletedMutTx, IterByColEqMutTx, IterByColRangeMutTx, IterMutTx,
};
use crate::db::datastore::system_tables::{
with_sys_table_buf, StColumnFields, StColumnRow, StConstraintFields, StConstraintRow, StFields as _, StIndexFields,
StIndexRow, StRowLevelSecurityFields, StRowLevelSecurityRow, StScheduledFields, StScheduledRow, StSequenceFields,
StSequenceRow, StTableFields, StTableRow, SystemTable, ST_COLUMN_ID, ST_CONSTRAINT_ID, ST_INDEX_ID,
ST_ROW_LEVEL_SECURITY_ID, ST_SCHEDULED_ID, ST_SEQUENCE_ID, ST_TABLE_ID,
};
use crate::db::datastore::traits::{RowTypeForTable, TxData};
use crate::db::datastore::{
locking_tx_datastore::committed_state::CommittedIndexIterWithDeletedMutTx, traits::InsertFlags,
};
use crate::db::datastore::{
locking_tx_datastore::state_view::{
IndexSeekIterIdWithDeletedMutTx, IterByColEqMutTx, IterByColRangeMutTx, IterMutTx,
},
traits::UpdateFlags,
};
use crate::execution_context::Workload;
use crate::{
error::{IndexError, SequenceError, TableError},
Expand Down Expand Up @@ -1174,7 +1179,7 @@ impl MutTxId {
&'a mut self,
table_id: TableId,
row: &T,
) -> Result<(ColList, RowRefInsertion<'a>)> {
) -> Result<(ColList, RowRefInsertion<'a>, InsertFlags)> {
thread_local! {
static BUF: RefCell<Vec<u8>> = const { RefCell::new(Vec::new()) };
}
Expand All @@ -1199,11 +1204,12 @@ impl MutTxId {
/// Returns:
/// - a list of columns which have been replaced with generated values.
/// - a ref to the inserted row.
/// - any insert flags.
pub(super) fn insert<const GENERATE: bool>(
&mut self,
table_id: TableId,
row: &[u8],
) -> Result<(ColList, RowRefInsertion<'_>)> {
) -> Result<(ColList, RowRefInsertion<'_>, InsertFlags)> {
// Get the insert table, so we can write the row into it.
let (tx_table, tx_blob_store, ..) = self
.tx_state
Expand All @@ -1213,6 +1219,10 @@ impl MutTxId {
)
.ok_or(TableError::IdNotFoundState(table_id))?;

let insert_flags = InsertFlags {
is_scheduler_table: tx_table.is_scheduler(),
};

// 1. Insert the physical row.
let (tx_row_ref, blob_bytes) = tx_table.insert_physically_bsatn(tx_blob_store, row)?;
// 2. Optionally: Detect, generate, write sequence values.
Expand Down Expand Up @@ -1322,7 +1332,7 @@ impl MutTxId {
// SAFETY: `find_same_row` told us that `ptr` refers to a valid row in `commit_table`.
unsafe { commit_table.get_row_ref_unchecked(blob_store, commit_ptr) },
);
return Ok((gen_cols, rri));
return Ok((gen_cols, rri, insert_flags));
}

// Pacify the borrow checker.
Expand Down Expand Up @@ -1350,15 +1360,15 @@ impl MutTxId {
// as there haven't been any interleaving `&mut` calls that could invalidate the pointer.
tx_table.get_row_ref_unchecked(tx_blob_store, tx_row_ptr)
});
Ok((gen_cols, rri))
Ok((gen_cols, rri, insert_flags))
}
// `row` previously present in insert tables; do nothing but return `ptr`.
Err(InsertError::Duplicate(DuplicateError(ptr))) => {
let rri = RowRefInsertion::Existed(
// SAFETY: `tx_table` told us that `ptr` refers to a valid row in it.
unsafe { tx_table.get_row_ref_unchecked(tx_blob_store, ptr) },
);
Ok((gen_cols, rri))
Ok((gen_cols, rri, insert_flags))
}

// Unwrap these error into `TableError::{IndexError, Bflatn}`:
Expand All @@ -1382,7 +1392,13 @@ impl MutTxId {
/// Returns:
/// - a list of columns which have been replaced with generated values.
/// - a ref to the new row.
pub(crate) fn update(&mut self, table_id: TableId, index_id: IndexId, row: &[u8]) -> Result<(ColList, RowRef<'_>)> {
/// - any update flags.
pub(crate) fn update(
&mut self,
table_id: TableId,
index_id: IndexId,
row: &[u8],
) -> Result<(ColList, RowRef<'_>, UpdateFlags)> {
let tx_removed_index = self.tx_state_removed_index(index_id);

// 1. Insert the physical row into the tx insert table.
Expand Down Expand Up @@ -1425,6 +1441,10 @@ impl MutTxId {
// SAFETY: `tx_table.is_row_present(tx_row_ptr)` holds as we haven't deleted it yet.
let tx_row_ref = unsafe { tx_table.get_row_ref_unchecked(tx_blob_store, tx_row_ptr) };

let update_flags = UpdateFlags {
is_scheduler_table: tx_table.is_scheduler(),
};

// 3. Find the old row and remove it.
//----------------------------------------------------------------------
#[inline]
Expand Down Expand Up @@ -1564,7 +1584,7 @@ impl MutTxId {
// per post-condition of `confirm_insertion` and `confirm_update`
// in the if/else branches respectively.
let tx_row_ref = unsafe { tx_table.get_row_ref_unchecked(tx_blob_store, tx_row_ptr) };
return Ok((cols_to_gen, tx_row_ref));
return Ok((cols_to_gen, tx_row_ref, update_flags));
};

// When we reach here, we had an error and we need to revert the insertion of `tx_row_ref`.
Expand Down
21 changes: 19 additions & 2 deletions crates/core/src/db/datastore/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,19 @@ impl Program {
}
}

/// Additional information about an insert operation.
pub struct InsertFlags {
/// Is the table a scheduler table?
pub is_scheduler_table: bool,
}

/// Additional information about an update operation.
// TODO(centril): consider fusing this with `InsertFlags`.
pub struct UpdateFlags {
/// Is the table a scheduler table?
pub is_scheduler_table: bool,
}

pub trait TxDatastore: DataRow + Tx {
type IterTx<'a>: Iterator<Item = Self::RowRef<'a>>
where
Expand Down Expand Up @@ -486,30 +499,34 @@ pub trait MutTxDatastore: TxDatastore + MutTx {
///
/// Returns the list of columns with sequence-trigger values that were replaced with generated ones
/// and a reference to the row as a [`RowRef`].
/// Also returns any additional insert flags.
///
/// Generated columns are columns with an auto-inc sequence
/// and where the column was `0` in `row`.
// TODO(centril): consider making the tuple into a struct.
fn insert_mut_tx<'a>(
&'a self,
tx: &'a mut Self::MutTx,
table_id: TableId,
row: &[u8],
) -> Result<(ColList, RowRef<'a>)>;
) -> Result<(ColList, RowRef<'a>, InsertFlags)>;
/// Updates a row to `row`, encoded in BSATN, into the table identified by `table_id`
/// using the index identified by `index_id`.
///
/// Returns the list of columns with sequence-trigger values that were replaced with generated ones
/// and a reference to the row as a [`RowRef`].
/// Also returns any additional update flags.
///
/// Generated columns are columns with an auto-inc sequence
/// and where the column was `0` in `row`.
// TODO(centril): consider making the tuple into a struct.
fn update_mut_tx<'a>(
&'a self,
tx: &'a mut Self::MutTx,
table_id: TableId,
index_id: IndexId,
row: &[u8],
) -> Result<(ColList, RowRef<'a>)>;
) -> Result<(ColList, RowRef<'a>, UpdateFlags)>;

/// Obtain the [`Metadata`] for this datastore.
///
Expand Down
9 changes: 5 additions & 4 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use super::datastore::locking_tx_datastore::state_view::{
};
use super::datastore::system_tables::ST_MODULE_ID;
use super::datastore::traits::{
IsolationLevel, Metadata, MutTx as _, MutTxDatastore, Program, RowTypeForTable, Tx as _, TxDatastore,
InsertFlags, IsolationLevel, Metadata, MutTx as _, MutTxDatastore, Program, RowTypeForTable, Tx as _, TxDatastore,
UpdateFlags,
};
use super::datastore::{
locking_tx_datastore::{
Expand Down Expand Up @@ -1130,7 +1131,7 @@ impl RelationalDB {
tx: &'a mut MutTx,
table_id: TableId,
row: &[u8],
) -> Result<(ColList, RowRef<'a>), DBError> {
) -> Result<(ColList, RowRef<'a>, InsertFlags), DBError> {
self.inner.insert_mut_tx(tx, table_id, row)
}

Expand All @@ -1140,7 +1141,7 @@ impl RelationalDB {
table_id: TableId,
index_id: IndexId,
row: &[u8],
) -> Result<(ColList, RowRef<'a>), DBError> {
) -> Result<(ColList, RowRef<'a>, UpdateFlags), DBError> {
self.inner.update_mut_tx(tx, table_id, index_id, row)
}

Expand Down Expand Up @@ -1563,7 +1564,7 @@ pub mod tests_utils {
table_id: TableId,
row: &T,
) -> Result<(AlgebraicValue, RowRef<'a>), DBError> {
let (gen_cols, row_ref) = db.insert(tx, table_id, &to_vec(row).unwrap())?;
let (gen_cols, row_ref, _) = db.insert(tx, table_id, &to_vec(row).unwrap())?;
let gen_cols = row_ref.project(&gen_cols).unwrap();
Ok((gen_cols, row_ref))
}
Expand Down
Loading