Skip to content

Commit

Permalink
util: more general interface for UdpFramed
Browse files Browse the repository at this point in the history
Previously `UdpFramed` only accepted smart pointers to `UdpSocket`.
This commit changes it to accept anything that has `.poll_recv_from()`
and `.poll_send_to()`. It allows for easier testing and more
composability.
  • Loading branch information
GoldsteinE committed Sep 1, 2023
1 parent 37bb47c commit c6e7472
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 13 deletions.
21 changes: 8 additions & 13 deletions tokio-util/src/udp/frame.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
use crate::codec::{Decoder, Encoder};
use crate::udp::{PollRecvFrom, PollSendTo};

use futures_core::Stream;
use tokio::{io::ReadBuf, net::UdpSocket};

use bytes::{BufMut, BytesMut};
use futures_core::ready;
use futures_sink::Sink;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{
borrow::Borrow,
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
};
use std::{io, mem::MaybeUninit};

/// A unified [`Stream`] and [`Sink`] interface to an underlying `UdpSocket`, using
Expand Down Expand Up @@ -54,7 +52,7 @@ impl<C, T> Unpin for UdpFramed<C, T> {}

impl<C, T> Stream for UdpFramed<C, T>
where
T: Borrow<UdpSocket>,
T: PollRecvFrom,
C: Decoder,
{
type Item = Result<(C::Item, SocketAddr), C::Error>;
Expand Down Expand Up @@ -87,7 +85,7 @@ where
let buf = unsafe { &mut *(pin.rd.chunk_mut() as *mut _ as *mut [MaybeUninit<u8>]) };
let mut read = ReadBuf::uninit(buf);
let ptr = read.filled().as_ptr();
let res = ready!(pin.socket.borrow().poll_recv_from(cx, &mut read));
let res = ready!(pin.socket.poll_recv_from(cx, &mut read));

assert_eq!(ptr, read.filled().as_ptr());
let addr = res?;
Expand All @@ -107,7 +105,7 @@ where

impl<I, C, T> Sink<(I, SocketAddr)> for UdpFramed<C, T>
where
T: Borrow<UdpSocket>,
T: PollSendTo,
C: Encoder<I>,
{
type Error = C::Error;
Expand Down Expand Up @@ -141,13 +139,13 @@ where
}

let Self {
ref socket,
ref mut socket,
ref mut out_addr,
ref mut wr,
..
} = *self;

let n = ready!(socket.borrow().poll_send_to(cx, wr, *out_addr))?;
let n = ready!(socket.poll_send_to(cx, wr, *out_addr))?;

let wrote_all = n == self.wr.len();
self.wr.clear();
Expand All @@ -172,10 +170,7 @@ where
}
}

impl<C, T> UdpFramed<C, T>
where
T: Borrow<UdpSocket>,
{
impl<C, T> UdpFramed<C, T> {
/// Create a new `UdpFramed` backed by the given socket and codec.
///
/// See struct level documentation for more details.
Expand Down
58 changes: 58 additions & 0 deletions tokio-util/src/udp/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,62 @@
//! UDP framing

use std::{
borrow::Borrow,
io,
net::SocketAddr,
task::{Context, Poll},
};
use tokio::{io::ReadBuf, net::UdpSocket};

mod frame;
pub use frame::UdpFramed;

/// Types that support receiving datagrams.
///
/// This trait is implemented for any types that implement [`Borrow`]<[`UdpSocket`]>.
pub trait PollRecvFrom {
/// Attempts to receive a single datagram on the socket.
///
/// See [`UdpSocket::poll_recv_from()`] for more information.
fn poll_recv_from(
&mut self,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<SocketAddr>>;
}

impl<T: Borrow<UdpSocket>> PollRecvFrom for T {
fn poll_recv_from(
&mut self,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<SocketAddr>> {
(*self).borrow().poll_recv_from(cx, buf)
}
}

/// Types that support sending datagrams to [`SocketAddr`]s.
///
/// This trait is implemented for any types that implement [`Borrow`]<[`UdpSocket`]>.
pub trait PollSendTo {
/// Attempts to send data on the socket to a given address.
///
/// See [`UdpSocket::poll_send_to()`] for more information.
fn poll_send_to(
&mut self,
cx: &mut Context<'_>,
buf: &[u8],
target: SocketAddr,
) -> Poll<io::Result<usize>>;
}

impl<T: Borrow<UdpSocket>> PollSendTo for T {
fn poll_send_to(
&mut self,
cx: &mut Context<'_>,
buf: &[u8],
target: SocketAddr,
) -> Poll<io::Result<usize>> {
(*self).borrow().poll_send_to(cx, buf, target)
}
}

0 comments on commit c6e7472

Please sign in to comment.