Skip to content

Commit 35b6c84

Browse files
committed
refactor: simplify UDP server receiver
It only gets new UDP requests, whitout spwaning tasks to handle them.
1 parent a5e2baf commit 35b6c84

File tree

1 file changed

+11
-16
lines changed

1 file changed

+11
-16
lines changed

src/servers/udp/server.rs

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -290,22 +290,20 @@ impl Debug for BoundSocket {
290290

291291
struct Receiver {
292292
bound_socket: Arc<BoundSocket>,
293-
tracker: Arc<Tracker>,
294293
data: RefCell<[u8; MAX_PACKET_SIZE]>,
295294
}
296295

297296
impl Receiver {
298-
pub fn new(bound_socket: Arc<BoundSocket>, tracker: Arc<Tracker>) -> Self {
297+
pub fn new(bound_socket: Arc<BoundSocket>) -> Self {
299298
Receiver {
300299
bound_socket,
301-
tracker,
302300
data: RefCell::new([0; MAX_PACKET_SIZE]),
303301
}
304302
}
305303
}
306304

307305
impl Stream for Receiver {
308-
type Item = std::io::Result<AbortHandle>;
306+
type Item = std::io::Result<UdpRequest>;
309307

310308
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
311309
let mut buf = *self.data.borrow_mut();
@@ -319,13 +317,7 @@ impl Stream for Receiver {
319317
Ok(from) => {
320318
let payload = buf.filled().to_vec();
321319
let request = UdpRequest { payload, from };
322-
323-
Some(Ok(tokio::task::spawn(Udp::process_request(
324-
request,
325-
self.tracker.clone(),
326-
self.bound_socket.clone(),
327-
))
328-
.abort_handle()))
320+
Some(Ok(request))
329321
}
330322
Err(err) => Some(Err(err)),
331323
};
@@ -375,15 +367,15 @@ impl Udp {
375367

376368
tracing::info!(target: UDP_TRACKER_LOG_TARGET, "{STARTED_ON}: {local_udp_url}");
377369

378-
let receiver = Receiver::new(bound_socket.into(), tracker);
370+
let receiver = Receiver::new(bound_socket.into());
379371

380372
tracing::trace!(target: UDP_TRACKER_LOG_TARGET, local_udp_url, "Udp::run_with_graceful_shutdown (spawning main loop)");
381373

382374
let running = {
383375
let local_addr = local_udp_url.clone();
384376
tokio::task::spawn(async move {
385377
tracing::debug!(target: UDP_TRACKER_LOG_TARGET, local_addr, "Udp::run_with_graceful_shutdown::task (listening...)");
386-
let () = Self::run_udp_server_main(receiver).await;
378+
let () = Self::run_udp_server_main(receiver, tracker.clone()).await;
387379
})
388380
};
389381

@@ -404,7 +396,7 @@ impl Udp {
404396
tokio::task::yield_now().await; // lets allow the other threads to complete.
405397
}
406398

407-
async fn run_udp_server_main(mut receiver: Receiver) {
399+
async fn run_udp_server_main(mut receiver: Receiver, tracker: Arc<Tracker>) {
408400
let reqs = &mut ActiveRequests::default();
409401

410402
let addr = receiver.bound_socket.local_addr();
@@ -429,12 +421,15 @@ impl Udp {
429421
}
430422
};
431423

432-
if req.is_finished() {
424+
let abort_handle =
425+
tokio::task::spawn(Udp::process_request(req, tracker.clone(), receiver.bound_socket.clone())).abort_handle();
426+
427+
if abort_handle.is_finished() {
433428
continue;
434429
}
435430

436431
// fill buffer with requests
437-
let Err(req) = reqs.rb.try_push(req) else {
432+
let Err(req) = reqs.rb.try_push(abort_handle) else {
438433
continue;
439434
};
440435

0 commit comments

Comments
 (0)