Skip to content

Commit

Permalink
replicate from db-file when start_frame_no is 1
Browse files Browse the repository at this point in the history
  • Loading branch information
MarinPostma committed Oct 8, 2024
1 parent 348c4c9 commit 18e7c0c
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 11 deletions.
24 changes: 17 additions & 7 deletions libsql-wal/src/replication/replicator.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::pin::Pin;
use std::sync::Arc;

use roaring::RoaringBitmap;
use tokio::sync::watch;
use tokio_stream::{Stream, StreamExt};
use tokio_stream::{Stream, StreamExt as _};

use crate::io::Io;
use crate::replication::Error;
Expand Down Expand Up @@ -118,12 +119,21 @@ impl<IO: Io> Replicator<IO> {
// Replicating from sealed segments was not enough, so we replicate from
// durable storage
if let Some(replicated_until) = replicated_until {
tracing::debug!("replicating from durable storage");
let stream = self
.shared
.stored_segments
.stream(&mut seen, replicated_until, self.next_frame_no)
.peekable();
let stream: Pin<Box<dyn Stream<Item = _> + Send>> = if self.next_frame_no == 1 {
// we're replicating from scratch, read straight from the main db
// file
tracing::debug!("replicating main db file");
Box::pin(self.shared.replicate_from_db_file(&mut seen, &tx, replicated_until))
} else {
tracing::debug!("replicating from durable storage");
Box::pin(self
.shared
.stored_segments
.stream(&mut seen, replicated_until, self.next_frame_no)
.peekable())
};

let stream = stream.peekable();

tokio::pin!(stream);

Expand Down
3 changes: 1 addition & 2 deletions libsql-wal/src/segment/current.rs
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ impl<F> CurrentSegment<F> {
if !self.is_empty() {
let mut frame_offset = (tx.max_frame_no - seg_start_frame_no) as u32;
loop {
let buf = ZeroCopyBoxIoBuf::new(Frame::new_box_zeroed());
let buf = ZeroCopyBoxIoBuf::new_uninit(Frame::new_box_zeroed());
let (buf, res) = self.read_frame_offset_async(frame_offset, buf).await;
res?;

Expand Down Expand Up @@ -746,7 +746,6 @@ mod test {
let mut copy = Vec::new();
tmp.read_to_end(&mut copy).unwrap();

dbg!(copy.len(), orig.len());
assert_eq!(db_payload(&copy), db_payload(&orig));
}

Expand Down
6 changes: 4 additions & 2 deletions libsql-wal/src/segment/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,10 @@ where

let mut buf = ZeroCopyBuf::<Frame>::new_uninit();
let mut last_replication_index = 0;
let mut checkpointed = RoaringBitmap::new();
while let Some((k, v)) = union.next() {
let page_no = u32::from_be_bytes(k.try_into().unwrap());
checkpointed.insert(page_no);
tracing::trace!(page_no);
let v = v.iter().min_by_key(|i| i.index).unwrap();
let offset = v.value as u32;
Expand All @@ -156,6 +158,7 @@ where
.await;
ret?;
buf = read_buf.into_inner();
buf.deinit();
}

// update the footer at the end of the db file.
Expand All @@ -174,7 +177,6 @@ where
.await;
ret?;

// todo: truncate if necessary
//// TODO: make async
db_file.sync_all()?;

Expand Down Expand Up @@ -265,7 +267,7 @@ where
continue
}

let buf = ZeroCopyBoxIoBuf::new(Frame::new_box_zeroed());
let buf = ZeroCopyBoxIoBuf::new_uninit(Frame::new_box_zeroed());
let (buf, ret) = segment.read_frame_offset_async(*frame_offset as u32, buf).await;
ret?;
let mut frame = buf.into_inner();
Expand Down
34 changes: 34 additions & 0 deletions libsql-wal/src/shared_wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,24 @@ use std::time::Instant;
use arc_swap::ArcSwap;
use crossbeam::deque::Injector;
use crossbeam::sync::Unparker;
use futures::Stream;
use parking_lot::{Mutex, MutexGuard};
use roaring::RoaringBitmap;
use tokio::sync::{mpsc, watch};
use uuid::Uuid;
use zerocopy::FromZeroes;

use crate::checkpointer::CheckpointMessage;
use crate::error::{Error, Result};
use crate::io::buf::ZeroCopyBoxIoBuf;
use crate::io::file::FileExt;
use crate::io::Io;
use crate::replication::storage::ReplicateFromStorage;
use crate::segment::current::CurrentSegment;
use crate::segment::{Frame, FrameHeader};
use crate::segment_swap_strategy::SegmentSwapStrategy;
use crate::transaction::{ReadTransaction, Savepoint, Transaction, TxGuard, WriteTransaction};
use crate::LIBSQL_PAGE_SIZE;
use libsql_sys::name::NamespaceName;

#[derive(Default)]
Expand Down Expand Up @@ -334,6 +340,34 @@ impl<IO: Io> SharedWal<IO> {
pub fn namespace(&self) -> &NamespaceName {
&self.namespace
}

/// read frames from the main db file.
pub(crate) fn replicate_from_db_file<'a>(
&'a self,
seen: &'a RoaringBitmap,
tx: &'a ReadTransaction<IO::File>,
until: u64,
) -> impl Stream<Item = crate::replication::Result<Box<Frame>>> + Send + 'a {
async_stream::try_stream! {
let mut all = RoaringBitmap::new();
all.insert_range(1..=tx.db_size);
let to_take = all - seen;
for page_no in to_take {
let mut frame = Frame::new_box_zeroed();
*frame.header_mut() = FrameHeader {
page_no: page_no.into(),
size_after: 0.into(),
// we don't really know what the frame_no is, so we set it to a number less that any other frame_no
frame_no: until.into(),
};
let buf = unsafe { ZeroCopyBoxIoBuf::new_uninit_partial(frame, size_of::<FrameHeader>()) };
let (buf, ret) = self.db_file.read_exact_at_async(buf, (page_no as u64 - 1) * LIBSQL_PAGE_SIZE as u64).await;
ret?;
let frame = buf.into_inner();
yield frame;
}
}
}
}

#[cfg(test)]
Expand Down

0 comments on commit 18e7c0c

Please sign in to comment.