diff --git a/iroh/src/magicsock.rs b/iroh/src/magicsock.rs index e414ed6ace..07c2d5e9d2 100644 --- a/iroh/src/magicsock.rs +++ b/iroh/src/magicsock.rs @@ -22,7 +22,7 @@ use std::{ net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}, pin::Pin, sync::{ - atomic::{AtomicBool, AtomicU16, AtomicU64, Ordering}, + atomic::{AtomicBool, AtomicU16, AtomicU64, AtomicUsize, Ordering}, Arc, RwLock, }, task::{Context, Poll, Waker}, @@ -186,6 +186,8 @@ pub(crate) struct MagicSock { /// Stores wakers, to be called when relay_recv_ch receives new data. network_recv_wakers: parking_lot::Mutex>, network_send_wakers: Arc>>, + /// Counter for ordering of [`MagicSock::poll_recv`] polling order. + poll_recv_counter: AtomicUsize, /// The DNS resolver to be used in this magicsock. dns_resolver: DnsResolver, @@ -650,27 +652,86 @@ impl MagicSock { bufs: &mut [io::IoSliceMut<'_>], metas: &mut [quinn_udp::RecvMeta], ) -> Poll> { - // FIXME: currently ipv4 load results in ipv6 traffic being ignored debug_assert_eq!(bufs.len(), metas.len(), "non matching bufs & metas"); if self.is_closed() { return Poll::Pending; } - // order of polling is: UDPv4, UDPv6, relay - let (msgs, from_ipv4) = match self.pconn4.poll_recv(cx, bufs, metas)? { - Poll::Pending | Poll::Ready(0) => match &self.pconn6 { - Some(conn) => match conn.poll_recv(cx, bufs, metas)? { - Poll::Pending | Poll::Ready(0) => { - return self.poll_recv_relay(cx, bufs, metas); + // Three macros to help polling: they return if they get a result, execution + // continues if they were Pending and we need to poll others (or finally return + // Pending). + macro_rules! poll_ipv4 { + () => { + match self.pconn4.poll_recv(cx, bufs, metas)? { + Poll::Pending | Poll::Ready(0) => {} + Poll::Ready(n) => { + self.process_udp_datagrams(true, &mut bufs[..n], &mut metas[..n]); + return Poll::Ready(Ok(n)); } - Poll::Ready(n) => (n, false), - }, - None => { - return self.poll_recv_relay(cx, bufs, metas); } - }, - Poll::Ready(n) => (n, true), - }; + }; + } + macro_rules! poll_ipv6 { + () => { + if let Some(ref pconn) = self.pconn6 { + match pconn.poll_recv(cx, bufs, metas)? { + Poll::Pending | Poll::Ready(0) => {} + Poll::Ready(n) => { + self.process_udp_datagrams(false, &mut bufs[..n], &mut metas[..n]); + return Poll::Ready(Ok(n)); + } + } + } + }; + } + macro_rules! poll_relay { + () => { + match self.poll_recv_relay(cx, bufs, metas) { + Poll::Pending => {} + Poll::Ready(n) => return Poll::Ready(n), + } + }; + } + + let counter = self.poll_recv_counter.fetch_add(1, Ordering::Relaxed); + match counter % 3 { + 0 => { + // order of polling: UDPv4, UDPv6, relay + poll_ipv4!(); + poll_ipv6!(); + poll_relay!(); + Poll::Pending + } + 1 => { + // order of polling: UDPv6, relay, UDPv4 + poll_ipv6!(); + poll_relay!(); + poll_ipv4!(); + Poll::Pending + } + _ => { + // order of polling: relay, UDPv4, UDPv6 + poll_relay!(); + poll_ipv4!(); + poll_ipv6!(); + Poll::Pending + } + } + } + + /// Process datagrams received from UDP sockets. + /// + /// All the `bufs` and `metas` should have initialized packets in them. + /// + /// This fixes up the datagrams to use the correct [`QuicMappedAddr`] and extracts DISCO + /// packets, processing them inside the magic socket. + fn process_udp_datagrams( + &self, + from_ipv4: bool, + bufs: &mut [io::IoSliceMut<'_>], + metas: &mut [quinn_udp::RecvMeta], + ) { + debug_assert_eq!(bufs.len(), metas.len(), "non matching bufs & metas"); // Adding the IP address we received something on results in Quinn using this // address on the send path to send from. However we let Quinn use a @@ -692,77 +753,83 @@ impl MagicSock { let mut quic_packets_total = 0; - for (meta, buf) in metas.iter_mut().zip(bufs.iter_mut()).take(msgs) { - let mut is_quic = false; - let mut quic_packets_count = 0; + for (meta, buf) in metas.iter_mut().zip(bufs.iter_mut()) { + let mut buf_contains_quic_datagrams = false; + let mut quic_datagram_count = 0; if meta.len > meta.stride { trace!(%meta.len, %meta.stride, "GRO datagram received"); inc!(MagicsockMetrics, recv_gro_datagrams); } - // find disco and stun packets and forward them to the actor - for packet in buf[..meta.len].chunks_mut(meta.stride) { - if packet.len() < meta.stride { + // Chunk through the datagrams in this GRO payload to find disco and stun + // packets and forward them to the actor + for datagram in buf[..meta.len].chunks_mut(meta.stride) { + if datagram.len() < meta.stride { trace!( - len = %packet.len(), + len = %datagram.len(), %meta.stride, "Last GRO datagram smaller than stride", ); } - let packet_is_quic = if stun::is(packet) { + // Detect DISCO and STUN datagrams and process them. Overwrite the first + // byte of those packets with zero to make Quinn ignore the packet. This + // relies on quinn::EndpointConfig::grease_quic_bit being set to `false`, + // which we do in Endpoint::bind. + if stun::is(datagram) { trace!(src = %meta.addr, len = %meta.stride, "UDP recv: stun packet"); - let packet2 = Bytes::copy_from_slice(packet); + let packet2 = Bytes::copy_from_slice(datagram); self.net_reporter.receive_stun_packet(packet2, meta.addr); - false - } else if let Some((sender, sealed_box)) = disco::source_and_box(packet) { - // Disco? + datagram[0] = 0u8; + } else if let Some((sender, sealed_box)) = disco::source_and_box(datagram) { trace!(src = %meta.addr, len = %meta.stride, "UDP recv: disco packet"); self.handle_disco_message( sender, sealed_box, DiscoMessageSource::Udp(meta.addr), ); - false + datagram[0] = 0u8; } else { trace!(src = %meta.addr, len = %meta.stride, "UDP recv: quic packet"); if from_ipv4 { - inc_by!(MagicsockMetrics, recv_data_ipv4, packet.len() as _); + inc_by!(MagicsockMetrics, recv_data_ipv4, datagram.len() as _); } else { - inc_by!(MagicsockMetrics, recv_data_ipv6, packet.len() as _); + inc_by!(MagicsockMetrics, recv_data_ipv6, datagram.len() as _); } - true + quic_datagram_count += 1; + buf_contains_quic_datagrams = true; }; - - if packet_is_quic { - quic_packets_count += 1; - is_quic = true; - } else { - // overwrite the first byte of the packets with zero. - // this makes quinn reliably and quickly ignore the packet as long as - // [`quinn::EndpointConfig::grease_quic_bit`] is set to `false` - // (which we always do in Endpoint::bind). - packet[0] = 0u8; - } } - if is_quic { - // remap addr + if buf_contains_quic_datagrams { + // Update the NodeMap and remap RecvMeta to the QuicMappedAddr. match self.node_map.receive_udp(meta.addr) { None => { - warn!(src = ?meta.addr, count = %quic_packets_count, len = meta.len, "UDP recv quic packets: no node state found, skipping"); - // if we have no node state for the from addr, set len to 0 to make quinn skip the buf completely. + warn!( + src = ?meta.addr, + count = %quic_datagram_count, + len = meta.len, + "UDP recv quic packets: no node state found, skipping", + ); + // If we have no node state for the from addr, set len to 0 to make + // quinn skip the buf completely. meta.len = 0; } Some((node_id, quic_mapped_addr)) => { - trace!(src = ?meta.addr, node = %node_id.fmt_short(), count = %quic_packets_count, len = meta.len, "UDP recv quic packets"); - quic_packets_total += quic_packets_count; + trace!( + src = ?meta.addr, + node = %node_id.fmt_short(), + count = %quic_datagram_count, + len = meta.len, + "UDP recv quic packets", + ); + quic_packets_total += quic_datagram_count; meta.addr = quic_mapped_addr.0; } } } else { - // if there is no non-stun,non-disco packet in the chunk, set len to zero to make - // quinn skip the buf completely. + // If all datagrams in this buf are DISCO or STUN, set len to zero to make + // Quinn skip the buf completely. meta.len = 0; } // Normalize local_ip @@ -773,8 +840,6 @@ impl MagicSock { inc_by!(MagicsockMetrics, recv_datagrams, quic_packets_total as _); trace!("UDP recv: {} packets", quic_packets_total); } - - Poll::Ready(Ok(msgs)) } #[instrument(skip_all)] @@ -1398,6 +1463,7 @@ impl Handle { relay_recv_receiver: parking_lot::Mutex::new(relay_recv_receiver), network_recv_wakers: parking_lot::Mutex::new(None), network_send_wakers: Arc::new(parking_lot::Mutex::new(None)), + poll_recv_counter: AtomicUsize::new(0), actor_sender: actor_sender.clone(), ipv6_reported: Arc::new(AtomicBool::new(false)), relay_map,