Replace RwLock by a futex based one on Linux #95801

Merged · 7 commits · Apr 12, 2022

Changes from all commits
20 changes: 14 additions & 6 deletions library/std/src/sys/unix/futex.rs
@@ -7,6 +7,11 @@
use crate::sync::atomic::AtomicI32;
use crate::time::Duration;

/// Wait for a futex_wake operation to wake us.
///
/// Returns directly if the futex doesn't hold the expected value.
///
/// Returns false on timeout, and true in all other cases.
#[cfg(any(target_os = "linux", target_os = "android"))]
pub fn futex_wait(futex: &AtomicI32, expected: i32, timeout: Option<Duration>) -> bool {
use super::time::Timespec;
@@ -68,18 +73,23 @@ pub fn futex_wait(futex: &AtomicI32, expected: i32, timeout: Option<Duration>) {
}
}

/// Wake up one thread that's blocked on futex_wait on this futex.
///
/// Returns true if this actually woke up such a thread,
/// or false if no thread was waiting on this futex.
#[cfg(any(target_os = "linux", target_os = "android"))]
-pub fn futex_wake(futex: &AtomicI32) {
+pub fn futex_wake(futex: &AtomicI32) -> bool {
unsafe {
libc::syscall(
libc::SYS_futex,
futex as *const AtomicI32,
libc::FUTEX_WAKE | libc::FUTEX_PRIVATE_FLAG,
1,
-);
+) > 0
}
}

/// Wake up all threads that are waiting on futex_wait on this futex.
#[cfg(any(target_os = "linux", target_os = "android"))]
pub fn futex_wake_all(futex: &AtomicI32) {
unsafe {
@@ -93,12 +103,10 @@ pub fn futex_wake_all(futex: &AtomicI32) {
}

#[cfg(target_os = "emscripten")]
-pub fn futex_wake(futex: &AtomicI32) {
+pub fn futex_wake(futex: &AtomicI32) -> bool {
extern "C" {
fn emscripten_futex_wake(addr: *const AtomicI32, count: libc::c_int) -> libc::c_int;
}

-unsafe {
-emscripten_futex_wake(futex as *const AtomicI32, 1);
-}
+unsafe { emscripten_futex_wake(futex as *const AtomicI32, 1) > 0 }
}
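
Taken together, futex_wait and the wake functions form a compare-and-sleep primitive: a thread sleeps only while the futex word still holds an expected value, and a waker changes the word before issuing the wake. As a quick illustration of that pattern (a sketch only, not code from this PR, assuming the futex_wait/futex_wake_all signatures above), a one-shot event looks like this:

use std::sync::atomic::{
    AtomicI32,
    Ordering::{Acquire, Release},
};

// Sketch: a one-shot event built on the futex API above.
pub struct Event {
    state: AtomicI32, // 0 = not set, 1 = set
}

impl Event {
    pub const fn new() -> Self {
        Self { state: AtomicI32::new(0) }
    }

    /// Block until `set` is called.
    pub fn wait(&self) {
        // Re-check in a loop: futex_wait returns directly if the word no
        // longer holds 0, and wakeups can be spurious.
        while self.state.load(Acquire) == 0 {
            futex_wait(&self.state, 0, None);
        }
    }

    /// Set the flag and wake all waiters.
    pub fn set(&self) {
        // Store before waking, so a waiter that observed 0 and went to
        // sleep is guaranteed to see the wake.
        self.state.store(1, Release);
        futex_wake_all(&self.state);
    }
}
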
313 changes: 313 additions & 0 deletions library/std/src/sys/unix/locks/futex_rwlock.rs
@@ -0,0 +1,313 @@
use crate::sync::atomic::{
AtomicI32,
Ordering::{Acquire, Relaxed, Release},
};
use crate::sys::futex::{futex_wait, futex_wake, futex_wake_all};

pub type MovableRwLock = RwLock;

pub struct RwLock {
// The state consists of a 30-bit reader counter, a 'readers waiting' flag, and a 'writers waiting' flag.
// Bits 0..30:
// 0: Unlocked
// 1..=0x3FFF_FFFE: Locked by N readers
// 0x3FFF_FFFF: Write locked
// Bit 30: Readers are waiting on this futex.
// Bit 31: Writers are waiting on the writer_notify futex.
state: AtomicI32,
// The 'condition variable' to notify writers through.
// Incremented on every signal.
writer_notify: AtomicI32,
}

const READ_LOCKED: i32 = 1;
const MASK: i32 = (1 << 30) - 1;
const WRITE_LOCKED: i32 = MASK;
Member:

It seems a bit strange to represent WRITE_LOCKED with a special reader count. I think using a separate bit for WRITE_LOCKED would make the code more efficient (e.g. using native overflow flags to detect reader count overflow, using fetch_or for locking instead of a CAS loop, etc).

Member Author:

I'll try that out and see if it simplifies things. It probably does. Thanks. :)

Member Author @m-ou-se (Apr 11, 2022):

I tried it, but it does not seem to simplify things.

fetch_or doesn't seem very useful, because the bit should only be set if the lock isn't read-locked yet. If we unconditionally set the bit when attempting to lock, it will also be set while the lock is still read-locked, and read-unlocking will then need special handling for that situation.

Simplifying the overflow check doesn't help, because at the points where this is checked, we also need to check the waiting bits.

I also prefer to combine the fields, to make invalid states unrepresentable. (Effectively enum { Unlocked, ReadLocked(NonZeroNonMaxU30), WriteLocked } instead of struct { read_locked: u29, write_locked: bool }.)
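
To make that concrete, the packed state is effectively the following decoding (a hypothetical illustration, not code from this PR; it reuses the MASK and WRITE_LOCKED constants from the diff):

// Hypothetical decoded view of the low 30 bits of `state`.
enum LockState {
    Unlocked,
    ReadLocked(i32), // 1..=0x3FFF_FFFE readers
    WriteLocked,
}

fn decode(state: i32) -> LockState {
    match state & MASK {
        0 => LockState::Unlocked,
        WRITE_LOCKED => LockState::WriteLocked,
        // Any other value in the low 30 bits is a valid reader count.
        n => LockState::ReadLocked(n),
    }
}
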

const MAX_READERS: i32 = MASK - 1;
const READERS_WAITING: i32 = 1 << 30;
const WRITERS_WAITING: i32 = 1 << 31;
Member:

Is there any reason why these are placed in the high bits rather than the low bits? In parking_lot I put the flags in the low bits so that counter overflows can be caught with checked_add, which compiles to slightly more efficient assembly.

Member Author @m-ou-se (Apr 8, 2022):

Putting them in the high bits makes it possible for the read condition to compile down to a single comparison:

readers(state) < MAX_READERS && !readers_waiting(state) && !writers_waiting(state)

is then equal to

(state as u32) < 0x3ffffffe.

(Not sure if LLVM sees that, though.)

Contributor @erikdesjardins (Apr 12, 2022):

LLVM doesn't currently see this, although it sees a related pattern where the 0x3ffffffe mask has only one set bit.

Reported upstream: llvm/llvm-project#54856

(Note that I looked at readers(state) < MAX_READERS & !readers_waiting(state) & !writers_waiting(state) with bitwise and; LLVM has more trouble with logical and.)
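
The claimed equivalence is easy to sanity-check at the boundary values. A small test sketch (not part of the PR; it reuses the constants and is_read_lockable from the diff, and is_read_lockable_fast is a hypothetical name):

// The three-part condition collapses to a single unsigned comparison
// because the waiting flags occupy bits 30 and 31: any set flag pushes
// the value, viewed as u32, far past MAX_READERS.
fn is_read_lockable_fast(state: i32) -> bool {
    (state as u32) < MAX_READERS as u32 // i.e. < 0x3FFF_FFFE
}

#[test]
fn read_lockable_equivalence() {
    let boundary = [
        0,
        1,
        MAX_READERS - 1,
        MAX_READERS,
        WRITE_LOCKED,
        READERS_WAITING,
        WRITERS_WAITING,
        READERS_WAITING | WRITERS_WAITING | WRITE_LOCKED,
        -1,
    ];
    for s in boundary {
        assert_eq!(is_read_lockable(s), is_read_lockable_fast(s));
    }
}
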


fn is_unlocked(state: i32) -> bool {
state & MASK == 0
}

fn is_write_locked(state: i32) -> bool {
state & MASK == WRITE_LOCKED
}

fn has_readers_waiting(state: i32) -> bool {
state & READERS_WAITING != 0
}

fn has_writers_waiting(state: i32) -> bool {
state & WRITERS_WAITING != 0
}

fn is_read_lockable(state: i32) -> bool {
// This also returns false if the counter could overflow if we tried to read lock it.
//
// We don't allow read-locking if there are readers waiting, even if the lock is unlocked
// and no writers are waiting. The only situation in which this happens is after unlocking,
// at which point the unlocking thread might be waking up writers, which have priority over readers.
// The unlocking thread will clear the readers waiting bit and wake up readers, if necessary.
state & MASK < MAX_READERS && !has_readers_waiting(state) && !has_writers_waiting(state)
}

fn has_reached_max_readers(state: i32) -> bool {
state & MASK == MAX_READERS
}

impl RwLock {
#[inline]
pub const fn new() -> Self {
Self { state: AtomicI32::new(0), writer_notify: AtomicI32::new(0) }
}

#[inline]
pub unsafe fn destroy(&self) {}

#[inline]
pub unsafe fn try_read(&self) -> bool {
self.state
.fetch_update(Acquire, Relaxed, |s| is_read_lockable(s).then(|| s + READ_LOCKED))
.is_ok()
}

#[inline]
pub unsafe fn read(&self) {
let state = self.state.load(Relaxed);
if !is_read_lockable(state)
|| self
.state
.compare_exchange_weak(state, state + READ_LOCKED, Acquire, Relaxed)
.is_err()
{
self.read_contended();
}
}

#[inline]
pub unsafe fn read_unlock(&self) {
let state = self.state.fetch_sub(READ_LOCKED, Release) - READ_LOCKED;

// It's impossible for a reader to be waiting on a read-locked RwLock,
// except if there is also a writer waiting.
debug_assert!(!has_readers_waiting(state) || has_writers_waiting(state));

// Wake up a writer if we were the last reader and there's a writer waiting.
if is_unlocked(state) && has_writers_waiting(state) {
self.wake_writer_or_readers(state);
}
}

#[cold]
fn read_contended(&self) {
let mut state = self.spin_read();

loop {
// If we can lock it, lock it.
if is_read_lockable(state) {
match self.state.compare_exchange_weak(state, state + READ_LOCKED, Acquire, Relaxed)
{
Ok(_) => return, // Locked!
Err(s) => {
state = s;
continue;
}
}
}

// Check for overflow.
if has_reached_max_readers(state) {
panic!("too many active read locks on RwLock");
}

// Make sure the readers waiting bit is set before we go to sleep.
if !has_readers_waiting(state) {
if let Err(s) =
self.state.compare_exchange(state, state | READERS_WAITING, Relaxed, Relaxed)
{
state = s;
continue;
}
}

// Wait for the state to change.
futex_wait(&self.state, state | READERS_WAITING, None);

// Spin again after waking up.
state = self.spin_read();
}
}

#[inline]
pub unsafe fn try_write(&self) -> bool {
self.state
.fetch_update(Acquire, Relaxed, |s| is_unlocked(s).then(|| s + WRITE_LOCKED))
.is_ok()
}

#[inline]
pub unsafe fn write(&self) {
if self.state.compare_exchange_weak(0, WRITE_LOCKED, Acquire, Relaxed).is_err() {
self.write_contended();
}
}

#[inline]
pub unsafe fn write_unlock(&self) {
let state = self.state.fetch_sub(WRITE_LOCKED, Release) - WRITE_LOCKED;

debug_assert!(is_unlocked(state));

if has_writers_waiting(state) || has_readers_waiting(state) {
self.wake_writer_or_readers(state);
}
}

#[cold]
fn write_contended(&self) {
let mut state = self.spin_write();

let mut other_writers_waiting = 0;

loop {
// If it's unlocked, we try to lock it.
if is_unlocked(state) {
match self.state.compare_exchange_weak(
state,
state | WRITE_LOCKED | other_writers_waiting,
Acquire,
Relaxed,
) {
Ok(_) => return, // Locked!
Err(s) => {
state = s;
continue;
}
}
}

// Set the waiting bit indicating that we're waiting on it.
if !has_writers_waiting(state) {
if let Err(s) =
self.state.compare_exchange(state, state | WRITERS_WAITING, Relaxed, Relaxed)
{
state = s;
continue;
}
}

// Other writers might be waiting now too, so we should make sure
// we keep that bit on once we manage to lock it.
other_writers_waiting = WRITERS_WAITING;

// Examine the notification counter before we check if `state` has changed,
// to make sure we don't miss any notifications.
let seq = self.writer_notify.load(Acquire);

// Don't go to sleep if the lock has become available,
// or if the writers waiting bit is no longer set.
let s = self.state.load(Relaxed);
if is_unlocked(s) || !has_writers_waiting(s) {
state = s;
continue;
}

// Wait for the state to change.
futex_wait(&self.writer_notify, seq, None);

// Spin again after waking up.
state = self.spin_write();
}
}

/// Wake up waiting threads after unlocking.
///
/// If both are waiting, this will wake up only one writer, but will fall
/// back to waking up readers if there was no writer to wake up.
#[cold]
fn wake_writer_or_readers(&self, mut state: i32) {
assert!(is_unlocked(state));

// The readers waiting bit might be turned on at any point now,
// since readers will block when there's anything waiting.
// Writers will just lock the lock though, regardless of the waiting bits,
// so we don't have to worry about the writer waiting bit.
//
// If the lock gets locked in the meantime, we don't have to do
// anything, because then the thread that locked the lock will take
// care of waking up waiters when it unlocks.

// If only writers are waiting, wake one of them up.
if state == WRITERS_WAITING {
match self.state.compare_exchange(state, 0, Relaxed, Relaxed) {
Ok(_) => {
self.wake_writer();
return;
}
Err(s) => {
// Maybe some readers are now waiting too. So, continue to the next `if`.
state = s;
}
}
}

// If both writers and readers are waiting, leave the readers waiting
// and only wake up one writer.
if state == READERS_WAITING + WRITERS_WAITING {
if self.state.compare_exchange(state, READERS_WAITING, Relaxed, Relaxed).is_err() {
// The lock got locked. Not our problem anymore.
return;
}
if self.wake_writer() {
return;
}
// No writers were actually blocked on futex_wait, so we continue
// to wake up readers instead, since we can't be sure if we notified a writer.
state = READERS_WAITING;
}

// If readers are waiting, wake them all up.
if state == READERS_WAITING {
if self.state.compare_exchange(state, 0, Relaxed, Relaxed).is_ok() {
futex_wake_all(&self.state);
}
}
}

/// This wakes one writer and returns true if we woke up a writer that was
/// blocked on futex_wait.
///
/// If this returns false, it might still be the case that we notified a
/// writer that was about to go to sleep.
fn wake_writer(&self) -> bool {
self.writer_notify.fetch_add(1, Release);
futex_wake(&self.writer_notify)
}

/// Spin for a while, but stop as soon as the given condition holds.
fn spin_until(&self, f: impl Fn(i32) -> bool) -> i32 {
let mut spin = 100; // Chosen by fair dice roll.
loop {
let state = self.state.load(Relaxed);
if f(state) || spin == 0 {
return state;
}
crate::hint::spin_loop();
spin -= 1;
}
}

fn spin_write(&self) -> i32 {
// Stop spinning when it's unlocked or when there are writers waiting, to keep things somewhat fair.
self.spin_until(|state| is_unlocked(state) || has_writers_waiting(state))
}

fn spin_read(&self) -> i32 {
// Stop spinning when it's unlocked or read-locked, or when there are threads waiting.
self.spin_until(|state| {
!is_write_locked(state) || has_readers_waiting(state) || has_writers_waiting(state)
})
}
}
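
For orientation, this is how a consumer drives the raw lock (a hypothetical sketch; in the real tree this type sits behind std::sync::RwLock, which adds guards and poisoning on top):

// Hypothetical caller of the raw lock above (sketch only).
fn example(lock: &RwLock, data: &core::cell::UnsafeCell<i32>) {
    unsafe {
        // Any number of readers may hold the lock at once.
        lock.read();
        let _value = *data.get();
        lock.read_unlock();

        // A writer waits until the reader count drops to zero,
        // then has exclusive access.
        lock.write();
        *data.get() = 42;
        lock.write_unlock();
    }
}
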
4 changes: 2 additions & 2 deletions library/std/src/sys/unix/locks/mod.rs
@@ -4,13 +4,13 @@ cfg_if::cfg_if! {
target_os = "android",
))] {
mod futex;
mod futex_rwlock;
#[allow(dead_code)]
mod pthread_mutex; // Only used for PthreadMutexAttr, needed by pthread_remutex.
mod pthread_remutex; // FIXME: Implement this using a futex
-mod pthread_rwlock; // FIXME: Implement this using a futex
pub use futex::{Mutex, MovableMutex, Condvar, MovableCondvar};
pub use pthread_remutex::ReentrantMutex;
-pub use pthread_rwlock::{RwLock, MovableRwLock};
+pub use futex_rwlock::{RwLock, MovableRwLock};
} else {
mod pthread_mutex;
mod pthread_remutex;