Skip to content

Commit 64045be

Browse files
author
Jonathan Woollett-Light
committed
feat: Futex
1 parent b28132b commit 64045be

File tree

3 files changed

+389
-0
lines changed

3 files changed

+389
-0
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ ucontext = ["signal"]
6969
uio = []
7070
user = ["feature"]
7171
zerocopy = ["fs", "uio"]
72+
futex = []
7273

7374
[dev-dependencies]
7475
assert-impl = "0.1"

src/sys/futex.rs

Lines changed: 382 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,382 @@
1+
//! Fast user-space locking.
2+
3+
use crate::{Errno, Result};
4+
use libc::{syscall, SYS_futex};
5+
use std::cell::UnsafeCell;
6+
use std::convert::TryFrom;
7+
use std::os::unix::io::{FromRawFd, OwnedFd};
8+
use std::time::Duration;
9+
10+
fn timespec(duration: Duration) -> libc::timespec {
11+
let tv_sec = duration.as_secs().try_into().unwrap();
12+
let tv_nsec = duration.subsec_nanos().try_into().unwrap();
13+
libc::timespec { tv_sec, tv_nsec }
14+
}
15+
16+
fn unwrap_or_null<T>(option: Option<&T>) -> *const T {
17+
match option {
18+
Some(t) => t,
19+
None => std::ptr::null(),
20+
}
21+
}
22+
23+
/// Fast user-space locking.
24+
///
25+
/// By default we presume the futex is not process-private, that is, it is used across processes. If
26+
/// you know it is process-private you can set `PRIVATE` to `true` which allows some additional
27+
/// optimizations.
28+
/// ```
29+
/// # use nix::{
30+
/// # sys::{futex::Futex, mman::{mmap, MapFlags, ProtFlags}},
31+
/// # errno::Errno,
32+
/// # unistd::{fork,ForkResult},
33+
/// # };
34+
/// # use std::{
35+
/// # time::{Instant, Duration},
36+
/// # num::NonZeroUsize,
37+
/// # mem::{ManuallyDrop, size_of},
38+
/// # os::unix::io::OwnedFd,
39+
/// # sync::Arc,
40+
/// # thread::{spawn, sleep},
41+
/// # };
42+
/// const TIMEOUT: Duration = Duration::from_millis(500);
43+
/// const DELTA: Duration = Duration::from_millis(100);
44+
/// # fn main() -> nix::Result<()> {
45+
/// let futex: Futex = Futex::new(0);
46+
///
47+
/// // If the value of the futex is 0, wait for wake. Since the value is 0 and no wake occurs,
48+
/// // we expect the timeout will pass.
49+
///
50+
/// let instant = Instant::now();
51+
/// assert_eq!(futex.wait(0, Some(TIMEOUT)),Err(Errno::ETIMEDOUT));
52+
/// assert!(instant.elapsed() > TIMEOUT);
53+
///
54+
/// // If the value of the futex is 1, wait for wake. Since the value is 0, not 1, this will
55+
/// // return immediately.
56+
///
57+
/// let instant = Instant::now();
58+
/// assert_eq!(futex.wait(1, Some(TIMEOUT)),Err(Errno::EAGAIN));
59+
/// assert!(instant.elapsed() < DELTA);
60+
///
61+
/// // Test across threads
62+
/// // -------------------------------------------------------------------------
63+
///
64+
/// let futex = Arc::new(futex);
65+
/// let futex_clone = futex.clone();
66+
/// let instant = Instant::now();
67+
/// spawn(move || {
68+
/// sleep(TIMEOUT);
69+
/// assert_eq!(futex_clone.wake(1),Ok(1));
70+
/// });
71+
/// assert_eq!(futex.wait(0, Some(2 * TIMEOUT)), Ok(()));
72+
/// assert!(instant.elapsed() > TIMEOUT && instant.elapsed() < TIMEOUT + DELTA);
73+
///
74+
/// // Test across processes
75+
/// // -------------------------------------------------------------------------
76+
///
77+
/// let shared_memory = unsafe { mmap::<OwnedFd>(
78+
/// None,
79+
/// NonZeroUsize::new_unchecked(size_of::<Futex<false>>()),
80+
/// ProtFlags::PROT_WRITE | ProtFlags::PROT_READ,
81+
/// MapFlags::MAP_SHARED | MapFlags::MAP_ANONYMOUS,
82+
/// None,
83+
/// 0
84+
/// )? };
85+
/// let futex_ptr = shared_memory.cast::<Futex<false>>();
86+
/// let futex = unsafe { &*futex_ptr };
87+
/// match unsafe { fork()? } {
88+
/// ForkResult::Parent { child } => {
89+
/// sleep(TIMEOUT);
90+
/// assert_eq!(futex.wake(1),Ok(1));
91+
/// // Wait for child process to exit
92+
/// unsafe {
93+
/// assert_eq!(libc::waitpid(child.as_raw(), std::ptr::null_mut(), 0), child.as_raw());
94+
/// }
95+
/// },
96+
/// ForkResult::Child => {
97+
/// let now = Instant::now();
98+
/// assert_eq!(futex.wait(0, Some(2 * TIMEOUT)),Ok(()));
99+
/// assert!(now.elapsed() > TIMEOUT && now.elapsed() < TIMEOUT + DELTA);
100+
/// }
101+
/// }
102+
/// # Ok(())
103+
/// # }
104+
/// ```
105+
#[derive(Debug)]
106+
pub struct Futex<const PRIVATE: bool = false>(pub UnsafeCell<u32>);
107+
108+
impl<const PRIVATE: bool> Futex<PRIVATE> {
109+
const MASK: i32 = if PRIVATE { libc::FUTEX_PRIVATE_FLAG } else { 0 };
110+
111+
/// Constructs new futex with a given `val`.
112+
pub fn new(val: u32) -> Self {
113+
Self(UnsafeCell::new(val))
114+
}
115+
116+
/// If the value of the futex:
117+
/// - `== val`, the thread sleeps waiting for a [`Futex::wake`] call, in this case this thread
118+
/// is considered a waiter on this futex.
119+
/// - `!= val`, then `Err` with [`Errno::EAGAIN`] is immediately returned.
120+
///
121+
/// If the timeout is:
122+
/// - `Some(_)` it specifies a timeout for the wait.
123+
/// - `None` it will block indefinitely.
124+
///
125+
/// Wraps [`libc::FUTEX_WAIT`].
126+
pub fn wait(&self, val: u32, timeout: Option<Duration>) -> Result<()> {
127+
let timespec = timeout.map(timespec);
128+
let timespec_ptr = unwrap_or_null(timespec.as_ref());
129+
130+
let res = unsafe {
131+
syscall(
132+
SYS_futex,
133+
self.0.get(),
134+
Self::MASK | libc::FUTEX_WAIT,
135+
val,
136+
timespec_ptr,
137+
)
138+
};
139+
Errno::result(res).map(drop)
140+
}
141+
142+
/// Wakes at most `val` waiters.
143+
///
144+
/// - `val == 1` wakes a single waiter.
145+
/// - `val == u32::MAX` wakes all waiters.
146+
///
147+
/// No guarantee is provided about which waiters are awoken. A waiter with a higher scheduling
148+
/// priority is not guaranteed to be awoken in preference to a waiter with a lower priority.
149+
///
150+
/// Wraps [`libc::FUTEX_WAKE`].
151+
pub fn wake(&self, val: u32) -> Result<u32> {
152+
let res = unsafe {
153+
syscall(SYS_futex, self.0.get(), Self::MASK | libc::FUTEX_WAKE, val)
154+
};
155+
Errno::result(res).map(|x| u32::try_from(x).unwrap())
156+
}
157+
158+
/// Creates a file descriptor associated with the futex.
159+
///
160+
/// When [`Futex::wake`] is performed on the futex this file indicates being readable with
161+
/// `select`, `poll` and `epoll`.
162+
///
163+
/// The file descriptor can be used to obtain asynchronous notifications: if val is nonzero,
164+
/// then, when another process or thread executes a FUTEX_WAKE, the caller will receive the
165+
/// signal number that was passed in val.
166+
///
167+
/// **Because it was inherently racy, this is unsupported from Linux 2.6.26 onward.**
168+
///
169+
/// Wraps [`libc::FUTEX_FD`].
170+
pub fn fd(&self, val: u32) -> Result<OwnedFd> {
171+
let res = unsafe {
172+
syscall(SYS_futex, self.0.get(), Self::MASK | libc::FUTEX_WAKE, val)
173+
};
174+
175+
// On a 32 bit arch `x` will be an `i32` and will trigger this lint.
176+
#[allow(clippy::useless_conversion)]
177+
Errno::result(res)
178+
.map(|x| unsafe { OwnedFd::from_raw_fd(i32::try_from(x).unwrap()) })
179+
}
180+
181+
/// [`Futex::cmp_requeue`] without the check being made using `val3`.
182+
///
183+
/// Wraps [`libc::FUTEX_REQUEUE`].
184+
pub fn requeue(&self, val: u32, val2: u32, uaddr2: &Self) -> Result<u32> {
185+
let res = unsafe {
186+
syscall(
187+
SYS_futex,
188+
self.0.get(),
189+
Self::MASK | libc::FUTEX_CMP_REQUEUE,
190+
val,
191+
val2,
192+
&uaddr2.0,
193+
)
194+
};
195+
Errno::result(res).map(|x| u32::try_from(x).unwrap())
196+
}
197+
198+
/// Wakes `val` waiters, moving remaining (up to `val2`) waiters to `uaddr2`.
199+
///
200+
/// If the value of this futex `== val3` returns `Err` with [`Errno::EAGAIN`].
201+
///
202+
/// Typical values to specify for `val` are `0` or `1` (Specifying `u32::MAX` makes the
203+
/// [`Futex::cmp_requeue`] equivalent to [`Futex::wake`]).
204+
///
205+
/// Typical values to specify for `val2` are `1` or `u32::MAX` (Specifying `0` makes
206+
/// [`Futex::cmp_requeue`] equivalent to [`Futex::wait`]).
207+
///
208+
/// Wraps [`libc::FUTEX_CMP_REQUEUE`].
209+
pub fn cmp_requeue(
210+
&self,
211+
val: u32,
212+
val2: u32,
213+
uaddr2: &Self,
214+
val3: u32,
215+
) -> Result<u32> {
216+
let res = unsafe {
217+
syscall(
218+
SYS_futex,
219+
self.0.get(),
220+
Self::MASK | libc::FUTEX_CMP_REQUEUE,
221+
val,
222+
val2,
223+
&uaddr2.0,
224+
val3,
225+
)
226+
};
227+
Errno::result(res).map(|x| u32::try_from(x).unwrap())
228+
}
229+
230+
/// Wraps [`libc::FUTEX_WAKE_OP`].
231+
pub fn wake_op(
232+
&self,
233+
val: u32,
234+
val2: u32,
235+
uaddr2: &Self,
236+
val3: u32,
237+
) -> Result<u32> {
238+
let res = unsafe {
239+
syscall(
240+
SYS_futex,
241+
self.0.get(),
242+
Self::MASK | libc::FUTEX_WAKE_OP,
243+
val,
244+
val2,
245+
&uaddr2.0,
246+
val3,
247+
)
248+
};
249+
Errno::result(res).map(|x| u32::try_from(x).unwrap())
250+
}
251+
252+
/// Wraps [`libc::FUTEX_WAIT_BITSET`].
253+
pub fn wait_bitset(
254+
&self,
255+
val: u32,
256+
timeout: Option<Duration>,
257+
val3: u32,
258+
) -> Result<()> {
259+
let timespec = timeout.map(timespec);
260+
let timespec_ptr = unwrap_or_null(timespec.as_ref());
261+
262+
let res = unsafe {
263+
syscall(
264+
SYS_futex,
265+
self.0.get(),
266+
Self::MASK | libc::FUTEX_WAIT_BITSET,
267+
val,
268+
timespec_ptr,
269+
val3,
270+
)
271+
};
272+
Errno::result(res).map(drop)
273+
}
274+
275+
/// Wraps [`libc::FUTEX_WAKE_BITSET`].
276+
pub fn wake_bitset(&self, val: u32, val3: u32) -> Result<u32> {
277+
let res = unsafe {
278+
syscall(SYS_futex, self.0.get(), libc::FUTEX_WAKE_BITSET, val, val3)
279+
};
280+
Errno::result(res).map(|x| u32::try_from(x).unwrap())
281+
}
282+
283+
/// Wraps [`libc::FUTEX_LOCK_PI`].
284+
pub fn lock_pi(&self, timeout: Option<Duration>) -> Result<()> {
285+
let timespec = timeout.map(timespec);
286+
let timespec_ptr = unwrap_or_null(timespec.as_ref());
287+
288+
let res = unsafe {
289+
syscall(
290+
SYS_futex,
291+
self.0.get(),
292+
Self::MASK | libc::FUTEX_LOCK_PI,
293+
timespec_ptr,
294+
)
295+
};
296+
Errno::result(res).map(drop)
297+
}
298+
299+
/// Wraps [`libc::FUTEX_LOCK_PI2`].
300+
#[cfg(target_os = "linux")]
301+
pub fn lock_pi2(&self, timeout: Option<Duration>) -> Result<()> {
302+
let timespec = timeout.map(timespec);
303+
let timespec_ptr = unwrap_or_null(timespec.as_ref());
304+
305+
let res = unsafe {
306+
syscall(
307+
SYS_futex,
308+
self.0.get(),
309+
Self::MASK | libc::FUTEX_LOCK_PI2,
310+
timespec_ptr,
311+
)
312+
};
313+
Errno::result(res).map(drop)
314+
}
315+
316+
/// Wraps [`libc::FUTEX_TRYLOCK_PI`].
317+
pub fn trylock_pi(&self) -> Result<()> {
318+
let res = unsafe {
319+
syscall(
320+
SYS_futex,
321+
self.0.get(),
322+
Self::MASK | libc::FUTEX_TRYLOCK_PI,
323+
)
324+
};
325+
Errno::result(res).map(drop)
326+
}
327+
328+
/// `libc::FUTEX_UNLOCK_PI`
329+
pub fn unlock_pi(&self) -> Result<()> {
330+
let res = unsafe {
331+
syscall(SYS_futex, self.0.get(), Self::MASK | libc::FUTEX_UNLOCK_PI)
332+
};
333+
Errno::result(res).map(drop)
334+
}
335+
336+
/// Wraps [`libc::FUTEX_CMP_REQUEUE_PI`].
337+
pub fn cmp_requeue_pi(
338+
&self,
339+
val: u32,
340+
val2: u32,
341+
uaddr2: &Self,
342+
val3: u32,
343+
) -> Result<u32> {
344+
let res = unsafe {
345+
syscall(
346+
SYS_futex,
347+
self.0.get(),
348+
Self::MASK | libc::FUTEX_CMP_REQUEUE_PI,
349+
val,
350+
val2,
351+
&uaddr2.0,
352+
val3,
353+
)
354+
};
355+
Errno::result(res).map(|x| u32::try_from(x).unwrap())
356+
}
357+
358+
/// Wraps [`libc::FUTEX_WAIT_REQUEUE_PI`].
359+
pub fn wait_requeue_pi(
360+
&self,
361+
val: u32,
362+
timeout: Option<Duration>,
363+
uaddr2: &Self,
364+
) -> Result<()> {
365+
let timespec = timeout.map(timespec);
366+
let timespec_ptr = unwrap_or_null(timespec.as_ref());
367+
368+
let res = unsafe {
369+
syscall(
370+
SYS_futex,
371+
self.0.get(),
372+
Self::MASK | libc::FUTEX_WAIT_REQUEUE_PI,
373+
val,
374+
timespec_ptr,
375+
&uaddr2.0,
376+
)
377+
};
378+
Errno::result(res).map(drop)
379+
}
380+
}
381+
382+
unsafe impl Sync for Futex {}

0 commit comments

Comments
 (0)