Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use futures::channel::mpsc;
use futures::StreamExt;
use parking_lot::RwLock;
use spacetimedb_commitlog as commitlog;
use spacetimedb_data_structures::map::IntSet;
use spacetimedb_datastore::db_metrics::DB_METRICS;
use spacetimedb_datastore::error::{DatastoreError, TableError};
use spacetimedb_datastore::execution_context::{ReducerContext, Workload, WorkloadType};
Expand All @@ -23,7 +24,7 @@ use spacetimedb_datastore::system_tables::{system_tables, StModuleRow};
use spacetimedb_datastore::system_tables::{StFields, StVarFields, StVarName, StVarRow, ST_MODULE_ID, ST_VAR_ID};
use spacetimedb_datastore::traits::{
InsertFlags, IsolationLevel, Metadata, MutTx as _, MutTxDatastore, Program, RowTypeForTable, Tx as _, TxDatastore,
TxTableTruncated, UpdateFlags,
UpdateFlags,
};
use spacetimedb_datastore::{
locking_tx_datastore::{
Expand Down Expand Up @@ -889,18 +890,17 @@ impl RelationalDB {
rowdata: rowdata.clone(),
})
.collect();

let truncates: IntSet<TableId> = tx_data.truncates().collect();

let deletes: Box<_> = tx_data
.deletes()
.filter(|(_, truncated, _)| *truncated == TxTableTruncated::No)
.map(|(table_id, _, rowdata)| Ops {
.map(|(table_id, rowdata)| Ops {
table_id: *table_id,
rowdata: rowdata.clone(),
})
.collect();
let truncates: Box<_> = tx_data
.deletes()
.filter(|(_, truncated, _)| *truncated == TxTableTruncated::Yes)
.map(|(table_id, ..)| *table_id)
// filter out deletes for tables that are truncated in the same transaction.
.filter(|ops| !truncates.contains(&ops.table_id))
.collect();

let inputs = reducer_context.map(|rcx| rcx.into());
Expand All @@ -911,7 +911,7 @@ impl RelationalDB {
mutations: Some(Mutations {
inserts,
deletes,
truncates,
truncates: truncates.into_iter().collect(),
}),
};

Expand Down
3 changes: 1 addition & 2 deletions crates/core/src/subscription/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,9 @@ impl DeltaTableIndexes {
indexes
}

let deletes = data.deletes().map(|(table_id, _, rows)| (table_id, rows));
Self {
inserts: build_indexes_for_rows(tx, meta, data.inserts()),
deletes: build_indexes_for_rows(tx, meta, deletes),
deletes: build_indexes_for_rows(tx, meta, data.deletes()),
}
}
}
Expand Down
44 changes: 38 additions & 6 deletions crates/datastore/src/locking_tx_datastore/committed_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -610,13 +610,27 @@ impl CommittedState {

pub(super) fn merge(&mut self, tx_state: TxState, ctx: &ExecutionContext) -> TxData {
let mut tx_data = TxData::default();
let mut truncates = IntSet::default();

// First, apply deletes. This will free up space in the committed tables.
self.merge_apply_deletes(&mut tx_data, tx_state.delete_tables, tx_state.pending_schema_changes);
self.merge_apply_deletes(
&mut tx_data,
tx_state.delete_tables,
tx_state.pending_schema_changes,
&mut truncates,
);

// Then, apply inserts. This will re-fill the holes freed by deletions
// before allocating new pages.
self.merge_apply_inserts(&mut tx_data, tx_state.insert_tables, tx_state.blob_store);
self.merge_apply_inserts(
&mut tx_data,
tx_state.insert_tables,
tx_state.blob_store,
&mut truncates,
);

// Record any truncated tables in the `TxData`.
tx_data.add_truncates(truncates);

// If the TX will be logged, record its projected tx offset,
// then increment the counter.
Expand All @@ -633,6 +647,7 @@ impl CommittedState {
tx_data: &mut TxData,
delete_tables: BTreeMap<TableId, DeleteTable>,
pending_schema_changes: ThinVec<PendingSchemaChange>,
truncates: &mut IntSet<TableId>,
) {
fn delete_rows(
tx_data: &mut TxData,
Expand All @@ -641,6 +656,7 @@ impl CommittedState {
blob_store: &mut dyn BlobStore,
row_ptrs_len: usize,
row_ptrs: impl Iterator<Item = RowPointer>,
truncates: &mut IntSet<TableId>,
) {
let mut deletes = Vec::with_capacity(row_ptrs_len);

Expand All @@ -660,16 +676,25 @@ impl CommittedState {

if !deletes.is_empty() {
let table_name = &*table.get_schema().table_name;
tx_data.set_deletes_for_table(table_id, table_name, deletes.into());
let truncated = table.row_count == 0;
tx_data.set_deletes_for_table(table_id, table_name, deletes.into(), truncated);
if truncated {
truncates.insert(table_id);
}
}
}

for (table_id, row_ptrs) in delete_tables {
match self.get_table_and_blob_store_mut(table_id) {
Ok((table, blob_store, ..)) => {
delete_rows(tx_data, table_id, table, blob_store, row_ptrs.len(), row_ptrs.iter())
}
Ok((table, blob_store, ..)) => delete_rows(
tx_data,
table_id,
table,
blob_store,
row_ptrs.len(),
row_ptrs.iter(),
truncates,
),
Err(_) if !row_ptrs.is_empty() => panic!("Deletion for non-existent table {table_id:?}... huh?"),
Err(_) => {}
}
Expand All @@ -688,6 +713,7 @@ impl CommittedState {
&mut self.blob_store,
row_ptrs.len(),
row_ptrs.into_iter(),
truncates,
);
}
}
Expand All @@ -698,6 +724,7 @@ impl CommittedState {
tx_data: &mut TxData,
insert_tables: BTreeMap<TableId, Table>,
tx_blob_store: impl BlobStore,
truncates: &mut IntSet<TableId>,
) {
// TODO(perf): Consider moving whole pages from the `insert_tables` into the committed state,
// rather than copying individual rows out of them.
Expand Down Expand Up @@ -726,6 +753,11 @@ impl CommittedState {
if !inserts.is_empty() {
let table_name = &*commit_table.get_schema().table_name;
tx_data.set_inserts_for_table(table_id, table_name, inserts.into());

// if table has inserted rows, it cannot be truncated
if truncates.contains(&table_id) {
truncates.remove(&table_id);
}
}

let (schema, _indexes, pages) = tx_table.consume_for_merge();
Expand Down
54 changes: 42 additions & 12 deletions crates/datastore/src/locking_tx_datastore/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use super::{
tx::TxId,
tx_state::TxState,
};
use crate::execution_context::{Workload, WorkloadType};
use crate::{
db_metrics::DB_METRICS,
error::{DatastoreError, TableError},
Expand All @@ -23,18 +22,24 @@ use crate::{
DataRow, IsolationLevel, Metadata, MutTx, MutTxDatastore, Program, RowTypeForTable, Tx, TxData, TxDatastore,
},
};
use crate::{
execution_context::{Workload, WorkloadType},
system_tables::StTableRow,
};
use anyhow::{anyhow, Context};
use core::{cell::RefCell, ops::RangeBounds};
use parking_lot::{Mutex, RwLock};
use spacetimedb_commitlog::payload::{txdata, Txdata};
use spacetimedb_data_structures::map::{HashCollectionExt, HashMap};
use spacetimedb_data_structures::map::{HashCollectionExt, HashMap, IntMap};
use spacetimedb_durability::TxOffset;
use spacetimedb_lib::{db::auth::StAccess, metrics::ExecutionMetrics};
use spacetimedb_lib::{ConnectionId, Identity};
use spacetimedb_paths::server::SnapshotDirPath;
use spacetimedb_primitives::{ColList, ConstraintId, IndexId, SequenceId, TableId};
use spacetimedb_sats::memory_usage::MemoryUsage;
use spacetimedb_sats::{bsatn, buffer::BufReader, AlgebraicValue, ProductValue};
use spacetimedb_sats::{
algebraic_value::de::ValueDeserializer, bsatn, buffer::BufReader, AlgebraicValue, ProductValue,
};
use spacetimedb_sats::{memory_usage::MemoryUsage, Deserialize};
use spacetimedb_schema::schema::{ColumnSchema, IndexSchema, SequenceSchema, TableSchema};
use spacetimedb_snapshot::{ReconstructedSnapshot, SnapshotRepository};
use spacetimedb_table::{indexes::RowPointer, page_pool::PagePool, table::RowRef};
Expand Down Expand Up @@ -978,6 +983,7 @@ impl<F> Replay<F> {
database_identity: &self.database_identity,
committed_state: &mut committed_state,
progress: &mut *self.progress.borrow_mut(),
dropped_table_names: IntMap::default(),
};
f(&mut visitor)
}
Expand Down Expand Up @@ -1083,6 +1089,10 @@ struct ReplayVisitor<'a, F> {
database_identity: &'a Identity,
committed_state: &'a mut CommittedState,
progress: &'a mut F,
// Since deletes are handled before truncation / drop, sometimes the schema
// info is gone. We save the name on the first delete of that table so metrics
// can still show a name.
dropped_table_names: IntMap<TableId, Box<str>>,
}

impl<F: FnMut(u64)> spacetimedb_commitlog::payload::txdata::Visitor for ReplayVisitor<'_, F> {
Expand Down Expand Up @@ -1139,6 +1149,14 @@ impl<F: FnMut(u64)> spacetimedb_commitlog::payload::txdata::Visitor for ReplayVi
let table_name = schema.table_name.clone();
let row = ProductValue::decode(schema.get_row_type(), reader)?;

// If this is a delete from the `st_table` system table, save the name
if table_id == ST_TABLE_ID {
let ab = AlgebraicValue::Product(row.clone());
let st_table_row = StTableRow::deserialize(ValueDeserializer::from_ref(&ab)).unwrap();
self.dropped_table_names
.insert(st_table_row.table_id, st_table_row.table_name);
}

self.committed_state
.replay_delete_by_rel(table_id, &row)
.with_context(|| {
Expand All @@ -1158,9 +1176,18 @@ impl<F: FnMut(u64)> spacetimedb_commitlog::payload::txdata::Visitor for ReplayVi
}

fn visit_truncate(&mut self, table_id: TableId) -> std::result::Result<(), Self::Error> {
let schema = self.committed_state.schema_for_table(table_id)?;
// TODO: avoid clone
let table_name = schema.table_name.clone();
let table_name = match self.committed_state.schema_for_table(table_id) {
// TODO: avoid clone
Ok(schema) => schema.table_name.clone(),

Err(_) => {
if let Some(name) = self.dropped_table_names.remove(&table_id) {
name
} else {
return Err(anyhow!("Error looking up name for truncated table {:?}", table_id).into());
}
}
};

self.committed_state.replay_truncate(table_id).with_context(|| {
format!(
Expand Down Expand Up @@ -1231,7 +1258,7 @@ mod tests {
ST_ROW_LEVEL_SECURITY_NAME, ST_SCHEDULED_ID, ST_SCHEDULED_NAME, ST_SEQUENCE_ID, ST_SEQUENCE_NAME,
ST_TABLE_NAME, ST_VAR_ID, ST_VAR_NAME,
};
use crate::traits::{IsolationLevel, MutTx, TxTableTruncated};
use crate::traits::{IsolationLevel, MutTx};
use crate::Result;
use bsatn::to_vec;
use core::{fmt, mem};
Expand Down Expand Up @@ -3201,12 +3228,12 @@ mod tests {
// Now drop the table again and commit.
assert!(datastore.drop_table_mut_tx(&mut tx, table_id).is_ok());
let tx_data = commit(&datastore, tx)?;
let (_, truncated, deleted_rows) = tx_data
let (_, deleted_rows) = tx_data
.deletes()
.find(|(id, ..)| **id == table_id)
.expect("should have deleted rows for `table_id`");
assert_eq!(&**deleted_rows, [row]);
assert_eq!(truncated, TxTableTruncated::Yes);
assert!(tx_data.truncates().contains(&table_id), "table should be truncated");

// In the next transaction, the table doesn't exist.
assert!(
Expand Down Expand Up @@ -3410,9 +3437,12 @@ mod tests {
let to_product = |col: &ColumnSchema| value_serialize(&StColumnRow::from(col.clone())).into_product().unwrap();
let (_, inserts) = tx_data.inserts().find(|(id, _)| **id == ST_COLUMN_ID).unwrap();
assert_eq!(&**inserts, [to_product(&columns[1])].as_slice());
let (_, truncated, deletes) = tx_data.deletes().find(|(id, ..)| **id == ST_COLUMN_ID).unwrap();
let (_, deletes) = tx_data.deletes().find(|(id, ..)| **id == ST_COLUMN_ID).unwrap();
assert_eq!(&**deletes, [to_product(&columns_original[1])].as_slice());
assert_eq!(truncated, TxTableTruncated::No);
assert!(
!tx_data.truncates().contains(&ST_COLUMN_ID),
"table should not be truncated"
);

// Check that we can successfully scan using the new schema type post commit.
let tx = begin_tx(&datastore);
Expand Down
Loading
Loading