Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 25 additions & 5 deletions crates/core/src/db/datastore/locking_tx_datastore/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2468,11 +2468,31 @@ mod tests {
// Seek the index on the first u32 field.
let index_id = extract_index_id(&datastore, &tx, &basic_indices()[0])?;

test_under_tx_and_commit(&datastore, tx, |tx| {
let (_, new_row) = update(&datastore, tx, table_id, index_id, row).expect("update should have succeeded");
assert_eq!(row, &new_row.to_product_value());
Ok(())
})
// Test before commit:
let (_, new_row) = update(&datastore, &mut tx, table_id, index_id, row).expect("update should have succeeded");
assert_eq!(row, &new_row.to_product_value());
// Commit.
let tx_data_1 = commit(&datastore, tx)?;
let mut tx = begin_mut_tx(&datastore);
// Test after commit:
let (_, new_row) = update(&datastore, &mut tx, table_id, index_id, row).expect("update should have succeeded");
assert_eq!(row, &new_row.to_product_value());
let tx_data_2 = commit(&datastore, tx)?;
// Ensure that none of the commits deleted rows in our table.
for tx_data in [&tx_data_1, &tx_data_2] {
assert_eq!(tx_data.deletes().find(|(tid, _)| **tid == table_id), None);
}
// Ensure that the first commit added the row but that the second didn't.
for (tx_data, expected_rows) in [(&tx_data_1, vec![row.clone()]), (&tx_data_2, vec![])] {
let inserted_rows = tx_data
.inserts()
.find(|(tid, _)| **tid == table_id)
.map(|(_, pvs)| pvs.to_vec())
.unwrap_or_default();
assert_eq!(inserted_rows, expected_rows);
}

Ok(())
}

/// Checks that update successfully uses sequences.
Expand Down
44 changes: 41 additions & 3 deletions crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1582,12 +1582,28 @@ impl MutTxId {
{
// 1. Ensure the index is unique.
// 2. Ensure the new row doesn't violate any other committed state unique indices.
let commit_table = commit_index.table();
if let Err(e) = ensure_unique(index_id, commit_index).and_then(|_| {
check_commit_unique_constraints(commit_index.table(), del_table, index_id, tx_row_ref, old_ptr)
check_commit_unique_constraints(commit_table, del_table, index_id, tx_row_ref, old_ptr)
}) {
break 'failed_rev_ins e;
}

// If the new row is the same as the old,
// skip the update altogether to match the semantics of `Self::insert`.
// SAFETY:
// 1. `tx_table` is derived from `commit_table` so they have the same layouts.
// 2. `old_ptr` was found in an index of `commit_table`, so we know it is valid.
// 3. we just inserted `tx_row_ptr` into `tx_table`, so we know it is valid.
if unsafe { Table::eq_row_in_page(commit_table, old_ptr, tx_table, tx_row_ptr) } {
// SAFETY: `self.is_row_present(tx_row_ptr)` holds, as noted in 3.
unsafe { tx_table.delete_internal_skip_pointer_map(tx_blob_store, tx_row_ptr) };
let commit_blob_store = &self.committed_state_write_lock.blob_store;
// SAFETY: `commit_table.is_row_present(old_ptr)` holds, as noted in 2.
let old_row_ref = unsafe { commit_table.get_row_ref_unchecked(commit_blob_store, old_ptr) };
return Ok((cols_to_gen, old_row_ref, update_flags));
}

// Check constraints and confirm the insertion of the new row.
//
// SAFETY: `self.is_row_present(row)` holds as we still haven't deleted the row,
Expand All @@ -1608,13 +1624,14 @@ impl MutTxId {
// 1. Ensure the index is unique.
// 2. Ensure the new row doesn't violate any other committed state unique indices.
let (old_ptr, needle) = find_old_row(tx_row_ref, tx_index);
let commit_table = self.committed_state_write_lock.get_table(table_id);
let res = old_ptr
// If we have an old committed state row, ensure it hasn't been deleted in our tx.
.filter(|ptr| ptr.squashed_offset() == SquashedOffset::TX_STATE || !del_table.contains(*ptr))
.ok_or_else(|| IndexError::KeyNotFound(index_id, needle).into())
.and_then(|old_ptr| {
ensure_unique(index_id, tx_index)?;
if let Some(commit_table) = self.committed_state_write_lock.get_table(table_id) {
if let Some(commit_table) = commit_table {
check_commit_unique_constraints(commit_table, del_table, index_id, tx_row_ref, old_ptr)?;
}
Ok(old_ptr)
Expand All @@ -1626,6 +1643,26 @@ impl MutTxId {

match old_ptr.squashed_offset() {
SquashedOffset::COMMITTED_STATE => {
if let Some(commit_table) = commit_table {
// If the new row is the same as the old,
// skip the update altogether to match the semantics of `Self::insert`.
// SAFETY:
// 1. `tx_table` is derived from `commit_table` so they have the same layouts.
// 2. `old_ptr` was found in an index of `tx_table`,
// but we had `SquashedOffset::COMMITTED_STATE`,
// so we know it is valid for `commit_table`.
// 3. we just inserted `tx_row_ptr` into `tx_table`, so we know it is valid.
if unsafe { Table::eq_row_in_page(commit_table, old_ptr, tx_table, tx_row_ptr) } {
// SAFETY: `self.is_row_present(tx_row_ptr)` holds, as noted in 3.
unsafe { tx_table.delete_internal_skip_pointer_map(tx_blob_store, tx_row_ptr) };
let commit_blob_store = &self.committed_state_write_lock.blob_store;
// SAFETY: `commit_table.is_row_present(old_ptr)` holds, as noted in 2.
let old_row_ref =
unsafe { commit_table.get_row_ref_unchecked(commit_blob_store, old_ptr) };
return Ok((cols_to_gen, old_row_ref, update_flags));
}
}

// Check constraints and confirm the insertion of the new row.
//
// SAFETY: `self.is_row_present(row)` holds as we still haven't deleted the row,
Expand Down Expand Up @@ -1662,7 +1699,8 @@ impl MutTxId {
};

// When we reach here, we had an error and we need to revert the insertion of `tx_row_ref`.
// SAFETY: `self.is_row_present(tx_row_ptr)` holds as we still haven't deleted the row physically.
// SAFETY: `self.is_row_present(tx_row_ptr)` holds,
// as we still haven't deleted the row physically.
unsafe { tx_table.delete_internal_skip_pointer_map(tx_blob_store, tx_row_ptr) };
Err(err)
}
Expand Down
2 changes: 1 addition & 1 deletion crates/table/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -750,7 +750,7 @@ impl Table {
/// - `target_table` and `needle_table` must have the same `row_layout`.
/// - `target_table.is_row_present(target_ptr)`.
/// - `needle_table.is_row_present(needle_ptr)`.
unsafe fn eq_row_in_page(
pub unsafe fn eq_row_in_page(
target_table: &Table,
target_ptr: RowPointer,
needle_table: &Table,
Expand Down