1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -280,7 +280,7 @@ webbrowser = "1.0.2"
windows-sys = "0.59"
xdg = "2.5"
tikv-jemallocator = { version = "0.6.0", features = ["profiling", "stats"] }
tikv-jemalloc-ctl = { version = "0.6.0", features = ["stats"]}
tikv-jemalloc-ctl = { version = "0.6.0", features = ["stats"] }
jemalloc_pprof = { version = "0.7", features = ["symbolize", "flamegraph"] }
zstd-framed = { version = "0.1.1", features = ["tokio"] }

2 changes: 1 addition & 1 deletion crates/commitlog/src/lib.rs
@@ -261,7 +261,7 @@ impl<T> Commitlog<T> {
self.inner.read().unwrap().repo.existing_offsets()
}

/// Compress the segments at the offsets provded, marking them as immutable.
/// Compress the segments at the offsets provided, marking them as immutable.
pub fn compress_segments(&self, offsets: &[u64]) -> io::Result<()> {
// even though `compress_segment` takes &self, we take an
// exclusive lock to avoid any weirdness happening.
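
A quick usage sketch (not part of the diff; the return type of `existing_segment_offsets` and the skip-the-newest-segment policy are assumptions for illustration):

use std::io;

// Sketch: compress every existing segment except the newest one, which may
// still receive writes. Assumes `existing_segment_offsets()` returns
// `io::Result<Vec<u64>>`.
fn compress_all_but_head<T>(clog: &Commitlog<T>) -> io::Result<()> {
    let mut offsets: Vec<u64> = clog.existing_segment_offsets()?;
    offsets.sort_unstable();
    // Keep the most recent segment uncompressed so it stays writable.
    offsets.pop();
    clog.compress_segments(&offsets)
}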
7 changes: 2 additions & 5 deletions crates/commitlog/src/repo/fs.rs
@@ -2,7 +2,7 @@ use std::fs::{self, File};
use std::io;

use log::{debug, warn};
use spacetimedb_fs_utils::compression::{new_zstd_writer, CompressReader};
use spacetimedb_fs_utils::compression::{compress_with_zstd, CompressReader};
use spacetimedb_paths::server::{CommitLogDir, SegmentFile};
use tempfile::NamedTempFile;

@@ -124,10 +124,7 @@ impl Repo for Fs {
// bytes per frame. in the future, it might be worth looking into putting
// every commit into its own frame, to make seeking more efficient.
let max_frame_size = 0x1000;
let mut writer = new_zstd_writer(&mut dst, max_frame_size)?;
io::copy(&mut src, &mut writer)?;
writer.shutdown()?;
drop(writer);
compress_with_zstd(&mut src, &mut dst, Some(max_frame_size))?;
dst.persist(self.segment_path(offset))?;
Ok(())
}
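
The comment above is the key design note in this hunk: compression now goes through a single `compress_with_zstd` helper, with the input split into frames of bounded size so a reader can seek at frame granularity. A rough sketch of that idea, using the plain `zstd` crate purely for illustration (the actual helper in `spacetimedb_fs_utils` is not shown in this diff):

use std::io::{self, Read, Write};

// Sketch only: emit one self-contained zstd frame per chunk of at most
// `max_frame_size` input bytes, so a reader can later skip whole frames.
fn compress_in_frames<R: Read, W: Write>(src: &mut R, dst: &mut W, max_frame_size: usize) -> io::Result<()> {
    let mut buf = vec![0u8; max_frame_size];
    loop {
        let n = src.read(&mut buf)?;
        if n == 0 {
            break;
        }
        // Level 0 selects zstd's default compression level.
        let mut encoder = zstd::stream::Encoder::new(&mut *dst, 0)?;
        encoder.write_all(&buf[..n])?;
        encoder.finish()?;
    }
    Ok(())
}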
20 changes: 6 additions & 14 deletions crates/commitlog/src/stream/common.rs
@@ -4,9 +4,7 @@ use std::{
ops::{Bound, RangeBounds},
};

use tokio::io::{
AsyncBufRead, AsyncBufReadExt as _, AsyncRead, AsyncReadExt as _, AsyncSeek, AsyncSeekExt, AsyncWrite,
};
use tokio::io::{AsyncBufRead, AsyncBufReadExt as _, AsyncRead, AsyncReadExt as _, AsyncSeek, AsyncWrite};

use crate::{commit, repo::Repo};

@@ -52,17 +50,11 @@ impl AsyncFsync for tokio::fs::File {
}

pub trait AsyncLen: AsyncSeek + Unpin + Send {
fn segment_len(&mut self) -> impl Future<Output = io::Result<u64>> + Send {
async {
let old_pos = self.stream_position().await?;
let len = self.seek(io::SeekFrom::End(0)).await?;
// If we're already at the end of the file, avoid seeking.
if old_pos != len {
self.seek(io::SeekFrom::Start(old_pos)).await?;
}

Ok(len)
}
fn segment_len(&mut self) -> impl Future<Output = io::Result<u64>> + Send
where
Self: Sized,
{
async { spacetimedb_fs_utils::compression::segment_len(self).await }
}
}
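
For reference, the relocated helper presumably keeps the seek-based logic that the removed lines above implemented; an assumed shape of `spacetimedb_fs_utils::compression::segment_len` (not shown in this diff):

use tokio::io::{AsyncSeek, AsyncSeekExt as _};

// Assumed shape of the moved helper: learn the length by seeking to the end,
// then restore the original position unless we were already there.
pub async fn segment_len<S: AsyncSeek + Unpin>(file: &mut S) -> std::io::Result<u64> {
    let old_pos = file.stream_position().await?;
    let len = file.seek(std::io::SeekFrom::End(0)).await?;
    if old_pos != len {
        file.seek(std::io::SeekFrom::Start(old_pos)).await?;
    }
    Ok(len)
}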

1 change: 1 addition & 0 deletions crates/core/Cargo.toml
@@ -34,6 +34,7 @@ spacetimedb-snapshot.workspace = true
spacetimedb-subscription.workspace = true
spacetimedb-expr.workspace = true
spacetimedb-execution.workspace = true
spacetimedb-fs-utils.workspace = true

anyhow = { workspace = true, features = ["backtrace"] }
arrayvec.workspace = true
15 changes: 15 additions & 0 deletions crates/core/src/db/datastore/locking_tx_datastore/datastore.rs
@@ -276,6 +276,21 @@ impl Locking {
Ok(Some((tx_offset, snapshot_dir)))
}

pub(crate) fn compress_older_snapshot_internal(repo: &SnapshotRepository, upper_bound: TxOffset) {
log::info!(
"Compressing snapshots of database {:?} older than TX offset {}",
repo.database_identity(),
upper_bound,
);
if let Err(err) = repo.compress_older_snapshots(upper_bound) {
log::error!(
"Failed to compress snapshot of database {:?} older than {:?}: {err}",
repo.database_identity(),
upper_bound
);
};
}

/// Returns a list over all the currently connected clients,
/// reading from the `st_clients` system table.
pub fn connected_clients<'a>(
152 changes: 146 additions & 6 deletions crates/core/src/db/relational_db.rs
@@ -156,10 +156,15 @@ impl SnapshotWorkerActor {
let start_time = std::time::Instant::now();
let committed_state = self.committed_state.clone();
let snapshot_repo = self.repo.clone();
let res =
tokio::task::spawn_blocking(move || Locking::take_snapshot_internal(&committed_state, &snapshot_repo))
.await
.unwrap();
let res = tokio::task::spawn_blocking(move || {
Locking::take_snapshot_internal(&committed_state, &snapshot_repo).inspect(|opts| {
if let Some(opts) = opts {
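// `opts` is the `(tx_offset, snapshot_dir)` pair returned by `take_snapshot_internal`,
// so `opts.0` is the TX offset of the snapshot that was just taken.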
Locking::compress_older_snapshot_internal(&snapshot_repo, opts.0);
}
})
})
.await
.unwrap();
match res {
Err(e) => {
log::error!(
@@ -1386,7 +1391,9 @@ pub mod tests_utils {
use super::*;
use core::ops::Deref;
use durability::EmptyHistory;
use spacetimedb_fs_utils::compression::CompressType;
use spacetimedb_lib::{bsatn::to_vec, ser::Serialize};
use spacetimedb_paths::server::SnapshotDirPath;
use spacetimedb_paths::FromPathUnchecked;
use tempfile::TempDir;

@@ -1467,7 +1474,6 @@

Ok(Self {
db,

durable: Some(durable),
tmp_dir: dir,
})
@@ -1618,6 +1624,10 @@
fn row_count_fn() -> RowCountFn {
Arc::new(|_, _| i64::MAX)
}

pub fn take_snapshot(&self, repo: &SnapshotRepository) -> Result<Option<SnapshotDirPath>, DBError> {
self.inner.take_snapshot(repo)
}
}

impl Deref for TestDB {
@@ -1675,21 +1685,40 @@ pub mod tests_utils {
Self(log)
}
}

pub fn make_snapshot(
dir: SnapshotsPath,
identity: Identity,
replica: u64,
compress: CompressType,
delete_if_exists: bool,
) -> (SnapshotsPath, SnapshotRepository) {
let path = dir.0.join(format!("{replica}_{compress:?}"));
if delete_if_exists && path.exists() {
std::fs::remove_dir_all(&path).unwrap();
}
let dir = SnapshotsPath::from_path_unchecked(path);
dir.create().unwrap();
let snapshot = SnapshotRepository::open(dir.clone(), identity, replica).unwrap();

(dir, snapshot)
}
}

#[cfg(test)]
mod tests {
#![allow(clippy::disallowed_macros)]

use std::cell::RefCell;
use std::path::PathBuf;
use std::rc::Rc;

use super::*;
use crate::db::datastore::system_tables::{
system_tables, StConstraintRow, StIndexRow, StSequenceRow, StTableRow, ST_CONSTRAINT_ID, ST_INDEX_ID,
ST_SEQUENCE_ID, ST_TABLE_ID,
};
use crate::db::relational_db::tests_utils::{insert, TestDB};
use crate::db::relational_db::tests_utils::{insert, make_snapshot, TestDB};
use crate::error::IndexError;
use crate::execution_context::ReducerContext;
use anyhow::bail;
@@ -1699,15 +1728,19 @@
use durability::EmptyHistory;
use pretty_assertions::assert_eq;
use spacetimedb_data_structures::map::IntMap;
use spacetimedb_fs_utils::compression::{CompressCount, CompressType};
use spacetimedb_lib::db::raw_def::v9::{btree, RawTableDefBuilder};
use spacetimedb_lib::error::ResultTest;
use spacetimedb_lib::Identity;
use spacetimedb_lib::Timestamp;
use spacetimedb_paths::FromPathUnchecked;
use spacetimedb_sats::buffer::BufReader;
use spacetimedb_sats::product;
use spacetimedb_schema::schema::RowLevelSecuritySchema;
use spacetimedb_snapshot::Snapshot;
use spacetimedb_table::read_column::ReadColumn;
use spacetimedb_table::table::RowRef;
use tempfile::TempDir;
use tests::tests_utils::TestHistory;

fn my_table(col_type: AlgebraicType) -> TableSchema {
@@ -2651,4 +2684,111 @@

stdb.release_tx(read_tx);
}

// Verify that we can compress snapshots and hardlink them,
// except for the last one, which should be uncompressed.
// Then, verify that we can read the compressed snapshot.
//
// NOTE: `snapshot_watching_compressor` is what filters out the last snapshot.
#[test]
fn compress_snapshot_test() -> ResultTest<()> {
let stdb = TestDB::in_memory()?;

let mut tx = stdb.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests);
let schema = my_table(AlgebraicType::I32);
let table_id = stdb.create_table(&mut tx, schema)?;
for v in 0..3 {
insert(&stdb, &mut tx, table_id, &product![v])?;
}
stdb.commit_tx(tx)?;

let root = stdb.path().snapshots();
let (dir, repo) = make_snapshot(root.clone(), Identity::ZERO, 0, CompressType::None, true);
stdb.take_snapshot(&repo)?;

let total_objects = repo.size_on_disk()?.object_count;
// Take more snapshots that will hard-link parts of the first one
for i in 0..2 {
let mut tx = stdb.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests);
for v in 0..(10 + i) {
insert(&stdb, &mut tx, table_id, &product![v])?;
}
stdb.commit_tx(tx)?;
stdb.take_snapshot(&repo)?;
}

let size_compress_off = repo.size_on_disk()?;
assert!(
size_compress_off.total_size > 0,
"Snapshot size should be greater than 0"
);
let mut offsets = repo.all_snapshots()?.collect::<Vec<_>>();
offsets.sort();
assert_eq!(&offsets, &[1, 2, 3]);
// Simulate compressing all snapshots except the last one
let last_compress = 2;
assert_eq!(repo.compress_older_snapshots(3)?, CompressCount { none: 0, zstd: 2 });
let size_compress_on = repo.size_on_disk()?;
assert!(size_compress_on.total_size < size_compress_off.total_size);
// Verify we hard-linked the second snapshot
#[cfg(unix)]
{
use std::os::unix::fs::MetadataExt;
let snapshot_dir = dir.snapshot_dir(last_compress);
let mut hard_linked_on = 0;
let mut hard_linked_off = 0;

let (snapshot, compress) = Snapshot::read_from_file(&snapshot_dir.snapshot_file(last_compress))?;
assert_eq!(compress, CompressType::Zstd);
let repo = SnapshotRepository::object_repo(&snapshot_dir)?;
for (_, path) in snapshot.files(&repo) {
match path.metadata()?.nlink() {
0 => hard_linked_off += 1,
_ => hard_linked_on += 1,
}
}
assert_eq!(hard_linked_on, total_objects);
assert_eq!(hard_linked_off, 0);
}

// Sanity check that we can read the snapshot after compression
let repo = open_snapshot_repo(dir, Identity::ZERO, 0)?;
RelationalDB::restore_from_snapshot_or_bootstrap(Identity::ZERO, Some(&repo), Some(last_compress))?;

Ok(())
}

// For testing compression against an existing database.
// Supply the path to the database and the identity of the replica via environment variables:
// - `SNAPSHOT`: the path to the database, like `/tmp/db/replicas/.../8/database`
// - `IDENTITY`: the identity in hex format
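// Example invocation (the test name comes from this diff; the exact cargo flags are assumed):
//   SNAPSHOT=/tmp/db/replicas/.../8/database IDENTITY=<hex identity> cargo test read_existing -- --ignored --nocapture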
#[tokio::test]
#[ignore]
async fn read_existing() -> ResultTest<()> {
let path_db = PathBuf::from(std::env::var("SNAPSHOT").expect("SNAPSHOT must be set to a valid path"));
let identity =
Identity::from_hex(std::env::var("IDENTITY").expect("IDENTITY must be set to a valid hex identity"))?;
let path = ReplicaDir::from_path_unchecked(path_db);

let repo = open_snapshot_repo(path.snapshots(), Identity::ZERO, 0)?;
dbg!(repo.size_on_disk()?);
assert!(
repo.size_on_disk()?.total_size > 0,
"Snapshot size should be greater than 0"
);

let last = repo.latest_snapshot()?;
let stdb = RelationalDB::restore_from_snapshot_or_bootstrap(identity, Some(&repo), last)?;

let out = TempDir::with_prefix("snapshot_test")?;
let dir = SnapshotsPath::from_path_unchecked(out.path());

let (_, repo) = make_snapshot(dir.clone(), Identity::ZERO, 0, CompressType::Zstd, false);

stdb.take_snapshot(&repo)?;
let size = repo.size_on_disk()?;
assert!(size.total_size > 0, "Snapshot size should be greater than 0");

Ok(())
}
}
2 changes: 1 addition & 1 deletion crates/durability/src/imp/local.rs
@@ -149,7 +149,7 @@ impl<T: Encode + Send + Sync + 'static> Local<T> {
self.clog.existing_segment_offsets()
}

/// Compress the segments at the offsets provded, marking them as immutable.
/// Compress the segments at the offsets provided, marking them as immutable.
pub fn compress_segments(&self, offsets: &[TxOffset]) -> io::Result<()> {
self.clog.compress_segments(offsets)
}