Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog.d/24080_utilization_on_reload.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Fixed a bug where the utilization metric could be lost for changed components on configuration reload.

authors: esensar Quad9DNS
67 changes: 49 additions & 18 deletions src/topology/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use crate::{
spawn_named,
topology::task::TaskError,
transforms::{SyncTransform, TaskTransform, Transform, TransformOutputs, TransformOutputsBuf},
utilization::{UtilizationComponentSender, UtilizationEmitter, wrap},
utilization::{UtilizationComponentSender, UtilizationEmitter, UtilizationRegistry, wrap},
};

static ENRICHMENT_TABLES: LazyLock<vector_lib::enrichment::TableRegistry> =
Expand Down Expand Up @@ -83,7 +83,8 @@ struct Builder<'a> {
healthchecks: HashMap<ComponentKey, Task>,
detach_triggers: HashMap<ComponentKey, Trigger>,
extra_context: ExtraContext,
utilization_emitter: UtilizationEmitter,
utilization_emitter: Option<UtilizationEmitter>,
utilization_registry: UtilizationRegistry,
}

impl<'a> Builder<'a> {
Expand All @@ -92,7 +93,16 @@ impl<'a> Builder<'a> {
diff: &'a ConfigDiff,
buffers: HashMap<ComponentKey, BuiltBuffer>,
extra_context: ExtraContext,
utilization_registry: Option<UtilizationRegistry>,
) -> Self {
// If no registry is passed in, we need to build a whole new utilization emitter + registry.
// Otherwise, we just store the given registry and reuse it for this build.
let (emitter, registry) = if let Some(registry) = utilization_registry {
(None, registry)
} else {
let (emitter, registry) = UtilizationEmitter::new();
(Some(emitter), registry)
};
Self {
config,
diff,
Expand All @@ -105,7 +115,8 @@ impl<'a> Builder<'a> {
healthchecks: HashMap::new(),
detach_triggers: HashMap::new(),
extra_context,
utilization_emitter: UtilizationEmitter::new(),
utilization_emitter: emitter,
utilization_registry: registry,
}
}

Expand All @@ -129,7 +140,9 @@ impl<'a> Builder<'a> {
healthchecks: self.healthchecks,
shutdown_coordinator: self.shutdown_coordinator,
detach_triggers: self.detach_triggers,
utilization_emitter: Some(self.utilization_emitter),
utilization: self
.utilization_emitter
.map(|e| (e, self.utilization_registry)),
})
} else {
Err(self.errors)
Expand Down Expand Up @@ -514,7 +527,7 @@ impl<'a> Builder<'a> {

let (transform_task, transform_outputs) = {
let _span = span.enter();
build_transform(transform, node, input_rx, &mut self.utilization_emitter)
build_transform(transform, node, input_rx, &self.utilization_registry)
};

self.outputs.extend(transform_outputs);
Expand Down Expand Up @@ -619,7 +632,7 @@ impl<'a> Builder<'a> {
let (trigger, tripwire) = Tripwire::new();

let utilization_sender = self
.utilization_emitter
.utilization_registry
.add_component(key.clone(), gauge!("utilization"));
let component_key = key.clone();
let sink = async move {
Expand Down Expand Up @@ -763,7 +776,7 @@ pub struct TopologyPieces {
pub(super) healthchecks: HashMap<ComponentKey, Task>,
pub(crate) shutdown_coordinator: SourceShutdownCoordinator,
pub(crate) detach_triggers: HashMap<ComponentKey, Trigger>,
pub(crate) utilization_emitter: Option<UtilizationEmitter>,
pub(crate) utilization: Option<(UtilizationEmitter, UtilizationRegistry)>,
}

/// Builder for constructing TopologyPieces with a fluent API.
Expand All @@ -782,6 +795,7 @@ pub struct TopologyPiecesBuilder<'a> {
diff: &'a ConfigDiff,
buffers: HashMap<ComponentKey, BuiltBuffer>,
extra_context: ExtraContext,
utilization_registry: Option<UtilizationRegistry>,
}

impl<'a> TopologyPiecesBuilder<'a> {
Expand All @@ -792,6 +806,7 @@ impl<'a> TopologyPiecesBuilder<'a> {
diff,
buffers: HashMap::new(),
extra_context: ExtraContext::default(),
utilization_registry: None,
}
}

Expand All @@ -807,14 +822,26 @@ impl<'a> TopologyPiecesBuilder<'a> {
self
}

/// Sets the utilization registry for the topology.
///
/// `build` hands this value to the internal `Builder` unchanged: with `Some(registry)` the
/// build reuses that registry, while `None` (the default set by `new`) makes the build
/// create a fresh utilization emitter together with a new registry.
pub fn with_utilization_registry(mut self, registry: Option<UtilizationRegistry>) -> Self {
self.utilization_registry = registry;
self
}

/// Builds the topology pieces, returning errors if any occur.
///
/// Use this method when you need to handle errors explicitly,
/// such as in tests or validation code.
pub async fn build(self) -> Result<TopologyPieces, Vec<String>> {
Builder::new(self.config, self.diff, self.buffers, self.extra_context)
.build()
.await
Builder::new(
self.config,
self.diff,
self.buffers,
self.extra_context,
self.utilization_registry,
)
.build()
.await
}

/// Builds the topology pieces, logging any errors that occur.
Expand All @@ -840,10 +867,12 @@ impl TopologyPieces {
diff: &ConfigDiff,
buffers: HashMap<ComponentKey, BuiltBuffer>,
extra_context: ExtraContext,
utilization_registry: Option<UtilizationRegistry>,
) -> Option<Self> {
TopologyPiecesBuilder::new(config, diff)
.with_buffers(buffers)
.with_extra_context(extra_context)
.with_utilization_registry(utilization_registry)
.build_or_log_errors()
.await
}
Expand All @@ -854,10 +883,12 @@ impl TopologyPieces {
diff: &ConfigDiff,
buffers: HashMap<ComponentKey, BuiltBuffer>,
extra_context: ExtraContext,
utilization_registry: Option<UtilizationRegistry>,
) -> Result<Self, Vec<String>> {
TopologyPiecesBuilder::new(config, diff)
.with_buffers(buffers)
.with_extra_context(extra_context)
.with_utilization_registry(utilization_registry)
.build()
.await
}
Expand Down Expand Up @@ -908,22 +939,22 @@ fn build_transform(
transform: Transform,
node: TransformNode,
input_rx: BufferReceiver<EventArray>,
utilization_emitter: &mut UtilizationEmitter,
utilization_registry: &UtilizationRegistry,
) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
match transform {
// TODO: avoid the double boxing for function transforms here
Transform::Function(t) => {
build_sync_transform(Box::new(t), node, input_rx, utilization_emitter)
build_sync_transform(Box::new(t), node, input_rx, utilization_registry)
}
Transform::Synchronous(t) => build_sync_transform(t, node, input_rx, utilization_emitter),
Transform::Synchronous(t) => build_sync_transform(t, node, input_rx, utilization_registry),
Transform::Task(t) => build_task_transform(
t,
input_rx,
node.input_details.data_type(),
node.typetag,
&node.key,
&node.outputs,
utilization_emitter,
utilization_registry,
),
}
}
Expand All @@ -932,11 +963,11 @@ fn build_sync_transform(
t: Box<dyn SyncTransform>,
node: TransformNode,
input_rx: BufferReceiver<EventArray>,
utilization_emitter: &mut UtilizationEmitter,
utilization_registry: &UtilizationRegistry,
) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
let (outputs, controls) = TransformOutputs::new(node.outputs, &node.key);

let sender = utilization_emitter.add_component(node.key.clone(), gauge!("utilization"));
let sender = utilization_registry.add_component(node.key.clone(), gauge!("utilization"));
let runner = Runner::new(t, input_rx, sender, node.input_details.data_type(), outputs);
let transform = if node.enable_concurrency {
runner.run_concurrently().boxed()
Expand Down Expand Up @@ -1113,11 +1144,11 @@ fn build_task_transform(
typetag: &str,
key: &ComponentKey,
outputs: &[TransformOutput],
utilization_emitter: &mut UtilizationEmitter,
utilization_registry: &UtilizationRegistry,
) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
let (mut fanout, control) = Fanout::new();

let sender = utilization_emitter.add_component(key.clone(), gauge!("utilization"));
let sender = utilization_registry.add_component(key.clone(), gauge!("utilization"));
let input_rx = wrap(sender, key.clone(), input_rx.into_stream());

let events_received = register!(EventsReceived);
Expand Down
10 changes: 8 additions & 2 deletions src/topology/running.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use crate::{
shutdown::SourceShutdownCoordinator,
signal::ShutdownError,
spawn_named,
utilization::UtilizationRegistry,
};

pub type ShutdownErrorReceiver = mpsc::UnboundedReceiver<ShutdownError>;
Expand All @@ -54,6 +55,7 @@ pub struct RunningTopology {
watch: (WatchTx, WatchRx),
pub(crate) running: Arc<AtomicBool>,
graceful_shutdown_duration: Option<Duration>,
utilization_registry: Option<UtilizationRegistry>,
utilization_task: Option<TaskHandle>,
utilization_task_shutdown_trigger: Option<Trigger>,
pending_reload: Option<HashSet<ComponentKey>>,
Expand All @@ -75,6 +77,7 @@ impl RunningTopology {
running: Arc::new(AtomicBool::new(true)),
graceful_shutdown_duration: config.graceful_shutdown_duration,
config,
utilization_registry: None,
utilization_task: None,
utilization_task_shutdown_trigger: None,
pending_reload: None,
Expand Down Expand Up @@ -306,6 +309,7 @@ impl RunningTopology {
if let Some(mut new_pieces) = TopologyPiecesBuilder::new(&new_config, &diff)
.with_buffers(buffers.clone())
.with_extra_context(extra_context.clone())
.with_utilization_registry(self.utilization_registry.clone())
.build_or_log_errors()
.await
{
Expand Down Expand Up @@ -335,6 +339,7 @@ impl RunningTopology {
if let Some(mut new_pieces) = TopologyPiecesBuilder::new(&self.config, &diff)
.with_buffers(buffers)
.with_extra_context(extra_context.clone())
.with_utilization_registry(self.utilization_registry.clone())
.build_or_log_errors()
.await
&& self
Expand Down Expand Up @@ -1298,8 +1303,8 @@ impl RunningTopology {
return None;
}

let mut utilization_emitter = pieces
.utilization_emitter
let (utilization_emitter, utilization_registry) = pieces
.utilization
.take()
.expect("Topology is missing the utilization metric emitter!");
let mut running_topology = Self::new(config, abort_tx);
Expand All @@ -1315,6 +1320,7 @@ impl RunningTopology {

let (utilization_task_shutdown_trigger, utilization_shutdown_signal, _) =
ShutdownSignal::new_wired();
running_topology.utilization_registry = Some(utilization_registry.clone());
running_topology.utilization_task_shutdown_trigger =
Some(utilization_task_shutdown_trigger);
running_topology.utilization_task = Some(tokio::spawn(Task::new(
Expand Down
60 changes: 35 additions & 25 deletions src/utilization.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{
collections::HashMap,
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll, ready},
time::Duration,
};
Expand All @@ -11,9 +12,6 @@ use std::time::Instant;
#[cfg(test)]
use mock_instant::global::Instant;

#[cfg(debug_assertions)]
use std::sync::Arc;

use futures::{Stream, StreamExt};
use metrics::Gauge;
use pin_project::pin_project;
Expand Down Expand Up @@ -203,33 +201,25 @@ impl UtilizationComponentSender {
}
}

pub(crate) struct UtilizationEmitter {
timers: HashMap<ComponentKey, Timer>,
timer_rx: Receiver<UtilizationTimerMessage>,
/// Registry for components sending utilization data.
///
/// Cloning this is cheap and does not clone the underlying data.
#[derive(Clone)]
pub struct UtilizationRegistry {
timers: Arc<Mutex<HashMap<ComponentKey, Timer>>>,
timer_tx: Sender<UtilizationTimerMessage>,
intervals: IntervalStream,
}

impl UtilizationEmitter {
pub(crate) fn new() -> Self {
let (timer_tx, timer_rx) = channel(4096);
Self {
timers: HashMap::default(),
intervals: IntervalStream::new(interval(UTILIZATION_EMITTER_DURATION)),
timer_tx,
timer_rx,
}
}

impl UtilizationRegistry {
/// Adds a new component to this utilization registry
///
/// Returns a sender which can be used to send utilization information back to the emitter
pub(crate) fn add_component(
&mut self,
&self,
key: ComponentKey,
gauge: Gauge,
) -> UtilizationComponentSender {
self.timers.insert(
self.timers.lock().expect("mutex poisoned").insert(
key.clone(),
Timer::new(
gauge,
Expand All @@ -242,24 +232,44 @@ impl UtilizationEmitter {
component_key: key,
}
}
}

/// Receiving half of the utilization metric machinery.
///
/// Created by [`UtilizationEmitter::new`] together with a [`UtilizationRegistry`]; the two
/// share the same timer map, and the emitter consumes the timer messages sent through the
/// registry's channel.
pub(crate) struct UtilizationEmitter {
// Per-component timers, shared with the registry that inserts them.
timers: Arc<Mutex<HashMap<ComponentKey, Timer>>>,
// Receiving end of the channel whose sending end is held by the registry.
timer_rx: Receiver<UtilizationTimerMessage>,
}

impl UtilizationEmitter {
/// Creates a new emitter together with the [`UtilizationRegistry`] paired with it.
///
/// Both returned values point at the same timer map (one `Arc`, cloned once). The registry
/// keeps the sending end of a channel created with `channel(4096)`, and the emitter keeps
/// the receiving end.
pub(crate) fn new() -> (Self, UtilizationRegistry) {
let (timer_tx, timer_rx) = channel(4096);
// Starts out empty; entries are inserted through the registry's `add_component`.
let timers = Arc::new(Mutex::new(HashMap::default()));
(
Self {
timers: Arc::clone(&timers),
timer_rx,
},
UtilizationRegistry { timers, timer_tx },
)
}

pub(crate) async fn run_utilization(&mut self, mut shutdown: ShutdownSignal) {
pub(crate) async fn run_utilization(mut self, mut shutdown: ShutdownSignal) {
let mut intervals = IntervalStream::new(interval(UTILIZATION_EMITTER_DURATION));
loop {
tokio::select! {
message = self.timer_rx.recv() => {
match message {
Some(UtilizationTimerMessage::StartWait(key, start_time)) => {
self.timers.get_mut(&key).expect("Utilization timer missing for component").start_wait(start_time);
self.timers.lock().expect("mutex poisoned").get_mut(&key).expect("Utilization timer missing for component").start_wait(start_time);
}
Some(UtilizationTimerMessage::StopWait(key, stop_time)) => {
self.timers.get_mut(&key).expect("Utilization timer missing for component").stop_wait(stop_time);
self.timers.lock().expect("mutex poisoned").get_mut(&key).expect("Utilization timer missing for component").stop_wait(stop_time);
}
None => break,
}
},

Some(_) = self.intervals.next() => {
for timer in self.timers.values_mut() {
Some(_) = intervals.next() => {
for timer in self.timers.lock().expect("mutex poisoned").values_mut() {
timer.update_utilization();
}
},
Expand Down
Loading