Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Windows: Implement condvar, mutex and rwlock using futex #121956

Merged
merged 3 commits into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions library/std/src/sys/locks/condvar/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
cfg_if::cfg_if! {
if #[cfg(any(
all(target_os = "windows", not(target_vendor="win7")),
target_os = "linux",
target_os = "android",
target_os = "freebsd",
Expand All @@ -14,9 +15,9 @@ cfg_if::cfg_if! {
} else if #[cfg(target_family = "unix")] {
mod pthread;
pub use pthread::Condvar;
} else if #[cfg(target_os = "windows")] {
mod windows;
pub use windows::Condvar;
} else if #[cfg(all(target_os = "windows", target_vendor = "win7"))] {
mod windows7;
pub use windows7::Condvar;
} else if #[cfg(all(target_vendor = "fortanix", target_env = "sgx"))] {
mod sgx;
pub use sgx::Condvar;
Expand Down
54 changes: 33 additions & 21 deletions library/std/src/sys/locks/mutex/futex.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,42 @@
use crate::sync::atomic::{
AtomicU32,
self,
Ordering::{Acquire, Relaxed, Release},
};
use crate::sys::futex::{futex_wait, futex_wake};

cfg_if::cfg_if! {
if #[cfg(windows)] {
// On Windows we can have a smol futex
type Atomic = atomic::AtomicU8;
type State = u8;
} else {
type Atomic = atomic::AtomicU32;
type State = u32;
}
}

pub struct Mutex {
/// 0: unlocked
/// 1: locked, no other threads waiting
/// 2: locked, and other threads waiting (contended)
futex: AtomicU32,
futex: Atomic,
}

const UNLOCKED: State = 0;
const LOCKED: State = 1; // locked, no other threads waiting
const CONTENDED: State = 2; // locked, and other threads waiting (contended)

impl Mutex {
#[inline]
pub const fn new() -> Self {
Self { futex: AtomicU32::new(0) }
Self { futex: Atomic::new(UNLOCKED) }
}

#[inline]
pub fn try_lock(&self) -> bool {
self.futex.compare_exchange(0, 1, Acquire, Relaxed).is_ok()
self.futex.compare_exchange(UNLOCKED, LOCKED, Acquire, Relaxed).is_ok()
}

#[inline]
pub fn lock(&self) {
if self.futex.compare_exchange(0, 1, Acquire, Relaxed).is_err() {
if self.futex.compare_exchange(UNLOCKED, LOCKED, Acquire, Relaxed).is_err() {
self.lock_contended();
}
}
Expand All @@ -36,40 +48,40 @@ impl Mutex {

// If it's unlocked now, attempt to take the lock
// without marking it as contended.
if state == 0 {
match self.futex.compare_exchange(0, 1, Acquire, Relaxed) {
if state == UNLOCKED {
match self.futex.compare_exchange(UNLOCKED, LOCKED, Acquire, Relaxed) {
Ok(_) => return, // Locked!
Err(s) => state = s,
}
}

loop {
// Put the lock in contended state.
// We avoid an unnecessary write if it as already set to 2,
// We avoid an unnecessary write if it as already set to CONTENDED,
// to be friendlier for the caches.
if state != 2 && self.futex.swap(2, Acquire) == 0 {
// We changed it from 0 to 2, so we just successfully locked it.
if state != CONTENDED && self.futex.swap(CONTENDED, Acquire) == UNLOCKED {
// We changed it from UNLOCKED to CONTENDED, so we just successfully locked it.
return;
}

// Wait for the futex to change state, assuming it is still 2.
futex_wait(&self.futex, 2, None);
// Wait for the futex to change state, assuming it is still CONTENDED.
futex_wait(&self.futex, CONTENDED, None);

// Spin again after waking up.
state = self.spin();
}
}

fn spin(&self) -> u32 {
fn spin(&self) -> State {
let mut spin = 100;
loop {
// We only use `load` (and not `swap` or `compare_exchange`)
// while spinning, to be easier on the caches.
let state = self.futex.load(Relaxed);

// We stop spinning when the mutex is unlocked (0),
// but also when it's contended (2).
if state != 1 || spin == 0 {
// We stop spinning when the mutex is UNLOCKED,
// but also when it's CONTENDED.
if state != LOCKED || spin == 0 {
return state;
}

Expand All @@ -80,9 +92,9 @@ impl Mutex {

#[inline]
pub unsafe fn unlock(&self) {
if self.futex.swap(0, Release) == 2 {
if self.futex.swap(UNLOCKED, Release) == CONTENDED {
// We only wake up one thread. When that thread locks the mutex, it
// will mark the mutex as contended (2) (see lock_contended above),
// will mark the mutex as CONTENDED (see lock_contended above),
// which makes sure that any other waiting threads will also be
// woken up eventually.
self.wake();
Expand Down
7 changes: 4 additions & 3 deletions library/std/src/sys/locks/mutex/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
cfg_if::cfg_if! {
if #[cfg(any(
all(target_os = "windows", not(target_vendor = "win7")),
target_os = "linux",
target_os = "android",
target_os = "freebsd",
Expand All @@ -19,9 +20,9 @@ cfg_if::cfg_if! {
))] {
mod pthread;
pub use pthread::{Mutex, raw};
} else if #[cfg(target_os = "windows")] {
mod windows;
pub use windows::{Mutex, raw};
} else if #[cfg(all(target_os = "windows", target_vendor = "win7"))] {
mod windows7;
pub use windows7::{Mutex, raw};
} else if #[cfg(all(target_vendor = "fortanix", target_env = "sgx"))] {
mod sgx;
pub use sgx::Mutex;
Expand Down
7 changes: 4 additions & 3 deletions library/std/src/sys/locks/rwlock/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
cfg_if::cfg_if! {
if #[cfg(any(
all(target_os = "windows", not(target_vendor = "win7")),
target_os = "linux",
target_os = "android",
target_os = "freebsd",
Expand All @@ -14,9 +15,9 @@ cfg_if::cfg_if! {
} else if #[cfg(target_family = "unix")] {
mod queue;
pub use queue::RwLock;
} else if #[cfg(target_os = "windows")] {
mod windows;
pub use windows::RwLock;
} else if #[cfg(all(target_os = "windows", target_vendor = "win7"))] {
mod windows7;
pub use windows7::RwLock;
} else if #[cfg(all(target_vendor = "fortanix", target_env = "sgx"))] {
mod sgx;
pub use sgx::RwLock;
Expand Down
4 changes: 4 additions & 0 deletions library/std/src/sys/pal/windows/c.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub type LPVOID = *mut c_void;
pub type LPWCH = *mut WCHAR;
pub type LPWSTR = *mut WCHAR;

#[cfg(target_vendor = "win7")]
pub type PSRWLOCK = *mut SRWLOCK;

pub type socklen_t = c_int;
Expand All @@ -50,7 +51,9 @@ pub const INVALID_HANDLE_VALUE: HANDLE = ::core::ptr::without_provenance_mut(-1i
pub const EXIT_SUCCESS: u32 = 0;
pub const EXIT_FAILURE: u32 = 1;

#[cfg(target_vendor = "win7")]
pub const CONDITION_VARIABLE_INIT: CONDITION_VARIABLE = CONDITION_VARIABLE { Ptr: ptr::null_mut() };
#[cfg(target_vendor = "win7")]
pub const SRWLOCK_INIT: SRWLOCK = SRWLOCK { Ptr: ptr::null_mut() };
pub const INIT_ONCE_STATIC_INIT: INIT_ONCE = INIT_ONCE { Ptr: ptr::null_mut() };

Expand Down Expand Up @@ -373,6 +376,7 @@ extern "system" {
dwmilliseconds: u32,
) -> BOOL;
pub fn WakeByAddressSingle(address: *const c_void);
pub fn WakeByAddressAll(address: *const c_void);
}

#[cfg(target_vendor = "win7")]
Expand Down
85 changes: 85 additions & 0 deletions library/std/src/sys/pal/windows/futex.rs
joboet marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
use super::api;
use crate::sys::c;
use crate::sys::dur2timeout;
use core::ffi::c_void;
use core::mem;
use core::ptr;
use core::sync::atomic::{
AtomicBool, AtomicI16, AtomicI32, AtomicI64, AtomicI8, AtomicIsize, AtomicPtr, AtomicU16,
AtomicU32, AtomicU64, AtomicU8, AtomicUsize,
};
use core::time::Duration;

pub unsafe trait Waitable {
type Atomic;
}
macro_rules! unsafe_waitable_int {
($(($int:ty, $atomic:ty)),*$(,)?) => {
$(
unsafe impl Waitable for $int {
type Atomic = $atomic;
}
)*
};
}
unsafe_waitable_int! {
(bool, AtomicBool),
(i8, AtomicI8),
(i16, AtomicI16),
(i32, AtomicI32),
(i64, AtomicI64),
(isize, AtomicIsize),
(u8, AtomicU8),
(u16, AtomicU16),
(u32, AtomicU32),
(u64, AtomicU64),
(usize, AtomicUsize),
}
unsafe impl<T> Waitable for *const T {
type Atomic = AtomicPtr<T>;
}
unsafe impl<T> Waitable for *mut T {
type Atomic = AtomicPtr<T>;
}

pub fn wait_on_address<W: Waitable>(
address: &W::Atomic,
compare: W,
timeout: Option<Duration>,
) -> bool {
unsafe {
let addr = ptr::from_ref(address).cast::<c_void>();
let size = mem::size_of::<W>();
let compare_addr = ptr::addr_of!(compare).cast::<c_void>();
let timeout = timeout.map(dur2timeout).unwrap_or(c::INFINITE);
c::WaitOnAddress(addr, compare_addr, size, timeout) == c::TRUE
}
}

pub fn wake_by_address_single<T>(address: &T) {
unsafe {
let addr = ptr::from_ref(address).cast::<c_void>();
c::WakeByAddressSingle(addr);
}
}

pub fn wake_by_address_all<T>(address: &T) {
unsafe {
let addr = ptr::from_ref(address).cast::<c_void>();
c::WakeByAddressAll(addr);
}
}

pub fn futex_wait<W: Waitable>(futex: &W::Atomic, expected: W, timeout: Option<Duration>) -> bool {
// return false only on timeout
wait_on_address(futex, expected, timeout) || api::get_last_error().code != c::ERROR_TIMEOUT
}

pub fn futex_wake<T>(futex: &T) -> bool {
wake_by_address_single(futex);
false
}

pub fn futex_wake_all<T>(futex: &T) {
wake_by_address_all(futex)
}
2 changes: 2 additions & 0 deletions library/std/src/sys/pal/windows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ pub mod args;
pub mod c;
pub mod env;
pub mod fs;
#[cfg(not(target_vendor = "win7"))]
pub mod futex;
pub mod handle;
pub mod io;
pub mod net;
Expand Down
6 changes: 6 additions & 0 deletions src/tools/miri/src/shims/windows/foreign_items.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,12 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {

this.WakeByAddressSingle(ptr_op)?;
}
"WakeByAddressAll" => {
let [ptr_op] =
this.check_shim(abi, Abi::System { unwind: false }, link_name, args)?;

this.WakeByAddressAll(ptr_op)?;
}

// Dynamic symbol loading
"GetProcAddress" => {
Expand Down
15 changes: 15 additions & 0 deletions src/tools/miri/src/shims/windows/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,21 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {

Ok(())
}
fn WakeByAddressAll(&mut self, ptr_op: &OpTy<'tcx, Provenance>) -> InterpResult<'tcx> {
let this = self.eval_context_mut();

let ptr = this.read_pointer(ptr_op)?;

// See the Linux futex implementation for why this fence exists.
this.atomic_fence(AtomicFenceOrd::SeqCst)?;

while let Some(thread) = this.futex_wake(ptr.addr().bytes(), u32::MAX) {
this.unblock_thread(thread);
this.unregister_timeout_callback_if_exists(thread);
}

Ok(())
}

fn SleepConditionVariableSRW(
&mut self,
Expand Down
2 changes: 1 addition & 1 deletion tests/debuginfo/mutex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
//
// cdb-command:dx m,d
// cdb-check:m,d [Type: std::sync::mutex::Mutex<i32>]
// cdb-check: [...] inner [Type: std::sys::locks::mutex::windows::Mutex]
// cdb-check: [...] inner [Type: std::sys::locks::mutex::futex::Mutex]
// cdb-check: [...] poison [Type: std::sync::poison::Flag]
// cdb-check: [...] data : 0 [Type: core::cell::UnsafeCell<i32>]

Expand Down
2 changes: 1 addition & 1 deletion tests/debuginfo/rwlock-read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// cdb-command:dx r
// cdb-check:r [Type: std::sync::rwlock::RwLockReadGuard<i32>]
// cdb-check: [...] data : NonNull([...]: 0) [Type: core::ptr::non_null::NonNull<i32>]
// cdb-check: [...] inner_lock : [...] [Type: std::sys::locks::rwlock::windows::RwLock *]
// cdb-check: [...] inner_lock : [...] [Type: std::sys::locks::rwlock::futex::RwLock *]

#[allow(unused_variables)]

Expand Down
Loading