Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog.d/23780_fix_panic_partial_cmp_in_sort.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Fixed a bug where certain floating-point values, such as `f64::NAN` and `f64::INFINITY`, could cause Vector to panic when some internal functions sorted more than 20 items.

authors: thomasqueirozb
27 changes: 24 additions & 3 deletions src/sinks/util/buffer/metrics/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::cmp::Ordering;

use vector_lib::event::metric::{Metric, MetricValue, Sample};

use crate::sinks::util::{
Expand Down Expand Up @@ -106,7 +104,7 @@ pub fn compress_distribution(samples: &mut Vec<Sample>) -> Vec<Sample> {
return Vec::new();
}

samples.sort_by(|a, b| a.value.partial_cmp(&b.value).unwrap_or(Ordering::Equal));
samples.sort_by(|a, b| a.value.total_cmp(&b.value));

let mut acc = Sample {
value: samples[0].value,
Expand All @@ -129,6 +127,7 @@ pub fn compress_distribution(samples: &mut Vec<Sample>) -> Vec<Sample> {

#[cfg(test)]
mod tests {
use itertools::Itertools;
use similar_asserts::assert_eq;
use vector_lib::{
event::metric::{MetricKind, MetricKind::*, MetricValue, StatisticKind},
Expand Down Expand Up @@ -588,6 +587,28 @@ mod tests {
);
}

#[test]
fn compress_distributions_doesnt_panic() {
    // Lossless i32 -> f64 conversion, reused for every integer range below.
    let as_f64 = |n: i32| f64::from(n);
    // Wraps a raw value in a sample with a rate of one.
    let unit_sample = |value: f64| Sample { value, rate: 1 };

    // Input: the values 0..=15, a NaN, then 16..=20, all in reverse order, so
    // the NaN sits in the middle of a list of more than 20 samples.
    let mut samples = (0..=15)
        .map(as_f64)
        .chain(std::iter::once(f64::NAN))
        .chain((16..=20).map(as_f64))
        .rev()
        .map(unit_sample)
        .collect_vec();

    // Expected output: the finite values in ascending order with the NaN last.
    let expected = (0..=20)
        .map(as_f64)
        .chain(std::iter::once(f64::NAN))
        .map(unit_sample)
        .collect_vec();

    assert_eq!(compress_distribution(&mut samples), expected);
}

fn rebuffer_absolute_aggregated_histograms<State: MetricNormalize + Default>() -> Buffer {
let mut events = Vec::new();
for _ in 2..5 {
Expand Down
25 changes: 20 additions & 5 deletions src/sinks/util/statistic.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::cmp::Ordering;

use snafu::Snafu;

use crate::event::metric::Sample;
Expand Down Expand Up @@ -46,9 +44,7 @@ impl DistributionStatistic {
}
}),
_ => Some({
bins.sort_unstable_by(|a, b| {
a.value.partial_cmp(&b.value).unwrap_or(Ordering::Equal)
});
bins.sort_unstable_by(|a, b| a.value.total_cmp(&b.value));

let min = bins.first().unwrap().value;
let max = bins.last().unwrap().value;
Expand Down Expand Up @@ -226,4 +222,23 @@ mod test {
}
);
}

#[test]
fn sort_unstable_doesnt_panic() {
    // Lossless i32 -> f64 conversion, reused for both integer ranges.
    let as_f64 = |n: i32| f64::from(n);

    // The values 0..=15, a NaN, then 16..=20, all in reverse order.
    let values: Vec<f64> = (0..=15)
        .map(as_f64)
        .chain(std::iter::once(f64::NAN))
        .chain((16..=20).map(as_f64))
        .rev()
        .collect();

    // Short inputs go through a different internal sort implementation that
    // doesn't panic, so make sure this input has more than 20 items.
    assert!(values.len() > 20);

    // Pair every value with a rate of one.
    let pairs: Vec<(f64, u32)> = values.into_iter().map(|value| (value, 1)).collect();

    // Only the absence of a panic matters here; the result is discarded.
    DistributionStatistic::from_samples(&samples(&pairs), &[0.0, 1.0]);
}
}
51 changes: 48 additions & 3 deletions src/sources/prometheus/parser.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::cmp::Ordering;

use chrono::{DateTime, TimeZone, Utc};
#[cfg(feature = "sources-prometheus-remote-write")]
use vector_lib::prometheus::parser::proto;
Expand Down Expand Up @@ -97,7 +95,9 @@ fn reparse_groups(
let tags = combine_tags(key.labels, tag_overrides.clone());

let mut buckets = metric.buckets;
buckets.sort_unstable_by(|a, b| a.partial_cmp(b).unwrap_or(Ordering::Equal));
buckets.sort_unstable_by(|a, b| {
a.bucket.total_cmp(&b.bucket).then(a.count.cmp(&b.count))
});
for i in (1..buckets.len()).rev() {
buckets[i].count = buckets[i].count.saturating_sub(buckets[i - 1].count);
}
Expand Down Expand Up @@ -178,9 +178,11 @@ fn combine_tags(

#[cfg(test)]
mod test {
use core::f64;
use std::sync::LazyLock;

use chrono::{TimeZone, Timelike, Utc};
use itertools::Itertools;
use similar_asserts::assert_eq;
use vector_lib::{assert_event_data_eq, metric_tags};

Expand Down Expand Up @@ -737,6 +739,49 @@ mod test {
);
}

// Checks that parsing a histogram whose bucket list contains a NaN `le` bound
// among more than 20 buckets completes without panicking, and yields the
// buckets in ascending order with the NaN bucket last.
#[test]
fn test_histogram_doesnt_panic() {
// Header of the text exposition; the bucket lines are appended below.
let mut exp = r#"
# HELP http_request_duration_seconds A histogram of the request duration.
# TYPE http_request_duration_seconds histogram
"#
.to_string();

let to_float = |v: i32| -> f64 { v as f64 };
// One bucket line per `le` bound: 0..=15, NaN, 16..=20, emitted in reverse
// order. Every bucket has a count of 0 and the same timestamp.
exp += &(0..=15)
.map(to_float)
.chain(std::iter::once(f64::NAN))
.chain((16..=20).map(to_float))
.rev()
.map(|f| format!("http_request_duration_seconds_bucket{{le=\"{f}\"}} 0 1612411506789"))
.join("\n");

// Expect a single absolute aggregated histogram carrying all of the buckets.
assert_event_data_eq!(
events_to_metrics(parse_text(&exp)),
Ok(vec![
Metric::new(
"http_request_duration_seconds",
MetricKind::Absolute,
MetricValue::AggregatedHistogram {
// These bucket values don't mean/test anything, they just test that the
// sort works without panicking
buckets: (0..=20)
.map(to_float)
.chain(std::iter::once(f64::NAN))
.map(|upper_limit| Bucket {
upper_limit,
count: 0
})
.collect(),
count: 0,
sum: 0.0,
},
)
.with_timestamp(Some(*TIMESTAMP))
]),
);
}

#[test]
fn test_histogram_out_of_order() {
let exp = r#"
Expand Down
Loading