Skip to content

Commit

Permalink
[Bifrost] Read stream support multi-segment logs
Browse files Browse the repository at this point in the history
This introduces a new read stream implementation that operates under a multi-segment bifrost world. Notable features include:
- Support for reading from multiple segments seamlessly
- Reading unsealed segments while watching the tail state to determine the safe boundaries with minimal efficiency loss
- Handling of ongoing reconfiguration: the stream waits for the loglet to be sealed.
- Handles prefix trims at the metadata level when detected (partial support; more on that in follow-up PRs)

Running bifrost-benchpress write-to-read latency tests shows that the new read stream doesn't introduce any meaningful latency regression in the unsealed, close-to-tail case (note that P100 should be discarded due to shutdown-related noise).

Write-to-read latency:
```
New                                  Old
Total records read: 98317            Total records read: 97871
P50: 67.455µs                        P50: 67.519µs
P90: 77.951µs                        P90: 77.183µs
P99: 96.447µs                        P99: 94.143µs
P999: 129.215µs                      P999: 122.815µs
```
  • Loading branch information
AhmedSoliman committed Jul 26, 2024
1 parent 032f7a5 commit 86a3ff5
Show file tree
Hide file tree
Showing 10 changed files with 704 additions and 168 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/bifrost/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ futures = { workspace = true }
humantime = { workspace = true }
metrics = { workspace = true }
once_cell = { workspace = true }
parking_lot = { workspace = true }
pin-project = { workspace = true }
rocksdb = { workspace = true }
schemars = { workspace = true, optional = true }
Expand Down
12 changes: 5 additions & 7 deletions crates/bifrost/src/bifrost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,15 +128,15 @@ impl Bifrost {
/// log_id,
/// bifrost.get_trim_point(log_id).await.next(),
/// bifrost.find_tail(log_id).await().offset().prev(),
/// ).await;
/// );
/// ```
pub async fn create_reader(
pub fn create_reader(
&self,
log_id: LogId,
start_lsn: Lsn,
end_lsn: Lsn,
) -> Result<LogReadStream> {
LogReadStream::create(self.inner.clone(), log_id, start_lsn, end_lsn).await
LogReadStream::create(self.inner.clone(), log_id, start_lsn, end_lsn)
}

/// The tail is *the first unwritten LSN* in the log
Expand Down Expand Up @@ -200,9 +200,7 @@ impl Bifrost {
return Ok(Vec::default());
}

let reader = self
.create_reader(log_id, Lsn::OLDEST, current_tail.offset().prev())
.await?;
let reader = self.create_reader(log_id, Lsn::OLDEST, current_tail.offset().prev())?;
reader.try_collect().await
}
}
Expand All @@ -213,7 +211,7 @@ static_assertions::assert_impl_all!(Bifrost: Send, Sync, Clone);
// Locks in this data-structure are held for very short time and should never be
// held across an async boundary.
pub struct BifrostInner {
metadata: Metadata,
pub(crate) metadata: Metadata,
#[allow(unused)]
watchdog: WatchdogSender,
// Initialized after BifrostService::start completes.
Expand Down
4 changes: 0 additions & 4 deletions crates/bifrost/src/loglet_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,6 @@ impl LogletBase for LogletWrapper {
/// Wraps loglet read streams with the base LSN of the segment
pub struct LogletReadStreamWrapper {
pub(crate) base_lsn: Lsn,
#[allow(dead_code)]
loglet: LogletWrapper,
inner_read_stream: SendableLogletReadStream<LogletOffset>,
}
Expand All @@ -228,18 +227,15 @@ impl LogletReadStreamWrapper {

/// The first LSN outside the boundary of this stream (bifrost's tail semantics)
/// The read stream will return None and terminate before it reads this LSN
#[allow(dead_code)]
#[inline(always)]
pub fn tail_lsn(&self) -> Option<Lsn> {
self.loglet.tail_lsn
}

#[allow(dead_code)]
pub fn set_tail_lsn(&mut self, tail_lsn: Lsn) {
self.loglet.set_tail_lsn(tail_lsn)
}

#[allow(dead_code)]
#[inline(always)]
pub fn loglet(&self) -> &LogletWrapper {
&self.loglet
Expand Down
86 changes: 38 additions & 48 deletions crates/bifrost/src/providers/local_loglet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl std::fmt::Debug for LocalLoglet {
}

impl LocalLoglet {
pub async fn create(
pub fn create(
loglet_id: u64,
log_store: RocksDbLogStore,
log_writer: RocksDbLogWriterHandle,
Expand Down Expand Up @@ -445,17 +445,14 @@ mod tests {
.create_writer()
.start(config.clone().map(|c| &c.bifrost.local).boxed())?;

let loglet = Arc::new(
LocalLoglet::create(
params
.as_str()
.parse()
.expect("loglet params can be converted into u64"),
log_store,
log_writer,
)
.await?,
);
let loglet = Arc::new(LocalLoglet::create(
params
.as_str()
.parse()
.expect("loglet params can be converted into u64"),
log_store,
log_writer,
)?);

gapless_loglet_smoke_test(loglet).await?;
Ok(())
Expand Down Expand Up @@ -487,17 +484,14 @@ mod tests {
.create_writer()
.start(config.clone().map(|c| &c.bifrost.local).boxed())?;

let loglet = Arc::new(
LocalLoglet::create(
params
.as_str()
.parse()
.expect("loglet params can be converted into u64"),
log_store,
log_writer,
)
.await?,
);
let loglet = Arc::new(LocalLoglet::create(
params
.as_str()
.parse()
.expect("loglet params can be converted into u64"),
log_store,
log_writer,
)?);

single_loglet_readstream_test(loglet).await?;
Ok(())
Expand Down Expand Up @@ -529,17 +523,14 @@ mod tests {
.create_writer()
.start(config.clone().map(|c| &c.bifrost.local).boxed())?;

let loglet = Arc::new(
LocalLoglet::create(
params
.as_str()
.parse()
.expect("loglet params can be converted into u64"),
log_store,
log_writer,
)
.await?,
);
let loglet = Arc::new(LocalLoglet::create(
params
.as_str()
.parse()
.expect("loglet params can be converted into u64"),
log_store,
log_writer,
)?);

single_loglet_readstream_test_with_trims(loglet).await?;
Ok(())
Expand Down Expand Up @@ -570,17 +561,14 @@ mod tests {
.create_writer()
.start(config.clone().map(|c| &c.bifrost.local).boxed())?;

let loglet = Arc::new(
LocalLoglet::create(
params
.as_str()
.parse()
.expect("loglet params can be converted into u64"),
log_store,
log_writer,
)
.await?,
);
let loglet = Arc::new(LocalLoglet::create(
params
.as_str()
.parse()
.expect("loglet params can be converted into u64"),
log_store,
log_writer,
)?);

loglet_test_append_after_seal(loglet).await?;
Ok(())
Expand Down Expand Up @@ -613,9 +601,11 @@ mod tests {

// Run the test 10 times
for i in 1..=10 {
let loglet = Arc::new(
LocalLoglet::create(i, log_store.clone(), log_writer.clone()).await?,
);
let loglet = Arc::new(LocalLoglet::create(
i,
log_store.clone(),
log_writer.clone(),
)?);
loglet_test_append_after_seal_concurrent(loglet).await?;
}

Expand Down
9 changes: 4 additions & 5 deletions crates/bifrost/src/providers/local_loglet/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::collections::{hash_map, HashMap};
use std::sync::Arc;

use async_trait::async_trait;
use tokio::sync::Mutex as AsyncMutex;
use parking_lot::Mutex;
use tracing::debug;

use restate_types::config::{LocalLogletOptions, RocksDbOptions};
Expand Down Expand Up @@ -71,7 +71,7 @@ impl LogletProviderFactory for Factory {

pub(crate) struct LocalLogletProvider {
log_store: RocksDbLogStore,
active_loglets: AsyncMutex<HashMap<String, Arc<LocalLoglet>>>,
active_loglets: Mutex<HashMap<String, Arc<LocalLoglet>>>,
log_writer: RocksDbLogWriterHandle,
}

Expand All @@ -81,7 +81,7 @@ impl LogletProvider for LocalLogletProvider {
&self,
params: &LogletParams,
) -> Result<Arc<dyn Loglet<Offset = LogletOffset>>, Error> {
let mut guard = self.active_loglets.lock().await;
let mut guard = self.active_loglets.lock();
let loglet = match guard.entry(params.as_str().to_owned()) {
hash_map::Entry::Vacant(entry) => {
// Create loglet
Expand All @@ -93,8 +93,7 @@ impl LogletProvider for LocalLogletProvider {
.expect("loglet params can be converted into u64"),
self.log_store.clone(),
self.log_writer.clone(),
)
.await?;
)?;
let loglet = entry.insert(Arc::new(loglet));
Arc::clone(loglet)
}
Expand Down
Loading

0 comments on commit 86a3ff5

Please sign in to comment.