From 03d0094142308ec291827aa67258f5006ff4a598 Mon Sep 17 00:00:00 2001 From: Ahmed Farghal Date: Tue, 2 Jul 2024 13:36:40 +0100 Subject: [PATCH] [Bifrost] ReplicatedLoglet bifrost stubs Empty stubs for client-side replicated-loglet --- crates/bifrost/Cargo.toml | 4 +- .../replicated_loglet/metric_definitions.rs | 23 ++++++ .../src/loglets/replicated_loglet/mod.rs | 5 ++ .../src/loglets/replicated_loglet/provider.rs | 82 +++++++++++++++++++ crates/log-server/Cargo.toml | 2 +- crates/node/Cargo.toml | 3 +- crates/node/src/lib.rs | 22 ++++- crates/types/Cargo.toml | 1 + crates/types/src/config/bifrost.rs | 15 ++++ crates/types/src/lib.rs | 2 + crates/types/src/logs/metadata.rs | 5 ++ crates/types/src/nodes_config.rs | 1 + crates/types/src/replicated_loglet/mod.rs | 9 ++ server/Cargo.toml | 5 +- 14 files changed, 174 insertions(+), 5 deletions(-) create mode 100644 crates/bifrost/src/loglets/replicated_loglet/metric_definitions.rs create mode 100644 crates/bifrost/src/loglets/replicated_loglet/provider.rs create mode 100644 crates/types/src/replicated_loglet/mod.rs diff --git a/crates/bifrost/Cargo.toml b/crates/bifrost/Cargo.toml index f284c28667..330c10b9c5 100644 --- a/crates/bifrost/Cargo.toml +++ b/crates/bifrost/Cargo.toml @@ -8,14 +8,16 @@ license.workspace = true publish = false [features] -default = [] +default = ["replicated-loglet"] options_schema = ["dep:schemars"] +replicated-loglet = ["restate-types/replicated-loglet", "restate-metadata-store"] test-util = [] [dependencies] restate-core = { workspace = true } restate-rocksdb = { workspace = true } restate-types = { workspace = true } +restate-metadata-store = { workspace = true, optional = true } anyhow = { workspace = true } async-trait = { workspace = true } diff --git a/crates/bifrost/src/loglets/replicated_loglet/metric_definitions.rs b/crates/bifrost/src/loglets/replicated_loglet/metric_definitions.rs new file mode 100644 index 0000000000..e41435b972 --- /dev/null +++ b/crates/bifrost/src/loglets/replicated_loglet/metric_definitions.rs @@ -0,0 +1,23 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +/// Optional to have but adds description/help message to the metrics emitted to +/// the metrics' sink. +use metrics::{describe_counter, Unit}; + +pub(crate) const BIFROST_REPLICATED_APPEND: &str = "restate.bifrost.replicatedloglet.appends.total"; + +pub(crate) fn describe_metrics() { + describe_counter!( + BIFROST_REPLICATED_APPEND, + Unit::Count, + "Number of append requests to bifrost's replicated loglet" + ); +} diff --git a/crates/bifrost/src/loglets/replicated_loglet/mod.rs b/crates/bifrost/src/loglets/replicated_loglet/mod.rs index e2c04ec914..55e281a885 100644 --- a/crates/bifrost/src/loglets/replicated_loglet/mod.rs +++ b/crates/bifrost/src/loglets/replicated_loglet/mod.rs @@ -7,3 +7,8 @@ // As of the Change Date specified in that file, in accordance with // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. + +pub(crate) mod metric_definitions; +mod provider; + +pub use provider::Factory; diff --git a/crates/bifrost/src/loglets/replicated_loglet/provider.rs b/crates/bifrost/src/loglets/replicated_loglet/provider.rs new file mode 100644 index 0000000000..db1c9e33f3 --- /dev/null +++ b/crates/bifrost/src/loglets/replicated_loglet/provider.rs @@ -0,0 +1,82 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +// todo: remove when fleshed out +#![allow(unused)] + +use std::sync::Arc; + +use async_trait::async_trait; + +use restate_core::network::rpc_router::RpcRouter; +use restate_core::network::{MessageRouterBuilder, Networking}; +use restate_core::Metadata; +use restate_metadata_store::MetadataStoreClient; +use restate_types::config::ReplicatedLogletOptions; +use restate_types::live::BoxedLiveLoad; +use restate_types::logs::metadata::{LogletParams, ProviderKind}; + +use super::metric_definitions; +use crate::loglet::{Loglet, LogletOffset}; +use crate::ProviderError; +use crate::{Error, LogletProvider}; + +pub struct Factory { + opts: BoxedLiveLoad, + metadata: Metadata, + metadata_store_client: MetadataStoreClient, + networking: Networking, +} + +impl Factory { + pub fn new( + opts: BoxedLiveLoad, + metadata_store_client: MetadataStoreClient, + metadata: Metadata, + networking: Networking, + _router_builder: &mut MessageRouterBuilder, + ) -> Self { + Self { + opts, + metadata, + metadata_store_client, + networking, + } + } +} + +#[async_trait] +impl crate::LogletProviderFactory for Factory { + fn kind(&self) -> ProviderKind { + ProviderKind::Replicated + } + + async fn create(self: Box) -> Result, ProviderError> { + metric_definitions::describe_metrics(); + Ok(Arc::new(ReplicatedLogletProvider)) + } +} + +struct ReplicatedLogletProvider; + +#[async_trait] +impl LogletProvider for ReplicatedLogletProvider { + async fn get_loglet( + &self, + // todo: we need richer params + _params: &LogletParams, + ) -> Result>, Error> { + todo!("Not implemented yet") + } + + async fn shutdown(&self) -> Result<(), ProviderError> { + Ok(()) + } +} diff --git a/crates/log-server/Cargo.toml b/crates/log-server/Cargo.toml index 4cf4cd9eb7..75e64496fa 100644 --- a/crates/log-server/Cargo.toml +++ b/crates/log-server/Cargo.toml @@ -15,7 +15,7 @@ test-util = [] [dependencies] restate-core = { workspace = true } restate-rocksdb = { workspace = true } -restate-types = { workspace = true } +restate-types = { workspace = true, features = ["replicated-loglet"] } anyhow = { workspace = true } async-trait = { workspace = true } diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index 568699fe1e..c04196c605 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -9,6 +9,7 @@ publish = false [features] default = [] +replicated-loglet = ["restate-bifrost/replicated-loglet", "restate-log-server"] options_schema = [ "dep:schemars", "restate-admin/options_schema", @@ -20,7 +21,7 @@ restate-admin = { workspace = true, features = ["servers"] } restate-bifrost = { workspace = true } restate-core = { workspace = true } restate-errors = { workspace = true } -restate-log-server = { workspace = true } +restate-log-server = { workspace = true, optional = true } restate-metadata-store = { workspace = true } restate-rocksdb = { workspace = true } restate-service-client = { workspace = true } diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index 4764242f0f..66ad13300a 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -16,6 +16,7 @@ use std::future::Future; use std::time::Duration; use codederror::CodedError; +#[cfg(feature = "replicated-loglet")] use restate_log_server::LogServerService; use restate_types::live::Live; use tokio::time::Instant; @@ -75,6 +76,7 @@ pub enum BuildError { #[code] roles::AdminRoleBuildError, ), + #[cfg(feature = "replicated-loglet")] #[error("building log-server failed: {0}")] LogServer( #[from] @@ -99,6 +101,7 @@ pub struct Node { metadata_store_role: Option, admin_role: Option, worker_role: Option, + #[cfg(feature = "replicated-loglet")] log_server: Option, server: NetworkServer, } @@ -151,10 +154,25 @@ impl Node { metadata_manager.register_in_message_router(&mut router_builder); let updating_schema_information = metadata.updateable_schema(); - // Setup bifrost. + // Setup bifrost + // replicated-loglet + #[cfg(feature = "replicated-loglet")] + let replicated_loglet_factory = restate_bifrost::loglets::replicated_loglet::Factory::new( + updateable_config + .clone() + .map(|c| &c.bifrost.replicated_loglet) + .boxed(), + metadata_store_client.clone(), + metadata.clone(), + networking.clone(), + &mut router_builder, + ); let bifrost_svc = BifrostService::new(tc.clone(), metadata.clone()) .enable_local_loglet(&updateable_config); + #[cfg(feature = "replicated-loglet")] + let bifrost_svc = bifrost_svc.with_factory(replicated_loglet_factory); + #[cfg(feature = "replicated-loglet")] let log_server = if config.has_role(Role::LogServer) { Some( LogServerService::create( @@ -230,6 +248,7 @@ impl Node { metadata_store_role, admin_role, worker_role, + #[cfg(feature = "replicated-loglet")] log_server, server, }) @@ -338,6 +357,7 @@ impl Node { tc.run_in_scope("bifrost-init", None, self.bifrost.start()) .await?; + #[cfg(feature = "replicated-loglet")] if let Some(log_server) = self.log_server { tc.spawn( TaskKind::SystemBoot, diff --git a/crates/types/Cargo.toml b/crates/types/Cargo.toml index 153f515dcc..ea38edaa39 100644 --- a/crates/types/Cargo.toml +++ b/crates/types/Cargo.toml @@ -10,6 +10,7 @@ publish = false [features] default = [] +replicated-loglet = [] schemars = ["dep:schemars", "restate-serde-util/schema"] test-util = ["dep:tempfile", "dep:restate-test-util"] diff --git a/crates/types/src/config/bifrost.rs b/crates/types/src/config/bifrost.rs index 8a41b24cf9..3316bd62cd 100644 --- a/crates/types/src/config/bifrost.rs +++ b/crates/types/src/config/bifrost.rs @@ -35,12 +35,18 @@ pub struct BifrostOptions { #[cfg_attr(feature = "schemars", schemars(with = "String"))] /// Configuration of local loglet provider pub local: LocalLogletOptions, + #[cfg(feature = "replicated-loglet")] + /// [IN DEVELOPMENT] + /// Configuration of replicated loglet provider + pub replicated_loglet: ReplicatedLogletOptions, } impl Default for BifrostOptions { fn default() -> Self { Self { default_provider: ProviderKind::Local, + #[cfg(feature = "replicated-loglet")] + replicated_loglet: ReplicatedLogletOptions::default(), local: LocalLogletOptions::default(), } } @@ -143,3 +149,12 @@ impl Default for LocalLogletOptions { } } } + +#[cfg(feature = "replicated-loglet")] +#[serde_as] +#[derive(Debug, Default, Clone, Serialize, Deserialize, derive_builder::Builder)] +#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +#[cfg_attr(feature = "schemars", schemars(rename = "ReplicatedLoglet", default))] +#[serde(rename_all = "kebab-case")] +#[builder(default)] +pub struct ReplicatedLogletOptions {} diff --git a/crates/types/src/lib.rs b/crates/types/src/lib.rs index be1fb8c772..36fd28d6c3 100644 --- a/crates/types/src/lib.rs +++ b/crates/types/src/lib.rs @@ -35,6 +35,8 @@ pub mod net; pub mod nodes_config; pub mod partition_table; pub mod protobuf; +#[cfg(feature = "replicated-loglet")] +pub mod replicated_loglet; pub mod retries; pub mod schema; pub mod service_discovery; diff --git a/crates/types/src/logs/metadata.rs b/crates/types/src/logs/metadata.rs index b9c23822be..5d351e091c 100644 --- a/crates/types/src/logs/metadata.rs +++ b/crates/types/src/logs/metadata.rs @@ -81,6 +81,11 @@ pub enum ProviderKind { Local, /// An in-memory loglet, primarily for testing. InMemory, + #[cfg(feature = "replicated-loglet")] + /// [IN DEVELOPMENT] + /// Replicated loglet implementation. This requires log-server role to run on + /// enough nodes in the cluster. + Replicated, } impl LogletConfig { diff --git a/crates/types/src/nodes_config.rs b/crates/types/src/nodes_config.rs index 94b6b01c9f..d8b99d72aa 100644 --- a/crates/types/src/nodes_config.rs +++ b/crates/types/src/nodes_config.rs @@ -46,6 +46,7 @@ pub enum Role { Admin, /// Serves the metadata store MetadataStore, + #[cfg(feature = "replicated-loglet")] /// Serves a log server for replicated loglets LogServer, } diff --git a/crates/types/src/replicated_loglet/mod.rs b/crates/types/src/replicated_loglet/mod.rs new file mode 100644 index 0000000000..e2c04ec914 --- /dev/null +++ b/crates/types/src/replicated_loglet/mod.rs @@ -0,0 +1,9 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. diff --git a/server/Cargo.toml b/server/Cargo.toml index dc0ee613b9..735d47252f 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -26,7 +26,10 @@ options_schema = [ "restate-tracing-instrumentation/options_schema", "restate-types/schemars", ] - +replicated-loglet = [ + "restate-node/replicated-loglet", +] + [dependencies] restate-admin = { workspace = true } restate-bifrost = { workspace = true }