diff --git a/library/std/src/sync/condvar/tests.rs b/library/std/src/sync/condvar/tests.rs index 6757707cd9513..f7a00676daaed 100644 --- a/library/std/src/sync/condvar/tests.rs +++ b/library/std/src/sync/condvar/tests.rs @@ -191,7 +191,7 @@ fn wait_timeout_wake() { #[test] #[should_panic] -#[cfg_attr(not(unix), ignore)] +#[cfg(all(unix, not(target_os = "linux"), not(target_os = "android")))] fn two_mutexes() { let m = Arc::new(Mutex::new(())); let m2 = m.clone(); diff --git a/library/std/src/sys/unix/futex.rs b/library/std/src/sys/unix/futex.rs index 42ddc1d514ec7..c61d948fb601d 100644 --- a/library/std/src/sys/unix/futex.rs +++ b/library/std/src/sys/unix/futex.rs @@ -4,31 +4,46 @@ all(target_os = "emscripten", target_feature = "atomics") ))] -#[cfg(any(target_os = "linux", target_os = "android"))] -use crate::convert::TryInto; -#[cfg(any(target_os = "linux", target_os = "android"))] -use crate::ptr::null; use crate::sync::atomic::AtomicI32; use crate::time::Duration; #[cfg(any(target_os = "linux", target_os = "android"))] -pub fn futex_wait(futex: &AtomicI32, expected: i32, timeout: Option) { - let timespec = timeout.and_then(|d| { - Some(libc::timespec { - // Sleep forever if the timeout is longer than fits in a timespec. - tv_sec: d.as_secs().try_into().ok()?, - // This conversion never truncates, as subsec_nanos is always <1e9. - tv_nsec: d.subsec_nanos() as _, - }) - }); - unsafe { - libc::syscall( - libc::SYS_futex, - futex as *const AtomicI32, - libc::FUTEX_WAIT | libc::FUTEX_PRIVATE_FLAG, - expected, - timespec.as_ref().map_or(null(), |d| d as *const libc::timespec), - ); +pub fn futex_wait(futex: &AtomicI32, expected: i32, timeout: Option) -> bool { + use super::time::Timespec; + use crate::ptr::null; + use crate::sync::atomic::Ordering::Relaxed; + + // Calculate the timeout as an absolute timespec. + // + // Overflows are rounded up to an infinite timeout (None). + let timespec = + timeout.and_then(|d| Some(Timespec::now(libc::CLOCK_MONOTONIC).checked_add_duration(&d)?)); + + loop { + // No need to wait if the value already changed. + if futex.load(Relaxed) != expected { + return true; + } + + // Use FUTEX_WAIT_BITSET rather than FUTEX_WAIT to be able to give an + // absolute time rather than a relative time. + let r = unsafe { + libc::syscall( + libc::SYS_futex, + futex as *const AtomicI32, + libc::FUTEX_WAIT_BITSET | libc::FUTEX_PRIVATE_FLAG, + expected, + timespec.as_ref().map_or(null(), |t| &t.t as *const libc::timespec), + null::(), // This argument is unused for FUTEX_WAIT_BITSET. + !0u32, // A full bitmask, to make it behave like a regular FUTEX_WAIT. + ) + }; + + match (r < 0).then(super::os::errno) { + Some(libc::ETIMEDOUT) => return false, + Some(libc::EINTR) => continue, + _ => return true, + } } } @@ -65,6 +80,18 @@ pub fn futex_wake(futex: &AtomicI32) { } } +#[cfg(any(target_os = "linux", target_os = "android"))] +pub fn futex_wake_all(futex: &AtomicI32) { + unsafe { + libc::syscall( + libc::SYS_futex, + futex as *const AtomicI32, + libc::FUTEX_WAKE | libc::FUTEX_PRIVATE_FLAG, + i32::MAX, + ); + } +} + #[cfg(target_os = "emscripten")] pub fn futex_wake(futex: &AtomicI32) { extern "C" { diff --git a/library/std/src/sys/unix/locks/futex.rs b/library/std/src/sys/unix/locks/futex.rs new file mode 100644 index 0000000000000..630351d0dc278 --- /dev/null +++ b/library/std/src/sys/unix/locks/futex.rs @@ -0,0 +1,164 @@ +use crate::sync::atomic::{ + AtomicI32, + Ordering::{Acquire, Relaxed, Release}, +}; +use crate::sys::futex::{futex_wait, futex_wake, futex_wake_all}; +use crate::time::Duration; + +pub type MovableMutex = Mutex; +pub type MovableCondvar = Condvar; + +pub struct Mutex { + /// 0: unlocked + /// 1: locked, no other threads waiting + /// 2: locked, and other threads waiting (contended) + futex: AtomicI32, +} + +impl Mutex { + #[inline] + pub const fn new() -> Self { + Self { futex: AtomicI32::new(0) } + } + + #[inline] + pub unsafe fn init(&mut self) {} + + #[inline] + pub unsafe fn destroy(&self) {} + + #[inline] + pub unsafe fn try_lock(&self) -> bool { + self.futex.compare_exchange(0, 1, Acquire, Relaxed).is_ok() + } + + #[inline] + pub unsafe fn lock(&self) { + if self.futex.compare_exchange(0, 1, Acquire, Relaxed).is_err() { + self.lock_contended(); + } + } + + #[cold] + fn lock_contended(&self) { + // Spin first to speed things up if the lock is released quickly. + let mut state = self.spin(); + + // If it's unlocked now, attempt to take the lock + // without marking it as contended. + if state == 0 { + match self.futex.compare_exchange(0, 1, Acquire, Relaxed) { + Ok(_) => return, // Locked! + Err(s) => state = s, + } + } + + loop { + // Put the lock in contended state. + // We avoid an unnecessary write if it as already set to 2, + // to be friendlier for the caches. + if state != 2 && self.futex.swap(2, Acquire) == 0 { + // We changed it from 0 to 2, so we just succesfully locked it. + return; + } + + // Wait for the futex to change state, assuming it is still 2. + futex_wait(&self.futex, 2, None); + + // Spin again after waking up. + state = self.spin(); + } + } + + fn spin(&self) -> i32 { + let mut spin = 100; + loop { + // We only use `load` (and not `swap` or `compare_exchange`) + // while spinning, to be easier on the caches. + let state = self.futex.load(Relaxed); + + // We stop spinning when the mutex is unlocked (0), + // but also when it's contended (2). + if state != 1 || spin == 0 { + return state; + } + + crate::hint::spin_loop(); + spin -= 1; + } + } + + #[inline] + pub unsafe fn unlock(&self) { + if self.futex.swap(0, Release) == 2 { + // We only wake up one thread. When that thread locks the mutex, it + // will mark the mutex as contended (2) (see lock_contended above), + // which makes sure that any other waiting threads will also be + // woken up eventually. + self.wake(); + } + } + + #[cold] + fn wake(&self) { + futex_wake(&self.futex); + } +} + +pub struct Condvar { + // The value of this atomic is simply incremented on every notification. + // This is used by `.wait()` to not miss any notifications after + // unlocking the mutex and before waiting for notifications. + futex: AtomicI32, +} + +impl Condvar { + #[inline] + pub const fn new() -> Self { + Self { futex: AtomicI32::new(0) } + } + + #[inline] + pub unsafe fn init(&mut self) {} + + #[inline] + pub unsafe fn destroy(&self) {} + + // All the memory orderings here are `Relaxed`, + // because synchronization is done by unlocking and locking the mutex. + + pub unsafe fn notify_one(&self) { + self.futex.fetch_add(1, Relaxed); + futex_wake(&self.futex); + } + + pub unsafe fn notify_all(&self) { + self.futex.fetch_add(1, Relaxed); + futex_wake_all(&self.futex); + } + + pub unsafe fn wait(&self, mutex: &Mutex) { + self.wait_optional_timeout(mutex, None); + } + + pub unsafe fn wait_timeout(&self, mutex: &Mutex, timeout: Duration) -> bool { + self.wait_optional_timeout(mutex, Some(timeout)) + } + + unsafe fn wait_optional_timeout(&self, mutex: &Mutex, timeout: Option) -> bool { + // Examine the notification counter _before_ we unlock the mutex. + let futex_value = self.futex.load(Relaxed); + + // Unlock the mutex before going to sleep. + mutex.unlock(); + + // Wait, but only if there hasn't been any + // notification since we unlocked the mutex. + let r = futex_wait(&self.futex, futex_value, timeout); + + // Lock the mutex again. + mutex.lock(); + + r + } +} diff --git a/library/std/src/sys/unix/locks/mod.rs b/library/std/src/sys/unix/locks/mod.rs index f07a9f93b79a5..30e9f407eec4c 100644 --- a/library/std/src/sys/unix/locks/mod.rs +++ b/library/std/src/sys/unix/locks/mod.rs @@ -1,8 +1,24 @@ -mod pthread_condvar; -mod pthread_mutex; -mod pthread_remutex; -mod pthread_rwlock; -pub use pthread_condvar::{Condvar, MovableCondvar}; -pub use pthread_mutex::{MovableMutex, Mutex}; -pub use pthread_remutex::ReentrantMutex; -pub use pthread_rwlock::{MovableRWLock, RWLock}; +cfg_if::cfg_if! { + if #[cfg(any( + target_os = "linux", + target_os = "android", + ))] { + mod futex; + #[allow(dead_code)] + mod pthread_mutex; // Only used for PthreadMutexAttr, needed by pthread_remutex. + mod pthread_remutex; // FIXME: Implement this using a futex + mod pthread_rwlock; // FIXME: Implement this using a futex + pub use futex::{Mutex, MovableMutex, Condvar, MovableCondvar}; + pub use pthread_remutex::ReentrantMutex; + pub use pthread_rwlock::{RWLock, MovableRWLock}; + } else { + mod pthread_mutex; + mod pthread_remutex; + mod pthread_rwlock; + mod pthread_condvar; + pub use pthread_mutex::{Mutex, MovableMutex}; + pub use pthread_remutex::ReentrantMutex; + pub use pthread_rwlock::{RWLock, MovableRWLock}; + pub use pthread_condvar::{Condvar, MovableCondvar}; + } +} diff --git a/library/std/src/sys/unix/time.rs b/library/std/src/sys/unix/time.rs index 59ddd1aa92f81..498c94d0cdcba 100644 --- a/library/std/src/sys/unix/time.rs +++ b/library/std/src/sys/unix/time.rs @@ -9,8 +9,8 @@ use crate::convert::TryInto; const NSEC_PER_SEC: u64 = 1_000_000_000; #[derive(Copy, Clone)] -struct Timespec { - t: libc::timespec, +pub(in crate::sys::unix) struct Timespec { + pub t: libc::timespec, } impl Timespec { @@ -18,7 +18,7 @@ impl Timespec { Timespec { t: libc::timespec { tv_sec: 0, tv_nsec: 0 } } } - fn sub_timespec(&self, other: &Timespec) -> Result { + pub fn sub_timespec(&self, other: &Timespec) -> Result { if self >= other { // NOTE(eddyb) two aspects of this `if`-`else` are required for LLVM // to optimize it into a branchless form (see also #75545): @@ -51,7 +51,7 @@ impl Timespec { } } - fn checked_add_duration(&self, other: &Duration) -> Option { + pub fn checked_add_duration(&self, other: &Duration) -> Option { let mut secs = other .as_secs() .try_into() // <- target type would be `libc::time_t` @@ -68,7 +68,7 @@ impl Timespec { Some(Timespec { t: libc::timespec { tv_sec: secs, tv_nsec: nsec as _ } }) } - fn checked_sub_duration(&self, other: &Duration) -> Option { + pub fn checked_sub_duration(&self, other: &Duration) -> Option { let mut secs = other .as_secs() .try_into() // <- target type would be `libc::time_t` @@ -266,6 +266,7 @@ mod inner { #[cfg(not(any(target_os = "macos", target_os = "ios")))] mod inner { use crate::fmt; + use crate::mem::MaybeUninit; use crate::sys::cvt; use crate::time::Duration; @@ -285,7 +286,7 @@ mod inner { impl Instant { pub fn now() -> Instant { - Instant { t: now(libc::CLOCK_MONOTONIC) } + Instant { t: Timespec::now(libc::CLOCK_MONOTONIC) } } pub fn checked_sub_instant(&self, other: &Instant) -> Option { @@ -312,7 +313,7 @@ mod inner { impl SystemTime { pub fn now() -> SystemTime { - SystemTime { t: now(libc::CLOCK_REALTIME) } + SystemTime { t: Timespec::now(libc::CLOCK_REALTIME) } } pub fn sub_time(&self, other: &SystemTime) -> Result { @@ -348,9 +349,11 @@ mod inner { #[cfg(any(target_os = "dragonfly", target_os = "espidf"))] pub type clock_t = libc::c_ulong; - fn now(clock: clock_t) -> Timespec { - let mut t = Timespec { t: libc::timespec { tv_sec: 0, tv_nsec: 0 } }; - cvt(unsafe { libc::clock_gettime(clock, &mut t.t) }).unwrap(); - t + impl Timespec { + pub fn now(clock: clock_t) -> Timespec { + let mut t = MaybeUninit::uninit(); + cvt(unsafe { libc::clock_gettime(clock, t.as_mut_ptr()) }).unwrap(); + Timespec { t: unsafe { t.assume_init() } } + } } }