Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/commitlog/proptest-regressions/tests/bitflip.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@
# It is recommended to check this file in to source control so that
# everyone who runs the test benefits from these saved cases.
cc a224c9559a4f825676852b58397b59027a14561c8bd9439b52691234fab848de # shrinks to inputs = Inputs { byte_pos: 354, bit_mask: 205, segment_offset: 30 }
cc a62542123f6c7a5c747cdf8d64246d93b1ba55e53f207dd0827d3bc65442cb35 # shrinks to inputs = Inputs { byte_pos: 25, bit_mask: 1, segment_offset: 0 }
100 changes: 95 additions & 5 deletions crates/commitlog/src/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,27 @@ use std::{
use crc32c::{Crc32cReader, Crc32cWriter};
use spacetimedb_sats::buffer::{BufReader, Cursor, DecodeError};

use crate::{error::ChecksumMismatch, payload::Decoder, segment::CHECKSUM_ALGORITHM_CRC32C, Transaction};
use crate::{
error::ChecksumMismatch, payload::Decoder, segment::CHECKSUM_ALGORITHM_CRC32C, Transaction,
DEFAULT_LOG_FORMAT_VERSION,
};

/// Layout version of a commit [`Header`] as stored in the log.
///
/// The version selects which header decoder is used: `V0` headers do not
/// contain an `epoch` field, `V1` headers do. `V1` is the default.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
enum Version {
    /// Header layout without the `epoch` field (offset, n, len).
    V0,
    /// Header layout including the `epoch` field (offset, epoch, n, len).
    #[default]
    V1,
}

/// The fixed-size fields that precede the records of a [`Commit`].
///
/// The encoded size of the current layout is [`Header::LEN`] bytes.
pub struct Header {
/// See [`Commit::min_tx_offset`].
pub min_tx_offset: u64,
/// See [`Commit::epoch`].
///
/// Set to [`Commit::DEFAULT_EPOCH`] when the header was decoded from the
/// version 0 layout, which has no epoch field.
pub epoch: u64,
/// The number of records in the commit.
pub n: u16,
/// Length in bytes of the serialized records buffer following the header.
pub len: u32,
}

impl Header {
pub const LEN: usize = /* offset */ 8 + /* n */ 2 + /* len */ 4;
pub const LEN: usize = /* offset */ 8 + /* epoch */ 8 + /* n */ 2 + /* len */ 4;

/// Read [`Self::LEN`] bytes from `reader` and interpret them as the
/// "header" of a [`Commit`].
Expand All @@ -30,7 +41,45 @@ impl Header {
///
/// This is to allow preallocation of segments.
///
pub fn decode<R: Read>(mut reader: R) -> io::Result<Option<Self>> {
pub fn decode<R: Read>(reader: R) -> io::Result<Option<Self>> {
// Always parses the V1 layout (the one including the epoch field).
// Callers which need to honour an older log format version go through
// `decode_internal` instead.
Self::decode_v1(reader)
}

fn decode_internal<R: Read>(reader: R, v: Version) -> io::Result<Option<Self>> {
use Version::*;
match v {
V0 => Self::decode_v0(reader),
V1 => Self::decode_v1(reader),
}
}

/// Decode a header in the version 0 layout, which has no `epoch` field.
///
/// Reads `Self::LEN - 8` bytes from `reader`. Returns `Ok(None)` if the
/// reader hits end-of-file before that many bytes could be read, or if all
/// bytes read are zero. Any other I/O error is returned as-is.
///
/// The `epoch` of the returned header is [`Commit::DEFAULT_EPOCH`].
fn decode_v0<R: Read>(mut reader: R) -> io::Result<Option<Self>> {
    // V0 layout: offset (8) + n (2) + len (4), i.e. `LEN` minus the 8-byte epoch.
    let mut raw = [0u8; Self::LEN - 8];
    match reader.read_exact(&mut raw) {
        Ok(()) => {}
        // Not enough bytes left for a header: there is no further commit.
        Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
        Err(e) => return Err(e),
    }

    // An all-zero header is not a commit either.
    if raw.iter().all(|&byte| byte == 0) {
        return Ok(None);
    }

    let mut buf = raw.as_slice();
    let min_tx_offset = buf.get_u64().map_err(decode_error)?;
    let n = buf.get_u16().map_err(decode_error)?;
    let len = buf.get_u32().map_err(decode_error)?;

    Ok(Some(Self {
        min_tx_offset,
        epoch: Commit::DEFAULT_EPOCH,
        n,
        len,
    }))
}

fn decode_v1<R: Read>(mut reader: R) -> io::Result<Option<Self>> {
let mut hdr = [0; Self::LEN];
if let Err(e) = reader.read_exact(&mut hdr) {
if e.kind() == io::ErrorKind::UnexpectedEof {
Expand All @@ -43,10 +92,16 @@ impl Header {
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] => Ok(None),
buf => {
let min_tx_offset = buf.get_u64().map_err(decode_error)?;
let epoch = buf.get_u64().map_err(decode_error)?;
let n = buf.get_u16().map_err(decode_error)?;
let len = buf.get_u32().map_err(decode_error)?;

Ok(Some(Self { min_tx_offset, n, len }))
Ok(Some(Self {
min_tx_offset,
epoch,
n,
len,
}))
}
}
}
Expand All @@ -60,6 +115,18 @@ pub struct Commit {
/// The offset starts from zero and is counted from the beginning of the
/// entire log.
pub min_tx_offset: u64,
/// The epoch within which the commit was created.
///
/// Indicates the monotonically increasing term number of the leader when
/// the commitlog is being written to in a distributed deployment.
///
/// The default epoch is 0 (zero). It should be used when the log is written
/// to by a single process.
///
/// Note, however, that an existing log may have a non-zero epoch.
/// It is currently unspecified how a commitlog is transitioned between
/// distributed and single-node deployment, wrt the epoch.
pub epoch: u64,
/// The number of records in the commit.
pub n: u16,
/// A buffer of all records in the commit in serialized form.
Expand All @@ -70,6 +137,8 @@ pub struct Commit {
}

impl Commit {
pub const DEFAULT_EPOCH: u64 = 0;

pub const FRAMING_LEN: usize = Header::LEN + /* crc32 */ 4;
pub const CHECKSUM_ALGORITHM: u8 = CHECKSUM_ALGORITHM_CRC32C;

Expand All @@ -90,10 +159,12 @@ impl Commit {
let mut out = Crc32cWriter::new(out);

let min_tx_offset = self.min_tx_offset.to_le_bytes();
let epoch = self.epoch.to_le_bytes();
let n = self.n.to_le_bytes();
let len = (self.records.len() as u32).to_le_bytes();

out.write_all(&min_tx_offset)?;
out.write_all(&epoch)?;
out.write_all(&n)?;
out.write_all(&len)?;
out.write_all(&self.records)?;
Expand Down Expand Up @@ -173,13 +244,15 @@ impl From<StoredCommit> for Commit {
fn from(
StoredCommit {
min_tx_offset,
epoch,
n,
records,
checksum: _,
}: StoredCommit,
) -> Self {
Self {
min_tx_offset,
epoch,
n,
records,
}
Expand All @@ -194,6 +267,8 @@ impl From<StoredCommit> for Commit {
pub struct StoredCommit {
/// See [`Commit::min_tx_offset`].
pub min_tx_offset: u64,
/// See [`Commit::epoch`].
pub epoch: u64,
/// See [`Commit::n`].
pub n: u16,
/// See [`Commit::records`].
Expand All @@ -216,9 +291,18 @@ impl StoredCommit {
/// kind [`io::ErrorKind::InvalidData`] with an inner error downcastable to
/// [`ChecksumMismatch`] is returned.
pub fn decode<R: Read>(reader: R) -> io::Result<Option<Self>> {
// Decode assuming the crate's default log format version. Use
// `decode_internal` to decode data written with a specific version.
Self::decode_internal(reader, DEFAULT_LOG_FORMAT_VERSION)
}

pub(crate) fn decode_internal<R: Read>(reader: R, log_format_version: u8) -> io::Result<Option<Self>> {
let mut reader = Crc32cReader::new(reader);

let Some(hdr) = Header::decode(&mut reader)? else {
let v = if log_format_version == 0 {
Version::V0
} else {
Version::V1
};
let Some(hdr) = Header::decode_internal(&mut reader, v)? else {
return Ok(None);
};
let mut records = vec![0; hdr.len as usize];
Expand All @@ -233,6 +317,7 @@ impl StoredCommit {

Ok(Some(Self {
min_tx_offset: hdr.min_tx_offset,
epoch: hdr.epoch,
n: hdr.n,
records,
checksum: crc,
Expand All @@ -258,6 +343,7 @@ impl StoredCommit {
pub struct Metadata {
/// The range of transaction offsets covered by the commit, as returned by
/// the commit's `tx_range()`.
pub tx_range: Range<u64>,
/// The encoded length of the commit in bytes, as returned by the commit's
/// `encoded_len()`.
pub size_in_bytes: u64,
/// The epoch of the commit. See [`Commit::epoch`].
pub epoch: u64,
}

impl Metadata {
Expand All @@ -275,6 +361,7 @@ impl From<Commit> for Metadata {
Self {
tx_range: commit.tx_range(),
size_in_bytes: commit.encoded_len() as u64,
epoch: commit.epoch,
}
}
}
Expand Down Expand Up @@ -312,6 +399,7 @@ mod tests {
min_tx_offset: 0,
n: 3,
records,
epoch: Commit::DEFAULT_EPOCH,
};

let mut buf = Vec::with_capacity(commit.encoded_len());
Expand All @@ -329,6 +417,7 @@ mod tests {
min_tx_offset: 0,
n: 4,
records: vec![0; 128],
epoch: Commit::DEFAULT_EPOCH,
};

let txs = commit
Expand Down Expand Up @@ -358,6 +447,7 @@ mod tests {
min_tx_offset: 42,
n: 10,
records: vec![1; 512],
epoch: Commit::DEFAULT_EPOCH,
};

let mut buf = Vec::with_capacity(commit.encoded_len());
Expand Down
44 changes: 41 additions & 3 deletions crates/commitlog/src/commitlog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ impl<R: Repo, T> Generic<R, T> {
debug!("resuming last segment: {last}");
repo::resume_segment_writer(&repo, opts, last)?.or_else(|meta| {
tail.push(meta.tx_range.start);
repo::create_segment_writer(&repo, opts, meta.tx_range.end)
repo::create_segment_writer(&repo, opts, meta.max_epoch, meta.tx_range.end)
})?
} else {
debug!("starting fresh log");
repo::create_segment_writer(&repo, opts, 0)?
repo::create_segment_writer(&repo, opts, Commit::DEFAULT_EPOCH, 0)?
};

Ok(Self {
Expand All @@ -72,6 +72,43 @@ impl<R: Repo, T> Generic<R, T> {
})
}

/// Get the current epoch.
///
/// This is the epoch recorded on the commit currently held by the head
/// segment writer (`self.head.commit`).
///
/// See also: [`Commit::epoch`].
pub fn epoch(&self) -> u64 {
self.head.commit.epoch
}

/// Update the current epoch.
///
/// If `epoch` is greater than the current epoch, calls [`Self::commit`] to
/// flush all data of the previous epoch, then switches the head segment
/// writer to the new epoch, and returns the result of the commit.
///
/// Does nothing (and returns `Ok(None)`) if the given `epoch` is equal to the
/// current epoch.
///
/// # Errors
///
/// If `epoch` is smaller than the current epoch, an error of kind
/// [`io::ErrorKind::InvalidInput`] is returned.
///
/// If the implicit commit fails, its error is returned and the epoch is left
/// unchanged.
///
/// Also see [`Self::commit`].
pub fn set_epoch(&mut self, epoch: u64) -> io::Result<Option<Committed>> {
    use std::cmp::Ordering;

    // Compare the *requested* epoch against the current one, so that `Less`
    // really means "the new epoch would go backwards". Comparing the other way
    // around rejects every legitimate increase and accepts decreases.
    match epoch.cmp(&self.head.epoch()) {
        Ordering::Less => Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "new epoch is smaller than current epoch",
        )),
        Ordering::Equal => Ok(None),
        Ordering::Greater => {
            // Flush under the old epoch first; only then switch over.
            let res = self.commit()?;
            self.head.set_epoch(epoch);
            Ok(res)
        }
    }
}

/// Write the currently buffered data to storage and rotate segments as
/// necessary.
///
Expand Down Expand Up @@ -254,7 +291,7 @@ impl<R: Repo, T> Generic<R, T> {
self.head.next_tx_offset(),
self.head.min_tx_offset()
);
let new = repo::create_segment_writer(&self.repo, self.opts, self.head.next_tx_offset())?;
let new = repo::create_segment_writer(&self.repo, self.opts, self.head.epoch(), self.head.next_tx_offset())?;
let old = mem::replace(&mut self.head, new);
self.tail.push(old.min_tx_offset());
self.head.commit = old.commit;
Expand Down Expand Up @@ -821,6 +858,7 @@ mod tests {
min_tx_offset: 0,
n: 1,
records: [43; 32].to_vec(),
epoch: 0,
};
log.commit().unwrap();

Expand Down
29 changes: 29 additions & 0 deletions crates/commitlog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,35 @@ impl<T> Commitlog<T> {
self.inner.read().unwrap().max_committed_offset()
}

/// Get the current epoch.
///
/// Acquires the read lock on the inner log and delegates to its `epoch()`.
///
/// See also: [`Commit::epoch`].
///
/// # Panics
///
/// Panics if acquiring the lock fails (the `unwrap()` on `read()`),
/// presumably because the lock is poisoned.
pub fn epoch(&self) -> u64 {
self.inner.read().unwrap().epoch()
}

/// Update the current epoch.
///
/// Does nothing if the given `epoch` is equal to the current epoch.
/// Otherwise flushes outstanding transactions to disk (equivalent to
/// [`Self::flush`]) before updating the epoch.
///
/// Returns the maximum transaction offset written to disk. The offset is
/// `None` if the log is empty and no data was pending to be flushed.
///
/// # Errors
///
/// If `epoch` is smaller than the current epoch, an error of kind
/// [`io::ErrorKind::InvalidInput`] is returned.
///
/// Errors from the implicit flush are propagated.
///
/// # Panics
///
/// Panics if acquiring the write lock fails (the `unwrap()` on `write()`),
/// presumably because the lock is poisoned.
pub fn set_epoch(&self, epoch: u64) -> io::Result<Option<u64>> {
// Hold the write lock across both calls, so the offset reported below is
// read without releasing the lock after the epoch change.
let mut inner = self.inner.write().unwrap();
// The value returned by the inner `set_epoch` is discarded; callers get
// the maximum committed offset instead.
inner.set_epoch(epoch)?;

Ok(inner.max_committed_offset())
}

/// Sync all OS-buffered writes to disk.
///
/// Note that this does **not** write outstanding records to disk.
Expand Down
4 changes: 2 additions & 2 deletions crates/commitlog/src/payload/txdata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use thiserror::Error;
use crate::{
error,
varint::{decode_varint, encode_varint},
Encode, Varchar,
Encode, Varchar, DEFAULT_LOG_FORMAT_VERSION,
};

// Re-export so we get a hyperlink in rustdocs by default
Expand Down Expand Up @@ -116,7 +116,7 @@ impl<T> Txdata<T> {
}

impl<T: Encode> Txdata<T> {
pub const VERSION: u8 = 0;
pub const VERSION: u8 = DEFAULT_LOG_FORMAT_VERSION;

pub fn encode(&self, buf: &mut impl BufWriter) {
let mut flags = Flags::empty();
Expand Down
Loading