Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Schemas -> Live<Schema> #1671

Merged
merged 2 commits into from
Jun 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions benchmarks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,16 @@
use std::num::NonZeroU64;
use std::time::Duration;

use arc_swap::ArcSwap;
use futures_util::{future, TryFutureExt};
use hyper::header::CONTENT_TYPE;
use hyper::{Body, Uri};
use pprof::flamegraph::Options;
use restate_rocksdb::RocksDbManager;
use restate_server::config_loader::ConfigLoaderBuilder;
use restate_types::arc_util::Constant;
use restate_types::config::{
CommonOptionsBuilder, Configuration, ConfigurationBuilder, UpdateableConfiguration,
WorkerOptionsBuilder,
CommonOptionsBuilder, Configuration, ConfigurationBuilder, WorkerOptionsBuilder,
};
use restate_types::live::Constant;
use tokio::runtime::Runtime;

use restate_core::{TaskCenter, TaskCenterBuilder, TaskKind};
Expand Down Expand Up @@ -97,7 +95,7 @@ pub fn spawn_restate(config: Configuration) -> TaskCenter {
.expect("task_center builds");
let cloned_tc = tc.clone();
restate_types::config::set_current_config(config.clone());
let updateable_config = UpdateableConfiguration::new(ArcSwap::from_pointee(config.clone()));
let updateable_config = Configuration::updateable();

tc.run_in_scope_sync("db-manager-init", None, || {
RocksDbManager::init(Constant::new(config.common))
Expand Down
4 changes: 3 additions & 1 deletion clippy.toml
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
# https://github.com/rust-lang/rust-clippy/issues/9801
ignore-interior-mutability = ["bytes::Bytes", "bytestring::ByteString", "http::header::HeaderName", "http::header::HeaderValue"]
ignore-interior-mutability = ["bytes::Bytes", "bytestring::ByteString", "http::header::HeaderName", "http::header::HeaderValue"]
# Sometimes we want to keep the mod.rs file clean
allow-private-module-inception = true
2 changes: 1 addition & 1 deletion crates/admin/src/cluster_controller/cluster_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ where
last_state.nodes_config_version,
)
.await;
let nodes_config = metadata.nodes_config();
let nodes_config = metadata.nodes_config_snapshot();

let mut nodes = BTreeMap::new();
let mut join_set = tokio::task::JoinSet::new();
Expand Down
12 changes: 6 additions & 6 deletions crates/admin/src/cluster_controller/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ use tokio::time::MissedTickBehavior;
use tokio::time::{Instant, Interval};
use tracing::{debug, warn};

use restate_types::arc_util::Updateable;
use restate_types::config::{AdminOptions, Configuration};
use restate_types::live::LiveLoad;
use restate_types::net::cluster_controller::{Action, AttachRequest, AttachResponse, RunPartition};
use restate_types::net::RequestId;
use restate_types::partition_table::{FixedPartitionTable, KeyRange};
Expand Down Expand Up @@ -56,7 +56,7 @@ pub struct Service<N> {
command_tx: mpsc::Sender<ClusterControllerCommand>,
command_rx: mpsc::Receiver<ClusterControllerCommand>,

configuration: Box<dyn Updateable<AdminOptions> + Send + Sync>,
configuration: Box<dyn LiveLoad<AdminOptions> + Send + Sync>,
heartbeat_interval: time::Interval,
log_trim_interval: Option<time::Interval>,
log_trim_threshold: Lsn,
Expand All @@ -67,7 +67,7 @@ where
N: NetworkSender + 'static,
{
pub fn new(
mut configuration: impl Updateable<AdminOptions> + Send + Sync + 'static,
mut configuration: impl LiveLoad<AdminOptions> + Send + Sync + 'static,
task_center: TaskCenter,
metadata: Metadata,
networking: N,
Expand All @@ -83,7 +83,7 @@ where
router_builder,
);

let options = configuration.load();
let options = configuration.live_load();

let heartbeat_interval = Self::create_heartbeat_interval(options);
let (log_trim_interval, log_trim_threshold) = Self::create_log_trim_interval(options);
Expand Down Expand Up @@ -219,7 +219,7 @@ where

fn on_config_update(&mut self) {
debug!("Updating the cluster controller settings.");
let options = self.configuration.load();
let options = self.configuration.live_load();

self.heartbeat_interval = Self::create_heartbeat_interval(options);
(self.log_trim_interval, self.log_trim_threshold) = Self::create_log_trim_interval(options);
Expand Down Expand Up @@ -345,10 +345,10 @@ mod tests {
use restate_bifrost::{Bifrost, Record, TrimGap};
use restate_core::network::{MessageHandler, NetworkSender};
use restate_core::{MockNetworkSender, TaskKind, TestCoreEnvBuilder};
use restate_types::arc_util::Constant;
use restate_types::cluster::cluster_state::{PartitionProcessorStatus, RunMode};
use restate_types::config::AdminOptions;
use restate_types::identifiers::PartitionId;
use restate_types::live::Constant;
use restate_types::logs::{LogId, Lsn, Payload, SequenceNumber};
use restate_types::net::partition_processor_manager::{
GetProcessorsState, ProcessorsStateResponse,
Expand Down
6 changes: 3 additions & 3 deletions crates/admin/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ use std::sync::Arc;
use axum::error_handling::HandleErrorLayer;
use http::StatusCode;
use restate_bifrost::Bifrost;
use restate_types::arc_util::Updateable;
use restate_types::config::AdminOptions;
use restate_types::live::LiveLoad;
use tonic::transport::Channel;
use tower::ServiceBuilder;
use tracing::info;
Expand Down Expand Up @@ -59,11 +59,11 @@ where

pub async fn run(
self,
mut updateable_config: impl Updateable<AdminOptions> + Send + 'static,
mut updateable_config: impl LiveLoad<AdminOptions> + Send + 'static,
node_svc_client: NodeSvcClient<Channel>,
bifrost: Bifrost,
) -> anyhow::Result<()> {
let opts = updateable_config.load();
let opts = updateable_config.live_load();

let rest_state =
state::AdminServiceState::new(self.schema_registry, bifrost, task_center());
Expand Down
6 changes: 3 additions & 3 deletions crates/bifrost/benches/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ use restate_core::{
};
use restate_metadata_store::{MetadataStoreClient, Precondition};
use restate_rocksdb::RocksDbManager;
use restate_types::arc_util::Constant;
use restate_types::config::Configuration;
use restate_types::live::Constant;
use restate_types::logs::metadata::ProviderKind;
use restate_types::metadata_store::keys::BIFROST_CONFIG_KEY;

Expand All @@ -30,10 +30,10 @@ pub async fn spawn_environment(
.expect("task_center builds");

restate_types::config::set_current_config(config.clone());
let network_sender = MockNetworkSender::default();
let metadata_builder = MetadataBuilder::default();
let network_sender = MockNetworkSender::new(metadata_builder.to_metadata());

let metadata_store_client = MetadataStoreClient::new_in_memory();
let metadata_builder = MetadataBuilder::default();
let metadata = metadata_builder.to_metadata();
let metadata_manager = MetadataManager::new(
metadata_builder,
Expand Down
2 changes: 1 addition & 1 deletion crates/bifrost/src/bifrost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,8 +396,8 @@ mod tests {
use restate_core::{metadata, TestCoreEnv};
use restate_core::{task_center, TestCoreEnvBuilder};
use restate_rocksdb::RocksDbManager;
use restate_types::arc_util::Constant;
use restate_types::config::CommonOptions;
use restate_types::live::Constant;
use restate_types::logs::SequenceNumber;
use restate_types::partition_table::FixedPartitionTable;
use test_log::test;
Expand Down
2 changes: 1 addition & 1 deletion crates/bifrost/src/loglet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::{LogRecord, LsnExt, ProviderError};
pub fn create_provider(kind: ProviderKind) -> Result<Arc<dyn LogletProvider>, ProviderError> {
match kind {
ProviderKind::Local => Ok(crate::loglets::local_loglet::LocalLogletProvider::new(
&Configuration::current().load().bifrost.local,
&Configuration::pinned().bifrost.local,
Configuration::mapped_updateable(|c| &c.bifrost.local.rocksdb),
)?),
ProviderKind::InMemory => Ok(crate::loglets::memory_loglet::MemoryLogletProvider::new()?),
Expand Down
4 changes: 2 additions & 2 deletions crates/bifrost/src/loglets/local_loglet/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ use std::sync::Arc;
use restate_rocksdb::{
CfExactPattern, CfName, DbName, DbSpecBuilder, RocksDb, RocksDbManager, RocksError,
};
use restate_types::arc_util::Updateable;
use restate_types::config::{LocalLogletOptions, RocksDbOptions};
use restate_types::live::LiveLoad;
use restate_types::storage::{StorageDecodeError, StorageEncodeError};
use rocksdb::{BoundColumnFamily, DBCompressionType, SliceTransform, DB};
use static_assertions::const_assert;
Expand Down Expand Up @@ -55,7 +55,7 @@ pub struct RocksDbLogStore {
impl RocksDbLogStore {
pub fn new(
options: &LocalLogletOptions,
updateable_options: impl Updateable<RocksDbOptions> + Send + 'static,
updateable_options: impl LiveLoad<RocksDbOptions> + Send + 'static,
) -> Result<Self, LogStoreError> {
let db_manager = RocksDbManager::get();

Expand Down
10 changes: 5 additions & 5 deletions crates/bifrost/src/loglets/local_loglet/log_store_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use tokio_stream::StreamExt as TokioStreamExt;
use tracing::{debug, error, trace, warn};

use restate_core::{cancellation_watcher, task_center, ShutdownError, TaskKind};
use restate_types::arc_util::Updateable;
use restate_types::config::LocalLogletOptions;
use restate_types::live::LiveLoad;
use restate_types::logs::SequenceNumber;

use crate::loglet::LogletOffset;
Expand Down Expand Up @@ -76,10 +76,10 @@ impl LogStoreWriter {
/// Must be called from task_center context
pub fn start(
mut self,
mut updateable: impl Updateable<LocalLogletOptions> + Send + 'static,
mut updateable: impl LiveLoad<LocalLogletOptions> + Send + 'static,
) -> Result<RocksDbLogWriterHandle, ShutdownError> {
// big enough to allows a second full batch to queue up while the existing one is being processed
let batch_size = std::cmp::max(1, updateable.load().writer_batch_commit_count);
let batch_size = std::cmp::max(1, updateable.live_load().writer_batch_commit_count);
// leave twice as much space in the the channel to ensure we can enqueue up-to a full batch in
// the backlog while we process this one.
let (sender, receiver) = mpsc::channel(batch_size * 2);
Expand All @@ -90,7 +90,7 @@ impl LogStoreWriter {
None,
async move {
debug!("Start running LogStoreWriter");
let opts = updateable.load();
let opts = updateable.live_load();
let batch_size = std::cmp::max(1, opts.writer_batch_commit_count);
let batch_duration: Duration = opts.writer_batch_commit_duration.into();
// We don't want to use chunks_timeout if time-based batching is disabled, why?
Expand All @@ -114,7 +114,7 @@ impl LogStoreWriter {
break;
}
Some(cmds) = TokioStreamExt::next(&mut receiver) => {
let opts = updateable.load();
let opts = updateable.live_load();
self.handle_commands(opts, cmds).await;
}
}
Expand Down
4 changes: 2 additions & 2 deletions crates/bifrost/src/loglets/local_loglet/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ use std::sync::{Arc, OnceLock};

use anyhow::Context;
use async_trait::async_trait;
use restate_types::arc_util::Updateable;
use restate_types::config::{Configuration, LocalLogletOptions, RocksDbOptions};
use restate_types::live::LiveLoad;
use restate_types::logs::metadata::LogletParams;
use tokio::sync::Mutex as AsyncMutex;
use tracing::debug;
Expand All @@ -35,7 +35,7 @@ pub struct LocalLogletProvider {
impl LocalLogletProvider {
pub fn new(
options: &LocalLogletOptions,
updateable_rocksdb_options: impl Updateable<RocksDbOptions> + Send + 'static,
updateable_rocksdb_options: impl LiveLoad<RocksDbOptions> + Send + 'static,
) -> Result<Arc<Self>, ProviderError> {
let log_store = RocksDbLogStore::new(options, updateable_rocksdb_options)
.context("RocksDb LogStore")?;
Expand Down
2 changes: 1 addition & 1 deletion crates/bifrost/src/read_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,8 @@ mod tests {

use restate_core::{metadata, TaskKind, TestCoreEnvBuilder};
use restate_rocksdb::RocksDbManager;
use restate_types::arc_util::Constant;
use restate_types::config::CommonOptions;
use restate_types::live::Constant;
use restate_types::logs::metadata::ProviderKind;
use tokio_stream::StreamExt;
use tracing::info;
Expand Down
22 changes: 11 additions & 11 deletions crates/core/src/metadata/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
// by the Apache License, Version 2.0.

use arc_swap::{ArcSwap, ArcSwapOption};
use restate_types::schema::Schema;
use std::ops::Deref;
use std::sync::Arc;

Expand All @@ -25,12 +24,12 @@ use restate_types::net::metadata::{MetadataMessage, MetadataUpdate};
use restate_types::net::MessageEnvelope;
use restate_types::nodes_config::NodesConfiguration;
use restate_types::partition_table::FixedPartitionTable;
use restate_types::schema::Schema;
use restate_types::GenerationalNodeId;
use restate_types::{Version, Versioned};

use crate::cancellation_watcher;
use crate::is_cancellation_requested;
use crate::metadata;
use crate::metadata_store::{MetadataStoreClient, ReadError};
use crate::network::{MessageHandler, MessageRouterBuilder, NetworkSender};
use crate::task_center;
Expand All @@ -57,6 +56,7 @@ where
{
sender: CommandSender,
networking: N,
metadata: Metadata,
}

impl<N> MetadataMessageHandler<N>
Expand All @@ -80,18 +80,18 @@ where
}

fn send_nodes_config(&self, to: GenerationalNodeId, version: Option<Version>) {
let config = metadata().nodes_config();
let config = self.metadata.nodes_config_snapshot();
self.send_metadata_internal(to, version, config.deref(), "nodes_config");
}

fn send_partition_table(&self, to: GenerationalNodeId, version: Option<Version>) {
if let Some(partition_table) = metadata().partition_table() {
if let Some(partition_table) = self.metadata.partition_table() {
self.send_metadata_internal(to, version, partition_table.deref(), "partition_table");
}
}

fn send_logs(&self, to: GenerationalNodeId, version: Option<Version>) {
if let Some(logs) = metadata().logs() {
if let Some(logs) = self.metadata.logs() {
self.send_metadata_internal(to, version, logs.deref(), "logs");
}
}
Expand Down Expand Up @@ -225,6 +225,7 @@ where
sr_builder.add_message_handler(MetadataMessageHandler {
sender: self.metadata.sender.clone(),
networking: self.networking.clone(),
metadata: self.metadata.clone(),
});
}

Expand Down Expand Up @@ -332,8 +333,7 @@ where
}

fn update_nodes_configuration(&mut self, config: NodesConfiguration) {
let maybe_new_version =
Self::update_option_internal(&self.metadata.inner.nodes_config, config);
let maybe_new_version = Self::update_internal(&self.metadata.inner.nodes_config, config);

self.notify_watches(maybe_new_version, MetadataKind::NodesConfiguration);
}
Expand Down Expand Up @@ -463,9 +463,9 @@ mod tests {
{
let tc = TaskCenterBuilder::default().build()?;
tc.block_on("test", None, async move {
let network_sender = MockNetworkSender::default();
let metadata_store_client = MetadataStoreClient::new_in_memory();
let metadata_builder = MetadataBuilder::default();
let network_sender = MockNetworkSender::new(metadata_builder.to_metadata());
let metadata_store_client = MetadataStoreClient::new_in_memory();
let metadata = metadata_builder.to_metadata();
let metadata_manager =
MetadataManager::new(metadata_builder, network_sender, metadata_store_client);
Expand Down Expand Up @@ -546,10 +546,10 @@ mod tests {
{
let tc = TaskCenterBuilder::default().build()?;
tc.block_on("test", None, async move {
let network_sender = MockNetworkSender::default();
let metadata_builder = MetadataBuilder::default();
let network_sender = MockNetworkSender::new(metadata_builder.to_metadata());
let metadata_store_client = MetadataStoreClient::new_in_memory();

let metadata_builder = MetadataBuilder::default();
let metadata = metadata_builder.to_metadata();
let metadata_manager =
MetadataManager::new(metadata_builder, network_sender, metadata_store_client);
Expand Down
Loading
Loading