Skip to content

Commit

Permalink
MetadataBuilder to untangle shared metadata construction
Browse files Browse the repository at this point in the history
Address a chicken-and-egg problem at construction time of the runtime environment. MetadataBuilder helps by allowing the shared metadata object to be constructed early while encapsulating the implementation details of metadata construction.
  • Loading branch information
AhmedSoliman committed Jun 27, 2024
1 parent 1f9546d commit e665ffd
Show file tree
Hide file tree
Showing 9 changed files with 119 additions and 57 deletions.
13 changes: 9 additions & 4 deletions crates/bifrost/benches/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
// by the Apache License, Version 2.0.

use restate_core::{
spawn_metadata_manager, MetadataManager, MockNetworkSender, TaskCenter, TaskCenterBuilder,
spawn_metadata_manager, MetadataBuilder, MetadataManager, MockNetworkSender, TaskCenter,
TaskCenterBuilder,
};
use restate_metadata_store::{MetadataStoreClient, Precondition};
use restate_rocksdb::RocksDbManager;
Expand All @@ -32,10 +33,14 @@ pub async fn spawn_environment(
let network_sender = MockNetworkSender::default();

let metadata_store_client = MetadataStoreClient::new_in_memory();
let metadata_manager =
MetadataManager::build(network_sender.clone(), metadata_store_client.clone());
let metadata_builder = MetadataBuilder::default();
let metadata = metadata_builder.to_metadata();
let metadata_manager = MetadataManager::new(
metadata_builder,
network_sender.clone(),
metadata_store_client.clone(),
);

let metadata = metadata_manager.metadata();
let metadata_writer = metadata_manager.writer();
tc.try_set_global_metadata(metadata.clone());

Expand Down
3 changes: 2 additions & 1 deletion crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ mod task_center_types;
pub mod worker_api;

pub use metadata::{
spawn_metadata_manager, Metadata, MetadataKind, MetadataManager, MetadataWriter, SyncError,
spawn_metadata_manager, Metadata, MetadataBuilder, MetadataKind, MetadataManager,
MetadataWriter, SyncError,
};
pub use task_center::*;
pub use task_center_types::*;
Expand Down
66 changes: 38 additions & 28 deletions crates/core/src/metadata/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ use crate::metadata_store::{MetadataStoreClient, ReadError};
use crate::network::{MessageHandler, MessageRouterBuilder, NetworkSender};
use crate::task_center;

use super::{Metadata, MetadataContainer, MetadataInner, MetadataKind, MetadataWriter};
use super::MetadataBuilder;
use super::{Metadata, MetadataContainer, MetadataKind, MetadataWriter};

pub(super) type CommandSender = mpsc::UnboundedSender<Command>;
pub(super) type CommandReceiver = mpsc::UnboundedReceiver<Command>;
Expand Down Expand Up @@ -197,8 +198,7 @@ where
/// - NodesConfiguration
/// - Partition table
pub struct MetadataManager<N> {
self_sender: CommandSender,
inner: Arc<MetadataInner>,
metadata: Metadata,
inbound: CommandReceiver,
networking: N,
metadata_store_client: MetadataStoreClient,
Expand All @@ -208,30 +208,32 @@ impl<N> MetadataManager<N>
where
N: NetworkSender + 'static + Clone,
{
pub fn build(networking: N, metadata_store_client: MetadataStoreClient) -> Self {
let (self_sender, inbound) = mpsc::unbounded_channel();
pub fn new(
metadata_builder: MetadataBuilder,
networking: N,
metadata_store_client: MetadataStoreClient,
) -> Self {
Self {
inner: Arc::new(MetadataInner::default()),
inbound,
self_sender,
metadata: metadata_builder.metadata,
inbound: metadata_builder.receiver,
networking,
metadata_store_client,
}
}

pub fn register_in_message_router(&self, sr_builder: &mut MessageRouterBuilder) {
sr_builder.add_message_handler(MetadataMessageHandler {
sender: self.self_sender.clone(),
sender: self.metadata.sender.clone(),
networking: self.networking.clone(),
});
}

pub fn metadata(&self) -> Metadata {
Metadata::new(self.inner.clone(), self.self_sender.clone())
pub fn metadata(&self) -> &Metadata {
&self.metadata
}

pub fn writer(&self) -> MetadataWriter {
MetadataWriter::new(self.self_sender.clone(), self.inner.clone())
MetadataWriter::new(self.metadata.sender.clone(), self.metadata.inner.clone())
}

/// Start and wait for shutdown signal.
Expand Down Expand Up @@ -330,26 +332,27 @@ where
}

fn update_nodes_configuration(&mut self, config: NodesConfiguration) {
let maybe_new_version = Self::update_option_internal(&self.inner.nodes_config, config);
let maybe_new_version =
Self::update_option_internal(&self.metadata.inner.nodes_config, config);

self.notify_watches(maybe_new_version, MetadataKind::NodesConfiguration);
}

fn update_partition_table(&mut self, partition_table: FixedPartitionTable) {
let maybe_new_version =
Self::update_option_internal(&self.inner.partition_table, partition_table);
Self::update_option_internal(&self.metadata.inner.partition_table, partition_table);

self.notify_watches(maybe_new_version, MetadataKind::PartitionTable);
}

fn update_logs(&mut self, logs: Logs) {
let maybe_new_version = Self::update_option_internal(&self.inner.logs, logs);
let maybe_new_version = Self::update_option_internal(&self.metadata.inner.logs, logs);

self.notify_watches(maybe_new_version, MetadataKind::Logs);
}

fn update_schema(&mut self, schema: Schema) {
let maybe_new_version = Self::update_internal(&self.inner.schema, schema);
let maybe_new_version = Self::update_internal(&self.metadata.inner.schema, schema);

self.notify_watches(maybe_new_version, MetadataKind::Schema);
}
Expand Down Expand Up @@ -399,14 +402,16 @@ where

fn notify_watches(&mut self, maybe_new_version: Version, kind: MetadataKind) {
// notify watches.
self.inner.write_watches[kind].sender.send_if_modified(|v| {
if maybe_new_version > *v {
*v = maybe_new_version;
true
} else {
false
}
});
self.metadata.inner.write_watches[kind]
.sender
.send_if_modified(|v| {
if maybe_new_version > *v {
*v = maybe_new_version;
true
} else {
false
}
});
}
}

Expand Down Expand Up @@ -460,9 +465,11 @@ mod tests {
tc.block_on("test", None, async move {
let network_sender = MockNetworkSender::default();
let metadata_store_client = MetadataStoreClient::new_in_memory();
let metadata_manager = MetadataManager::build(network_sender, metadata_store_client);
let metadata_builder = MetadataBuilder::default();
let metadata = metadata_builder.to_metadata();
let metadata_manager =
MetadataManager::new(metadata_builder, network_sender, metadata_store_client);
let metadata_writer = metadata_manager.writer();
let metadata = metadata_manager.metadata();

assert_eq!(Version::INVALID, config_version(&metadata));

Expand Down Expand Up @@ -541,9 +548,12 @@ mod tests {
tc.block_on("test", None, async move {
let network_sender = MockNetworkSender::default();
let metadata_store_client = MetadataStoreClient::new_in_memory();
let metadata_manager = MetadataManager::build(network_sender, metadata_store_client);

let metadata_builder = MetadataBuilder::default();
let metadata = metadata_builder.to_metadata();
let metadata_manager =
MetadataManager::new(metadata_builder, network_sender, metadata_store_client);
let metadata_writer = metadata_manager.writer();
let metadata = metadata_manager.metadata();

assert_eq!(Version::INVALID, config_version(&metadata));

Expand Down
31 changes: 25 additions & 6 deletions crates/core/src/metadata/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::sync::{Arc, OnceLock};

use arc_swap::{ArcSwap, ArcSwapOption};
use enum_map::EnumMap;
use tokio::sync::{oneshot, watch};
use tokio::sync::{mpsc, oneshot, watch};

use restate_types::logs::metadata::Logs;
use restate_types::net::metadata::MetadataContainer;
Expand All @@ -41,19 +41,38 @@ pub enum SyncError {
Shutdown(#[from] ShutdownError),
}

/// Builds the shared [`Metadata`] handle together with the receiving end of
/// its command channel, before a `MetadataManager` exists.
///
/// Call [`MetadataBuilder::to_metadata`] to obtain clones of the handle early,
/// then pass the builder to `MetadataManager::new`, which takes over both the
/// handle and the receiver.
pub struct MetadataBuilder {
// Receiving half of the command channel; becomes the manager's `inbound` queue.
receiver: manager::CommandReceiver,
// Handle created alongside the channel; it holds the matching sending half.
metadata: Metadata,
}

impl MetadataBuilder {
pub fn to_metadata(&self) -> Metadata {
self.metadata.clone()
}
}

impl Default for MetadataBuilder {
    /// Creates an unbounded command channel and a [`Metadata`] handle with
    /// default inner state that owns the sending half; the receiving half
    /// stays in the builder.
    fn default() -> Self {
        let (sender, receiver) = mpsc::unbounded_channel();
        let metadata = Metadata {
            sender,
            inner: Default::default(),
        };
        Self { metadata, receiver }
    }
}

/// The kind of versioned metadata that can be synchronized across nodes.
#[derive(Clone)]
pub struct Metadata {
sender: manager::CommandSender,
inner: Arc<MetadataInner>,
}

impl Metadata {
fn new(inner: Arc<MetadataInner>, sender: manager::CommandSender) -> Self {
Self { inner, sender }
}

/// Panics if nodes configuration is not loaded yet.
#[track_caller]
pub fn nodes_config(&self) -> Arc<NodesConfiguration> {
Expand Down
16 changes: 12 additions & 4 deletions crates/core/src/network/networking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,27 @@ use restate_types::NodeId;

use super::{ConnectionManager, ConnectionSender};
use super::{NetworkError, NetworkSender};
use crate::metadata;
use crate::Metadata;

const DEFAULT_MAX_CONNECT_ATTEMPTS: u32 = 10;
// todo: make this configurable
const SEND_RETRY_BASE_DURATION: Duration = Duration::from_millis(250);

/// Access to node-to-node networking infrastructure.
#[derive(Clone, Default)]
#[derive(Clone)]
pub struct Networking {
connections: ConnectionManager,
metadata: Metadata,
}

impl Networking {
pub fn new(metadata: Metadata) -> Self {
Self {
connections: Default::default(),
metadata,
}
}

pub fn connection_manager(&self) -> ConnectionManager {
self.connections.clone()
}
Expand All @@ -43,7 +51,7 @@ impl Networking {
let node = match node.as_generational() {
Some(node) => node,
None => {
metadata()
self.metadata
.nodes_config()
.find_node_by_id(node)?
.current_generation
Expand All @@ -68,7 +76,7 @@ impl NetworkSender for Networking {
// to ensure we get the latest if it has been updated since last attempt.
let to = match to.as_generational() {
Some(to) => to,
None => match metadata().nodes_config().find_node_by_id(to) {
None => match self.metadata.nodes_config().find_node_by_id(to) {
Ok(node) => node.current_generation,
Err(e) => return Err(NetworkError::UnknownNode(e)),
},
Expand Down
14 changes: 10 additions & 4 deletions crates/core/src/test_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ use crate::network::{
Handler, MessageHandler, MessageRouter, MessageRouterBuilder, NetworkError, NetworkSender,
ProtocolError,
};
use crate::{cancellation_watcher, metadata, spawn_metadata_manager, ShutdownError, TaskId};
use crate::{
cancellation_watcher, metadata, spawn_metadata_manager, MetadataBuilder, ShutdownError, TaskId,
};
use crate::{Metadata, MetadataManager, MetadataWriter};
use crate::{TaskCenter, TaskCenterBuilder};

Expand Down Expand Up @@ -170,9 +172,13 @@ where

let my_node_id = GenerationalNodeId::new(1, 1);
let metadata_store_client = MetadataStoreClient::new_in_memory();
let metadata_manager =
MetadataManager::build(network_sender.clone(), metadata_store_client.clone());
let metadata = metadata_manager.metadata();
let metadata_builder = MetadataBuilder::default();
let metadata = metadata_builder.to_metadata();
let metadata_manager = MetadataManager::new(
metadata_builder,
network_sender.clone(),
metadata_store_client.clone(),
);
let metadata_writer = metadata_manager.writer();
let router_builder = MessageRouterBuilder::default();
let nodes_config = NodesConfiguration::new(Version::MIN, "test-cluster".to_owned());
Expand Down
16 changes: 10 additions & 6 deletions crates/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use restate_bifrost::BifrostService;
use restate_core::metadata_store::{MetadataStoreClientError, ReadWriteError};
use restate_core::network::MessageRouterBuilder;
use restate_core::network::Networking;
use restate_core::{spawn_metadata_manager, MetadataKind, MetadataManager};
use restate_core::{spawn_metadata_manager, MetadataBuilder, MetadataKind, MetadataManager};
use restate_core::{task_center, TaskKind};
use restate_metadata_store::local::LocalMetadataStoreService;
use restate_metadata_store::MetadataStoreClient;
Expand Down Expand Up @@ -144,11 +144,15 @@ impl Node {
);

let mut router_builder = MessageRouterBuilder::default();
let networking = Networking::default();
let metadata_manager =
MetadataManager::build(networking.clone(), metadata_store_client.clone());
let metadata_builder = MetadataBuilder::default();
let metadata = metadata_builder.to_metadata();
let networking = Networking::new(metadata_builder.to_metadata());
let metadata_manager = MetadataManager::new(
metadata_builder,
networking.clone(),
metadata_store_client.clone(),
);
metadata_manager.register_in_message_router(&mut router_builder);
let metadata = metadata_manager.metadata();
let updating_schema_information = metadata.schema_updateable();
let bifrost = BifrostService::new(metadata.clone());

Expand Down Expand Up @@ -255,7 +259,7 @@ impl Node {
);

let metadata_writer = self.metadata_manager.writer();
let metadata = self.metadata_manager.metadata();
let metadata = self.metadata_manager.metadata().clone();
let is_set = tc.try_set_global_metadata(metadata.clone());
debug_assert!(is_set, "Global metadata was already set");

Expand Down
4 changes: 4 additions & 0 deletions crates/types/src/nodes_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,18 @@
#![allow(dead_code)]

use std::collections::HashMap;
use std::sync::Arc;

use arc_swap::ArcSwap;
use enumset::{EnumSet, EnumSetType};
use serde_with::serde_as;

use crate::net::AdvertisedAddress;
use crate::{flexbuffers_storage_encode_decode, GenerationalNodeId, NodeId, PlainNodeId};
use crate::{Version, Versioned};

/// An `Arc`-shared [`ArcSwap`] holding the current [`NodesConfiguration`].
pub type UpdateableNodesConfiguration = Arc<ArcSwap<NodesConfiguration>>;

#[derive(Debug, thiserror::Error)]
pub enum NodesConfigError {
#[error("node {0} was not found in config")]
Expand Down
13 changes: 9 additions & 4 deletions tools/bifrost-benchpress/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ use bifrost_benchpress::{append_latency, write_to_read, Arguments, Command};
use metrics_exporter_prometheus::PrometheusBuilder;
use restate_bifrost::{Bifrost, BifrostService};
use restate_core::{
spawn_metadata_manager, MetadataManager, MockNetworkSender, TaskCenter, TaskCenterBuilder,
spawn_metadata_manager, MetadataBuilder, MetadataManager, MockNetworkSender, TaskCenter,
TaskCenterBuilder,
};
use restate_errors::fmt::RestateCode;
use restate_metadata_store::{MetadataStoreClient, Precondition};
Expand Down Expand Up @@ -138,10 +139,14 @@ fn spawn_environment(config: Configuration, num_logs: u64) -> (TaskCenter, Bifro
let bifrost = tc.block_on("spawn", None, async move {
let network_sender = MockNetworkSender::default();
let metadata_store_client = MetadataStoreClient::new_in_memory();
let metadata_manager =
MetadataManager::build(network_sender.clone(), metadata_store_client.clone());
let metadata_builder = MetadataBuilder::default();
let metadata = metadata_builder.to_metadata();
let metadata_manager = MetadataManager::new(
metadata_builder,
network_sender.clone(),
metadata_store_client.clone(),
);

let metadata = metadata_manager.metadata();
let metadata_writer = metadata_manager.writer();
task_center.try_set_global_metadata(metadata.clone());

Expand Down

0 comments on commit e665ffd

Please sign in to comment.