From e55dd89b0a6e718ecde02d134405364e8fbba15d Mon Sep 17 00:00:00 2001 From: ad hoc Date: Fri, 4 Oct 2024 12:34:49 +0200 Subject: [PATCH 1/4] use read txn in replicator --- libsql-wal/src/replication/replicator.rs | 14 +-- libsql-wal/src/replication/storage.rs | 1 + libsql-wal/src/segment/current.rs | 62 ++++++------- libsql-wal/src/segment/list.rs | 105 ++++++++++++++--------- 4 files changed, 106 insertions(+), 76 deletions(-) diff --git a/libsql-wal/src/replication/replicator.rs b/libsql-wal/src/replication/replicator.rs index 8888bb308b..bf0d206ec3 100644 --- a/libsql-wal/src/replication/replicator.rs +++ b/libsql-wal/src/replication/replicator.rs @@ -58,13 +58,13 @@ impl Replicator { tracing::debug!(most_recent_frame_no, "new frame_no available"); let mut commit_frame_no = 0; + let tx = self.shared.begin_read(u64::MAX); // we have stuff to replicate if most_recent_frame_no >= self.next_frame_no { // first replicate the most recent version of each page from the current // segment. We also return how far back we have replicated from the current log - let current = self.shared.current.load(); let mut seen = RoaringBitmap::new(); - let (stream, replicated_until, size_after) = current.frame_stream_from(self.next_frame_no, &mut seen); + let (stream, replicated_until) = tx.current.frame_stream_from(self.next_frame_no, &mut seen, &tx); let should_replicate_from_tail = replicated_until != self.next_frame_no; { @@ -78,7 +78,7 @@ impl Replicator { let mut frame = frame.map_err(|e| Error::CurrentSegment(e.into()))?; commit_frame_no = frame.header().frame_no().max(commit_frame_no); if stream.peek().await.is_none() && !should_replicate_from_tail { - frame.header_mut().set_size_after(size_after); + frame.header_mut().set_size_after(tx.db_size); self.next_frame_no = commit_frame_no + 1; } @@ -90,9 +90,9 @@ impl Replicator { // wee need to take frames from the sealed segments. if should_replicate_from_tail { let replicated_until = { - let (stream, replicated_until) = current + let (stream, replicated_until) = tx.current .tail() - .stream_pages_from(replicated_until, self.next_frame_no, &mut seen).await; + .stream_pages_from(replicated_until, self.next_frame_no, &mut seen, &tx).await; tokio::pin!(stream); tracing::debug!(replicated_until, "replicating from tail"); @@ -105,7 +105,7 @@ impl Replicator { let mut frame = frame.map_err(|e| Error::SealedSegment(e.into()))?; commit_frame_no = frame.header().frame_no().max(commit_frame_no); if stream.peek().await.is_none() && !should_replicate_from_storage { - frame.header_mut().set_size_after(size_after); + frame.header_mut().set_size_after(tx.db_size); self.next_frame_no = commit_frame_no + 1; } @@ -132,7 +132,7 @@ impl Replicator { let mut frame = frame?; commit_frame_no = frame.header().frame_no().max(commit_frame_no); if stream.peek().await.is_none() { - frame.header_mut().set_size_after(size_after); + frame.header_mut().set_size_after(tx.db_size); self.next_frame_no = commit_frame_no + 1; } diff --git a/libsql-wal/src/replication/storage.rs b/libsql-wal/src/replication/storage.rs index d239bad300..aa71531e36 100644 --- a/libsql-wal/src/replication/storage.rs +++ b/libsql-wal/src/replication/storage.rs @@ -61,6 +61,7 @@ where let segment = match maybe_seg { Some(ref seg) => seg, None => { + tracing::debug!(key = %key, "fetching segment"); maybe_seg = Some(storage.fetch_segment_data(&namespace, &key, None).await?); maybe_seg.as_ref().unwrap() }, diff --git a/libsql-wal/src/segment/current.rs b/libsql-wal/src/segment/current.rs index 15c7ef12ef..7e9dddf3c9 100644 --- a/libsql-wal/src/segment/current.rs +++ b/libsql-wal/src/segment/current.rs @@ -24,7 +24,7 @@ use crate::io::file::FileExt; use crate::io::Inspect; use crate::segment::{checked_frame_offset, SegmentFlags}; use crate::segment::{frame_offset, page_offset, sealed::SealedSegment}; -use crate::transaction::{Transaction, TxGuardOwned, TxGuardShared}; +use crate::transaction::{ReadTransaction, Transaction, TxGuardOwned, TxGuardShared}; use crate::{LIBSQL_MAGIC, LIBSQL_PAGE_SIZE, LIBSQL_WAL_VERSION}; use super::list::SegmentList; @@ -507,22 +507,23 @@ impl CurrentSegment { &'a self, start_frame_no: u64, seen: &'a mut RoaringBitmap, - ) -> (impl Stream>> + 'a, u64, u32) + // not actually used, but ensures that a read lock is held while this method id called + tx: &'a ReadTransaction, + ) -> (impl Stream>> + 'a, u64) where F: FileExt, { - let (seg_start_frame_no, last_committed, db_size) = - self.with_header(|h| (h.start_frame_no.get(), h.last_committed(), h.size_after())); + let seg_start_frame_no = tx.current.with_header(|h| h.start_frame_no.get()); let replicated_until = seg_start_frame_no // if current is empty, start_frame_no doesn't exist - .min(last_committed) + .min(tx.max_frame_no) .max(start_frame_no); // TODO: optim, we could read less frames if we had a mapping from frame_no to page_no in // the index let stream = async_stream::try_stream! { if !self.is_empty() { - let mut frame_offset = (last_committed - seg_start_frame_no) as u32; + let mut frame_offset = (tx.max_frame_no - seg_start_frame_no) as u32; loop { let buf = ZeroCopyBoxIoBuf::new(Frame::new_box_zeroed()); let (buf, res) = self.read_frame_offset_async(frame_offset, buf).await; @@ -551,7 +552,7 @@ impl CurrentSegment { } }; - (stream, replicated_until, db_size) + (stream, replicated_until) } fn recompute_checksum(&self, start_offset: u32, until_offset: u32) -> Result @@ -714,18 +715,20 @@ mod test { .unwrap(); } - let mut seen = RoaringBitmap::new(); - let current = shared.current.load(); - let (stream, replicated_until, size_after) = current.frame_stream_from(1, &mut seen); - tokio::pin!(stream); - assert_eq!(replicated_until, 1); - assert_eq!(size_after, 6); - let mut tmp = tempfile().unwrap(); - while let Some(frame) = stream.next().await { - let frame = frame.unwrap(); - let offset = (frame.header().page_no() - 1) * 4096; - tmp.write_all_at(frame.data(), offset as _).unwrap(); + { + let tx = shared.begin_read(u64::MAX); + let mut seen = RoaringBitmap::new(); + let (stream, replicated_until) = tx.current.frame_stream_from(1, &mut seen, &tx); + tokio::pin!(stream); + assert_eq!(replicated_until, 1); + assert_eq!(tx.db_size, 6); + + while let Some(frame) = stream.next().await { + let frame = frame.unwrap(); + let offset = (frame.header().page_no() - 1) * 4096; + tmp.write_all_at(frame.data(), offset as _).unwrap(); + } } seal_current_segment(&shared); @@ -743,6 +746,7 @@ mod test { let mut copy = Vec::new(); tmp.read_to_end(&mut copy).unwrap(); + dbg!(copy.len(), orig.len()); assert_eq!(db_payload(©), db_payload(&orig)); } @@ -768,11 +772,11 @@ mod test { let mut seen = RoaringBitmap::new(); { - let current = shared.current.load(); - let (stream, replicated_until, size_after) = current.frame_stream_from(1, &mut seen); + let tx = shared.begin_read(u64::MAX); + let (stream, replicated_until) = tx.current.frame_stream_from(1, &mut seen, &tx); tokio::pin!(stream); assert_eq!(replicated_until, 60); - assert_eq!(size_after, 9); + assert_eq!(tx.db_size, 9); assert_eq!(stream.fold(0, |count, _| count + 1).await, 6); } assert_debug_snapshot!(seen); @@ -787,12 +791,12 @@ mod test { conn.execute("create table test (x)", ()).unwrap(); let mut seen = RoaringBitmap::new(); - let current = shared.current.load(); - let (stream, replicated_until, size_after) = current.frame_stream_from(100, &mut seen); + let tx = shared.begin_read(u64::MAX); + let (stream, replicated_until) = tx.current.frame_stream_from(100, &mut seen, &tx); tokio::pin!(stream); assert_eq!(replicated_until, 100); assert_eq!(stream.fold(0, |count, _| count + 1).await, 0); - assert_eq!(size_after, 2); + assert_eq!(tx.db_size, 2); } #[tokio::test] @@ -805,11 +809,11 @@ mod test { seal_current_segment(&shared); let mut seen = RoaringBitmap::new(); - let current = shared.current.load(); - let (stream, replicated_until, size_after) = current.frame_stream_from(1, &mut seen); + let tx = shared.begin_read(u64::MAX); + let (stream, replicated_until) = tx.current.frame_stream_from(1, &mut seen, &tx); tokio::pin!(stream); assert_eq!(replicated_until, 2); - assert_eq!(size_after, 2); + assert_eq!(tx.db_size, 2); assert_eq!(stream.fold(0, |count, _| count + 1).await, 0); } @@ -1014,8 +1018,8 @@ mod test { } } - fn db_payload(db: &[u8]) -> &[u8] { + fn db_payload(db: &[u8]) -> u32 { let size = (db.len() / 4096) * 4096; - &db[..size] + crc32fast::hash(&db[..size]) } } diff --git a/libsql-wal/src/segment/list.rs b/libsql-wal/src/segment/list.rs index b50622d3ef..11b52c23b4 100644 --- a/libsql-wal/src/segment/list.rs +++ b/libsql-wal/src/segment/list.rs @@ -15,6 +15,7 @@ use crate::error::Result; use crate::io::buf::{ZeroCopyBoxIoBuf, ZeroCopyBuf}; use crate::io::{FileExt, Io}; use crate::segment::Frame; +use crate::transaction::ReadTransaction; use crate::{LibsqlFooter, LIBSQL_MAGIC, LIBSQL_PAGE_SIZE, LIBSQL_WAL_VERSION}; use super::Segment; @@ -209,11 +210,13 @@ where /// returns a stream of pages from the sealed segment list, and what's the lowest replication index /// that was covered. If the returned index is less than start frame_no, the missing frames /// must be read somewhere else. - pub async fn stream_pages_from<'a>( + pub async fn stream_pages_from<'a, F>( &self, current_fno: u64, until_fno: u64, seen: &'a mut RoaringBitmap, + // not actually used, but ensures that a read lock is held while this method id called + _tx: &ReadTransaction, ) -> ( impl Stream>> + 'a, u64, @@ -434,10 +437,13 @@ mod test { seal_current_segment(&shared); - let current = shared.current.load(); - let segment_list = current.tail(); + let tx = shared.begin_read(u64::MAX); let mut seen = RoaringBitmap::new(); - let (stream, _) = segment_list.stream_pages_from(0, 0, &mut seen).await; + let (stream, _) = tx + .current + .tail() + .stream_pages_from(0, 0, &mut seen, &tx) + .await; tokio::pin!(stream); let mut file = NamedTempFile::new().unwrap(); @@ -485,10 +491,13 @@ mod test { seal_current_segment(&shared); - let current = shared.current.load(); - let segment_list = current.tail(); + let tx = shared.begin_read(u64::MAX); let mut seen = RoaringBitmap::new(); - let (stream, replicated_until) = segment_list.stream_pages_from(0, 10, &mut seen).await; + let (stream, replicated_until) = tx + .current + .tail() + .stream_pages_from(0, 10, &mut seen, &tx) + .await; tokio::pin!(stream); assert_eq!(replicated_until, 10); @@ -513,10 +522,13 @@ mod test { seal_current_segment(&shared); - let current = shared.current.load(); - let segment_list = current.tail(); + let tx = shared.begin_read(u64::MAX); let mut seen = RoaringBitmap::from_sorted_iter([1]).unwrap(); - let (stream, replicated_until) = segment_list.stream_pages_from(0, 1, &mut seen).await; + let (stream, replicated_until) = tx + .current + .tail() + .stream_pages_from(0, 1, &mut seen, &tx) + .await; tokio::pin!(stream); assert_eq!(replicated_until, 1); @@ -541,22 +553,27 @@ mod test { seal_current_segment(&shared); - let current = shared.current.load(); - let segment_list = current.tail(); - let mut seen = RoaringBitmap::new(); - let (stream, replicated_until) = segment_list.stream_pages_from(0, 1, &mut seen).await; - tokio::pin!(stream); + let mut tmp = tempfile().unwrap(); + let mut last_offset = 0; - assert_eq!(replicated_until, 1); + { + let tx = shared.begin_read(u64::MAX); + let mut seen = RoaringBitmap::new(); + let (stream, replicated_until) = tx + .current + .tail() + .stream_pages_from(0, 1, &mut seen, &tx) + .await; + tokio::pin!(stream); - let mut tmp = tempfile().unwrap(); + assert_eq!(replicated_until, 1); - let mut last_offset = 0; - while let Some(frame) = stream.next().await { - let frame = frame.unwrap(); - let offset = (frame.header().page_no() - 1) * 4096; - tmp.write_all_at(frame.data(), offset as u64).unwrap(); - last_offset = last_offset.max(frame.header().frame_no()); + while let Some(frame) = stream.next().await { + let frame = frame.unwrap(); + let offset = (frame.header().page_no() - 1) * 4096; + tmp.write_all_at(frame.data(), offset as u64).unwrap(); + last_offset = last_offset.max(frame.header().frame_no()); + } } for _ in 0..10 { @@ -565,21 +582,26 @@ mod test { seal_current_segment(&shared); - let mut seen = RoaringBitmap::new(); - let (stream, replicated_until) = segment_list - .stream_pages_from(0, last_offset, &mut seen) - .await; - tokio::pin!(stream); + { + let tx = shared.begin_read(u64::MAX); + let mut seen = RoaringBitmap::new(); + let (stream, replicated_until) = tx + .current + .tail() + .stream_pages_from(0, last_offset, &mut seen, &tx) + .await; + tokio::pin!(stream); - assert_eq!(replicated_until, last_offset); + assert_eq!(replicated_until, last_offset); - while let Some(frame) = stream.next().await { - let frame = frame.unwrap(); - let offset = (frame.header().page_no() - 1) * 4096; - tmp.write_all_at(frame.data(), offset as u64).unwrap(); - } + while let Some(frame) = stream.next().await { + let frame = frame.unwrap(); + let offset = (frame.header().page_no() - 1) * 4096; + tmp.write_all_at(frame.data(), offset as u64).unwrap(); + } - *shared.durable_frame_no.lock() = 999999; + *shared.durable_frame_no.lock() = 999999; + } shared.checkpoint().await.unwrap(); tmp.seek(std::io::SeekFrom::Start(0)).unwrap(); @@ -618,10 +640,13 @@ mod test { } seal_current_segment(&shared); - let current = shared.current.load(); - let segment_list = current.tail(); + let tx = shared.begin_read(u64::MAX); let mut seen = RoaringBitmap::new(); - let (stream, replicated_from) = segment_list.stream_pages_from(0, 0, &mut seen).await; + let (stream, replicated_from) = tx + .current + .tail() + .stream_pages_from(0, 0, &mut seen, &tx) + .await; tokio::pin!(stream); let mut count = 0; @@ -633,8 +658,8 @@ mod test { assert_eq!(replicated_from, 13); } - fn db_payload(db: &[u8]) -> &[u8] { + fn db_payload(db: &[u8]) -> u32 { let size = (db.len() / 4096) * 4096; - &db[..size] + crc32fast::hash(&db[..size]) } } From aa011b4b0f8cd541cc389e9e5c6e3c3ee16e740f Mon Sep 17 00:00:00 2001 From: ad hoc Date: Fri, 4 Oct 2024 20:39:49 +0200 Subject: [PATCH 2/4] add partial ZerocopyBoxIoBuf --- libsql-wal/src/io/buf.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/libsql-wal/src/io/buf.rs b/libsql-wal/src/io/buf.rs index 0a06be4cef..7b824287e4 100644 --- a/libsql-wal/src/io/buf.rs +++ b/libsql-wal/src/io/buf.rs @@ -154,6 +154,17 @@ impl ZeroCopyBoxIoBuf { Self { init: 0, inner } } + /// same as new_uninit, but partially fills the buffer starting at offset + /// + /// # Safety: The caller must ensure that the remaining bytes are initialized + pub unsafe fn new_uninit_partial(inner: Box, offset: usize) -> Self { + assert!(offset < size_of::()); + Self { + inner, + init: offset, + } + } + fn is_init(&self) -> bool { self.init == size_of::() } From 348c4c97555b4ff1d8c782f6846757d5bf82cb45 Mon Sep 17 00:00:00 2001 From: ad hoc Date: Tue, 8 Oct 2024 16:43:01 +0200 Subject: [PATCH 3/4] fix buf and file impl --- libsql-wal/src/io/file.rs | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/libsql-wal/src/io/file.rs b/libsql-wal/src/io/file.rs index e60f3f25c5..f7f4b94615 100644 --- a/libsql-wal/src/io/file.rs +++ b/libsql-wal/src/io/file.rs @@ -193,16 +193,17 @@ impl FileExt for File { let (buffer, ret) = tokio::task::spawn_blocking(move || { // let mut read = 0; + let len = buf.bytes_total(); + let init = buf.bytes_init(); let chunk = unsafe { - let len = buf.bytes_total(); - let ptr = buf.stable_mut_ptr(); - std::slice::from_raw_parts_mut(ptr, len) + let ptr = buf.stable_mut_ptr().offset(init as _); + std::slice::from_raw_parts_mut(ptr, len - init) }; let ret = file.read_exact_at(chunk, offset); if ret.is_ok() { unsafe { - buf.set_init(buf.bytes_total()); + buf.set_init(init + chunk.len()); } } (buf, ret) @@ -222,16 +223,17 @@ impl FileExt for File { let (buffer, ret) = tokio::task::spawn_blocking(move || { // let mut read = 0; + let len = buf.bytes_total(); + let init = buf.bytes_init(); let chunk = unsafe { - let len = buf.bytes_total(); - let ptr = buf.stable_mut_ptr(); - std::slice::from_raw_parts_mut(ptr, len) + let ptr = buf.stable_mut_ptr().offset(init as _); + std::slice::from_raw_parts_mut(ptr, len - init) }; let ret = file.read_at(chunk, offset); if let Ok(n) = ret { unsafe { - buf.set_init(n); + buf.set_init(init + n); } } (buf, ret) @@ -358,13 +360,13 @@ mod test { file.write_all(&[1; 12345]).unwrap(); file.write_all(&[2; 50]).unwrap(); - let buf = vec![0u8; 12345]; + let buf = Vec::with_capacity(12345); let (buf, ret) = file.read_exact_at_async(buf, 0).await; ret.unwrap(); assert_eq!(buf.len(), 12345); assert!(buf.iter().all(|x| *x == 1)); - let buf = vec![2u8; 50]; + let buf = Vec::with_capacity(50); let (buf, ret) = file.read_exact_at_async(buf, 12345).await; ret.unwrap(); assert_eq!(buf.len(), 50); From 18e7c0c43dc691db8bf0baa8df8fc52d352e65fa Mon Sep 17 00:00:00 2001 From: ad hoc Date: Tue, 8 Oct 2024 16:43:45 +0200 Subject: [PATCH 4/4] replicate from db-file when start_frame_no is 1 --- libsql-wal/src/replication/replicator.rs | 24 ++++++++++++----- libsql-wal/src/segment/current.rs | 3 +-- libsql-wal/src/segment/list.rs | 6 +++-- libsql-wal/src/shared_wal.rs | 34 ++++++++++++++++++++++++ 4 files changed, 56 insertions(+), 11 deletions(-) diff --git a/libsql-wal/src/replication/replicator.rs b/libsql-wal/src/replication/replicator.rs index bf0d206ec3..89d33211d2 100644 --- a/libsql-wal/src/replication/replicator.rs +++ b/libsql-wal/src/replication/replicator.rs @@ -1,8 +1,9 @@ +use std::pin::Pin; use std::sync::Arc; use roaring::RoaringBitmap; use tokio::sync::watch; -use tokio_stream::{Stream, StreamExt}; +use tokio_stream::{Stream, StreamExt as _}; use crate::io::Io; use crate::replication::Error; @@ -118,12 +119,21 @@ impl Replicator { // Replicating from sealed segments was not enough, so we replicate from // durable storage if let Some(replicated_until) = replicated_until { - tracing::debug!("replicating from durable storage"); - let stream = self - .shared - .stored_segments - .stream(&mut seen, replicated_until, self.next_frame_no) - .peekable(); + let stream: Pin + Send>> = if self.next_frame_no == 1 { + // we're replicating from scratch, read straight from the main db + // file + tracing::debug!("replicating main db file"); + Box::pin(self.shared.replicate_from_db_file(&mut seen, &tx, replicated_until)) + } else { + tracing::debug!("replicating from durable storage"); + Box::pin(self + .shared + .stored_segments + .stream(&mut seen, replicated_until, self.next_frame_no) + .peekable()) + }; + + let stream = stream.peekable(); tokio::pin!(stream); diff --git a/libsql-wal/src/segment/current.rs b/libsql-wal/src/segment/current.rs index 7e9dddf3c9..11a1234eb7 100644 --- a/libsql-wal/src/segment/current.rs +++ b/libsql-wal/src/segment/current.rs @@ -525,7 +525,7 @@ impl CurrentSegment { if !self.is_empty() { let mut frame_offset = (tx.max_frame_no - seg_start_frame_no) as u32; loop { - let buf = ZeroCopyBoxIoBuf::new(Frame::new_box_zeroed()); + let buf = ZeroCopyBoxIoBuf::new_uninit(Frame::new_box_zeroed()); let (buf, res) = self.read_frame_offset_async(frame_offset, buf).await; res?; @@ -746,7 +746,6 @@ mod test { let mut copy = Vec::new(); tmp.read_to_end(&mut copy).unwrap(); - dbg!(copy.len(), orig.len()); assert_eq!(db_payload(©), db_payload(&orig)); } diff --git a/libsql-wal/src/segment/list.rs b/libsql-wal/src/segment/list.rs index 11b52c23b4..ebf616e058 100644 --- a/libsql-wal/src/segment/list.rs +++ b/libsql-wal/src/segment/list.rs @@ -138,8 +138,10 @@ where let mut buf = ZeroCopyBuf::::new_uninit(); let mut last_replication_index = 0; + let mut checkpointed = RoaringBitmap::new(); while let Some((k, v)) = union.next() { let page_no = u32::from_be_bytes(k.try_into().unwrap()); + checkpointed.insert(page_no); tracing::trace!(page_no); let v = v.iter().min_by_key(|i| i.index).unwrap(); let offset = v.value as u32; @@ -156,6 +158,7 @@ where .await; ret?; buf = read_buf.into_inner(); + buf.deinit(); } // update the footer at the end of the db file. @@ -174,7 +177,6 @@ where .await; ret?; - // todo: truncate if necessary //// TODO: make async db_file.sync_all()?; @@ -265,7 +267,7 @@ where continue } - let buf = ZeroCopyBoxIoBuf::new(Frame::new_box_zeroed()); + let buf = ZeroCopyBoxIoBuf::new_uninit(Frame::new_box_zeroed()); let (buf, ret) = segment.read_frame_offset_async(*frame_offset as u32, buf).await; ret?; let mut frame = buf.into_inner(); diff --git a/libsql-wal/src/shared_wal.rs b/libsql-wal/src/shared_wal.rs index 4087a254c9..204430bbec 100644 --- a/libsql-wal/src/shared_wal.rs +++ b/libsql-wal/src/shared_wal.rs @@ -6,18 +6,24 @@ use std::time::Instant; use arc_swap::ArcSwap; use crossbeam::deque::Injector; use crossbeam::sync::Unparker; +use futures::Stream; use parking_lot::{Mutex, MutexGuard}; +use roaring::RoaringBitmap; use tokio::sync::{mpsc, watch}; use uuid::Uuid; +use zerocopy::FromZeroes; use crate::checkpointer::CheckpointMessage; use crate::error::{Error, Result}; +use crate::io::buf::ZeroCopyBoxIoBuf; use crate::io::file::FileExt; use crate::io::Io; use crate::replication::storage::ReplicateFromStorage; use crate::segment::current::CurrentSegment; +use crate::segment::{Frame, FrameHeader}; use crate::segment_swap_strategy::SegmentSwapStrategy; use crate::transaction::{ReadTransaction, Savepoint, Transaction, TxGuard, WriteTransaction}; +use crate::LIBSQL_PAGE_SIZE; use libsql_sys::name::NamespaceName; #[derive(Default)] @@ -334,6 +340,34 @@ impl SharedWal { pub fn namespace(&self) -> &NamespaceName { &self.namespace } + + /// read frames from the main db file. + pub(crate) fn replicate_from_db_file<'a>( + &'a self, + seen: &'a RoaringBitmap, + tx: &'a ReadTransaction, + until: u64, + ) -> impl Stream>> + Send + 'a { + async_stream::try_stream! { + let mut all = RoaringBitmap::new(); + all.insert_range(1..=tx.db_size); + let to_take = all - seen; + for page_no in to_take { + let mut frame = Frame::new_box_zeroed(); + *frame.header_mut() = FrameHeader { + page_no: page_no.into(), + size_after: 0.into(), + // we don't really know what the frame_no is, so we set it to a number less that any other frame_no + frame_no: until.into(), + }; + let buf = unsafe { ZeroCopyBoxIoBuf::new_uninit_partial(frame, size_of::()) }; + let (buf, ret) = self.db_file.read_exact_at_async(buf, (page_no as u64 - 1) * LIBSQL_PAGE_SIZE as u64).await; + ret?; + let frame = buf.into_inner(); + yield frame; + } + } + } } #[cfg(test)]