Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 36 additions & 48 deletions crates/core/src/db/datastore/locking_tx_datastore/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1857,13 +1857,13 @@ mod tests {
let mut tx = begin_mut_tx(&datastore);
let schema = datastore.schema_for_table_mut_tx(&tx, table_id)?;

assert_eq!(&*tx.tx_state.pending_schema_changes, []);
assert_eq!(tx.pending_schema_changes(), []);
let mut dropped_indexes = 0;
for (pos, index) in schema.indexes.iter().enumerate() {
datastore.drop_index_mut_tx(&mut tx, index.index_id)?;
dropped_indexes += 1;

let psc = &tx.tx_state.pending_schema_changes[pos];
let psc = &tx.pending_schema_changes()[pos];
let PendingSchemaChange::IndexRemoved(tid, iid, _, schema) = psc else {
panic!("wrong pending schema change: {psc:?}");
};
Expand All @@ -1878,7 +1878,7 @@ mod tests {
datastore.commit_mut_tx(tx)?;

let mut tx = begin_mut_tx(&datastore);
assert_eq!(&*tx.tx_state.pending_schema_changes, []);
assert_eq!(tx.pending_schema_changes(), []);
assert!(
datastore.schema_for_table_mut_tx(&tx, table_id)?.indexes.is_empty(),
"no indexes should be left in the schema post-commit"
Expand All @@ -1897,7 +1897,7 @@ mod tests {
true,
)?;
assert_matches!(
&*tx.tx_state.pending_schema_changes,
tx.pending_schema_changes(),
[PendingSchemaChange::IndexAdded(tid, _, Some(_))]
if *tid == table_id
);
Expand All @@ -1920,7 +1920,7 @@ mod tests {
datastore.commit_mut_tx(tx)?;

let tx = begin_mut_tx(&datastore);
assert_eq!(&*tx.tx_state.pending_schema_changes, []);
assert_eq!(tx.pending_schema_changes(), []);
assert_eq!(
datastore.schema_for_table_mut_tx(&tx, table_id)?.indexes,
expected_indexes,
Expand All @@ -1935,10 +1935,10 @@ mod tests {
#[test]
fn test_schema_for_table_rollback() -> ResultTest<()> {
let (datastore, tx, table_id) = setup_table()?;
assert_eq!(tx.tx_state.pending_schema_changes.len(), 6);
assert_eq!(tx.pending_schema_changes().len(), 6);
let _ = datastore.rollback_mut_tx(tx);
let tx = begin_mut_tx(&datastore);
assert_eq!(&*tx.tx_state.pending_schema_changes, []);
assert_eq!(tx.pending_schema_changes(), []);
let schema = datastore.schema_for_table_mut_tx(&tx, table_id);
assert!(schema.is_err());
Ok(())
Expand Down Expand Up @@ -2165,12 +2165,9 @@ mod tests {
commit(&datastore, tx)?;

let mut tx = begin_mut_tx(&datastore);
assert_eq!(&*tx.tx_state.pending_schema_changes, []);
assert_eq!(tx.pending_schema_changes(), []);
create_foo_age_idx_btree(&datastore, &mut tx, table_id)?;
assert_matches!(
&*tx.tx_state.pending_schema_changes,
[PendingSchemaChange::IndexAdded(.., None)]
);
assert_matches!(tx.pending_schema_changes(), [PendingSchemaChange::IndexAdded(.., None)]);
assert_st_indices(&tx, true)?;
let row = u32_str_u32(0, "Bar", 18); // 0 will be ignored.
let result = insert(&datastore, &mut tx, table_id, &row);
Expand All @@ -2195,7 +2192,7 @@ mod tests {
commit(&datastore, tx)?;

let mut tx = begin_mut_tx(&datastore);
assert_eq!(&*tx.tx_state.pending_schema_changes, []);
assert_eq!(tx.pending_schema_changes(), []);
assert_st_indices(&tx, true)?;
let row = u32_str_u32(0, "Bar", 18); // 0 will be ignored.
let result = insert(&datastore, &mut tx, table_id, &row);
Expand Down Expand Up @@ -2235,16 +2232,13 @@ mod tests {
// Start a transaction. Schema changes empty so far.
let datastore = get_datastore()?;
let mut tx = begin_mut_tx(&datastore);
assert_eq!(&*tx.tx_state.pending_schema_changes, []);
assert_eq!(tx.pending_schema_changes(), []);

// Make the table and witness `TableAdded`. Commit.
let column = ColumnSchema::for_test(0, "id", AlgebraicType::I32);
let schema = user_public_table([column], [], [], [], None, None);
let table_id = datastore.create_table_mut_tx(&mut tx, schema)?;
assert_matches!(
&*tx.tx_state.pending_schema_changes,
[PendingSchemaChange::TableAdded(..)]
);
assert_matches!(tx.pending_schema_changes(), [PendingSchemaChange::TableAdded(..)]);
commit(&datastore, tx)?;

// Start a new tx. Insert a row and witness that a sequence isn't used.
Expand Down Expand Up @@ -2274,7 +2268,7 @@ mod tests {
};
let seq_id = datastore.create_sequence_mut_tx(&mut tx, sequence.clone())?;
assert_matches!(
&*tx.tx_state.pending_schema_changes,
tx.pending_schema_changes(),
[PendingSchemaChange::SequenceAdded(_, added_seq_id)]
if *added_seq_id == seq_id
);
Expand All @@ -2283,7 +2277,7 @@ mod tests {
// Drop the uncommitted sequence.
datastore.drop_sequence_mut_tx(&mut tx, seq_id)?;
assert_matches!(
&*tx.tx_state.pending_schema_changes,
tx.pending_schema_changes(),
[
PendingSchemaChange::SequenceAdded(..),
PendingSchemaChange::SequenceRemoved(.., schema),
Expand All @@ -2295,7 +2289,7 @@ mod tests {
// Add the sequence again and rollback, witnessing that this had no effect in the next tx.
let seq_id = datastore.create_sequence_mut_tx(&mut tx, sequence.clone())?;
assert_matches!(
&*tx.tx_state.pending_schema_changes,
tx.pending_schema_changes(),
[
PendingSchemaChange::SequenceAdded(..),
PendingSchemaChange::SequenceRemoved(..),
Expand All @@ -2305,45 +2299,39 @@ mod tests {
);
let _ = datastore.rollback_mut_tx(tx);
let mut tx: MutTxId = begin_mut_tx(&datastore);
assert_eq!(&*tx.tx_state.pending_schema_changes, []);
assert_eq!(tx.pending_schema_changes(), []);
insert_assert_and_remove(&mut tx, &zero, &zero)?;

// Add the sequence and this time actually commit. Check that it exists in next tx.
assert_eq!(&*tx.tx_state.pending_schema_changes, []);
assert_eq!(tx.pending_schema_changes(), []);
let seq_id = datastore.create_sequence_mut_tx(&mut tx, sequence.clone())?;
assert_matches!(
&*tx.tx_state.pending_schema_changes,
tx.pending_schema_changes(),
[PendingSchemaChange::SequenceAdded(_, added_seq_id)]
if *added_seq_id == seq_id
);
commit(&datastore, tx)?;
let mut tx = begin_mut_tx(&datastore);
assert_eq!(&*tx.tx_state.pending_schema_changes, []);
assert_eq!(tx.pending_schema_changes(), []);
insert_assert_and_remove(&mut tx, &zero, &one)?;

// We have the sequence in committed state.
// Drop it and then rollback, so in the next tx the seq is still there.
datastore.drop_sequence_mut_tx(&mut tx, seq_id)?;
assert_matches!(
&*tx.tx_state.pending_schema_changes,
[PendingSchemaChange::SequenceRemoved(..)]
);
assert_matches!(tx.pending_schema_changes(), [PendingSchemaChange::SequenceRemoved(..)]);
insert_assert_and_remove(&mut tx, &zero, &zero)?;
let _ = datastore.rollback_mut_tx(tx);
let mut tx = begin_mut_tx(&datastore);
assert_eq!(&*tx.tx_state.pending_schema_changes, []);
assert_eq!(tx.pending_schema_changes(), []);
insert_assert_and_remove(&mut tx, &zero, &product![2])?;

// Drop the seq and commit this time around. In the next tx, we witness that there's no seq.
datastore.drop_sequence_mut_tx(&mut tx, seq_id)?;
assert_matches!(
&*tx.tx_state.pending_schema_changes,
[PendingSchemaChange::SequenceRemoved(..)]
);
assert_matches!(tx.pending_schema_changes(), [PendingSchemaChange::SequenceRemoved(..)]);
insert_assert_and_remove(&mut tx, &zero, &zero)?;
commit(&datastore, tx)?;
let mut tx = begin_mut_tx(&datastore);
assert_eq!(&*tx.tx_state.pending_schema_changes, []);
assert_eq!(tx.pending_schema_changes(), []);
insert_assert_and_remove(&mut tx, &zero, &zero)?;

Ok(())
Expand Down Expand Up @@ -2570,16 +2558,16 @@ mod tests {
fn test_update_no_such_index_because_deleted() -> ResultTest<()> {
// Setup and immediately commit.
let (datastore, tx, table_id) = setup_table()?;
assert_eq!(tx.tx_state.pending_schema_changes.len(), 6);
assert_eq!(tx.pending_schema_changes().len(), 6);
commit(&datastore, tx)?;

// Remove index in tx state.
let mut tx = begin_mut_tx(&datastore);
assert_eq!(&*tx.tx_state.pending_schema_changes, []);
assert_eq!(tx.pending_schema_changes(), []);
let index_id = extract_index_id(&datastore, &tx, &basic_indices()[0])?;
tx.drop_index(index_id)?;
assert_matches!(
&*tx.tx_state.pending_schema_changes,
tx.pending_schema_changes(),
[PendingSchemaChange::IndexRemoved(tid, iid, _, _)]
if *tid == table_id && *iid == index_id
);
Expand Down Expand Up @@ -2654,7 +2642,7 @@ mod tests {
fn test_update_no_such_row_because_deleted_new_index_in_tx() -> ResultTest<()> {
let (datastore, mut tx, table_id) = setup_table_with_indices([], [])?;
assert_matches!(
&*tx.tx_state.pending_schema_changes,
tx.pending_schema_changes(),
[
PendingSchemaChange::TableAdded(_),
PendingSchemaChange::SequenceAdded(..),
Expand All @@ -2668,13 +2656,13 @@ mod tests {

// Now add the indices and then delete the row.
let mut tx = begin_mut_tx(&datastore);
assert_eq!(&*tx.tx_state.pending_schema_changes, []);
assert_eq!(tx.pending_schema_changes(), []);
let mut indices = basic_indices();
for (pos, index) in indices.iter_mut().enumerate() {
index.table_id = table_id;
index.index_id = datastore.create_index_mut_tx(&mut tx, index.clone(), true)?;
assert_matches!(
&tx.tx_state.pending_schema_changes[pos],
&tx.pending_schema_changes()[pos],
PendingSchemaChange::IndexAdded(_, iid, _)
if *iid == index.index_id
);
Expand Down Expand Up @@ -3087,10 +3075,10 @@ mod tests {

// Create a transaction and drop the table and roll back.
let mut tx = begin_mut_tx(&datastore);
assert_eq!(&*tx.tx_state.pending_schema_changes, []);
assert_eq!(tx.pending_schema_changes(), []);
assert!(datastore.drop_table_mut_tx(&mut tx, table_id).is_ok());
assert_matches!(
&*tx.tx_state.pending_schema_changes,
tx.pending_schema_changes(),
[
PendingSchemaChange::IndexRemoved(..),
PendingSchemaChange::IndexRemoved(..),
Expand All @@ -3105,7 +3093,7 @@ mod tests {

// Ensure the table still exists in the next transaction.
let tx = begin_mut_tx(&datastore);
assert_eq!(&*tx.tx_state.pending_schema_changes, []);
assert_eq!(tx.pending_schema_changes(), []);
assert!(
datastore.table_id_exists_mut_tx(&tx, &table_id),
"Table should still exist",
Expand All @@ -3120,7 +3108,7 @@ mod tests {
// Create a table in a failed transaction.
let (datastore, tx, table_id) = setup_table()?;
assert_matches!(
&*tx.tx_state.pending_schema_changes,
tx.pending_schema_changes(),
[
PendingSchemaChange::TableAdded(added_table_id),
PendingSchemaChange::IndexAdded(.., Some(_)),
Expand All @@ -3135,7 +3123,7 @@ mod tests {

// Nothing should have happened.
let tx = begin_mut_tx(&datastore);
assert_eq!(&*tx.tx_state.pending_schema_changes, []);
assert_eq!(tx.pending_schema_changes(), []);
assert!(
!datastore.table_id_exists_mut_tx(&tx, &table_id),
"Table should not exist"
Expand All @@ -3156,7 +3144,7 @@ mod tests {
assert_access(&tx, StAccess::Public);
tx.alter_table_access(table_id, StAccess::Private)?;
assert_eq!(
&*tx.tx_state.pending_schema_changes,
tx.pending_schema_changes(),
[PendingSchemaChange::TableAlterAccess(table_id, StAccess::Public)]
);
let _ = datastore.rollback_mut_tx(tx);
Expand Down Expand Up @@ -3260,7 +3248,7 @@ mod tests {
let mut tx = begin_mut_tx(&datastore);
datastore.alter_table_row_type_mut_tx(&mut tx, table_id, columns.clone())?;
assert_eq!(
&*tx.tx_state.pending_schema_changes,
tx.pending_schema_changes(),
[PendingSchemaChange::TableAlterRowType(
table_id,
columns_original.clone()
Expand Down
2 changes: 2 additions & 0 deletions crates/core/src/db/datastore/locking_tx_datastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ pub use state_view::{IterByColEqTx, IterByColRangeTx};
pub mod delete_table;
pub(crate) mod tx;
mod tx_state;
#[cfg(test)]
pub(crate) use tx_state::PendingSchemaChange;

use parking_lot::{
lock_api::{ArcMutexGuard, ArcRwLockReadGuard, ArcRwLockWriteGuard},
Expand Down
6 changes: 6 additions & 0 deletions crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,12 @@ impl MutTxId {
self.tx_state.pending_schema_changes.push(change);
}

/// Get the list of current pending schema changes, for testing.
#[cfg(test)]
pub(crate) fn pending_schema_changes(&self) -> &[PendingSchemaChange] {
&self.tx_state.pending_schema_changes
}

/// Deletes all the rows in table with `table_id`
/// where the column with `col_pos` equals `value`.
fn delete_col_eq(&mut self, table_id: TableId, col_pos: ColId, value: &AlgebraicValue) -> Result<()> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use spacetimedb_sats::memory_usage::MemoryUsage;
use spacetimedb_schema::schema::SequenceSchema;

#[derive(Debug, PartialEq)]
pub(super) struct Sequence {
pub(crate) struct Sequence {
schema: SequenceSchema,
pub(super) value: i128,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ pub(super) struct TxState {
/// Architecting this way should benefit performance both during transactions and merge.
/// On rollback, it should be fairly cheap to e.g., just re-add an index or drop it on the floor.
#[derive(Debug, PartialEq)]
pub(super) enum PendingSchemaChange {
pub(crate) enum PendingSchemaChange {
/// The [`TableIndex`] / [`IndexSchema`] with `IndexId`
/// was removed from the table with [`TableId`].
IndexRemoved(TableId, IndexId, TableIndex, IndexSchema),
Expand Down
Loading
Loading