diff --git a/crates/admin/src/cluster_controller/scheduler.rs b/crates/admin/src/cluster_controller/scheduler.rs
index 2937ee231..c93e995d4 100644
--- a/crates/admin/src/cluster_controller/scheduler.rs
+++ b/crates/admin/src/cluster_controller/scheduler.rs
@@ -134,8 +134,8 @@ impl Scheduler {
         let alive_workers = observed_cluster_state
             .alive_nodes
             .keys()
-            .cloned()
             .filter(|node_id| nodes_config.has_worker_role(node_id))
+            .cloned()
             .collect();
 
         self.update_scheduling_plan(&alive_workers, nodes_config, placement_hints)
diff --git a/crates/admin/src/cluster_controller/service.rs b/crates/admin/src/cluster_controller/service.rs
index 2b6997eee..058be230b 100644
--- a/crates/admin/src/cluster_controller/service.rs
+++ b/crates/admin/src/cluster_controller/service.rs
@@ -15,11 +15,12 @@ use std::time::Duration;
 use anyhow::anyhow;
 use codederror::CodedError;
 use futures::future::OptionFuture;
+use itertools::Itertools;
 use tokio::sync::{mpsc, oneshot};
 use tokio::time;
 use tokio::time::{Instant, Interval, MissedTickBehavior};
 use tonic::codec::CompressionEncoding;
-use tracing::{debug, info, warn};
+use tracing::{debug, info, trace, warn};
 
 use restate_bifrost::{Bifrost, BifrostAdmin};
 use restate_core::metadata_store::{retry_on_network_error, MetadataStoreClient};
@@ -40,6 +41,7 @@ use restate_types::logs::{LogId, Lsn, SequenceNumber};
 use restate_types::metadata_store::keys::PARTITION_TABLE_KEY;
 use restate_types::net::metadata::MetadataKind;
 use restate_types::net::partition_processor_manager::CreateSnapshotRequest;
+use restate_types::nodes_config::NodesConfiguration;
 use restate_types::partition_table::PartitionTable;
 use restate_types::protobuf::common::AdminStatus;
 use restate_types::{GenerationalNodeId, Version};
@@ -312,26 +314,26 @@ impl Service {
                 }
             }
             Ok(cluster_state) = cluster_state_watcher.next_cluster_state() => {
-                let nodes_config = &nodes_config.live_load();
                 observed_cluster_state.update(&cluster_state);
-                logs_controller.on_observed_cluster_state_update(
-                    nodes_config,
-                    &observed_cluster_state,
-                    SchedulingPlanNodeSetSelectorHints::from(&scheduler))?;
-                scheduler.on_observed_cluster_state(
-                    &observed_cluster_state,
-                    nodes_config,
-                    LogsBasedPartitionProcessorPlacementHints::from(&logs_controller))
-                    .await?;
+
+                if self.is_active_controller(nodes_config.live_load(), &observed_cluster_state) {
+                    self.on_cluster_state_update(
+                        nodes_config.live_load(),
+                        &observed_cluster_state,
+                        &mut logs_controller,
+                        &mut scheduler,
+                    )
+                    .await?;
+                }
             }
             result = logs_controller.run_async_operations() => {
                 result?;
             }
-            Ok(_) = logs_watcher.changed() => {
+            Ok(_) = logs_watcher.changed(), if self.is_active_controller(nodes_config.live_load(), &observed_cluster_state) => {
                 logs_controller.on_logs_update(self.metadata.logs_ref())?;
                 // tell the scheduler about potentially newly provisioned logs
                 scheduler.on_logs_update(logs.live_load(), partition_table.live_load()).await?
             }
-            Ok(_) = partition_table_watcher.changed() => {
+            Ok(_) = partition_table_watcher.changed(), if self.is_active_controller(nodes_config.live_load(), &observed_cluster_state) => {
                 let partition_table = partition_table.live_load();
                 let logs = logs.live_load();
@@ -339,6 +341,8 @@ impl Service {
                 scheduler.on_logs_update(logs, partition_table).await?;
             }
             Some(cmd) = self.command_rx.recv() => {
+                // note: this branch is safe to enable on passive CCs
+                // since it works only as a gateway to leader PPs
                 self.on_cluster_cmd(cmd, bifrost_admin).await;
             }
             _ = config_watcher.changed() => {
@@ -356,6 +360,48 @@ impl Service {
         }
     }
 
+    fn is_active_controller(
+        &self,
+        nodes_config: &NodesConfiguration,
+        observed_cluster_state: &ObservedClusterState,
+    ) -> bool {
+        let maybe_leader = nodes_config
+            .get_admin_nodes()
+            .filter(|node| observed_cluster_state.is_node_alive(node.current_generation))
+            .map(|node| node.current_generation.as_plain())
+            .sorted()
+            .next();
+
+        // assume active if there is no alive admin node (None) or if self holds the smallest plain node id with the admin role
+        !maybe_leader.is_some_and(|admin_id| admin_id != self.metadata.my_node_id().as_plain())
+    }
+
+    async fn on_cluster_state_update(
+        &self,
+        nodes_config: &NodesConfiguration,
+        observed_cluster_state: &ObservedClusterState,
+        logs_controller: &mut LogsController,
+        scheduler: &mut Scheduler,
+    ) -> anyhow::Result<()> {
+        trace!("Acting like a cluster controller");
+
+        logs_controller.on_observed_cluster_state_update(
+            nodes_config,
+            observed_cluster_state,
+            SchedulingPlanNodeSetSelectorHints::from(scheduler as &Scheduler),
+        )?;
+
+        scheduler
+            .on_observed_cluster_state(
+                observed_cluster_state,
+                nodes_config,
+                LogsBasedPartitionProcessorPlacementHints::from(logs_controller as &LogsController),
+            )
+            .await?;
+
+        Ok(())
+    }
+
     async fn init_partition_table(&mut self) -> anyhow::Result<()> {
         let configuration = self.configuration.live_load();
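
Reviewer note: the active-controller election in `is_active_controller` is coordination-free. Every admin node derives the same answer from the shared `NodesConfiguration` and the gossip-observed liveness: the alive admin node with the smallest plain node id acts as the controller, and "no alive admin observed" defaults to active. Below is a minimal standalone sketch of that rule; the bare `u32` stand-in for `PlainNodeId` and the slice of alive admin ids are illustrative assumptions, not the real restate-types APIs.

```rust
// Sketch of the "smallest alive admin node wins" rule from `is_active_controller`.
// `PlainNodeId` and the alive-admin set are simplified stand-ins for the real types.
type PlainNodeId = u32;

fn is_active_controller(my_id: PlainNodeId, alive_admin_nodes: &[PlainNodeId]) -> bool {
    match alive_admin_nodes.iter().min() {
        // No alive admin node observed: default to active rather than leaving
        // the cluster without any controller.
        None => true,
        // Otherwise only the node with the smallest plain id acts.
        Some(&leader) => leader == my_id,
    }
}

fn main() {
    let alive = [5, 2, 9]; // admin nodes observed alive
    assert!(is_active_controller(2, &alive)); // smallest id acts
    assert!(!is_active_controller(5, &alive)); // everyone else stays passive
    assert!(is_active_controller(7, &[])); // no alive admins: assume active
}
```

The patch computes the same minimum via itertools' `sorted().next()`; `Iterator::min` is equivalent without the sort. Since admin nodes may briefly observe different liveness, two controllers can overlap for a short window, which this scheme appears to accept in exchange for avoiding an explicit election protocol.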
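The guards added to the `logs_watcher` and `partition_table_watcher` arms rely on `tokio::select!` branch preconditions: a trailing `, if cond` disables the arm, without even polling it, whenever the condition is false. Here is a small self-contained sketch of that mechanism; the watch channel, the timings, and the `active` flag are made up for illustration (assumes tokio with the `macros`, `rt`, `sync`, and `time` features):

```rust
use std::time::Duration;
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = watch::channel(0u32);
    let mut active = false;

    // Writer task: updates the watch channel a few times.
    tokio::spawn(async move {
        for v in 1..=3u32 {
            tx.send(v).ok();
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    });

    for _ in 0..3 {
        tokio::select! {
            // `, if active` disables this arm before it is polled while we
            // are passive, mirroring the `is_active_controller` guards above.
            Ok(_) = rx.changed(), if active => {
                println!("observed value {}", *rx.borrow());
            }
            _ = tokio::time::sleep(Duration::from_millis(20)) => {
                active = true; // flip so the guarded arm can run next time
            }
        }
    }
}
```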