Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ use spacetimedb_lib::{
db::auth::{StAccess, StTableType},
Identity,
};
use spacetimedb_primitives::{ColList, ColSet, TableId};
use spacetimedb_sats::{AlgebraicType, AlgebraicValue, ProductValue};
use spacetimedb_primitives::{ColList, ColSet, IndexId, TableId};
use spacetimedb_sats::{AlgebraicValue, ProductValue};
use spacetimedb_schema::schema::TableSchema;
use spacetimedb_table::{
blob_store::{BlobStore, HashMapBlobStore},
indexes::{RowPointer, SquashedOffset},
table::{IndexScanIter, InsertError, RowRef, Table},
table::{IndexScanIter, InsertError, RowRef, Table, TableAndIndex},
MemoryUsage,
};
use std::collections::{BTreeMap, BTreeSet};
Expand Down Expand Up @@ -375,19 +375,19 @@ impl CommittedState {
.collect();

for index_row in rows {
let Some((table, blob_store)) = self.get_table_and_blob_store(index_row.table_id) else {
let index_id = index_row.index_id;
let table_id = index_row.table_id;
let Some((table, blob_store)) = self.get_table_and_blob_store(table_id) else {
panic!("Cannot create index for table which doesn't exist in committed state");
};
let columns = match index_row.index_algorithm {
StIndexAlgorithm::BTree { columns } => columns,
_ => unimplemented!("Only BTree indexes are supported"),
};
let is_unique = unique_constraints.contains(&(index_row.table_id, (&columns).into()));

let index = table.new_index(index_row.index_id, &columns, is_unique)?;
table.insert_index(blob_store, columns.clone(), index);
self.index_id_map
.insert(index_row.index_id, (index_row.table_id, columns));
let is_unique = unique_constraints.contains(&(table_id, (&columns).into()));
let index = table.new_index(columns.clone(), is_unique)?;
table.insert_index(blob_store, index_id, index);
self.index_id_map.insert(index_id, table_id);
}
Ok(())
}
Expand Down Expand Up @@ -429,13 +429,36 @@ impl CommittedState {
Ok(())
}

/// When there's an index on `cols`,
/// returns an iterator over the [BTreeIndex] that yields all the [`RowRef`]s
/// that match the specified `range` in the indexed column.
///
/// Matching is defined by `Ord for AlgebraicValue`.
///
/// For a unique index this will always yield at most one `RowRef`.
/// When there is no index this returns `None`.
pub(super) fn index_seek<'a>(
&'a self,
table_id: TableId,
cols: &ColList,
range: &impl RangeBounds<AlgebraicValue>,
) -> Option<IndexScanIter<'a>> {
self.tables.get(&table_id)?.index_seek(&self.blob_store, cols, range)
self.tables
.get(&table_id)?
.get_index_by_cols_with_table(&self.blob_store, cols)
.map(|i| i.seek(range))
}

/// Looks up the id of the table that owns the index `index_id`.
///
/// Yields `None` when `index_id` has no entry in `index_id_map`.
pub(super) fn get_table_for_index(&self, index_id: IndexId) -> Option<TableId> {
    let owner = self.index_id_map.get(&index_id)?;
    Some(*owner)
}

/// Pairs the table stored under `table_id` with its index `index_id`.
///
/// Yields `None` if there is no table for `table_id`,
/// or if the table's own lookup of `index_id` yields `None`.
pub(super) fn get_index_by_id_with_table(&self, table_id: TableId, index_id: IndexId) -> Option<TableAndIndex<'_>> {
    let table = self.tables.get(&table_id)?;
    table.get_index_by_id_with_table(&self.blob_store, index_id)
}

// TODO(perf, deep-integration): Make this method `unsafe`. Add the following to the docs:
Expand Down Expand Up @@ -637,13 +660,6 @@ impl CommittedState {
let blob_store = &mut self.blob_store;
(table, blob_store)
}

/// Returns the key type of the index keyed by `col_list` in the table `table_id`.
///
/// Yields `None` if the table does not exist or has no index under `col_list`.
pub(super) fn get_table_and_index_type(&self, table_id: TableId, col_list: &ColList) -> Option<&AlgebraicType> {
    self.tables
        .get(&table_id)
        .and_then(|table| table.indexes.get(col_list))
        .map(|index| &index.key_type)
}
}

pub struct CommittedIndexIterWithDeletedMutTx<'a> {
Expand Down
96 changes: 51 additions & 45 deletions crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use spacetimedb_schema::{
use spacetimedb_table::{
blob_store::{BlobStore, HashMapBlobStore},
indexes::{RowPointer, SquashedOffset},
table::{IndexScanIter, InsertError, RowRef, Table},
table::{IndexScanIter, InsertError, RowRef, Table, TableAndIndex},
};
use std::{
sync::Arc,
Expand Down Expand Up @@ -395,15 +395,15 @@ impl MutTxId {
//
// Ensure adding the index does not cause a unique constraint violation due to
// the existing rows having the same value for some column(s).
let mut insert_index = table.new_index(index.index_id, &columns, is_unique)?;
let mut insert_index = table.new_index(columns.clone(), is_unique)?;
let mut build_from_rows = |table: &Table, bs: &dyn BlobStore| -> Result<()> {
if let Some(violation) = insert_index.build_from_rows(&columns, table.scan_rows(bs))? {
if let Some(violation) = insert_index.build_from_rows(table.scan_rows(bs))? {
let violation = table
.get_row_ref(bs, violation)
.expect("row came from scanning the table")
.project(&columns)
.expect("`cols` should consist of valid columns for this table");
return Err(IndexError::from(table.build_error_unique(&insert_index, &columns, violation)).into());
return Err(IndexError::from(table.build_error_unique(&insert_index, index_id, violation)).into());
}
Ok(())
};
Expand All @@ -421,16 +421,17 @@ impl MutTxId {
build_from_rows(commit_table, commit_blob_store)?;
}

table.add_index(columns.clone(), insert_index);
// Associate `index_id -> (table_id, col_list)` for fast lookup.
idx_map.insert(index_id, (table_id, columns.clone()));

log::trace!(
"INDEX CREATED: {} for table: {} and col(s): {:?}",
index_id,
table_id,
columns
);

table.add_index(index_id, insert_index);
// Associate `index_id -> table_id` for fast lookup.
idx_map.insert(index_id, table_id);

// Update the table's schema.
// This won't clone-write when creating a table but likely to otherwise.
table.with_mut_schema(|s| s.indexes.push(index));
Expand All @@ -455,16 +456,10 @@ impl MutTxId {
// Remove the index in the transaction's insert table.
// By altering the insert table, this gets moved over to the committed state on merge.
let (table, blob_store, idx_map, ..) = self.get_or_create_insert_table_mut(table_id)?;
if let Some(col) = table
.indexes
.iter()
.find(|(_, idx)| idx.index_id == index_id)
.map(|(cols, _)| cols.clone())
{
if table.delete_index(blob_store, index_id) {
// This will likely do a clone-write, since over time
// the schema might have gained other referents.
table.with_mut_schema(|s| s.indexes.retain(|x| x.index_algorithm.columns() != &col));
table.delete_index(blob_store, &col);
table.with_mut_schema(|s| s.indexes.retain(|x| x.index_id != index_id));
}
// Remove the `index_id -> table_id` association.
idx_map.remove(&index_id);
Expand Down Expand Up @@ -497,21 +492,20 @@ impl MutTxId {
rstart: &[u8],
rend: &[u8],
) -> Result<(TableId, BTreeScan<'a>)> {
// Extract the table and index type for the tx state.
let (table_id, col_list, tx_idx_key_type) = self
.get_table_and_index_type(index_id)
.ok_or_else(|| IndexError::NotFound(index_id))?;
// Extract the table id, index type, and commit/tx indices.
let (table_id_and_index_ty, commit_index, tx_index) = self.get_table_and_index_type(index_id);
let (table_id, index_ty) = table_id_and_index_ty.ok_or_else(|| IndexError::NotFound(index_id))?;

// TODO(centril): Once we have more index types than `btree`,
// we'll need to enforce that `index_id` refers to a btree index.

// We have the index key type, so we can decode everything.
let bounds = Self::btree_decode_bounds(tx_idx_key_type, prefix, prefix_elems, rstart, rend)
.map_err(IndexError::Decode)?;
let bounds =
Self::btree_decode_bounds(index_ty, prefix, prefix_elems, rstart, rend).map_err(IndexError::Decode)?;

// Get an index seek iterator for the tx and committed state.
let tx_iter = self.tx_state.index_seek(table_id, col_list, &bounds);
let commit_iter = self.committed_state_write_lock.index_seek(table_id, col_list, &bounds);
let tx_iter = tx_index.map(|i| i.seek(&bounds));
let commit_iter = commit_index.map(|i| i.seek(&bounds));

// Chain together the indexed rows in the tx and committed state,
// but don't yield rows deleted in the tx state.
Expand All @@ -521,7 +515,7 @@ impl MutTxId {
None => Left(iter),
Some(deletes) => Right(IndexScanFilterDeleted { iter, deletes }),
});
// this is effectively just `tx_iter.into_iter().flatten().chain(commit_iter.into_iter().flatten())`,
// This is effectively just `tx_iter.into_iter().flatten().chain(commit_iter.into_iter().flatten())`,
// but with all the branching and `Option`s flattened to just one layer.
let iter = match (tx_iter, commit_iter) {
(None, None) => Empty(iter::empty()),
Expand All @@ -534,34 +528,46 @@ impl MutTxId {
Ok((table_id, BTreeScan { inner: iter }))
}

/// Translate `index_id` to the table id, the column list and index key type.
fn get_table_and_index_type(&self, index_id: IndexId) -> Option<(TableId, &ColList, &AlgebraicType)> {
/// Translate `index_id` to the table id, index type, and commit/tx indices.
fn get_table_and_index_type(
&self,
index_id: IndexId,
) -> (
Option<(TableId, &AlgebraicType)>,
Option<TableAndIndex<'_>>,
Option<TableAndIndex<'_>>,
) {
// The order of querying the committed vs. tx state for the translation is not important.
// But it is vastly more likely that it is in the committed state,
// so query that first to avoid two lookups.
let &(table_id, ref col_list) = self
.committed_state_write_lock
.index_id_map
.get(&index_id)
.or_else(|| self.tx_state.index_id_map.get(&index_id))?;

// The tx state must have the index.
//
// Also, the tx state must have the index.
// If the index was e.g., dropped from the tx state but exists physically in the committed state,
// the index does not exist, semantically.
// TODO: handle the case where the table has been dropped in this transaction.
let key_ty = if let Some(key_ty) = self
let commit_table_id = self
.committed_state_write_lock
.get_table_and_index_type(table_id, col_list)
{
if self.tx_state_removed_index(index_id) {
return None;
}
key_ty
.get_table_for_index(index_id)
.filter(|_| !self.tx_state_removed_index(index_id));

let (table_id, commit_index, tx_index) = if let t_id @ Some(table_id) = commit_table_id {
// Index found for commit state, might also exist for tx state.
let commit_index = self
.committed_state_write_lock
.get_index_by_id_with_table(table_id, index_id);
let tx_index = self.tx_state.get_index_by_id_with_table(table_id, index_id);
(t_id, commit_index, tx_index)
} else if let t_id @ Some(table_id) = self.tx_state.get_table_for_index(index_id) {
// Index might exist for tx state.
let tx_index = self.tx_state.get_index_by_id_with_table(table_id, index_id);
(t_id, None, tx_index)
} else {
self.tx_state.get_table_and_index_type(table_id, col_list)?
// No index in either side.
(None, None, None)
};

Some((table_id, col_list, key_ty))
let index_ty = commit_index.or(tx_index).map(|index| &index.index().key_type);
let table_id_and_index_ty = table_id.zip(index_ty);
(table_id_and_index_ty, commit_index, tx_index)
}

/// Returns whether the index with `index_id` was removed in this transaction.
Expand Down Expand Up @@ -1540,7 +1546,7 @@ impl StateView for MutTxId {
// TODO(george): It's unclear that we truly support dynamically creating an index
// yet. In particular, I don't know if creating an index in a transaction and
// rolling it back will leave the index in place.
if let Some(inserted_rows) = self.tx_state.index_seek(table_id, &cols, &range) {
if let Some(inserted_rows) = self.tx_state.index_seek_by_cols(table_id, &cols, &range) {
let committed_rows = self.committed_state_write_lock.index_seek(table_id, &cols, &range);
// The current transaction has modified this table, and the table is indexed.
Ok(if let Some(del_table) = self.tx_state.get_delete_table(table_id) {
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/db/datastore/locking_tx_datastore/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ impl TxId {
// Do not change its return type to a bare `u64`.
pub(crate) fn num_distinct_values(&self, table_id: TableId, cols: &ColList) -> Option<NonZeroU64> {
let table = self.committed_state_shared_lock.get_table(table_id)?;
let index = table.indexes.get(cols)?;
let (_, index) = table.get_index_by_cols(cols)?;
NonZeroU64::new(index.num_keys() as u64)
}
}
36 changes: 21 additions & 15 deletions crates/core/src/db/datastore/locking_tx_datastore/tx_state.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
use core::ops::RangeBounds;
use spacetimedb_data_structures::map::{IntMap, IntSet};
use spacetimedb_primitives::{ColList, IndexId, TableId};
use spacetimedb_sats::{AlgebraicType, AlgebraicValue};
use spacetimedb_sats::AlgebraicValue;
use spacetimedb_table::{
blob_store::{BlobStore, HashMapBlobStore},
indexes::{RowPointer, SquashedOffset},
static_assert_size,
table::{IndexScanIter, RowRef, Table},
table::{IndexScanIter, RowRef, Table, TableAndIndex},
};
use std::collections::{btree_map, BTreeMap, BTreeSet};

pub(super) type DeleteTable = BTreeSet<RowPointer>;

/// A mapping to find the actual index given an `IndexId`.
pub(super) type IndexIdMap = IntMap<IndexId, (TableId, ColList)>;
pub(super) type IndexIdMap = IntMap<IndexId, TableId>;
pub(super) type RemovedIndexIdSet = IntSet<IndexId>;

/// `TxState` tracks all of the modifications made during a particular transaction.
Expand Down Expand Up @@ -89,22 +89,35 @@ impl TxState {
}

/// When there's an index on `cols`,
/// returns an iterator over the [BTreeIndex] that yields all the `RowId`s
/// that match the specified `value` in the indexed column.
/// returns an iterator over the [BTreeIndex] that yields all the [`RowRef`]s
/// that match the specified `range` in the indexed column.
///
/// Matching is defined by `Ord for AlgebraicValue`.
///
/// For a unique index this will always yield at most one `RowId`.
/// For a unique index this will always yield at most one `RowRef`.
/// When there is no index this returns `None`.
pub(super) fn index_seek<'a>(
pub(super) fn index_seek_by_cols<'a>(
&'a self,
table_id: TableId,
cols: &ColList,
range: &impl RangeBounds<AlgebraicValue>,
) -> Option<IndexScanIter<'a>> {
self.insert_tables
.get(&table_id)?
.index_seek(&self.blob_store, cols, range)
.get_index_by_cols_with_table(&self.blob_store, cols)
.map(|i| i.seek(range))
}

/// Resolves `index_id` to the id of its table, as recorded in `index_id_map`.
///
/// Yields `None` when `index_id` has no entry in the map.
pub(super) fn get_table_for_index(&self, index_id: IndexId) -> Option<TableId> {
    let &table_id = self.index_id_map.get(&index_id)?;
    Some(table_id)
}

/// Pairs the insert table stored under `table_id` with its index `index_id`.
///
/// Yields `None` if there is no insert table for `table_id`,
/// or if the table's own lookup of `index_id` yields `None`.
pub(super) fn get_index_by_id_with_table(&self, table_id: TableId, index_id: IndexId) -> Option<TableAndIndex<'_>> {
    self.insert_tables
        .get(&table_id)
        .and_then(|table| table.get_index_by_id_with_table(&self.blob_store, index_id))
}

// TODO(perf, deep-integration): Make this unsafe. Add the following to the docs:
Expand Down Expand Up @@ -203,11 +216,4 @@ impl TxState {
let delete_table = unsafe { delete_table.unwrap_unchecked() };
(tx_table, tx_blob_store, delete_table)
}

/// Returns the key type of the index keyed by `col_list` in the insert table for `table_id`.
///
/// Yields `None` if there is no insert table for `table_id` or it has no index under `col_list`.
pub(super) fn get_table_and_index_type(&self, table_id: TableId, col_list: &ColList) -> Option<&AlgebraicType> {
    let index = self.insert_tables.get(&table_id)?.indexes.get(col_list)?;
    Some(&index.key_type)
}
}
Loading
Loading