Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MetadataBuilder to untangle shared metadata construction #1665

Merged
merged 2 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions crates/admin/src/cluster_controller/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,13 +372,14 @@ mod tests {
builder.network_sender.clone(),
&mut builder.router_builder,
);
let metadata = builder.metadata.clone();
let svc_handle = svc.handle();

let node_env = builder.build().await;

let mut bifrost = node_env
let bifrost = node_env
.tc
.run_in_scope("init", None, Bifrost::init())
.run_in_scope("init", None, Bifrost::init(metadata))
.await;

node_env.tc.spawn(
Expand Down Expand Up @@ -446,6 +447,7 @@ mod tests {
async fn auto_log_trim() -> anyhow::Result<()> {
let mut builder = TestCoreEnvBuilder::new_with_mock_network();

let metadata = builder.metadata.clone();
let mut admin_options = AdminOptions::default();
admin_options.log_trim_threshold = 5;
let interval_duration = Duration::from_secs(10);
Expand Down Expand Up @@ -479,9 +481,9 @@ mod tests {
.build()
.await;

let mut bifrost = node_env
let bifrost = node_env
.tc
.run_in_scope("init", None, Bifrost::init())
.run_in_scope("init", None, Bifrost::init(metadata))
.await;

node_env.tc.spawn(
Expand Down
6 changes: 3 additions & 3 deletions crates/bifrost/benches/append_throughput.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ async fn append_records_multi_log(bifrost: Bifrost, log_id_range: Range<u64>, co
let mut appends = FuturesUnordered::new();
for log_id in log_id_range {
for _ in 0..count_per_log {
let mut bifrost = bifrost.clone();
let bifrost = bifrost.clone();
appends.push(async move {
let _ = bifrost
.append(LogId::from(log_id), Payload::default())
Expand All @@ -43,13 +43,13 @@ async fn append_records_multi_log(bifrost: Bifrost, log_id_range: Range<u64>, co
async fn append_records_concurrent_single_log(bifrost: Bifrost, log_id: LogId, count_per_log: u64) {
let mut appends = FuturesOrdered::new();
for _ in 0..count_per_log {
let mut bifrost = bifrost.clone();
let bifrost = bifrost.clone();
appends.push_back(async move { bifrost.append(log_id, Payload::default()).await.unwrap() })
}
while appends.next().await.is_some() {}
}

async fn append_seq(mut bifrost: Bifrost, log_id: LogId, count: u64) {
async fn append_seq(bifrost: Bifrost, log_id: LogId, count: u64) {
for _ in 1..=count {
let _ = bifrost
.append(log_id, Payload::default())
Expand Down
13 changes: 9 additions & 4 deletions crates/bifrost/benches/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
// by the Apache License, Version 2.0.

use restate_core::{
spawn_metadata_manager, MetadataManager, MockNetworkSender, TaskCenter, TaskCenterBuilder,
spawn_metadata_manager, MetadataBuilder, MetadataManager, MockNetworkSender, TaskCenter,
TaskCenterBuilder,
};
use restate_metadata_store::{MetadataStoreClient, Precondition};
use restate_rocksdb::RocksDbManager;
Expand All @@ -32,10 +33,14 @@ pub async fn spawn_environment(
let network_sender = MockNetworkSender::default();

let metadata_store_client = MetadataStoreClient::new_in_memory();
let metadata_manager =
MetadataManager::build(network_sender.clone(), metadata_store_client.clone());
let metadata_builder = MetadataBuilder::default();
let metadata = metadata_builder.to_metadata();
let metadata_manager = MetadataManager::new(
metadata_builder,
network_sender.clone(),
metadata_store_client.clone(),
);

let metadata = metadata_manager.metadata();
let metadata_writer = metadata_manager.writer();
tc.try_set_global_metadata(metadata.clone());

Expand Down
28 changes: 14 additions & 14 deletions crates/bifrost/src/bifrost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use once_cell::sync::OnceCell;
use smallvec::SmallVec;
use tracing::{error, instrument};

use restate_core::{metadata, Metadata, MetadataKind};
use restate_core::{Metadata, MetadataKind};
use restate_types::logs::metadata::{ProviderKind, Segment};
use restate_types::logs::{LogId, Lsn, Payload, SequenceNumber};
use restate_types::storage::StorageCodec;
Expand All @@ -41,18 +41,18 @@ use crate::{
#[derive(Clone)]
pub struct Bifrost {
inner: Arc<BifrostInner>,
metadata: Metadata,
}

impl Bifrost {
pub(crate) fn new(inner: Arc<BifrostInner>) -> Self {
Self { inner }
pub(crate) fn new(inner: Arc<BifrostInner>, metadata: Metadata) -> Self {
Self { inner, metadata }
}

#[cfg(any(test, feature = "test-util"))]
pub async fn init() -> Self {
pub async fn init(metadata: Metadata) -> Self {
use crate::BifrostService;

let metadata = metadata();
let bifrost_svc = BifrostService::new(metadata);
let bifrost = bifrost_svc.handle();

Expand All @@ -67,15 +67,15 @@ impl Bifrost {
/// Appends a single record to a log. The log id must exist, otherwise the
/// operation fails with [`Error::UnknownLogId`]
#[instrument(level = "debug", skip(self, payload), err)]
pub async fn append(&mut self, log_id: LogId, payload: Payload) -> Result<Lsn> {
pub async fn append(&self, log_id: LogId, payload: Payload) -> Result<Lsn> {
self.inner.append(log_id, payload).await
}

/// Appends a batch of records to a log. The log id must exist, otherwise the
/// operation fails with [`Error::UnknownLogId`]. The returned Lsn is the Lsn of the first
/// record in this batch. This will only return after all records have been stored.
#[instrument(level = "debug", skip(self, payloads), err)]
pub async fn append_batch(&mut self, log_id: LogId, payloads: &[Payload]) -> Result<Lsn> {
pub async fn append_batch(&self, log_id: LogId, payloads: &[Payload]) -> Result<Lsn> {
self.inner.append_batch(log_id, payloads).await
}

Expand Down Expand Up @@ -133,7 +133,7 @@ impl Bifrost {

/// The version of the currently loaded logs metadata
pub fn version(&self) -> Version {
metadata().logs_version()
self.metadata.logs_version()
}

#[cfg(test)]
Expand Down Expand Up @@ -393,7 +393,7 @@ mod tests {
use googletest::prelude::*;

use crate::{Record, TrimGap};
use restate_core::TestCoreEnv;
use restate_core::{metadata, TestCoreEnv};
use restate_core::{task_center, TestCoreEnvBuilder};
use restate_rocksdb::RocksDbManager;
use restate_types::arc_util::Constant;
Expand All @@ -414,9 +414,9 @@ mod tests {
.await;
let tc = node_env.tc;
tc.run_in_scope("test", None, async {
let mut bifrost = Bifrost::init().await;
let bifrost = Bifrost::init(metadata()).await;

let mut clean_bifrost_clone = bifrost.clone();
let clean_bifrost_clone = bifrost.clone();

let mut max_lsn = Lsn::INVALID;
for i in 1..=5 {
Expand All @@ -434,7 +434,7 @@ mod tests {
assert_that!(resp, pat!(Err(pat!(Error::UnknownLogId(eq(invalid_log))))));

// use a cloned bifrost.
let mut cloned_bifrost = bifrost.clone();
let cloned_bifrost = bifrost.clone();
for _ in 1..=5 {
// Append a record to memory
let lsn = cloned_bifrost
Expand Down Expand Up @@ -489,7 +489,7 @@ mod tests {
// to ensure that appends do not fail while waiting for the loglet;
let memory_provider = MemoryLogletProvider::with_init_delay(delay);

let mut bifrost = Bifrost::init().await;
let bifrost = Bifrost::init(metadata()).await;

// Inject out preconfigured memory provider
bifrost
Expand Down Expand Up @@ -518,7 +518,7 @@ mod tests {
RocksDbManager::init(Constant::new(CommonOptions::default()));

let log_id = LogId::from(0);
let mut bifrost = Bifrost::init().await;
let bifrost = Bifrost::init(metadata()).await;

assert!(bifrost.get_trim_point(log_id).await?.is_none());

Expand Down
6 changes: 3 additions & 3 deletions crates/bifrost/src/read_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ mod tests {
use super::*;
use googletest::prelude::*;

use restate_core::{TaskKind, TestCoreEnvBuilder};
use restate_core::{metadata, TaskKind, TestCoreEnvBuilder};
use restate_rocksdb::RocksDbManager;
use restate_types::arc_util::Constant;
use restate_types::config::CommonOptions;
Expand Down Expand Up @@ -193,7 +193,7 @@ mod tests {
RocksDbManager::init(Constant::new(CommonOptions::default()));

let read_after = Lsn::from(5);
let mut bifrost = Bifrost::init().await;
let bifrost = Bifrost::init(metadata()).await;

let log_id = LogId::from(0);
let mut reader = bifrost.create_reader(log_id, read_after, Lsn::MAX).await?;
Expand Down Expand Up @@ -282,7 +282,7 @@ mod tests {
RocksDbManager::init(Constant::new(CommonOptions::default()));

let log_id = LogId::from(0);
let mut bifrost = Bifrost::init().await;
let bifrost = Bifrost::init(metadata()).await;

assert!(bifrost.get_trim_point(log_id).await?.is_none());

Expand Down
4 changes: 2 additions & 2 deletions crates/bifrost/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ pub struct BifrostService {
impl BifrostService {
pub fn new(metadata: Metadata) -> Self {
let (watchdog_sender, watchdog_receiver) = tokio::sync::mpsc::unbounded_channel();
let inner = Arc::new(BifrostInner::new(metadata, watchdog_sender));
let bifrost = Bifrost::new(inner.clone());
let inner = Arc::new(BifrostInner::new(metadata.clone(), watchdog_sender));
let bifrost = Bifrost::new(inner.clone(), metadata);
let watchdog = Watchdog::new(inner.clone(), watchdog_receiver);
Self {
inner,
Expand Down
3 changes: 2 additions & 1 deletion crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ mod task_center_types;
pub mod worker_api;

pub use metadata::{
spawn_metadata_manager, Metadata, MetadataKind, MetadataManager, MetadataWriter, SyncError,
spawn_metadata_manager, Metadata, MetadataBuilder, MetadataKind, MetadataManager,
MetadataWriter, SyncError,
};
pub use task_center::*;
pub use task_center_types::*;
Expand Down
66 changes: 38 additions & 28 deletions crates/core/src/metadata/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ use crate::metadata_store::{MetadataStoreClient, ReadError};
use crate::network::{MessageHandler, MessageRouterBuilder, NetworkSender};
use crate::task_center;

use super::{Metadata, MetadataContainer, MetadataInner, MetadataKind, MetadataWriter};
use super::MetadataBuilder;
use super::{Metadata, MetadataContainer, MetadataKind, MetadataWriter};

pub(super) type CommandSender = mpsc::UnboundedSender<Command>;
pub(super) type CommandReceiver = mpsc::UnboundedReceiver<Command>;
Expand Down Expand Up @@ -197,8 +198,7 @@ where
/// - NodesConfiguration
/// - Partition table
pub struct MetadataManager<N> {
self_sender: CommandSender,
inner: Arc<MetadataInner>,
metadata: Metadata,
inbound: CommandReceiver,
networking: N,
metadata_store_client: MetadataStoreClient,
Expand All @@ -208,30 +208,32 @@ impl<N> MetadataManager<N>
where
N: NetworkSender + 'static + Clone,
{
pub fn build(networking: N, metadata_store_client: MetadataStoreClient) -> Self {
let (self_sender, inbound) = mpsc::unbounded_channel();
pub fn new(
metadata_builder: MetadataBuilder,
networking: N,
metadata_store_client: MetadataStoreClient,
) -> Self {
Self {
inner: Arc::new(MetadataInner::default()),
inbound,
self_sender,
metadata: metadata_builder.metadata,
inbound: metadata_builder.receiver,
networking,
metadata_store_client,
}
}

pub fn register_in_message_router(&self, sr_builder: &mut MessageRouterBuilder) {
sr_builder.add_message_handler(MetadataMessageHandler {
sender: self.self_sender.clone(),
sender: self.metadata.sender.clone(),
networking: self.networking.clone(),
});
}

pub fn metadata(&self) -> Metadata {
Metadata::new(self.inner.clone(), self.self_sender.clone())
pub fn metadata(&self) -> &Metadata {
&self.metadata
}

pub fn writer(&self) -> MetadataWriter {
MetadataWriter::new(self.self_sender.clone(), self.inner.clone())
MetadataWriter::new(self.metadata.sender.clone(), self.metadata.inner.clone())
}

/// Start and wait for shutdown signal.
Expand Down Expand Up @@ -330,26 +332,27 @@ where
}

fn update_nodes_configuration(&mut self, config: NodesConfiguration) {
let maybe_new_version = Self::update_option_internal(&self.inner.nodes_config, config);
let maybe_new_version =
Self::update_option_internal(&self.metadata.inner.nodes_config, config);

self.notify_watches(maybe_new_version, MetadataKind::NodesConfiguration);
}

fn update_partition_table(&mut self, partition_table: FixedPartitionTable) {
let maybe_new_version =
Self::update_option_internal(&self.inner.partition_table, partition_table);
Self::update_option_internal(&self.metadata.inner.partition_table, partition_table);

self.notify_watches(maybe_new_version, MetadataKind::PartitionTable);
}

fn update_logs(&mut self, logs: Logs) {
let maybe_new_version = Self::update_option_internal(&self.inner.logs, logs);
let maybe_new_version = Self::update_option_internal(&self.metadata.inner.logs, logs);

self.notify_watches(maybe_new_version, MetadataKind::Logs);
}

fn update_schema(&mut self, schema: Schema) {
let maybe_new_version = Self::update_internal(&self.inner.schema, schema);
let maybe_new_version = Self::update_internal(&self.metadata.inner.schema, schema);

self.notify_watches(maybe_new_version, MetadataKind::Schema);
}
Expand Down Expand Up @@ -399,14 +402,16 @@ where

fn notify_watches(&mut self, maybe_new_version: Version, kind: MetadataKind) {
// notify watches.
self.inner.write_watches[kind].sender.send_if_modified(|v| {
if maybe_new_version > *v {
*v = maybe_new_version;
true
} else {
false
}
});
self.metadata.inner.write_watches[kind]
.sender
.send_if_modified(|v| {
if maybe_new_version > *v {
*v = maybe_new_version;
true
} else {
false
}
});
}
}

Expand Down Expand Up @@ -460,9 +465,11 @@ mod tests {
tc.block_on("test", None, async move {
let network_sender = MockNetworkSender::default();
let metadata_store_client = MetadataStoreClient::new_in_memory();
let metadata_manager = MetadataManager::build(network_sender, metadata_store_client);
let metadata_builder = MetadataBuilder::default();
let metadata = metadata_builder.to_metadata();
let metadata_manager =
MetadataManager::new(metadata_builder, network_sender, metadata_store_client);
let metadata_writer = metadata_manager.writer();
let metadata = metadata_manager.metadata();

assert_eq!(Version::INVALID, config_version(&metadata));

Expand Down Expand Up @@ -541,9 +548,12 @@ mod tests {
tc.block_on("test", None, async move {
let network_sender = MockNetworkSender::default();
let metadata_store_client = MetadataStoreClient::new_in_memory();
let metadata_manager = MetadataManager::build(network_sender, metadata_store_client);

let metadata_builder = MetadataBuilder::default();
let metadata = metadata_builder.to_metadata();
let metadata_manager =
MetadataManager::new(metadata_builder, network_sender, metadata_store_client);
let metadata_writer = metadata_manager.writer();
let metadata = metadata_manager.metadata();

assert_eq!(Version::INVALID, config_version(&metadata));

Expand Down
Loading
Loading