Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 82 additions & 16 deletions crates/datastore/src/locking_tx_datastore/committed_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,22 @@ pub struct CommittedState {
/// - system tables: `st_view_sub`, `st_view_arg`
/// - Tables which back views.
pub(super) ephemeral_tables: EphemeralTables,

/// Rows within `st_column` which should be ignored during replay
/// due to having been superseded by a new row representing the same column.
///
/// During replay, we visit all of the inserts table-by-table, followed by all of the deletes table-by-table.
/// This means that, when multiple columns of a table change type within the same transaction,
/// we see all of the newly-inserted `st_column` rows first, and then later, all of the deleted rows.
/// We may even see inserts into the altered table before seeing the `st_column` deletes!
///
/// In order to maintain a proper view of the schema of tables during replay,
/// we must remember the old versions of the `st_column` rows when we insert the new ones,
/// so that we can respect only the new versions.
///
/// We insert into this set during [`Self::replay_insert`] of `st_column` rows
/// and delete from it during [`Self::replay_delete`] of `st_column` rows.
replay_columns_to_ignore: HashSet<RowPointer>,
}

impl CommittedState {
Expand Down Expand Up @@ -120,6 +136,7 @@ impl MemoryUsage for CommittedState {
table_dropped,
read_sets,
ephemeral_tables,
replay_columns_to_ignore,
} = self;
// NOTE(centril): We do not want to include the heap usage of `page_pool` as it's a shared resource.
next_tx_offset.heap_usage()
Expand All @@ -129,6 +146,7 @@ impl MemoryUsage for CommittedState {
+ table_dropped.heap_usage()
+ read_sets.heap_usage()
+ ephemeral_tables.heap_usage()
+ replay_columns_to_ignore.heap_usage()
}
}

Expand Down Expand Up @@ -199,6 +217,7 @@ impl CommittedState {
read_sets: <_>::default(),
page_pool,
ephemeral_tables: <_>::default(),
replay_columns_to_ignore: <_>::default(),
}
}

Expand Down Expand Up @@ -483,7 +502,7 @@ impl CommittedState {
let (table, blob_store, _, page_pool) = self.get_table_and_blob_store_mut(table_id)?;

// Delete the row.
table
let row_ptr = table
.delete_equal_row(page_pool, blob_store, row)
.map_err(TableError::Bflatn)?
.ok_or_else(|| anyhow!("Delete for non-existent row when replaying transaction"))?;
Expand All @@ -503,6 +522,19 @@ impl CommittedState {
self.table_dropped.insert(dropped_table_id);
}

if table_id == ST_COLUMN_ID {
// We may have reached the corresponding delete to an insert in `st_column`
// as the result of a column-type-altering migration.
// Now that the outdated `st_column` row isn't present any more,
// we can stop ignoring it.
//
// It's also possible that we're deleting this column as the result of a deleted table,
// and that there wasn't any corresponding insert at all.
// If that's the case, `row_ptr` won't be in `self.replay_columns_to_ignore`,
// which is fine.
self.replay_columns_to_ignore.remove(&row_ptr);
}

Ok(())
}

Expand Down Expand Up @@ -532,39 +564,63 @@ impl CommittedState {
Err(InsertError::IndexError(e)) => return Err(IndexError::UniqueConstraintViolation(e).into()),
};

let row_ptr = row_ref.pointer();

if table_id == ST_COLUMN_ID {
// We've made a modification to `st_column`.
// The type of a table has changed, so figure out which.
// The first column in `StColumnRow` is `table_id`.
self.st_column_changed(row, row_ptr)?;
let row_ptr = row_ref.pointer();
let table_id = self.ignore_previous_versions_of_column(row, row_ptr)?;
self.st_column_changed(table_id)?;
}

Ok(())
}

/// Mark all `st_column` rows which refer to the same column as `st_column_row`
/// other than the one at `row_pointer` as outdated
/// by storing them in [`Self::replay_columns_to_ignore`].
///
/// Returns the ID of the table to which `st_column_row` belongs.
fn ignore_previous_versions_of_column(
&mut self,
st_column_row: &ProductValue,
row_ptr: RowPointer,
) -> Result<TableId> {
let target_table_id = Self::read_table_id(st_column_row);
let target_col_id = ColId::deserialize(ValueDeserializer::from_ref(&st_column_row.elements[1]))
.expect("second field in `st_column` should decode to a `ColId`");

let outdated_st_column_rows = iter_st_column_for_table(self, &target_table_id.into())?
.filter_map(|row_ref| {
StColumnRow::try_from(row_ref)
.map(|c| (c.col_pos == target_col_id && row_ref.pointer() != row_ptr).then(|| row_ref.pointer()))
.transpose()
})
.collect::<Result<Vec<RowPointer>>>()?;

for row in outdated_st_column_rows {
self.replay_columns_to_ignore.insert(row);
}

Ok(target_table_id)
}

/// Refreshes the columns and layout of a table
/// when a `row` has been inserted from `st_column`.
///
/// The `row_ptr` is a pointer to `row`.
fn st_column_changed(&mut self, row: &ProductValue, row_ptr: RowPointer) -> Result<()> {
let target_table_id = Self::read_table_id(row);
let target_col_id = ColId::deserialize(ValueDeserializer::from_ref(&row.elements[1]))
.expect("second field in `st_column` should decode to a `ColId`");

fn st_column_changed(&mut self, table_id: TableId) -> Result<()> {
// We're replaying and we don't have unique constraints yet.
// Due to replay handling all inserts first and deletes after,
// when processing `st_column` insert/deletes,
// we may end up with two definitions for the same `col_pos`.
// Of those two, we're interested in the one we just inserted
// and not the other one, as it is being replaced.
let mut columns = iter_st_column_for_table(self, &target_table_id.into())?
.filter_map(|row_ref| {
StColumnRow::try_from(row_ref)
.map(|c| (c.col_pos != target_col_id || row_ref.pointer() == row_ptr).then(|| c.into()))
.transpose()
})
// `Self::ignore_previous_version_of_column` has marked the old version as ignored,
// so filter only the non-ignored columns.
let mut columns = iter_st_column_for_table(self, &table_id.into())?
.filter(|row_ref| self.replay_columns_to_ignore.contains(&row_ref.pointer()))
.map(|row_ref| StColumnRow::try_from(row_ref).map(Into::into))
.collect::<Result<Vec<_>>>()?;

// Columns in `st_column` are not in general sorted by their `col_pos`,
Expand All @@ -573,13 +629,23 @@ impl CommittedState {
columns.sort_by_key(|col: &ColumnSchema| col.col_pos);

// Update the columns and layout of the the in-memory table.
if let Some(table) = self.tables.get_mut(&target_table_id) {
if let Some(table) = self.tables.get_mut(&table_id) {
table.change_columns_to(columns).map_err(TableError::from)?;
}

Ok(())
}

pub(super) fn replay_end_tx(&mut self) -> Result<()> {
self.next_tx_offset += 1;

if !self.replay_columns_to_ignore.is_empty() {
Err(anyhow::anyhow!("`CommittedState::replay_columns_to_ignore` should be empty at the end of a commit, but found {} entries", self.replay_columns_to_ignore.len()).into())
} else {
Ok(())
}
}

/// Assuming that a `TableId` is stored as the first field in `row`, read it.
fn read_table_id(row: &ProductValue) -> TableId {
TableId::deserialize(ValueDeserializer::from_ref(&row.elements[0]))
Expand Down
4 changes: 1 addition & 3 deletions crates/datastore/src/locking_tx_datastore/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1284,9 +1284,7 @@ impl<F: FnMut(u64)> spacetimedb_commitlog::payload::txdata::Visitor for ReplayVi
}

fn visit_tx_end(&mut self) -> std::result::Result<(), Self::Error> {
self.committed_state.next_tx_offset += 1;

Ok(())
self.committed_state.replay_end_tx().map_err(Into::into)
}
}

Expand Down
Loading