Merged
23 commits
95886f8
fix(utilization_metric): run a separate task for utilization to ensur…
esensar Dec 19, 2024
59b7d19
Add changelog entry
esensar Dec 23, 2024
416e1cf
Remove unnecessary clone when building utilization task
esensar Jan 10, 2025
bb5d7ca
Name utilization task `utilization_heartbeat`
esensar Jan 10, 2025
d7d8694
Join `utilization_task` when stopping topology
esensar Jan 10, 2025
db6f273
Shutdown utilization task when stopping topology
esensar Jan 17, 2025
e81af45
Hack: fix utilization never ending, by polling another stream?
esensar Jan 27, 2025
c37f106
Credit Quad9DNS in changelog
esensar Mar 3, 2025
fc1566f
Merge remote-tracking branch 'origin/master' into fix/utilization-metric
pront Mar 11, 2025
295bc8f
Merge branch 'master' into fix/utilization-metric
esensar Jun 10, 2025
a54a25d
Merge remote-tracking branch 'origin/master' into fix/utilization-metric
pront Jun 12, 2025
f4e5e44
Replace unbounded_channel with channel
esensar Jun 13, 2025
accdc6e
Replace unwrap with expect in utilization
esensar Jun 13, 2025
762fbdd
Increase timer channel buffer size
esensar Jun 14, 2025
a6bc1b5
Wrap utilization timer logic in a separate sender
esensar Jun 19, 2025
4efb580
Rename start and stop fns
esensar Jun 19, 2025
64dbb89
Merge branch 'master' into fix/utilization-metric
esensar Jun 20, 2025
e381c16
Remove unused clippy check
esensar Jun 20, 2025
3e4aa96
Merge branch 'master' into fix/utilization-metric
pront Jun 20, 2025
72c1437
Merge branch 'master' into fix/utilization-metric
thomasqueirozb Jun 20, 2025
58f2ca9
Replace unwrap with expect in topology start
esensar Jun 23, 2025
bad119a
Merge branch 'master' into fix/utilization-metric
thomasqueirozb Jun 23, 2025
4775b5a
Merge branch 'master' into fix/utilization-metric
pront Jul 2, 2025
3 changes: 3 additions & 0 deletions changelog.d/22070_utilization_metric_periodic_emit.fix.md
@@ -0,0 +1,3 @@
The `utilization` metric is now properly published periodically, even when no events are flowing through the components.

authors: esensar Quad9DNS
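
The fix moves utilization tracking out of the per-component hot path and into a dedicated heartbeat task that owns every component's gauge and publishes on a fixed interval, so the metric keeps updating even while a component sits idle. Below is a minimal sketch of that pattern; the names and simplified types are assumptions for illustration (the real `UtilizationEmitter` lives in `src/utilization.rs`, which is not part of this diff).

```rust
// Minimal sketch of the heartbeat pattern (assumed, simplified types;
// not the actual Vector implementation).
use std::time::Duration;
use tokio::{sync::watch, time::interval};

struct UtilizationEmitter {
    // Fraction of time the component spent working rather than waiting.
    utilization: f64,
}

impl UtilizationEmitter {
    // Publish the gauge on a fixed interval, independent of event flow,
    // until the shutdown signal fires.
    async fn run_utilization(self, mut shutdown: watch::Receiver<bool>) {
        let mut ticker = interval(Duration::from_secs(5));
        loop {
            tokio::select! {
                _ = shutdown.changed() => break,
                _ = ticker.tick() => {
                    metrics::gauge!("utilization").set(self.utilization);
                }
            }
        }
    }
}
```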
50 changes: 31 additions & 19 deletions src/topology/builder.rs
@@ -8,6 +8,7 @@ use std::{

use futures::{stream::FuturesOrdered, FutureExt, StreamExt, TryStreamExt};
use futures_util::stream::FuturesUnordered;
use metrics::gauge;
use stream_cancel::{StreamExt as StreamCancelExt, Trigger, Tripwire};
use tokio::{
select,
@@ -51,7 +52,7 @@ use crate::{
spawn_named,
topology::task::TaskError,
transforms::{SyncTransform, TaskTransform, Transform, TransformOutputs, TransformOutputsBuf},
utilization::wrap,
utilization::{wrap, UtilizationComponentSender, UtilizationEmitter},
SourceSender,
};

@@ -84,6 +85,7 @@ struct Builder<'a> {
healthchecks: HashMap<ComponentKey, Task>,
detach_triggers: HashMap<ComponentKey, Trigger>,
extra_context: ExtraContext,
utilization_emitter: UtilizationEmitter,
}

impl<'a> Builder<'a> {
@@ -105,6 +107,7 @@ impl<'a> Builder<'a> {
healthchecks: HashMap::new(),
detach_triggers: HashMap::new(),
extra_context,
utilization_emitter: UtilizationEmitter::new(),
}
}

@@ -128,6 +131,7 @@ impl<'a> Builder<'a> {
healthchecks: self.healthchecks,
shutdown_coordinator: self.shutdown_coordinator,
detach_triggers: self.detach_triggers,
utilization_emitter: Some(self.utilization_emitter),
})
} else {
Err(self.errors)
@@ -513,7 +517,7 @@ impl<'a> Builder<'a> {

let (transform_task, transform_outputs) = {
let _span = span.enter();
build_transform(transform, node, input_rx)
build_transform(transform, node, input_rx, &mut self.utilization_emitter)
};

self.outputs.extend(transform_outputs);
@@ -615,6 +619,10 @@ impl<'a> Builder<'a> {

let (trigger, tripwire) = Tripwire::new();

let utilization_sender = self
.utilization_emitter
.add_component(key.clone(), gauge!("utilization"));
let component_key = key.clone();
let sink = async move {
debug!("Sink starting.");

@@ -630,7 +638,7 @@
.take()
.expect("Task started but input has been taken.");

let mut rx = wrap(rx);
let mut rx = wrap(utilization_sender, component_key.clone(), rx);

let events_received = register!(EventsReceived);
sink.run(
@@ -711,6 +719,7 @@ pub struct TopologyPieces {
pub(super) healthchecks: HashMap<ComponentKey, Task>,
pub(crate) shutdown_coordinator: SourceShutdownCoordinator,
pub(crate) detach_triggers: HashMap<ComponentKey, Trigger>,
pub(crate) utilization_emitter: Option<UtilizationEmitter>,
}

impl TopologyPieces {
@@ -789,18 +798,22 @@ fn build_transform(
transform: Transform,
node: TransformNode,
input_rx: BufferReceiver<EventArray>,
utilization_emitter: &mut UtilizationEmitter,
) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
match transform {
// TODO: avoid the double boxing for function transforms here
Transform::Function(t) => build_sync_transform(Box::new(t), node, input_rx),
Transform::Synchronous(t) => build_sync_transform(t, node, input_rx),
Transform::Function(t) => {
build_sync_transform(Box::new(t), node, input_rx, utilization_emitter)
}
Transform::Synchronous(t) => build_sync_transform(t, node, input_rx, utilization_emitter),
Transform::Task(t) => build_task_transform(
t,
input_rx,
node.input_details.data_type(),
node.typetag,
&node.key,
&node.outputs,
utilization_emitter,
),
}
}
@@ -809,10 +822,12 @@ fn build_sync_transform(
t: Box<dyn SyncTransform>,
node: TransformNode,
input_rx: BufferReceiver<EventArray>,
utilization_emitter: &mut UtilizationEmitter,
) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
let (outputs, controls) = TransformOutputs::new(node.outputs, &node.key);

let runner = Runner::new(t, input_rx, node.input_details.data_type(), outputs);
let sender = utilization_emitter.add_component(node.key.clone(), gauge!("utilization"));
let runner = Runner::new(t, input_rx, sender, node.input_details.data_type(), outputs);
let transform = if node.enable_concurrency {
runner.run_concurrently().boxed()
} else {
@@ -852,15 +867,15 @@ struct Runner {
input_rx: Option<BufferReceiver<EventArray>>,
input_type: DataType,
outputs: TransformOutputs,
timer: crate::utilization::Timer,
last_report: Instant,
timer_tx: UtilizationComponentSender,
events_received: Registered<EventsReceived>,
}

impl Runner {
fn new(
transform: Box<dyn SyncTransform>,
input_rx: BufferReceiver<EventArray>,
timer_tx: UtilizationComponentSender,
input_type: DataType,
outputs: TransformOutputs,
) -> Self {
@@ -869,18 +884,13 @@ impl Runner {
input_rx: Some(input_rx),
input_type,
outputs,
timer: crate::utilization::Timer::new(),
last_report: Instant::now(),
timer_tx,
events_received: register!(EventsReceived),
}
}

fn on_events_received(&mut self, events: &EventArray) {
let stopped = self.timer.stop_wait();
if stopped.duration_since(self.last_report).as_secs() >= 5 {
self.timer.report();
self.last_report = stopped;
}
self.timer_tx.try_send_stop_wait();

self.events_received.emit(CountByteSize(
events.len(),
@@ -889,7 +899,7 @@ }
}

async fn send_outputs(&mut self, outputs_buf: &mut TransformOutputsBuf) -> crate::Result<()> {
self.timer.start_wait();
self.timer_tx.try_send_start_wait();
self.outputs.send(outputs_buf).await
}

@@ -906,7 +916,7 @@ impl Runner {
.into_stream()
.filter(move |events| ready(filter_events_type(events, self.input_type)));

self.timer.start_wait();
self.timer_tx.try_send_start_wait();
while let Some(events) = input_rx.next().await {
self.on_events_received(&events);
self.transform.transform_all(events, &mut outputs_buf);
@@ -932,7 +942,7 @@ impl Runner {
let mut in_flight = FuturesOrdered::new();
let mut shutting_down = false;

self.timer.start_wait();
self.timer_tx.try_send_start_wait();
loop {
tokio::select! {
biased;
@@ -993,10 +1003,12 @@ fn build_task_transform(
typetag: &str,
key: &ComponentKey,
outputs: &[TransformOutput],
utilization_emitter: &mut UtilizationEmitter,
) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
let (mut fanout, control) = Fanout::new();

let input_rx = crate::utilization::wrap(input_rx.into_stream());
let sender = utilization_emitter.add_component(key.clone(), gauge!("utilization"));
let input_rx = wrap(sender, key.clone(), input_rx.into_stream());

let events_received = register!(EventsReceived);
let filtered = input_rx
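
Per the "Wrap utilization timer logic in a separate sender" and "Replace unbounded_channel with channel" commits above, components now report their wait/work transitions to the emitter through a `UtilizationComponentSender` over a bounded channel. A sketch of that non-blocking sender pattern follows; the message and field names are assumed for illustration.

```rust
// Sketch of the non-blocking sender the Runner's try_send_* calls suggest
// (assumed message and field names).
use tokio::sync::mpsc;

enum UtilizationTimerMessage {
    StartWait,
    StopWait,
}

struct UtilizationComponentSender {
    tx: mpsc::Sender<UtilizationTimerMessage>,
}

impl UtilizationComponentSender {
    // try_send never blocks the event path: if the bounded channel is
    // full, the sample is dropped instead of stalling the component.
    fn try_send_start_wait(&self) {
        let _ = self.tx.try_send(UtilizationTimerMessage::StartWait);
    }

    fn try_send_stop_wait(&self) {
        let _ = self.tx.try_send(UtilizationTimerMessage::StopWait);
    }
}
```

Dropping an occasional sample when the channel is full trades a sliver of metric precision for a guarantee that metrics bookkeeping never stalls the data path, which also explains the "Increase timer channel buffer size" commit.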
44 changes: 39 additions & 5 deletions src/topology/running.rs
@@ -7,11 +7,10 @@
};

use super::{
builder,
builder::TopologyPieces,
builder::{self, TopologyPieces},
fanout::{ControlChannel, ControlMessage},
handle_errors, retain, take_healthchecks,
task::TaskOutput,
task::{Task, TaskOutput},
BuiltBuffer, TaskHandle,
};
use crate::{
@@ -23,14 +22,15 @@ use crate::{
spawn_named,
};
use futures::{future, Future, FutureExt};
use stream_cancel::Trigger;
use tokio::{
sync::{mpsc, watch},
time::{interval, sleep_until, Duration, Instant},
};
use tracing::Instrument;
use vector_lib::buffers::topology::channel::BufferSender;
use vector_lib::tap::topology::{TapOutput, TapResource, WatchRx, WatchTx};
use vector_lib::trigger::DisabledTrigger;
use vector_lib::{buffers::topology::channel::BufferSender, shutdown::ShutdownSignal};

pub type ShutdownErrorReceiver = mpsc::UnboundedReceiver<ShutdownError>;

@@ -49,6 +49,8 @@ pub struct RunningTopology {
watch: (WatchTx, WatchRx),
pub(crate) running: Arc<AtomicBool>,
graceful_shutdown_duration: Option<Duration>,
utilization_task: Option<TaskHandle>,
utilization_task_shutdown_trigger: Option<Trigger>,
pending_reload: Option<HashSet<ComponentKey>>,
}

@@ -68,6 +70,8 @@ impl RunningTopology {
running: Arc::new(AtomicBool::new(true)),
graceful_shutdown_duration: config.graceful_shutdown_duration,
config,
utilization_task: None,
utilization_task_shutdown_trigger: None,
pending_reload: None,
}
}
@@ -127,15 +131,21 @@ impl RunningTopology {
// pump in self.tasks, and the other for source in self.source_tasks.
let mut check_handles = HashMap::<ComponentKey, Vec<_>>::new();

let map_closure = |_result| ();

// We need to give some time to the sources to gracefully shutdown, so
// we will merge them with other tasks.
for (key, task) in self.tasks.into_iter().chain(self.source_tasks.into_iter()) {
let task = task.map(|_result| ()).shared();
let task = task.map(map_closure).shared();

wait_handles.push(task.clone());
check_handles.entry(key).or_default().push(task);
}

if let Some(utilization_task) = self.utilization_task {
wait_handles.push(utilization_task.map(map_closure).shared());
}

// If we reach this, we will forcefully shutdown the sources. If None, we will never force shutdown.
let deadline = self
.graceful_shutdown_duration
@@ -212,6 +222,9 @@ impl RunningTopology {

// Now kick off the shutdown process by shutting down the sources.
let source_shutdown_complete = self.shutdown_coordinator.shutdown_all(deadline);
if let Some(trigger) = self.utilization_task_shutdown_trigger {
trigger.cancel();
}

futures::future::join(source_shutdown_complete, shutdown_complete_future).map(|_| ())
}
@@ -1220,6 +1233,10 @@ impl RunningTopology {
return None;
}

let mut utilization_emitter = pieces
.utilization_emitter
.take()
.expect("Topology is missing the utilization metric emitter!");
let mut running_topology = Self::new(config, abort_tx);

if !running_topology
@@ -1231,6 +1248,23 @@ impl RunningTopology {
running_topology.connect_diff(&diff, &mut pieces).await;
running_topology.spawn_diff(&diff, pieces);

let (utilization_task_shutdown_trigger, utilization_shutdown_signal, _) =
ShutdownSignal::new_wired();
running_topology.utilization_task_shutdown_trigger =
Some(utilization_task_shutdown_trigger);
running_topology.utilization_task = Some(tokio::spawn(Task::new(
"utilization_heartbeat".into(),
"",
async move {
utilization_emitter
.run_utilization(utilization_shutdown_signal)
.await;
// TODO: new task output type for this? Or handle this task in a completely
// different way
Ok(TaskOutput::Healthcheck)
},
)));

Some((running_topology, abort_rx))
}
}
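
The running.rs changes wire the heartbeat into topology shutdown in two steps: cancel the task's shutdown trigger, then join its handle alongside the other component tasks. A simplified sketch of that handshake, using a plain `watch` channel in place of Vector's `Trigger`/`ShutdownSignal` types:

```rust
// Simplified shutdown handshake (a watch channel stands in for Vector's
// stream_cancel::Trigger / ShutdownSignal wiring).
use tokio::{sync::watch, task::JoinHandle};

async fn stop_heartbeat(trigger: watch::Sender<bool>, handle: JoinHandle<()>) {
    // Signal the emitter loop to break out of its select!...
    let _ = trigger.send(true);
    // ...then join the task so shutdown does not race a final gauge emit.
    let _ = handle.await;
}
```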