181 changes: 181 additions & 0 deletions crates/core/src/db/durability.rs
@@ -0,0 +1,181 @@
use crate::db::persistence::Durability;
use spacetimedb_commitlog::payload::{
txdata::{Mutations, Ops},
Txdata,
};
use spacetimedb_data_structures::map::IntSet;
use spacetimedb_datastore::{execution_context::ReducerContext, traits::TxData};
use spacetimedb_durability::DurableOffset;
use spacetimedb_primitives::TableId;
use std::sync::Arc;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};

/// A request to persist a transaction or to terminate the actor.
pub enum DurabilityRequest {
Work {
reducer_context: Option<ReducerContext>,
tx_data: Arc<TxData>,
},
Close,
}

/// Represents a handle to a background task that persists transactions
/// according to the [`Durability`] policy provided.
///
/// This exists so that callers do not block on the preparatory work
/// (converting a [`TxData`] into a commitlog payload)
/// before it is handed over to the `Durability` layer.
#[derive(Clone)]
pub struct DurabilityWorker {
request_tx: UnboundedSender<DurabilityRequest>,
durability: Arc<Durability>,
}

impl Drop for DurabilityWorker {
fn drop(&mut self) {
// Try to close the actor.
// If the actor panicked, or a clone of `self` was `Drop`ped,
// this can return `Err(_)`,
// in which case we need only drop `self.durability`.
if self.request_tx.send(DurabilityRequest::Close).is_ok() {
// Wait until the actor's `Arc<Durability>` has been dropped.
// After that, we drop `self.durability` as normal.
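// Note: `UnboundedSender::closed()` resolves once the actor has dropped its receiver;
// `block_on` is used here because `Drop::drop` cannot be `async`.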
futures::executor::block_on(self.request_tx.closed());
}
}
}

impl DurabilityWorker {
/// Create a new [`DurabilityWorker`] using the given `durability` policy.
pub fn new(durability: Arc<Durability>) -> Self {
let (request_tx, request_rx) = unbounded_channel();

let actor = DurabilityWorkerActor {
request_rx,
durability: durability.clone(),
};
tokio::spawn(actor.run());

Self { request_tx, durability }
}

/// Request that a transaction be made durable.
/// That is, if `(tx_data, ctx)` should be appended to the commitlog, do so.
///
/// Note that by this stage,
/// [`spacetimedb_datastore::locking_tx_datastore::committed_state::tx_consumes_offset`]
/// has already decided based on the reducer and operations whether the transaction should be appended;
/// this method is responsible only for reading its decision out of the `tx_data`
/// and calling `durability.append_tx`.
///
/// This method does not block,
/// and sends the work to an actor that collects data and calls `durability.append_tx`.
///
/// Panics if the durability worker has closed the receive end of its queue,
/// which is likely due to it having panicked,
/// or because a clone of `DurabilityWorker` (and thus of `RelationalDB`) was dropped.
pub fn request_durability(&self, reducer_context: Option<ReducerContext>, tx_data: &Arc<TxData>) {
self.request_tx
.send(DurabilityRequest::Work {
reducer_context,
tx_data: tx_data.clone(),
})
.expect("durability actor hung up / panicked");
}

/// Get the [`DurableOffset`] of this database.
pub fn durable_tx_offset(&self) -> DurableOffset {
self.durability.durable_tx_offset()
}
}

struct DurabilityWorkerActor {
request_rx: UnboundedReceiver<DurabilityRequest>,
durability: Arc<Durability>,
}

impl DurabilityWorkerActor {
/// Processes requests to persist transactions until a `Close` request is received
/// or all senders have been dropped.
async fn run(mut self) {
while let Some(req) = self.request_rx.recv().await {
match req {
DurabilityRequest::Work {
reducer_context,
tx_data,
} => Self::do_durability(&*self.durability, reducer_context, &tx_data),

// Terminate the actor
// and make sure we drop `self.durability`
// before we drop `self.request_tx`.
//
// After a `Close`,
// there should be no more `Work` incoming or buffered,
// as `Close` hangs up the receiver end of the channel,
// so nothing can be sent to it.
DurabilityRequest::Close => {
drop(self.durability);
drop(self.request_rx);
return;
}
}
}
}

fn do_durability(durability: &Durability, reducer_context: Option<ReducerContext>, tx_data: &TxData) {
if tx_data.tx_offset().is_none() {
let name = reducer_context.as_ref().map(|rcx| &*rcx.name);
debug_assert!(
!tx_data.has_rows_or_connect_disconnect(name),
"tx_data has no rows but has connect/disconnect: `{name:?}`"
);
return;
}

let is_not_ephemeral_table = |table_id: &TableId| -> bool {
tx_data
.ephemeral_tables()
.map(|etables| !etables.contains(table_id))
.unwrap_or(true)
};

let inserts: Box<_> = tx_data
.inserts()
// Skip ephemeral tables
.filter(|(table_id, _)| is_not_ephemeral_table(table_id))
.map(|(table_id, rowdata)| Ops {
table_id: *table_id,
rowdata: rowdata.clone(),
})
.collect();

let truncates: IntSet<TableId> = tx_data.truncates().collect();

let deletes: Box<_> = tx_data
.deletes()
.filter(|(table_id, _)| is_not_ephemeral_table(table_id))
.map(|(table_id, rowdata)| Ops {
table_id: *table_id,
rowdata: rowdata.clone(),
})
// filter out deletes for tables that are truncated in the same transaction.
.filter(|ops| !truncates.contains(&ops.table_id))
.collect();

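// Likewise skip truncates of ephemeral tables.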
let truncates = truncates.into_iter().filter(is_not_ephemeral_table).collect();

let inputs = reducer_context.map(|rcx| rcx.into());

let txdata = Txdata {
inputs,
outputs: None,
mutations: Some(Mutations {
inserts,
deletes,
truncates,
}),
};

// TODO: Should measure queuing time + actual write
// This does not block, as per trait docs.
durability.append_tx(txdata);
}
}
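For orientation while reviewing, a minimal sketch of how the new worker is intended to be driven; the helper names and surrounding setup are hypothetical, and only `DurabilityWorker::new` and `request_durability` are taken from this diff:

```rust
use std::sync::Arc;
// Remaining types (`DurabilityWorker`, `Durability`, `ReducerContext`, `TxData`)
// are assumed to be imported as at the top of `durability.rs`.

// Hypothetical: spawn the background actor. Requires a running tokio runtime,
// since `DurabilityWorker::new` calls `tokio::spawn` internally.
fn spawn_worker(durability: Arc<Durability>) -> DurabilityWorker {
    DurabilityWorker::new(durability)
}

// Hypothetical call site on the commit path. This does not block: converting
// `TxData` into a commitlog `Txdata` and calling `Durability::append_tx` both
// happen on the background actor.
fn persist_committed_tx(
    worker: &DurabilityWorker,
    reducer_context: Option<ReducerContext>,
    tx_data: &Arc<TxData>,
) {
    worker.request_durability(reducer_context, tx_data);
}
```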
1 change: 1 addition & 0 deletions crates/core/src/db/mod.rs
@@ -7,6 +7,7 @@ use crate::subscription::ExecutionCounters;
use spacetimedb_datastore::execution_context::WorkloadType;
use spacetimedb_datastore::{locking_tx_datastore::datastore::TxMetrics, traits::TxData};

mod durability;
pub mod persistence;
pub mod relational_db;
pub mod snapshot;
93 changes: 14 additions & 79 deletions crates/core/src/db/relational_db.rs
@@ -1,3 +1,4 @@
use crate::db::durability::DurabilityWorker;
use crate::db::MetricsRecorderQueue;
use crate::error::{DBError, DatabaseError, RestoreSnapshotError};
use crate::messages::control_db::HostType;
@@ -10,10 +11,9 @@ use fs2::FileExt;
use log::info;
use spacetimedb_commitlog::repo::OnNewSegmentFn;
use spacetimedb_commitlog::{self as commitlog, SizeOnDisk};
use spacetimedb_data_structures::map::IntSet;
use spacetimedb_datastore::db_metrics::DB_METRICS;
use spacetimedb_datastore::error::{DatastoreError, TableError, ViewError};
use spacetimedb_datastore::execution_context::{ReducerContext, Workload, WorkloadType};
use spacetimedb_datastore::execution_context::{Workload, WorkloadType};
use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics;
use spacetimedb_datastore::locking_tx_datastore::state_view::{
IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, IterTx, StateView,
@@ -103,7 +103,7 @@ pub struct RelationalDB {
owner_identity: Identity,

inner: Locking,
durability: Option<Arc<Durability>>,
durability: Option<DurabilityWorker>,
snapshot_worker: Option<SnapshotWorker>,

row_count_fn: RowCountFn,
@@ -154,6 +154,7 @@ impl RelationalDB {
Arc::new(EnumMap::from_fn(|ty| ExecutionCounters::new(&ty, &database_identity)));

let (durability, disk_size_fn, snapshot_worker) = Persistence::unzip(persistence);
let durability = durability.map(DurabilityWorker::new);

Self {
inner,
@@ -766,19 +767,21 @@ impl RelationalDB {
}

#[tracing::instrument(level = "trace", skip_all)]
pub fn commit_tx(&self, tx: MutTx) -> Result<Option<(TxOffset, TxData, TxMetrics, String)>, DBError> {
#[allow(clippy::type_complexity)]
pub fn commit_tx(&self, tx: MutTx) -> Result<Option<(TxOffset, Arc<TxData>, TxMetrics, String)>, DBError> {
log::trace!("COMMIT MUT TX");

// TODO: Never returns `None` -- should it?
let reducer_context = tx.ctx.reducer_context().cloned();
// TODO: Never returns `None` -- should it?
let Some((tx_offset, tx_data, tx_metrics, reducer)) = self.inner.commit_mut_tx(tx)? else {
return Ok(None);
};

self.maybe_do_snapshot(&tx_data);

let tx_data = Arc::new(tx_data);
if let Some(durability) = &self.durability {
Self::do_durability(&**durability, reducer_context.as_ref(), &tx_data)
durability.request_durability(reducer_context, &tx_data);
}

Ok(Some((tx_offset, tx_data, tx_metrics, reducer)))
@@ -789,7 +792,7 @@
&self,
tx: MutTx,
workload: Workload,
) -> Result<Option<(TxData, TxMetrics, Tx)>, DBError> {
) -> Result<Option<(Arc<TxData>, TxMetrics, Tx)>, DBError> {
log::trace!("COMMIT MUT TX");

let Some((tx_data, tx_metrics, tx)) = self.inner.commit_mut_tx_downgrade(tx, workload)? else {
@@ -798,82 +801,14 @@

self.maybe_do_snapshot(&tx_data);

let tx_data = Arc::new(tx_data);
if let Some(durability) = &self.durability {
Self::do_durability(&**durability, tx.ctx.reducer_context(), &tx_data)
durability.request_durability(tx.ctx.reducer_context().cloned(), &tx_data);
}

Ok(Some((tx_data, tx_metrics, tx)))
}

/// If `(tx_data, ctx)` should be appended to the commitlog, do so.
///
/// Note that by this stage,
/// [`spacetimedb_datastore::locking_tx_datastore::committed_state::tx_consumes_offset`]
/// has already decided based on the reducer and operations whether the transaction should be appended;
/// this method is responsible only for reading its decision out of the `tx_data`
/// and calling `durability.append_tx`.
fn do_durability(durability: &Durability, reducer_context: Option<&ReducerContext>, tx_data: &TxData) {
use commitlog::payload::{
txdata::{Mutations, Ops},
Txdata,
};

let is_not_ephemeral_table = |table_id: &TableId| -> bool {
tx_data
.ephemeral_tables()
.map(|etables| !etables.contains(table_id))
.unwrap_or(true)
};

if tx_data.tx_offset().is_some() {
let inserts: Box<_> = tx_data
.inserts()
// Skip ephemeral tables
.filter(|(table_id, _)| is_not_ephemeral_table(table_id))
.map(|(table_id, rowdata)| Ops {
table_id: *table_id,
rowdata: rowdata.clone(),
})
.collect();

let truncates: IntSet<TableId> = tx_data.truncates().collect();

let deletes: Box<_> = tx_data
.deletes()
.filter(|(table_id, _)| is_not_ephemeral_table(table_id))
.map(|(table_id, rowdata)| Ops {
table_id: *table_id,
rowdata: rowdata.clone(),
})
// filter out deletes for tables that are truncated in the same transaction.
.filter(|ops| !truncates.contains(&ops.table_id))
.collect();

let truncates = truncates.into_iter().filter(is_not_ephemeral_table).collect();

let inputs = reducer_context.map(|rcx| rcx.into());

let txdata = Txdata {
inputs,
outputs: None,
mutations: Some(Mutations {
inserts,
deletes,
truncates,
}),
};

// TODO: Should measure queuing time + actual write
durability.append_tx(txdata);
} else {
debug_assert!(
!tx_data.has_rows_or_connect_disconnect(reducer_context),
"tx_data has no rows but has connect/disconnect: `{:?}`",
reducer_context.map(|rcx| &rcx.name),
);
}
}

/// Get the [`DurableOffset`] of this database, or `None` if this is an
/// in-memory instance.
pub fn durable_tx_offset(&self) -> Option<DurableOffset> {
@@ -1511,8 +1446,8 @@ impl RelationalDB {
}

/// Reports the metrics for `reducer`, using counters provided by `db`.
pub fn report_mut_tx_metrics(&self, reducer: String, metrics: TxMetrics, tx_data: Option<TxData>) {
self.report_tx_metrics(reducer, tx_data.map(Arc::new), Some(metrics), None);
pub fn report_mut_tx_metrics(&self, reducer: String, metrics: TxMetrics, tx_data: Option<Arc<TxData>>) {
self.report_tx_metrics(reducer, tx_data, Some(metrics), None);
}

/// Reports subscription metrics for `reducer`, using counters provided by `db`.
4 changes: 2 additions & 2 deletions crates/core/src/subscription/module_subscription_actor.rs
@@ -1031,7 +1031,7 @@ impl ModuleSubscriptions {
return Ok(Err(WriteConflict));
};
*db_update = DatabaseUpdate::from_writes(&tx_data);
(read_tx, Arc::new(tx_data), tx_metrics)
(read_tx, tx_data, tx_metrics)
}
EventStatus::Failed(_) | EventStatus::OutOfEnergy => {
// If the transaction failed, we need to rollback the mutable tx.
@@ -1198,7 +1198,7 @@
let _ = extra.send(tx_offset);
}
self.relational_db
.report_tx_metrics(reducer, Some(Arc::new(tx_data)), Some(tx_metrics_mut), None);
.report_tx_metrics(reducer, Some(tx_data), Some(tx_metrics_mut), None);
}
});
(guard, offset_rx)