Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bifrost] Eager providers initialization #1681

Merged
merged 1 commit into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions crates/admin/src/cluster_controller/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ mod tests {

let bifrost = node_env
.tc
.run_in_scope("init", None, Bifrost::init(metadata))
.run_in_scope("init", None, Bifrost::init_in_memory(metadata))
.await;

node_env.tc.spawn(
Expand Down Expand Up @@ -483,7 +483,7 @@ mod tests {

let bifrost = node_env
.tc
.run_in_scope("init", None, Bifrost::init(metadata))
.run_in_scope("init", None, Bifrost::init_in_memory(metadata))
.await;

node_env.tc.spawn(
Expand Down
2 changes: 1 addition & 1 deletion crates/bifrost/benches/append_throughput.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ fn write_throughput_local_loglet(c: &mut Criterion) {

let bifrost = tc.block_on("bifrost-init", None, async {
let metadata = metadata();
let bifrost_svc = BifrostService::new(metadata);
let bifrost_svc = BifrostService::new(restate_core::task_center(), metadata);
let bifrost = bifrost_svc.handle();

// start bifrost service in the background
Expand Down
108 changes: 57 additions & 51 deletions crates/bifrost/src/bifrost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,27 @@
// TODO: Remove after fleshing the code out.
#![allow(dead_code)]

use std::ops::Deref;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::OnceLock;

use bytes::BytesMut;
use enum_map::EnumMap;
use once_cell::sync::OnceCell;

use smallvec::SmallVec;
use tracing::{error, instrument};
use tracing::instrument;

use restate_core::{Metadata, MetadataKind};
use restate_types::logs::metadata::{ProviderKind, Segment};
use restate_types::logs::{LogId, Lsn, Payload, SequenceNumber};
use restate_types::storage::StorageCodec;
use restate_types::Version;

use crate::loglet::{LogletBase, LogletProvider, LogletWrapper};
use crate::watchdog::{WatchdogCommand, WatchdogSender};
use crate::loglet::{LogletBase, LogletWrapper};
use crate::watchdog::WatchdogSender;
use crate::{
Error, FindTailAttributes, LogReadStream, LogRecord, Result, SMALL_BATCH_THRESHOLD_COUNT,
Error, FindTailAttributes, LogReadStream, LogRecord, LogletProvider, Result,
SMALL_BATCH_THRESHOLD_COUNT,
};

/// Bifrost is Restate's durable interconnect system
Expand All @@ -50,10 +51,40 @@ impl Bifrost {
}

#[cfg(any(test, feature = "test-util"))]
pub async fn init(metadata: Metadata) -> Self {
pub async fn init_in_memory(metadata: Metadata) -> Self {
use crate::loglets::memory_loglet;

Self::init_with_factory(metadata, memory_loglet::Factory::default()).await
}

#[cfg(any(test, feature = "test-util"))]
pub async fn init_local(metadata: Metadata) -> Self {
use restate_types::config::Configuration;

use crate::BifrostService;

let config = Configuration::updateable();
let bifrost_svc =
BifrostService::new(restate_core::task_center(), metadata).enable_local_loglet(&config);
let bifrost = bifrost_svc.handle();

// start bifrost service in the background
bifrost_svc
.start()
.await
.expect("in memory loglet must start");
bifrost
}

#[cfg(any(test, feature = "test-util"))]
pub async fn init_with_factory(
metadata: Metadata,
factory: impl crate::LogletProviderFactory,
) -> Self {
use crate::BifrostService;

let bifrost_svc = BifrostService::new(metadata);
let bifrost_svc =
BifrostService::new(restate_core::task_center(), metadata).with_factory(factory);
let bifrost = bifrost_svc.handle();

// start bifrost service in the background
Expand Down Expand Up @@ -170,7 +201,8 @@ static_assertions::assert_impl_all!(Bifrost: Send, Sync, Clone);
pub struct BifrostInner {
metadata: Metadata,
watchdog: WatchdogSender,
providers: EnumMap<ProviderKind, OnceCell<Arc<dyn LogletProvider>>>,
// Initialized after BifrostService::start completes.
pub(crate) providers: OnceLock<EnumMap<ProviderKind, Option<Arc<dyn LogletProvider>>>>,
shutting_down: AtomicBool,
}

Expand Down Expand Up @@ -322,36 +354,17 @@ impl BifrostInner {
}

// --- Helper functions --- //
/// Get the provider for a given kind. A provider must be enabled and BifrostService **must**
/// be started before calling this.
fn provider_for(&self, kind: ProviderKind) -> Result<&Arc<dyn LogletProvider>> {
let providers = self
.providers
.get()
.expect("BifrostService must be started prior to using Bifrost");

/// Get the provider for a given kind. If the provider is not yet initialized, it will be
/// lazily initialized by the watchdog.
fn provider_for(&self, kind: ProviderKind) -> &dyn LogletProvider {
self.providers[kind]
.get_or_init(|| {
let provider =
crate::loglet::create_provider(kind).expect("provider is able to get created");
if let Err(e) = provider.start() {
error!("Failed to start loglet provider {}: {}", kind, e);
// todo: Handle provider errors by a graceful system shutdown
// task_center().shutdown_node("Bifrost loglet provider startup error", 1);
panic!("Failed to start loglet provider {}: {}", kind, e);
}
// tell watchdog about it.
let _ = self
.watchdog
.send(WatchdogCommand::WatchProvider(provider.clone()));
provider
})
.deref()
}

/// Injects a provider for testing purposes. The call is responsible for starting the provider
/// and that it's monitored by watchdog if necessary.
/// This will only work if the provider was never accessed by bifrost before this call.
#[cfg(test)]
#[track_caller]
fn inject_provider(&self, kind: ProviderKind, provider: Arc<dyn LogletProvider>) {
assert!(self.providers[kind].try_insert(provider).is_ok());
providers[kind]
.as_ref()
.ok_or_else(|| Error::Disabled(kind.to_string()))
}

async fn writeable_loglet(&self, log_id: LogId) -> Result<LogletWrapper> {
Expand All @@ -377,7 +390,7 @@ impl BifrostInner {
}

async fn get_loglet(&self, segment: &Segment) -> Result<LogletWrapper, Error> {
let provider = self.provider_for(segment.config.kind);
let provider = self.provider_for(segment.config.kind)?;
let loglet = provider.get_loglet(&segment.config.params).await?;
Ok(LogletWrapper::new(segment.base_lsn, loglet))
}
Expand All @@ -389,7 +402,7 @@ mod tests {

use super::*;

use crate::loglets::memory_loglet::MemoryLogletProvider;
use crate::loglets::memory_loglet::{self};
use googletest::prelude::*;

use crate::{Record, TrimGap};
Expand All @@ -414,7 +427,7 @@ mod tests {
.await;
let tc = node_env.tc;
tc.run_in_scope("test", None, async {
let bifrost = Bifrost::init(metadata()).await;
let bifrost = Bifrost::init_in_memory(metadata()).await;

let clean_bifrost_clone = bifrost.clone();

Expand Down Expand Up @@ -471,7 +484,6 @@ mod tests {
let res = bifrost.append(LogId::from(0), Payload::default()).await;
assert!(matches!(res, Err(Error::Shutdown(_))));
// Validate the watchdog has called the provider::start() function.
assert!(logs_contain("Starting in-memory loglet provider"));
assert!(logs_contain("Shutting down in-memory loglet provider"));
assert!(logs_contain("Bifrost watchdog shutdown complete"));
Ok(())
Expand All @@ -487,14 +499,8 @@ mod tests {
let delay = Duration::from_secs(5);
// This memory provider adds a delay to its loglet initialization, we want
// to ensure that appends do not fail while waiting for the loglet;
let memory_provider = MemoryLogletProvider::with_init_delay(delay);

let bifrost = Bifrost::init(metadata()).await;

// Inject out preconfigured memory provider
bifrost
.inner()
.inject_provider(ProviderKind::InMemory, memory_provider);
let factory = memory_loglet::Factory::with_init_delay(delay);
let bifrost = Bifrost::init_with_factory(metadata(), factory).await;

let start = tokio::time::Instant::now();
let lsn = bifrost.append(LogId::from(0), Payload::default()).await?;
Expand All @@ -518,7 +524,7 @@ mod tests {
RocksDbManager::init(Constant::new(CommonOptions::default()));

let log_id = LogId::from(0);
let bifrost = Bifrost::init(metadata()).await;
let bifrost = Bifrost::init_local(metadata()).await;

assert!(bifrost.get_trim_point(log_id).await?.is_none());

Expand Down
3 changes: 3 additions & 0 deletions crates/bifrost/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ pub enum Error {
#[error("failed syncing logs metadata: {0}")]
// unfortunately, we have to use Arc here, because the SyncError is not Clone.
MetadataSync(#[from] Arc<SyncError>),
/// Provider is unknown or disabled
#[error("bifrost provider '{0}' is disabled or unrecognized")]
Disabled(String),
}

#[derive(Debug, thiserror::Error)]
Expand Down
4 changes: 3 additions & 1 deletion crates/bifrost/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
mod bifrost;
mod error;
mod loglet;
mod loglets;
pub mod loglets;
mod provider;
mod read_stream;
mod record;
mod service;
Expand All @@ -20,6 +21,7 @@ mod watchdog;

pub use bifrost::Bifrost;
pub use error::{Error, ProviderError, Result};
pub use provider::*;
pub use read_stream::LogReadStream;
pub use record::*;
pub use service::BifrostService;
Expand Down
30 changes: 2 additions & 28 deletions crates/bifrost/src/loglet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,13 @@ use std::sync::Arc;
use std::task::Poll;

use async_trait::async_trait;

use bytes::Bytes;
use futures::Stream;
use restate_types::config::Configuration;
use restate_types::logs::metadata::{LogletParams, ProviderKind};

use restate_types::logs::{Lsn, SequenceNumber};

use crate::Result;
use crate::{LogRecord, LsnExt, ProviderError};

pub fn create_provider(kind: ProviderKind) -> Result<Arc<dyn LogletProvider>, ProviderError> {
match kind {
ProviderKind::Local => Ok(crate::loglets::local_loglet::LocalLogletProvider::new(
&Configuration::pinned().bifrost.local,
Configuration::mapped_updateable(|c| &c.bifrost.local.rocksdb),
)?),
ProviderKind::InMemory => Ok(crate::loglets::memory_loglet::MemoryLogletProvider::new()?),
}
}
use crate::{LogRecord, LsnExt};

// Inner loglet offset
#[derive(
Expand Down Expand Up @@ -75,20 +63,6 @@ impl SequenceNumber for LogletOffset {
}
}

#[async_trait]
pub trait LogletProvider: Send + Sync {
/// Create a loglet client for a given segment and configuration.
async fn get_loglet(&self, params: &LogletParams) -> Result<Arc<dyn Loglet>>;

// Hook for handling lazy initialization
fn start(&self) -> Result<(), ProviderError>;

// Hook for handling graceful shutdown
async fn shutdown(&self) -> Result<(), ProviderError> {
Ok(())
}
}

pub trait Loglet: LogletBase<Offset = LogletOffset> {}
impl<T> Loglet for T where T: LogletBase<Offset = LogletOffset> {}

Expand Down
4 changes: 2 additions & 2 deletions crates/bifrost/src/loglets/local_loglet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use async_trait::async_trait;
use bytes::Bytes;
pub use log_store::LogStoreError;
use metrics::{counter, histogram, Histogram};
pub use provider::LocalLogletProvider;
pub use provider::Factory;
use restate_core::ShutdownError;
use restate_types::logs::SequenceNumber;
use tokio::sync::Mutex;
Expand All @@ -43,7 +43,7 @@ use self::metric_definitions::{BIFROST_LOCAL_APPEND, BIFROST_LOCAL_APPEND_DURATI
use self::read_stream::LocalLogletReadStream;
use self::utils::OffsetWatch;

pub struct LocalLoglet {
struct LocalLoglet {
log_id: u64,
log_store: RocksDbLogStore,
log_writer: RocksDbLogWriterHandle,
Expand Down
Loading
Loading