Skip to content

Commit

Permalink
[Bifrost] Watch tail updates
Browse files Browse the repository at this point in the history
  • Loading branch information
AhmedSoliman committed Jul 24, 2024
1 parent 571a50f commit 1cc77a6
Show file tree
Hide file tree
Showing 8 changed files with 171 additions and 86 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ test-log = { version = "0.2.11", default-features = false, features = ["trace"]
# tikv-jemallocator has not yet been released with musl target support, so we pin a main commit
tikv-jemallocator = { git = "https://github.com/restatedev/jemallocator", rev = "7c32f6e3d6ad5e4e492cc08d6bdb8307acf9afa0", default-features = false }
thiserror = "1.0"
tokio = { version = "1.29", default-features = false, features = ["rt-multi-thread", "signal", "macros", ] }
tokio = { version = "1.29", default-features = false, features = ["rt-multi-thread", "signal", "macros", "parking_lot" ] }
tokio-stream = "0.1.14"
tokio-util = { version = "0.7.10" }
tonic = { version = "0.10.2", default-features = false }
Expand Down
14 changes: 14 additions & 0 deletions crates/bifrost/src/loglet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub(crate) mod util;

// exports
pub use error::*;
use futures::stream::BoxStream;
pub use provider::{LogletProvider, LogletProviderFactory};

use std::ops::Add;
Expand Down Expand Up @@ -131,6 +132,19 @@ pub trait LogletBase: Send + Sync + std::fmt::Debug {
None
}

/// Create a stream watching the state of tail for this loglet
///
/// The stream will return the last known TailState with seal notification semantics
/// similar to `find_tail()` except that it won't trigger a linearizable tail check when
/// polled. This can be used as a trailing tail indicator.
///
/// Note that it's legal to observe the last unsealed tail becoming sealed. The
/// last_known_unsealed (or the last unsealed offset emitted on this stream) defines the
/// offset that readers must stop **before**. Therefore, when reading, if the next offset
/// to read == the tail, you can only read this offset once the tail watch moves
/// beyond it to a higher tail while remaining unsealed.
fn watch_tail(&self) -> BoxStream<'static, TailState<Self::Offset>>;

/// Append a batch of records to the loglet. The returned offset (on success) is the offset of
/// the first record in the batch.
async fn append_batch(&self, payloads: &[Bytes]) -> Result<Self::Offset, AppendError>;
Expand Down
42 changes: 19 additions & 23 deletions crates/bifrost/src/loglet/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,46 +13,42 @@ use tokio_stream::wrappers::WatchStream;

use restate_core::ShutdownError;

use crate::TailState;

use super::LogletOffset;

#[derive(Debug)]
pub struct OffsetWatch {
sender: watch::Sender<LogletOffset>,
receive: watch::Receiver<LogletOffset>,
pub struct TailOffsetWatch {
sender: watch::Sender<TailState<LogletOffset>>,
receive: watch::Receiver<TailState<LogletOffset>>,
}

impl OffsetWatch {
pub fn new(offset: LogletOffset) -> Self {
let (send, receive) = watch::channel(offset);
Self {
sender: send,
receive,
}
impl TailOffsetWatch {
pub fn new(tail: TailState<LogletOffset>) -> Self {
let (sender, receive) = watch::channel(tail);
Self { sender, receive }
}

/// Offer a possibly-updated tail (seal flag + offset) to the watch.
///
/// The currently stored tail state is handed to `combine(sealed, offset)` from inside
/// `send_if_modified`, so whether receivers get notified is decided by what `combine`
/// returns.
// NOTE(review): the merge rules live in `TailState::combine`, which is not visible here —
// presumably it only moves the tail forward and never un-seals; confirm.
pub fn notify(&self, sealed: bool, offset: LogletOffset) {
    self.sender
        .send_if_modified(|current_tail| current_tail.combine(sealed, offset));
}

/// Inform the watch that the offset has changed.
pub fn notify(&self, offset: LogletOffset) {
self.sender.send_if_modified(|v| {
if offset > *v {
*v = offset;
true
} else {
false
}
});
pub fn notify_seal(&self) {
self.sender.send_if_modified(|v| v.seal());
}

/// Blocks until the offset is greater or equal to the given offset.
/// Blocks until the tail is beyond the given offset.
pub async fn wait_for(&self, offset: LogletOffset) -> Result<(), ShutdownError> {
self.receive
.clone()
.wait_for(|v| *v >= offset)
.wait_for(|v| v.offset() > offset)
.await
.map_err(|_| ShutdownError)?;
Ok(())
}

pub fn to_stream(&self) -> WatchStream<LogletOffset> {
pub fn to_stream(&self) -> WatchStream<TailState<LogletOffset>> {
WatchStream::new(self.receive.clone())
}
}
14 changes: 13 additions & 1 deletion crates/bifrost/src/loglet_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ use std::sync::Arc;

use async_trait::async_trait;
use bytes::Bytes;
use futures::Stream;
use futures::stream::BoxStream;
use futures::{Stream, StreamExt};

use restate_types::logs::{Lsn, SequenceNumber};

Expand Down Expand Up @@ -109,6 +110,17 @@ impl LogletBase for LogletWrapper {
Ok(self.base_lsn.offset_by(offset))
}

/// Watch the tail of the wrapped loglet, expressed in this wrapper's offset type.
///
/// Every `TailState` emitted by the inner loglet's tail watch is re-emitted with its
/// offset passed through `base_lsn.offset_by(..)`; the `Open`/`Sealed` variant itself is
/// carried over unchanged.
fn watch_tail(&self) -> BoxStream<'static, TailState<Self::Offset>> {
    // Take the base LSN by value so the `move` closure below does not borrow `self`.
    let base_lsn = self.base_lsn;
    let inner_tails = self.loglet.watch_tail();
    let translated = inner_tails.map(move |state| match state {
        TailState::Sealed(offset) => TailState::Sealed(base_lsn.offset_by(offset)),
        TailState::Open(offset) => TailState::Open(base_lsn.offset_by(offset)),
    });
    translated.boxed()
}

async fn append_batch(&self, payloads: &[Bytes]) -> Result<Lsn, AppendError> {
if self.tail_lsn.is_some() {
return Err(AppendError::Sealed);
Expand Down
36 changes: 25 additions & 11 deletions crates/bifrost/src/providers/local_loglet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ mod read_stream;

use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::BoxStream;
pub use log_store::LogStoreError;
use metrics::{counter, histogram, Histogram};
pub use provider::Factory;
Expand All @@ -42,7 +43,7 @@ use self::log_store::RocksDbLogStore;
use self::log_store_writer::RocksDbLogWriterHandle;
use self::metric_definitions::{BIFROST_LOCAL_APPEND, BIFROST_LOCAL_APPEND_DURATION};
use self::read_stream::LocalLogletReadStream;
use crate::loglet::util::OffsetWatch;
use crate::loglet::util::TailOffsetWatch;

struct LocalLoglet {
loglet_id: u64,
Expand All @@ -56,7 +57,8 @@ struct LocalLoglet {
last_committed_offset: AtomicU64,
next_write_offset: Mutex<LogletOffset>,
sealed: AtomicBool,
release_watch: OffsetWatch,
// watches the tail state of this loglet
tail_watch: TailOffsetWatch,
append_latency: Histogram,
}

Expand Down Expand Up @@ -101,7 +103,10 @@ impl LocalLoglet {
next_write_offset,
last_committed_offset,
sealed,
release_watch: OffsetWatch::new(release_pointer),
tail_watch: TailOffsetWatch::new(TailState::new(
log_state.seal,
release_pointer.next(),
)),
append_latency,
};
debug!(
Expand All @@ -115,8 +120,9 @@ impl LocalLoglet {
}

#[inline]
fn notify_readers(&self, release_pointer: LogletOffset) {
self.release_watch.notify(release_pointer);
fn notify_readers(&self, sealed: bool, release_pointer: LogletOffset) {
// tail is beyond the release pointer
self.tail_watch.notify(sealed, release_pointer.next());
}

fn read_from(
Expand Down Expand Up @@ -200,6 +206,10 @@ impl LogletBase for LocalLoglet {
))
}

/// Hand out a boxed stream of tail states for this loglet.
///
/// The stream is obtained from the loglet's `tail_watch` via `to_stream()` and then
/// pinned on the heap to fit the `BoxStream` return type.
fn watch_tail(&self) -> BoxStream<'static, TailState<Self::Offset>> {
    let tail_updates = self.tail_watch.to_stream();
    Box::pin(tail_updates)
}

async fn append(&self, payload: Bytes) -> Result<LogletOffset, AppendError> {
// An initial check if we are sealed or not, we are not worried about accepting an
// append while sealing is taking place. We only care about *not* acknowledging
Expand Down Expand Up @@ -240,10 +250,11 @@ impl LogletBase for LocalLoglet {
.fetch_max(offset.into(), Ordering::AcqRel)
.max(offset.into()),
);
self.notify_readers(release_pointer);
let is_sealed = self.sealed.load(Ordering::Relaxed);
self.notify_readers(is_sealed, release_pointer);
// Ensure that we don't acknowledge the append (even though it has happened) if the loglet
// has been sealed already.
if self.sealed.load(Ordering::Relaxed) {
if is_sealed {
return Err(AppendError::Sealed);
}
self.append_latency.record(start_time.elapsed());
Expand Down Expand Up @@ -291,10 +302,11 @@ impl LogletBase for LocalLoglet {
.fetch_max(offset.into(), Ordering::AcqRel)
.max(offset.into()),
);
self.notify_readers(release_pointer);
// Ensure that we don't acknowledge the append (albeit durable) if the loglet
let is_sealed = self.sealed.load(Ordering::Relaxed);
self.notify_readers(is_sealed, release_pointer);
// Ensure that we don't acknowledge the append (even though it has happened) if the loglet
// has been sealed already.
if self.sealed.load(Ordering::Relaxed) {
if is_sealed {
return Err(AppendError::Sealed);
}
self.append_latency.record(start_time.elapsed());
Expand Down Expand Up @@ -370,6 +382,8 @@ impl LogletBase for LocalLoglet {
Err(ShutdownError.into())
})?;
self.sealed.store(true, Ordering::Relaxed);
self.tail_watch.notify_seal();

Ok(())
}

Expand All @@ -383,7 +397,7 @@ impl LogletBase for LocalLoglet {
break Ok(next_record);
}
// Wait and respond when available.
self.release_watch.wait_for(from).await?;
self.tail_watch.wait_for(from).await?;
}
}

Expand Down
42 changes: 22 additions & 20 deletions crates/bifrost/src/providers/local_loglet/read_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,19 @@ use std::sync::Arc;
use std::task::Poll;

use bytes::{BufMut, Bytes, BytesMut};
use futures::stream::BoxStream;
use futures::{Stream, StreamExt};
use pin_project::pin_project;
use rocksdb::{DBRawIteratorWithThreadMode, DB};
use tokio_stream::wrappers::WatchStream;
use tracing::{debug, error, warn};

use restate_core::ShutdownError;
use restate_rocksdb::RocksDbPerfGuard;
use restate_types::logs::SequenceNumber;

use crate::loglet::{LogletOffset, LogletReadStream, OperationError};
use crate::loglet::{LogletBase, LogletOffset, LogletReadStream, OperationError};
use crate::providers::local_loglet::LogStoreError;
use crate::{LogRecord, Result};
use crate::{LogRecord, Result, TailState};

use super::keys::RecordKey;
use super::LocalLoglet;
Expand All @@ -37,13 +37,14 @@ pub(crate) struct LocalLogletReadStream {
loglet: Arc<LocalLoglet>,
// the next record this stream will attempt to read
read_pointer: LogletOffset,
release_pointer: LogletOffset,
/// stop when read_pointer is at or beyond this offset
last_known_tail: LogletOffset,
/// Last offset to read before terminating the stream. None means "tailing" reader.
read_to: Option<LogletOffset>,
#[pin]
iterator: DBRawIteratorWithThreadMode<'static, DB>,
#[pin]
release_watch: WatchStream<LogletOffset>,
tail_watch: BoxStream<'static, TailState<LogletOffset>>,
#[pin]
terminated: bool,
}
Expand Down Expand Up @@ -87,11 +88,12 @@ impl LocalLogletReadStream {
read_opts.set_iterate_upper_bound(RecordKey::upper_bound(loglet.loglet_id).to_bytes());

let log_store = &loglet.log_store;
let mut release_watch = loglet.release_watch.to_stream();
let release_pointer = release_watch
let mut tail_watch = loglet.watch_tail();
let last_known_tail = tail_watch
.next()
.await
.expect("loglet watch returns release pointer");
.expect("loglet watch returns tail pointer")
.offset();

// ## Safety:
// the iterator is guaranteed to be dropped before the loglet is dropped, we hold to the
Expand All @@ -112,8 +114,8 @@ impl LocalLogletReadStream {
read_pointer: from_offset,
iterator: iter,
terminated: false,
release_watch,
release_pointer,
tail_watch,
last_known_tail,
read_to: to,
})
}
Expand Down Expand Up @@ -154,18 +156,18 @@ impl Stream for LocalLogletReadStream {
}
// Are we reading after commit offset?
// We are at tail. We need to wait until new records have been released.
if next_offset > *this.release_pointer {
let updated_release_pointer = match this.release_watch.poll_next(cx) {
if next_offset >= *this.last_known_tail {
let maybe_tail_state = match this.tail_watch.poll_next(cx) {
Poll::Ready(t) => t,
Poll::Pending => {
perf_guard.forget();
return Poll::Pending;
}
};

match updated_release_pointer {
Some(updated_release_pointer) => {
*this.release_pointer = updated_release_pointer;
match maybe_tail_state {
Some(tail_state) => {
*this.last_known_tail = tail_state.offset();
continue;
}
None => {
Expand All @@ -175,11 +177,11 @@ impl Stream for LocalLogletReadStream {
}
}
}
// release_pointer has been updated.
let release_pointer = *this.release_pointer;
// tail has been updated.
let last_known_tail = *this.last_known_tail;

// assert that we are newer
assert!(release_pointer >= next_offset);
// assert that we are behind tail
assert!(last_known_tail > next_offset);

// Trim point is the slot **before** the first readable record (if it exists)
// trim point might have been updated since last time.
Expand Down Expand Up @@ -225,7 +227,7 @@ impl Stream for LocalLogletReadStream {
log_id = *this.log_id,
next_offset = %next_offset,
trim_point = %potentially_different_trim_point,
release_pointer = %this.release_pointer,
last_known_tail = %this.last_known_tail,
"poll_next() has moved to a non-existent record, that should not happen!"
);
panic!("poll_next() has moved to a non-existent record, that should not happen!");
Expand Down
Loading

0 comments on commit 1cc77a6

Please sign in to comment.