Skip to content

Commit

Permalink
[Bifrost] ReplicatedLoglet bifrost stubs
Browse files Browse the repository at this point in the history
Empty stubs for client-side replicated-loglet
  • Loading branch information
AhmedSoliman committed Jul 2, 2024
1 parent 269ebf0 commit 03d0094
Show file tree
Hide file tree
Showing 14 changed files with 174 additions and 5 deletions.
4 changes: 3 additions & 1 deletion crates/bifrost/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@ license.workspace = true
publish = false

[features]
default = []
default = ["replicated-loglet"]
options_schema = ["dep:schemars"]
replicated-loglet = ["restate-types/replicated-loglet", "restate-metadata-store"]
test-util = []

[dependencies]
restate-core = { workspace = true }
restate-rocksdb = { workspace = true }
restate-types = { workspace = true }
restate-metadata-store = { workspace = true, optional = true }

anyhow = { workspace = true }
async-trait = { workspace = true }
Expand Down
23 changes: 23 additions & 0 deletions crates/bifrost/src/loglets/replicated_loglet/metric_definitions.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

/// Optional to have but adds description/help message to the metrics emitted to
/// the metrics' sink.
use metrics::{describe_counter, Unit};

pub(crate) const BIFROST_REPLICATED_APPEND: &str = "restate.bifrost.replicatedloglet.appends.total";

pub(crate) fn describe_metrics() {
describe_counter!(
BIFROST_REPLICATED_APPEND,
Unit::Count,
"Number of append requests to bifrost's replicated loglet"
);
}
5 changes: 5 additions & 0 deletions crates/bifrost/src/loglets/replicated_loglet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,8 @@
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

pub(crate) mod metric_definitions;
mod provider;

pub use provider::Factory;
82 changes: 82 additions & 0 deletions crates/bifrost/src/loglets/replicated_loglet/provider.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

// todo: remove when fleshed out
#![allow(unused)]

use std::sync::Arc;

use async_trait::async_trait;

use restate_core::network::rpc_router::RpcRouter;
use restate_core::network::{MessageRouterBuilder, Networking};
use restate_core::Metadata;
use restate_metadata_store::MetadataStoreClient;
use restate_types::config::ReplicatedLogletOptions;
use restate_types::live::BoxedLiveLoad;
use restate_types::logs::metadata::{LogletParams, ProviderKind};

use super::metric_definitions;
use crate::loglet::{Loglet, LogletOffset};
use crate::ProviderError;
use crate::{Error, LogletProvider};

pub struct Factory {
opts: BoxedLiveLoad<ReplicatedLogletOptions>,
metadata: Metadata,
metadata_store_client: MetadataStoreClient,
networking: Networking,
}

impl Factory {
pub fn new(
opts: BoxedLiveLoad<ReplicatedLogletOptions>,
metadata_store_client: MetadataStoreClient,
metadata: Metadata,
networking: Networking,
_router_builder: &mut MessageRouterBuilder,
) -> Self {
Self {
opts,
metadata,
metadata_store_client,
networking,
}
}
}

#[async_trait]
impl crate::LogletProviderFactory for Factory {
fn kind(&self) -> ProviderKind {
ProviderKind::Replicated
}

async fn create(self: Box<Self>) -> Result<Arc<dyn LogletProvider>, ProviderError> {
metric_definitions::describe_metrics();
Ok(Arc::new(ReplicatedLogletProvider))
}
}

struct ReplicatedLogletProvider;

#[async_trait]
impl LogletProvider for ReplicatedLogletProvider {
async fn get_loglet(
&self,
// todo: we need richer params
_params: &LogletParams,
) -> Result<Arc<dyn Loglet<Offset = LogletOffset>>, Error> {
todo!("Not implemented yet")
}

async fn shutdown(&self) -> Result<(), ProviderError> {
Ok(())
}
}
2 changes: 1 addition & 1 deletion crates/log-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ test-util = []
[dependencies]
restate-core = { workspace = true }
restate-rocksdb = { workspace = true }
restate-types = { workspace = true }
restate-types = { workspace = true, features = ["replicated-loglet"] }

anyhow = { workspace = true }
async-trait = { workspace = true }
Expand Down
3 changes: 2 additions & 1 deletion crates/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ publish = false

[features]
default = []
replicated-loglet = ["restate-bifrost/replicated-loglet", "restate-log-server"]
options_schema = [
"dep:schemars",
"restate-admin/options_schema",
Expand All @@ -20,7 +21,7 @@ restate-admin = { workspace = true, features = ["servers"] }
restate-bifrost = { workspace = true }
restate-core = { workspace = true }
restate-errors = { workspace = true }
restate-log-server = { workspace = true }
restate-log-server = { workspace = true, optional = true }
restate-metadata-store = { workspace = true }
restate-rocksdb = { workspace = true }
restate-service-client = { workspace = true }
Expand Down
22 changes: 21 additions & 1 deletion crates/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::future::Future;
use std::time::Duration;

use codederror::CodedError;
#[cfg(feature = "replicated-loglet")]
use restate_log_server::LogServerService;
use restate_types::live::Live;
use tokio::time::Instant;
Expand Down Expand Up @@ -75,6 +76,7 @@ pub enum BuildError {
#[code]
roles::AdminRoleBuildError,
),
#[cfg(feature = "replicated-loglet")]
#[error("building log-server failed: {0}")]
LogServer(
#[from]
Expand All @@ -99,6 +101,7 @@ pub struct Node {
metadata_store_role: Option<LocalMetadataStoreService>,
admin_role: Option<AdminRole>,
worker_role: Option<WorkerRole>,
#[cfg(feature = "replicated-loglet")]
log_server: Option<LogServerService>,
server: NetworkServer,
}
Expand Down Expand Up @@ -151,10 +154,25 @@ impl Node {
metadata_manager.register_in_message_router(&mut router_builder);
let updating_schema_information = metadata.updateable_schema();

// Setup bifrost.
// Setup bifrost
// replicated-loglet
#[cfg(feature = "replicated-loglet")]
let replicated_loglet_factory = restate_bifrost::loglets::replicated_loglet::Factory::new(
updateable_config
.clone()
.map(|c| &c.bifrost.replicated_loglet)
.boxed(),
metadata_store_client.clone(),
metadata.clone(),
networking.clone(),
&mut router_builder,
);
let bifrost_svc = BifrostService::new(tc.clone(), metadata.clone())
.enable_local_loglet(&updateable_config);
#[cfg(feature = "replicated-loglet")]
let bifrost_svc = bifrost_svc.with_factory(replicated_loglet_factory);

#[cfg(feature = "replicated-loglet")]
let log_server = if config.has_role(Role::LogServer) {
Some(
LogServerService::create(
Expand Down Expand Up @@ -230,6 +248,7 @@ impl Node {
metadata_store_role,
admin_role,
worker_role,
#[cfg(feature = "replicated-loglet")]
log_server,
server,
})
Expand Down Expand Up @@ -338,6 +357,7 @@ impl Node {
tc.run_in_scope("bifrost-init", None, self.bifrost.start())
.await?;

#[cfg(feature = "replicated-loglet")]
if let Some(log_server) = self.log_server {
tc.spawn(
TaskKind::SystemBoot,
Expand Down
1 change: 1 addition & 0 deletions crates/types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ publish = false
[features]
default = []

replicated-loglet = []
schemars = ["dep:schemars", "restate-serde-util/schema"]
test-util = ["dep:tempfile", "dep:restate-test-util"]

Expand Down
15 changes: 15 additions & 0 deletions crates/types/src/config/bifrost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,18 @@ pub struct BifrostOptions {
#[cfg_attr(feature = "schemars", schemars(with = "String"))]
/// Configuration of local loglet provider
pub local: LocalLogletOptions,
#[cfg(feature = "replicated-loglet")]
/// [IN DEVELOPMENT]
/// Configuration of replicated loglet provider
pub replicated_loglet: ReplicatedLogletOptions,
}

impl Default for BifrostOptions {
fn default() -> Self {
Self {
default_provider: ProviderKind::Local,
#[cfg(feature = "replicated-loglet")]
replicated_loglet: ReplicatedLogletOptions::default(),
local: LocalLogletOptions::default(),
}
}
Expand Down Expand Up @@ -143,3 +149,12 @@ impl Default for LocalLogletOptions {
}
}
}

#[cfg(feature = "replicated-loglet")]
#[serde_as]
#[derive(Debug, Default, Clone, Serialize, Deserialize, derive_builder::Builder)]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
#[cfg_attr(feature = "schemars", schemars(rename = "ReplicatedLoglet", default))]
#[serde(rename_all = "kebab-case")]
#[builder(default)]
pub struct ReplicatedLogletOptions {}
2 changes: 2 additions & 0 deletions crates/types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ pub mod net;
pub mod nodes_config;
pub mod partition_table;
pub mod protobuf;
#[cfg(feature = "replicated-loglet")]
pub mod replicated_loglet;
pub mod retries;
pub mod schema;
pub mod service_discovery;
Expand Down
5 changes: 5 additions & 0 deletions crates/types/src/logs/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ pub enum ProviderKind {
Local,
/// An in-memory loglet, primarily for testing.
InMemory,
#[cfg(feature = "replicated-loglet")]
/// [IN DEVELOPMENT]
/// Replicated loglet implementation. This requires log-server role to run on
/// enough nodes in the cluster.
Replicated,
}

impl LogletConfig {
Expand Down
1 change: 1 addition & 0 deletions crates/types/src/nodes_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub enum Role {
Admin,
/// Serves the metadata store
MetadataStore,
#[cfg(feature = "replicated-loglet")]
/// Serves a log server for replicated loglets
LogServer,
}
Expand Down
9 changes: 9 additions & 0 deletions crates/types/src/replicated_loglet/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
5 changes: 4 additions & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ options_schema = [
"restate-tracing-instrumentation/options_schema",
"restate-types/schemars",
]

replicated-loglet = [
"restate-node/replicated-loglet",
]

[dependencies]
restate-admin = { workspace = true }
restate-bifrost = { workspace = true }
Expand Down

0 comments on commit 03d0094

Please sign in to comment.