Skip to content

Commit

Permalink
use read txn in replicator
Browse files Browse the repository at this point in the history
  • Loading branch information
MarinPostma committed Oct 4, 2024
1 parent 961ecb8 commit e55dd89
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 76 deletions.
14 changes: 7 additions & 7 deletions libsql-wal/src/replication/replicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,13 @@ impl<IO: Io> Replicator<IO> {
tracing::debug!(most_recent_frame_no, "new frame_no available");

let mut commit_frame_no = 0;
let tx = self.shared.begin_read(u64::MAX);
// we have stuff to replicate
if most_recent_frame_no >= self.next_frame_no {
// first replicate the most recent version of each page from the current
// segment. We also return how far back we have replicated from the current log
let current = self.shared.current.load();
let mut seen = RoaringBitmap::new();
let (stream, replicated_until, size_after) = current.frame_stream_from(self.next_frame_no, &mut seen);
let (stream, replicated_until) = tx.current.frame_stream_from(self.next_frame_no, &mut seen, &tx);
let should_replicate_from_tail = replicated_until != self.next_frame_no;

{
Expand All @@ -78,7 +78,7 @@ impl<IO: Io> Replicator<IO> {
let mut frame = frame.map_err(|e| Error::CurrentSegment(e.into()))?;
commit_frame_no = frame.header().frame_no().max(commit_frame_no);
if stream.peek().await.is_none() && !should_replicate_from_tail {
frame.header_mut().set_size_after(size_after);
frame.header_mut().set_size_after(tx.db_size);
self.next_frame_no = commit_frame_no + 1;
}

Expand All @@ -90,9 +90,9 @@ impl<IO: Io> Replicator<IO> {
// we need to take frames from the sealed segments.
if should_replicate_from_tail {
let replicated_until = {
let (stream, replicated_until) = current
let (stream, replicated_until) = tx.current
.tail()
.stream_pages_from(replicated_until, self.next_frame_no, &mut seen).await;
.stream_pages_from(replicated_until, self.next_frame_no, &mut seen, &tx).await;
tokio::pin!(stream);

tracing::debug!(replicated_until, "replicating from tail");
Expand All @@ -105,7 +105,7 @@ impl<IO: Io> Replicator<IO> {
let mut frame = frame.map_err(|e| Error::SealedSegment(e.into()))?;
commit_frame_no = frame.header().frame_no().max(commit_frame_no);
if stream.peek().await.is_none() && !should_replicate_from_storage {
frame.header_mut().set_size_after(size_after);
frame.header_mut().set_size_after(tx.db_size);
self.next_frame_no = commit_frame_no + 1;
}

Expand All @@ -132,7 +132,7 @@ impl<IO: Io> Replicator<IO> {
let mut frame = frame?;
commit_frame_no = frame.header().frame_no().max(commit_frame_no);
if stream.peek().await.is_none() {
frame.header_mut().set_size_after(size_after);
frame.header_mut().set_size_after(tx.db_size);
self.next_frame_no = commit_frame_no + 1;
}

Expand Down
1 change: 1 addition & 0 deletions libsql-wal/src/replication/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ where
let segment = match maybe_seg {
Some(ref seg) => seg,
None => {
tracing::debug!(key = %key, "fetching segment");
maybe_seg = Some(storage.fetch_segment_data(&namespace, &key, None).await?);
maybe_seg.as_ref().unwrap()
},
Expand Down
62 changes: 33 additions & 29 deletions libsql-wal/src/segment/current.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::io::file::FileExt;
use crate::io::Inspect;
use crate::segment::{checked_frame_offset, SegmentFlags};
use crate::segment::{frame_offset, page_offset, sealed::SealedSegment};
use crate::transaction::{Transaction, TxGuardOwned, TxGuardShared};
use crate::transaction::{ReadTransaction, Transaction, TxGuardOwned, TxGuardShared};
use crate::{LIBSQL_MAGIC, LIBSQL_PAGE_SIZE, LIBSQL_WAL_VERSION};

use super::list::SegmentList;
Expand Down Expand Up @@ -507,22 +507,23 @@ impl<F> CurrentSegment<F> {
&'a self,
start_frame_no: u64,
seen: &'a mut RoaringBitmap,
) -> (impl Stream<Item = Result<Box<Frame>>> + 'a, u64, u32)
// ensures that a read lock is held while this method is called, and bounds the stream to the transaction's max_frame_no
tx: &'a ReadTransaction<F>,
) -> (impl Stream<Item = Result<Box<Frame>>> + 'a, u64)
where
F: FileExt,
{
let (seg_start_frame_no, last_committed, db_size) =
self.with_header(|h| (h.start_frame_no.get(), h.last_committed(), h.size_after()));
let seg_start_frame_no = tx.current.with_header(|h| h.start_frame_no.get());
let replicated_until = seg_start_frame_no
// if current is empty, start_frame_no doesn't exist
.min(last_committed)
.min(tx.max_frame_no)
.max(start_frame_no);

// TODO: optimization: we could read fewer frames if we had a mapping from frame_no to page_no in
// the index
let stream = async_stream::try_stream! {
if !self.is_empty() {
let mut frame_offset = (last_committed - seg_start_frame_no) as u32;
let mut frame_offset = (tx.max_frame_no - seg_start_frame_no) as u32;
loop {
let buf = ZeroCopyBoxIoBuf::new(Frame::new_box_zeroed());
let (buf, res) = self.read_frame_offset_async(frame_offset, buf).await;
Expand Down Expand Up @@ -551,7 +552,7 @@ impl<F> CurrentSegment<F> {
}
};

(stream, replicated_until, db_size)
(stream, replicated_until)
}

fn recompute_checksum(&self, start_offset: u32, until_offset: u32) -> Result<u32>
Expand Down Expand Up @@ -714,18 +715,20 @@ mod test {
.unwrap();
}

let mut seen = RoaringBitmap::new();
let current = shared.current.load();
let (stream, replicated_until, size_after) = current.frame_stream_from(1, &mut seen);
tokio::pin!(stream);
assert_eq!(replicated_until, 1);
assert_eq!(size_after, 6);

let mut tmp = tempfile().unwrap();
while let Some(frame) = stream.next().await {
let frame = frame.unwrap();
let offset = (frame.header().page_no() - 1) * 4096;
tmp.write_all_at(frame.data(), offset as _).unwrap();
{
let tx = shared.begin_read(u64::MAX);
let mut seen = RoaringBitmap::new();
let (stream, replicated_until) = tx.current.frame_stream_from(1, &mut seen, &tx);
tokio::pin!(stream);
assert_eq!(replicated_until, 1);
assert_eq!(tx.db_size, 6);

while let Some(frame) = stream.next().await {
let frame = frame.unwrap();
let offset = (frame.header().page_no() - 1) * 4096;
tmp.write_all_at(frame.data(), offset as _).unwrap();
}
}

seal_current_segment(&shared);
Expand All @@ -743,6 +746,7 @@ mod test {
let mut copy = Vec::new();
tmp.read_to_end(&mut copy).unwrap();

dbg!(copy.len(), orig.len());
assert_eq!(db_payload(&copy), db_payload(&orig));
}

Expand All @@ -768,11 +772,11 @@ mod test {

let mut seen = RoaringBitmap::new();
{
let current = shared.current.load();
let (stream, replicated_until, size_after) = current.frame_stream_from(1, &mut seen);
let tx = shared.begin_read(u64::MAX);
let (stream, replicated_until) = tx.current.frame_stream_from(1, &mut seen, &tx);
tokio::pin!(stream);
assert_eq!(replicated_until, 60);
assert_eq!(size_after, 9);
assert_eq!(tx.db_size, 9);
assert_eq!(stream.fold(0, |count, _| count + 1).await, 6);
}
assert_debug_snapshot!(seen);
Expand All @@ -787,12 +791,12 @@ mod test {
conn.execute("create table test (x)", ()).unwrap();

let mut seen = RoaringBitmap::new();
let current = shared.current.load();
let (stream, replicated_until, size_after) = current.frame_stream_from(100, &mut seen);
let tx = shared.begin_read(u64::MAX);
let (stream, replicated_until) = tx.current.frame_stream_from(100, &mut seen, &tx);
tokio::pin!(stream);
assert_eq!(replicated_until, 100);
assert_eq!(stream.fold(0, |count, _| count + 1).await, 0);
assert_eq!(size_after, 2);
assert_eq!(tx.db_size, 2);
}

#[tokio::test]
Expand All @@ -805,11 +809,11 @@ mod test {
seal_current_segment(&shared);

let mut seen = RoaringBitmap::new();
let current = shared.current.load();
let (stream, replicated_until, size_after) = current.frame_stream_from(1, &mut seen);
let tx = shared.begin_read(u64::MAX);
let (stream, replicated_until) = tx.current.frame_stream_from(1, &mut seen, &tx);
tokio::pin!(stream);
assert_eq!(replicated_until, 2);
assert_eq!(size_after, 2);
assert_eq!(tx.db_size, 2);
assert_eq!(stream.fold(0, |count, _| count + 1).await, 0);
}

Expand Down Expand Up @@ -1014,8 +1018,8 @@ mod test {
}
}

fn db_payload(db: &[u8]) -> &[u8] {
fn db_payload(db: &[u8]) -> u32 {
let size = (db.len() / 4096) * 4096;
&db[..size]
crc32fast::hash(&db[..size])
}
}
105 changes: 65 additions & 40 deletions libsql-wal/src/segment/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::error::Result;
use crate::io::buf::{ZeroCopyBoxIoBuf, ZeroCopyBuf};
use crate::io::{FileExt, Io};
use crate::segment::Frame;
use crate::transaction::ReadTransaction;
use crate::{LibsqlFooter, LIBSQL_MAGIC, LIBSQL_PAGE_SIZE, LIBSQL_WAL_VERSION};

use super::Segment;
Expand Down Expand Up @@ -209,11 +210,13 @@ where
/// returns a stream of pages from the sealed segment list, and what's the lowest replication index
/// that was covered. If the returned index is less than start frame_no, the missing frames
/// must be read somewhere else.
pub async fn stream_pages_from<'a>(
pub async fn stream_pages_from<'a, F>(
&self,
current_fno: u64,
until_fno: u64,
seen: &'a mut RoaringBitmap,
// not actually used, but ensures that a read lock is held while this method is called
_tx: &ReadTransaction<F>,
) -> (
impl Stream<Item = crate::error::Result<Box<Frame>>> + 'a,
u64,
Expand Down Expand Up @@ -434,10 +437,13 @@ mod test {

seal_current_segment(&shared);

let current = shared.current.load();
let segment_list = current.tail();
let tx = shared.begin_read(u64::MAX);
let mut seen = RoaringBitmap::new();
let (stream, _) = segment_list.stream_pages_from(0, 0, &mut seen).await;
let (stream, _) = tx
.current
.tail()
.stream_pages_from(0, 0, &mut seen, &tx)
.await;
tokio::pin!(stream);

let mut file = NamedTempFile::new().unwrap();
Expand Down Expand Up @@ -485,10 +491,13 @@ mod test {

seal_current_segment(&shared);

let current = shared.current.load();
let segment_list = current.tail();
let tx = shared.begin_read(u64::MAX);
let mut seen = RoaringBitmap::new();
let (stream, replicated_until) = segment_list.stream_pages_from(0, 10, &mut seen).await;
let (stream, replicated_until) = tx
.current
.tail()
.stream_pages_from(0, 10, &mut seen, &tx)
.await;
tokio::pin!(stream);

assert_eq!(replicated_until, 10);
Expand All @@ -513,10 +522,13 @@ mod test {

seal_current_segment(&shared);

let current = shared.current.load();
let segment_list = current.tail();
let tx = shared.begin_read(u64::MAX);
let mut seen = RoaringBitmap::from_sorted_iter([1]).unwrap();
let (stream, replicated_until) = segment_list.stream_pages_from(0, 1, &mut seen).await;
let (stream, replicated_until) = tx
.current
.tail()
.stream_pages_from(0, 1, &mut seen, &tx)
.await;
tokio::pin!(stream);

assert_eq!(replicated_until, 1);
Expand All @@ -541,22 +553,27 @@ mod test {

seal_current_segment(&shared);

let current = shared.current.load();
let segment_list = current.tail();
let mut seen = RoaringBitmap::new();
let (stream, replicated_until) = segment_list.stream_pages_from(0, 1, &mut seen).await;
tokio::pin!(stream);
let mut tmp = tempfile().unwrap();
let mut last_offset = 0;

assert_eq!(replicated_until, 1);
{
let tx = shared.begin_read(u64::MAX);
let mut seen = RoaringBitmap::new();
let (stream, replicated_until) = tx
.current
.tail()
.stream_pages_from(0, 1, &mut seen, &tx)
.await;
tokio::pin!(stream);

let mut tmp = tempfile().unwrap();
assert_eq!(replicated_until, 1);

let mut last_offset = 0;
while let Some(frame) = stream.next().await {
let frame = frame.unwrap();
let offset = (frame.header().page_no() - 1) * 4096;
tmp.write_all_at(frame.data(), offset as u64).unwrap();
last_offset = last_offset.max(frame.header().frame_no());
while let Some(frame) = stream.next().await {
let frame = frame.unwrap();
let offset = (frame.header().page_no() - 1) * 4096;
tmp.write_all_at(frame.data(), offset as u64).unwrap();
last_offset = last_offset.max(frame.header().frame_no());
}
}

for _ in 0..10 {
Expand All @@ -565,21 +582,26 @@ mod test {

seal_current_segment(&shared);

let mut seen = RoaringBitmap::new();
let (stream, replicated_until) = segment_list
.stream_pages_from(0, last_offset, &mut seen)
.await;
tokio::pin!(stream);
{
let tx = shared.begin_read(u64::MAX);
let mut seen = RoaringBitmap::new();
let (stream, replicated_until) = tx
.current
.tail()
.stream_pages_from(0, last_offset, &mut seen, &tx)
.await;
tokio::pin!(stream);

assert_eq!(replicated_until, last_offset);
assert_eq!(replicated_until, last_offset);

while let Some(frame) = stream.next().await {
let frame = frame.unwrap();
let offset = (frame.header().page_no() - 1) * 4096;
tmp.write_all_at(frame.data(), offset as u64).unwrap();
}
while let Some(frame) = stream.next().await {
let frame = frame.unwrap();
let offset = (frame.header().page_no() - 1) * 4096;
tmp.write_all_at(frame.data(), offset as u64).unwrap();
}

*shared.durable_frame_no.lock() = 999999;
*shared.durable_frame_no.lock() = 999999;
}

shared.checkpoint().await.unwrap();
tmp.seek(std::io::SeekFrom::Start(0)).unwrap();
Expand Down Expand Up @@ -618,10 +640,13 @@ mod test {
}
seal_current_segment(&shared);

let current = shared.current.load();
let segment_list = current.tail();
let tx = shared.begin_read(u64::MAX);
let mut seen = RoaringBitmap::new();
let (stream, replicated_from) = segment_list.stream_pages_from(0, 0, &mut seen).await;
let (stream, replicated_from) = tx
.current
.tail()
.stream_pages_from(0, 0, &mut seen, &tx)
.await;
tokio::pin!(stream);

let mut count = 0;
Expand All @@ -633,8 +658,8 @@ mod test {
assert_eq!(replicated_from, 13);
}

fn db_payload(db: &[u8]) -> &[u8] {
fn db_payload(db: &[u8]) -> u32 {
let size = (db.len() / 4096) * 4096;
&db[..size]
crc32fast::hash(&db[..size])
}
}

0 comments on commit e55dd89

Please sign in to comment.