diff --git a/Cargo.lock b/Cargo.lock index f442975cc..fb3614bcc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5439,6 +5439,7 @@ dependencies = [ "humantime", "metrics 0.23.0", "once_cell", + "parking_lot", "pin-project", "restate-core", "restate-metadata-store", diff --git a/crates/bifrost/Cargo.toml b/crates/bifrost/Cargo.toml index e63350be9..0ba1bc66a 100644 --- a/crates/bifrost/Cargo.toml +++ b/crates/bifrost/Cargo.toml @@ -31,6 +31,7 @@ futures = { workspace = true } humantime = { workspace = true } metrics = { workspace = true } once_cell = { workspace = true } +parking_lot = { workspace = true } pin-project = { workspace = true } rocksdb = { workspace = true } schemars = { workspace = true, optional = true } diff --git a/crates/bifrost/src/bifrost.rs b/crates/bifrost/src/bifrost.rs index 5941c84b4..4daf07c97 100644 --- a/crates/bifrost/src/bifrost.rs +++ b/crates/bifrost/src/bifrost.rs @@ -128,15 +128,15 @@ impl Bifrost { /// log_id, /// bifrost.get_trim_point(log_id).await.next(), /// bifrost.find_tail(log_id).await().offset().prev(), - /// ).await; + /// ); /// ``` - pub async fn create_reader( + pub fn create_reader( &self, log_id: LogId, start_lsn: Lsn, end_lsn: Lsn, ) -> Result { - LogReadStream::create(self.inner.clone(), log_id, start_lsn, end_lsn).await + LogReadStream::create(self.inner.clone(), log_id, start_lsn, end_lsn) } /// The tail is *the first unwritten LSN* in the log @@ -200,9 +200,7 @@ impl Bifrost { return Ok(Vec::default()); } - let reader = self - .create_reader(log_id, Lsn::OLDEST, current_tail.offset().prev()) - .await?; + let reader = self.create_reader(log_id, Lsn::OLDEST, current_tail.offset().prev())?; reader.try_collect().await } } @@ -213,7 +211,7 @@ static_assertions::assert_impl_all!(Bifrost: Send, Sync, Clone); // Locks in this data-structure are held for very short time and should never be // held across an async boundary. 
pub struct BifrostInner { - metadata: Metadata, + pub(crate) metadata: Metadata, #[allow(unused)] watchdog: WatchdogSender, // Initialized after BifrostService::start completes. diff --git a/crates/bifrost/src/loglet_wrapper.rs b/crates/bifrost/src/loglet_wrapper.rs index 321eb21ee..366e30aae 100644 --- a/crates/bifrost/src/loglet_wrapper.rs +++ b/crates/bifrost/src/loglet_wrapper.rs @@ -208,7 +208,6 @@ impl LogletBase for LogletWrapper { /// Wraps loglet read streams with the base LSN of the segment pub struct LogletReadStreamWrapper { pub(crate) base_lsn: Lsn, - #[allow(dead_code)] loglet: LogletWrapper, inner_read_stream: SendableLogletReadStream, } @@ -228,18 +227,15 @@ impl LogletReadStreamWrapper { /// The first LSN outside the boundary of this stream (bifrost's tail semantics) /// The read stream will return None and terminate before it reads this LSN - #[allow(dead_code)] #[inline(always)] pub fn tail_lsn(&self) -> Option { self.loglet.tail_lsn } - #[allow(dead_code)] pub fn set_tail_lsn(&mut self, tail_lsn: Lsn) { self.loglet.set_tail_lsn(tail_lsn) } - #[allow(dead_code)] #[inline(always)] pub fn loglet(&self) -> &LogletWrapper { &self.loglet diff --git a/crates/bifrost/src/providers/local_loglet/mod.rs b/crates/bifrost/src/providers/local_loglet/mod.rs index e7b32c053..d3f8597a7 100644 --- a/crates/bifrost/src/providers/local_loglet/mod.rs +++ b/crates/bifrost/src/providers/local_loglet/mod.rs @@ -75,7 +75,7 @@ impl std::fmt::Debug for LocalLoglet { } impl LocalLoglet { - pub async fn create( + pub fn create( loglet_id: u64, log_store: RocksDbLogStore, log_writer: RocksDbLogWriterHandle, @@ -445,17 +445,14 @@ mod tests { .create_writer() .start(config.clone().map(|c| &c.bifrost.local).boxed())?; - let loglet = Arc::new( - LocalLoglet::create( - params - .as_str() - .parse() - .expect("loglet params can be converted into u64"), - log_store, - log_writer, - ) - .await?, - ); + let loglet = Arc::new(LocalLoglet::create( + params + .as_str() + .parse() + 
.expect("loglet params can be converted into u64"), + log_store, + log_writer, + )?); gapless_loglet_smoke_test(loglet).await?; Ok(()) @@ -487,17 +484,14 @@ mod tests { .create_writer() .start(config.clone().map(|c| &c.bifrost.local).boxed())?; - let loglet = Arc::new( - LocalLoglet::create( - params - .as_str() - .parse() - .expect("loglet params can be converted into u64"), - log_store, - log_writer, - ) - .await?, - ); + let loglet = Arc::new(LocalLoglet::create( + params + .as_str() + .parse() + .expect("loglet params can be converted into u64"), + log_store, + log_writer, + )?); single_loglet_readstream_test(loglet).await?; Ok(()) @@ -529,17 +523,14 @@ mod tests { .create_writer() .start(config.clone().map(|c| &c.bifrost.local).boxed())?; - let loglet = Arc::new( - LocalLoglet::create( - params - .as_str() - .parse() - .expect("loglet params can be converted into u64"), - log_store, - log_writer, - ) - .await?, - ); + let loglet = Arc::new(LocalLoglet::create( + params + .as_str() + .parse() + .expect("loglet params can be converted into u64"), + log_store, + log_writer, + )?); single_loglet_readstream_test_with_trims(loglet).await?; Ok(()) @@ -570,17 +561,14 @@ mod tests { .create_writer() .start(config.clone().map(|c| &c.bifrost.local).boxed())?; - let loglet = Arc::new( - LocalLoglet::create( - params - .as_str() - .parse() - .expect("loglet params can be converted into u64"), - log_store, - log_writer, - ) - .await?, - ); + let loglet = Arc::new(LocalLoglet::create( + params + .as_str() + .parse() + .expect("loglet params can be converted into u64"), + log_store, + log_writer, + )?); loglet_test_append_after_seal(loglet).await?; Ok(()) @@ -613,9 +601,11 @@ mod tests { // Run the test 10 times for i in 1..=10 { - let loglet = Arc::new( - LocalLoglet::create(i, log_store.clone(), log_writer.clone()).await?, - ); + let loglet = Arc::new(LocalLoglet::create( + i, + log_store.clone(), + log_writer.clone(), + )?); 
loglet_test_append_after_seal_concurrent(loglet).await?; } diff --git a/crates/bifrost/src/providers/local_loglet/provider.rs b/crates/bifrost/src/providers/local_loglet/provider.rs index 7e336f2cb..9976733e8 100644 --- a/crates/bifrost/src/providers/local_loglet/provider.rs +++ b/crates/bifrost/src/providers/local_loglet/provider.rs @@ -12,7 +12,7 @@ use std::collections::{hash_map, HashMap}; use std::sync::Arc; use async_trait::async_trait; -use tokio::sync::Mutex as AsyncMutex; +use parking_lot::Mutex; use tracing::debug; use restate_types::config::{LocalLogletOptions, RocksDbOptions}; @@ -71,7 +71,7 @@ impl LogletProviderFactory for Factory { pub(crate) struct LocalLogletProvider { log_store: RocksDbLogStore, - active_loglets: AsyncMutex>>, + active_loglets: Mutex>>, log_writer: RocksDbLogWriterHandle, } @@ -81,7 +81,7 @@ impl LogletProvider for LocalLogletProvider { &self, params: &LogletParams, ) -> Result>, Error> { - let mut guard = self.active_loglets.lock().await; + let mut guard = self.active_loglets.lock(); let loglet = match guard.entry(params.as_str().to_owned()) { hash_map::Entry::Vacant(entry) => { // Create loglet @@ -93,8 +93,7 @@ impl LogletProvider for LocalLogletProvider { .expect("loglet params can be converted into u64"), self.log_store.clone(), self.log_writer.clone(), - ) - .await?; + )?; let loglet = entry.insert(Arc::new(loglet)); Arc::clone(loglet) } diff --git a/crates/bifrost/src/read_stream.rs b/crates/bifrost/src/read_stream.rs index 1ceeecfa0..844a1a720 100644 --- a/crates/bifrost/src/read_stream.rs +++ b/crates/bifrost/src/read_stream.rs @@ -8,90 +8,129 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. 
+use std::future::Future; use std::pin::Pin; use std::sync::Arc; use std::task::ready; use std::task::Poll; +use futures::future::BoxFuture; +use futures::stream::BoxStream; use futures::stream::FusedStream; use futures::Stream; use pin_project::pin_project; +use restate_core::MetadataKind; +use restate_core::ShutdownError; +use restate_types::logs::metadata::MaybeSegment; use restate_types::logs::SequenceNumber; use restate_types::logs::{LogId, Lsn}; +use restate_types::Version; +use restate_types::Versioned; use crate::bifrost::BifrostInner; +use crate::loglet::LogletBase; use crate::loglet_wrapper::LogletReadStreamWrapper; use crate::loglet_wrapper::LogletWrapper; -use crate::FindTailAttributes; +use crate::Error; use crate::LogRecord; use crate::Result; - +use crate::TailState; + +/// A read stream reads from the virtual log. The stream provides a unified view over +/// the virtual log addressing space in the face of seals, reconfiguration, and trims. +/// +// The use of [pin_project] is not strictly necessary but it's left to allow future +// substream implementations to be !Unpin without changing the read_stream. +#[must_use = "streams do nothing unless polled"] #[pin_project] pub struct LogReadStream { - #[pin] - current_loglet_stream: LogletReadStreamWrapper, - current_loglet: LogletWrapper, - inner: Arc, - _last_known_tail: Lsn, log_id: LogId, - // inclusive max lsn to read to + /// inclusive max LSN to read to end_lsn: Lsn, - terminated: bool, - /// Represents the next possible record to be read. - // This is akin to the lsn that can be passed to `read(from)` to read the - // next record in the log. + /// Represents the next record to read. + /// + /// This is akin to the lsn that can be passed to `read(from)` to read the + /// next record in the log. read_pointer: Lsn, + #[pin] + state: State, + /// Current substream we are reading from + #[pin] + substream: Option, + // IMPORTANT: Do not re-order this field. `inner` must be dropped last. 
This allows + // `state` to reference its lifetime as 'static. + bifrost_inner: Arc, +} + +/// The state machine encodes the necessary state changes in the read stream. +/// The order of variants roughly reflects the order of transitions in a typical case. +/// +#[pin_project(project = StateProj)] +enum State { + /// Initial state of the stream. No work has been done at this point + New, + /// Stream is waiting for bifrost to get a loglet that maps to the `read_pointer` + FindingLoglet { + /// The future to continue finding the loglet instance via Bifrost + #[pin] + find_loglet_fut: BoxFuture<'static, Result>, + }, + /// Waiting for the loglet read stream (substream) to be initialized + CreatingSubstream { + /// Future to continue creating the substream + #[pin] + create_stream_fut: BoxFuture<'static, Result>, + }, + /// Reading records from `substream` + Reading { + /// The tail LSN which is safe to use when the loglet is unsealed + safe_known_tail: Option, + #[pin] + tail_watch: Option>, + }, + /// Waiting for the tail LSN of the substream's loglet to be determined (sealing in-progress) + AwaitingReconfiguration { + /// Future to continue waiting on log metadata updates + #[pin] + log_metadata_watch_fut: Option>>, + }, + Terminated, } impl LogReadStream { - pub(crate) async fn create( - inner: Arc, + pub(crate) fn create( + bifrost_inner: Arc, log_id: LogId, start_lsn: Lsn, - // Inclusive. Use Lsn::MAX for a tailing stream. Once reached, stream will terminate - // (return Ready(None)). + // Inclusive. Use [`Lsn::MAX`] for a tailing stream. + // Once reached, the stream terminates. end_lsn: Lsn, ) -> Result { // Accidental reads from Lsn::INVALID are reset to Lsn::OLDEST let start_lsn = std::cmp::max(Lsn::OLDEST, start_lsn); - // todo: support switching loglets. At the moment, this is hard-wired to a single loglet - // implementation. - let current_loglet = inner - // find the loglet where the _next_ lsn resides. 
- .find_loglet_for_lsn(log_id, start_lsn) - .await?; - let (last_loglet, last_known_tail) = inner - .find_tail(log_id, FindTailAttributes::default()) - .await?; - debug_assert_eq!(last_loglet, current_loglet); - - let current_loglet_stream = current_loglet.create_wrapped_read_stream(start_lsn).await?; Ok(Self { - current_loglet_stream, - // reserved for future use - current_loglet: last_loglet, - // reserved for future use - _last_known_tail: last_known_tail.offset(), - inner, + bifrost_inner, log_id, read_pointer: start_lsn, end_lsn, - terminated: false, + substream: None, + state: State::New, }) } - pub fn is_terminated(&self) -> bool { - self.terminated - } - /// Current read pointer. This is the next (possible) record to be read. pub fn read_pointer(&self) -> Lsn { self.read_pointer } - /// The read pointer will point to the potential next LSN that we will read from on the next - /// poll_next() call. + /// Inclusive max LSN to read to + pub fn end_lsn(&self) -> Lsn { + self.end_lsn + } + + /// The read pointer points to the next LSN will be attempted on the next + /// `poll_next()`. fn calculate_read_pointer(record: &LogRecord) -> Lsn { match &record.record { // On trim gaps, we fast-forward the read pointer beyond the end of the gap. We do @@ -106,13 +145,13 @@ impl LogReadStream { impl FusedStream for LogReadStream { fn is_terminated(&self) -> bool { - self.terminated + matches!(self.state, State::Terminated) } } -/// Read the next record from the log after the current read pointer. The stream will yield +/// Read the next record from the log at the current read pointer. The stream will yield /// after the record is available to read, this will async-block indefinitely if no records are -/// ever written to the log beyond the read pointer. +/// ever written and released at the read pointer. 
impl Stream for LogReadStream { type Item = Result; @@ -120,33 +159,244 @@ impl Stream for LogReadStream { mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> Poll> { - if self.read_pointer > self.end_lsn { - self.as_mut().terminated = true; - return Poll::Ready(None); - } - // Are we after the known tail? - // todo: refresh the tail (in a multi-loglet universe) - let maybe_record = ready!(self - .as_mut() - .project() - .current_loglet_stream - .as_mut() - .poll_next(cx)); - match maybe_record { - Some(Ok(record)) => { - let record = record - .decode() - .expect("decoding a bifrost envelope succeeds"); - let new_pointer = Self::calculate_read_pointer(&record); - debug_assert!(new_pointer > self.read_pointer); - self.read_pointer = new_pointer; - Poll::Ready(Some(Ok(record))) + // # Safety + // BifrostInner is dropped last, we can safely lift it's lifetime to 'static as + // long as we don't leak this externally. External users should not see any `'static` + // lifetime as a result. + let bifrost_inner = unsafe { &*Arc::as_ptr(&self.bifrost_inner) }; + + let mut this = self.as_mut().project(); + loop { + let state = this.state.as_mut().project(); + // We have reached the end of the stream. + if *this.read_pointer == Lsn::MAX || *this.read_pointer > *this.end_lsn { + this.state.set(State::Terminated); + return Poll::Ready(None); } - Some(Err(e)) => Poll::Ready(Some(Err(e.into()))), - None => { - // todo: check if we should switch the loglet. 
- self.as_mut().terminated = true; - Poll::Ready(None) + + match state { + StateProj::New => { + let find_loglet_fut = Box::pin( + bifrost_inner.find_loglet_for_lsn(*this.log_id, *this.read_pointer), + ); + // => Find Loglet + this.state.set(State::FindingLoglet { find_loglet_fut }); + } + + // Finding a loglet and creating the loglet instance through the provider + StateProj::FindingLoglet { find_loglet_fut } => { + let loglet = match ready!(find_loglet_fut.poll(cx)) { + Ok(loglet) => loglet, + Err(e) => { + this.state.set(State::Terminated); + return Poll::Ready(Some(Err(e))); + } + }; + // create sub-stream to read from this loglet. + let create_stream_fut = + Box::pin(loglet.create_wrapped_read_stream(*this.read_pointer)); + // => Create Substream + this.state + .set(State::CreatingSubstream { create_stream_fut }); + } + + // Creating a new substream + StateProj::CreatingSubstream { create_stream_fut } => { + let substream = match ready!(create_stream_fut.poll(cx)) { + Ok(substream) => substream, + Err(e) => { + this.state.set(State::Terminated); + return Poll::Ready(Some(Err(e))); + } + }; + let safe_known_tail = substream.tail_lsn(); + // If the substream's tail is unknown, we will need to watch the tail updates. + let tail_watch = if safe_known_tail.is_none() { + Some(substream.loglet().watch_tail()) + } else { + None + }; + // => Start Reading + this.substream.set(Some(substream)); + this.state.set(State::Reading { + safe_known_tail, + tail_watch, + }); + } + + // Reading from the current substream + StateProj::Reading { + safe_known_tail, + tail_watch, + } => { + // Continue driving the substream + // + // This depends on whether we know its tail (if sealed), or if the value of + // `safe_known_tail` is higher than the `read_pointer` of the substream. + let Some(substream) = this.substream.as_mut().as_pin_mut() else { + panic!("substream must be set at this point"); + }; + + // If the loglet's `tail_lsn` is known, this is the tail we should always respect. 
+ match substream.tail_lsn() { + // Next LSN is beyond the boundaries of this substream + Some(tail) if *this.read_pointer >= tail => { + // Switch loglets. + let find_loglet_fut = Box::pin( + bifrost_inner.find_loglet_for_lsn(*this.log_id, *this.read_pointer), + ); + // => Find the next loglet. We know we _probably_ have one, otherwise + // `stream_tail_lsn` wouldn't have been set. + this.substream.set(None); + this.state.set(State::FindingLoglet { find_loglet_fut }); + continue; + } + // Unsealed loglet, we can only read as far as the safe unsealed tail. + None => { + if safe_known_tail.is_none() + || safe_known_tail + .is_some_and(|known_tail| *this.read_pointer >= known_tail) + { + // Wait for tail update... + let Some(tail_watch) = tail_watch.as_pin_mut() else { + panic!("tail_watch must be set on non-sealed read streams"); + }; + // If the loglet is being sealed, we must wait for reconfiguration to complete. + let maybe_tail = ready!(tail_watch.poll_next(cx)); + match maybe_tail { + None => { + // Shutdown.... + this.substream.set(None); + this.state.set(State::Terminated); + return Poll::Ready(Some(Err(ShutdownError.into()))); + } + Some(TailState::Open(tail)) => { + // Safe to consider this as a tail. + *safe_known_tail = Some(tail); + } + Some(TailState::Sealed(_)) => { + // Wait for reconfiguration to complete. + // + // Note that we don't reset the substream here because + // reconfiguration might bring us back to Reading on the + // same substream, we don't want to lose the resources + // allocated by underlying the stream. + this.state.set(State::AwaitingReconfiguration { + log_metadata_watch_fut: None, + }); + continue; + } + } + } + } + // We are well within the bounds of this loglet. Continue reading. 
+ Some(_) => { /* fall-through */ } + } + let maybe_record = ready!(substream.poll_next(cx)); + match maybe_record { + Some(Ok(record)) => { + let record = record + .decode() + .expect("decoding a bifrost envelope succeeds"); + let new_pointer = Self::calculate_read_pointer(&record); + debug_assert!(new_pointer > *this.read_pointer); + *this.read_pointer = new_pointer; + return Poll::Ready(Some(Ok(record))); + } + // The assumption here is that underlying stream won't move its read + // pointer on error. + Some(Err(e)) => return Poll::Ready(Some(Err(e.into()))), + None => { + // We should, almost never, reach this. + this.substream.set(None); + this.state.set(State::Terminated); + return Poll::Ready(None); + } + } + } + + // Waiting for the substream's loglet to be sealed + StateProj::AwaitingReconfiguration { + mut log_metadata_watch_fut, + } => { + // If a metadata watch is set, poll it. + if let Some(watch_fut) = log_metadata_watch_fut.as_mut().as_pin_mut() { + let _ = ready!(watch_fut.poll(cx))?; + } + + let Some(mut substream) = this.substream.as_mut().as_pin_mut() else { + panic!("substream must be set at this point"); + }; + + let log_metadata = bifrost_inner.metadata.logs(); + // Does metadata indicate that the `base_lsn` of the current substream + // points to a sealed segment? + // + // Why do we use `base_lsn` instead of `read_pointer`? Because if the loglet is + // sealed, the read_pointer might end up being the `base_lsn` of the next + // segment. We don't need to handle the transition to the next segment here + // since we have this handled in Reading state. We just need to set the + // `tail_lsn` on the substream once we determine it. + // + // todo (asoli): Handle empty sealed loglets. Ideally, we want to compare the segment returned + // with the one backing the current loglet, if they are different, we should + // recreate the substream and let the normal flow take over to move to the + // replacement loglet. 
Unfortunately, at the moment we don't have a reliable + // way to do that. + // + // The log is gone! + let Some(chain) = log_metadata.chain(this.log_id) else { + this.substream.set(None); + this.state.set(State::Terminated); + return Poll::Ready(Some(Err(Error::UnknownLogId(*this.log_id)))); + }; + + match chain.find_segment_for_lsn(substream.base_lsn) { + MaybeSegment::Some(segment) if segment.tail_lsn.is_some() => { + let sealed_tail = segment.tail_lsn.unwrap(); + substream.set_tail_lsn(segment.tail_lsn.unwrap()); + // go back to reading. + this.state.set(State::Reading { + safe_known_tail: Some(sealed_tail), + // No need for the tail watch since we know the tail already. + tail_watch: None, + }); + continue; + } + // Oh, we have a prefix trim, deliver the trim-gap and fast-forward. + MaybeSegment::Trim { next_base_lsn } => { + let read_pointer = *this.read_pointer; + let record = LogRecord::new_trim_gap(read_pointer, next_base_lsn); + // fast-forward. + *this.read_pointer = next_base_lsn; + let find_loglet_fut = Box::pin( + bifrost_inner.find_loglet_for_lsn(*this.log_id, *this.read_pointer), + ); + // => Find Loglet + this.substream.set(None); + this.state.set(State::FindingLoglet { find_loglet_fut }); + // Deliver the trim gap + return Poll::Ready(Some(Ok(record))); + } + // Segment is not sealed yet. + MaybeSegment::Some(_) => { /* fall-through */ } + }; + + // Reconfiguration still ongoing... + let metadata_version = log_metadata.version(); + + // No hope at this metadata version, wait for the next update. 
+ let metadata_watch_fut = Box::pin( + bifrost_inner + .metadata + .wait_for_version(MetadataKind::Logs, metadata_version.next()), + ); + log_metadata_watch_fut.set(Some(metadata_watch_fut)); + continue; + } + StateProj::Terminated => { + return Poll::Ready(None); + } } } } @@ -157,17 +407,22 @@ mod tests { use std::sync::atomic::AtomicUsize; - use crate::{BifrostService, Record, TrimGap}; + use crate::loglet::LogletBase; + use crate::{setup_panic_handler, BifrostService, FindTailAttributes, Record, TrimGap}; use super::*; use bytes::Bytes; use googletest::prelude::*; - use restate_core::{metadata, task_center, TaskKind, TestCoreEnvBuilder}; + use restate_core::{ + metadata, task_center, MetadataKind, TargetVersion, TaskKind, TestCoreEnvBuilder, + }; use restate_rocksdb::RocksDbManager; use restate_types::config::{CommonOptions, Configuration}; use restate_types::live::{Constant, Live}; - use restate_types::logs::metadata::ProviderKind; + use restate_types::logs::metadata::{new_single_node_loglet_params, ProviderKind}; + use restate_types::metadata_store::keys::BIFROST_CONFIG_KEY; + use restate_types::Versioned; use tokio_stream::StreamExt; use tracing::info; use tracing_test::traced_test; @@ -177,13 +432,7 @@ mod tests { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[traced_test] async fn test_readstream_one_loglet() -> anyhow::Result<()> { - // Make sure that panics exits the process. 
- let orig_hook = std::panic::take_hook(); - std::panic::set_hook(Box::new(move |panic_info| { - // invoke the default handler and exit the process - orig_hook(panic_info); - std::process::exit(1); - })); + setup_panic_handler(); let node_env = TestCoreEnvBuilder::new_with_mock_network() .set_provider_kind(ProviderKind::Local) @@ -202,7 +451,7 @@ mod tests { let bifrost = svc.handle(); svc.start().await.expect("loglet must start"); - let mut reader = bifrost.create_reader(log_id, read_from, Lsn::MAX).await?; + let mut reader = bifrost.create_reader(log_id, read_from, Lsn::MAX)?; let tail = bifrost .find_tail(log_id, FindTailAttributes::default()) @@ -274,13 +523,7 @@ mod tests { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[traced_test] async fn test_read_stream_with_trim() -> anyhow::Result<()> { - // Make sure that panics exits the process. - let orig_hook = std::panic::take_hook(); - std::panic::set_hook(Box::new(move |panic_info| { - // invoke the default handler and exit the process - orig_hook(panic_info); - std::process::exit(1); - })); + setup_panic_handler(); let node_env = TestCoreEnvBuilder::new_with_mock_network() .set_provider_kind(ProviderKind::Local) @@ -318,7 +561,7 @@ mod tests { ); assert_eq!(Lsn::from(5), bifrost.get_trim_point(log_id).await?); - let mut read_stream = bifrost.create_reader(log_id, Lsn::OLDEST, Lsn::MAX).await?; + let mut read_stream = bifrost.create_reader(log_id, Lsn::OLDEST, Lsn::MAX)?; let record = read_stream.next().await.unwrap()?; assert_that!( @@ -399,4 +642,310 @@ mod tests { }) .await } + + // Note: This test doesn't validate read stream behaviour with zombie records at seal boundary. 
+ #[tokio::test(start_paused = true)] + async fn test_readstream_simple_multi_loglet() -> anyhow::Result<()> { + setup_panic_handler(); + const LOG_ID: LogId = LogId::new(0); + + let node_env = TestCoreEnvBuilder::new_with_mock_network() + .set_provider_kind(ProviderKind::Local) + .build() + .await; + + let tc = node_env.tc; + tc.run_in_scope("test", None, async { + let config = Live::from_value(Configuration::default()); + RocksDbManager::init(Constant::new(CommonOptions::default())); + + // enable both in-memory and local loglet types + let svc = BifrostService::new(task_center(), metadata()) + .enable_local_loglet(&config) + .enable_in_memory_loglet(); + let bifrost = svc.handle(); + svc.start().await.expect("loglet must start"); + + // create the reader and put it on the side. + let mut reader = bifrost.create_reader(LOG_ID, Lsn::OLDEST, Lsn::MAX)?; + // We should be at tail, any attempt to read will yield `pending`. + assert_that!( + futures::poll!(std::pin::pin!(reader.next())), + pat!(Poll::Pending) + ); + + let tail = bifrost + .find_tail(LOG_ID, FindTailAttributes::default()) + .await?; + // no records have been written + assert!(!tail.is_sealed()); + assert_eq!(Lsn::OLDEST, tail.offset()); + assert_eq!(Lsn::OLDEST, reader.read_pointer()); + + // Nothing is trimmed + assert_eq!(Lsn::INVALID, bifrost.get_trim_point(LOG_ID).await?); + + // append 10 records [1..10] + for i in 1..=10 { + let lsn = bifrost + .append(LOG_ID, Payload::new(format!("segment-1-{}", i))) + .await?; + assert_eq!(Lsn::from(i), lsn); + } + + // read 5 records. 
+ for i in 1..=5 { + let record = reader.next().await.expect("to stay alive")?; + assert_eq!(Lsn::from(i), record.offset); + assert_eq!(reader.read_pointer(), record.offset.next()); + assert_eq!( + Payload::new(format!("segment-1-{}", i)).body(), + record.record.into_payload_unchecked().body() + ); + } + + // manually seal the loglet, create a new in-memory loglet at base_lsn=11 + let raw_loglet = bifrost + .inner() + .find_loglet_for_lsn(LOG_ID, Lsn::new(5)) + .await?; + raw_loglet.seal().await?; + // In fact, reader is allowed to go as far as the last known unsealed tail which + // in our case could be the real tail since we didn't have in-flight appends at seal + // time. It's legal for loglets to have lagging indicator of the unsealed pointer but + // we know that local loglet won't do this. + // + // read 5 more records. + println!("reading records at sealed loglet"); + for i in 6..=10 { + let record = reader.next().await.expect("to stay alive")?; + assert_eq!(Lsn::from(i), record.offset); + assert_eq!(reader.read_pointer(), record.offset.next()); + assert_eq!( + Payload::new(format!("segment-1-{}", i)).body(), + record.record.into_payload_unchecked().body() + ); + } + + // reads should yield pending since we are at the last known unsealed tail + // loglet. + assert_that!( + futures::poll!(std::pin::pin!(reader.next())), + pat!(Poll::Pending) + ); + // again. 
+ assert_that!( + futures::poll!(std::pin::pin!(reader.next())), + pat!(Poll::Pending) + ); + + let tail = bifrost + .find_tail(LOG_ID, FindTailAttributes::default()) + .await?; + + assert!(tail.is_sealed()); + assert_eq!(Lsn::from(11), tail.offset()); + // perform manual reconfiguration (can be replaced with bifrost reconfiguration API + // when it's implemented) + let old_version = bifrost.inner().metadata.logs_version(); + let mut builder = bifrost.inner().metadata.logs().clone().into_builder(); + let mut chain_builder = builder.chain(&LOG_ID).unwrap(); + assert_eq!(1, chain_builder.num_segments()); + let new_segment_params = new_single_node_loglet_params(ProviderKind::InMemory); + chain_builder.append_segment( + Lsn::new(11), + ProviderKind::InMemory, + new_segment_params, + )?; + + let new_metadata = builder.build(); + let new_version = new_metadata.version(); + assert_eq!(new_version, old_version.next()); + node_env + .metadata_store_client + .put( + BIFROST_CONFIG_KEY.clone(), + new_metadata, + restate_metadata_store::Precondition::MatchesVersion(old_version), + ) + .await?; + + // make sure we have updated metadata. + bifrost + .inner() + .metadata + .sync(MetadataKind::Logs, TargetVersion::Latest) + .await?; + + // append 5 more records into the new loglet. + for i in 11..=15 { + let lsn = bifrost + .append(LOG_ID, Payload::new(format!("segment-2-{}", i))) + .await?; + println!("appended record={}", lsn); + assert_eq!(Lsn::from(i), lsn); + } + + // read stream should jump across segments. + for i in 11..=15 { + let record = reader.next().await.expect("to stay alive")?; + assert_eq!(Lsn::from(i), record.offset); + assert_eq!(reader.read_pointer(), record.offset.next()); + assert_eq!( + Payload::new(format!("segment-2-{}", i)).body(), + record.record.into_payload_unchecked().body() + ); + } + // We are at tail. validate. 
+ assert_that!( + futures::poll!(std::pin::pin!(reader.next())), + pat!(Poll::Pending) + ); + + assert_eq!( + Lsn::from(16), + bifrost + .append(LOG_ID, Payload::new("segment-2-1000")) + .await? + ); + + let record = reader.next().await.expect("to stay alive")?; + assert_eq!(Lsn::from(16), record.offset); + assert_eq!( + Payload::new("segment-2-1000").body(), + record.record.into_payload_unchecked().body() + ); + + anyhow::Ok(()) + }) + .await?; + Ok(()) + } + + #[tokio::test(start_paused = true)] + async fn test_readstream_sealed_multi_loglet() -> anyhow::Result<()> { + setup_panic_handler(); + const LOG_ID: LogId = LogId::new(0); + + let node_env = TestCoreEnvBuilder::new_with_mock_network() + .set_provider_kind(ProviderKind::Local) + .build() + .await; + + let tc = node_env.tc; + tc.run_in_scope("test", None, async { + let config = Live::from_value(Configuration::default()); + RocksDbManager::init(Constant::new(CommonOptions::default())); + + // enable both in-memory and local loglet types + let svc = BifrostService::new(task_center(), metadata()) + .enable_local_loglet(&config) + .enable_in_memory_loglet(); + let bifrost = svc.handle(); + svc.start().await.expect("loglet must start"); + + let tail = bifrost + .find_tail(LOG_ID, FindTailAttributes::default()) + .await?; + // no records have been written + assert!(!tail.is_sealed()); + assert_eq!(Lsn::OLDEST, tail.offset()); + + // append 10 records [1..10] + for i in 1..=10 { + let lsn = bifrost + .append(LOG_ID, Payload::new(format!("segment-1-{}", i))) + .await?; + assert_eq!(Lsn::from(i), lsn); + } + + // manually seal the loglet, create a new in-memory loglet at base_lsn=11 + let raw_loglet = bifrost + .inner() + .find_loglet_for_lsn(LOG_ID, Lsn::new(5)) + .await?; + raw_loglet.seal().await?; + + let tail = bifrost + .find_tail(LOG_ID, FindTailAttributes::default()) + .await?; + + assert!(tail.is_sealed()); + assert_eq!(Lsn::from(11), tail.offset()); + // perform manual reconfiguration (can be replaced with
bifrost reconfiguration API + // when it's implemented) + let old_version = bifrost.inner().metadata.logs_version(); + let mut builder = bifrost.inner().metadata.logs().clone().into_builder(); + let mut chain_builder = builder.chain(&LOG_ID).unwrap(); + assert_eq!(1, chain_builder.num_segments()); + let new_segment_params = new_single_node_loglet_params(ProviderKind::InMemory); + chain_builder.append_segment( + Lsn::new(11), + ProviderKind::InMemory, + new_segment_params, + )?; + let new_metadata = builder.build(); + let new_version = new_metadata.version(); + assert_eq!(new_version, old_version.next()); + node_env + .metadata_store_client + .put( + BIFROST_CONFIG_KEY.clone(), + new_metadata, + restate_metadata_store::Precondition::MatchesVersion(old_version), + ) + .await?; + + // make sure we have updated metadata. + bifrost + .inner() + .metadata + .sync(MetadataKind::Logs, TargetVersion::Latest) + .await?; + + // append 5 more records into the new loglet. + for i in 11..=15 { + let lsn = bifrost + .append(LOG_ID, Payload::new(format!("segment-2-{}", i))) + .await?; + info!(?lsn, "appended record"); + assert_eq!(Lsn::from(i), lsn); + } + + // start a reader (from 3) and read everything. [3..15] + let mut reader = bifrost.create_reader(LOG_ID, Lsn::new(3), Lsn::MAX)?; + + // first segment records + for i in 3..=10 { + let record = reader.next().await.expect("to stay alive")?; + assert_eq!(Lsn::from(i), record.offset); + assert_eq!(reader.read_pointer(), record.offset.next()); + assert_eq!( + Payload::new(format!("segment-1-{}", i)).body(), + record.record.into_payload_unchecked().body() + ); + } + + // second segment records + for i in 11..=15 { + let record = reader.next().await.expect("to stay alive")?; + assert_eq!(Lsn::from(i), record.offset); + assert_eq!(reader.read_pointer(), record.offset.next()); + assert_eq!( + Payload::new(format!("segment-2-{}", i)).body(), + record.record.into_payload_unchecked().body() + ); + } + + // We are at tail. validate.
+ assert_that!( + futures::poll!(std::pin::pin!(reader.next())), + pat!(Poll::Pending) + ); + + anyhow::Ok(()) + }) + .await?; + Ok(()) + } } diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs index 06e86dd09..7710403d3 100644 --- a/crates/worker/src/partition/mod.rs +++ b/crates/worker/src/partition/mod.rs @@ -243,8 +243,7 @@ where LogId::from(self.partition_id), last_applied_lsn.next(), Lsn::MAX, - ) - .await? + )? .map_ok(|record| { let LogRecord { record, offset } = record; match record { diff --git a/crates/worker/src/partition/shuffle.rs b/crates/worker/src/partition/shuffle.rs index c73401d2a..accc67e8d 100644 --- a/crates/worker/src/partition/shuffle.rs +++ b/crates/worker/src/partition/shuffle.rs @@ -679,10 +679,11 @@ mod tests { None, shuffle_env.shuffle.run(), )?; - let reader = shuffle_env - .bifrost - .create_reader(LogId::from(partition_id), Lsn::OLDEST, Lsn::MAX) - .await?; + let reader = shuffle_env.bifrost.create_reader( + LogId::from(partition_id), + Lsn::OLDEST, + Lsn::MAX, + )?; let messages = collect_invoke_commands_until(reader, last_invocation_id).await?; @@ -723,10 +724,11 @@ mod tests { None, shuffle_env.shuffle.run(), )?; - let reader = shuffle_env - .bifrost - .create_reader(LogId::from(partition_id), Lsn::OLDEST, Lsn::MAX) - .await?; + let reader = shuffle_env.bifrost.create_reader( + LogId::from(partition_id), + Lsn::OLDEST, + Lsn::MAX, + )?; let messages = collect_invoke_commands_until(reader, last_invocation_id).await?; @@ -759,10 +761,11 @@ mod tests { let shuffle_task_id = tc .run_in_scope("test", None, async { let partition_id = shuffle_env.shuffle.metadata.partition_id; - let reader = shuffle_env - .bifrost - .create_reader(LogId::from(partition_id), Lsn::INVALID, Lsn::MAX) - .await?; + let reader = shuffle_env.bifrost.create_reader( + LogId::from(partition_id), + Lsn::INVALID, + Lsn::MAX, + )?; let total_restarts = Arc::clone(&total_restarts); let shuffle_task = diff --git
a/tools/bifrost-benchpress/src/write_to_read.rs b/tools/bifrost-benchpress/src/write_to_read.rs index 19f66f287..781f41973 100644 --- a/tools/bifrost-benchpress/src/write_to_read.rs +++ b/tools/bifrost-benchpress/src/write_to_read.rs @@ -41,7 +41,7 @@ pub async fn run( let bifrost = bifrost.clone(); let clock = clock.clone(); async move { - let mut read_stream = bifrost.create_reader(log_id, Lsn::OLDEST, Lsn::MAX).await?; + let mut read_stream = bifrost.create_reader(log_id, Lsn::OLDEST, Lsn::MAX)?; let mut counter = 0; let mut cancel = std::pin::pin!(cancellation_watcher()); let mut lag_latencies = Histogram::::new(3)?;