Skip to content
9 changes: 9 additions & 0 deletions crates/commitlog/src/commitlog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,15 @@ impl<R: Repo, T> Generic<R, T> {
self.head.next_tx_offset().checked_sub(1)
}

/// The first transaction offset written to disk, or `None` if nothing has
/// been written yet.
pub fn min_committed_offset(&self) -> Option<u64> {
    // Segments already on disk take precedence; only consult the in-memory
    // head segment when no tail segment exists and the head is non-empty.
    match self.tail.first() {
        Some(&offset) => Some(offset),
        None if !self.head.is_empty() => Some(self.head.min_tx_offset()),
        None => None,
    }
}

// Helper to obtain a list of the segment offsets which include transaction
// offset `offset`.
//
Expand Down
7 changes: 7 additions & 0 deletions crates/commitlog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,13 @@ impl<T> Commitlog<T> {
self.inner.read().unwrap().max_committed_offset()
}

/// Determine the minimum transaction offset in the log.
///
/// The offset is `None` if the log hasn't been flushed to disk yet.
pub fn min_committed_offset(&self) -> Option<u64> {
    let guard = self.inner.read().unwrap();
    guard.min_committed_offset()
}

/// Get the current epoch.
///
/// See also: [`Commit::epoch`].
Expand Down
185 changes: 151 additions & 34 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use super::datastore::{
};
use super::db_metrics::DB_METRICS;
use crate::db::datastore::system_tables::{StModuleRow, WASM_MODULE};
use crate::error::{DBError, DatabaseError, TableError};
use crate::error::{DBError, DatabaseError, RestoreSnapshotError, TableError};
use crate::execution_context::{ReducerContext, Workload};
use crate::messages::control_db::HostType;
use crate::util::{asyncify, spawn_rayon};
Expand Down Expand Up @@ -336,12 +336,14 @@ impl RelationalDB {
.map(|pair| pair.0.clone())
.as_deref()
.and_then(|durability| durability.durable_tx_offset());
let (min_commitlog_offset, _) = history.tx_range_hint();

log::info!("[{database_identity}] DATABASE: durable_tx_offset is {durable_tx_offset:?}");
let inner = Self::restore_from_snapshot_or_bootstrap(
database_identity,
snapshot_repo.as_deref(),
durable_tx_offset,
min_commitlog_offset,
page_pool,
)?;

Expand Down Expand Up @@ -460,43 +462,102 @@ impl RelationalDB {
database_identity: Identity,
snapshot_repo: Option<&SnapshotRepository>,
durable_tx_offset: Option<TxOffset>,
min_commitlog_offset: TxOffset,
page_pool: PagePool,
) -> Result<Locking, DBError> {
if let Some(snapshot_repo) = snapshot_repo {
if let Some(durable_tx_offset) = durable_tx_offset {
// Don't restore from a snapshot newer than the `durable_tx_offset`,
// so that you drop TXes which were committed but not durable before the restart.
if let Some(tx_offset) = snapshot_repo.latest_snapshot_older_than(durable_tx_offset)? {
// Mark any newer snapshots as invalid, as the new history will diverge from their state.
snapshot_repo.invalidate_newer_snapshots(durable_tx_offset)?;
log::info!("[{database_identity}] DATABASE: restoring snapshot of tx_offset {tx_offset}");
let start = std::time::Instant::now();
let snapshot = snapshot_repo.read_snapshot(tx_offset, &page_pool)?;
) -> Result<Locking, RestoreSnapshotError> {
fn try_restore_snapshot(
snapshot_repo: &SnapshotRepository,
snapshot_offset: TxOffset,
database_identity: Identity,
page_pool: PagePool,
) -> Result<Locking, RestoreSnapshotError> {
log::info!(
"[{database_identity}] DATABASE: restoring snapshot of tx_offset {}",
snapshot_offset
);
let start = std::time::Instant::now();
let snapshot = snapshot_repo
.read_snapshot(snapshot_offset, &page_pool)
.map_err(Box::new)?;
log::info!(
"[{database_identity}] DATABASE: read snapshot of tx_offset {} in {:?}",
snapshot_offset,
start.elapsed(),
);
if snapshot.database_identity != database_identity {
return Err(RestoreSnapshotError::IdentityMismatch {
expected: database_identity,
actual: snapshot.database_identity,
});
}
let start = std::time::Instant::now();
Locking::restore_from_snapshot(snapshot, page_pool)
.inspect(|_| {
log::info!(
"[{database_identity}] DATABASE: read snapshot of tx_offset {tx_offset} in {:?}",
"[{database_identity}] DATABASE: restored from snapshot of tx_offset {} in {:?}",
snapshot_offset,
start.elapsed(),
);
if snapshot.database_identity != database_identity {
// TODO: return a proper typed error
return Err(anyhow::anyhow!(
"Snapshot has incorrect database_identity: expected {database_identity} but found {}",
snapshot.database_identity,
)
.into());
)
})
.inspect_err(|e| {
log::warn!(
"[{database_identity}] DATABASE: failed to restore snapshot of tx_offset {}: {}",
snapshot_offset,
e
)
})
.map_err(Box::new)
.map_err(RestoreSnapshotError::Datastore)
}

if let Some((snapshot_repo, durable_tx_offset)) = snapshot_repo.zip(durable_tx_offset) {
// Mark any newer snapshots as invalid, as the history past
// `durable_tx_offset` may have been reset and thus diverge from
// any snapshots taken earlier.
snapshot_repo
.invalidate_newer_snapshots(durable_tx_offset)
.map_err(Box::new)?;

// Try to restore from any snapshot that was taken within the
// range `(min_commitlog_offset + 1)..=durable_tx_offset`.
let mut upper_bound = durable_tx_offset;
loop {
let Some(snapshot_offset) = snapshot_repo
.latest_snapshot_older_than(upper_bound)
.map_err(Box::new)?
else {
break;
};
if min_commitlog_offset + 1 > snapshot_offset {
break;
}
if let Ok(datastore) =
try_restore_snapshot(snapshot_repo, snapshot_offset, database_identity, page_pool.clone())
{
return Ok(datastore);
} else {
// `latest_snapshot_older_than` is inclusive of the
// upper bound, so subtract one and give up if there
// are no more offsets to try.
match snapshot_offset.checked_sub(1) {
None => break,
Some(older_than) => upper_bound = older_than,
}
let start = std::time::Instant::now();
let res = Locking::restore_from_snapshot(snapshot, page_pool);
log::info!(
"[{database_identity}] DATABASE: restored from snapshot of tx_offset {tx_offset} in {:?}",
start.elapsed(),
);
return res;
}
}
log::info!("[{database_identity}] DATABASE: no snapshot on disk");
}
log::info!("[{database_identity}] DATABASE: no usable snapshot on disk");

// If we didn't find a snapshot and the commitlog doesn't start at the
// zero-th commit (e.g. due to archiving), there is no way to restore
// the database.
if min_commitlog_offset > 0 {
return Err(RestoreSnapshotError::NoConnectedSnapshot { min_commitlog_offset });
}

Locking::bootstrap(database_identity, page_pool)
.map_err(Box::new)
.map_err(RestoreSnapshotError::Bootstrap)
}

/// Apply the provided [`spacetimedb_durability::History`] onto the database
Expand Down Expand Up @@ -1267,7 +1328,7 @@ where
// always supplied when constructing a `RelationalDB`. This would allow
// to spawn a timer task here which just prints the progress periodically
// in case the history is finite but very long.
let max_tx_offset = history.max_tx_offset();
let (_, max_tx_offset) = history.tx_range_hint();
let mut last_logged_percentage = 0;
let progress = |tx_offset: u64| {
if let Some(max_tx_offset) = max_tx_offset {
Expand Down Expand Up @@ -1665,13 +1726,15 @@ pub mod tests_utils {

impl durability::History for TestHistory {
type TxData = Txdata;

fn fold_transactions_from<D>(&self, offset: TxOffset, decoder: D) -> Result<(), D::Error>
where
D: commitlog::Decoder,
D::Error: From<commitlog::error::Traversal>,
{
self.0.fold_transactions_from(offset, decoder)
}

fn transactions_from<'a, D>(
&self,
offset: TxOffset,
Expand All @@ -1684,8 +1747,12 @@ pub mod tests_utils {
{
self.0.transactions_from(offset, decoder)
}
fn max_tx_offset(&self) -> Option<TxOffset> {
self.0.max_committed_offset()

fn tx_range_hint(&self) -> (TxOffset, Option<TxOffset>) {
    // An empty log reports a minimum of zero (the default `TxOffset`).
    (
        self.0.min_committed_offset().unwrap_or_default(),
        self.0.max_committed_offset(),
    )
}
}

Expand Down Expand Up @@ -1721,6 +1788,7 @@ mod tests {
#![allow(clippy::disallowed_macros)]

use std::cell::RefCell;
use std::fs::OpenOptions;
use std::path::PathBuf;
use std::rc::Rc;

Expand All @@ -1737,7 +1805,7 @@ mod tests {
use commitlog::payload::txdata;
use commitlog::Commitlog;
use durability::EmptyHistory;
use pretty_assertions::assert_eq;
use pretty_assertions::{assert_eq, assert_matches};
use spacetimedb_data_structures::map::IntMap;
use spacetimedb_fs_utils::compression::{CompressCount, CompressType};
use spacetimedb_lib::db::raw_def::v9::{btree, RawTableDefBuilder};
Expand Down Expand Up @@ -2769,6 +2837,7 @@ mod tests {
Identity::ZERO,
Some(&repo),
Some(last_compress),
0,
PagePool::new_for_test(),
)?;

Expand All @@ -2795,7 +2864,7 @@ mod tests {

let last = repo.latest_snapshot()?;
let stdb =
RelationalDB::restore_from_snapshot_or_bootstrap(identity, Some(&repo), last, PagePool::new_for_test())?;
RelationalDB::restore_from_snapshot_or_bootstrap(identity, Some(&repo), last, 0, PagePool::new_for_test())?;

let out = TempDir::with_prefix("snapshot_test")?;
let dir = SnapshotsPath::from_path_unchecked(out.path());
Expand All @@ -2808,4 +2877,52 @@ mod tests {

Ok(())
}

#[test]
/// Restoring should fall back to progressively older snapshots when the
/// newest one is unusable, and fail with `NoConnectedSnapshot` when no
/// remaining snapshot connects to the start of the commitlog.
fn tries_older_snapshots() -> ResultTest<()> {
    let stdb = TestDB::in_memory()?;
    stdb.path().snapshots().create()?;
    let repo = SnapshotRepository::open(stdb.path().snapshots(), stdb.database_identity(), 85)?;

    // Two snapshots: one of the empty database, one after committing a
    // table with three rows (the latter at tx offset 1).
    stdb.take_snapshot(&repo)?.expect("failed to take snapshot");
    {
        let mut tx = stdb.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests);
        let schema = my_table(AlgebraicType::I32);
        let table_id = stdb.create_table(&mut tx, schema)?;
        for v in 0..3 {
            insert(&stdb, &mut tx, table_id, &product![v])?;
        }
        stdb.commit_tx(tx)?;
    }
    stdb.take_snapshot(&repo)?.expect("failed to take snapshot");

    // Helper: attempt a restore with the given durable tx offset and
    // minimum commitlog offset.
    let try_restore = |durable_tx_offset, min_commitlog_offset| {
        RelationalDB::restore_from_snapshot_or_bootstrap(
            stdb.database_identity(),
            Some(&repo),
            Some(durable_tx_offset),
            min_commitlog_offset,
            PagePool::new_for_test(),
        )
    };

    // Baseline: the latest snapshot restores cleanly.
    try_restore(1, 0)?;
    // We can restore from the previous snapshot
    // if the snapshot file is corrupted
    repo.snapshot_dir_path(1)
        .snapshot_file(1)
        .open_file(OpenOptions::new().write(true))?
        .set_len(1)?;
    try_restore(1, 0)?;
    // Also if it's gone
    std::fs::remove_file(repo.snapshot_dir_path(1).snapshot_file(1))?;
    try_restore(1, 0)?;
    // But not if the commitlog starts after the previous snapshot
    assert_matches!(
        try_restore(1, 1).map(drop),
        Err(RestoreSnapshotError::NoConnectedSnapshot { .. })
    );

    Ok(())
}
}
18 changes: 18 additions & 0 deletions crates/core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use std::sync::{MutexGuard, PoisonError};

use enum_as_inner::EnumAsInner;
use hex::FromHexError;
use spacetimedb_commitlog::repo::TxOffset;
use spacetimedb_expr::errors::TypingError;
use spacetimedb_lib::Identity;
use spacetimedb_sats::AlgebraicType;
use spacetimedb_schema::error::ValidationErrors;
use spacetimedb_snapshot::SnapshotError;
Expand Down Expand Up @@ -236,6 +238,8 @@ pub enum DBError {
error: Box<DBError>,
sql: Box<str>,
},
#[error(transparent)]
RestoreSnapshot(#[from] RestoreSnapshotError),
}

impl From<bflatn_to::Error> for DBError {
Expand Down Expand Up @@ -409,3 +413,17 @@ impl From<ErrorVm> for NodesError {
DBError::from(err).into()
}
}

/// Errors arising while restoring a datastore from a snapshot, or while
/// bootstrapping a fresh datastore when no snapshot is available.
#[derive(Debug, Error)]
pub enum RestoreSnapshotError {
    /// The snapshot was taken for a different database than the one
    /// being restored.
    #[error("Snapshot has incorrect database_identity: expected {expected} but found {actual}")]
    IdentityMismatch { expected: Identity, actual: Identity },
    /// The snapshot was read, but reconstructing the datastore state
    /// from it failed.
    #[error("Failed to restore datastore from snapshot")]
    Datastore(#[source] Box<DBError>),
    /// Reading the snapshot from disk failed.
    #[error("Failed to read snapshot")]
    Snapshot(#[from] Box<SnapshotError>),
    /// No snapshot was available and creating an empty datastore failed.
    #[error("Failed to bootstrap datastore without snapshot")]
    Bootstrap(#[source] Box<DBError>),
    /// No usable snapshot falls within the range covered by the
    /// commitlog, and the commitlog does not start at offset zero
    /// (e.g. due to archiving), so the database state cannot be rebuilt.
    #[error("No connected snapshot found, commitlog starts at {min_commitlog_offset}")]
    NoConnectedSnapshot { min_commitlog_offset: TxOffset },
}
15 changes: 12 additions & 3 deletions crates/core/src/host/host_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -698,8 +698,7 @@ impl Host {
///
/// Note that this does **not** run module initialization routines, but may
/// create on-disk artifacts if the host / database did not exist.

#[tracing::instrument(level = "debug", skip_all, err)]
#[tracing::instrument(level = "debug", skip_all)]
async fn try_init(host_controller: &HostController, database: Database, replica_id: u64) -> anyhow::Result<Self> {
let HostController {
data_dir,
Expand Down Expand Up @@ -738,7 +737,17 @@ impl Host {
Some(durability),
Some(snapshot_repo),
page_pool.clone(),
)?;
)
// Make sure we log the source chain of the error
// as a single line, with the help of `anyhow`.
.map_err(anyhow::Error::from)
.inspect_err(|e| {
tracing::error!(
database = %database.database_identity,
replica = replica_id,
"Failed to open database: {e:#}"
);
})?;
if let Some(start_snapshot_watcher) = start_snapshot_watcher {
let watcher = db.subscribe_to_snapshots().expect("we passed snapshot_repo");
start_snapshot_watcher(watcher)
Expand Down
7 changes: 5 additions & 2 deletions crates/durability/src/imp/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,10 @@ impl<T: Encode + 'static> History for Local<T> {
self.clog.transactions_from(offset, decoder)
}

fn max_tx_offset(&self) -> Option<TxOffset> {
self.clog.max_committed_offset()
fn tx_range_hint(&self) -> (TxOffset, Option<TxOffset>) {
    // An empty commitlog reports a minimum of zero (the default `TxOffset`).
    (
        self.clog.min_committed_offset().unwrap_or_default(),
        self.clog.max_committed_offset(),
    )
}
}
Loading
Loading