Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,9 @@ jobs:
wasm-bindgen --version
- name: Check engine simulation build
run: cargo check -p spacetimedb-engine --no-default-features --features simulation

# Source emsdk environment to make emcc (Emscripten compiler) available in PATH.
- name: Run tests
run: |
Expand Down
43 changes: 43 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ members = [
"crates/data-structures",
"crates/datastore",
"crates/durability",
"crates/engine",
"crates/execution",
"crates/expr",
"crates/guard",
Expand Down Expand Up @@ -132,6 +133,7 @@ spacetimedb-core = { path = "crates/core", version = "=2.4.1" }
spacetimedb-data-structures = { path = "crates/data-structures", version = "=2.4.1" }
spacetimedb-datastore = { path = "crates/datastore", version = "=2.4.1" }
spacetimedb-durability = { path = "crates/durability", version = "=2.4.1" }
spacetimedb-engine = { path = "crates/engine", version = "=2.4.1" }
spacetimedb-execution = { path = "crates/execution", version = "=2.4.1" }
spacetimedb-expr = { path = "crates/expr", version = "=2.4.1" }
spacetimedb-guard = { path = "crates/guard", version = "=2.4.1" }
Expand All @@ -143,6 +145,7 @@ spacetimedb-pg = { path = "crates/pg", version = "=2.4.1" }
spacetimedb-physical-plan = { path = "crates/physical-plan", version = "=2.4.1" }
spacetimedb-primitives = { path = "crates/primitives", version = "=2.4.1" }
spacetimedb-query = { path = "crates/query", version = "=2.4.1" }
spacetimedb-runtime = { path = "crates/runtime", version = "=2.4.1" }
spacetimedb-sats = { path = "crates/sats", version = "=2.4.1" }
spacetimedb-schema = { path = "crates/schema", version = "=2.4.1" }
spacetimedb-standalone = { path = "crates/standalone", version = "=2.4.1" }
Expand All @@ -152,7 +155,6 @@ spacetimedb-fs-utils = { path = "crates/fs-utils", version = "=2.4.1" }
spacetimedb-snapshot = { path = "crates/snapshot", version = "=2.4.1" }
spacetimedb-subscription = { path = "crates/subscription", version = "=2.4.1" }
spacetimedb-query-builder = { path = "crates/query-builder", version = "=2.4.1" }
spacetimedb-runtime = { path = "crates/runtime", version = "=2.4.1" }

# Prevent `ahash` from pulling in `getrandom` by disabling default features.
# Modules use `getrandom02` and we need to prevent an incompatible version
Expand Down
2 changes: 1 addition & 1 deletion crates/bench/benches/subscription.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use spacetimedb::client::consume_each_list::ConsumeEachBuffer;
use spacetimedb::db::relational_db::RelationalDB;
use spacetimedb::db::sql::ast::SchemaViewer;
use spacetimedb::error::DBError;
use spacetimedb::identity::AuthCtx;
use spacetimedb::sql::ast::SchemaViewer;
use spacetimedb::subscription::row_list_builder_pool::BsatnRowListBuilderPool;
use spacetimedb::subscription::tx::DeltaTx;
use spacetimedb::subscription::{collect_table_update, TableUpdateType};
Expand Down
5 changes: 3 additions & 2 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ spacetimedb-client-api-messages.workspace = true
spacetimedb-commitlog.workspace = true
spacetimedb-datastore.workspace = true
spacetimedb-durability.workspace = true
spacetimedb-engine.workspace = true
spacetimedb-memory-usage.workspace = true
spacetimedb-metrics.workspace = true
spacetimedb-primitives.workspace = true
spacetimedb-paths.workspace = true
spacetimedb-physical-plan.workspace = true
spacetimedb-query.workspace = true
spacetimedb-runtime = { workspace = true, features = ["tokio"] }
spacetimedb-runtime.workspace = true
spacetimedb-sats = { workspace = true, features = ["serde"] }
spacetimedb-schema.workspace = true
spacetimedb-table.workspace = true
Expand Down Expand Up @@ -144,7 +145,7 @@ allow_loopback_http_for_tests = []
# Enable timing for wasm ABI calls
spacetimedb-wasm-instance-env-times = []
# Enable test helpers and utils
test = ["spacetimedb-commitlog/test", "spacetimedb-datastore/test"]
test = ["spacetimedb-commitlog/test", "spacetimedb-datastore/test", "spacetimedb-engine/test"]
# Perfmaps for profiling modules
perfmap = []
# Enables core pinning.
Expand Down
6 changes: 6 additions & 0 deletions crates/core/src/database_logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,12 @@ impl SystemLogger {
}
}

impl spacetimedb_engine::update::UpdateLogger for SystemLogger {
fn info(&self, msg: &str) {
self.info(msg);
}
}

#[cfg(test)]
mod tests {
use std::{ops::Range, sync::Arc};
Expand Down
146 changes: 27 additions & 119 deletions crates/core/src/db/mod.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,28 @@
use std::sync::Arc;
pub mod persistence {
pub use spacetimedb_engine::persistence::*;
}

use enum_map::EnumMap;
use spacetimedb_schema::reducer_name::ReducerName;
use tokio::sync::mpsc;
use tokio::time::MissedTickBehavior;
pub mod relational_db {
pub use spacetimedb_engine::relational_db::*;
}

use crate::subscription::ExecutionCounters;
use spacetimedb_datastore::execution_context::WorkloadType;
use spacetimedb_datastore::{locking_tx_datastore::datastore::TxMetrics, traits::TxData};
pub mod sql {
pub mod ast {
pub use spacetimedb_engine::sql::ast::*;
}

mod durability;
pub mod persistence;
pub mod relational_db;
pub mod snapshot;
pub mod update;
pub mod rls {
pub use spacetimedb_engine::sql::rls::*;
}
}

pub mod snapshot {
pub use spacetimedb_engine::snapshot::*;
}

pub mod update {
pub use spacetimedb_engine::update::*;
}

/// Whether SpacetimeDB is run in memory, or persists objects and
/// a message log to disk.
Expand All @@ -35,111 +44,10 @@ pub struct Config {
pub page_pool_max_size: Option<usize>,
}

/// A message that is processed by the [`spawn_metrics_recorder`] actor.
/// We use a separate task to record metrics to avoid blocking transactions.
pub struct MetricsMessage {
/// The reducer the produced these metrics.
reducer: Option<ReducerName>,
/// Metrics from a mutable transaction.
metrics_for_writer: Option<TxMetrics>,
/// Metrics from a read-only transaction.
/// A message may have metrics for both types of transactions,
/// because metrics for a reducer and its subscription updates are recorded together.
metrics_for_reader: Option<TxMetrics>,
/// The row updates for an immutable transaction.
/// Needed for insert and delete counters.
tx_data: Option<Arc<TxData>>,
/// Cached metrics counters for each workload type.
counters: Arc<EnumMap<WorkloadType, ExecutionCounters>>,
}

/// The handle used to send work to the tx metrics recorder.
#[derive(Clone)]
pub struct MetricsRecorderQueue {
tx: mpsc::UnboundedSender<MetricsMessage>,
}

impl MetricsRecorderQueue {
pub fn send_metrics(
&self,
reducer: Option<ReducerName>,
metrics_for_writer: Option<TxMetrics>,
metrics_for_reader: Option<TxMetrics>,
tx_data: Option<Arc<TxData>>,
counters: Arc<EnumMap<WorkloadType, ExecutionCounters>>,
) {
if let Err(err) = self.tx.send(MetricsMessage {
reducer,
metrics_for_writer,
metrics_for_reader,
tx_data,
counters,
}) {
log::warn!("failed to send metrics: {err}");
}
}
}

fn record_metrics(
MetricsMessage {
reducer,
metrics_for_writer,
metrics_for_reader,
tx_data,
counters,
}: MetricsMessage,
) {
if let Some(tx_metrics) = metrics_for_writer {
tx_metrics.report(
// If row updates are present,
// they will always belong to the writer transaction.
tx_data.as_deref(),
reducer.as_ref(),
|wl| &counters[wl],
);
}
if let Some(tx_metrics) = metrics_for_reader {
tx_metrics.report(
// If row updates are present,
// they will never belong to the reader transaction.
// Passing row updates here will most likely panic.
None,
reducer.as_ref(),
|wl| &counters[wl],
);
}
}

/// The metrics recorder is a side channel that the main database thread forwards metrics to.
/// While we want to avoid unnecessary compute on the critical path, communicating with other
/// threads is not free, and for this case in particular waking a parked task is not free.
///
/// Previously, each tx would send its metrics to the recorder task. As soon as the recorder
/// task `recv`d a message, it would update the counters and gauges, and immediately wait for
/// the next tx's message. This meant that the tx would need to be more expensive than the
/// recording of its metrics in order for the recorder task not to be parked on `recv` when
/// the tx would `send` its metrics. This would obviously never be the case, and so each `send`
/// would incur the overhead of waking the task.
///
/// To mitigate this, we now record metrics, for potentially many transactions, periodically
/// every 5ms.
const TX_METRICS_RECORDING_INTERVAL: std::time::Duration = std::time::Duration::from_millis(5);

/// Spawns a task for recording transaction metrics.
/// Returns the handle for pushing metrics to the recorder.
pub fn spawn_tx_metrics_recorder() -> (MetricsRecorderQueue, tokio::task::AbortHandle) {
let (tx, mut rx) = mpsc::unbounded_channel();
let abort_handle = tokio::spawn(async move {
let mut interval = tokio::time::interval(TX_METRICS_RECORDING_INTERVAL);
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
pub type MetricsRecorderQueue = spacetimedb_engine::MetricsRecorderQueue;

loop {
interval.tick().await;
while let Ok(metrics) = rx.try_recv() {
record_metrics(metrics);
}
}
})
.abort_handle();
(MetricsRecorderQueue { tx }, abort_handle)
pub fn spawn_tx_metrics_recorder(
handle: &spacetimedb_runtime::Handle,
) -> (MetricsRecorderQueue, spacetimedb_runtime::AbortHandle) {
spacetimedb_engine::spawn_tx_metrics_recorder(handle)
}
Loading
Loading