Skip to content

Commit 0e6f2ff

Browse files
feat: Futex
1 parent 99ea907 commit 0e6f2ff

File tree

3 files changed

+370
-0
lines changed

3 files changed

+370
-0
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ This project adheres to [Semantic Versioning](https://semver.org/).
55

66
## [Unreleased] - ReleaseDate
77
### Added
8+
9+
- Added futex interface.
10+
([#1907](https://github.com/nix-rust/nix/pull/1907))
811
- Add `PF_ROUTE` to `SockType` on macOS, iOS, all of the BSDs, Fuchsia, Haiku, Illumos.
912
([#1867](https://github.com/nix-rust/nix/pull/1867))
1013
- Added `nix::ucontext` module on `aarch64-unknown-linux-gnu`.

src/sys/futex.rs

Lines changed: 363 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,363 @@
1+
use crate::{Errno, Result};
2+
use libc::{syscall, SYS_futex};
3+
use std::cell::UnsafeCell;
4+
use std::convert::TryFrom;
5+
use std::os::unix::io::{FromRawFd, OwnedFd};
6+
use std::time::Duration;
7+
8+
/// Converts a [`Duration`] into the `libc::timespec` representation handed to the futex syscall.
///
/// # Panics
///
/// Panics if the whole-second part or the sub-second nanosecond part of `duration` does not fit
/// into the corresponding `libc::timespec` field.
fn timespec(duration: Duration) -> libc::timespec {
    libc::timespec {
        tv_sec: duration.as_secs().try_into().unwrap(),
        tv_nsec: duration.subsec_nanos().try_into().unwrap(),
    }
}
13+
14+
/// Turns an optional shared reference into a raw pointer.
///
/// `Some(r)` yields the address of `r`; `None` yields a null pointer.
fn unwrap_or_null<T>(option: Option<&T>) -> *const T {
    option.map_or(std::ptr::null(), |value| value as *const T)
}
20+
21+
/// Fast user-space locking.
22+
///
23+
/// By default we presume the futex is not process-private, that is, it is used across processes. If
24+
/// you know it is process-private you can set `PRIVATE` to `true` which allows some additional
25+
/// optimizations.
26+
/// ```
27+
/// # use nix::{
28+
/// # sys::{futex::Futex, mman::{mmap, MapFlags, ProtFlags}},
29+
/// # errno::Errno,
30+
/// # unistd::{fork,ForkResult},
31+
/// # };
32+
/// # use std::{
33+
/// # time::{Instant, Duration},
34+
/// # num::NonZeroUsize,
35+
/// # mem::{ManuallyDrop, size_of},
36+
/// # os::unix::io::OwnedFd,
37+
/// # sync::Arc,
38+
/// # thread::{spawn, sleep},
39+
/// # };
40+
/// const TIMEOUT: Duration = Duration::from_millis(500);
41+
/// const DELTA: Duration = Duration::from_millis(100);
42+
/// # fn main() -> nix::Result<()> {
43+
/// let futex: Futex = Futex::new(0);
44+
///
45+
/// // If the value of the futex is 0, wait for wake. Since the value is 0 and no wake occurs,
46+
/// // we expect the timeout will pass.
47+
///
48+
/// let instant = Instant::now();
49+
/// assert_eq!(futex.wait(0, Some(TIMEOUT)),Err(Errno::ETIMEDOUT));
50+
/// assert!(instant.elapsed() > TIMEOUT);
51+
///
52+
/// // If the value of the futex is 1, wait for wake. Since the value is 0, not 1, this will
53+
/// // return immediately.
54+
///
55+
/// let instant = Instant::now();
56+
/// assert_eq!(futex.wait(1, Some(TIMEOUT)),Err(Errno::EAGAIN));
57+
/// assert!(instant.elapsed() < DELTA);
58+
///
59+
/// // Test across threads
60+
/// // -------------------------------------------------------------------------
61+
///
62+
/// let futex = Arc::new(futex);
63+
/// let futex_clone = futex.clone();
64+
/// let instant = Instant::now();
65+
/// spawn(move || {
66+
/// sleep(TIMEOUT);
67+
/// assert_eq!(futex_clone.wake(1),Ok(1));
68+
/// });
69+
/// assert_eq!(futex.wait(0, Some(2 * TIMEOUT)), Ok(()));
70+
/// assert!(instant.elapsed() > TIMEOUT && instant.elapsed() < TIMEOUT + DELTA);
71+
///
72+
/// // Test across processes
73+
/// // -------------------------------------------------------------------------
74+
///
75+
/// let shared_memory = unsafe { mmap::<OwnedFd>(
76+
/// None,
77+
/// NonZeroUsize::new_unchecked(size_of::<Futex<false>>()),
78+
/// ProtFlags::PROT_WRITE | ProtFlags::PROT_READ,
79+
/// MapFlags::MAP_SHARED | MapFlags::MAP_ANONYMOUS,
80+
/// None,
81+
/// 0
82+
/// )? };
83+
/// let futex_ptr = shared_memory.cast::<Futex<false>>();
84+
/// let futex = unsafe { &*futex_ptr };
85+
/// match unsafe { fork()? } {
86+
/// ForkResult::Parent { child } => {
87+
/// sleep(TIMEOUT);
88+
/// assert_eq!(futex.wake(1),Ok(1));
89+
/// // Wait for child process to exit
90+
/// unsafe {
91+
/// assert_eq!(libc::waitpid(child.as_raw(), std::ptr::null_mut(), 0), child.as_raw());
92+
/// }
93+
/// },
94+
/// ForkResult::Child => {
95+
/// let now = Instant::now();
96+
/// assert_eq!(futex.wait(0, Some(2 * TIMEOUT)),Ok(()));
97+
/// assert!(now.elapsed() > TIMEOUT && now.elapsed() < TIMEOUT + DELTA);
98+
/// }
99+
/// }
100+
/// # Ok(())
101+
/// # }
102+
/// ```
103+
// `repr(transparent)` guarantees that a `Futex` has exactly the size and alignment of the `u32`
// futex word it wraps, so it may be placed in (or cast from) raw shared memory.
#[derive(Debug)]
#[repr(transparent)]
pub struct Futex<const PRIVATE: bool = false>(UnsafeCell<u32>);
105+
impl<const PRIVATE: bool> Futex<PRIVATE> {
106+
/// Flag bits meant to be OR-ed into the futex operation code: `libc::FUTEX_PRIVATE_FLAG` when
/// `PRIVATE` is `true`, otherwise `0`.
const MASK: i32 = if PRIVATE { libc::FUTEX_PRIVATE_FLAG } else { 0 };
107+
/// Constructs new futex with a given `val`.
108+
pub fn new(val: u32) -> Self {
109+
Self(UnsafeCell::new(val))
110+
}
111+
/// If the value of the futex:
112+
/// - `== val`, the thread sleeps waiting for a [`Futex::wake`] call, in this case this thread
113+
/// is considered a waiter on this futex.
114+
/// - `!= val`, then `Err` with [`Errno::EAGAIN`] is immediately returned.
115+
///
116+
/// If the timeout is:
117+
/// - `Some(_)` it specifies a timeout for the wait.
118+
/// - `None` it will block indefinitely.
119+
///
120+
/// Wraps [`libc::FUTEX_WAIT`].
121+
pub fn wait(&self, val: u32, timeout: Option<Duration>) -> Result<()> {
122+
let timespec = timeout.map(timespec);
123+
let timespec_ptr = unwrap_or_null(timespec.as_ref());
124+
125+
let res = unsafe {
126+
syscall(
127+
SYS_futex,
128+
self.0.get(),
129+
Self::MASK | libc::FUTEX_WAIT,
130+
val,
131+
timespec_ptr,
132+
)
133+
};
134+
Errno::result(res).map(drop)
135+
}
136+
/// Wakes at most `val` waiters.
137+
///
138+
/// - `val == 1` wakes a single waiter.
139+
/// - `val == u32::MAX` wakes all waiters.
140+
///
141+
/// No guarantee is provided about which waiters are awoken. A waiter with a higher scheduling
142+
/// priority is not guaranteed to be awoken in preference to a waiter with a lower priority.
143+
///
144+
/// Wraps [`libc::FUTEX_WAKE`].
145+
pub fn wake(&self, val: u32) -> Result<u32> {
146+
let res = unsafe {
147+
syscall(SYS_futex, self.0.get(), Self::MASK | libc::FUTEX_WAKE, val)
148+
};
149+
Errno::result(res).map(|x| u32::try_from(x).unwrap())
150+
}
151+
/// Creates a file descriptor associated with the futex.
152+
///
153+
/// When [`Futex::wake`] is performed on the futex this file indicates being readable with
154+
/// `select`, `poll` and `epoll`.
155+
///
156+
/// The file descriptor can be used to obtain asynchronous notifications: if val is nonzero,
157+
/// then, when another process or thread executes a FUTEX_WAKE, the caller will receive the
158+
/// signal number that was passed in val.
159+
///
160+
/// **Because it was inherently racy, this is unsupported from Linux 2.6.26 onward.**
161+
///
162+
/// Wraps [`libc::FUTEX_FD`].
163+
pub fn fd(&self, val: u32) -> Result<OwnedFd> {
164+
let res = unsafe {
165+
syscall(SYS_futex, self.0.get(), Self::MASK | libc::FUTEX_WAKE, val)
166+
};
167+
168+
// On a 32 bit arch `x` will be an `i32` and will trigger this lint.
169+
#[allow(clippy::useless_conversion)]
170+
Errno::result(res)
171+
.map(|x| unsafe { OwnedFd::from_raw_fd(i32::try_from(x).unwrap()) })
172+
}
173+
/// [`Futex::cmp_requeue`] without the check being made using `val3`.
174+
///
175+
/// Wraps [`libc::FUTEX_REQUEUE`].
176+
pub fn requeue(&self, val: u32, val2: u32, uaddr2: &Self) -> Result<u32> {
177+
let res = unsafe {
178+
syscall(
179+
SYS_futex,
180+
self.0.get(),
181+
Self::MASK | libc::FUTEX_CMP_REQUEUE,
182+
val,
183+
val2,
184+
&uaddr2.0,
185+
)
186+
};
187+
Errno::result(res).map(|x| u32::try_from(x).unwrap())
188+
}
189+
/// Wakes `val` waiters, moving remaining (up to `val2`) waiters to `uaddr2`.
190+
///
191+
/// If the value of this futex `== val3` returns `Err` with [`Errno::EAGAIN`].
192+
///
193+
/// Typical values to specify for `val` are `0` or `1` (Specifying `u32::MAX` makes the
194+
/// [`Futex::cmp_requeue`] equivalent to [`Futex::wake`]).
195+
///
196+
/// Typical values to specify for `val2` are `1` or `u32::MAX` (Specifying `0` makes
197+
/// [`Futex::cmp_requeue`] equivalent to [`Futex::wait`]).
198+
///
199+
/// Wraps [`libc::FUTEX_CMP_REQUEUE`].
200+
pub fn cmp_requeue(
201+
&self,
202+
val: u32,
203+
val2: u32,
204+
uaddr2: &Self,
205+
val3: u32,
206+
) -> Result<u32> {
207+
let res = unsafe {
208+
syscall(
209+
SYS_futex,
210+
self.0.get(),
211+
Self::MASK | libc::FUTEX_CMP_REQUEUE,
212+
val,
213+
val2,
214+
&uaddr2.0,
215+
val3,
216+
)
217+
};
218+
Errno::result(res).map(|x| u32::try_from(x).unwrap())
219+
}
220+
/// Wraps [`libc::FUTEX_WAKE_OP`].
221+
pub fn wake_op(
222+
&self,
223+
val: u32,
224+
val2: u32,
225+
uaddr2: &Self,
226+
val3: u32,
227+
) -> Result<u32> {
228+
let res = unsafe {
229+
syscall(
230+
SYS_futex,
231+
self.0.get(),
232+
Self::MASK | libc::FUTEX_WAKE_OP,
233+
val,
234+
val2,
235+
&uaddr2.0,
236+
val3,
237+
)
238+
};
239+
Errno::result(res).map(|x| u32::try_from(x).unwrap())
240+
}
241+
/// Wraps [`libc::FUTEX_WAIT_BITSET`].
242+
pub fn wait_bitset(
243+
&self,
244+
val: u32,
245+
timeout: Option<Duration>,
246+
val3: u32,
247+
) -> Result<()> {
248+
let timespec = timeout.map(timespec);
249+
let timespec_ptr = unwrap_or_null(timespec.as_ref());
250+
251+
let res = unsafe {
252+
syscall(
253+
SYS_futex,
254+
self.0.get(),
255+
Self::MASK | libc::FUTEX_WAIT_BITSET,
256+
val,
257+
timespec_ptr,
258+
val3,
259+
)
260+
};
261+
Errno::result(res).map(drop)
262+
}
263+
/// Wraps [`libc::FUTEX_WAKE_BITSET`].
264+
pub fn wake_bitset(&self, val: u32, val3: u32) -> Result<u32> {
265+
let res = unsafe {
266+
syscall(SYS_futex, self.0.get(), libc::FUTEX_WAKE_BITSET, val, val3)
267+
};
268+
Errno::result(res).map(|x| u32::try_from(x).unwrap())
269+
}
270+
/// Wraps [`libc::FUTEX_LOCK_PI`].
271+
pub fn lock_pi(&self, timeout: Option<Duration>) -> Result<()> {
272+
let timespec = timeout.map(timespec);
273+
let timespec_ptr = unwrap_or_null(timespec.as_ref());
274+
275+
let res = unsafe {
276+
syscall(
277+
SYS_futex,
278+
self.0.get(),
279+
Self::MASK | libc::FUTEX_LOCK_PI,
280+
timespec_ptr,
281+
)
282+
};
283+
Errno::result(res).map(drop)
284+
}
285+
/// Wraps [`libc::FUTEX_LOCK_PI2`].
286+
#[cfg(target_os = "linux")]
287+
pub fn lock_pi2(&self, timeout: Option<Duration>) -> Result<()> {
288+
let timespec = timeout.map(timespec);
289+
let timespec_ptr = unwrap_or_null(timespec.as_ref());
290+
291+
let res = unsafe {
292+
syscall(
293+
SYS_futex,
294+
self.0.get(),
295+
Self::MASK | libc::FUTEX_LOCK_PI2,
296+
timespec_ptr,
297+
)
298+
};
299+
Errno::result(res).map(drop)
300+
}
301+
/// Wraps [`libc::FUTEX_TRYLOCK_PI`].
302+
pub fn trylock_pi(&self) -> Result<()> {
303+
let res = unsafe {
304+
syscall(
305+
SYS_futex,
306+
self.0.get(),
307+
Self::MASK | libc::FUTEX_TRYLOCK_PI,
308+
)
309+
};
310+
Errno::result(res).map(drop)
311+
}
312+
/// `libc::FUTEX_UNLOCK_PI`
313+
pub fn unlock_pi(&self) -> Result<()> {
314+
let res = unsafe {
315+
syscall(SYS_futex, self.0.get(), Self::MASK | libc::FUTEX_UNLOCK_PI)
316+
};
317+
Errno::result(res).map(drop)
318+
}
319+
/// Wraps [`libc::FUTEX_CMP_REQUEUE_PI`].
320+
pub fn cmp_requeue_pi(
321+
&self,
322+
val: u32,
323+
val2: u32,
324+
uaddr2: &Self,
325+
val3: u32,
326+
) -> Result<u32> {
327+
let res = unsafe {
328+
syscall(
329+
SYS_futex,
330+
self.0.get(),
331+
Self::MASK | libc::FUTEX_CMP_REQUEUE_PI,
332+
val,
333+
val2,
334+
&uaddr2.0,
335+
val3,
336+
)
337+
};
338+
Errno::result(res).map(|x| u32::try_from(x).unwrap())
339+
}
340+
/// Wraps [`libc::FUTEX_WAIT_REQUEUE_PI`].
341+
pub fn wait_requeue_pi(
342+
&self,
343+
val: u32,
344+
timeout: Option<Duration>,
345+
uaddr2: &Self,
346+
) -> Result<()> {
347+
let timespec = timeout.map(timespec);
348+
let timespec_ptr = unwrap_or_null(timespec.as_ref());
349+
350+
let res = unsafe {
351+
syscall(
352+
SYS_futex,
353+
self.0.get(),
354+
Self::MASK | libc::FUTEX_WAIT_REQUEUE_PI,
355+
val,
356+
timespec_ptr,
357+
&uaddr2.0,
358+
)
359+
};
360+
Errno::result(res).map(drop)
361+
}
362+
}
363+
// SAFETY: the futex word inside the `UnsafeCell` is only ever handed to the kernel as a raw
// pointer by the methods of `Futex`; no reference to its contents is created from a shared
// `&Futex`. This holds for every value of `PRIVATE`, so the impl is generic rather than being
// restricted to the default `Futex<false>`.
unsafe impl<const PRIVATE: bool> Sync for Futex<PRIVATE> {}

src/sys/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,3 +226,7 @@ feature! {
226226
#![feature = "time"]
227227
pub mod timer;
228228
}
229+
230+
/// Fast user-space locking.
231+
#[cfg(any(target_os = "android", target_os = "linux"))]
232+
pub mod futex;

0 commit comments

Comments
 (0)