3 changes: 3 additions & 0 deletions changelog.d/24080_utilization_on_reload.fix.md
@@ -0,0 +1,3 @@
Fixed a bug where the `utilization` metric could be lost for changed components on configuration reload.

authors: esensar Quad9DNS
35 changes: 27 additions & 8 deletions src/topology/builder.rs
@@ -92,6 +92,7 @@ impl<'a> Builder<'a> {
diff: &'a ConfigDiff,
buffers: HashMap<ComponentKey, BuiltBuffer>,
extra_context: ExtraContext,
utilization_emitter: Option<UtilizationEmitter>,
) -> Self {
Self {
config,
@@ -105,7 +106,7 @@ impl<'a> Builder<'a> {
healthchecks: HashMap::new(),
detach_triggers: HashMap::new(),
extra_context,
utilization_emitter: UtilizationEmitter::new(),
utilization_emitter: utilization_emitter.unwrap_or_else(|| UtilizationEmitter::new()),
}
}

@@ -514,7 +515,7 @@ impl<'a> Builder<'a> {

let (transform_task, transform_outputs) = {
let _span = span.enter();
build_transform(transform, node, input_rx, &mut self.utilization_emitter)
build_transform(transform, node, input_rx, &self.utilization_emitter)
};

self.outputs.extend(transform_outputs);
@@ -782,6 +783,7 @@ pub struct TopologyPiecesBuilder<'a> {
diff: &'a ConfigDiff,
buffers: HashMap<ComponentKey, BuiltBuffer>,
extra_context: ExtraContext,
utilization_emitter: Option<UtilizationEmitter>,
}

impl<'a> TopologyPiecesBuilder<'a> {
@@ -792,6 +794,7 @@ impl<'a> TopologyPiecesBuilder<'a> {
diff,
buffers: HashMap::new(),
extra_context: ExtraContext::default(),
utilization_emitter: None,
}
}

@@ -807,14 +810,26 @@ impl<'a> TopologyPiecesBuilder<'a> {
self
}

/// Sets the utilization emitter for the topology.
pub fn with_utilization_emitter(mut self, emitter: Option<UtilizationEmitter>) -> Self {
self.utilization_emitter = emitter;
self
}

/// Builds the topology pieces, returning errors if any occur.
///
/// Use this method when you need to handle errors explicitly,
/// such as in tests or validation code.
pub async fn build(self) -> Result<TopologyPieces, Vec<String>> {
Builder::new(self.config, self.diff, self.buffers, self.extra_context)
.build()
.await
Builder::new(
self.config,
self.diff,
self.buffers,
self.extra_context,
self.utilization_emitter,
)
.build()
.await
}

/// Builds the topology pieces, logging any errors that occur.
@@ -840,10 +855,12 @@ impl TopologyPieces {
diff: &ConfigDiff,
buffers: HashMap<ComponentKey, BuiltBuffer>,
extra_context: ExtraContext,
utilization_emitter: Option<UtilizationEmitter>,
) -> Option<Self> {
TopologyPiecesBuilder::new(config, diff)
.with_buffers(buffers)
.with_extra_context(extra_context)
.with_utilization_emitter(utilization_emitter)
.build_or_log_errors()
.await
}
@@ -854,10 +871,12 @@ impl TopologyPieces {
diff: &ConfigDiff,
buffers: HashMap<ComponentKey, BuiltBuffer>,
extra_context: ExtraContext,
utilization_emitter: Option<UtilizationEmitter>,
) -> Result<Self, Vec<String>> {
TopologyPiecesBuilder::new(config, diff)
.with_buffers(buffers)
.with_extra_context(extra_context)
.with_utilization_emitter(utilization_emitter)
.build()
.await
}
@@ -908,7 +927,7 @@ fn build_transform(
transform: Transform,
node: TransformNode,
input_rx: BufferReceiver<EventArray>,
utilization_emitter: &mut UtilizationEmitter,
utilization_emitter: &UtilizationEmitter,
) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
match transform {
// TODO: avoid the double boxing for function transforms here
@@ -932,7 +951,7 @@ fn build_sync_transform(
t: Box<dyn SyncTransform>,
node: TransformNode,
input_rx: BufferReceiver<EventArray>,
utilization_emitter: &mut UtilizationEmitter,
utilization_emitter: &UtilizationEmitter,
) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
let (outputs, controls) = TransformOutputs::new(node.outputs, &node.key);

@@ -1113,7 +1132,7 @@ fn build_task_transform(
typetag: &str,
key: &ComponentKey,
outputs: &[TransformOutput],
utilization_emitter: &mut UtilizationEmitter,
utilization_emitter: &UtilizationEmitter,
) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
let (mut fanout, control) = Fanout::new();

6 changes: 6 additions & 0 deletions src/topology/running.rs
@@ -35,6 +35,7 @@ use crate::{
shutdown::SourceShutdownCoordinator,
signal::ShutdownError,
spawn_named,
utilization::UtilizationEmitter,
};

pub type ShutdownErrorReceiver = mpsc::UnboundedReceiver<ShutdownError>;
@@ -54,6 +55,7 @@ pub struct RunningTopology {
watch: (WatchTx, WatchRx),
pub(crate) running: Arc<AtomicBool>,
graceful_shutdown_duration: Option<Duration>,
utilization_emitter: Option<UtilizationEmitter>,
utilization_task: Option<TaskHandle>,
utilization_task_shutdown_trigger: Option<Trigger>,
pending_reload: Option<HashSet<ComponentKey>>,
@@ -75,6 +77,7 @@ impl RunningTopology {
running: Arc::new(AtomicBool::new(true)),
graceful_shutdown_duration: config.graceful_shutdown_duration,
config,
utilization_emitter: None,
utilization_task: None,
utilization_task_shutdown_trigger: None,
pending_reload: None,
@@ -306,6 +309,7 @@ impl RunningTopology {
if let Some(mut new_pieces) = TopologyPiecesBuilder::new(&new_config, &diff)
.with_buffers(buffers.clone())
.with_extra_context(extra_context.clone())
.with_utilization_emitter(self.utilization_emitter.clone())
.build_or_log_errors()
.await
{
@@ -335,6 +339,7 @@ impl RunningTopology {
if let Some(mut new_pieces) = TopologyPiecesBuilder::new(&self.config, &diff)
.with_buffers(buffers)
.with_extra_context(extra_context.clone())
.with_utilization_emitter(self.utilization_emitter.clone())
.build_or_log_errors()
.await
&& self
@@ -1315,6 +1320,7 @@ impl RunningTopology {

let (utilization_task_shutdown_trigger, utilization_shutdown_signal, _) =
ShutdownSignal::new_wired();
running_topology.utilization_emitter = Some(utilization_emitter.clone());
running_topology.utilization_task_shutdown_trigger =
Some(utilization_task_shutdown_trigger);
running_topology.utilization_task = Some(tokio::spawn(Task::new(
44 changes: 30 additions & 14 deletions src/utilization.rs
@@ -1,6 +1,7 @@
use std::{
collections::HashMap,
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll, ready},
time::Duration,
};
@@ -203,33 +204,43 @@ impl UtilizationComponentSender {
}
}

pub(crate) struct UtilizationEmitter {
timers: HashMap<ComponentKey, Timer>,
timer_rx: Receiver<UtilizationTimerMessage>,
pub struct UtilizationEmitter {
timers: Arc<Mutex<HashMap<ComponentKey, Timer>>>,
timer_rx: Option<Receiver<UtilizationTimerMessage>>,
timer_tx: Sender<UtilizationTimerMessage>,
intervals: IntervalStream,
}

impl Clone for UtilizationEmitter {
/// Cloning a `UtilizationEmitter` skips the timer receiver. Only one instance is expected to
/// hold the receiver, and that instance should be running the `run_utilization` task.
fn clone(&self) -> Self {
Self {
timers: self.timers.clone(),
timer_rx: None,
timer_tx: self.timer_tx.clone(),
}
Comment on lines +213 to +221 (Contributor Author):
I have opted for a `Clone` implementation that skips `timer_rx` in the cloned value.

I also considered an approach with two separate components: the emitter, which would run the task, and a "registry", which would only be used to add components (see the sketch below). While that approach sounds better to me, it would require more significant changes to the codebase without any further benefit, since I would still need a lock on the timers map, as that part would be shared.

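For reference, a minimal sketch of that two-component alternative. All names here (`UtilizationRegistry`, `UtilizationEmitterTask`, `split`) are hypothetical and not part of this PR; the point is that the shared timers map would still sit behind an `Arc<Mutex<...>>`, so the split adds structure without removing the lock.

```rust
use std::{
    collections::HashMap,
    sync::{Arc, Mutex},
};

use tokio::sync::mpsc::{Receiver, Sender, channel};

// Hypothetical stand-ins for the real types in src/utilization.rs.
struct Timer;
struct UtilizationTimerMessage;
#[derive(Clone, PartialEq, Eq, Hash)]
struct ComponentKey(String);

/// Clonable handle handed to the topology builder; only registers components.
#[derive(Clone)]
struct UtilizationRegistry {
    timers: Arc<Mutex<HashMap<ComponentKey, Timer>>>,
    timer_tx: Sender<UtilizationTimerMessage>,
}

impl UtilizationRegistry {
    fn add_component(&self, key: ComponentKey, timer: Timer) {
        // The map is shared with the emitter task, so it still needs a lock.
        self.timers
            .lock()
            .expect("mutex poisoned")
            .insert(key, timer);
    }
}

/// Non-clonable owner of the receiver; this is what runs the emit loop.
struct UtilizationEmitterTask {
    timers: Arc<Mutex<HashMap<ComponentKey, Timer>>>,
    timer_rx: Receiver<UtilizationTimerMessage>,
}

/// Builds the registry/emitter pair sharing one timers map and one channel.
fn split() -> (UtilizationRegistry, UtilizationEmitterTask) {
    let (timer_tx, timer_rx) = channel(4096);
    let timers = Arc::new(Mutex::new(HashMap::new()));
    (
        UtilizationRegistry {
            timers: Arc::clone(&timers),
            timer_tx,
        },
        UtilizationEmitterTask { timers, timer_rx },
    )
}
```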
}
}

impl UtilizationEmitter {
pub(crate) fn new() -> Self {
let (timer_tx, timer_rx) = channel(4096);
Self {
timers: HashMap::default(),
intervals: IntervalStream::new(interval(UTILIZATION_EMITTER_DURATION)),
timers: Arc::new(Mutex::new(HashMap::default())),
timer_tx,
timer_rx,
timer_rx: Some(timer_rx),
}
}

/// Adds a new component to this utilization metric emitter
///
/// Returns a sender which can be used to send utilization information back to the emitter
pub(crate) fn add_component(
&mut self,
&self,
key: ComponentKey,
gauge: Gauge,
) -> UtilizationComponentSender {
self.timers.insert(
self.timers.lock().expect("mutex poisoned").insert(
key.clone(),
Timer::new(
gauge,
Expand All @@ -244,22 +255,27 @@ impl UtilizationEmitter {
}

pub(crate) async fn run_utilization(&mut self, mut shutdown: ShutdownSignal) {
let mut intervals = IntervalStream::new(interval(UTILIZATION_EMITTER_DURATION));
let Some(mut timer_rx) = self.timer_rx.take() else {
warn!("Utilization metric emitter failed to start! Missing timer message receiver!");
return;
};
loop {
tokio::select! {
message = self.timer_rx.recv() => {
message = timer_rx.recv() => {
match message {
Some(UtilizationTimerMessage::StartWait(key, start_time)) => {
self.timers.get_mut(&key).expect("Utilization timer missing for component").start_wait(start_time);
self.timers.lock().expect("mutex poisoned").get_mut(&key).expect("Utilization timer missing for component").start_wait(start_time);
}
Some(UtilizationTimerMessage::StopWait(key, stop_time)) => {
self.timers.get_mut(&key).expect("Utilization timer missing for component").stop_wait(stop_time);
self.timers.lock().expect("mutex poisoned").get_mut(&key).expect("Utilization timer missing for component").stop_wait(stop_time);
}
None => break,
}
},

Some(_) = self.intervals.next() => {
for timer in self.timers.values_mut() {
Some(_) = intervals.next() => {
for timer in self.timers.lock().expect("mutex poisoned").values_mut() {
timer.update_utilization();
}
},