Soft-leader election mechanism for CC based on observed cluster state
Summary:
Every admin node runs a cluster controller (CC) that can act as the leader. The acting leader is chosen by deterministically sorting all admin nodes (e.g. by their plain node id) and picking the first node that is alive.

The acting leader is allowed to make control decisions (e.g. calling into the Scheduler and the LogsController).
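
A minimal sketch of the rule described above, assuming a simplified cluster view; `PlainNodeId`, `acting_leader`, and `is_alive` here are illustrative stand-ins, and the actual check is the `is_active_controller` method added in the diff below:

```rust
// Illustrative-only sketch of the soft-leader rule: sort the admin nodes by
// their plain node id and treat the first alive one as the acting leader.
type PlainNodeId = u32;

fn acting_leader(
    admin_nodes: &[PlainNodeId],
    is_alive: impl Fn(PlainNodeId) -> bool,
) -> Option<PlainNodeId> {
    let mut sorted: Vec<PlainNodeId> = admin_nodes.to_vec();
    sorted.sort_unstable();
    // The first alive admin node in deterministic order acts as the leader.
    sorted.into_iter().find(|node| is_alive(*node))
}

fn is_active_controller(my_id: PlainNodeId, leader: Option<PlainNodeId>) -> bool {
    // Assume active if no admin node is alive (None) or if we are the chosen leader.
    leader.map_or(true, |id| id == my_id)
}

fn main() {
    let admins = [3, 1, 2];
    let leader = acting_leader(&admins, |n| n != 1); // pretend node 1 is down
    assert_eq!(leader, Some(2));
    assert!(is_active_controller(2, leader));
}
```

Because every admin node applies the same deterministic rule to the same observed cluster state, all nodes converge on the same acting leader without running an explicit election protocol.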

Fixes restatedev#2238
muhamadazmy committed Nov 13, 2024
1 parent 612c7c2 commit 0369a6c
Showing 2 changed files with 59 additions and 13 deletions.
2 changes: 1 addition & 1 deletion crates/admin/src/cluster_controller/scheduler.rs
@@ -134,8 +134,8 @@ impl<T: TransportConnect> Scheduler<T> {
let alive_workers = observed_cluster_state
.alive_nodes
.keys()
.cloned()
.filter(|node_id| nodes_config.has_worker_role(node_id))
.cloned()
.collect();

self.update_scheduling_plan(&alive_workers, nodes_config, placement_hints)
70 changes: 58 additions & 12 deletions crates/admin/src/cluster_controller/service.rs
@@ -15,11 +15,12 @@ use std::time::Duration;
use anyhow::anyhow;
use codederror::CodedError;
use futures::future::OptionFuture;
use itertools::Itertools;
use tokio::sync::{mpsc, oneshot};
use tokio::time;
use tokio::time::{Instant, Interval, MissedTickBehavior};
use tonic::codec::CompressionEncoding;
use tracing::{debug, info, warn};
use tracing::{debug, info, trace, warn};

use restate_bifrost::{Bifrost, BifrostAdmin};
use restate_core::metadata_store::{retry_on_network_error, MetadataStoreClient};
@@ -40,6 +41,7 @@ use restate_types::logs::{LogId, Lsn, SequenceNumber};
use restate_types::metadata_store::keys::PARTITION_TABLE_KEY;
use restate_types::net::metadata::MetadataKind;
use restate_types::net::partition_processor_manager::CreateSnapshotRequest;
use restate_types::nodes_config::NodesConfiguration;
use restate_types::partition_table::PartitionTable;
use restate_types::protobuf::common::AdminStatus;
use restate_types::{GenerationalNodeId, Version};
@@ -312,33 +314,35 @@ impl<T: TransportConnect> Service<T> {
}
}
Ok(cluster_state) = cluster_state_watcher.next_cluster_state() => {
let nodes_config = &nodes_config.live_load();
observed_cluster_state.update(&cluster_state);
logs_controller.on_observed_cluster_state_update(
nodes_config,
&observed_cluster_state, SchedulingPlanNodeSetSelectorHints::from(&scheduler))?;
scheduler.on_observed_cluster_state(
&observed_cluster_state,
nodes_config,
LogsBasedPartitionProcessorPlacementHints::from(&logs_controller))
.await?;

if self.is_active_controller(nodes_config.live_load(), &observed_cluster_state) {
self.on_cluster_state_update(
nodes_config.live_load(),
&observed_cluster_state,
&mut logs_controller,
&mut scheduler,
).await?;
}
}
result = logs_controller.run_async_operations() => {
result?;
}
Ok(_) = logs_watcher.changed() => {
Ok(_) = logs_watcher.changed(), if self.is_active_controller(nodes_config.live_load(), &observed_cluster_state) => {
logs_controller.on_logs_update(self.metadata.logs_ref())?;
// tell the scheduler about potentially newly provisioned logs
scheduler.on_logs_update(logs.live_load(), partition_table.live_load()).await?
}
Ok(_) = partition_table_watcher.changed() => {
Ok(_) = partition_table_watcher.changed(), if self.is_active_controller(nodes_config.live_load(), &observed_cluster_state) => {
let partition_table = partition_table.live_load();
let logs = logs.live_load();

logs_controller.on_partition_table_update(partition_table);
scheduler.on_logs_update(logs, partition_table).await?;
}
Some(cmd) = self.command_rx.recv() => {
// note: This branch is safe to enable on passive CCs
// since it works only as a gateway to leader PPs
self.on_cluster_cmd(cmd, bifrost_admin).await;
}
_ = config_watcher.changed() => {
@@ -356,6 +360,48 @@ }
}
}

fn is_active_controller(
&self,
nodes_config: &NodesConfiguration,
observed_cluster_state: &ObservedClusterState,
) -> bool {
let maybe_leader = nodes_config
.get_admin_nodes()
.filter(|node| observed_cluster_state.is_node_alive(node.current_generation))
.map(|node| node.current_generation.as_plain())
.sorted()
.next();

// assume active if no leader CC (None) or self holds the smallest plain node id with role admin
!maybe_leader.is_some_and(|admin_id| admin_id != self.metadata.my_node_id().as_plain())
}

async fn on_cluster_state_update(
&self,
nodes_config: &NodesConfiguration,
observed_cluster_state: &ObservedClusterState,
logs_controller: &mut LogsController,
scheduler: &mut Scheduler<T>,
) -> anyhow::Result<()> {
trace!("Acting like a cluster controller");

logs_controller.on_observed_cluster_state_update(
nodes_config,
observed_cluster_state,
SchedulingPlanNodeSetSelectorHints::from(scheduler as &Scheduler<T>),
)?;

scheduler
.on_observed_cluster_state(
observed_cluster_state,
nodes_config,
LogsBasedPartitionProcessorPlacementHints::from(logs_controller as &LogsController),
)
.await?;

Ok(())
}

async fn init_partition_table(&mut self) -> anyhow::Result<()> {
let configuration = self.configuration.live_load();

