Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use updateable config in networking #1667

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Use updateable config in networking
Efficient access to metadata and nodes configuration in performance-sensitive networking code
  • Loading branch information
AhmedSoliman committed Jun 27, 2024
commit 88dd013e9abd3ce0028635055df50d596a79b4c4
2 changes: 1 addition & 1 deletion crates/admin/src/cluster_controller/cluster_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ where
last_state.nodes_config_version,
)
.await;
let nodes_config = metadata.nodes_config();
let nodes_config = metadata.nodes_config_snapshot();

let mut nodes = BTreeMap::new();
let mut join_set = tokio::task::JoinSet::new();
Expand Down
6 changes: 3 additions & 3 deletions crates/admin/src/cluster_controller/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use tokio::time::MissedTickBehavior;
use tokio::time::{Instant, Interval};
use tracing::{debug, warn};

use restate_types::arc_util::Updateable;
use restate_types::arc_util::CachingUpdateable;
use restate_types::config::{AdminOptions, Configuration};
use restate_types::net::cluster_controller::{Action, AttachRequest, AttachResponse, RunPartition};
use restate_types::net::RequestId;
Expand Down Expand Up @@ -56,7 +56,7 @@ pub struct Service<N> {
command_tx: mpsc::Sender<ClusterControllerCommand>,
command_rx: mpsc::Receiver<ClusterControllerCommand>,

configuration: Box<dyn Updateable<AdminOptions> + Send + Sync>,
configuration: Box<dyn CachingUpdateable<AdminOptions> + Send + Sync>,
heartbeat_interval: time::Interval,
log_trim_interval: Option<time::Interval>,
log_trim_threshold: Lsn,
Expand All @@ -67,7 +67,7 @@ where
N: NetworkSender + 'static,
{
pub fn new(
mut configuration: impl Updateable<AdminOptions> + Send + Sync + 'static,
mut configuration: impl CachingUpdateable<AdminOptions> + Send + Sync + 'static,
task_center: TaskCenter,
metadata: Metadata,
networking: N,
Expand Down
4 changes: 2 additions & 2 deletions crates/admin/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::sync::Arc;
use axum::error_handling::HandleErrorLayer;
use http::StatusCode;
use restate_bifrost::Bifrost;
use restate_types::arc_util::Updateable;
use restate_types::arc_util::CachingUpdateable;
use restate_types::config::AdminOptions;
use tonic::transport::Channel;
use tower::ServiceBuilder;
Expand Down Expand Up @@ -59,7 +59,7 @@ where

pub async fn run(
self,
mut updateable_config: impl Updateable<AdminOptions> + Send + 'static,
mut updateable_config: impl CachingUpdateable<AdminOptions> + Send + 'static,
node_svc_client: NodeSvcClient<Channel>,
bifrost: Bifrost,
) -> anyhow::Result<()> {
Expand Down
4 changes: 2 additions & 2 deletions crates/bifrost/benches/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ pub async fn spawn_environment(
.expect("task_center builds");

restate_types::config::set_current_config(config.clone());
let network_sender = MockNetworkSender::default();
let metadata_builder = MetadataBuilder::default();
let network_sender = MockNetworkSender::new(metadata_builder.to_metadata());

let metadata_store_client = MetadataStoreClient::new_in_memory();
let metadata_builder = MetadataBuilder::default();
let metadata = metadata_builder.to_metadata();
let metadata_manager = MetadataManager::new(
metadata_builder,
Expand Down
4 changes: 2 additions & 2 deletions crates/bifrost/src/loglets/local_loglet/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::sync::Arc;
use restate_rocksdb::{
CfExactPattern, CfName, DbName, DbSpecBuilder, RocksDb, RocksDbManager, RocksError,
};
use restate_types::arc_util::Updateable;
use restate_types::arc_util::CachingUpdateable;
use restate_types::config::{LocalLogletOptions, RocksDbOptions};
use restate_types::storage::{StorageDecodeError, StorageEncodeError};
use rocksdb::{BoundColumnFamily, DBCompressionType, SliceTransform, DB};
Expand Down Expand Up @@ -55,7 +55,7 @@ pub struct RocksDbLogStore {
impl RocksDbLogStore {
pub fn new(
options: &LocalLogletOptions,
updateable_options: impl Updateable<RocksDbOptions> + Send + 'static,
updateable_options: impl CachingUpdateable<RocksDbOptions> + Send + 'static,
) -> Result<Self, LogStoreError> {
let db_manager = RocksDbManager::get();

Expand Down
4 changes: 2 additions & 2 deletions crates/bifrost/src/loglets/local_loglet/log_store_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use tokio_stream::StreamExt as TokioStreamExt;
use tracing::{debug, error, trace, warn};

use restate_core::{cancellation_watcher, task_center, ShutdownError, TaskKind};
use restate_types::arc_util::Updateable;
use restate_types::arc_util::CachingUpdateable;
use restate_types::config::LocalLogletOptions;
use restate_types::logs::SequenceNumber;

Expand Down Expand Up @@ -76,7 +76,7 @@ impl LogStoreWriter {
/// Must be called from task_center context
pub fn start(
mut self,
mut updateable: impl Updateable<LocalLogletOptions> + Send + 'static,
mut updateable: impl CachingUpdateable<LocalLogletOptions> + Send + 'static,
) -> Result<RocksDbLogWriterHandle, ShutdownError> {
// big enough to allow a second full batch to queue up while the existing one is being processed
let batch_size = std::cmp::max(1, updateable.load().writer_batch_commit_count);
Expand Down
4 changes: 2 additions & 2 deletions crates/bifrost/src/loglets/local_loglet/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::sync::{Arc, OnceLock};

use anyhow::Context;
use async_trait::async_trait;
use restate_types::arc_util::Updateable;
use restate_types::arc_util::CachingUpdateable;
use restate_types::config::{Configuration, LocalLogletOptions, RocksDbOptions};
use restate_types::logs::metadata::LogletParams;
use tokio::sync::Mutex as AsyncMutex;
Expand All @@ -35,7 +35,7 @@ pub struct LocalLogletProvider {
impl LocalLogletProvider {
pub fn new(
options: &LocalLogletOptions,
updateable_rocksdb_options: impl Updateable<RocksDbOptions> + Send + 'static,
updateable_rocksdb_options: impl CachingUpdateable<RocksDbOptions> + Send + 'static,
) -> Result<Arc<Self>, ProviderError> {
let log_store = RocksDbLogStore::new(options, updateable_rocksdb_options)
.context("RocksDb LogStore")?;
Expand Down
19 changes: 10 additions & 9 deletions crates/core/src/metadata/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
// by the Apache License, Version 2.0.

use arc_swap::{ArcSwap, ArcSwapOption};
use restate_types::schema::Schema;
use std::ops::Deref;
use std::sync::Arc;

Expand All @@ -25,12 +24,12 @@ use restate_types::net::metadata::{MetadataMessage, MetadataUpdate};
use restate_types::net::MessageEnvelope;
use restate_types::nodes_config::NodesConfiguration;
use restate_types::partition_table::FixedPartitionTable;
use restate_types::schema::Schema;
use restate_types::GenerationalNodeId;
use restate_types::{Version, Versioned};

use crate::cancellation_watcher;
use crate::is_cancellation_requested;
use crate::metadata;
use crate::metadata_store::{MetadataStoreClient, ReadError};
use crate::network::{MessageHandler, MessageRouterBuilder, NetworkSender};
use crate::task_center;
Expand All @@ -57,6 +56,7 @@ where
{
sender: CommandSender,
networking: N,
metadata: Metadata,
}

impl<N> MetadataMessageHandler<N>
Expand All @@ -80,18 +80,18 @@ where
}

fn send_nodes_config(&self, to: GenerationalNodeId, version: Option<Version>) {
let config = metadata().nodes_config();
let config = self.metadata.nodes_config_snapshot();
self.send_metadata_internal(to, version, config.deref(), "nodes_config");
}

fn send_partition_table(&self, to: GenerationalNodeId, version: Option<Version>) {
if let Some(partition_table) = metadata().partition_table() {
if let Some(partition_table) = self.metadata.partition_table() {
self.send_metadata_internal(to, version, partition_table.deref(), "partition_table");
}
}

fn send_logs(&self, to: GenerationalNodeId, version: Option<Version>) {
if let Some(logs) = metadata().logs() {
if let Some(logs) = self.metadata.logs() {
self.send_metadata_internal(to, version, logs.deref(), "logs");
}
}
Expand Down Expand Up @@ -225,6 +225,7 @@ where
sr_builder.add_message_handler(MetadataMessageHandler {
sender: self.metadata.sender.clone(),
networking: self.networking.clone(),
metadata: self.metadata.clone(),
});
}

Expand Down Expand Up @@ -462,9 +463,9 @@ mod tests {
{
let tc = TaskCenterBuilder::default().build()?;
tc.block_on("test", None, async move {
let network_sender = MockNetworkSender::default();
let metadata_store_client = MetadataStoreClient::new_in_memory();
let metadata_builder = MetadataBuilder::default();
let network_sender = MockNetworkSender::new(metadata_builder.to_metadata());
let metadata_store_client = MetadataStoreClient::new_in_memory();
let metadata = metadata_builder.to_metadata();
let metadata_manager =
MetadataManager::new(metadata_builder, network_sender, metadata_store_client);
Expand Down Expand Up @@ -545,10 +546,10 @@ mod tests {
{
let tc = TaskCenterBuilder::default().build()?;
tc.block_on("test", None, async move {
let network_sender = MockNetworkSender::default();
let metadata_builder = MetadataBuilder::default();
let network_sender = MockNetworkSender::new(metadata_builder.to_metadata());
let metadata_store_client = MetadataStoreClient::new_in_memory();

let metadata_builder = MetadataBuilder::default();
let metadata = metadata_builder.to_metadata();
let metadata_manager =
MetadataManager::new(metadata_builder, network_sender, metadata_store_client);
Expand Down
17 changes: 10 additions & 7 deletions crates/core/src/metadata/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

mod manager;
pub use manager::MetadataManager;
use restate_types::arc_util::Updateable;
use restate_types::schema::{Schema, UpdateableSchema};

use std::sync::{Arc, OnceLock};
Expand All @@ -33,9 +34,6 @@ use crate::metadata_store::ReadError;
use crate::network::NetworkSender;
use crate::{ShutdownError, TaskCenter, TaskId, TaskKind};

#[derive(Clone, derive_more::From)]
pub struct UpdateableNodesConfiguration(Arc<ArcSwap<NodesConfiguration>>);

#[derive(Debug, thiserror::Error)]
pub enum SyncError {
#[error("failed syncing with metadata store: {0}")]
Expand Down Expand Up @@ -76,13 +74,18 @@ pub struct Metadata {
}

impl Metadata {
/// Panics if nodes configuration is not loaded yet.
pub fn nodes_config(&self) -> Arc<NodesConfiguration> {
/// Returns an owned `Arc<NodesConfiguration>` snapshot of the nodes
/// configuration, obtained by calling `load_full()` on the inner handle.
#[inline(always)]
pub fn nodes_config_snapshot(&self) -> Arc<NodesConfiguration> {
self.inner.nodes_config.load_full()
}

pub fn updateable_nodes_config(&self) -> UpdateableNodesConfiguration {
UpdateableNodesConfiguration::from(self.inner.nodes_config.clone())
/// Returns an `arc_swap::Guard` over the nodes configuration, obtained by
/// calling `load()` on the inner handle.
///
/// Unlike `nodes_config_snapshot`, this hands back a guard rather than an
/// owned `Arc`.
#[inline(always)]
pub fn nodes_config_ref(&self) -> arc_swap::Guard<Arc<NodesConfiguration>> {
self.inner.nodes_config.load()
}

/// Builds an `Updateable<NodesConfiguration>` from a clone of the inner
/// nodes-configuration handle.
pub fn updateable_nodes_config(&self) -> Updateable<NodesConfiguration> {
Updateable::from(self.inner.nodes_config.clone())
}

#[track_caller]
Expand Down
13 changes: 10 additions & 3 deletions crates/core/src/network/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ use std::time::Instant;
use tokio::sync::mpsc;
use tracing::instrument;

use crate::metadata;
use restate_types::arc_util::Caching;
use restate_types::arc_util::Updateable;
use restate_types::net::codec::Targeted;
use restate_types::net::codec::{serialize_message, WireEncode};
use restate_types::net::ProtocolVersion;
use restate_types::nodes_config::NodesConfiguration;
use restate_types::protobuf::node::message;
use restate_types::protobuf::node::Header;
use restate_types::protobuf::node::Message;
Expand All @@ -43,20 +45,23 @@ pub(crate) struct Connection {
pub(crate) protocol_version: ProtocolVersion,
pub(crate) sender: mpsc::Sender<Message>,
pub(crate) created: std::time::Instant,
updateable_nodes_config: Updateable<NodesConfiguration>,
}

impl Connection {
/// Creates a `Connection` for `peer`.
///
/// `cid` is assigned a random value and `created` records the time of
/// construction; `peer`, `protocol_version`, `sender` and
/// `updateable_nodes_config` are stored exactly as passed in.
pub fn new(
peer: GenerationalNodeId,
protocol_version: ProtocolVersion,
sender: mpsc::Sender<Message>,
updateable_nodes_config: Updateable<NodesConfiguration>,
) -> Self {
Self {
cid: rand::random(),
peer,
protocol_version,
sender,
created: std::time::Instant::now(),
updateable_nodes_config,
}
}

Expand All @@ -81,6 +86,7 @@ impl Connection {
peer: self.peer,
connection: Arc::downgrade(self),
protocol_version: self.protocol_version,
caching_nodes_config: self.updateable_nodes_config.clone().into_caching(),
}
}
}
Expand All @@ -98,6 +104,7 @@ pub struct ConnectionSender {
peer: GenerationalNodeId,
connection: Weak<Connection>,
protocol_version: ProtocolVersion,
caching_nodes_config: Caching<NodesConfiguration>,
}

impl ConnectionSender {
Expand All @@ -116,12 +123,12 @@ impl ConnectionSender {
/// This doesn't auto-retry on connection resets or send errors; retrying
/// is left to the caller.
#[instrument(skip_all, fields(peer_node_id = %self.peer, target_service = ?message.target(), msg = ?message.kind()))]
pub async fn send<M>(&self, message: M) -> Result<(), NetworkError>
pub async fn send<M>(&mut self, message: M) -> Result<(), NetworkError>
where
M: WireEncode + Targeted,
{
let send_start = Instant::now();
let header = Header::new(metadata().nodes_config_version());
let header = Header::new(self.caching_nodes_config.load().version());
let body =
serialize_message(message, self.protocol_version).map_err(ProtocolError::Codec)?;
let res = self
Expand Down
Loading
Loading