Skip to content

Commit

Permalink
[Bifrost] Base seal tests and implements seal() on memory loglet
Browse files Browse the repository at this point in the history
This also adds an optional limit to loglet readstreams to enable creating a readstream with a pre-determined end offset.
  • Loading branch information
AhmedSoliman committed Jul 19, 2024
1 parent 7e8dc57 commit ad036dd
Show file tree
Hide file tree
Showing 6 changed files with 248 additions and 22 deletions.
163 changes: 158 additions & 5 deletions crates/bifrost/src/loglet/loglet_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,24 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeSet;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::time::Duration;

use bytes::Bytes;
use futures::StreamExt;
use googletest::prelude::*;
use tokio::task::JoinHandle;
use tokio::sync::Barrier;
use tokio::task::{JoinHandle, JoinSet};

use restate_test_util::let_assert;
use restate_types::logs::SequenceNumber;
use tokio_stream::StreamExt;
use tracing::info;

use super::{Loglet, LogletOffset};
use crate::{LogRecord, Record, TrimGap};
use crate::loglet::AppendError;
use crate::{LogRecord, Record, TailState, TrimGap};

fn setup() {
// Make sure that panics exits the process.
Expand Down Expand Up @@ -216,7 +219,10 @@ pub async fn single_loglet_readstream_test(loglet: Arc<dyn Loglet>) -> googletes
setup();

let read_from_offset = LogletOffset::from(6);
let mut reader = loglet.clone().create_read_stream(read_from_offset).await?;
let mut reader = loglet
.clone()
.create_read_stream(read_from_offset, None)
.await?;

{
// no records have been written yet.
Expand Down Expand Up @@ -307,7 +313,7 @@ pub async fn single_loglet_readstream_test_with_trims(

let mut read_stream = loglet
.clone()
.create_read_stream(LogletOffset::OLDEST)
.create_read_stream(LogletOffset::OLDEST, None)
.await?;

let record = read_stream.next().await.unwrap()?;
Expand Down Expand Up @@ -393,3 +399,150 @@ pub async fn single_loglet_readstream_test_with_trims(

Ok(())
}

/// Validates that appends fail after find_tail() returned Sealed()
pub async fn loglet_test_append_after_seal(loglet: Arc<dyn Loglet>) -> googletest::Result<()> {
setup();

assert_eq!(None, loglet.get_trim_point().await?);
{
let tail = loglet.find_tail().await?;
assert_eq!(LogletOffset::OLDEST, tail.offset());
assert!(!tail.is_sealed());
}

// append 5 records. Offsets [1..5]
for i in 1..=5 {
loglet.append(Bytes::from(format!("record{}", i))).await?;
}

loglet.seal().await?;

// attempt to append 5 records. Offsets [6..10]. Expected to fail since seal happened on the same client.
for i in 6..=10 {
let res = loglet.append(Bytes::from(format!("record{}", i))).await;
assert_that!(res, err(pat!(AppendError::Sealed)));
}

let tail = loglet.find_tail().await?;
// Seal must be applied after commit index 5 since it has been acknowledged (tail is 6 or higher)
assert_that!(tail, pat!(TailState::Sealed(gt(LogletOffset::from(5)))));

Ok(())
}

/// Validates that appends fail after find_tail() returned Sealed()
pub async fn loglet_test_append_after_seal_concurrent(
loglet: Arc<dyn Loglet>,
) -> googletest::Result<()> {
use futures::TryStreamExt as _;

const WARMUP_APPENDS: usize = 1000;
const CONCURRENT_APPENDERS: usize = 20;

setup();

assert_eq!(None, loglet.get_trim_point().await?);
{
let tail = loglet.find_tail().await?;
assert_eq!(LogletOffset::OLDEST, tail.offset());
assert!(!tail.is_sealed());
}
// +1 for the main task waiting on all concurrent appenders
let append_barrier = Arc::new(Barrier::new(CONCURRENT_APPENDERS + 1));

let mut appenders: JoinSet<googletest::Result<_>> = JoinSet::new();
for appender_id in 0..CONCURRENT_APPENDERS {
appenders.spawn({
let loglet = loglet.clone();
let append_barrier = append_barrier.clone();
async move {
let mut i = 1;
let mut committed = Vec::new();
let mut warmup = true;
loop {
let res = loglet
.append(Bytes::from(format!("appender-{}-record{}", appender_id, i)))
.await;
i += 1;
if i > WARMUP_APPENDS && warmup {
println!("appender({}) - warmup complete....", appender_id);
append_barrier.wait().await;
warmup = false;
}
match res {
Ok(offset) => {
committed.push(offset);
}
Err(AppendError::Sealed) => {
break;
}
Err(e) => fail!("unexpected error: {}", e)?,
}
// give a chance to other tasks to work
tokio::task::yield_now().await;
}
Ok(committed)
}
});
}

// Wait for some warmup appends
println!(
"Awaiting all appenders to reach at least {} appends",
WARMUP_APPENDS
);
append_barrier.wait().await;
// Go places and do other things.
for _ in 0..5 {
tokio::task::yield_now().await;
}

loglet.seal().await?;
// fails immediately
assert_that!(
loglet.append(Bytes::from_static(b"failed-record")).await,
err(pat!(AppendError::Sealed))
);

let tail = loglet.find_tail().await?;
assert!(tail.is_sealed());
println!("Sealed tail: {:?}", tail);

let mut all_committed = BTreeSet::new();
while let Some(handle) = appenders.join_next().await {
let mut committed = handle??;
assert!(!committed.is_empty());
let committed_len = committed.len();
assert!(committed_len >= WARMUP_APPENDS);
let tail_record = committed.pop().unwrap();
// tail must be beyond seal point
assert!(tail.offset() > tail_record);
println!(
"Committed len: {}, last appended was {}",
committed_len, tail_record
);
// ensure that all committed records are unique
assert!(all_committed.insert(tail_record));
for offset in committed {
assert!(all_committed.insert(offset));
}
}

let reader = loglet
.clone()
.create_read_stream(LogletOffset::OLDEST, Some(tail.offset().prev()))
.await?;

let records: BTreeSet<LogletOffset> = reader
.try_filter_map(|x| std::future::ready(Ok(Some(x.offset))))
.try_collect()
.await?;

// every record committed must be observed in readstream, and it's acceptable for the
// readstream to include more records.
assert!(all_committed.len() <= records.len());
assert!(all_committed.is_subset(&records));

Ok(())
}
10 changes: 10 additions & 0 deletions crates/bifrost/src/loglet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,12 @@ pub trait LogletBase: Send + Sync + std::fmt::Debug {

/// Create a read stream that streams record from a single loglet instance.
///
/// `to`: The offset of the last record to be read (inclusive). If `None`, the
/// stream is an open-ended tailing read stream.
async fn create_read_stream(
self: Arc<Self>,
from: Self::Offset,
to: Option<Self::Offset>,
) -> Result<SendableLogletReadStream<Self::Offset>>;

/// Append a record to the loglet.
Expand Down Expand Up @@ -146,6 +149,12 @@ pub trait LogletBase: Send + Sync + std::fmt::Debug {
/// Passing `Offset::OLDEST` trims the first record in the loglet (if exists).
async fn trim(&self, trim_point: Self::Offset) -> Result<(), OperationError>;

/// Seal the loglet. This operation is idempotent.
///
/// Appends **SHOULD NOT** succeed after a `seal()` call is successful. And appends **MUST
/// NOT** succeed after the offset returned by the *first* TailState::Sealed() response.
async fn seal(&self) -> Result<(), OperationError>;

/// Read or wait for the record at `from` offset, or the next available record if `from` isn't
/// defined for the loglet.
async fn read_next_single(
Expand All @@ -166,6 +175,7 @@ pub trait LogletReadStream<S: SequenceNumber>:
{
/// Current read pointer. This points to the next offset to be read.
fn read_pointer(&self) -> S;

/// Returns true if the stream is terminated.
fn is_terminated(&self) -> bool;
}
Expand Down
7 changes: 6 additions & 1 deletion crates/bifrost/src/loglet_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl LogletWrapper {
// Translates LSN to loglet offset
Ok(LogletReadStreamWrapper::new(
self.loglet
.create_read_stream(start_lsn.into_offset(self.base_lsn))
.create_read_stream(start_lsn.into_offset(self.base_lsn), None)
.await?,
self.base_lsn,
))
Expand All @@ -69,6 +69,7 @@ impl LogletBase for LogletWrapper {
async fn create_read_stream(
self: Arc<Self>,
_after: Self::Offset,
_to: Option<Self::Offset>,
) -> Result<SendableLogletReadStream<Self::Offset>> {
unreachable!("create_read_stream on LogletWrapper should never be used directly")
}
Expand Down Expand Up @@ -107,6 +108,10 @@ impl LogletBase for LogletWrapper {
self.loglet.trim(trim_point).await
}

async fn seal(&self) -> Result<(), OperationError> {
self.loglet.seal().await
}

async fn read_next_single(&self, from: Lsn) -> Result<LogRecord<Lsn, Bytes>, OperationError> {
// convert LSN to loglet offset
let offset = from.into_offset(self.base_lsn);
Expand Down
9 changes: 8 additions & 1 deletion crates/bifrost/src/providers/local_loglet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,11 @@ impl LogletBase for LocalLoglet {
async fn create_read_stream(
self: Arc<Self>,
from: Self::Offset,
to: Option<Self::Offset>,
) -> Result<SendableLogletReadStream<Self::Offset>> {
Ok(Box::pin(LocalLogletReadStream::create(self, from).await?))
Ok(Box::pin(
LocalLogletReadStream::create(self, from, to).await?,
))
}

async fn append(&self, payload: Bytes) -> Result<LogletOffset, AppendError> {
Expand Down Expand Up @@ -325,6 +328,10 @@ impl LogletBase for LocalLoglet {
Ok(())
}

async fn seal(&self) -> Result<(), OperationError> {
todo!()
}

async fn read_next_single(
&self,
from: Self::Offset,
Expand Down
9 changes: 9 additions & 0 deletions crates/bifrost/src/providers/local_loglet/read_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ pub(crate) struct LocalLogletReadStream {
// the next record this stream will attempt to read
read_pointer: LogletOffset,
release_pointer: LogletOffset,
/// Last offset to read before terminating the stream. None means "tailing" reader.
read_to: Option<LogletOffset>,
#[pin]
iterator: DBRawIteratorWithThreadMode<'static, DB>,
#[pin]
Expand All @@ -62,6 +64,7 @@ impl LocalLogletReadStream {
pub(crate) async fn create(
loglet: Arc<LocalLoglet>,
from_offset: LogletOffset,
to: Option<LogletOffset>,
) -> Result<Self> {
// Reading from INVALID resets to OLDEST.
let from_offset = from_offset.max(LogletOffset::OLDEST);
Expand Down Expand Up @@ -111,6 +114,7 @@ impl LocalLogletReadStream {
terminated: false,
release_watch,
release_pointer,
read_to: to,
})
}
}
Expand Down Expand Up @@ -143,6 +147,11 @@ impl Stream for LocalLogletReadStream {
loop {
let mut this = self.as_mut().project();

// We have reached the limit we are allowed to read
if this.read_to.is_some_and(|read_to| next_offset > read_to) {
this.terminated.set(true);
return Poll::Ready(None);
}
// Are we reading after commit offset?
// We are at tail. We need to wait until new records have been released.
if next_offset > *this.release_pointer {
Expand Down
Loading

0 comments on commit ad036dd

Please sign in to comment.