Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for sendmmsg/recvmmsg #494

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
208 changes: 208 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -727,3 +727,211 @@ impl<'name, 'bufs, 'control> fmt::Debug for MsgHdrMut<'name, 'bufs, 'control> {
"MsgHdrMut".fmt(fmt)
}
}

/// Configuration of a `sendmmsg(2)` system call.
///
/// This wraps `mmsghdr` on Unix. Also see [`MmsgHdrMut`] for the variant used
/// by `recvmmsg(2)`.
#[cfg(all(
unix,
not(any(
target_os = "redox",
target_os = "solaris",
target_os = "hurd",
target_os = "vita",
target_os = "illumos",
target_vendor = "apple"
))
))]
pub struct MmsgHdr<'addr, 'bufs, 'control> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about matching mmsghdr here? For example

pub struct MmsgHdr<'addr, 'bufs, 'control> {
    msg: MsgHdr,
    len: libc::c_uint,
}

This way the caller can decide if they want to use a Vec, a slice an array or something else.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Being able to avoid the allocation which the vec implies in the hot I/O loop when the batch size is const would be nice for sure.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see two options:

  • go with your suggestion, and let callers provide their own slices of MmsgHdr. This works for the "lower level" Socket::sendmmsg function, but not for Socket::send_multiple_to as it needs some memory layout adaptation
  • const generics may be an option? though we may still want to defer to the caller for that

inner: Vec<sys::mmsghdr>,
#[allow(clippy::type_complexity)]
_lifetimes: PhantomData<(&'addr SockAddr, &'bufs IoSlice<'bufs>, &'control [u8])>,
}

#[cfg(all(
unix,
not(any(
target_os = "redox",
target_os = "solaris",
target_os = "hurd",
target_os = "vita",
target_os = "illumos",
target_vendor = "apple"
))
))]
impl<'addr, 'bufs, 'control> MmsgHdr<'addr, 'bufs, 'control> {
/// Create a new `MmsgHdr` with all empty/zero fields.
#[allow(clippy::new_without_default)]
pub fn new(len: usize) -> MmsgHdr<'addr, 'bufs, 'control> {
// SAFETY: all zero is valid for `mmsghdr`
MmsgHdr {
inner: vec![unsafe { mem::zeroed() }; len],
_lifetimes: PhantomData,
}
}

/// Set the addresses (name) of the message.
///
/// Corresponds to setting `msg_name` and `msg_namelen`.
pub fn with_addrs(mut self, addrs: &'addr [SockAddr]) -> Self {
for (msg, addr) in self.inner.iter_mut().zip(addrs) {
sys::set_msghdr_name(&mut msg.msg_hdr, addr);
}
self
}

/// Set the buffer(s) of the message.
///
/// Corresponds to setting `msg_iov` and `msg_iovlen` on Unix.
pub fn with_buffers(mut self, bufs: &'bufs [IoSlice<'_>]) -> Self {
for (msg, buf) in self.inner.iter_mut().zip(bufs) {
sys::set_msghdr_iov(&mut msg.msg_hdr, buf as *const _ as *mut libc::iovec, 1);
}
self
}

/// Set the control buffer of the messages.
///
/// Corresponds to setting `msg_control` and `msg_controllen` on Unix.
pub fn with_control(mut self, bufs: &'control [&'control [u8]]) -> Self {
for (msg, buf) in self.inner.iter_mut().zip(bufs) {
sys::set_msghdr_control(&mut msg.msg_hdr, buf.as_ptr() as *mut _, buf.len())
}
self
}

/// Set the flags on the messages
///
/// Corresponds to setting `msg_flags` on Unix.
pub fn with_flags(mut self, flags: &[sys::c_int]) -> Self {
for (msg, flags) in self.inner.iter_mut().zip(flags) {
sys::set_msghdr_flags(&mut msg.msg_hdr, *flags);
}
self
}
}

#[cfg(all(
unix,
not(any(
target_os = "redox",
target_os = "solaris",
target_os = "hurd",
target_os = "vita",
target_os = "illumos",
target_vendor = "apple"
))
))]
impl<'name, 'bufs, 'control> fmt::Debug for MmsgHdr<'name, 'bufs, 'control> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
"MmsgHdr".fmt(fmt)
}
}

/// Configuration of a `recvmmsg(2)` system call
///
/// This wraps `mmsghdr` on Unix. Also see [`MmsgHdr`] for the variant used by
/// `sendmmsg(2)`.
#[cfg(all(
unix,
not(any(
target_os = "redox",
target_os = "solaris",
target_os = "hurd",
target_os = "vita",
target_os = "illumos",
target_vendor = "apple"
))
))]
pub struct MmsgHdrMut<'addr, 'bufs, 'control> {
inner: Vec<sys::mmsghdr>,
#[allow(clippy::type_complexity)]
_lifetimes: PhantomData<(
&'addr mut SockAddr,
&'bufs mut MaybeUninitSlice<'bufs>,
&'control mut [u8],
)>,
}

#[cfg(all(
unix,
not(any(
target_os = "redox",
target_os = "solaris",
target_os = "hurd",
target_os = "vita",
target_os = "illumos",
target_vendor = "apple"
))
))]
impl<'addr, 'bufs, 'control> MmsgHdrMut<'addr, 'bufs, 'control> {
/// Create a new `MmsgHdrMut` with all empty/zero fields.
#[allow(clippy::new_without_default)]
pub fn new(len: usize) -> MmsgHdrMut<'addr, 'bufs, 'control> {
// SAFETY: all zero is valid for `msghdr` and `WSAMSG`.
MmsgHdrMut {
inner: vec![unsafe { mem::zeroed() }; len],
_lifetimes: PhantomData,
}
}

/// Set the mutable address (name) of the message.
///
/// Corresponds to setting `msg_name` and `msg_namelen` on Unix and `name`
/// and `namelen` on Windows.
pub fn with_addrs(mut self, addrs: &'addr mut [SockAddr]) -> Self {
for (msg, addr) in self.inner.iter_mut().zip(addrs) {
sys::set_msghdr_name(&mut msg.msg_hdr, addr);
}
self
}

/// Set the mutable buffer(s) of the message.
///
/// Corresponds to setting `msg_iov` and `msg_iovlen` on Unix and `lpBuffers`
/// and `dwBufferCount` on Windows.
pub fn with_buffers(mut self, bufs: &'bufs mut [MaybeUninitSlice<'_>]) -> Self {
for (msg, buf) in self.inner.iter_mut().zip(bufs) {
sys::set_msghdr_iov(&mut msg.msg_hdr, buf as *mut _ as *mut libc::iovec, 1);
}
self
}

/// Set the mutable control buffer of the message.
///
/// Corresponds to setting `msg_control` and `msg_controllen` on Unix and
/// `Control` on Windows.
pub fn with_control(mut self, buf: &'control mut [&'control mut [MaybeUninit<u8>]]) -> Self {
for (msg, buf) in self.inner.iter_mut().zip(buf) {
sys::set_msghdr_control(&mut msg.msg_hdr, buf.as_mut_ptr().cast(), buf.len());
}
self
}

/// Returns the flags of the message.
pub fn flags(&self, n: usize) -> Vec<RecvFlags> {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think accessors are also needed for len and control_len fields in order to be able to usefully consume the result of the syscall.

Rather than the 3 vec allocations it might also be nice to have a method to return all 3 at once so we'd at least have only 1, or to avoid it completely with an iterator based API.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

diff --git a/src/lib.rs b/src/lib.rs
index 6c49cd1..51ebcc2 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -923,7 +923,43 @@ impl<'addr, 'bufs, 'control> MmsgHdrMut<'addr, 'bufs, 'control> {
         self
     }
 
-    /// Returns the flags of the message.
+    /// Returns the lengths, flags and control lengths of the messages.
+    pub fn results(&self, n: usize) -> Vec<(usize, RecvFlags, usize)> {
+        self.inner
+            .iter()
+            .take(n)
+            .map(|msg| {
+                (
+                    msg.msg_len as _,
+                    sys::msghdr_flags(&msg.msg_hdr),
+                    sys::msghdr_control_len(&msg.msg_hdr),
+                )
+            })
+            .collect()
+    }
+
+    /// Extends the vec with the length, flags and control lengths of the messages.
+    /// This avoids the need to allocate a new vec on each use which affects `results`.
+    pub fn extend_with_results(&self, v: &mut Vec<(usize, RecvFlags, usize)>, n: usize) {
+        v.extend(self.inner.iter().take(n).map(|msg| {
+            (
+                msg.msg_len as _,
+                sys::msghdr_flags(&msg.msg_hdr),
+                sys::msghdr_control_len(&msg.msg_hdr),
+            )
+        }));
+    }
+
+    /// Returns the lengths of the messages.
+    pub fn lens(&self, n: usize) -> Vec<usize> {
+        self.inner
+            .iter()
+            .take(n)
+            .map(|msg| msg.msg_len as _)
+            .collect()
+    }
+
+    /// Returns the flags of the messages.
     pub fn flags(&self, n: usize) -> Vec<RecvFlags> {
         self.inner
             .iter()
@@ -931,6 +967,15 @@ impl<'addr, 'bufs, 'control> MmsgHdrMut<'addr, 'bufs, 'control> {
             .map(|msg| sys::msghdr_flags(&msg.msg_hdr))
             .collect()
     }
+
+    /// Returns the control lengths of the messages.
+    pub fn control_lens(&self, n: usize) -> Vec<usize> {
+        self.inner
+            .iter()
+            .take(n)
+            .map(|msg| sys::msghdr_control_len(&msg.msg_hdr))
+            .collect()
+    }
 }
 
 #[cfg(all(

Is what I ended up with.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I quite like the idea of an iterator-based API. This lets the user choose if they want a Vec.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not too bad for results, lens and control_lens since they are Copy but for recvmmsg getting back mutable access to the received buffer is a bit of a lifetime quagmire.

You basically setup the largest number of buffers you want to receive at once, which is fine if you use them all, but often you'll receive some smaller number.

The overhead of repopulating from scratch on each iteration is noticeable enough that it leads to populating a smaller number of potential receives than you otherwise would want to.

Ideally you'd want to repopulate the buffers which did get used but reuse the ones which didn't. I've absolutely failed to make that work with the lifetimes involved, since the MmsgHdrs end up holding a mutable reference into whatever owns the destination buffers1, which means one needs to drop the mmsghdr which you might otherwise be able to partially reuse.

AFAICT the same lifetime issue exists whether the collection in MsgHdrMut is internal (as in the Vec here) or external (as in a slice of the MsgHdrMut proposed in #494 (comment)).

Is there some technique where the lifetime of the (used) head can be split from the (unused) tail?

Footnotes

  1. In my case the buffers are owned by an array of BytesMut, but it could be a Vec<Vec<u8>> or whatever.

self.inner
.iter()
.take(n)
.map(|msg| sys::msghdr_flags(&msg.msg_hdr))
.collect()
}
}

#[cfg(all(
unix,
not(any(
target_os = "redox",
target_os = "solaris",
target_os = "hurd",
target_os = "vita",
target_os = "illumos",
target_vendor = "apple"
))
))]
impl<'name, 'bufs, 'control> fmt::Debug for MmsgHdrMut<'name, 'bufs, 'control> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
"MmsgHdrMut".fmt(fmt)
}
}
148 changes: 148 additions & 0 deletions src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,18 @@ use crate::MsgHdrMut;
use crate::{Domain, Protocol, SockAddr, TcpKeepalive, Type};
#[cfg(not(target_os = "redox"))]
use crate::{MaybeUninitSlice, MsgHdr, RecvFlags};
#[cfg(all(
unix,
not(any(
target_os = "redox",
target_os = "solaris",
target_os = "hurd",
target_os = "vita",
target_os = "illumos",
target_vendor = "apple"
))
))]
use crate::{MmsgHdr, MmsgHdrMut};

/// Owned wrapper around a system socket.
///
Expand Down Expand Up @@ -586,6 +598,40 @@ impl Socket {
sys::recv_from_vectored(self.as_raw(), bufs, flags)
}

/// Receive multiple messages in a single call.
#[cfg(all(
unix,
not(any(
target_os = "redox",
target_os = "solaris",
target_os = "hurd",
target_os = "vita",
target_os = "illumos",
target_vendor = "apple"
))
))]
#[cfg_attr(
docsrs,
doc(cfg(all(
unix,
not(any(
target_os = "redox",
target_os = "solaris",
target_os = "hurd",
target_os = "vita",
target_os = "illumos",
target_vendor = "apple"
))
)))
)]
pub fn recv_multiple_from(
&self,
msgs: &mut [MaybeUninitSlice<'_>],
flags: c_int,
) -> io::Result<Vec<(usize, RecvFlags, SockAddr)>> {
sys::recv_multiple_from(self.as_raw(), msgs, flags)
}

/// Receives data from the socket, without removing it from the queue.
///
/// Successive calls return the same data. This is accomplished by passing
Expand Down Expand Up @@ -642,6 +688,41 @@ impl Socket {
sys::recvmsg(self.as_raw(), msg, flags)
}

/// Receive multiple messages from a socket using a message structure.
#[doc = man_links!(recvmmsg(2))]
#[cfg(all(
unix,
not(any(
target_os = "redox",
target_os = "solaris",
target_os = "hurd",
target_os = "vita",
target_os = "illumos",
target_vendor = "apple"
))
))]
#[cfg_attr(
docsrs,
doc(cfg(all(
unix,
not(any(
target_os = "redox",
target_os = "solaris",
target_os = "hurd",
target_os = "vita",
target_os = "illumos",
target_vendor = "apple"
))
)))
)]
pub fn recvmmsg(
&self,
msgs: &mut MmsgHdrMut<'_, '_, '_>,
flags: sys::c_int,
) -> io::Result<usize> {
sys::recvmmsg(self.as_raw(), msgs, flags)
}

/// Sends data on the socket to a connected peer.
///
/// This is typically used on TCP sockets or datagram sockets which have
Expand Down Expand Up @@ -741,13 +822,80 @@ impl Socket {
sys::send_to_vectored(self.as_raw(), bufs, addr, flags)
}

/// Send multiple data to multiple peers listening on `addrs`. Return the amount of bytes
/// written for each message.
#[cfg(all(
unix,
not(any(
target_os = "redox",
target_os = "solaris",
target_os = "hurd",
target_os = "vita",
target_os = "illumos",
target_vendor = "apple"
))
))]
#[cfg_attr(
docsrs,
doc(cfg(all(
unix,
not(any(
target_os = "redox",
target_os = "solaris",
target_os = "hurd",
target_os = "vita",
target_os = "illumos",
target_vendor = "apple"
))
)))
)]
pub fn send_multiple_to(
&self,
msgs: &[IoSlice<'_>],
to: &[SockAddr],
flags: c_int,
) -> io::Result<Vec<usize>> {
sys::send_multiple_to(self.as_raw(), msgs, to, flags)
}

/// Send a message on a socket using a message structure.
#[doc = man_links!(sendmsg(2))]
#[cfg(not(target_os = "redox"))]
#[cfg_attr(docsrs, doc(cfg(not(target_os = "redox"))))]
pub fn sendmsg(&self, msg: &MsgHdr<'_, '_, '_>, flags: sys::c_int) -> io::Result<usize> {
sys::sendmsg(self.as_raw(), msg, flags)
}

/// Send multiple messages on a socket using a multiple message structure.
#[doc = man_links!(sendmmsg(2))]
#[cfg(all(
unix,
not(any(
target_os = "redox",
target_os = "solaris",
target_os = "hurd",
target_os = "vita",
target_os = "illumos",
target_vendor = "apple"
))
))]
#[cfg_attr(
docsrs,
doc(cfg(all(
unix,
not(any(
target_os = "redox",
target_os = "solaris",
target_os = "hurd",
target_os = "vita",
target_os = "illumos",
target_vendor = "apple"
))
)))
)]
pub fn sendmmsg(&self, msgs: &MmsgHdr<'_, '_, '_>, flags: sys::c_int) -> io::Result<usize> {
sys::sendmmsg(self.as_raw(), msgs, flags)
}
}

/// Set `SOCK_CLOEXEC` and `NO_HANDLE_INHERIT` on the `ty`pe on platforms that
Expand Down
Loading