Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ vector-config-common = { path = "lib/vector-config-common" }
vector-config-macros = { path = "lib/vector-config-macros" }
vector-lib = { path = "lib/vector-lib", default-features = false, features = ["vrl"] }
vrl = { git = "https://github.com/vectordotdev/vrl.git", branch = "main", features = ["arbitrary", "cli", "test", "test_framework"] }
mock_instant = { version = "0.6" }
serial_test = { version = "3.2" }

[dependencies]
cfg-if.workspace = true
Expand Down Expand Up @@ -453,6 +455,8 @@ openssl-src = { version = "300", default-features = false, features = ["force-en
[dev-dependencies]
approx = "0.5.1"
assert_cmd = { version = "2.0.17", default-features = false }
mock_instant.workspace = true
serial_test.workspace = true
aws-smithy-runtime = { version = "1.8.3", default-features = false, features = ["tls-rustls"] }
azure_core = { version = "0.25", default-features = false, features = ["reqwest", "hmac_openssl", "azurite_workaround"] }
azure_identity = { version = "0.25", default-features = false, features = ["reqwest"] }
Expand Down
210 changes: 186 additions & 24 deletions src/utilization.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,40 @@
//! Component utilization tracking and metrics.
//!
//! This module tracks how much time Vector components spend waiting for input versus actively processing data.
//! Utilization is calculated as a number between `0` and `1`, where `1` means fully utilized and `0` means idle.
//!
//! # Architecture
//!
//! - **Stream Wrapper**: `Utilization<S>` wraps component input streams and sends timing messages when polling.
//! - **Timer**: Tracks wait/active spans and calculates utilization via exponentially weighted moving average (EWMA).
//! - **Emitter**: Centralized `UtilizationEmitter` receives timing messages from all components and periodically reports metrics.
//!
//! # Message Flow
//!
//! 1. Component polls wrapped stream → `Utilization::poll_next()` sends `StartWait` message with timestamp
//! 2. Stream returns data → `Utilization::poll_next()` sends `StopWait` message with timestamp
//! 3. Messages queue in async channel and are processed by `UtilizationEmitter`
//! 4. Every `5` seconds, `Timer::report()` calculates utilization and updates the metric gauge
//!
//! # Delayed Message Handling
//!
//! Messages carry timestamps from when they were sent, but may be processed later due to channel queueing.
//! To prevent invalid utilization calculations, `Timer::end_span()` clamps timestamps to the current reporting
//! period boundary (`overall_start`), ensuring we only account for time within the current measurement window.

use std::{
collections::HashMap,
pin::Pin,
task::{Context, Poll, ready},
time::{Duration, Instant},
time::Duration,
};

#[cfg(not(test))]
use std::time::Instant;

#[cfg(test)]
use mock_instant::global::Instant;

#[cfg(debug_assertions)]
use std::sync::Arc;

Expand Down Expand Up @@ -110,24 +140,32 @@ impl Timer {
}

/// Complete the current waiting span and begin a non-waiting span
pub(crate) fn stop_wait(&mut self, at: Instant) -> Instant {
pub(crate) fn stop_wait(&mut self, at: Instant) {
if self.waiting {
let now = self.end_span(at);
self.end_span(at);
self.waiting = false;
now
} else {
at
}
}

#[cfg(debug_assertions)]
fn report(&mut self, utilization: f64) {
// Note that changing the reporting interval would also affect the actual metric reporting frequency.
// This check reduces debug log spamming.
if self.report_count.is_multiple_of(5) {
debug!(component_id = %self.component_id, %utilization);
}
self.report_count = self.report_count.wrapping_add(1);
}

/// Meant to be called on a regular interval, this method calculates wait
/// ratio since the last time it was called and reports the resulting
/// utilization average.
pub(crate) fn report(&mut self) {
pub(crate) fn update_utilization(&mut self) {
// End the current span so it can be accounted for, but do not change
// whether or not we're in the waiting state. This way the next span
// inherits the correct status.
let now = self.end_span(Instant::now());
let now = Instant::now();
self.end_span(now);

let total_duration = now.duration_since(self.overall_start);
let wait_ratio = self.total_wait.as_secs_f64() / total_duration.as_secs_f64();
Expand All @@ -137,29 +175,27 @@ impl Timer {
let avg = self.ewma.average().unwrap_or(f64::NAN);
let avg_rounded = (avg * 10000.0).round() / 10000.0; // 4 digit precision

#[cfg(debug_assertions)]
{
// Note that changing the reporting interval would also affect the actual metric reporting frequency.
// This check reduces debug log spamming.
if self.report_count.is_multiple_of(5) {
debug!(component_id = %self.component_id, utilization = %avg_rounded);
}
self.report_count = self.report_count.wrapping_add(1);
}

self.gauge.set(avg_rounded);

// Reset overall statistics for the next reporting period.
self.overall_start = self.span_start;
self.overall_start = now;
self.total_wait = Duration::new(0, 0);

#[cfg(debug_assertions)]
self.report(avg_rounded);
}

fn end_span(&mut self, at: Instant) -> Instant {
fn end_span(&mut self, at: Instant) {
// Clamp the timestamp to the current reporting period to handle delayed messages.
// If 'at' is before overall_start (due to old timestamps from queued messages),
// clamp it to overall_start to prevent accounting for time outside this period.
let at = at.max(self.overall_start);

if self.waiting {
self.total_wait += at - self.span_start;
// Similarly, clamp span_start to ensure we don't count wait time from before this period
let span_start = self.span_start.max(self.overall_start);
self.total_wait += at.saturating_duration_since(span_start);
}
self.span_start = at;
self.span_start
}
}

Expand Down Expand Up @@ -251,7 +287,7 @@ impl UtilizationEmitter {

Some(_) = self.intervals.next() => {
for timer in self.timers.values_mut() {
timer.report();
timer.update_utilization();
}
},

Expand Down Expand Up @@ -282,3 +318,129 @@ pub(crate) fn wrap<S>(
inner,
}
}

#[cfg(test)]
mod tests {
use mock_instant::global::MockClock;
use serial_test::serial;

use super::*;

/// Helper function to reset mock clock and create a timer at T=100
fn setup_timer() -> Timer {
// Reset mock clock to ensure test isolation
MockClock::set_time(Duration::ZERO);

// Advance mock time to T=100 to avoid issues with time 0
MockClock::advance(Duration::from_secs(100));

Timer::new(
metrics::gauge!("test_utilization"),
#[cfg(debug_assertions)]
"test_component".into(),
)
}

/// Helper function to assert utilization is approximately equal to expected value
/// and within valid bounds [0, 1]
fn assert_approx_eq(actual: f64, expected: f64, tolerance: f64, description: &str) {
assert!(
actual >= 0.0 && actual <= 1.0,
"Utilization {actual} is outside [0, 1]"
);
assert!(
(actual - expected).abs() < tolerance,
"Expected utilization {description}, got {actual}"
);
}

#[test]
#[serial]
fn test_normal_utilization_within_bounds() {
let mut timer = setup_timer();

// Timer created at T=100. Advance 1 second and start waiting
MockClock::advance(Duration::from_secs(1));
timer.start_wait(Instant::now());

// Advance 2 seconds while waiting (T=101 to T=103)
MockClock::advance(Duration::from_secs(2));
timer.stop_wait(Instant::now());

// Advance 2 more seconds (not waiting), then report (T=103 to T=105)
MockClock::advance(Duration::from_secs(2));
timer.update_utilization();

// total_wait = 2 seconds, total_duration = 5 seconds (T=100 to T=105)
// wait_ratio = 2/5 = 0.4, utilization = 1.0 - 0.4 = 0.6
let avg = timer.ewma.average().unwrap();
assert_approx_eq(avg, 0.6, 0.01, "~0.6");
}

#[test]
#[serial]
fn test_delayed_messages_can_cause_invalid_utilization() {
let mut timer = setup_timer();

// Timer created at T=100. Simulate that some time passes (to T=105)
// and a report period completes, resetting overall_start
MockClock::advance(Duration::from_secs(5));
let now = Instant::now(); // T=105
timer.overall_start = now; // Simulate report period reset

// Now simulate delayed messages with old timestamps from before T=105
// These represent messages sent at T=101, T=103, T=104 but processed after T=105
let t1 = now - Duration::from_secs(4); // T=101
let t3 = now - Duration::from_secs(2); // T=103
let t4 = now - Duration::from_secs(1); // T=104

// Process old messages - they should be clamped to overall_start (T=105)
timer.start_wait(t1); // Should be clamped to T=105
timer.stop_wait(t3); // Should be clamped to T=105 (no wait time added)
timer.start_wait(t4); // Should be clamped to T=105

// Advance 5 seconds and report (T=110)
MockClock::advance(Duration::from_secs(5));
timer.update_utilization();

// With clamping: all old timestamps treated as T=105
// So we waited from T=105 to T=110 = 5 seconds
// total_duration = 5 seconds, total_wait = 5 seconds
// wait_ratio = 1.0, utilization = 0.0
let avg = timer.ewma.average().unwrap();
assert_approx_eq(avg, 0.0, 0.01, "near 0 (always waiting)");
}

#[test]
#[serial]
fn test_always_waiting_utilization() {
let mut timer = setup_timer();

// Timer created at T=100. Start waiting immediately
timer.start_wait(Instant::now());

// Advance 5 seconds while waiting (T=100 to T=105)
MockClock::advance(Duration::from_secs(5));
timer.update_utilization();

// We waited the entire time: total_wait = 5s, total_duration = 5s
// wait_ratio = 1.0, utilization = 0.0
let avg = timer.ewma.average().unwrap();
assert_approx_eq(avg, 0.0, 0.01, "near 0 (always waiting)");
}

#[test]
#[serial]
fn test_never_waiting_utilization() {
let mut timer = setup_timer();

// Advance 5 seconds without waiting (T=100 to T=105)
MockClock::advance(Duration::from_secs(5));
timer.update_utilization();

// Never waited: total_wait = 0, total_duration = 5s
// wait_ratio = 0.0, utilization = 1.0
let avg = timer.ewma.average().unwrap();
assert_approx_eq(avg, 1.0, 0.01, "near 1.0 (never waiting)");
}
}
Loading