Skip to content

Commit

Permalink
Updateable is now Live<T>
Browse files Browse the repository at this point in the history
- Automatic caching and redesigned interface
- Nodes configuration is not optional anymore. Node configuration is empty with an INVALID version before loading.
- Updateable<T> provides a readonly live view of an updateable. Namely, it hides all access to the underlying ArcSwap.
- Use updateable config in networking and avoid metadata()
- Efficient access to metadata and nodes configuration in performance-sensitive networking code
- Easy to map and around as owned types
- Supports projections and type erasure
  • Loading branch information
AhmedSoliman committed Jun 27, 2024
1 parent e665ffd commit 295517d
Show file tree
Hide file tree
Showing 45 changed files with 474 additions and 403 deletions.
8 changes: 3 additions & 5 deletions benchmarks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,16 @@
use std::num::NonZeroU64;
use std::time::Duration;

use arc_swap::ArcSwap;
use futures_util::{future, TryFutureExt};
use hyper::header::CONTENT_TYPE;
use hyper::{Body, Uri};
use pprof::flamegraph::Options;
use restate_rocksdb::RocksDbManager;
use restate_server::config_loader::ConfigLoaderBuilder;
use restate_types::arc_util::Constant;
use restate_types::config::{
CommonOptionsBuilder, Configuration, ConfigurationBuilder, UpdateableConfiguration,
WorkerOptionsBuilder,
CommonOptionsBuilder, Configuration, ConfigurationBuilder, WorkerOptionsBuilder,
};
use restate_types::live::Constant;
use tokio::runtime::Runtime;

use restate_core::{TaskCenter, TaskCenterBuilder, TaskKind};
Expand Down Expand Up @@ -97,7 +95,7 @@ pub fn spawn_restate(config: Configuration) -> TaskCenter {
.expect("task_center builds");
let cloned_tc = tc.clone();
restate_types::config::set_current_config(config.clone());
let updateable_config = UpdateableConfiguration::new(ArcSwap::from_pointee(config.clone()));
let updateable_config = Configuration::updateable();

tc.run_in_scope_sync("db-manager-init", None, || {
RocksDbManager::init(Constant::new(config.common))
Expand Down
4 changes: 3 additions & 1 deletion clippy.toml
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
# https://github.com/rust-lang/rust-clippy/issues/9801
ignore-interior-mutability = ["bytes::Bytes", "bytestring::ByteString", "http::header::HeaderName", "http::header::HeaderValue"]
ignore-interior-mutability = ["bytes::Bytes", "bytestring::ByteString", "http::header::HeaderName", "http::header::HeaderValue"]
# Sometimes we want to keep the mod.rs file clean
allow-private-module-inception = true
2 changes: 1 addition & 1 deletion crates/admin/src/cluster_controller/cluster_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ where
last_state.nodes_config_version,
)
.await;
let nodes_config = metadata.nodes_config();
let nodes_config = metadata.nodes_config_snapshot();

let mut nodes = BTreeMap::new();
let mut join_set = tokio::task::JoinSet::new();
Expand Down
12 changes: 6 additions & 6 deletions crates/admin/src/cluster_controller/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ use tokio::time::MissedTickBehavior;
use tokio::time::{Instant, Interval};
use tracing::{debug, warn};

use restate_types::arc_util::Updateable;
use restate_types::config::{AdminOptions, Configuration};
use restate_types::live::LiveLoad;
use restate_types::net::cluster_controller::{Action, AttachRequest, AttachResponse, RunPartition};
use restate_types::net::RequestId;
use restate_types::partition_table::{FixedPartitionTable, KeyRange};
Expand Down Expand Up @@ -56,7 +56,7 @@ pub struct Service<N> {
command_tx: mpsc::Sender<ClusterControllerCommand>,
command_rx: mpsc::Receiver<ClusterControllerCommand>,

configuration: Box<dyn Updateable<AdminOptions> + Send + Sync>,
configuration: Box<dyn LiveLoad<AdminOptions> + Send + Sync>,
heartbeat_interval: time::Interval,
log_trim_interval: Option<time::Interval>,
log_trim_threshold: Lsn,
Expand All @@ -67,7 +67,7 @@ where
N: NetworkSender + 'static,
{
pub fn new(
mut configuration: impl Updateable<AdminOptions> + Send + Sync + 'static,
mut configuration: impl LiveLoad<AdminOptions> + Send + Sync + 'static,
task_center: TaskCenter,
metadata: Metadata,
networking: N,
Expand All @@ -83,7 +83,7 @@ where
router_builder,
);

let options = configuration.load();
let options = configuration.live_load();

let heartbeat_interval = Self::create_heartbeat_interval(options);
let (log_trim_interval, log_trim_threshold) = Self::create_log_trim_interval(options);
Expand Down Expand Up @@ -219,7 +219,7 @@ where

fn on_config_update(&mut self) {
debug!("Updating the cluster controller settings.");
let options = self.configuration.load();
let options = self.configuration.live_load();

self.heartbeat_interval = Self::create_heartbeat_interval(options);
(self.log_trim_interval, self.log_trim_threshold) = Self::create_log_trim_interval(options);
Expand Down Expand Up @@ -345,10 +345,10 @@ mod tests {
use restate_bifrost::{Bifrost, Record, TrimGap};
use restate_core::network::{MessageHandler, NetworkSender};
use restate_core::{MockNetworkSender, TaskKind, TestCoreEnvBuilder};
use restate_types::arc_util::Constant;
use restate_types::cluster::cluster_state::{PartitionProcessorStatus, RunMode};
use restate_types::config::AdminOptions;
use restate_types::identifiers::PartitionId;
use restate_types::live::Constant;
use restate_types::logs::{LogId, Lsn, Payload, SequenceNumber};
use restate_types::net::partition_processor_manager::{
GetProcessorsState, ProcessorsStateResponse,
Expand Down
6 changes: 3 additions & 3 deletions crates/admin/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ use std::sync::Arc;
use axum::error_handling::HandleErrorLayer;
use http::StatusCode;
use restate_bifrost::Bifrost;
use restate_types::arc_util::Updateable;
use restate_types::config::AdminOptions;
use restate_types::live::LiveLoad;
use tonic::transport::Channel;
use tower::ServiceBuilder;
use tracing::info;
Expand Down Expand Up @@ -59,11 +59,11 @@ where

pub async fn run(
self,
mut updateable_config: impl Updateable<AdminOptions> + Send + 'static,
mut updateable_config: impl LiveLoad<AdminOptions> + Send + 'static,
node_svc_client: NodeSvcClient<Channel>,
bifrost: Bifrost,
) -> anyhow::Result<()> {
let opts = updateable_config.load();
let opts = updateable_config.live_load();

let rest_state =
state::AdminServiceState::new(self.schema_registry, bifrost, task_center());
Expand Down
6 changes: 3 additions & 3 deletions crates/bifrost/benches/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ use restate_core::{
};
use restate_metadata_store::{MetadataStoreClient, Precondition};
use restate_rocksdb::RocksDbManager;
use restate_types::arc_util::Constant;
use restate_types::config::Configuration;
use restate_types::live::Constant;
use restate_types::logs::metadata::ProviderKind;
use restate_types::metadata_store::keys::BIFROST_CONFIG_KEY;

Expand All @@ -30,10 +30,10 @@ pub async fn spawn_environment(
.expect("task_center builds");

restate_types::config::set_current_config(config.clone());
let network_sender = MockNetworkSender::default();
let metadata_builder = MetadataBuilder::default();
let network_sender = MockNetworkSender::new(metadata_builder.to_metadata());

let metadata_store_client = MetadataStoreClient::new_in_memory();
let metadata_builder = MetadataBuilder::default();
let metadata = metadata_builder.to_metadata();
let metadata_manager = MetadataManager::new(
metadata_builder,
Expand Down
2 changes: 1 addition & 1 deletion crates/bifrost/src/bifrost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,8 +396,8 @@ mod tests {
use restate_core::{metadata, TestCoreEnv};
use restate_core::{task_center, TestCoreEnvBuilder};
use restate_rocksdb::RocksDbManager;
use restate_types::arc_util::Constant;
use restate_types::config::CommonOptions;
use restate_types::live::Constant;
use restate_types::logs::SequenceNumber;
use restate_types::partition_table::FixedPartitionTable;
use test_log::test;
Expand Down
2 changes: 1 addition & 1 deletion crates/bifrost/src/loglet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::{LogRecord, LsnExt, ProviderError};
pub fn create_provider(kind: ProviderKind) -> Result<Arc<dyn LogletProvider>, ProviderError> {
match kind {
ProviderKind::Local => Ok(crate::loglets::local_loglet::LocalLogletProvider::new(
&Configuration::current().load().bifrost.local,
&Configuration::pinned().bifrost.local,
Configuration::mapped_updateable(|c| &c.bifrost.local.rocksdb),
)?),
ProviderKind::InMemory => Ok(crate::loglets::memory_loglet::MemoryLogletProvider::new()?),
Expand Down
4 changes: 2 additions & 2 deletions crates/bifrost/src/loglets/local_loglet/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ use std::sync::Arc;
use restate_rocksdb::{
CfExactPattern, CfName, DbName, DbSpecBuilder, RocksDb, RocksDbManager, RocksError,
};
use restate_types::arc_util::Updateable;
use restate_types::config::{LocalLogletOptions, RocksDbOptions};
use restate_types::live::LiveLoad;
use restate_types::storage::{StorageDecodeError, StorageEncodeError};
use rocksdb::{BoundColumnFamily, DBCompressionType, SliceTransform, DB};
use static_assertions::const_assert;
Expand Down Expand Up @@ -55,7 +55,7 @@ pub struct RocksDbLogStore {
impl RocksDbLogStore {
pub fn new(
options: &LocalLogletOptions,
updateable_options: impl Updateable<RocksDbOptions> + Send + 'static,
updateable_options: impl LiveLoad<RocksDbOptions> + Send + 'static,
) -> Result<Self, LogStoreError> {
let db_manager = RocksDbManager::get();

Expand Down
10 changes: 5 additions & 5 deletions crates/bifrost/src/loglets/local_loglet/log_store_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use tokio_stream::StreamExt as TokioStreamExt;
use tracing::{debug, error, trace, warn};

use restate_core::{cancellation_watcher, task_center, ShutdownError, TaskKind};
use restate_types::arc_util::Updateable;
use restate_types::config::LocalLogletOptions;
use restate_types::live::LiveLoad;
use restate_types::logs::SequenceNumber;

use crate::loglet::LogletOffset;
Expand Down Expand Up @@ -76,10 +76,10 @@ impl LogStoreWriter {
/// Must be called from task_center context
pub fn start(
mut self,
mut updateable: impl Updateable<LocalLogletOptions> + Send + 'static,
mut updateable: impl LiveLoad<LocalLogletOptions> + Send + 'static,
) -> Result<RocksDbLogWriterHandle, ShutdownError> {
// big enough to allows a second full batch to queue up while the existing one is being processed
let batch_size = std::cmp::max(1, updateable.load().writer_batch_commit_count);
let batch_size = std::cmp::max(1, updateable.live_load().writer_batch_commit_count);
// leave twice as much space in the the channel to ensure we can enqueue up-to a full batch in
// the backlog while we process this one.
let (sender, receiver) = mpsc::channel(batch_size * 2);
Expand All @@ -90,7 +90,7 @@ impl LogStoreWriter {
None,
async move {
debug!("Start running LogStoreWriter");
let opts = updateable.load();
let opts = updateable.live_load();
let batch_size = std::cmp::max(1, opts.writer_batch_commit_count);
let batch_duration: Duration = opts.writer_batch_commit_duration.into();
// We don't want to use chunks_timeout if time-based batching is disabled, why?
Expand All @@ -114,7 +114,7 @@ impl LogStoreWriter {
break;
}
Some(cmds) = TokioStreamExt::next(&mut receiver) => {
let opts = updateable.load();
let opts = updateable.live_load();
self.handle_commands(opts, cmds).await;
}
}
Expand Down
4 changes: 2 additions & 2 deletions crates/bifrost/src/loglets/local_loglet/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ use std::sync::{Arc, OnceLock};

use anyhow::Context;
use async_trait::async_trait;
use restate_types::arc_util::Updateable;
use restate_types::config::{Configuration, LocalLogletOptions, RocksDbOptions};
use restate_types::live::LiveLoad;
use restate_types::logs::metadata::LogletParams;
use tokio::sync::Mutex as AsyncMutex;
use tracing::debug;
Expand All @@ -35,7 +35,7 @@ pub struct LocalLogletProvider {
impl LocalLogletProvider {
pub fn new(
options: &LocalLogletOptions,
updateable_rocksdb_options: impl Updateable<RocksDbOptions> + Send + 'static,
updateable_rocksdb_options: impl LiveLoad<RocksDbOptions> + Send + 'static,
) -> Result<Arc<Self>, ProviderError> {
let log_store = RocksDbLogStore::new(options, updateable_rocksdb_options)
.context("RocksDb LogStore")?;
Expand Down
2 changes: 1 addition & 1 deletion crates/bifrost/src/read_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,8 @@ mod tests {

use restate_core::{metadata, TaskKind, TestCoreEnvBuilder};
use restate_rocksdb::RocksDbManager;
use restate_types::arc_util::Constant;
use restate_types::config::CommonOptions;
use restate_types::live::Constant;
use restate_types::logs::metadata::ProviderKind;
use tokio_stream::StreamExt;
use tracing::info;
Expand Down
22 changes: 11 additions & 11 deletions crates/core/src/metadata/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
// by the Apache License, Version 2.0.

use arc_swap::{ArcSwap, ArcSwapOption};
use restate_types::schema::Schema;
use std::ops::Deref;
use std::sync::Arc;

Expand All @@ -25,12 +24,12 @@ use restate_types::net::metadata::{MetadataMessage, MetadataUpdate};
use restate_types::net::MessageEnvelope;
use restate_types::nodes_config::NodesConfiguration;
use restate_types::partition_table::FixedPartitionTable;
use restate_types::schema::Schema;
use restate_types::GenerationalNodeId;
use restate_types::{Version, Versioned};

use crate::cancellation_watcher;
use crate::is_cancellation_requested;
use crate::metadata;
use crate::metadata_store::{MetadataStoreClient, ReadError};
use crate::network::{MessageHandler, MessageRouterBuilder, NetworkSender};
use crate::task_center;
Expand All @@ -57,6 +56,7 @@ where
{
sender: CommandSender,
networking: N,
metadata: Metadata,
}

impl<N> MetadataMessageHandler<N>
Expand All @@ -80,18 +80,18 @@ where
}

fn send_nodes_config(&self, to: GenerationalNodeId, version: Option<Version>) {
let config = metadata().nodes_config();
let config = self.metadata.nodes_config_snapshot();
self.send_metadata_internal(to, version, config.deref(), "nodes_config");
}

fn send_partition_table(&self, to: GenerationalNodeId, version: Option<Version>) {
if let Some(partition_table) = metadata().partition_table() {
if let Some(partition_table) = self.metadata.partition_table() {
self.send_metadata_internal(to, version, partition_table.deref(), "partition_table");
}
}

fn send_logs(&self, to: GenerationalNodeId, version: Option<Version>) {
if let Some(logs) = metadata().logs() {
if let Some(logs) = self.metadata.logs() {
self.send_metadata_internal(to, version, logs.deref(), "logs");
}
}
Expand Down Expand Up @@ -225,6 +225,7 @@ where
sr_builder.add_message_handler(MetadataMessageHandler {
sender: self.metadata.sender.clone(),
networking: self.networking.clone(),
metadata: self.metadata.clone(),
});
}

Expand Down Expand Up @@ -332,8 +333,7 @@ where
}

fn update_nodes_configuration(&mut self, config: NodesConfiguration) {
let maybe_new_version =
Self::update_option_internal(&self.metadata.inner.nodes_config, config);
let maybe_new_version = Self::update_internal(&self.metadata.inner.nodes_config, config);

self.notify_watches(maybe_new_version, MetadataKind::NodesConfiguration);
}
Expand Down Expand Up @@ -463,9 +463,9 @@ mod tests {
{
let tc = TaskCenterBuilder::default().build()?;
tc.block_on("test", None, async move {
let network_sender = MockNetworkSender::default();
let metadata_store_client = MetadataStoreClient::new_in_memory();
let metadata_builder = MetadataBuilder::default();
let network_sender = MockNetworkSender::new(metadata_builder.to_metadata());
let metadata_store_client = MetadataStoreClient::new_in_memory();
let metadata = metadata_builder.to_metadata();
let metadata_manager =
MetadataManager::new(metadata_builder, network_sender, metadata_store_client);
Expand Down Expand Up @@ -546,10 +546,10 @@ mod tests {
{
let tc = TaskCenterBuilder::default().build()?;
tc.block_on("test", None, async move {
let network_sender = MockNetworkSender::default();
let metadata_builder = MetadataBuilder::default();
let network_sender = MockNetworkSender::new(metadata_builder.to_metadata());
let metadata_store_client = MetadataStoreClient::new_in_memory();

let metadata_builder = MetadataBuilder::default();
let metadata = metadata_builder.to_metadata();
let metadata_manager =
MetadataManager::new(metadata_builder, network_sender, metadata_store_client);
Expand Down
Loading

0 comments on commit 295517d

Please sign in to comment.