From 5a43208fb8e9e8e5f66e6351656967b7becbe773 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Mon, 22 Apr 2024 23:46:48 +1000 Subject: [PATCH] Add back `preadv2` optimization for `try_acquire` on Linux Signed-off-by: Jiahao XU --- src/unix.rs | 103 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 103 insertions(+) diff --git a/src/unix.rs b/src/unix.rs index 8f3905e..c275a35 100644 --- a/src/unix.rs +++ b/src/unix.rs @@ -285,6 +285,33 @@ impl Client { pub fn try_acquire(&self) -> io::Result> { let mut buf = [0]; + // On Linux, we can use preadv2 to do non-blocking read, + // even if `O_NONBLOCK` is not set. + // + // TODO: musl libc supports preadv2 since 1.2.5, but `libc` crate + // hasn't yet added it. + #[cfg(target_os = "linux")] + { + let read = self.read().as_raw_fd(); + loop { + match linux::non_blocking_read(read, &mut buf) { + Ok(1) => return Ok(Some(Acquired { byte: buf[0] })), + Ok(_) => { + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "early EOF on jobserver pipe", + )) + } + + Err(e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(None), + Err(e) if e.kind() == io::ErrorKind::Interrupted => continue, + Err(e) if e.kind() == io::ErrorKind::Unsupported => break, + + Err(err) => return Err(err), + } + } + } + let (mut fifo, is_non_blocking) = match self { Self::Fifo { file, @@ -368,6 +395,82 @@ impl Client { } } +// This should be available for all linux targets, +// though only [`non_blocking_read`] currently uses it so adding gnu cfg. +#[cfg(target_os = "linux")] +mod linux { + use super::*; + + use libc::{iovec, off_t, ssize_t, syscall, SYS_preadv2}; + + fn cvt_ssize(t: ssize_t) -> io::Result { + if t == -1 { + Err(io::Error::last_os_error()) + } else { + Ok(t) + } + } + + fn preadv2( + fd: c_int, + iov: *const iovec, + iovcnt: c_int, + offset: off_t, + flags: c_int, + ) -> ssize_t { + #[cfg(all(target_arch = "x86_64", target_pointer_width = "64"))] + let res = syscall(SYS_preadv2, fd, iov, iovcnt, offset, 0, flags); + + #[cfg(all(target_arch = "x86_64", target_pointer_width = "32"))] + let res = syscall(SYS_preadv2, fd, iov, iovcnt, offset, 0, flags); + + #[cfg(not(target_arch = "x86_64"))] + let res = syscall( + SYS_preadv2, + fd, + iov, + iovcnt, + offset as libc::c_long, + ((offset as u64) >> 32) as libc::c_long, + flags, + ); + + res.try_into().unwrap() + } + + pub fn non_blocking_read(fd: c_int, buf: &mut [u8]) -> io::Result { + static IS_NONBLOCKING_READ_UNSUPPORTED: AtomicBool = AtomicBool::new(false); + + if IS_NONBLOCKING_READ_UNSUPPORTED.load(Ordering::Relaxed) { + return Err(io::ErrorKind::Unsupported.into()); + } + + match cvt_ssize(unsafe { + preadv2( + fd, + &iovec { + iov_base: buf.as_ptr() as *mut _, + iov_len: buf.len(), + }, + 1, + -1, + libc::RWF_NOWAIT, + ) + }) { + Ok(cnt) => Ok(cnt.try_into().unwrap()), + Err(err) if err.raw_os_error() == Some(libc::EOPNOTSUPP) => { + IS_NONBLOCKING_READ_UNSUPPORTED.store(true, Ordering::Relaxed); + Err(io::ErrorKind::Unsupported.into()) + } + Err(err) if err.kind() == io::ErrorKind::Unsupported => { + IS_NONBLOCKING_READ_UNSUPPORTED.store(true, Ordering::Relaxed); + Err(err) + } + Err(err) => Err(err), + } + } +} + #[derive(Debug)] pub struct Helper { thread: JoinHandle<()>,