Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bifrost] Basic BifrostAdmin interface #1753

Merged
merged 5 commits into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
[Bifrost] Introduces SegmentIndex to identify segments in the log chain
This allows correct handling of sealing empty loglets and support for future reconfiguration operations.
  • Loading branch information
AhmedSoliman committed Jul 26, 2024
commit a71f1d1a093469d7a1fc5f925f500e4c7877e92b
19 changes: 13 additions & 6 deletions crates/bifrost/src/bifrost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ impl BifrostInner {
// todo: support multiple segments.
// todo: dispatch loglet deletion in the background when entire segments are trimmed
for segment in log_chain.iter() {
let loglet = self.get_loglet(segment).await?;
let loglet = self.get_loglet(log_id, segment).await?;
let loglet_specific_trim_point = loglet.get_trim_point().await?;

// if a loglet has no trim point, then all subsequent loglets should also not contain a trim point
Expand All @@ -393,7 +393,7 @@ impl BifrostInner {
.ok_or(Error::UnknownLogId(log_id))?;

for segment in log_chain.iter() {
let loglet = self.get_loglet(segment).await?;
let loglet = self.get_loglet(log_id, segment).await?;

if loglet.base_lsn > trim_point {
break;
Expand Down Expand Up @@ -443,7 +443,7 @@ impl BifrostInner {
.chain(&log_id)
.ok_or(Error::UnknownLogId(log_id))?
.tail();
self.get_loglet(tail_segment).await
self.get_loglet(log_id, tail_segment).await
}

pub(crate) async fn find_loglet_for_lsn(
Expand All @@ -457,16 +457,23 @@ impl BifrostInner {
.ok_or(Error::UnknownLogId(log_id))?
.find_segment_for_lsn(lsn);
match maybe_segment {
MaybeSegment::Some(segment) => self.get_loglet(segment).await,
MaybeSegment::Some(segment) => self.get_loglet(log_id, segment).await,
// todo: handle trimmed segments
MaybeSegment::Trim { .. } => todo!("trimmed segments is not supported yet"),
}
}

async fn get_loglet(&self, segment: Segment<'_>) -> Result<LogletWrapper, Error> {
async fn get_loglet(
&self,
log_id: LogId,
segment: Segment<'_>,
) -> Result<LogletWrapper, Error> {
let provider = self.provider_for(segment.config.kind)?;
let loglet = provider.get_loglet(&segment.config.params).await?;
let loglet = provider
.get_loglet(log_id, segment.index(), &segment.config.params)
.await?;
Ok(LogletWrapper::new(
segment.index(),
segment.base_lsn,
segment.tail_lsn,
loglet,
Expand Down
10 changes: 8 additions & 2 deletions crates/bifrost/src/loglet/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ use std::sync::Arc;

use async_trait::async_trait;

use restate_types::logs::metadata::{LogletParams, ProviderKind};
use restate_types::logs::metadata::{LogletParams, ProviderKind, SegmentIndex};
use restate_types::logs::LogId;

use super::{Loglet, OperationError};
use crate::Result;
Expand All @@ -29,7 +30,12 @@ pub trait LogletProviderFactory: Send + 'static {
#[async_trait]
pub trait LogletProvider: Send + Sync {
/// Create a loglet client for a given segment and configuration.
async fn get_loglet(&self, params: &LogletParams) -> Result<Arc<dyn Loglet>>;
async fn get_loglet(
&self,
log_id: LogId,
segment_index: SegmentIndex,
params: &LogletParams,
) -> Result<Arc<dyn Loglet>>;

/// A hook that's called after provider is started.
async fn post_start(&self) {}
Expand Down
16 changes: 14 additions & 2 deletions crates/bifrost/src/loglet_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use bytes::Bytes;
use futures::stream::BoxStream;
use futures::{Stream, StreamExt};

use restate_types::logs::metadata::SegmentIndex;
use restate_types::logs::{Lsn, SequenceNumber};

use crate::loglet::{
Expand All @@ -35,6 +36,7 @@ enum LogletWrapperError {
/// Wraps loglets with the base LSN of the segment
#[derive(Clone, Debug)]
pub struct LogletWrapper {
segment_index: SegmentIndex,
/// The offset of the first record in the segment (if exists).
/// A segment on a clean chain is created with Lsn::OLDEST but this doesn't mean that this
/// record exists. It only means that we want to offset the loglet offsets by base_lsn -
Expand All @@ -46,8 +48,14 @@ pub struct LogletWrapper {
}

impl LogletWrapper {
pub fn new(base_lsn: Lsn, tail_lsn: Option<Lsn>, loglet: Arc<dyn Loglet>) -> Self {
pub fn new(
segment_index: SegmentIndex,
base_lsn: Lsn,
tail_lsn: Option<Lsn>,
loglet: Arc<dyn Loglet>,
) -> Self {
Self {
segment_index,
base_lsn,
tail_lsn,
loglet,
Expand All @@ -60,6 +68,10 @@ impl LogletWrapper {
self.tail_lsn = Some(tail_lsn)
}

pub fn segment_index(&self) -> SegmentIndex {
self.segment_index
}

pub async fn create_wrapped_read_stream(
self,
start_lsn: Lsn,
Expand All @@ -83,7 +95,7 @@ impl LogletWrapper {

impl PartialEq for LogletWrapper {
fn eq(&self, other: &Self) -> bool {
self.base_lsn == other.base_lsn && Arc::ptr_eq(&self.loglet, &other.loglet)
self.segment_index == other.segment_index
}
}

Expand Down
9 changes: 6 additions & 3 deletions crates/bifrost/src/providers/local_loglet/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ use tracing::debug;

use restate_types::config::{LocalLogletOptions, RocksDbOptions};
use restate_types::live::BoxedLiveLoad;
use restate_types::logs::metadata::{LogletParams, ProviderKind};
use restate_types::logs::metadata::{LogletParams, ProviderKind, SegmentIndex};
use restate_types::logs::LogId;

use super::log_store::RocksDbLogStore;
use super::log_store_writer::RocksDbLogWriterHandle;
Expand Down Expand Up @@ -71,18 +72,20 @@ impl LogletProviderFactory for Factory {

pub(crate) struct LocalLogletProvider {
log_store: RocksDbLogStore,
active_loglets: Mutex<HashMap<String, Arc<LocalLoglet>>>,
active_loglets: Mutex<HashMap<(LogId, SegmentIndex), Arc<LocalLoglet>>>,
log_writer: RocksDbLogWriterHandle,
}

#[async_trait]
impl LogletProvider for LocalLogletProvider {
async fn get_loglet(
&self,
log_id: LogId,
segment_index: SegmentIndex,
params: &LogletParams,
) -> Result<Arc<dyn Loglet<Offset = LogletOffset>>, Error> {
let mut guard = self.active_loglets.lock();
let loglet = match guard.entry(params.as_str().to_owned()) {
let loglet = match guard.entry((log_id, segment_index)) {
hash_map::Entry::Vacant(entry) => {
// Create loglet
// NOTE: local-loglet expects params to be a `u64` string-encoded unique identifier under the hood.
Expand Down
15 changes: 10 additions & 5 deletions crates/bifrost/src/providers/memory_loglet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ use tokio::sync::Mutex as AsyncMutex;
use tokio_stream::StreamExt;
use tracing::{debug, info};

use restate_types::logs::metadata::{LogletParams, ProviderKind};
use restate_types::logs::SequenceNumber;
use restate_types::logs::metadata::{LogletParams, ProviderKind, SegmentIndex};
use restate_types::logs::{LogId, SequenceNumber};

use crate::loglet::util::TailOffsetWatch;
use crate::loglet::{
Expand Down Expand Up @@ -66,16 +66,21 @@ impl LogletProviderFactory for Factory {

#[derive(Default)]
struct MemoryLogletProvider {
store: AsyncMutex<HashMap<LogletParams, Arc<MemoryLoglet>>>,
store: AsyncMutex<HashMap<(LogId, SegmentIndex), Arc<MemoryLoglet>>>,
init_delay: Duration,
}

#[async_trait]
impl LogletProvider for MemoryLogletProvider {
async fn get_loglet(&self, params: &LogletParams) -> Result<Arc<dyn Loglet>> {
async fn get_loglet(
&self,
log_id: LogId,
segment_index: SegmentIndex,
params: &LogletParams,
) -> Result<Arc<dyn Loglet>> {
let mut guard = self.store.lock().await;

let loglet = match guard.entry(params.clone()) {
let loglet = match guard.entry((log_id, segment_index)) {
hash_map::Entry::Vacant(entry) => {
if !self.init_delay.is_zero() {
// Artificial delay to simulate slow loglet creation
Expand Down
5 changes: 4 additions & 1 deletion crates/bifrost/src/providers/replicated_loglet/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ use restate_core::Metadata;
use restate_metadata_store::MetadataStoreClient;
use restate_types::config::ReplicatedLogletOptions;
use restate_types::live::BoxedLiveLoad;
use restate_types::logs::metadata::{LogletParams, ProviderKind};
use restate_types::logs::metadata::{LogletParams, ProviderKind, SegmentIndex};
use restate_types::logs::LogId;

use super::metric_definitions;
use crate::loglet::{Loglet, LogletOffset, LogletProvider, LogletProviderFactory, OperationError};
Expand Down Expand Up @@ -70,6 +71,8 @@ impl LogletProvider for ReplicatedLogletProvider {
async fn get_loglet(
&self,
// todo: we need richer params
_log_id: LogId,
_segment_index: SegmentIndex,
_params: &LogletParams,
) -> Result<Arc<dyn Loglet<Offset = LogletOffset>>, Error> {
todo!("Not implemented yet")
Expand Down
59 changes: 31 additions & 28 deletions crates/bifrost/src/read_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,39 +329,44 @@ impl Stream for LogReadStream {
};

let log_metadata = bifrost_inner.metadata.logs();
// Does metadata indicate that the `base_lsn` of the current substream
// points to a sealed segment?
//
// Why do we use `base_lsn` instead of `read_pointer`? Because if the loglet is
// sealed, the read_pointer might end up being the `base_lsn` of the next
// segment. We don't need to handle the transition to the next segment here
// since we have this handled in Reading state. We just need to set the
// `tail_lsn` on the substream once we determine it.
//
// todo (asoli): Handle empty sealed loglets. Ideally, we want to compare the segment returned
// with the one backing the current loglet, if they are different, we should
// recreate the substream and let the normal flow take over to move to the
// replacement loglet. Unfortunately, at the moment we don't have a reliable
// way to do that.
//

// The log is gone!
let Some(chain) = log_metadata.chain(this.log_id) else {
this.substream.set(None);
this.state.set(State::Terminated);
return Poll::Ready(Some(Err(Error::UnknownLogId(*this.log_id))));
};

match chain.find_segment_for_lsn(substream.base_lsn) {
MaybeSegment::Some(segment) if segment.tail_lsn.is_some() => {
let sealed_tail = segment.tail_lsn.unwrap();
substream.set_tail_lsn(segment.tail_lsn.unwrap());
// go back to reading.
this.state.set(State::Reading {
safe_known_tail: Some(sealed_tail),
// No need for the tail watch since we know the tail already.
tail_watch: None,
});
continue;
match chain.find_segment_for_lsn(*this.read_pointer) {
MaybeSegment::Some(segment) => {
// This is a different segment now, we need to recreate the substream.
// This could mean that this is a new segment replacing the existing
// one (if it was an empty sealed loglet) or that the loglet has been
// sealed and the read_pointer points to the next segment. In all
// cases, we want to get the right loglet.
if segment.index() != substream.loglet().segment_index() {
this.substream.set(None);
let find_loglet_fut = Box::pin(
bifrost_inner
.find_loglet_for_lsn(*this.log_id, *this.read_pointer),
);
// => Find Loglet
this.state.set(State::FindingLoglet { find_loglet_fut });
continue;
}
if segment.tail_lsn.is_some() {
let sealed_tail = segment.tail_lsn.unwrap();
substream.set_tail_lsn(segment.tail_lsn.unwrap());
// go back to reading.
this.state.set(State::Reading {
safe_known_tail: Some(sealed_tail),
// No need for the tail watch since we know the tail already.
tail_watch: None,
});
continue;
}
// Segment is not sealed yet.
// fall-through
}
// Oh, we have a prefix trim, deliver the trim-gap and fast-forward.
MaybeSegment::Trim { next_base_lsn } => {
Expand All @@ -378,8 +383,6 @@ impl Stream for LogReadStream {
// Deliver the trim gap
return Poll::Ready(Some(Ok(record)));
}
// Segment is not sealed yet.
MaybeSegment::Some(_) => { /* fall-through */ }
};

// Reconfiguration still ongoing...
Expand Down
16 changes: 14 additions & 2 deletions crates/types/src/logs/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@

use std::ops::Deref;

use super::metadata::{Chain, LogletConfig, LogletParams, Logs, MaybeSegment, ProviderKind};
use super::metadata::{
Chain, LogletConfig, LogletParams, Logs, MaybeSegment, ProviderKind, SegmentIndex,
};
use super::{LogId, Lsn};

#[derive(Debug, Default, Clone)]
Expand Down Expand Up @@ -102,9 +104,10 @@ impl<'a> ChainBuilder<'a> {
if *last_entry.key() < base_lsn {
// append
// validate that the base_lsn is higher than existing base_lsns.
let new_index = SegmentIndex(last_entry.get().index().0 + 1);
self.inner
.chain
.insert(base_lsn, LogletConfig::new(provider, params));
.insert(base_lsn, LogletConfig::new(new_index, provider, params));
Ok(())
} else {
// can't add to the back.
Expand Down Expand Up @@ -142,6 +145,8 @@ mod tests {
Chain::new(ProviderKind::InMemory, LogletParams::from("test1")),
)?;

assert_eq!(chain.tail_index(), SegmentIndex(0));

let segment = chain.find_segment_for_lsn(Lsn::INVALID);
assert_that!(
segment,
Expand Down Expand Up @@ -215,6 +220,9 @@ mod tests {
assert_eq!(Lsn::OLDEST, chain.head().base_lsn);
assert_eq!(Lsn::from(10), chain.tail().base_lsn);

assert_eq!(SegmentIndex(1), chain.tail_index());
assert_eq!(SegmentIndex(1), chain.tail().index());

// can't, this is a conflict.
assert_that!(
chain.append_segment(
Expand Down Expand Up @@ -262,6 +270,8 @@ mod tests {
LogletParams::from("test5"),
)?;
assert_eq!(3, chain.num_segments());
assert_eq!(SegmentIndex(2), chain.tail_index());
assert_eq!(SegmentIndex(2), chain.tail().index());
let base_lsns: Vec<_> = chain.iter().map(|s| s.base_lsn).collect();
assert_that!(
base_lsns,
Expand Down Expand Up @@ -484,6 +494,8 @@ mod tests {
assert_eq!(1, chain.num_segments());
assert_eq!(Lsn::from(550), chain.tail().base_lsn);

assert_eq!(SegmentIndex(5), chain.tail_index());

Ok(())
}
}
Loading
Loading