From c322a50d3e1f3d4a058a708d146a61df3411b3ab Mon Sep 17 00:00:00 2001 From: Mindaugas Vinkelis Date: Fri, 8 Nov 2024 00:12:30 +0200 Subject: [PATCH] Histogram implementation cleanup (#2283) --- .../src/metrics/internal/histogram.rs | 171 +++++------------- 1 file changed, 46 insertions(+), 125 deletions(-) diff --git a/opentelemetry-sdk/src/metrics/internal/histogram.rs b/opentelemetry-sdk/src/metrics/internal/histogram.rs index 4456d36645..05a6e07e23 100644 --- a/opentelemetry-sdk/src/metrics/internal/histogram.rs +++ b/opentelemetry-sdk/src/metrics/internal/histogram.rs @@ -9,11 +9,7 @@ use opentelemetry::KeyValue; use super::ValueMap; use super::{Aggregator, Number}; -struct HistogramTracker { - buckets: Mutex>, -} - -impl Aggregator for HistogramTracker +impl Aggregator for Mutex> where T: Number, { @@ -22,27 +18,26 @@ where type PreComputedValue = (T, usize); fn update(&self, (value, index): (T, usize)) { - let mut buckets = match self.buckets.lock() { - Ok(guard) => guard, - Err(_) => return, - }; + let mut buckets = self.lock().unwrap_or_else(|err| err.into_inner()); - buckets.bin(index, value); - buckets.sum(value); + buckets.total += value; + buckets.count += 1; + buckets.counts[index] += 1; + if value < buckets.min { + buckets.min = value; + } + if value > buckets.max { + buckets.max = value + } } fn create(count: &usize) -> Self { - HistogramTracker { - buckets: Mutex::new(Buckets::::new(*count)), - } + Mutex::new(Buckets::::new(*count)) } fn clone_and_reset(&self, count: &usize) -> Self { - let mut current = self.buckets.lock().unwrap_or_else(|err| err.into_inner()); - let cloned = replace(current.deref_mut(), Buckets::new(*count)); - Self { - buckets: Mutex::new(cloned), - } + let mut current = self.lock().unwrap_or_else(|err| err.into_inner()); + Mutex::new(replace(current.deref_mut(), Buckets::new(*count))) } } @@ -65,27 +60,12 @@ impl Buckets { ..Default::default() } } - - fn sum(&mut self, value: T) { - self.total += value; - } - - fn bin(&mut self, idx: usize, value: T) { - self.counts[idx] += 1; - self.count += 1; - if value < self.min { - self.min = value; - } - if value > self.max { - self.max = value - } - } } /// Summarizes a set of measurements as a histogram with explicitly defined /// buckets. pub(crate) struct Histogram { - value_map: ValueMap>, + value_map: ValueMap>>, bounds: Vec, record_min_max: bool, record_sum: bool, @@ -93,34 +73,21 @@ pub(crate) struct Histogram { } impl Histogram { - pub(crate) fn new(boundaries: Vec, record_min_max: bool, record_sum: bool) -> Self { - // TODO fix the bug, by first removing NaN and only then getting buckets_count - // once we know the reason for performance degradation - let buckets_count = boundaries.len() + 1; - let mut histogram = Histogram { + pub(crate) fn new(mut bounds: Vec, record_min_max: bool, record_sum: bool) -> Self { + bounds.retain(|v| !v.is_nan()); + bounds.sort_by(|a, b| a.partial_cmp(b).expect("NaNs filtered out")); + let buckets_count = bounds.len() + 1; + Histogram { value_map: ValueMap::new(buckets_count), - bounds: boundaries, + bounds, record_min_max, record_sum, start: Mutex::new(SystemTime::now()), - }; - - histogram.bounds.retain(|v| !v.is_nan()); - histogram - .bounds - .sort_by(|a, b| a.partial_cmp(b).expect("NaNs filtered out")); - - histogram + } } pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) { let f = measurement.into_float(); - // Ignore NaN and infinity. - // Only makes sense if T is f64, maybe this could be no-op for other cases? - // TODO: uncomment once we know the reason for performance degradation - // if f.is_infinite() || f.is_nan() { - // return; - // } // This search will return an index in the range `[0, bounds.len()]`, where // it will return `bounds.len()` if value is greater than the last element // of `bounds`. This aligns with the buckets in that the length of buckets @@ -156,17 +123,14 @@ impl Histogram { self.value_map .collect_and_reset(&mut h.data_points, |attributes, aggr| { - let b = aggr - .buckets - .into_inner() - .unwrap_or_else(|err| err.into_inner()); + let b = aggr.into_inner().unwrap_or_else(|err| err.into_inner()); HistogramDataPoint { attributes, start_time: prev_start, time: t, count: b.count, bounds: self.bounds.clone(), - bucket_counts: b.counts.clone(), + bucket_counts: b.counts, sum: if self.record_sum { b.total } else { @@ -214,7 +178,7 @@ impl Histogram { self.value_map .collect_readonly(&mut h.data_points, |attributes, aggr| { - let b = aggr.buckets.lock().unwrap_or_else(|err| err.into_inner()); + let b = aggr.lock().unwrap_or_else(|err| err.into_inner()); HistogramDataPoint { attributes, start_time: prev_start, @@ -245,68 +209,25 @@ impl Histogram { } } -// TODO: uncomment once we know the reason for performance degradation -// #[cfg(test)] -// mod tests { +#[cfg(test)] +mod tests { + use super::*; -// use super::*; - -// #[test] -// fn when_f64_is_nan_or_infinity_then_ignore() { -// struct Expected { -// min: f64, -// max: f64, -// sum: f64, -// count: u64, -// } -// impl Expected { -// fn new(min: f64, max: f64, sum: f64, count: u64) -> Self { -// Expected { -// min, -// max, -// sum, -// count, -// } -// } -// } -// struct TestCase { -// values: Vec, -// expected: Expected, -// } - -// let test_cases = vec![ -// TestCase { -// values: vec![2.0, 4.0, 1.0], -// expected: Expected::new(1.0, 4.0, 7.0, 3), -// }, -// TestCase { -// values: vec![2.0, 4.0, 1.0, f64::INFINITY], -// expected: Expected::new(1.0, 4.0, 7.0, 3), -// }, -// TestCase { -// values: vec![2.0, 4.0, 1.0, -f64::INFINITY], -// expected: Expected::new(1.0, 4.0, 7.0, 3), -// }, -// TestCase { -// values: vec![2.0, f64::NAN, 4.0, 1.0], -// expected: Expected::new(1.0, 4.0, 7.0, 3), -// }, -// TestCase { -// values: vec![4.0, 4.0, 4.0, 2.0, 16.0, 1.0], -// expected: Expected::new(1.0, 16.0, 31.0, 6), -// }, -// ]; - -// for test in test_cases { -// let h = Histogram::new(vec![], true, true); -// for v in test.values { -// h.measure(v, &[]); -// } -// let res = h.value_map.no_attribute_tracker.buckets.lock().unwrap(); -// assert_eq!(test.expected.max, res.max); -// assert_eq!(test.expected.min, res.min); -// assert_eq!(test.expected.sum, res.total); -// assert_eq!(test.expected.count, res.count); -// } -// } -// } + #[test] + fn check_buckets_are_selected_correctly() { + let hist = Histogram::::new(vec![1.0, 3.0, 6.0], false, false); + for v in 1..11 { + hist.measure(v, &[]); + } + let (count, dp) = hist.cumulative(None); + let dp = dp.unwrap(); + let dp = dp.as_any().downcast_ref::>().unwrap(); + assert_eq!(count, 1); + assert_eq!(dp.data_points[0].count, 10); + assert_eq!(dp.data_points[0].bucket_counts.len(), 4); + assert_eq!(dp.data_points[0].bucket_counts[0], 1); // 1 + assert_eq!(dp.data_points[0].bucket_counts[1], 2); // 2, 3 + assert_eq!(dp.data_points[0].bucket_counts[2], 3); // 4, 5, 6 + assert_eq!(dp.data_points[0].bucket_counts[3], 4); // 7, 8, 9, 10 + } +}