Commit 7b1c190

Merge #1593: Fix bug: Wrong UDP Average Connect Time metric

Commits in this merge:

- b423bf6 refactor: [#1589] improve readability of UDP performance metrics race condition test (Jose Celano)
- dc8d4a9 test: [#1589] add race condition test for UDP performance metrics (Jose Celano)
- a9acca5 refactor: [#1589] rename methods and remove unused code (Jose Celano)
- 4c082fa refactor: [#1589] make methods private (Jose Celano)
- cd57f7a fix: [#1589] use average aggregation for UDP processing time metrics (Jose Celano)
- ba3d8a9 fix: format (Jose Celano)
- caa69ae test: [#1589] remove uneeded test (Jose Celano)
- f402b02 chore: remove deprecated comment (Jose Celano)
- 8fbcf90 refactor(metrics): extract collect_matching_samples to Metric<T> impl (Jose Celano)
- 384b887 feat(metrics): [#1589] add Avg (average) aggregate function (Jose Celano)
- ed5f1e6 fix: [#1589] add dedicated metric for UDP request processing in moving average calculation (Jose Celano)
- 164de92 refactor: [#1589] remvoe duplicate code (Jose Celano)
- 1c13b12 fix: [#1589] partially. Moving average calculated for each time series (Jose Celano)
- 47c2949 refactor: [#1598] make recalculate udp avg scrape processing time metric and update atomic (Jose Celano)
- 59fbb39 refactor: [#1598] make recalculate udp avg announce processing time metric and update atomic (Jose Celano)
- d50948e refactor: [#1598] make recalculate udp avg connect processing time metric and update atomic (Jose Celano)
- e6c05b6 refactor(udp-tracker-server): [#1589] move average processing time calculation from Repository to Metrics (Jose Celano)

Pull request description:

Fix bug: Wrong UDP Average Connect Time metric.

### Context

See #1589.

### Subtasks

- [x] Read, recalculate, and update of the average should be atomic (see the sketch after this description).
- [x] The average should be calculated per time series (label set). We were mixing series: the average was updated per label set (segregated), but the requests used in the calculation were counted globally (not segregated).
- [x] Add a new metric to count requests for the moving average. This counter is also increased atomically when the new average is written.
- [x] Fix the global value (all trackers, no labels) for the average. It should be the average of all average samples, not their `SUM`.
- [x] Add a new average aggregate function to the `metrics` package.
- [x] Add tests:
  - [x] With two time series: two trackers running on different ports (`6868`, `6969`).
  - [x] For race conditions, running multiple requests in parallel, to ensure the average is accurate after many iterations.

### How to test

1. Run the tracker: `cargo run`

2. Simulate one `announce` request per UDP server:

   ```console
   cargo run -p torrust-tracker-client --bin udp_tracker_client announce udp://127.0.0.1:6868 000620bbc6c52d5a96d98f6c0f1dfa523a40df82 | jq
   cargo run -p torrust-tracker-client --bin udp_tracker_client announce udp://127.0.0.1:6969 000620bbc6c52d5a96d98f6c0f1dfa523a40df82 | jq
   ```

3. Check the metrics:

   ```console
   curl -s "http://localhost:1212/api/v1/metrics?token=MyAccessToken&format=prometheus"
   ```

   In the labelled metrics there should be samples like these (two per server):

   ```
   # HELP udp_tracker_server_performance_avg_processing_time_ns Average time to process a UDP request in nanoseconds
   # TYPE udp_tracker_server_performance_avg_processing_time_ns gauge
   udp_tracker_server_performance_avg_processing_time_ns{request_kind="announce",server_binding_address_ip_family="inet",server_binding_address_ip_type="plain",server_binding_ip="0.0.0.0",server_binding_port="6868",server_binding_protocol="udp"} 54773
   udp_tracker_server_performance_avg_processing_time_ns{request_kind="connect",server_binding_address_ip_family="inet",server_binding_address_ip_type="plain",server_binding_ip="0.0.0.0",server_binding_port="6868",server_binding_protocol="udp"} 40326
   udp_tracker_server_performance_avg_processing_time_ns{request_kind="announce",server_binding_address_ip_family="inet",server_binding_address_ip_type="plain",server_binding_ip="0.0.0.0",server_binding_port="6969",server_binding_protocol="udp"} 66063.71428571429
   udp_tracker_server_performance_avg_processing_time_ns{request_kind="connect",server_binding_address_ip_family="inet",server_binding_address_ip_type="plain",server_binding_ip="0.0.0.0",server_binding_port="6969",server_binding_protocol="udp"} 43497.71428571428
   ```

   In the global aggregated metrics (same endpoint), the values should be the average of the servers' averages:

   ```
   udp_avg_connect_processing_time_ns 41911
   udp_avg_announce_processing_time_ns 60418
   udp_avg_scrape_processing_time_ns 0
   ```

   41911 = (40326 + 43497.71428571428) / 2 = 41911.857142857...
   60418 = (54773 + 66063.71428571429) / 2 = 60418.357142857...

   The decimals are dropped because a `u64` is used for the global aggregated metrics.

ACKs for top commit:

  josecelano:
    ACK b423bf6

Tree-SHA512: 3d2544860838d0817f0700587af87bc6890eaef596db6cc15d02340e2ad9e459e425d2589d4e5c542ec3f0fc250b6e49fb4964be343e670a13d1de3a59b8f712
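The first subtask asks for the read, recalculate, and update steps of the moving average to be atomic. Below is a minimal sketch of that idea. It is not the tracker's actual implementation (which, per the commit titles, recalculates the averages and updates atomics inside the UDP server metrics); the `MovingAverage` type and its field layout are assumptions for illustration, using a single lock so no writer can interleave between the read and the write:

```rust
use std::sync::Mutex;

/// Hypothetical moving-average holder; illustrates the
/// "read, recalculate, and update should be atomic" subtask.
struct MovingAverage {
    // One lock guards both the current average and the sample
    // count, so the whole read-recalculate-update cycle is atomic.
    inner: Mutex<(f64, u64)>, // (avg_ns, count)
}

impl MovingAverage {
    fn new() -> Self {
        Self {
            inner: Mutex::new((0.0, 0)),
        }
    }

    /// Incorporate one processing-time sample (in nanoseconds).
    fn record(&self, sample_ns: f64) {
        let mut guard = self.inner.lock().expect("poisoned lock");
        let (avg, count) = *guard;
        let new_count = count + 1;
        // Incremental mean: avg' = avg + (x - avg) / n
        let new_avg = avg + (sample_ns - avg) / new_count as f64;
        *guard = (new_avg, new_count);
    }

    fn current(&self) -> f64 {
        self.inner.lock().expect("poisoned lock").0
    }
}

fn main() {
    let avg = MovingAverage::new();
    for sample_ns in [40_000.0, 42_000.0, 44_000.0] {
        avg.record(sample_ns);
    }
    // (40000 + 42000 + 44000) / 3 = 42000
    println!("avg = {} ns", avg.current());
}
```

Keeping the sample count next to the average is exactly the point of the "new metric to count requests for the moving average" subtask: if the count were tracked globally while the average was tracked per label set, the denominator would not match the series being updated, which was the bug.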
2 parents: 6d96650 + b423bf6. Commit: 7b1c190.

File tree

17 files changed: +1295, -450 lines

17 files changed

+1295
-450
lines changed

packages/metrics/README.md

Lines changed: 9 additions & 1 deletion
````diff
@@ -67,7 +67,7 @@ println!("{}", prometheus_output);
 ### Metric Aggregation
 
 ```rust
-use torrust_tracker_metrics::metric_collection::aggregate::Sum;
+use torrust_tracker_metrics::metric_collection::aggregate::{Sum, Avg};
 
 // Sum all counter values matching specific labels
 let total_requests = metrics.sum(
@@ -76,6 +76,14 @@ let total_requests = metrics.sum(
 );
 
 println!("Total requests: {:?}", total_requests);
+
+// Calculate average of gauge values matching specific labels
+let avg_response_time = metrics.avg(
+    &metric_name!("response_time_seconds"),
+    &[("endpoint", "/announce")].into(),
+);
+
+println!("Average response time: {:?}", avg_response_time);
 ```
 
 ## Architecture
````
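Note that the snippet above is illustrative rather than self-contained: `metrics` stands for an already-populated metric collection. The `Avg` imported here lives in `metric_collection::aggregate`, the collection-level counterpart of the per-metric `metric::aggregate::avg::Avg` trait introduced in this merge (shown below).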
packages/metrics/src/metric/aggregate/avg.rs (new file; path inferred from the crate's module imports)

Lines changed: 294 additions & 0 deletions

```rust
use crate::counter::Counter;
use crate::gauge::Gauge;
use crate::label::LabelSet;
use crate::metric::aggregate::sum::Sum;
use crate::metric::Metric;

pub trait Avg {
    type Output;
    fn avg(&self, label_set_criteria: &LabelSet) -> Self::Output;
}

impl Avg for Metric<Counter> {
    type Output = f64;

    fn avg(&self, label_set_criteria: &LabelSet) -> Self::Output {
        let matching_samples = self.collect_matching_samples(label_set_criteria);

        if matching_samples.is_empty() {
            return 0.0;
        }

        let sum = self.sum(label_set_criteria);

        #[allow(clippy::cast_precision_loss)]
        (sum as f64 / matching_samples.len() as f64)
    }
}

impl Avg for Metric<Gauge> {
    type Output = f64;

    fn avg(&self, label_set_criteria: &LabelSet) -> Self::Output {
        let matching_samples = self.collect_matching_samples(label_set_criteria);

        if matching_samples.is_empty() {
            return 0.0;
        }

        let sum = self.sum(label_set_criteria);

        #[allow(clippy::cast_precision_loss)]
        (sum / matching_samples.len() as f64)
    }
}

#[cfg(test)]
mod tests {

    use torrust_tracker_primitives::DurationSinceUnixEpoch;

    use crate::counter::Counter;
    use crate::gauge::Gauge;
    use crate::label::LabelSet;
    use crate::metric::aggregate::avg::Avg;
    use crate::metric::{Metric, MetricName};
    use crate::metric_name;
    use crate::sample::Sample;
    use crate::sample_collection::SampleCollection;

    struct MetricBuilder<T> {
        sample_time: DurationSinceUnixEpoch,
        name: MetricName,
        samples: Vec<Sample<T>>,
    }

    impl<T> Default for MetricBuilder<T> {
        fn default() -> Self {
            Self {
                sample_time: DurationSinceUnixEpoch::from_secs(1_743_552_000),
                name: metric_name!("test_metric"),
                samples: vec![],
            }
        }
    }

    impl<T> MetricBuilder<T> {
        fn with_sample(mut self, value: T, label_set: &LabelSet) -> Self {
            let sample = Sample::new(value, self.sample_time, label_set.clone());
            self.samples.push(sample);
            self
        }

        fn build(self) -> Metric<T> {
            Metric::new(
                self.name,
                None,
                None,
                SampleCollection::new(self.samples).expect("invalid samples"),
            )
        }
    }

    fn counter_cases() -> Vec<(Metric<Counter>, LabelSet, f64)> {
        // (metric, label set criteria, expected_average_value)
        vec![
            // Metric with one sample without label set
            (
                MetricBuilder::default().with_sample(1.into(), &LabelSet::empty()).build(),
                LabelSet::empty(),
                1.0,
            ),
            // Metric with one sample with a label set
            (
                MetricBuilder::default()
                    .with_sample(1.into(), &[("l1", "l1_value")].into())
                    .build(),
                [("l1", "l1_value")].into(),
                1.0,
            ),
            // Metric with two samples, different label sets, average all
            (
                MetricBuilder::default()
                    .with_sample(1.into(), &[("l1", "l1_value")].into())
                    .with_sample(3.into(), &[("l2", "l2_value")].into())
                    .build(),
                LabelSet::empty(),
                2.0, // (1 + 3) / 2 = 2.0
            ),
            // Metric with two samples, different label sets, average one
            (
                MetricBuilder::default()
                    .with_sample(1.into(), &[("l1", "l1_value")].into())
                    .with_sample(2.into(), &[("l2", "l2_value")].into())
                    .build(),
                [("l1", "l1_value")].into(),
                1.0,
            ),
            // Metric with three samples, same label key, different label values, average by key
            (
                MetricBuilder::default()
                    .with_sample(2.into(), &[("l1", "l1_value"), ("la", "la_value")].into())
                    .with_sample(4.into(), &[("l1", "l1_value"), ("lb", "lb_value")].into())
                    .with_sample(6.into(), &[("l1", "l1_value"), ("lc", "lc_value")].into())
                    .build(),
                [("l1", "l1_value")].into(),
                4.0, // (2 + 4 + 6) / 3 = 4.0
            ),
            // Metric with two samples, different label values, average by subkey
            (
                MetricBuilder::default()
                    .with_sample(5.into(), &[("l1", "l1_value"), ("la", "la_value")].into())
                    .with_sample(7.into(), &[("l1", "l1_value"), ("lb", "lb_value")].into())
                    .build(),
                [("la", "la_value")].into(),
                5.0,
            ),
            // Edge: Metric with no samples at all
            (MetricBuilder::default().build(), LabelSet::empty(), 0.0),
            // Edge: Metric with samples but no matching labels
            (
                MetricBuilder::default()
                    .with_sample(5.into(), &[("foo", "bar")].into())
                    .build(),
                [("not", "present")].into(),
                0.0,
            ),
            // Edge: Metric with zero value
            (
                MetricBuilder::default()
                    .with_sample(0.into(), &[("l3", "l3_value")].into())
                    .build(),
                [("l3", "l3_value")].into(),
                0.0,
            ),
            // Edge: Metric with a very large value
            (
                MetricBuilder::default()
                    .with_sample((u64::MAX / 2).into(), &[("edge", "large1")].into())
                    .with_sample((u64::MAX / 2).into(), &[("edge", "large2")].into())
                    .build(),
                LabelSet::empty(),
                #[allow(clippy::cast_precision_loss)]
                (u64::MAX as f64 / 2.0), // Average of (max/2) and (max/2)
            ),
        ]
    }

    fn gauge_cases() -> Vec<(Metric<Gauge>, LabelSet, f64)> {
        // (metric, label set criteria, expected_average_value)
        vec![
            // Metric with one sample without label set
            (
                MetricBuilder::default().with_sample(1.0.into(), &LabelSet::empty()).build(),
                LabelSet::empty(),
                1.0,
            ),
            // Metric with one sample with a label set
            (
                MetricBuilder::default()
                    .with_sample(1.0.into(), &[("l1", "l1_value")].into())
                    .build(),
                [("l1", "l1_value")].into(),
                1.0,
            ),
            // Metric with two samples, different label sets, average all
            (
                MetricBuilder::default()
                    .with_sample(1.0.into(), &[("l1", "l1_value")].into())
                    .with_sample(3.0.into(), &[("l2", "l2_value")].into())
                    .build(),
                LabelSet::empty(),
                2.0, // (1.0 + 3.0) / 2 = 2.0
            ),
            // Metric with two samples, different label sets, average one
            (
                MetricBuilder::default()
                    .with_sample(1.0.into(), &[("l1", "l1_value")].into())
                    .with_sample(2.0.into(), &[("l2", "l2_value")].into())
                    .build(),
                [("l1", "l1_value")].into(),
                1.0,
            ),
            // Metric with three samples, same label key, different label values, average by key
            (
                MetricBuilder::default()
                    .with_sample(2.0.into(), &[("l1", "l1_value"), ("la", "la_value")].into())
                    .with_sample(4.0.into(), &[("l1", "l1_value"), ("lb", "lb_value")].into())
                    .with_sample(6.0.into(), &[("l1", "l1_value"), ("lc", "lc_value")].into())
                    .build(),
                [("l1", "l1_value")].into(),
                4.0, // (2.0 + 4.0 + 6.0) / 3 = 4.0
            ),
            // Metric with two samples, different label values, average by subkey
            (
                MetricBuilder::default()
                    .with_sample(5.0.into(), &[("l1", "l1_value"), ("la", "la_value")].into())
                    .with_sample(7.0.into(), &[("l1", "l1_value"), ("lb", "lb_value")].into())
                    .build(),
                [("la", "la_value")].into(),
                5.0,
            ),
            // Edge: Metric with no samples at all
            (MetricBuilder::default().build(), LabelSet::empty(), 0.0),
            // Edge: Metric with samples but no matching labels
            (
                MetricBuilder::default()
                    .with_sample(5.0.into(), &[("foo", "bar")].into())
                    .build(),
                [("not", "present")].into(),
                0.0,
            ),
            // Edge: Metric with zero value
            (
                MetricBuilder::default()
                    .with_sample(0.0.into(), &[("l3", "l3_value")].into())
                    .build(),
                [("l3", "l3_value")].into(),
                0.0,
            ),
            // Edge: Metric with negative values
            (
                MetricBuilder::default()
                    .with_sample((-2.0).into(), &[("l4", "l4_value")].into())
                    .with_sample(4.0.into(), &[("l5", "l5_value")].into())
                    .build(),
                LabelSet::empty(),
                1.0, // (-2.0 + 4.0) / 2 = 1.0
            ),
            // Edge: Metric with decimal values
            (
                MetricBuilder::default()
                    .with_sample(1.5.into(), &[("l6", "l6_value")].into())
                    .with_sample(2.5.into(), &[("l7", "l7_value")].into())
                    .build(),
                LabelSet::empty(),
                2.0, // (1.5 + 2.5) / 2 = 2.0
            ),
        ]
    }

    #[test]
    fn test_counter_cases() {
        for (idx, (metric, criteria, expected_value)) in counter_cases().iter().enumerate() {
            let avg = metric.avg(criteria);

            assert!(
                (avg - expected_value).abs() <= f64::EPSILON,
                "at case {idx}, expected avg to be {expected_value}, got {avg}"
            );
        }
    }

    #[test]
    fn test_gauge_cases() {
        for (idx, (metric, criteria, expected_value)) in gauge_cases().iter().enumerate() {
            let avg = metric.avg(criteria);

            assert!(
                (avg - expected_value).abs() <= f64::EPSILON,
                "at case {idx}, expected avg to be {expected_value}, got {avg}"
            );
        }
    }
}
```
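One design choice worth noting in `avg`: when no samples match the criteria it returns `0.0` rather than `NaN` or an error. That keeps the aggregated output well defined before any request of a given kind has been served, which is why `udp_avg_scrape_processing_time_ns` reports `0` in the test run above.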
packages/metrics/src/metric/aggregate/mod.rs (path inferred from the module tree)

Lines changed: 1 addition & 0 deletions

```diff
@@ -1 +1,2 @@
+pub mod avg;
 pub mod sum;
```

packages/metrics/src/metric/mod.rs

Lines changed: 11 additions & 0 deletions
```diff
@@ -78,6 +78,17 @@ impl<T> Metric<T> {
     pub fn is_empty(&self) -> bool {
         self.sample_collection.is_empty()
     }
+
+    #[must_use]
+    pub fn collect_matching_samples(
+        &self,
+        label_set_criteria: &LabelSet,
+    ) -> Vec<(&crate::label::LabelSet, &crate::sample::Measurement<T>)> {
+        self.sample_collection
+            .iter()
+            .filter(|(label_set, _measurement)| label_set.matches(label_set_criteria))
+            .collect()
+    }
 }
 
 impl Metric<Counter> {
```
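Extracting `collect_matching_samples` into the generic `Metric<T>` impl (commit 8fbcf90) is what keeps the new aggregate small: `Avg` filters the samples by label-set match once, then divides the `Sum` for the same criteria by the number of matching samples, so both aggregates operate on the same filtered set of time series.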
