# fix(iroh): Poll all AsyncUdpSocket sources fairly (#2996)
## Description

The existing `AsyncUdpSocket::poll_recv` polled the UDP IPv4 socket
first; only if that had no datagrams would it poll the UDP IPv6 socket,
and only if that too had nothing to deliver was the relay socket polled.

This heavily favoured IPv4 traffic, potentially starving the IPv6 and
relay traffic.

This change makes it prefer a different source on each poll, so that
when multiple sockets have data to deliver, delivery is fairer (a
sketch of the idea follows below).

Closes #2981.
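For illustration, here is a minimal, self-contained sketch of the rotation idea. This is not the actual magicsock code: `poll_source`, `poll_fair` and the source names are made up for the example. An atomic counter picks a different starting source on every call, so a busy source can no longer permanently shadow the others:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical stand-in for polling one source (UDPv4, UDPv6 or relay);
/// returns `None` when that source has nothing to deliver.
fn poll_source(name: &str) -> Option<String> {
    if name == "relay" {
        Some(format!("datagram via {name}"))
    } else {
        None
    }
}

/// Poll all sources, rotating which one goes first on each call.
fn poll_fair(counter: &AtomicUsize) -> Option<String> {
    const SOURCES: [&str; 3] = ["udp4", "udp6", "relay"];
    // Rotate the starting source so every source gets a turn even under
    // sustained load on one of them.
    let start = counter.fetch_add(1, Ordering::Relaxed) % SOURCES.len();
    for i in 0..SOURCES.len() {
        let name = SOURCES[(start + i) % SOURCES.len()];
        if let Some(msg) = poll_source(name) {
            return Some(msg);
        }
    }
    None // all sources pending
}

fn main() {
    let counter = AtomicUsize::new(0);
    for _ in 0..3 {
        println!("{:?}", poll_fair(&counter));
    }
}
```

The committed change implements the same rotation with three macros and a `match counter % 3` over the fixed orders, as shown in the diff below.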

## Breaking Changes

None.

## Notes & open questions

- Using `Ordering::Relaxed` is probably fine. I believe this polling
  would normally only happen from a single task (the Quinn endpoint
  driver), and even if it were polled concurrently from different
  threads the result would not be that bad, and still an improvement
  on the current order (see the sketch after this list).
  
- I'm not particularly fond of the implementation, but the macros are
  the best I could come up with. Maybe there's something cleverer
  that can be done.
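
To illustrate the `Ordering::Relaxed` point, here is a small, self-contained sketch (not iroh code; the thread and iteration counts are arbitrary). The counter guards no other memory, so atomicity of the increment is all that is needed; relaxed concurrent increments are never lost, and the worst case is two pollers briefly starting from the same source:

```rust
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};
use std::thread;

fn main() {
    let counter = Arc::new(AtomicUsize::new(0));
    let handles: Vec<_> = (0..4)
        .map(|_| {
            let counter = Arc::clone(&counter);
            thread::spawn(move || {
                for _ in 0..1_000 {
                    // Relaxed suffices: each fetch_add is still atomic, we
                    // just don't order any other memory accesses around it.
                    let start = counter.fetch_add(1, Ordering::Relaxed) % 3;
                    std::hint::black_box(start);
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    // No increments are lost even under concurrent, relaxed updates.
    assert_eq!(counter.load(Ordering::Relaxed), 4_000);
}
```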

## Change checklist

- [x] Self-review.
- [x] Documentation updates following the [style
guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text),
if relevant.
- [x] Tests if relevant.
- [x] All breaking changes documented.

---------

Co-authored-by: Philipp Krüger <philipp.krueger1@gmail.com>
flub and matheus23 authored Dec 3, 2024
1 parent 647b2fd commit 26c5248
Showing 1 changed file with 118 additions and 52 deletions: `iroh/src/magicsock.rs`
@@ -22,7 +22,7 @@ use std::{
     net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6},
     pin::Pin,
     sync::{
-        atomic::{AtomicBool, AtomicU16, AtomicU64, Ordering},
+        atomic::{AtomicBool, AtomicU16, AtomicU64, AtomicUsize, Ordering},
         Arc, RwLock,
     },
     task::{Context, Poll, Waker},
@@ -186,6 +186,8 @@ pub(crate) struct MagicSock {
     /// Stores wakers, to be called when relay_recv_ch receives new data.
     network_recv_wakers: parking_lot::Mutex<Option<Waker>>,
     network_send_wakers: Arc<parking_lot::Mutex<Option<Waker>>>,
+    /// Counter for ordering of [`MagicSock::poll_recv`] polling order.
+    poll_recv_counter: AtomicUsize,
 
     /// The DNS resolver to be used in this magicsock.
     dns_resolver: DnsResolver,
@@ -650,27 +652,86 @@ impl MagicSock {
         bufs: &mut [io::IoSliceMut<'_>],
         metas: &mut [quinn_udp::RecvMeta],
     ) -> Poll<io::Result<usize>> {
-        // FIXME: currently ipv4 load results in ipv6 traffic being ignored
-        debug_assert_eq!(bufs.len(), metas.len(), "non matching bufs & metas");
         if self.is_closed() {
             return Poll::Pending;
         }
 
-        // order of polling is: UDPv4, UDPv6, relay
-        let (msgs, from_ipv4) = match self.pconn4.poll_recv(cx, bufs, metas)? {
-            Poll::Pending | Poll::Ready(0) => match &self.pconn6 {
-                Some(conn) => match conn.poll_recv(cx, bufs, metas)? {
-                    Poll::Pending | Poll::Ready(0) => {
-                        return self.poll_recv_relay(cx, bufs, metas);
-                    }
-                    Poll::Ready(n) => (n, false),
-                },
-                None => {
-                    return self.poll_recv_relay(cx, bufs, metas);
-                }
-            },
-            Poll::Ready(n) => (n, true),
-        };
+        // Three macros to help polling: they return if they get a result, execution
+        // continues if they were Pending and we need to poll others (or finally return
+        // Pending).
+        macro_rules! poll_ipv4 {
+            () => {
+                match self.pconn4.poll_recv(cx, bufs, metas)? {
+                    Poll::Pending | Poll::Ready(0) => {}
+                    Poll::Ready(n) => {
+                        self.process_udp_datagrams(true, &mut bufs[..n], &mut metas[..n]);
+                        return Poll::Ready(Ok(n));
+                    }
+                }
+            };
+        }
+        macro_rules! poll_ipv6 {
+            () => {
+                if let Some(ref pconn) = self.pconn6 {
+                    match pconn.poll_recv(cx, bufs, metas)? {
+                        Poll::Pending | Poll::Ready(0) => {}
+                        Poll::Ready(n) => {
+                            self.process_udp_datagrams(false, &mut bufs[..n], &mut metas[..n]);
+                            return Poll::Ready(Ok(n));
+                        }
+                    }
+                }
+            };
+        }
+        macro_rules! poll_relay {
+            () => {
+                match self.poll_recv_relay(cx, bufs, metas) {
+                    Poll::Pending => {}
+                    Poll::Ready(n) => return Poll::Ready(n),
+                }
+            };
+        }
+
+        let counter = self.poll_recv_counter.fetch_add(1, Ordering::Relaxed);
+        match counter % 3 {
+            0 => {
+                // order of polling: UDPv4, UDPv6, relay
+                poll_ipv4!();
+                poll_ipv6!();
+                poll_relay!();
+                Poll::Pending
+            }
+            1 => {
+                // order of polling: UDPv6, relay, UDPv4
+                poll_ipv6!();
+                poll_relay!();
+                poll_ipv4!();
+                Poll::Pending
+            }
+            _ => {
+                // order of polling: relay, UDPv4, UDPv6
+                poll_relay!();
+                poll_ipv4!();
+                poll_ipv6!();
+                Poll::Pending
+            }
+        }
+    }
+
+    /// Process datagrams received from UDP sockets.
+    ///
+    /// All the `bufs` and `metas` should have initialized packets in them.
+    ///
+    /// This fixes up the datagrams to use the correct [`QuicMappedAddr`] and extracts DISCO
+    /// packets, processing them inside the magic socket.
+    fn process_udp_datagrams(
+        &self,
+        from_ipv4: bool,
+        bufs: &mut [io::IoSliceMut<'_>],
+        metas: &mut [quinn_udp::RecvMeta],
+    ) {
+        debug_assert_eq!(bufs.len(), metas.len(), "non matching bufs & metas");
 
         // Adding the IP address we received something on results in Quinn using this
         // address on the send path to send from. However we let Quinn use a
@@ -692,77 +753,83 @@ impl MagicSock {
 
         let mut quic_packets_total = 0;
 
-        for (meta, buf) in metas.iter_mut().zip(bufs.iter_mut()).take(msgs) {
-            let mut is_quic = false;
-            let mut quic_packets_count = 0;
+        for (meta, buf) in metas.iter_mut().zip(bufs.iter_mut()) {
+            let mut buf_contains_quic_datagrams = false;
+            let mut quic_datagram_count = 0;
             if meta.len > meta.stride {
                 trace!(%meta.len, %meta.stride, "GRO datagram received");
                 inc!(MagicsockMetrics, recv_gro_datagrams);
             }
 
-            // find disco and stun packets and forward them to the actor
-            for packet in buf[..meta.len].chunks_mut(meta.stride) {
-                if packet.len() < meta.stride {
+            // Chunk through the datagrams in this GRO payload to find disco and stun
+            // packets and forward them to the actor
+            for datagram in buf[..meta.len].chunks_mut(meta.stride) {
+                if datagram.len() < meta.stride {
                     trace!(
-                        len = %packet.len(),
+                        len = %datagram.len(),
                         %meta.stride,
                         "Last GRO datagram smaller than stride",
                     );
                 }
 
-                let packet_is_quic = if stun::is(packet) {
+                // Detect DISCO and STUN datagrams and process them. Overwrite the first
+                // byte of those packets with zero to make Quinn ignore the packet. This
+                // relies on quinn::EndpointConfig::grease_quic_bit being set to `false`,
+                // which we do in Endpoint::bind.
+                if stun::is(datagram) {
                     trace!(src = %meta.addr, len = %meta.stride, "UDP recv: stun packet");
-                    let packet2 = Bytes::copy_from_slice(packet);
+                    let packet2 = Bytes::copy_from_slice(datagram);
                     self.net_reporter.receive_stun_packet(packet2, meta.addr);
-                    false
-                } else if let Some((sender, sealed_box)) = disco::source_and_box(packet) {
-                    // Disco?
+                    datagram[0] = 0u8;
+                } else if let Some((sender, sealed_box)) = disco::source_and_box(datagram) {
                    trace!(src = %meta.addr, len = %meta.stride, "UDP recv: disco packet");
                     self.handle_disco_message(
                         sender,
                         sealed_box,
                         DiscoMessageSource::Udp(meta.addr),
                     );
-                    false
+                    datagram[0] = 0u8;
                 } else {
                     trace!(src = %meta.addr, len = %meta.stride, "UDP recv: quic packet");
                     if from_ipv4 {
-                        inc_by!(MagicsockMetrics, recv_data_ipv4, packet.len() as _);
+                        inc_by!(MagicsockMetrics, recv_data_ipv4, datagram.len() as _);
                     } else {
-                        inc_by!(MagicsockMetrics, recv_data_ipv6, packet.len() as _);
+                        inc_by!(MagicsockMetrics, recv_data_ipv6, datagram.len() as _);
                     }
-                    true
+                    quic_datagram_count += 1;
+                    buf_contains_quic_datagrams = true;
                 };
-
-                if packet_is_quic {
-                    quic_packets_count += 1;
-                    is_quic = true;
-                } else {
-                    // overwrite the first byte of the packets with zero.
-                    // this makes quinn reliably and quickly ignore the packet as long as
-                    // [`quinn::EndpointConfig::grease_quic_bit`] is set to `false`
-                    // (which we always do in Endpoint::bind).
-                    packet[0] = 0u8;
-                }
             }
 
-            if is_quic {
-                // remap addr
+            if buf_contains_quic_datagrams {
+                // Update the NodeMap and remap RecvMeta to the QuicMappedAddr.
                 match self.node_map.receive_udp(meta.addr) {
                     None => {
-                        warn!(src = ?meta.addr, count = %quic_packets_count, len = meta.len, "UDP recv quic packets: no node state found, skipping");
-                        // if we have no node state for the from addr, set len to 0 to make quinn skip the buf completely.
+                        warn!(
+                            src = ?meta.addr,
+                            count = %quic_datagram_count,
+                            len = meta.len,
+                            "UDP recv quic packets: no node state found, skipping",
+                        );
+                        // If we have no node state for the from addr, set len to 0 to make
+                        // quinn skip the buf completely.
                         meta.len = 0;
                     }
                     Some((node_id, quic_mapped_addr)) => {
-                        trace!(src = ?meta.addr, node = %node_id.fmt_short(), count = %quic_packets_count, len = meta.len, "UDP recv quic packets");
-                        quic_packets_total += quic_packets_count;
+                        trace!(
+                            src = ?meta.addr,
+                            node = %node_id.fmt_short(),
+                            count = %quic_datagram_count,
+                            len = meta.len,
+                            "UDP recv quic packets",
+                        );
+                        quic_packets_total += quic_datagram_count;
                         meta.addr = quic_mapped_addr.0;
                     }
                 }
             } else {
-                // if there is no non-stun,non-disco packet in the chunk, set len to zero to make
-                // quinn skip the buf completely.
+                // If all datagrams in this buf are DISCO or STUN, set len to zero to make
+                // Quinn skip the buf completely.
                 meta.len = 0;
             }
             // Normalize local_ip
@@ -773,8 +840,6 @@ impl MagicSock {
            inc_by!(MagicsockMetrics, recv_datagrams, quic_packets_total as _);
            trace!("UDP recv: {} packets", quic_packets_total);
        }
-
-        Poll::Ready(Ok(msgs))
    }

    #[instrument(skip_all)]
@@ -1398,6 +1463,7 @@ impl Handle {
            relay_recv_receiver: parking_lot::Mutex::new(relay_recv_receiver),
            network_recv_wakers: parking_lot::Mutex::new(None),
            network_send_wakers: Arc::new(parking_lot::Mutex::new(None)),
+            poll_recv_counter: AtomicUsize::new(0),
            actor_sender: actor_sender.clone(),
            ipv6_reported: Arc::new(AtomicBool::new(false)),
            relay_map,
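A side note on the `datagram[0] = 0u8;` lines in the diff above: every QUIC packet carries the fixed bit (0x40) in its first byte per RFC 9000, and greasing of that bit (RFC 9287) is disabled via `quinn::EndpointConfig::grease_quic_bit`, so zeroing the byte makes Quinn discard the datagram cheaply. A minimal sketch of that invariant (the helper name is hypothetical, not Quinn's API):

```rust
/// RFC 9000: the "fixed bit" must be 1 in the first byte of every QUIC packet.
const QUIC_FIXED_BIT: u8 = 0x40;

/// Hypothetical helper: with grease_quic_bit disabled, a receiver can require
/// the fixed bit and silently drop datagrams that don't carry it.
fn has_quic_fixed_bit(first_byte: u8) -> bool {
    first_byte & QUIC_FIXED_BIT != 0
}

fn main() {
    assert!(has_quic_fixed_bit(0xc3)); // long-header QUIC packet
    assert!(!has_quic_fixed_bit(0x00)); // zeroed by the magicsock: ignored
}
```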
