Skip to content

Commit 59fbb39

Browse files
committed
refactor: [#1598] make recalculate udp avg announce processing time metric and update atomic
It also fixes a division by zero bug when the metrics is updated before the counter for number of conenctions has been increased. It only avoid the division by zero. I will propoerly fixed with independent request counter for the moving average calculation.
1 parent d50948e commit 59fbb39

File tree

3 files changed

+54
-46
lines changed

3 files changed

+54
-46
lines changed

packages/udp-tracker-server/src/statistics/event/handler/response_sent.rs

Lines changed: 5 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -26,29 +26,13 @@ pub async fn handle_event(
2626
(LabelValue::new("ok"), UdpRequestKind::Connect.into())
2727
}
2828
UdpRequestKind::Announce { announce_request } => {
29-
let new_avg = stats_repository
30-
.recalculate_udp_avg_announce_processing_time_ns(req_processing_time)
31-
.await;
32-
33-
tracing::debug!(
34-
"Updating average processing time metric for announce requests: {} ns",
35-
new_avg
36-
);
37-
3829
let mut label_set = LabelSet::from(context.clone());
3930
label_set.upsert(label_name!("request_kind"), LabelValue::new(&req_kind.to_string()));
40-
match stats_repository
41-
.set_gauge(
42-
&metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS),
43-
&label_set,
44-
new_avg,
45-
now,
46-
)
47-
.await
48-
{
49-
Ok(()) => {}
50-
Err(err) => tracing::error!("Failed to set gauge: {}", err),
51-
}
31+
32+
let _new_avg = stats_repository
33+
.recalculate_udp_avg_announce_processing_time_ns(req_processing_time, &label_set, now)
34+
.await;
35+
5236
(LabelValue::new("ok"), UdpRequestKind::Announce { announce_request }.into())
5337
}
5438
UdpRequestKind::Scrape => {

packages/udp-tracker-server/src/statistics/metrics.rs

Lines changed: 33 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -77,35 +77,30 @@ impl Metrics {
7777
udp_connections_handled
7878
);
7979

80-
self.update_udp_avg_connect_processing_time_ns(new_avg, label_set, now);
80+
self.update_udp_avg_processing_time_ns(new_avg, label_set, now);
8181

8282
new_avg
8383
}
8484

85-
fn update_udp_avg_connect_processing_time_ns(&mut self, new_avg: f64, label_set: &LabelSet, now: DurationSinceUnixEpoch) {
86-
tracing::debug!("Updating average processing time metric for connect requests: {} ns", new_avg);
87-
88-
match self.set_gauge(
89-
&metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS),
90-
label_set,
91-
new_avg,
92-
now,
93-
) {
94-
Ok(()) => {}
95-
Err(err) => tracing::error!("Failed to set gauge: {}", err),
96-
}
97-
}
98-
9985
#[allow(clippy::cast_precision_loss)]
100-
pub fn recalculate_udp_avg_announce_processing_time_ns(&self, req_processing_time: Duration) -> f64 {
86+
pub fn recalculate_udp_avg_announce_processing_time_ns(
87+
&mut self,
88+
req_processing_time: Duration,
89+
label_set: &LabelSet,
90+
now: DurationSinceUnixEpoch,
91+
) -> f64 {
10192
let req_processing_time = req_processing_time.as_nanos() as f64;
10293

10394
let udp_announces_handled = (self.udp4_announces_handled() + self.udp6_announces_handled()) as f64;
10495

10596
let previous_avg = self.udp_avg_announce_processing_time_ns();
10697

107-
// Moving average: https://en.wikipedia.org/wiki/Moving_average
108-
let new_avg = previous_avg as f64 + (req_processing_time - previous_avg as f64) / udp_announces_handled;
98+
let new_avg = if udp_announces_handled == 0.0 {
99+
req_processing_time
100+
} else {
101+
// Moving average: https://en.wikipedia.org/wiki/Moving_average
102+
previous_avg as f64 + (req_processing_time - previous_avg as f64) / udp_announces_handled
103+
};
109104

110105
tracing::debug!(
111106
"Recalculated UDP average announce processing time: {} ns (previous: {} ns, req_processing_time: {} ns, udp_announces_handled: {})",
@@ -115,9 +110,29 @@ impl Metrics {
115110
udp_announces_handled
116111
);
117112

113+
self.update_udp_avg_processing_time_ns(new_avg, label_set, now);
114+
118115
new_avg
119116
}
120117

118+
fn update_udp_avg_processing_time_ns(&mut self, new_avg: f64, label_set: &LabelSet, now: DurationSinceUnixEpoch) {
119+
tracing::debug!(
120+
"Updating average processing time metric to {} ns for label set {}",
121+
new_avg,
122+
label_set,
123+
);
124+
125+
match self.set_gauge(
126+
&metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS),
127+
label_set,
128+
new_avg,
129+
now,
130+
) {
131+
Ok(()) => {}
132+
Err(err) => tracing::error!("Failed to set gauge: {}", err),
133+
}
134+
}
135+
121136
#[allow(clippy::cast_precision_loss)]
122137
pub fn recalculate_udp_avg_scrape_processing_time_ns(&self, req_processing_time: Duration) -> f64 {
123138
let req_processing_time = req_processing_time.as_nanos() as f64;

packages/udp-tracker-server/src/statistics/repository.rs

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -88,10 +88,15 @@ impl Repository {
8888
new_avg
8989
}
9090

91-
pub async fn recalculate_udp_avg_announce_processing_time_ns(&self, req_processing_time: Duration) -> f64 {
92-
let stats_lock = self.stats.write().await;
91+
pub async fn recalculate_udp_avg_announce_processing_time_ns(
92+
&self,
93+
req_processing_time: Duration,
94+
label_set: &LabelSet,
95+
now: DurationSinceUnixEpoch,
96+
) -> f64 {
97+
let mut stats_lock = self.stats.write().await;
9398

94-
let new_avg = stats_lock.recalculate_udp_avg_announce_processing_time_ns(req_processing_time);
99+
let new_avg = stats_lock.recalculate_udp_avg_announce_processing_time_ns(req_processing_time, label_set, now);
95100

96101
drop(stats_lock);
97102

@@ -390,7 +395,9 @@ mod tests {
390395

391396
// Calculate new average with processing time of 1500ns
392397
let processing_time = Duration::from_nanos(1500);
393-
let new_avg = repo.recalculate_udp_avg_announce_processing_time_ns(processing_time).await;
398+
let new_avg = repo
399+
.recalculate_udp_avg_announce_processing_time_ns(processing_time, &announce_labels, now)
400+
.await;
394401

395402
// Moving average: previous_avg + (new_value - previous_avg) / total_announces
396403
// 500 + (1500 - 500) / 5 = 500 + 200 = 700
@@ -453,16 +460,18 @@ mod tests {
453460
.recalculate_udp_avg_connect_processing_time_ns(processing_time, &connect_labels, now)
454461
.await;
455462

456-
let _announce_labels = LabelSet::from([("request_kind", "announce")]);
457-
let announce_avg = repo.recalculate_udp_avg_announce_processing_time_ns(processing_time).await;
463+
let announce_labels = LabelSet::from([("request_kind", "announce")]);
464+
let announce_avg = repo
465+
.recalculate_udp_avg_announce_processing_time_ns(processing_time, &announce_labels, now)
466+
.await;
458467

459468
let _scrape_labels = LabelSet::from([("request_kind", "scrape")]);
460469
let scrape_avg = repo.recalculate_udp_avg_scrape_processing_time_ns(processing_time).await;
461470

462471
// With 0 total connections, the formula becomes 0 + (1000 - 0) / 0
463472
// This should handle the division by zero case gracefully
464473
assert!((connect_avg - 1000.0).abs() < f64::EPSILON);
465-
assert!(announce_avg.is_infinite() || announce_avg.is_nan());
474+
assert!((announce_avg - 1000.0).abs() < f64::EPSILON);
466475
assert!(scrape_avg.is_infinite() || scrape_avg.is_nan());
467476
}
468477

0 commit comments

Comments
 (0)