diff --git a/libsql-replication/src/injector/libsql_injector.rs b/libsql-replication/src/injector/libsql_injector.rs index f730881cf6..d2c9ae2c7e 100644 --- a/libsql-replication/src/injector/libsql_injector.rs +++ b/libsql-replication/src/injector/libsql_injector.rs @@ -1,8 +1,9 @@ use std::mem::size_of; -use libsql_wal::io::StdIO; use libsql_wal::replication::injector::Injector; +use libsql_wal::segment::sealed::SealedSegment; use libsql_wal::segment::Frame as WalFrame; +use libsql_wal::{io::StdIO, storage::Storage}; use zerocopy::{AsBytes, FromZeroes}; use crate::frame::FrameNo; @@ -10,17 +11,17 @@ use crate::rpc::replication::Frame as RpcFrame; use super::error::{Error, Result}; -pub struct LibsqlInjector { - injector: Injector, +pub struct LibsqlInjector { + injector: Injector, } -impl LibsqlInjector { - pub fn new(injector: Injector) -> Self { +impl LibsqlInjector { + pub fn new(injector: Injector) -> Self { Self { injector } } } -impl super::Injector for LibsqlInjector { +impl>> super::Injector for LibsqlInjector { async fn inject_frame(&mut self, frame: RpcFrame) -> Result> { // this is a bit annoying be we want to read the frame, and it has to be aligned, so we // must copy it... diff --git a/libsql-server/src/connection/connection_manager.rs b/libsql-server/src/connection/connection_manager.rs index b923f65ab6..502385d31e 100644 --- a/libsql-server/src/connection/connection_manager.rs +++ b/libsql-server/src/connection/connection_manager.rs @@ -31,13 +31,13 @@ pub type ConnId = u64; pub type InnerWalManager = Either3, DurableWalManager>; #[cfg(feature = "durable-wal")] -pub type InnerWal = Either3, DurableWal>; +pub type InnerWal = Either3, DurableWal>; #[cfg(not(feature = "durable-wal"))] pub type InnerWalManager = Either>; #[cfg(not(feature = "durable-wal"))] -pub type InnerWal = Either>; +pub type InnerWal = Either>; pub type ManagedConnectionWal = WrappedWal; #[derive(Copy, Clone, Debug)] diff --git a/libsql-server/src/connection/libsql.rs b/libsql-server/src/connection/libsql.rs index ac374815fe..a38e1da919 100644 --- a/libsql-server/src/connection/libsql.rs +++ b/libsql-server/src/connection/libsql.rs @@ -82,7 +82,7 @@ impl MakeConnection for MakeLibsqlConnection { #[derive(Clone)] pub struct LibsqlConnection { - inner: Arc>>>, + inner: Arc>>>, } impl LibsqlConnection { diff --git a/libsql-server/src/namespace/configurator/libsql_fork.rs b/libsql-server/src/namespace/configurator/libsql_fork.rs index b9d8d690d3..bdd5b62466 100644 --- a/libsql-server/src/namespace/configurator/libsql_fork.rs +++ b/libsql-server/src/namespace/configurator/libsql_fork.rs @@ -97,7 +97,7 @@ pub(crate) async fn libsql_wal_fork( } async fn try_inject( - to_shared: Arc>, + to_shared: Arc>, stream: &mut Pin< Box, libsql_wal::replication::Error>> + Send + '_>, >, diff --git a/libsql-server/src/replication/replicator_client.rs b/libsql-server/src/replication/replicator_client.rs index 2ba084c329..9cf9623079 100644 --- a/libsql-server/src/replication/replicator_client.rs +++ b/libsql-server/src/replication/replicator_client.rs @@ -29,10 +29,11 @@ use crate::metrics::{ use crate::namespace::meta_store::MetaStoreHandle; use crate::namespace::{NamespaceName, NamespaceStore}; use crate::replication::FrameNo; +use crate::SqldStorage; pub enum WalImpl { LibsqlWal { - shared: Arc>, + shared: Arc>, }, SqliteWal { meta: WalIndexMeta, @@ -52,7 +53,7 @@ impl WalImpl { }) } - pub fn new_libsql(shared: Arc>) -> Self { + pub fn new_libsql(shared: Arc>) -> Self { Self::LibsqlWal { shared } } diff --git a/libsql-server/src/rpc/replication/libsql_replicator.rs b/libsql-server/src/rpc/replication/libsql_replicator.rs index 7b80a611f3..a29864fcc3 100644 --- a/libsql-server/src/rpc/replication/libsql_replicator.rs +++ b/libsql-server/src/rpc/replication/libsql_replicator.rs @@ -82,12 +82,12 @@ pin_project_lite::pin_project! { #[pin] inner: S, flavor: WalFlavor, - shared: Arc>, + shared: Arc>, } } impl FrameStreamAdapter { - fn new(inner: S, flavor: WalFlavor, shared: Arc>) -> Self { + fn new(inner: S, flavor: WalFlavor, shared: Arc>) -> Self { Self { inner, flavor, diff --git a/libsql-wal/benches/benchmarks.rs b/libsql-wal/benches/benchmarks.rs index f0faeda715..67f54edfb0 100644 --- a/libsql-wal/benches/benchmarks.rs +++ b/libsql-wal/benches/benchmarks.rs @@ -55,7 +55,7 @@ fn prepare_for_random_reads(conn: &mut Connection) { } } -fn with_libsql_conn(f: impl FnOnce(&mut Connection>)) { +fn with_libsql_conn(f: impl FnOnce(&mut Connection>)) { let tmp = tempdir().unwrap(); let resolver = |_: &Path| NamespaceName::from_string("test".into()); diff --git a/libsql-wal/src/checkpointer.rs b/libsql-wal/src/checkpointer.rs index 672a250e65..f6ec6d3cf7 100644 --- a/libsql-wal/src/checkpointer.rs +++ b/libsql-wal/src/checkpointer.rs @@ -8,6 +8,8 @@ use tokio::task::JoinSet; use crate::io::Io; use crate::registry::WalRegistry; +use crate::segment::sealed::SealedSegment; +use crate::storage::Storage; pub(crate) type NotifyCheckpointer = mpsc::Sender; @@ -29,7 +31,7 @@ pub type LibsqlCheckpointer = Checkpointer>; impl LibsqlCheckpointer where IO: Io, - S: Sync + Send + 'static, + S: Storage>, { pub fn new( registry: Arc>, @@ -51,6 +53,7 @@ impl PerformCheckpoint for WalRegistry where IO: Io, S: Sync + Send + 'static, + S: Storage>, { #[tracing::instrument(skip(self))] fn checkpoint( diff --git a/libsql-wal/src/lib.rs b/libsql-wal/src/lib.rs index e2f0b3826c..506f26fc90 100644 --- a/libsql-wal/src/lib.rs +++ b/libsql-wal/src/lib.rs @@ -124,7 +124,7 @@ pub mod test { Self { tmp, registry, wal } } - pub fn shared(&self, namespace: &str) -> Arc> { + pub fn shared(&self, namespace: &str) -> Arc>> { let path = self.tmp.path().join(namespace).join("data"); let registry = self.registry.clone(); let namespace = NamespaceName::from_string(namespace.into()); @@ -135,7 +135,10 @@ pub mod test { self.tmp.path().join(namespace) } - pub fn open_conn(&self, namespace: &'static str) -> libsql_sys::Connection> { + pub fn open_conn( + &self, + namespace: &'static str, + ) -> libsql_sys::Connection>> { let path = self.db_path(namespace); let wal = self.wal.clone(); std::fs::create_dir_all(&path).unwrap(); @@ -159,7 +162,7 @@ pub mod test { } } - pub fn seal_current_segment(shared: &SharedWal) { + pub fn seal_current_segment(shared: &SharedWal>) { let mut tx = shared.begin_read(99999).into(); shared.upgrade(&mut tx).unwrap(); { @@ -170,7 +173,7 @@ pub mod test { tx.end(); } - pub async fn wait_current_durable(shared: &SharedWal) { + pub async fn wait_current_durable(shared: &SharedWal) { let current = shared.current.load().next_frame_no().get() - 1; loop { { diff --git a/libsql-wal/src/registry.rs b/libsql-wal/src/registry.rs index b948643a84..d0597652ea 100644 --- a/libsql-wal/src/registry.rs +++ b/libsql-wal/src/registry.rs @@ -1,14 +1,11 @@ use std::io; -use std::num::NonZeroU64; use std::path::Path; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Instant; use dashmap::DashMap; -use libsql_sys::ffi::Sqlite3DbHeader; use parking_lot::{Condvar, Mutex}; -use rand::Rng; use roaring::RoaringBitmap; use tokio::sync::{mpsc, Notify, Semaphore}; use tokio::task::JoinSet; @@ -22,20 +19,14 @@ use crate::io::file::FileExt; use crate::io::{Io, StdIO}; use crate::replication::injector::Injector; use crate::replication::storage::{ReplicateFromStorage as _, StorageReplicator}; -use crate::segment::list::SegmentList; -use crate::segment::Segment; -use crate::segment::{current::CurrentSegment, sealed::SealedSegment}; -use crate::segment_swap_strategy::duration::DurationSwapStrategy; -use crate::segment_swap_strategy::frame_count::FrameCountSwapStrategy; -use crate::segment_swap_strategy::SegmentSwapStrategy; -use crate::shared_wal::{SharedWal, SwapLog}; -use crate::storage::{OnStoreCallback, Storage}; -use crate::transaction::TxGuard; +use crate::segment::sealed::SealedSegment; +use crate::shared_wal::SharedWal; +use crate::storage::Storage; use crate::{LibsqlFooter, LIBSQL_PAGE_SIZE}; use libsql_sys::name::NamespaceName; -enum Slot { - Wal(Arc>), +enum Slot { + Wal(Arc>), /// Only a single thread is allowed to instantiate the wal. The first thread to acquire an /// entry in the registry map puts a building slot. Other connections will wait for the mutex /// to turn to true, after the slot has been updated to contain the wal @@ -48,7 +39,7 @@ enum Slot { pub struct WalRegistry { io: Arc, shutdown: AtomicBool, - opened: DashMap>, + opened: DashMap>, storage: Arc, checkpoint_notifier: mpsc::Sender, } @@ -79,7 +70,7 @@ impl WalRegistry { Ok(registry) } - pub async fn get_async(&self, namespace: &NamespaceName) -> Option>> { + pub async fn get_async(&self, namespace: &NamespaceName) -> Option>> { loop { let notify = { match self.opened.get(namespace).as_deref() { @@ -95,82 +86,6 @@ impl WalRegistry { } } -impl SwapLog for WalRegistry -where - IO: Io, - S: Storage>, -{ - #[tracing::instrument(skip_all)] - fn swap_current( - &self, - shared: &SharedWal, - tx: &dyn TxGuard<::File>, - ) -> Result<()> { - assert!(tx.is_commited()); - self.swap_current_inner(shared) - } -} - -#[tracing::instrument(skip_all, fields(namespace = namespace.as_str(), start_frame_no = seg.start_frame_no()))] -fn maybe_store_segment( - storage: &S, - notifier: &tokio::sync::mpsc::Sender, - namespace: &NamespaceName, - durable_frame_no: &Arc>, - seg: S::Segment, -) { - if seg.last_committed() > *durable_frame_no.lock() { - let cb: OnStoreCallback = Box::new({ - let notifier = notifier.clone(); - let durable_frame_no = durable_frame_no.clone(); - let namespace = namespace.clone(); - move |fno| { - Box::pin(async move { - update_durable(fno, notifier, durable_frame_no, namespace).await; - }) - } - }); - storage.store(namespace, seg, None, cb); - } else { - // segment can be checkpointed right away. - // FIXME: this is only necessary because some tests call this method in an async context. - #[cfg(debug_assertions)] - { - let namespace = namespace.clone(); - let notifier = notifier.clone(); - tokio::spawn(async move { - let _ = notifier.send(CheckpointMessage::Namespace(namespace)).await; - }); - } - - #[cfg(not(debug_assertions))] - { - let _ = notifier.blocking_send(CheckpointMessage::Namespace(namespace.clone())); - } - - tracing::debug!( - segment_end = seg.last_committed(), - durable_frame_no = *durable_frame_no.lock(), - "segment doesn't contain any new data" - ); - } -} - -async fn update_durable( - new_durable: u64, - notifier: mpsc::Sender, - durable_frame_no_slot: Arc>, - namespace: NamespaceName, -) { - { - let mut g = durable_frame_no_slot.lock(); - if *g < new_durable { - *g = new_durable; - } - } - let _ = notifier.send(CheckpointMessage::Namespace(namespace)).await; -} - impl WalRegistry where IO: Io, @@ -181,7 +96,7 @@ where self: Arc, db_path: &Path, namespace: &NamespaceName, - ) -> Result>> { + ) -> Result>> { if self.shutdown.load(Ordering::SeqCst) { return Err(crate::error::Error::ShuttingDown); } @@ -219,10 +134,25 @@ where Ok((notifier, async_notifier)) => { // if try_open succedded, then the slot was updated and contains the shared wal, if it // failed we need to remove the slot. Either way, notify all waiters - let ret = self.clone().try_open(&namespace, db_path); - if ret.is_err() { - self.opened.remove(namespace); - } + let ret = match SharedWal::try_open( + self.io.clone(), + self.storage.clone(), + &self.checkpoint_notifier, + namespace, + db_path, + ) { + Ok(shared) => { + let shared = Arc::new(shared); + self.opened + .insert(namespace.clone(), Slot::Wal(shared.clone())); + Ok(shared) + } + Err(e) => { + tracing::error!("error opening wal: {e}"); + self.opened.remove(namespace); + Err(e) + } + }; *notifier.1.lock() = true; notifier.0.notify_all(); @@ -240,182 +170,7 @@ where } } - fn try_open( - self: Arc, - namespace: &NamespaceName, - db_path: &Path, - ) -> Result>> { - let db_file = self.io.open(false, true, true, db_path)?; - let db_file_len = db_file.len()?; - let header = if db_file_len > 0 { - let mut header: Sqlite3DbHeader = Sqlite3DbHeader::new_zeroed(); - db_file.read_exact_at(header.as_bytes_mut(), 0)?; - Some(header) - } else { - None - }; - - let footer = self.try_read_footer(&db_file)?; - - let mut checkpointed_frame_no = footer.map(|f| f.replication_index.get()).unwrap_or(0); - - // the trick here to prevent sqlite to open our db is to create a dir -wal. Sqlite - // will think that this is a wal file, but it's in fact a directory and it will not like - // it. - let mut wals_path = db_path.to_owned(); - wals_path.set_file_name(format!( - "{}-wal", - db_path.file_name().unwrap().to_str().unwrap() - )); - self.io.create_dir_all(&wals_path)?; - // TODO: handle that with abstract io - let dir = walkdir::WalkDir::new(&wals_path) - .sort_by_file_name() - .into_iter(); - - // we only checkpoint durable frame_no so this is a good first estimate without an actual - // network call. - let durable_frame_no = Arc::new(Mutex::new(checkpointed_frame_no)); - - let list = SegmentList::default(); - for entry in dir { - let entry = entry.map_err(|e| e.into_io_error().unwrap())?; - if entry - .path() - .extension() - .map(|e| e.to_str().unwrap() != "seg") - .unwrap_or(true) - { - continue; - } - - let file = self.io.open(false, true, true, entry.path())?; - - if let Some(sealed) = SealedSegment::open( - file.into(), - entry.path().to_path_buf(), - Default::default(), - self.io.now(), - )? { - list.push(sealed.clone()); - maybe_store_segment( - self.storage.as_ref(), - &self.checkpoint_notifier, - &namespace, - &durable_frame_no, - sealed, - ); - } - } - - let log_id = match footer { - Some(footer) if list.is_empty() => footer.log_id(), - None if list.is_empty() => self.io.uuid(), - Some(footer) => { - let log_id = list - .with_head(|h| h.header().log_id.get()) - .expect("non-empty list should have a head"); - let log_id = Uuid::from_u128(log_id); - assert_eq!(log_id, footer.log_id()); - log_id - } - None => { - let log_id = list - .with_head(|h| h.header().log_id.get()) - .expect("non-empty list should have a head"); - Uuid::from_u128(log_id) - } - }; - - // if there is a tail, then the latest checkpointed frame_no is one before the the - // start frame_no of the tail. We must read it from the tail, because a partial - // checkpoint may have occured before a crash. - if let Some(last) = list.last() { - checkpointed_frame_no = (last.start_frame_no() - 1).max(1) - } - - let (db_size, next_frame_no) = list - .with_head(|segment| { - let header = segment.header(); - (header.size_after(), header.next_frame_no()) - }) - .unwrap_or_else(|| match header { - Some(header) => ( - header.db_size.get(), - NonZeroU64::new(checkpointed_frame_no + 1) - .unwrap_or(NonZeroU64::new(1).unwrap()), - ), - None => (0, NonZeroU64::new(1).unwrap()), - }); - - let current_segment_path = wals_path.join(format!("{next_frame_no:020}.seg")); - - let segment_file = self.io.open(true, true, true, ¤t_segment_path)?; - let salt = self.io.with_rng(|rng| rng.gen()); - - let current = arc_swap::ArcSwap::new(Arc::new(CurrentSegment::create( - segment_file, - current_segment_path, - next_frame_no, - db_size, - list.into(), - salt, - log_id, - )?)); - - let (new_frame_notifier, _) = tokio::sync::watch::channel(next_frame_no.get() - 1); - - // FIXME: make swap strategy configurable - // This strategy will perform a swap if either the wal is bigger than 20k frames, or older - // than 10 minutes, or if the frame count is greater than a 1000 and the wal was last - // swapped more than 30 secs ago - let swap_strategy = Box::new( - DurationSwapStrategy::new(Duration::from_secs(5 * 60)) - .or(FrameCountSwapStrategy::new(20_000)) - .or(FrameCountSwapStrategy::new(1000) - .and(DurationSwapStrategy::new(Duration::from_secs(30)))), - ); - - let shared = Arc::new(SharedWal { - current, - wal_lock: Default::default(), - db_file, - registry: self.clone(), - namespace: namespace.clone(), - checkpointed_frame_no: checkpointed_frame_no.into(), - new_frame_notifier, - durable_frame_no, - stored_segments: Box::new(StorageReplicator::new( - self.storage.clone(), - namespace.clone(), - )), - shutdown: false.into(), - checkpoint_notifier: self.checkpoint_notifier.clone(), - io: self.io.clone(), - swap_strategy, - wals_path: wals_path.to_owned(), - }); - - self.opened - .insert(namespace.clone(), Slot::Wal(shared.clone())); - - return Ok(shared); - } - - fn try_read_footer(&self, db_file: &impl FileExt) -> Result> { - let len = db_file.len()?; - if len as usize % LIBSQL_PAGE_SIZE as usize == size_of::() { - let mut footer: LibsqlFooter = LibsqlFooter::new_zeroed(); - let footer_offset = (len / LIBSQL_PAGE_SIZE as u64) * LIBSQL_PAGE_SIZE as u64; - db_file.read_exact_at(footer.as_bytes_mut(), footer_offset)?; - footer.validate()?; - Ok(Some(footer)) - } else { - Ok(None) - } - } - - pub async fn tombstone(&self, namespace: &NamespaceName) -> Option>> { + pub async fn tombstone(&self, namespace: &NamespaceName) -> Option>> { // if a wal is currently being openned, let it { let v = self.opened.get(namespace)?; @@ -545,55 +300,16 @@ where Ok(()) } - #[tracing::instrument(skip_all)] - fn swap_current_inner(&self, shared: &SharedWal) -> Result<()> { - let current = shared.current.load(); - if current.is_empty() { - return Ok(()); - } - let start_frame_no = current.next_frame_no(); - let path = shared.wals_path.join(format!("{start_frame_no:020}.seg")); - - let segment_file = self.io.open(true, true, true, &path)?; - let salt = self.io.with_rng(|rng| rng.gen()); - let new = CurrentSegment::create( - segment_file, - path, - start_frame_no, - current.db_size(), - current.tail().clone(), - salt, - current.log_id(), - )?; - // sealing must the last fallible operation, because we don't want to end up in a situation - // where the current log is sealed and it wasn't swapped. - if let Some(sealed) = current.seal(self.io.now())? { - new.tail().push(sealed.clone()); - maybe_store_segment( - self.storage.as_ref(), - &self.checkpoint_notifier, - &shared.namespace, - &shared.durable_frame_no, - sealed, - ); - } - - shared.current.swap(Arc::new(new)); - tracing::debug!("current segment swapped"); - - Ok(()) - } - pub fn storage(&self) -> Arc { self.storage.clone() } } -#[tracing::instrument(skip_all, fields(namespace = shared.namespace().as_str()))] -async fn sync_one(shared: Arc>, storage: Arc) -> Result<()> +#[tracing::instrument(skip_all, fields(namespace = shared.namespace.as_str()))] +async fn sync_one(shared: Arc>, storage: Arc) -> Result<()> where IO: Io, - S: Storage, + S: Storage>, { let remote_durable_frame_no = storage .durable_frame_no(shared.namespace(), None) diff --git a/libsql-wal/src/replication/injector.rs b/libsql-wal/src/replication/injector.rs index 141fdcfa40..7a7c73b20a 100644 --- a/libsql-wal/src/replication/injector.rs +++ b/libsql-wal/src/replication/injector.rs @@ -4,14 +4,16 @@ use std::sync::Arc; use crate::error::Result; use crate::io::Io; +use crate::segment::sealed::SealedSegment; use crate::segment::Frame; use crate::shared_wal::SharedWal; +use crate::storage::Storage; use crate::transaction::{Transaction, TxGuardOwned}; /// The injector takes frames and injects them in the wal. -pub struct Injector { +pub struct Injector { // The wal to which we are injecting - wal: Arc>, + wal: Arc>, buffer: Vec>, /// capacity of the frame buffer capacity: usize, @@ -20,8 +22,12 @@ pub struct Injector { previous_durable_frame_no: u64, } -impl Injector { - pub fn new(wal: Arc>, buffer_capacity: usize) -> Result { +impl Injector +where + IO: Io, + S: Storage>, +{ + pub fn new(wal: Arc>, buffer_capacity: usize) -> Result { Ok(Self { wal, buffer: Vec::with_capacity(buffer_capacity), diff --git a/libsql-wal/src/replication/replicator.rs b/libsql-wal/src/replication/replicator.rs index 89d33211d2..8124dcb65c 100644 --- a/libsql-wal/src/replication/replicator.rs +++ b/libsql-wal/src/replication/replicator.rs @@ -9,18 +9,19 @@ use crate::io::Io; use crate::replication::Error; use crate::segment::Frame; use crate::shared_wal::SharedWal; +use crate::storage::Storage; use super::Result; -pub struct Replicator { - shared: Arc>, +pub struct Replicator { + shared: Arc>, new_frame_notifier: watch::Receiver, next_frame_no: u64, wait_for_more: bool, } -impl Replicator { - pub fn new(shared: Arc>, next_frame_no: u64, wait_for_more: bool) -> Self { +impl Replicator { + pub fn new(shared: Arc>, next_frame_no: u64, wait_for_more: bool) -> Self { let new_frame_notifier = shared.new_frame_notifier.subscribe(); Self { shared, diff --git a/libsql-wal/src/shared_wal.rs b/libsql-wal/src/shared_wal.rs index d1d3cd9e75..0ca4381d24 100644 --- a/libsql-wal/src/shared_wal.rs +++ b/libsql-wal/src/shared_wal.rs @@ -1,29 +1,38 @@ -use std::path::PathBuf; +use std::num::NonZeroU64; +use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::Arc; -use std::time::Instant; +use std::time::{Duration, Instant}; use arc_swap::ArcSwap; use crossbeam::deque::Injector; use crossbeam::sync::Unparker; use futures::Stream; +use libsql_sys::ffi::Sqlite3DbHeader; use parking_lot::{Mutex, MutexGuard}; +use rand::Rng as _; use roaring::RoaringBitmap; use tokio::sync::{mpsc, watch}; use uuid::Uuid; -use zerocopy::FromZeroes; +use zerocopy::{AsBytes as _, FromZeroes as _}; use crate::checkpointer::CheckpointMessage; use crate::error::{Error, Result}; use crate::io::buf::ZeroCopyBoxIoBuf; use crate::io::file::FileExt; use crate::io::Io; -use crate::replication::storage::ReplicateFromStorage; +use crate::replication::storage::{ReplicateFromStorage, StorageReplicator}; use crate::segment::current::CurrentSegment; +use crate::segment::list::SegmentList; +use crate::segment::sealed::SealedSegment; +use crate::segment::Segment as _; use crate::segment::{Frame, FrameHeader}; +use crate::segment_swap_strategy::duration::DurationSwapStrategy; +use crate::segment_swap_strategy::frame_count::FrameCountSwapStrategy; use crate::segment_swap_strategy::SegmentSwapStrategy; +use crate::storage::{OnStoreCallback, Storage}; use crate::transaction::{ReadTransaction, Savepoint, Transaction, TxGuard, WriteTransaction}; -use crate::LIBSQL_PAGE_SIZE; +use crate::{LibsqlFooter, LIBSQL_PAGE_SIZE}; use libsql_sys::name::NamespaceName; #[derive(Default)] @@ -41,16 +50,11 @@ pub struct WalLock { pub(crate) waiters: Injector<(Unparker, u64)>, } -pub(crate) trait SwapLog: Sync + Send + 'static { - fn swap_current(&self, shared: &SharedWal, tx: &dyn TxGuard) -> Result<()>; -} - -pub struct SharedWal { +pub struct SharedWal { pub(crate) current: ArcSwap>, pub(crate) wal_lock: Arc, pub(crate) db_file: IO::File, pub(crate) namespace: NamespaceName, - pub(crate) registry: Arc>, pub(crate) checkpointed_frame_no: AtomicU64, /// max frame_no acknowledged by the durable storage pub(crate) durable_frame_no: Arc>, @@ -61,11 +65,18 @@ pub struct SharedWal { pub(crate) io: Arc, pub(crate) swap_strategy: Box, pub(crate) wals_path: PathBuf, + pub(crate) storage: Arc, } -impl SharedWal { +impl SharedWal +where + IO: Io, +{ #[tracing::instrument(skip(self), fields(namespace = self.namespace.as_str()))] - pub fn shutdown(&self) -> Result<()> { + pub fn shutdown(&self) -> Result<()> + where + S: Storage>, + { tracing::info!("started namespace shutdown"); self.shutdown.store(true, Ordering::SeqCst); // fixme: for infinite loop @@ -81,7 +92,7 @@ impl SharedWal { { let mut tx = tx.as_write_mut().unwrap().lock(); tx.commit(); - self.registry.swap_current(self, &tx)?; + self.swap_current(&tx)?; } // The current segment will not be used anymore. It's empty, but we still seal it so that // the next startup doesn't find an unsealed segment. @@ -274,7 +285,10 @@ impl SharedWal { tx: &mut WriteTransaction, pages: impl Iterator, size_after: Option, - ) -> Result<()> { + ) -> Result<()> + where + S: Storage>, + { let current = self.current.load(); let mut tx = tx.lock(); if let Some(last_committed) = current.insert_pages(pages, size_after, &mut tx)? { @@ -290,7 +304,10 @@ impl SharedWal { } /// Cut the current log, and register it for storage - pub fn seal_current(&self) -> Result<()> { + pub fn seal_current(&self) -> Result<()> + where + S: Storage>, + { let mut tx = self.begin_read(u64::MAX).into(); self.upgrade(&mut tx)?; @@ -309,8 +326,45 @@ impl SharedWal { } /// Swap the current log. A write lock must be held, but the transaction must be must be committed already. - pub(crate) fn swap_current(&self, tx: &impl TxGuard) -> Result<()> { - self.registry.swap_current(self, tx)?; + pub(crate) fn swap_current(&self, tx: &impl TxGuard) -> Result<()> + where + S: Storage>, + { + assert!(tx.is_commited()); + let current = self.current.load(); + if current.is_empty() { + return Ok(()); + } + let start_frame_no = current.next_frame_no(); + let path = self.wals_path.join(format!("{start_frame_no:020}.seg")); + + let segment_file = self.io.open(true, true, true, &path)?; + let salt = self.io.with_rng(|rng| rng.gen()); + let new = CurrentSegment::create( + segment_file, + path, + start_frame_no, + current.db_size(), + current.tail().clone(), + salt, + current.log_id(), + )?; + // sealing must the last fallible operation, because we don't want to end up in a situation + // where the current log is sealed and it wasn't swapped. + if let Some(sealed) = current.seal(self.io.now())? { + new.tail().push(sealed.clone()); + maybe_store_segment( + self.storage.as_ref(), + &self.checkpoint_notifier, + &self.namespace, + &self.durable_frame_no, + sealed, + ); + } + + self.current.swap(Arc::new(new)); + tracing::debug!("current segment swapped"); + Ok(()) } @@ -346,7 +400,10 @@ impl SharedWal { seen: &'a RoaringBitmap, tx: &'a ReadTransaction, until: u64, - ) -> impl Stream>> + Send + 'a { + ) -> impl Stream>> + Send + 'a + where + S: Send + Sync, + { async_stream::try_stream! { let mut all = RoaringBitmap::new(); all.insert_range(1..=tx.db_size); @@ -367,6 +424,240 @@ impl SharedWal { } } } + + /// Open the shared wal at path. The caller must ensure that no other process is calling this + /// conccurently. + pub(crate) fn try_open( + io: Arc, + storage: Arc, + checkpoint_notifier: &tokio::sync::mpsc::Sender, + namespace: &NamespaceName, + db_path: &Path, + ) -> Result + where + S: Storage>, + { + let db_file = io.open(false, true, true, db_path)?; + let db_file_len = db_file.len()?; + let header = if db_file_len > 0 { + let mut header: Sqlite3DbHeader = Sqlite3DbHeader::new_zeroed(); + db_file.read_exact_at(header.as_bytes_mut(), 0)?; + Some(header) + } else { + None + }; + + let footer = try_read_footer(&db_file)?; + + let mut checkpointed_frame_no = footer.map(|f| f.replication_index.get()).unwrap_or(0); + + // the trick here to prevent sqlite to open our db is to create a dir -wal. Sqlite + // will think that this is a wal file, but it's in fact a directory and it will not like + // it. + let mut wals_path = db_path.to_owned(); + wals_path.set_file_name(format!( + "{}-wal", + db_path.file_name().unwrap().to_str().unwrap() + )); + io.create_dir_all(&wals_path)?; + // TODO: handle that with abstract io + let dir = walkdir::WalkDir::new(&wals_path) + .sort_by_file_name() + .into_iter(); + + // we only checkpoint durable frame_no so this is a good first estimate without an actual + // network call. + let durable_frame_no = Arc::new(Mutex::new(checkpointed_frame_no)); + + let list = SegmentList::default(); + for entry in dir { + let entry = entry.map_err(|e| e.into_io_error().unwrap())?; + if entry + .path() + .extension() + .map(|e| e.to_str().unwrap() != "seg") + .unwrap_or(true) + { + continue; + } + + let file = io.open(false, true, true, entry.path())?; + + if let Some(sealed) = SealedSegment::open( + file.into(), + entry.path().to_path_buf(), + Default::default(), + io.now(), + )? { + list.push(sealed.clone()); + maybe_store_segment( + storage.as_ref(), + &checkpoint_notifier, + &namespace, + &durable_frame_no, + sealed, + ); + } + } + + let log_id = match footer { + Some(footer) if list.is_empty() => footer.log_id(), + None if list.is_empty() => io.uuid(), + Some(footer) => { + let log_id = list + .with_head(|h| h.header().log_id.get()) + .expect("non-empty list should have a head"); + let log_id = Uuid::from_u128(log_id); + assert_eq!(log_id, footer.log_id()); + log_id + } + None => { + let log_id = list + .with_head(|h| h.header().log_id.get()) + .expect("non-empty list should have a head"); + Uuid::from_u128(log_id) + } + }; + + // if there is a tail, then the latest checkpointed frame_no is one before the the + // start frame_no of the tail. We must read it from the tail, because a partial + // checkpoint may have occured before a crash. + if let Some(last) = list.last() { + checkpointed_frame_no = (last.start_frame_no() - 1).max(1) + } + + let (db_size, next_frame_no) = list + .with_head(|segment| { + let header = segment.header(); + (header.size_after(), header.next_frame_no()) + }) + .unwrap_or_else(|| match header { + Some(header) => ( + header.db_size.get(), + NonZeroU64::new(checkpointed_frame_no + 1) + .unwrap_or(NonZeroU64::new(1).unwrap()), + ), + None => (0, NonZeroU64::new(1).unwrap()), + }); + + let current_segment_path = wals_path.join(format!("{next_frame_no:020}.seg")); + + let segment_file = io.open(true, true, true, ¤t_segment_path)?; + let salt = io.with_rng(|rng| rng.gen()); + + let current = arc_swap::ArcSwap::new(Arc::new(CurrentSegment::create( + segment_file, + current_segment_path, + next_frame_no, + db_size, + list.into(), + salt, + log_id, + )?)); + + let (new_frame_notifier, _) = tokio::sync::watch::channel(next_frame_no.get() - 1); + + // FIXME: make swap strategy configurable + // This strategy will perform a swap if either the wal is bigger than 20k frames, or older + // than 10 minutes, or if the frame count is greater than a 1000 and the wal was last + // swapped more than 30 secs ago + let swap_strategy = Box::new( + DurationSwapStrategy::new(Duration::from_secs(5 * 60)) + .or(FrameCountSwapStrategy::new(20_000)) + .or(FrameCountSwapStrategy::new(1000) + .and(DurationSwapStrategy::new(Duration::from_secs(30)))), + ); + + Ok(Self { + current, + wal_lock: Default::default(), + db_file, + namespace: namespace.clone(), + checkpointed_frame_no: checkpointed_frame_no.into(), + new_frame_notifier, + durable_frame_no, + stored_segments: Box::new(StorageReplicator::new(storage.clone(), namespace.clone())), + shutdown: false.into(), + checkpoint_notifier: checkpoint_notifier.clone(), + io, + storage, + swap_strategy, + wals_path: wals_path.to_owned(), + }) + } +} + +fn try_read_footer(db_file: &impl FileExt) -> Result> { + let len = db_file.len()?; + if len as usize % LIBSQL_PAGE_SIZE as usize == size_of::() { + let mut footer: LibsqlFooter = LibsqlFooter::new_zeroed(); + let footer_offset = (len / LIBSQL_PAGE_SIZE as u64) * LIBSQL_PAGE_SIZE as u64; + db_file.read_exact_at(footer.as_bytes_mut(), footer_offset)?; + footer.validate()?; + Ok(Some(footer)) + } else { + Ok(None) + } +} + +#[tracing::instrument(skip_all, fields(namespace = namespace.as_str(), start_frame_no = seg.start_frame_no()))] +fn maybe_store_segment( + storage: &S, + notifier: &tokio::sync::mpsc::Sender, + namespace: &NamespaceName, + durable_frame_no: &Arc>, + seg: S::Segment, +) { + if seg.last_committed() > *durable_frame_no.lock() { + let cb: OnStoreCallback = Box::new({ + let notifier = notifier.clone(); + let durable_frame_no = durable_frame_no.clone(); + let namespace = namespace.clone(); + move |fno| { + Box::pin(async move { + update_durable(fno, notifier, durable_frame_no, namespace).await; + }) + } + }); + storage.store(namespace, seg, None, cb); + } else { + // segment can be checkpointed right away. + // FIXME: this is only necessary because some tests call this method in an async context. + #[cfg(debug_assertions)] + { + let namespace = namespace.clone(); + let notifier = notifier.clone(); + tokio::spawn(async move { + let _ = notifier.send(CheckpointMessage::Namespace(namespace)).await; + }); + } + + #[cfg(not(debug_assertions))] + { + let _ = notifier.blocking_send(CheckpointMessage::Namespace(namespace.clone())); + } + + tracing::debug!( + segment_end = seg.last_committed(), + durable_frame_no = *durable_frame_no.lock(), + "segment doesn't contain any new data" + ); + } +} + +async fn update_durable( + new_durable: u64, + notifier: mpsc::Sender, + durable_frame_no_slot: Arc>, + namespace: NamespaceName, +) { + { + let mut g = durable_frame_no_slot.lock(); + if *g < new_durable { + *g = new_durable; + } + } + let _ = notifier.send(CheckpointMessage::Namespace(namespace)).await; } #[cfg(test)] diff --git a/libsql-wal/src/transaction.rs b/libsql-wal/src/transaction.rs index 146186c394..a45b666a71 100644 --- a/libsql-wal/src/transaction.rs +++ b/libsql-wal/src/transaction.rs @@ -1,6 +1,5 @@ use std::collections::BTreeMap; use std::ops::{Deref, DerefMut}; -use std::sync::atomic::AtomicU64; use std::sync::Arc; use std::time::Instant; diff --git a/libsql-wal/src/wal.rs b/libsql-wal/src/wal.rs index 05cf221a64..8b057dfc06 100644 --- a/libsql-wal/src/wal.rs +++ b/libsql-wal/src/wal.rs @@ -42,15 +42,15 @@ impl LibsqlWalManager { } } -pub struct LibsqlWal { +pub struct LibsqlWal { last_read_frame_no: Option, tx: Option>, - shared: Arc>, + shared: Arc>, conn_id: u64, } impl>> WalManager for LibsqlWalManager { - type Wal = LibsqlWal; + type Wal = LibsqlWal; fn use_shared_memory(&self) -> bool { false @@ -116,7 +116,11 @@ impl>> WalManager for Libsq } } -impl Wal for LibsqlWal { +impl Wal for LibsqlWal +where + IO: Io, + S: Storage>, +{ #[tracing::instrument(skip_all, fields(id = self.conn_id))] fn limit(&mut self, _size: i64) {}