Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1318,6 +1318,47 @@ pub async fn local_durability(commitlog_dir: CommitLogDir) -> io::Result<(LocalD
Ok((local, disk_size_fn))
}

/// Watches snapshot creation events and compresses all commitlog segments older
/// than the snapshot.
///
/// Intended to be spawned as a [StartSnapshotWatcher], provided by a
/// [DurabilityProvider]. Suitable **only** for non-replicated databases.
///
/// [StartSnapshotWatcher]: crate::host::host_controller::StartSnapshotWatcher
/// [DurabilityProvider]: crate::host::host_controller::DurabilityProvider
pub async fn snapshot_watching_commitlog_compressor(
mut snapshot_rx: watch::Receiver<u64>,
durability: LocalDurability,
) {
let mut prev_snapshot_offset = *snapshot_rx.borrow_and_update();
while snapshot_rx.changed().await.is_ok() {
let snapshot_offset = *snapshot_rx.borrow_and_update();
let Ok(segment_offsets) = durability
.existing_segment_offsets()
.inspect_err(|e| tracing::warn!("failed to find offsets: {e}"))
else {
continue;
};
let start_idx = segment_offsets
.binary_search(&prev_snapshot_offset)
// if the snapshot is in the middle of a segment, we want to round down.
// [0, 2].binary_search(1) will return Err(1), so we subtract 1.
.unwrap_or_else(|i| i.saturating_sub(1));
let segment_offsets = &segment_offsets[start_idx..];
let end_idx = segment_offsets
.binary_search(&snapshot_offset)
.unwrap_or_else(|i| i.saturating_sub(1));
// in this case, segment_offsets[end_idx] is the segment that contains the snapshot,
// which we don't want to compress, so an exclusive range is correct.
let segment_offsets = &segment_offsets[..end_idx];
if let Err(e) = durability.compress_segments(segment_offsets) {
tracing::warn!("failed to compress segments: {e}");
continue;
}
prev_snapshot_offset = snapshot_offset;
}
}

/// Open a [`SnapshotRepository`] at `db_path/snapshots`,
/// configured to store snapshots of the database `database_identity`/`replica_id`.
pub fn open_snapshot_repo(
Expand Down
36 changes: 4 additions & 32 deletions crates/standalone/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use spacetimedb_client_api_messages::name::{DomainName, InsertDomainResult, Regi
use spacetimedb_paths::server::{ModuleLogsDir, PidFile, ServerDataDir};
use spacetimedb_paths::standalone::StandaloneDataDirExt;
use std::sync::Arc;
use tokio::sync::watch;

pub use spacetimedb_client_api::routes::subscribe::{BIN_PROTOCOL, TEXT_PROTOCOL};

Expand Down Expand Up @@ -108,43 +107,16 @@ impl DurabilityProvider for StandaloneDurabilityProvider {
let start_snapshot_watcher = {
let durability = durability.clone();
|snapshot_rx| {
tokio::spawn(snapshot_watcher(snapshot_rx, durability));
tokio::spawn(relational_db::snapshot_watching_commitlog_compressor(
snapshot_rx,
durability,
));
}
};
Ok(((durability, disk_size), Some(Box::new(start_snapshot_watcher))))
}
}

async fn snapshot_watcher(mut snapshot_rx: watch::Receiver<u64>, durability: relational_db::LocalDurability) {
let mut prev_snapshot_offset = *snapshot_rx.borrow_and_update();
while snapshot_rx.changed().await.is_ok() {
let snapshot_offset = *snapshot_rx.borrow_and_update();
let Ok(segment_offsets) = durability
.existing_segment_offsets()
.inspect_err(|e| tracing::warn!("failed to find offsets: {e}"))
else {
continue;
};
let start_idx = segment_offsets
.binary_search(&prev_snapshot_offset)
// if the snapshot is in the middle of a segment, we want to round down.
// [0, 2].binary_search(1) will return Err(1), so we subtract 1.
.unwrap_or_else(|i| i.saturating_sub(1));
let segment_offsets = &segment_offsets[start_idx..];
let end_idx = segment_offsets
.binary_search(&snapshot_offset)
.unwrap_or_else(|i| i.saturating_sub(1));
// in this case, segment_offsets[end_idx] is the segment that contains the snapshot,
// which we don't want to compress, so an exclusive range is correct.
let segment_offsets = &segment_offsets[..end_idx];
if let Err(e) = durability.compress_segments(segment_offsets) {
tracing::warn!("failed to compress segments: {e}");
continue;
}
prev_snapshot_offset = snapshot_offset;
}
}

#[async_trait]
impl NodeDelegate for StandaloneEnv {
fn gather_metrics(&self) -> Vec<prometheus::proto::MetricFamily> {
Expand Down
Loading