Skip to content

Commit

Permalink
[Bifrost] Read stream support multi-segment logs
Browse files Browse the repository at this point in the history
This introduces a new read stream implementation that operates under a multi-segment bifrost world. Notable features include:
- Support for reading from multiple segments seamlessly
- Reading unsealed segments while watching the tail state to determine the safe boundaries with minimal efficiency loss
- Handling of on-going reconfiguration, the stream waits for the loglet to be sealed.
- Handles prefix trims on metadata-level when detected (partial support, more on that in follow up PRs)

Running bifrost-benchpress read-to-write latency tests show that the new read-stream doesn't introduce any meaningful regression in latency in the the unsealed close-to-tail case (note that P100 should be discarded due to shutdown-related noise)

Write-to-read latency:
```
New                                  Old
Total records read: 98317            Total records read: 97871
P50: 67.455µs                        P50: 67.519µs
P90: 77.951µs                        P90: 77.183µs
P99: 96.447µs                        P99: 94.143µs
P999: 129.215µs                      P999: 122.815µs
```
  • Loading branch information
AhmedSoliman committed Jul 24, 2024
1 parent dd3cb68 commit cae8f5f
Show file tree
Hide file tree
Showing 6 changed files with 666 additions and 114 deletions.
12 changes: 5 additions & 7 deletions crates/bifrost/src/bifrost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,15 +128,15 @@ impl Bifrost {
/// log_id,
/// bifrost.get_trim_point(log_id).await.next(),
/// bifrost.find_tail(log_id).await().offset().prev(),
/// ).await;
/// );
/// ```
pub async fn create_reader(
pub fn create_reader(
&self,
log_id: LogId,
start_lsn: Lsn,
end_lsn: Lsn,
) -> Result<LogReadStream> {
LogReadStream::create(self.inner.clone(), log_id, start_lsn, end_lsn).await
LogReadStream::create(self.inner.clone(), log_id, start_lsn, end_lsn)
}

/// The tail is *the first unwritten LSN* in the log
Expand Down Expand Up @@ -200,9 +200,7 @@ impl Bifrost {
return Ok(Vec::default());
}

let reader = self
.create_reader(log_id, Lsn::OLDEST, current_tail.offset().prev())
.await?;
let reader = self.create_reader(log_id, Lsn::OLDEST, current_tail.offset().prev())?;
reader.try_collect().await
}
}
Expand All @@ -213,7 +211,7 @@ static_assertions::assert_impl_all!(Bifrost: Send, Sync, Clone);
// Locks in this data-structure are held for very short time and should never be
// held across an async boundary.
pub struct BifrostInner {
metadata: Metadata,
pub(crate) metadata: Metadata,
#[allow(unused)]
watchdog: WatchdogSender,
// Initialized after BifrostService::start completes.
Expand Down
4 changes: 0 additions & 4 deletions crates/bifrost/src/loglet_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,6 @@ impl LogletBase for LogletWrapper {
/// Wraps loglet read streams with the base LSN of the segment
pub struct LogletReadStreamWrapper {
pub(crate) base_lsn: Lsn,
#[allow(dead_code)]
loglet: LogletWrapper,
inner_read_stream: SendableLogletReadStream<LogletOffset>,
}
Expand All @@ -228,18 +227,15 @@ impl LogletReadStreamWrapper {

/// The first LSN outside the boundary of this stream (bifrost's tail semantics)
/// The read stream will return None and terminate before it reads this LSN
#[allow(dead_code)]
#[inline(always)]
pub fn tail_lsn(&self) -> Option<Lsn> {
self.loglet.tail_lsn
}

#[allow(dead_code)]
pub fn set_tail_lsn(&mut self, tail_lsn: Lsn) {
self.loglet.set_tail_lsn(tail_lsn)
}

#[allow(dead_code)]
#[inline(always)]
pub fn loglet(&self) -> &LogletWrapper {
&self.loglet
Expand Down
Loading

0 comments on commit cae8f5f

Please sign in to comment.