37 changes: 33 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
@@ -124,6 +124,7 @@ ahash = "0.8"
anyhow = "1.0.68"
anymap = "0.12"
arrayvec = "0.7.2"
async-stream = "0.3.6"
async-trait = "0.1.68"
axum = { version = "0.7", features = ["tracing"] }
axum-extra = { version = "0.9", features = ["typed-header"] }
@@ -135,7 +136,7 @@ blake3 = "1.5.1"
brotli = "3.5"
byte-unit = "4.0.18"
bytemuck = { version = "1.16.2", features = ["must_cast"] }
bytes = "1.2.1"
bytes = "1.10.1"
bytestring = { version = "1.2.0", features = ["serde"] }
cargo_metadata = "0.17.0"
chrono = { version = "0.4.24", default-features = false }
13 changes: 12 additions & 1 deletion crates/commitlog/Cargo.toml
@@ -9,30 +9,41 @@ description = "Implementation of the SpacetimeDB commitlog."

[features]
default = ["serde"]
streaming = ["dep:async-stream", "dep:bytes", "dep:futures", "dep:tokio", "dep:tokio-util"]
# Enable types + impls useful for testing
test = ["dep:env_logger"]

[dependencies]
async-stream = { workspace = true, optional = true }
bitflags.workspace = true
bytes = { workspace = true, optional = true }
crc32c.workspace = true
futures = { workspace = true, optional = true }
itertools.workspace = true
log.workspace = true
memmap2 = "0.9.4"
serde = { workspace = true, optional = true }
spacetimedb-primitives.workspace = true
spacetimedb-paths.workspace = true
spacetimedb-primitives.workspace = true
spacetimedb-sats.workspace = true
tempfile.workspace = true
thiserror.workspace = true
tokio = { workspace = true, optional = true }
tokio-util = { workspace = true, optional = true, features = ["io-util"] }

# For the 'test' feature
env_logger = { workspace = true, optional = true }

[dev-dependencies]
# Enable streaming in tests
# Also enable the 'test' feature, so integration tests can use the helpers.
spacetimedb-commitlog = { path = ".", features = ["test", "streaming"] }

env_logger.workspace = true
once_cell.workspace = true
pretty_assertions = { workspace = true, features = ["unstable"] }
proptest-derive.workspace = true
proptest.workspace = true
rand.workspace = true
tempfile.workspace = true
tokio-stream = { version = "0.1.17", features = ["fs"] }
9 changes: 6 additions & 3 deletions crates/commitlog/src/commit.rs
@@ -7,8 +7,10 @@ use crc32c::{Crc32cReader, Crc32cWriter};
use spacetimedb_sats::buffer::{BufReader, Cursor, DecodeError};

use crate::{
error::ChecksumMismatch, payload::Decoder, segment::CHECKSUM_ALGORITHM_CRC32C, Transaction,
DEFAULT_LOG_FORMAT_VERSION,
error::ChecksumMismatch,
payload::Decoder,
segment::{CHECKSUM_ALGORITHM_CRC32C, CHECKSUM_CRC32C_LEN},
Transaction, DEFAULT_LOG_FORMAT_VERSION,
};

#[derive(Default)]
@@ -139,8 +141,9 @@ pub struct Commit {
impl Commit {
pub const DEFAULT_EPOCH: u64 = 0;

pub const FRAMING_LEN: usize = Header::LEN + /* crc32 */ 4;
pub const FRAMING_LEN: usize = Header::LEN + Self::CHECKSUM_LEN;
pub const CHECKSUM_ALGORITHM: u8 = CHECKSUM_ALGORITHM_CRC32C;
pub const CHECKSUM_LEN: usize = CHECKSUM_CRC32C_LEN;

/// The range of transaction offsets contained in this commit.
pub fn tx_range(&self) -> Range<u64> {
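For reference, the arithmetic this hunk makes explicit: a commit's on-disk framing is the fixed-size header plus a trailing CRC32C checksum, and CRC32C is a 32-bit (4-byte) checksum, which is what the old inline `/* crc32 */ 4` encoded. A minimal sketch of the invariant, not part of this diff (assuming `Header::LEN` and the constants are visible from the same module):

// Sketch only: the relationship between the new named constants and the
// previously hard-coded checksum width.
const _: () = assert!(Commit::CHECKSUM_LEN == 4); // CRC32C is 32 bits wide
const _: () = assert!(Commit::FRAMING_LEN == Header::LEN + Commit::CHECKSUM_LEN);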
127 changes: 89 additions & 38 deletions crates/commitlog/src/commitlog.rs
@@ -9,7 +9,7 @@ use crate::{
payload::Decoder,
repo::{self, Repo},
segment::{self, FileLike, Transaction, Writer},
Commit, Encode, Options,
Commit, Encode, Options, DEFAULT_LOG_FORMAT_VERSION,
};

pub use crate::segment::Committed;
@@ -240,43 +240,7 @@ impl<R: Repo, T> Generic<R, T> {
self.panicked = true;
self.tail.reserve(1);
self.tail.push(self.head.min_tx_offset);
for segment in self.tail.iter().rev() {
let segment = *segment;
if segment > offset {
// Segment is outside the offset, so remove it wholesale.
debug!("removing segment {segment}");
self.repo.remove_segment(segment)?;
} else {
// Read commit-wise until we find the byte offset.
let reader = repo::open_segment_reader(&self.repo, self.opts.log_format_version, segment)?;
let commits = reader.commits();

let mut bytes_read = 0;
for commit in commits {
let commit = commit?;
if commit.min_tx_offset > offset {
break;
}
bytes_read += Commit::from(commit).encoded_len() as u64;
}

if bytes_read == 0 {
// Segment is empty, just remove it.
self.repo.remove_segment(segment)?;
} else {
let byte_offset = segment::Header::LEN as u64 + bytes_read;
debug!("truncating segment {segment} to {offset} at {byte_offset}");
let mut file = self.repo.open_segment(segment)?;
// Note: The offset index truncates equal or greater,
// inclusive. We'd like to retain `offset` in the index, as
// the commit is also retained in the log.
file.ftruncate(offset + 1, byte_offset)?;
// Some filesystems require fsync after ftruncate.
file.fsync()?;
break;
}
}
}
reset_to_internal(&self.repo, &self.tail, offset)?;
// Prevent finalizer from running by not updating self.panicked.

Self::open(self.repo.clone(), self.opts)
@@ -341,6 +305,37 @@ impl<R: Repo, T> Drop for Generic<R, T> {
}
}

/// Extract the most recently written [`segment::Metadata`] from the commitlog
/// in `repo`.
///
/// Returns `None` if the commitlog is empty.
///
/// Note that this function validates the most recent segment, which entails
/// traversing it from the start.
///
/// The function can be used instead of the pattern:
///
/// ```ignore
/// let log = Commitlog::open(..)?;
/// let max_offset = log.max_committed_offset();
/// ```
///
/// like so:
///
/// ```ignore
/// let max_offset = committed_meta(..)?.map(|meta| meta.tx_range.end);
/// ```
///
/// Unlike `open`, no segment will be created in an empty `repo`.
pub fn committed_meta(repo: impl Repo) -> Result<Option<segment::Metadata>, error::SegmentMetadata> {
    let Some(last) = repo.existing_offsets()?.pop() else {
        return Ok(None);
    };

    let mut storage = repo.open_segment(last)?;
    segment::Metadata::extract(last, &mut storage).map(Some)
}
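A hedged usage sketch of the new helper (`any_repo` is a placeholder for a concrete `Repo` implementation; error handling elided):

// Sketch only, not part of this diff: read the durable watermark without
// going through `Commitlog::open`, so an empty repo is left untouched.
match committed_meta(any_repo)? {
    Some(meta) => println!("most recent commit covers tx range {:?}", meta.tx_range),
    None => println!("commitlog is empty"),
}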

pub fn commits_from<R: Repo>(repo: R, max_log_format_version: u8, offset: u64) -> io::Result<Commits<R>> {
let mut offsets = repo.existing_offsets()?;
if let Some(pos) = offsets.iter().rposition(|&off| off <= offset) {
@@ -446,6 +441,62 @@ where
Ok(())
}

/// Remove all data past the given transaction `offset`.
///
/// The function deletes log segments starting from the newest. As multiple
/// segments cannot be deleted atomically, the log may be left longer than
/// `offset` if the function does not return successfully.
///
/// If the function returns successfully, the most recent [`Commit`] in the
/// log will contain the transaction at `offset`.
///
/// The log must be re-opened if it is to be used after calling this function.
pub fn reset_to(repo: &impl Repo, offset: u64) -> io::Result<()> {
    let segments = repo.existing_offsets()?;
    reset_to_internal(repo, &segments, offset)
}

fn reset_to_internal(repo: &impl Repo, segments: &[u64], offset: u64) -> io::Result<()> {
    for segment in segments.iter().copied().rev() {
        if segment > offset {
            // Segment is outside the offset, so remove it wholesale.
            debug!("removing segment {segment}");
            repo.remove_segment(segment)?;
        } else {
            // Read commit-wise until we find the byte offset.
            let reader = repo::open_segment_reader(repo, DEFAULT_LOG_FORMAT_VERSION, segment)?;
            let commits = reader.commits();

            let mut bytes_read = 0;
            for commit in commits {
                let commit = commit?;
                if commit.min_tx_offset > offset {
                    break;
                }
                bytes_read += Commit::from(commit).encoded_len() as u64;
            }

            if bytes_read == 0 {
                // Segment is empty, just remove it.
                repo.remove_segment(segment)?;
            } else {
                let byte_offset = segment::Header::LEN as u64 + bytes_read;
                debug!("truncating segment {segment} to {offset} at {byte_offset}");
                let mut file = repo.open_segment(segment)?;
                // Note: The offset index truncates equal or greater,
                // inclusive. We'd like to retain `offset` in the index, as
                // the commit is also retained in the log.
                file.ftruncate(offset + 1, byte_offset)?;
                // Some filesystems require fsync after ftruncate.
                file.fsync()?;
                break;
            }
        }
    }

    Ok(())
}
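A hedged usage sketch of `reset_to` (here `repo`, `opts`, and `Tx` are placeholders for a concrete `Repo`, an `Options` value, and the payload type; as documented above, the log must be re-opened afterwards):

// Sketch only, not part of this diff: make the transaction at offset 41 the
// newest entry retained in the log, then re-open before appending again.
reset_to(&repo, 41)?;
let log = Generic::<_, Tx>::open(repo, opts)?;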

pub struct Segments<R> {
repo: R,
offs: vec::IntoIter<u64>,
2 changes: 1 addition & 1 deletion crates/commitlog/src/error.rs
@@ -62,7 +62,7 @@ pub struct Append<T> {
pub struct ChecksumMismatch;

#[derive(Debug, Error)]
pub(crate) enum SegmentMetadata {
pub enum SegmentMetadata {
#[error("invalid commit encountered")]
InvalidCommit {
sofar: segment::Metadata,
29 changes: 29 additions & 0 deletions crates/commitlog/src/lib.rs
@@ -24,6 +24,9 @@ pub use crate::{
pub mod error;
pub mod payload;

#[cfg(feature = "streaming")]
pub mod stream;

#[cfg(any(test, feature = "test"))]
pub mod tests;

@@ -433,6 +436,32 @@ impl<T: Encode> Commitlog<T> {
}
}

/// Extract the most recently written [`segment::Metadata`] from the commitlog
/// in `repo`.
///
/// Returns `None` if the commitlog is empty.
///
/// Note that this function validates the most recent segment, which entails
/// traversing it from the start.
///
/// The function can be used instead of the pattern:
///
/// ```ignore
/// let log = Commitlog::open(..)?;
/// let max_offset = log.max_committed_offset();
/// ```
///
/// like so:
///
/// ```ignore
/// let max_offset = committed_meta(..)?.map(|meta| meta.tx_range.end);
/// ```
///
/// Unlike `open`, no segment will be created in an empty `repo`.
pub fn committed_meta(root: CommitLogDir) -> Result<Option<segment::Metadata>, error::SegmentMetadata> {
    commitlog::committed_meta(repo::Fs::new(root)?)
}
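A hedged sketch of this path-based entry point (obtaining a `CommitLogDir` from spacetimedb-paths is outside this diff, so `commitlog_dir` is a placeholder):

// Sketch only, not part of this diff: peek at the durable tx watermark of an
// on-disk commitlog without opening it, and without creating a segment in an
// empty directory.
let max_offset = committed_meta(commitlog_dir)?.map(|meta| meta.tx_range.end);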

/// Obtain an iterator which traverses the commitlog located at the `root`
/// directory from the start, yielding [`StoredCommit`]s.
///