Skip to content

Commit 10854cc

Browse files
Shubham8287Centril
andauthored
datastore: add columns support (#3230)
# Description of Changes - `add_columns_to_table` api to handle `AutoMigrateStep::AddColumns` -- Look at doc comment for more Info. depends on: #3261 TODO: handle `AutoMigrateStep::DisconnectAllUsers`. # API and ABI breaking changes N/A. # Expected complexity level and risk 2? Changes are not in hotpath but a bug in migration can corrupt tables. # Testing a test to exercise the API. --------- Signed-off-by: Mazdak Farrokhzad <twingoow@gmail.com> Signed-off-by: Shubham Mishra <shivam828787@gmail.com> Co-authored-by: Mazdak Farrokhzad <twingoow@gmail.com>
1 parent ecc00ca commit 10854cc

File tree

13 files changed

+546
-55
lines changed

13 files changed

+546
-55
lines changed

crates/core/src/db/relational_db.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1088,6 +1088,18 @@ impl RelationalDB {
10881088
Ok(self.inner.alter_table_row_type_mut_tx(tx, table_id, column_schemas)?)
10891089
}
10901090

1091+
/// Adds columns to the table identified by `table_id` within the transaction `tx`.
///
/// This is a thin wrapper: it forwards all arguments to
/// `add_columns_to_table_mut_tx` on the inner datastore and returns the
/// `TableId` that call yields. Any datastore error is converted into
/// `DBError` by the `?` operator.
pub(crate) fn add_columns_to_table(
1092+
&self,
1093+
tx: &mut MutTx,
1094+
table_id: TableId,
1095+
column_schemas: Vec<ColumnSchema>,
1096+
default_values: Vec<AlgebraicValue>,
1097+
) -> Result<TableId, DBError> {
1098+
Ok(self
1099+
.inner
1100+
.add_columns_to_table_mut_tx(tx, table_id, column_schemas, default_values)?)
1101+
}
1102+
10911103
/// Reports the `TxMetrics`s passed.
10921104
///
10931105
/// Should only be called after the tx lock has been fully released.

crates/core/src/db/update.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,18 @@ fn auto_migrate_database(
252252
log!(logger, "Removing-row level security `{sql_rls}`");
253253
stdb.drop_row_level_security(tx, sql_rls.clone())?;
254254
}
255+
spacetimedb_schema::auto_migrate::AutoMigrateStep::AddColumns(table_name) => {
256+
let table_def = plan.new.stored_in_table_def(table_name).expect("table must exist");
257+
let table_id = stdb.table_id_from_name_mut(tx, table_name).unwrap().unwrap();
258+
let column_schemas = column_schemas_from_defs(plan.new, &table_def.columns, table_id);
259+
260+
let default_values: Vec<AlgebraicValue> = table_def
261+
.columns
262+
.iter()
263+
.filter_map(|col_def| col_def.default_value.clone())
264+
.collect();
265+
stdb.add_columns_to_table(tx, table_id, column_schemas, default_values)?;
266+
}
255267
_ => anyhow::bail!("migration step not implemented: {step:?}"),
256268
}
257269
}

crates/datastore/src/error.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ pub enum TableError {
8181
#[error(transparent)]
8282
// Error here is `Box`ed to avoid triggering https://rust-lang.github.io/rust-clippy/master/index.html#result_large_err .
8383
ChangeColumnsError(#[from] Box<table::ChangeColumnsError>),
84+
#[error(transparent)]
85+
AddColumnsError(#[from] Box<table::AddColumnsError>),
8486
}
8587

8688
#[derive(Error, Debug, PartialEq, Eq)]

crates/datastore/src/locking_tx_datastore/committed_state.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -706,6 +706,7 @@ impl CommittedState {
706706
for change in pending_schema_changes {
707707
if let PendingSchemaChange::TableRemoved(table_id, mut table) = change {
708708
let row_ptrs = table.scan_all_row_ptrs();
709+
truncates.insert(table_id);
709710
delete_rows(
710711
tx_data,
711712
table_id,

crates/datastore/src/locking_tx_datastore/datastore.rs

Lines changed: 155 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,16 @@ impl Locking {
335335
) -> Result<()> {
336336
tx.alter_table_row_type(table_id, column_schemas)
337337
}
338+
339+
/// Adds columns to the table identified by `table_id` inside the mutable transaction `tx`.
///
/// Delegates directly to `tx.add_columns_to_table`, passing `defaults` as the
/// default values, and returns the `TableId` produced by that call.
pub fn add_columns_to_table_mut_tx(
340+
&self,
341+
tx: &mut MutTxId,
342+
table_id: TableId,
343+
column_schemas: Vec<ColumnSchema>,
344+
defaults: Vec<AlgebraicValue>,
345+
) -> Result<TableId> {
346+
tx.add_columns_to_table(table_id, column_schemas, defaults)
347+
}
338348
}
339349

340350
impl DataRow for Locking {
@@ -1264,6 +1274,7 @@ mod tests {
12641274
use core::{fmt, mem};
12651275
use itertools::Itertools;
12661276
use pretty_assertions::{assert_eq, assert_matches};
1277+
use spacetimedb_execution::dml::MutDatastore as _;
12671278
use spacetimedb_execution::Datastore;
12681279
use spacetimedb_lib::db::auth::{StAccess, StTableType};
12691280
use spacetimedb_lib::error::ResultTest;
@@ -3228,11 +3239,12 @@ mod tests {
32283239
// Now drop the table again and commit.
32293240
assert!(datastore.drop_table_mut_tx(&mut tx, table_id).is_ok());
32303241
let tx_data = commit(&datastore, tx)?;
3231-
let (_, deleted) = tx_data
3242+
let (_, deleted_rows) = tx_data
32323243
.deletes()
32333244
.find(|(id, _)| **id == table_id)
32343245
.expect("should have deleted rows for `table_id`");
3235-
assert_eq!(&**deleted, [row]);
3246+
assert_eq!(&**deleted_rows, [row]);
3247+
assert!(tx_data.truncates().contains(&table_id), "table should be truncated");
32363248

32373249
// In the next transaction, the table doesn't exist.
32383250
assert!(
@@ -3453,8 +3465,148 @@ mod tests {
34533465

34543466
Ok(())
34553467
}
3456-
34573468
// TODO: Add the following tests
34583469
// - Create a tx that inserts 2000 rows with an auto_inc column
34593470
// - Create a tx that inserts 2000 rows with an auto_inc column and then rolls back
3471+
3472+
/// Exercises `add_columns_to_table_mut_tx` end to end:
/// a rolled-back migration must leave the original table untouched,
/// a committed migration must yield a new table id, log deletes for the old
/// table and inserts (with the default appended) for the new one,
/// and the auto-inc column must keep increasing afterwards.
#[test]
3473+
fn test_add_columns_to_table() -> ResultTest<()> {
3474+
let datastore = get_datastore()?;
3475+
3476+
let mut tx = begin_mut_tx(&datastore);
3477+
3478+
let initial_sum_type = AlgebraicType::sum([("ba", AlgebraicType::U16)]);
3479+
let initial_columns = [
3480+
ColumnSchema::for_test(0, "a", AlgebraicType::U64),
3481+
ColumnSchema::for_test(1, "b", initial_sum_type.clone()),
3482+
];
3483+
3484+
let initial_indices = [
3485+
IndexSchema::for_test("index_a", BTreeAlgorithm::from(0)),
3486+
IndexSchema::for_test("index_b", BTreeAlgorithm::from(1)),
3487+
];
3488+
3489+
// Sequence on column 0 (`a`), starting at 5.
let sequence = SequenceRow {
3490+
id: SequenceId::SENTINEL.into(),
3491+
table: 0,
3492+
col_pos: 0,
3493+
name: "Foo_id_seq",
3494+
start: 5,
3495+
};
3496+
3497+
let schema = user_public_table(
3498+
initial_columns,
3499+
initial_indices.clone(),
3500+
[],
3501+
map_array([sequence]),
3502+
None,
3503+
None,
3504+
);
3505+
3506+
let table_id = datastore.create_table_mut_tx(&mut tx, schema)?;
3507+
3508+
let mut columns_original = tx.get_schema(table_id).unwrap().columns.to_vec();
3509+
3510+
// Insert two initial rows and commit them.
3511+
let initial_row = product![0u64, AlgebraicValue::sum(0, AlgebraicValue::U16(1))];
3512+
insert(&datastore, &mut tx, table_id, &initial_row).unwrap();
3513+
insert(&datastore, &mut tx, table_id, &initial_row).unwrap();
3514+
commit(&datastore, tx)?;
3515+
3516+
// Alter Table: Add Variant and Column
3517+
//
3518+
// Change the `b` column, adding a variant.
3519+
let vars_ref = &mut columns_original[1].col_type.as_sum_mut().unwrap().variants;
3520+
let mut vars = Vec::from(mem::take(vars_ref));
3521+
vars.push(SumTypeVariant::new_named(AlgebraicType::U8, "bb"));
3522+
*vars_ref = vars.into();
3523+
// Add column `c`, with a single default value (42) for it.
3524+
let mut new_columns = columns_original.clone();
3525+
new_columns.push(ColumnSchema::for_test(2, "c", AlgebraicType::U8));
3526+
let defaults = vec![AlgebraicValue::U8(42)];
3527+
3528+
let mut tx = begin_mut_tx(&datastore);
3529+
// Insert a row in the tx state before adding the column.
3530+
tx.insert_product_value(table_id, &initial_row).unwrap();
3531+
// Add the column and then roll back.
3532+
let rollback_table_id =
3533+
datastore.add_columns_to_table_mut_tx(&mut tx, table_id, new_columns.clone(), defaults.clone())?;
3534+
let _ = tx.rollback();
3535+
3536+
// The two committed rows as they are expected to be stored:
// presumably the `0` placeholder in column `a` was replaced by the
// sequence, which starts at 5 -- TODO confirm.
let old_rows = [
3537+
product![5u64, AlgebraicValue::sum(0, 1u16.into())],
3538+
product![6u64, AlgebraicValue::sum(0, 1u16.into())],
3539+
];
3540+
3541+
let mut tx = begin_mut_tx(&datastore);
3542+
// Check that the rollback was successful.
3543+
let rows = tx
3544+
.table_scan(table_id)
3545+
.unwrap()
3546+
.map(|row| row.to_product_value())
3547+
.collect::<Vec<_>>();
3548+
assert_eq!(rows, old_rows, "Rows shouldn't be changed if rolledback");
3549+
let table = tx.table(rollback_table_id);
3550+
assert!(table.is_none(), "new table shouldn't be created if rolledback");
3551+
3552+
// Add column and actually commit this time.
3553+
tx.insert_product_value(table_id, &initial_row).unwrap();
3554+
let new_table_id = datastore.add_columns_to_table_mut_tx(&mut tx, table_id, new_columns.clone(), defaults)?;
3555+
3556+
let tx_data = commit(&datastore, tx)?;
3557+
3558+
assert_ne!(
3559+
new_table_id, table_id,
3560+
"New table ID after migration should differ from old one"
3561+
);
3562+
3563+
// Validate Commitlog Changes
3564+
let (_, deletes) = tx_data
3565+
.deletes()
3566+
.find(|(id, _)| **id == table_id)
3567+
.expect("Expected delete log for original table");
3568+
3569+
assert_eq!(
3570+
&**deletes, &old_rows,
3571+
"Unexpected delete entries after altering the table"
3572+
);
3573+
3574+
// Every row carries the default `42` in the new column `c`.
// NOTE(review): the third row has `a` = 8 rather than 7; presumably 7 was
// consumed by the insert in the rolled-back transaction -- confirm.
let inserted_rows = [
3575+
product![5u64, AlgebraicValue::sum(0, 1u16.into()), 42u8],
3576+
product![6u64, AlgebraicValue::sum(0, 1u16.into()), 42u8],
3577+
product![8u64, AlgebraicValue::sum(0, 1u16.into()), 42u8],
3578+
];
3579+
3580+
let (_, inserts) = tx_data
3581+
.inserts()
3582+
.find(|(id, _)| **id == new_table_id)
3583+
.expect("Expected insert log for new table");
3584+
3585+
assert_eq!(
3586+
&**inserts, &inserted_rows,
3587+
"Unexpected insert entries after altering the table"
3588+
);
3589+
3590+
// Insert Rows into New Table
3591+
let mut tx = begin_mut_tx(&datastore);
3592+
3593+
let new_row = product![0u64, AlgebraicValue::sum(0, 1u16.into()), 0u8];
3594+
tx.insert_product_value(new_table_id, &new_row).unwrap();
3595+
commit(&datastore, tx)?;
3596+
3597+
// Test the auto-inc field: column 0 must be strictly increasing over the scanned rows.
3598+
let tx = begin_mut_tx(&datastore);
3599+
let rows = tx.table_scan(new_table_id).unwrap().map(|row| row.to_product_value());
3600+
3601+
let mut last_row_auto_inc = 0;
3602+
for row in rows {
3603+
let auto_inc_col = row.get_field(0, None)?;
3604+
if let AlgebraicValue::U64(val) = auto_inc_col {
3605+
assert!(val > &last_row_auto_inc, "Auto-increment value did not increase");
3606+
last_row_auto_inc = *val;
3607+
}
3608+
}
3609+
3610+
Ok(())
3611+
}
34603612
}

crates/datastore/src/locking_tx_datastore/mut_tx.rs

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ use spacetimedb_table::{
5959
table_index::TableIndex,
6060
};
6161
use std::{
62+
collections::HashMap,
6263
sync::Arc,
6364
time::{Duration, Instant},
6465
};
@@ -494,6 +495,134 @@ impl MutTxId {
494495
Ok(())
495496
}
496497

498+
/// Add columns to the table identified by `table_id`.
499+
///
500+
/// This is an incompatible change that requires a new table to be created.
501+
/// The existing rows are copied over to the new table,
502+
/// with `default_values` appended to each row.
503+
///
504+
/// `column_schemas` must contain all the old columns in the same order as before,
505+
/// followed by the new columns to be added.
506+
/// All new columns must have a default value in `default_values`.
/// If `default_values` holds more entries than there are new columns,
/// only the trailing entries are used; the leading ones are discarded.
507+
///
508+
/// After calling this method, the table referred to by `table_id` is dropped,
509+
/// and a new table is created with the same name but a new `table_id`.
510+
/// The new `table_id` is returned.
///
/// # Errors
///
/// Returns an error if:
/// - `column_schemas` has fewer entries than the table currently has columns,
/// - `default_values` has fewer entries than the number of new columns,
/// - `validate_add_columns_schema` rejects the new schema or defaults,
/// - looking up the schema, scanning the rows, dropping the old table,
///   creating the new table, or inserting a row into it fails.
///
/// # Panics
///
/// Panics if a sequence listed in the original table schema
/// is missing from the sequence state.
511+
pub(crate) fn add_columns_to_table(
512+
&mut self,
513+
table_id: TableId,
514+
column_schemas: Vec<ColumnSchema>,
515+
mut default_values: Vec<AlgebraicValue>,
516+
) -> Result<TableId> {
517+
let original_table_schema: TableSchema = (*self.schema_for_table(table_id)?).clone();
518+
519+
// Only keep the default values of new columns.
// NOTE(review): `checked_sub` only fails when `column_schemas` is *shorter*
// than the existing column list, so an equal count (zero new columns)
// passes this check even though the message says "more than".
520+
let new_cols = column_schemas
521+
.len()
522+
.checked_sub(original_table_schema.columns.len())
523+
.ok_or_else(|| {
524+
anyhow::anyhow!(
525+
"new column schemas must be more than existing ones for table_id: {}",
526+
table_id
527+
)
528+
})?;
529+
let older_defaults = default_values.len().checked_sub(new_cols).ok_or_else(|| {
530+
anyhow::anyhow!(
531+
"not enough default values provided for new columns for table_id: {}",
532+
table_id
533+
)
534+
})?;
535+
// Drop the leading entries so that exactly `new_cols` defaults remain.
default_values.drain(..older_defaults);
536+
537+
let ((tx_table, ..), _) = self.get_or_create_insert_table_mut(table_id)?;
538+
539+
tx_table
540+
.validate_add_columns_schema(&column_schemas, &default_values)
541+
.map_err(TableError::from)?;
542+
543+
// Copy all rows as `ProductValue` before dropping the table.
544+
// This approach isn't ideal, as allocating a large contiguous block of memory
545+
// can lead to memory overflow errors.
546+
// Ideally, we should use iterators to avoid this, but that's not sufficient on its own.
547+
// `CommittedState::merge_apply_inserts` also allocates a similar `Vec` to hold the data.
548+
// So, if we really find this problematic in practice, this should be fixed in both places.
549+
let mut table_rows: Vec<ProductValue> = iter(&self.tx_state, &self.committed_state_write_lock, table_id)?
550+
.map(|r| r.to_product_value())
551+
.collect();
552+
553+
log::debug!(
554+
"ADDING TABLE COLUMN (incompatible layout): {}, table_id: {}",
555+
original_table_schema.table_name,
556+
table_id
557+
);
558+
559+
// Store sequence values to restore them later with the new table.
560+
// A map from sequence name to value is used because the new sequence ids will be different,
561+
// and it is unclear whether the order of sequences in the table schema can be relied upon.
562+
let seq_values: HashMap<Box<str>, i128> = original_table_schema
563+
.sequences
564+
.iter()
565+
.map(|s| {
566+
(
567+
s.sequence_name.clone(),
568+
self.sequence_state_lock
569+
.get_sequence_mut(s.sequence_id)
570+
.expect("sequence exists in original schema and should in sequence state.")
571+
.get_value(),
572+
)
573+
})
574+
.collect();
575+
576+
// Drop existing table first due to unique constraints on table name in `st_table`
577+
self.drop_table(table_id)?;
578+
579+
// Update existing (dropped) table schema with provided columns, reset Ids.
580+
let mut updated_table_schema = original_table_schema;
581+
updated_table_schema.columns = column_schemas;
582+
updated_table_schema.reset();
583+
let new_table_id = self.create_table_and_update_seq(updated_table_schema, seq_values)?;
584+
585+
// Populate rows with default values for new columns
586+
for product_value in table_rows.iter_mut() {
587+
let mut row_elements = product_value.elements.to_vec();
588+
row_elements.extend(default_values.iter().cloned());
589+
product_value.elements = row_elements.into();
590+
}
591+
592+
// The new table was created in this transaction, so it is looked up in the tx state.
let (new_table, tx_blob_store) = self
593+
.tx_state
594+
.get_table_and_blob_store(new_table_id)
595+
.ok_or(TableError::IdNotFoundState(new_table_id))?;
596+
597+
// Insert the extended rows directly into the new table.
for row in table_rows {
598+
new_table.insert(&self.committed_state_write_lock.page_pool, tx_blob_store, &row)?;
599+
}
600+
601+
Ok(new_table_id)
602+
}
603+
604+
/// Creates a table from `table_schema` and restores its sequence values.
///
/// After the table is created, every sequence in the new table's schema is
/// looked up by name in `seq_values` and the stored value is passed to
/// `update_value` on the corresponding sequence.
///
/// Returns the id of the newly created table.
///
/// # Errors
///
/// Propagates errors from `create_table` and `schema_for_table`, and returns
/// `SequenceError::NotFound` if a sequence name of the new table has no entry
/// in `seq_values`.
///
/// # Panics
///
/// Panics if a sequence of the newly created table is missing from the sequence state.
fn create_table_and_update_seq(
605+
&mut self,
606+
table_schema: TableSchema,
607+
seq_values: HashMap<Box<str>, i128>,
608+
) -> Result<TableId> {
609+
let table_id = self.create_table(table_schema)?;
610+
// Re-read the schema of the created table to get its sequences' ids.
let table_schema = self.schema_for_table(table_id)?;
611+
612+
for seq in table_schema.sequences.iter() {
613+
let new_seq = self
614+
.sequence_state_lock
615+
.get_sequence_mut(seq.sequence_id)
616+
.expect("sequence just created");
617+
// Saved values are keyed by sequence name, not by sequence id.
let value = *seq_values
618+
.get(&seq.sequence_name)
619+
.ok_or_else(|| SequenceError::NotFound(seq.sequence_id))?;
620+
new_seq.update_value(value);
621+
}
622+
623+
Ok(table_id)
624+
}
625+
497626
/// Create an index.
498627
///
499628
/// Requires:

0 commit comments

Comments
 (0)