[Bifrost] Watch tail updates
AhmedSoliman committed Jul 25, 2024
1 parent 7b15172 commit 8defacc
Showing 10 changed files with 191 additions and 103 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -149,7 +149,7 @@ test-log = { version = "0.2.11", default-features = false, features = ["trace"]
# tikv-jemallocator has not yet been released with musl target support, so we pin a main commit
tikv-jemallocator = { git = "https://github.com/restatedev/jemallocator", rev = "7c32f6e3d6ad5e4e492cc08d6bdb8307acf9afa0", default-features = false }
thiserror = "1.0"
tokio = { version = "1.29", default-features = false, features = ["rt-multi-thread", "signal", "macros", ] }
tokio = { version = "1.29", default-features = false, features = ["rt-multi-thread", "signal", "macros", "parking_lot" ] }
tokio-stream = "0.1.14"
tokio-util = { version = "0.7.10" }
tonic = { version = "0.12.1", default-features = false }
11 changes: 11 additions & 0 deletions crates/bifrost/src/lib.rs
@@ -27,3 +27,14 @@ pub use service::BifrostService;
pub use types::*;

pub const SMALL_BATCH_THRESHOLD_COUNT: usize = 4;

#[cfg(test)]
pub(crate) fn setup_panic_handler() {
// Make sure that panics exit the process.
let orig_hook = std::panic::take_hook();
std::panic::set_hook(Box::new(move |panic_info| {
// invoke the default handler and exit the process
orig_hook(panic_info);
std::process::exit(1);
}));
}
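
The test-only helper above replaces the default panic hook so that a panic anywhere in the process (including inside spawned tokio tasks) aborts the test run instead of being swallowed by a JoinHandle. A minimal usage sketch, not part of this commit (the test module and its body are hypothetical):

#[cfg(test)]
mod panic_handler_example {
    #[tokio::test]
    async fn panics_in_spawned_tasks_abort_the_test_process() {
        crate::setup_panic_handler();

        // With the hook installed, a panic inside this task first runs the default
        // hook (printing the panic message) and then exits the process, so the
        // failure cannot be silently dropped.
        let handle = tokio::spawn(async {
            // ... drive the code under test here ...
        });
        handle.await.expect("task must not panic");
    }
}
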
22 changes: 6 additions & 16 deletions crates/bifrost/src/loglet/loglet_tests.rs
@@ -25,17 +25,7 @@ use tracing::info;

use super::{Loglet, LogletOffset};
use crate::loglet::AppendError;
use crate::{LogRecord, Record, TailState, TrimGap};

fn setup() {
// Make sure that panics exits the process.
let orig_hook = std::panic::take_hook();
std::panic::set_hook(Box::new(move |panic_info| {
// invoke the default handler and exit the process
orig_hook(panic_info);
std::process::exit(1);
}));
}
use crate::{setup_panic_handler, LogRecord, Record, TailState, TrimGap};

async fn wait_for_trim(
loglet: &Arc<dyn Loglet>,
@@ -65,7 +55,7 @@ async fn wait_for_trim(
/// is started, initialized, and ready for reads and writes. It also assumes that this loglet
/// provides contiguous offsets that start from LogletOffset::OLDEST.
pub async fn gapless_loglet_smoke_test(loglet: Arc<dyn Loglet>) -> googletest::Result<()> {
setup();
setup_panic_handler();

assert_eq!(None, loglet.get_trim_point().await?);
{
@@ -211,7 +201,7 @@ pub async fn gapless_loglet_smoke_test(loglet: Arc<dyn Loglet>) -> googletest::R
/// is started, initialized, and ready for reads and writes. It also assumes that this loglet
/// starts from LogletOffset::OLDEST.
pub async fn single_loglet_readstream_test(loglet: Arc<dyn Loglet>) -> googletest::Result<()> {
setup();
setup_panic_handler();

let read_from_offset = LogletOffset::from(6);
let mut reader = loglet
@@ -283,7 +273,7 @@ pub async fn single_loglet_readstream_test(loglet: Arc<dyn Loglet>) -> googletes
pub async fn single_loglet_readstream_test_with_trims(
loglet: Arc<dyn Loglet>,
) -> googletest::Result<()> {
setup();
setup_panic_handler();

assert_eq!(None, loglet.get_trim_point().await?);
{
@@ -397,7 +387,7 @@ pub async fn single_loglet_readstream_test_with_trims(

/// Validates that appends fail after find_tail() returned Sealed()
pub async fn loglet_test_append_after_seal(loglet: Arc<dyn Loglet>) -> googletest::Result<()> {
setup();
setup_panic_handler();

assert_eq!(None, loglet.get_trim_point().await?);
{
@@ -435,7 +425,7 @@ pub async fn loglet_test_append_after_seal_concurrent(
const WARMUP_APPENDS: usize = 200;
const CONCURRENT_APPENDERS: usize = 20;

setup();
setup_panic_handler();

assert_eq!(None, loglet.get_trim_point().await?);
{
14 changes: 14 additions & 0 deletions crates/bifrost/src/loglet/mod.rs
@@ -16,6 +16,7 @@ pub(crate) mod util;

// exports
pub use error::*;
use futures::stream::BoxStream;
pub use provider::{LogletProvider, LogletProviderFactory};

use std::ops::Add;
@@ -131,6 +132,19 @@ pub trait LogletBase: Send + Sync + std::fmt::Debug {
None
}

/// Creates a stream watching the tail state of this loglet.
///
/// The stream returns the last known TailState with seal-notification semantics similar to
/// `find_tail()`, except that polling it won't trigger a linearizable tail check. It can be
/// used as a trailing tail indicator.
///
/// Note that it's legal to observe the last unsealed tail becoming sealed. The
/// last_known_unsealed (i.e. the last unsealed offset emitted on this stream) defines the
/// point **before** which readers should stop. Therefore, when reading, if the next offset
/// to read == the tail, that offset can only be read once the tail watch moves beyond it to
/// a higher tail while remaining unsealed.
fn watch_tail(&self) -> BoxStream<'static, TailState<Self::Offset>>;

/// Append a batch of records to the loglet. The returned offset (on success) is the offset of
/// the first record in the batch.
async fn append_batch(&self, payloads: &[Bytes]) -> Result<Self::Offset, AppendError>;
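
To make the contract above concrete, here is a minimal consumer sketch. It is not part of this commit; the function, its imports, and the boolean return are illustrative, and it assumes Loglet and LogletOffset are reachable under crate::loglet.

use std::sync::Arc;

use futures::StreamExt;

use crate::loglet::{Loglet, LogletOffset};

/// Waits until the record at `offset` becomes readable, returning false if the
/// loglet is sealed (or the watch is dropped) before that happens.
async fn wait_until_readable(loglet: Arc<dyn Loglet>, offset: LogletOffset) -> bool {
    let mut tail_watch = loglet.watch_tail();
    while let Some(state) = tail_watch.next().await {
        if state.offset() > offset {
            // The tail is exclusive: once it moves strictly beyond `offset`,
            // the record at `offset` has been released and can be read.
            return true;
        }
        if state.is_sealed() {
            // Sealed with the tail still at or below `offset`: the record will
            // never be written to this loglet.
            return false;
        }
    }
    false
}
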
42 changes: 19 additions & 23 deletions crates/bifrost/src/loglet/util.rs
@@ -13,46 +13,42 @@ use tokio_stream::wrappers::WatchStream;

use restate_core::ShutdownError;

use crate::TailState;

use super::LogletOffset;

#[derive(Debug)]
pub struct OffsetWatch {
sender: watch::Sender<LogletOffset>,
receive: watch::Receiver<LogletOffset>,
pub struct TailOffsetWatch {
sender: watch::Sender<TailState<LogletOffset>>,
receive: watch::Receiver<TailState<LogletOffset>>,
}

impl OffsetWatch {
pub fn new(offset: LogletOffset) -> Self {
let (send, receive) = watch::channel(offset);
Self {
sender: send,
receive,
}
impl TailOffsetWatch {
pub fn new(tail: TailState<LogletOffset>) -> Self {
let (sender, receive) = watch::channel(tail);
Self { sender, receive }
}

/// Inform the watch that the tail might have changed.
pub fn notify(&self, sealed: bool, offset: LogletOffset) {
self.sender.send_if_modified(|v| v.combine(sealed, offset));
}

/// Inform the watch that the offset has changed.
pub fn notify(&self, offset: LogletOffset) {
self.sender.send_if_modified(|v| {
if offset > *v {
*v = offset;
true
} else {
false
}
});
pub fn notify_seal(&self) {
self.sender.send_if_modified(|v| v.seal());
}

/// Blocks until the offset is greater or equal to the given offset.
/// Blocks until the tail is beyond the given offset.
pub async fn wait_for(&self, offset: LogletOffset) -> Result<(), ShutdownError> {
self.receive
.clone()
.wait_for(|v| *v >= offset)
.wait_for(|v| v.offset() > offset)
.await
.map_err(|_| ShutdownError)?;
Ok(())
}

pub fn to_stream(&self) -> WatchStream<LogletOffset> {
pub fn to_stream(&self) -> WatchStream<TailState<LogletOffset>> {
WatchStream::new(self.receive.clone())
}
}
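
Taken together, TailOffsetWatch is a thin layer over tokio's watch channel that tracks a TailState instead of a bare offset. A usage sketch, not part of the commit, assuming it runs inside the bifrost crate and that SequenceNumber provides OLDEST and next() for LogletOffset:

use restate_core::ShutdownError;
use restate_types::logs::SequenceNumber;

use crate::loglet::util::TailOffsetWatch;
use crate::loglet::LogletOffset;
use crate::TailState;

async fn tail_watch_example() -> Result<(), ShutdownError> {
    // Start with an unsealed, empty loglet: the tail points at OLDEST.
    let watch = TailOffsetWatch::new(TailState::new(false, LogletOffset::OLDEST));

    // A reader blocks until the tail moves strictly beyond the given offset...
    let waiter = watch.wait_for(LogletOffset::OLDEST);

    // ...while the writer advances the tail after releasing OLDEST
    // (tail = last released offset + 1).
    watch.notify(false, LogletOffset::OLDEST.next());
    waiter.await?;

    // Sealing only flips the seal bit; the tail offset itself is unchanged.
    watch.notify_seal();
    Ok(())
}
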
14 changes: 13 additions & 1 deletion crates/bifrost/src/loglet_wrapper.rs
@@ -15,7 +15,8 @@ use std::sync::Arc;

use async_trait::async_trait;
use bytes::Bytes;
use futures::Stream;
use futures::stream::BoxStream;
use futures::{Stream, StreamExt};

use restate_types::logs::{Lsn, SequenceNumber};

@@ -109,6 +110,17 @@ impl LogletBase for LogletWrapper {
Ok(self.base_lsn.offset_by(offset))
}

fn watch_tail(&self) -> BoxStream<'static, TailState<Self::Offset>> {
let base_lsn = self.base_lsn;
self.loglet
.watch_tail()
.map(move |tail_state| {
let offset = std::cmp::max(tail_state.offset(), LogletOffset::OLDEST);
TailState::new(tail_state.is_sealed(), base_lsn.offset_by(offset))
})
.boxed()
}

async fn append_batch(&self, payloads: &[Bytes]) -> Result<Lsn, AppendError> {
if self.tail_lsn.is_some() {
return Err(AppendError::Sealed);
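
For context, a consumer at the Bifrost level sees this stream already translated into LSN space. A rough sketch, not part of this commit (module paths, trait imports, and the logging are illustrative):

use futures::StreamExt;

use crate::loglet::LogletBase;
use crate::loglet_wrapper::LogletWrapper;

/// Logs every tail update of a single loglet segment in global LSNs.
async fn log_tail_updates(wrapper: LogletWrapper) {
    let mut tail = wrapper.watch_tail();
    while let Some(state) = tail.next().await {
        // LSNs below base_lsn never show up here: the loglet offset is clamped
        // to OLDEST before being translated with `base_lsn.offset_by()`.
        tracing::debug!(sealed = state.is_sealed(), tail = ?state.offset(), "loglet tail moved");
    }
}
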
36 changes: 25 additions & 11 deletions crates/bifrost/src/providers/local_loglet/mod.rs
@@ -18,6 +18,7 @@ mod read_stream;

use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::BoxStream;
pub use log_store::LogStoreError;
use metrics::{counter, histogram, Histogram};
pub use provider::Factory;
@@ -42,7 +43,7 @@ use self::log_store::RocksDbLogStore;
use self::log_store_writer::RocksDbLogWriterHandle;
use self::metric_definitions::{BIFROST_LOCAL_APPEND, BIFROST_LOCAL_APPEND_DURATION};
use self::read_stream::LocalLogletReadStream;
use crate::loglet::util::OffsetWatch;
use crate::loglet::util::TailOffsetWatch;

struct LocalLoglet {
loglet_id: u64,
@@ -56,7 +57,8 @@ struct LocalLoglet {
last_committed_offset: AtomicU64,
next_write_offset: Mutex<LogletOffset>,
sealed: AtomicBool,
release_watch: OffsetWatch,
// watches the tail state of this loglet
tail_watch: TailOffsetWatch,
append_latency: Histogram,
}

@@ -101,7 +103,10 @@ impl LocalLoglet {
next_write_offset,
last_committed_offset,
sealed,
release_watch: OffsetWatch::new(release_pointer),
tail_watch: TailOffsetWatch::new(TailState::new(
log_state.seal,
release_pointer.next(),
)),
append_latency,
};
debug!(
@@ -115,8 +120,9 @@
}

#[inline]
fn notify_readers(&self, release_pointer: LogletOffset) {
self.release_watch.notify(release_pointer);
fn notify_readers(&self, sealed: bool, release_pointer: LogletOffset) {
// tail is beyond the release pointer
self.tail_watch.notify(sealed, release_pointer.next());
}

fn read_from(
@@ -200,6 +206,10 @@ impl LogletBase for LocalLoglet {
))
}

fn watch_tail(&self) -> BoxStream<'static, TailState<Self::Offset>> {
Box::pin(self.tail_watch.to_stream())
}

async fn append(&self, payload: Bytes) -> Result<LogletOffset, AppendError> {
// An initial check if we are sealed or not; we are not worried about accepting an
// append while sealing is taking place. We only care about *not* acknowledging
@@ -240,10 +250,11 @@ impl LogletBase for LocalLoglet {
.fetch_max(offset.into(), Ordering::AcqRel)
.max(offset.into()),
);
self.notify_readers(release_pointer);
let is_sealed = self.sealed.load(Ordering::Relaxed);
self.notify_readers(is_sealed, release_pointer);
// Ensure that we don't acknowledge the append (even though it has happened) if the loglet
// has been sealed already.
if self.sealed.load(Ordering::Relaxed) {
if is_sealed {
return Err(AppendError::Sealed);
}
self.append_latency.record(start_time.elapsed());
@@ -291,10 +302,11 @@ impl LogletBase for LocalLoglet {
.fetch_max(offset.into(), Ordering::AcqRel)
.max(offset.into()),
);
self.notify_readers(release_pointer);
// Ensure that we don't acknowledge the append (albeit durable) if the loglet
let is_sealed = self.sealed.load(Ordering::Relaxed);
self.notify_readers(is_sealed, release_pointer);
// Ensure that we don't acknowledge the append (even though it has happened) if the loglet
// has been sealed already.
if self.sealed.load(Ordering::Relaxed) {
if is_sealed {
return Err(AppendError::Sealed);
}
self.append_latency.record(start_time.elapsed());
@@ -370,6 +382,8 @@ impl LogletBase for LocalLoglet {
Err(ShutdownError.into())
})?;
self.sealed.store(true, Ordering::Relaxed);
self.tail_watch.notify_seal();

Ok(())
}

@@ -383,7 +397,7 @@ impl LogletBase for LocalLoglet {
break Ok(next_record);
}
// Wait and respond when available.
self.release_watch.wait_for(from).await?;
self.tail_watch.wait_for(from).await?;
}
}

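
The changes above all rely on one invariant: the watched tail is always one past the last released (committed) record, so a reader blocked in read_from(from) wakes up exactly when tail > from, i.e. once from itself has been released, while sealing only flips the seal bit without moving the offset. A small illustration, not part of the commit (it assumes SequenceNumber provides next() for LogletOffset):

use restate_types::logs::SequenceNumber;

use crate::loglet::LogletOffset;

/// The tail published through TailOffsetWatch for a given release pointer.
fn watched_tail(release_pointer: LogletOffset) -> LogletOffset {
    release_pointer.next()
}

/// A record at `offset` is readable once the watched tail moves strictly beyond it.
fn is_readable(offset: LogletOffset, tail: LogletOffset) -> bool {
    tail > offset
}
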