From 00255e627b77a939fbcc4f06787d490ad4884e7e Mon Sep 17 00:00:00 2001 From: Connor Tsui Date: Fri, 25 Oct 2024 15:01:57 -0400 Subject: [PATCH] add `downgrade` to `queue` implementation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit adds the `downgrade` method onto the inner `RwLock` queue implementation. There are also a few other style patches included in this commit. Co-authored-by: Jonas Böttiger --- std/src/sys/sync/rwlock/queue.rs | 276 +++++++++++++++++++++++-------- 1 file changed, 206 insertions(+), 70 deletions(-) diff --git a/std/src/sys/sync/rwlock/queue.rs b/std/src/sys/sync/rwlock/queue.rs index 5879d1f84154f..77a5ee23309be 100644 --- a/std/src/sys/sync/rwlock/queue.rs +++ b/std/src/sys/sync/rwlock/queue.rs @@ -40,16 +40,16 @@ //! //! ## State //! -//! A single [`AtomicPtr`] is used as state variable. The lowest three bits are used to indicate the +//! A single [`AtomicPtr`] is used as state variable. The lowest four bits are used to indicate the //! meaning of the remaining bits: //! -//! | [`LOCKED`] | [`QUEUED`] | [`QUEUE_LOCKED`] | Remaining | | -//! |:-----------|:-----------|:-----------------|:-------------|:----------------------------------------------------------------------------------------------------------------------------| -//! | 0 | 0 | 0 | 0 | The lock is unlocked, no threads are waiting | -//! | 1 | 0 | 0 | 0 | The lock is write-locked, no threads waiting | -//! | 1 | 0 | 0 | n > 0 | The lock is read-locked with n readers | -//! | 0 | 1 | * | `*mut Node` | The lock is unlocked, but some threads are waiting. Only writers may lock the lock | -//! | 1 | 1 | * | `*mut Node` | The lock is locked, but some threads are waiting. If the lock is read-locked, the last queue node contains the reader count | +//! | [`LOCKED`] | [`QUEUED`] | [`QUEUE_LOCKED`] | [`DOWNGRADED`] | Remaining | | +//! |------------|:-----------|:-----------------|:---------------|:-------------|:----------------------------------------------------------------------------------------------------------------------------| +//! | 0 | 0 | 0 | 0 | 0 | The lock is unlocked, no threads are waiting | +//! | 1 | 0 | 0 | 0 | 0 | The lock is write-locked, no threads waiting | +//! | 1 | 0 | 0 | 0 | n > 0 | The lock is read-locked with n readers | +//! | 0 | 1 | * | 0 | `*mut Node` | The lock is unlocked, but some threads are waiting. Only writers may lock the lock | +//! | 1 | 1 | * | * | `*mut Node` | The lock is locked, but some threads are waiting. If the lock is read-locked, the last queue node contains the reader count | //! //! ## Waiter Queue //! @@ -100,9 +100,9 @@ //! wake up the next waiter(s). //! //! `QUEUE_LOCKED` is set atomically at the same time as the enqueuing/unlocking operations. The -//! thread releasing the `QUEUE_LOCK` bit will check the state of the lock and wake up waiters as -//! appropriate. This guarantees forward progress even if the unlocking thread could not acquire the -//! queue lock. +//! thread releasing the `QUEUE_LOCKED` bit will check the state of the lock (in particular, whether +//! a downgrade was requested using the [`DOWNGRADED`] bit) and wake up waiters as appropriate. This +//! guarantees forward progress even if the unlocking thread could not acquire the queue lock. //! //! ## Memory Orderings //! @@ -129,8 +129,10 @@ const UNLOCKED: State = without_provenance_mut(0); const LOCKED: usize = 1 << 0; const QUEUED: usize = 1 << 1; const QUEUE_LOCKED: usize = 1 << 2; -const SINGLE: usize = 1 << 3; -const NODE_MASK: usize = !(QUEUE_LOCKED | QUEUED | LOCKED); +const DOWNGRADED: usize = 1 << 3; +const SINGLE: usize = 1 << 4; +const STATE: usize = DOWNGRADED | QUEUE_LOCKED | QUEUED | LOCKED; +const NODE_MASK: usize = !STATE; /// Locking uses exponential backoff. `SPIN_COUNT` indicates how many times the locking operation /// will be retried. @@ -141,8 +143,7 @@ const SPIN_COUNT: usize = 7; /// Marks the state as write-locked, if possible. #[inline] fn write_lock(state: State) -> Option { - let state = state.wrapping_byte_add(LOCKED); - if state.addr() & LOCKED == LOCKED { Some(state) } else { None } + if state.addr() & LOCKED == 0 { Some(state.map_addr(|addr| addr | LOCKED)) } else { None } } /// Marks the state as read-locked, if possible. @@ -169,7 +170,11 @@ unsafe fn to_node(state: State) -> NonNull { /// The representation of a thread waiting on the lock queue. /// /// We initialize these `Node`s on thread execution stacks to avoid allocation. -#[repr(align(8))] +/// +/// Note that we need an alignment of 16 to ensure that the last 4 bits of any +/// pointers to `Node`s are always zeroed (for the bit flags described in the +/// module-level documentation). +#[repr(align(16))] struct Node { next: AtomicLink, prev: AtomicLink, @@ -255,7 +260,7 @@ impl Node { /// # Safety /// /// * `head` must point to a node in a valid queue. -/// * `head` must be in front of the head of the queue at the time of the last removal. +/// * `head` must be in front of the previous head node that was used to perform the last removal. /// * The part of the queue starting with `head` must not be modified during this call. unsafe fn find_tail_and_add_backlinks(head: NonNull) -> NonNull { let mut current = head; @@ -282,6 +287,28 @@ unsafe fn find_tail_and_add_backlinks(head: NonNull) -> NonNull { } } +/// [`complete`](Node::complete)s all threads in the queue ending with `tail`. +/// +/// # Safety +/// +/// * `tail` must be a valid tail of a fully linked queue. +/// * The current thread must have exclusive access to that queue. +unsafe fn complete_all(tail: NonNull) { + let mut current = tail; + + // Traverse backwards through the queue (FIFO) and `complete` all of the nodes. + loop { + let prev = unsafe { current.as_ref().prev.get() }; + unsafe { + Node::complete(current); + } + match prev { + Some(prev) => current = prev, + None => return, + } + } +} + /// A type to guard against the unwinds of stacks that nodes are located on due to panics. struct PanicGuard; @@ -332,10 +359,11 @@ impl RwLock { #[cold] fn lock_contended(&self, write: bool) { - let update_fn = if write { write_lock } else { read_lock }; let mut node = Node::new(write); let mut state = self.state.load(Relaxed); let mut count = 0; + let update_fn = if write { write_lock } else { read_lock }; + loop { // Optimistically update the state. if let Some(next) = update_fn(state) { @@ -372,6 +400,7 @@ impl RwLock { .map_addr(|addr| addr | QUEUED | (state.addr() & LOCKED)) as State; + let mut is_queue_locked = false; if state.addr() & QUEUED == 0 { // If this is the first node in the queue, set the `tail` field to the node itself // to ensure there is a valid `tail` field in the queue (Invariants 1 & 2). @@ -383,6 +412,9 @@ impl RwLock { // Try locking the queue to eagerly add backlinks. next = next.map_addr(|addr| addr | QUEUE_LOCKED); + + // Track if we changed the `QUEUE_LOCKED` bit from off to on. + is_queue_locked = state.addr() & QUEUE_LOCKED == 0; } // Register the node, using release ordering to propagate our changes to the waking @@ -398,8 +430,9 @@ impl RwLock { // Guard against unwinds using a `PanicGuard` that aborts when dropped. let guard = PanicGuard; - // If the current thread locked the queue, unlock it to eagerly add backlinks. - if state.addr() & (QUEUE_LOCKED | QUEUED) == QUEUED { + // If the current thread locked the queue, unlock it to eagerly adding backlinks. + if is_queue_locked { + // SAFETY: This thread set the `QUEUE_LOCKED` bit above. unsafe { self.unlock_queue(next); } @@ -427,6 +460,12 @@ impl RwLock { // If there are no threads queued, simply decrement the reader count. let count = state.addr() - (SINGLE | LOCKED); Some(if count > 0 { without_provenance_mut(count | LOCKED) } else { UNLOCKED }) + } else if state.addr() & DOWNGRADED != 0 { + // This thread used to have exclusive access, but requested a downgrade. This has + // not been completed yet, so we still have exclusive access. + // Retract the downgrade request and unlock, but leave waking up new threads to the + // thread that already holds the queue lock. + Some(state.mask(!(DOWNGRADED | LOCKED))) } else { None } @@ -476,40 +515,127 @@ impl RwLock { /// /// * The lock must be exclusively owned by this thread. /// * There must be threads queued on the lock. + /// * There cannot be a `downgrade` in progress. #[cold] - unsafe fn unlock_contended(&self, mut state: State) { + unsafe fn unlock_contended(&self, state: State) { + debug_assert!(state.addr() & STATE == (QUEUED | LOCKED)); + + let mut current = state; + + // We want to atomically release the lock and try to acquire the queue lock. loop { + // First check if the queue lock is already held. + if current.addr() & QUEUE_LOCKED != 0 { + // Another thread holds the queue lock, so let them wake up waiters for us. + let next = current.mask(!LOCKED); + match self.state.compare_exchange_weak(current, next, Release, Relaxed) { + Ok(_) => return, + Err(new) => { + current = new; + continue; + } + } + } + // Atomically release the lock and try to acquire the queue lock. - let next = state.map_addr(|a| (a & !LOCKED) | QUEUE_LOCKED); - match self.state.compare_exchange_weak(state, next, AcqRel, Relaxed) { - // The queue lock was acquired. Release it, waking up the next - // waiter in the process. - Ok(_) if state.addr() & QUEUE_LOCKED == 0 => unsafe { - return self.unlock_queue(next); - }, - // Another thread already holds the queue lock, leave waking up - // waiters to it. - Ok(_) => return, - Err(new) => state = new, + let next = current.map_addr(|addr| (addr & !LOCKED) | QUEUE_LOCKED); + match self.state.compare_exchange_weak(current, next, AcqRel, Relaxed) { + Ok(_) => { + // Now that we have the queue lock, we can wake up the next waiter. + // SAFETY: This thread is exclusively owned by this thread. + unsafe { self.unlock_queue(next) }; + return; + } + Err(new) => current = new, + } + } + } + + /// # Safety + /// + /// * The lock must be write-locked by this thread. + #[inline] + pub unsafe fn downgrade(&self) { + // Optimistically change the state from write-locked with a single writer and no waiters to + // read-locked with a single reader and no waiters. + if let Err(state) = self.state.compare_exchange( + without_provenance_mut(LOCKED), + without_provenance_mut(SINGLE | LOCKED), + Release, + Relaxed, + ) { + // SAFETY: The only way the state can have changed is if there are threads queued. + // Wake all of them up. + unsafe { self.downgrade_slow(state) } + } + } + + /// Downgrades the lock from write-locked to read-locked in the case that there are threads + /// waiting on the wait queue. + /// + /// This function will either wake up all of the waiters on the wait queue or designate the + /// current holder of the queue lock to wake up all of the waiters instead. Once the waiters + /// wake up, they will continue in the execution loop of `lock_contended`. + /// + /// # Safety + /// + /// * The lock must be write-locked by this thread. + /// * There must be threads queued on the lock. + #[cold] + unsafe fn downgrade_slow(&self, mut state: State) { + debug_assert!(state.addr() & (DOWNGRADED | QUEUED | LOCKED) == (QUEUED | LOCKED)); + + // Attempt to wake up all waiters by taking ownership of the entire waiter queue. + loop { + if state.addr() & QUEUE_LOCKED != 0 { + // Another thread already holds the queue lock. Tell it to wake up all waiters. + // If the other thread succeeds in waking up waiters before we release our lock, the + // effect will be just the same as if we had changed the state below. + // Otherwise, the `DOWNGRADED` bit will still be set, meaning that when this thread + // calls `read_unlock` later (because it holds a read lock and must unlock + // eventually), it will realize that the lock is still exclusively locked and act + // accordingly. + let next = state.map_addr(|addr| addr | DOWNGRADED); + match self.state.compare_exchange_weak(state, next, Release, Relaxed) { + Ok(_) => return, + Err(new) => state = new, + } + } else { + // Grab the entire queue by swapping the `state` with a single reader. + let next = ptr::without_provenance_mut(SINGLE | LOCKED); + if let Err(new) = self.state.compare_exchange_weak(state, next, AcqRel, Relaxed) { + state = new; + continue; + } + + // SAFETY: We have full ownership of this queue now, so nobody else can modify it. + let tail = unsafe { find_tail_and_add_backlinks(to_node(state)) }; + + // Wake up all waiters. + // SAFETY: `tail` was just computed, meaning the whole queue is linked. + unsafe { complete_all(tail) }; + + return; } } } - /// Unlocks the queue. If the lock is unlocked, wakes up the next eligible - /// thread(s). + /// Unlocks the queue. Wakes up all threads if a downgrade was requested, otherwise wakes up the + /// next eligible thread(s) if the lock is unlocked. /// /// # Safety /// - /// The queue lock must be held by the current thread. + /// * The queue lock must be held by the current thread. + /// * There must be threads queued on the lock. unsafe fn unlock_queue(&self, mut state: State) { debug_assert_eq!(state.addr() & (QUEUED | QUEUE_LOCKED), QUEUED | QUEUE_LOCKED); loop { let tail = unsafe { find_tail_and_add_backlinks(to_node(state)) }; - if state.addr() & LOCKED == LOCKED { - // Another thread has locked the lock. Leave waking up waiters - // to them by releasing the queue lock. + if state.addr() & (DOWNGRADED | LOCKED) == LOCKED { + // Another thread has locked the lock and no downgrade was requested. + // Leave waking up waiters to them by releasing the queue lock. match self.state.compare_exchange_weak( state, state.mask(!QUEUE_LOCKED), @@ -524,53 +650,63 @@ impl RwLock { } } + // Since we hold the queue lock and downgrades cannot be requested if the lock is + // already read-locked, we have exclusive control over the queue here and can make + // modifications. + + let downgrade = state.addr() & DOWNGRADED != 0; let is_writer = unsafe { tail.as_ref().write }; - if is_writer && let Some(prev) = unsafe { tail.as_ref().prev.get() } { - // `tail` is a writer and there is a node before `tail`. - // Split off `tail`. + if !downgrade + && is_writer + && let Some(prev) = unsafe { tail.as_ref().prev.get() } + { + // If we are not downgrading and the next thread is a writer, only wake up that + // writing thread. - // There are no set `tail` links before the node pointed to by - // `state`, so the first non-null tail field will be current - // (invariant 2). Invariant 4 is fullfilled since `find_tail` - // was called on this node, which ensures all backlinks are set. + // Split off `tail`. + // There are no set `tail` links before the node pointed to by `state`, so the first + // non-null tail field will be current (Invariant 2). + // We also fulfill Invariant 4 since `find_tail` was called on this node, which + // ensures all backlinks are set. unsafe { to_node(state).as_ref().tail.set(Some(prev)); } - // Release the queue lock. Doing this by subtraction is more - // efficient on modern processors since it is a single instruction - // instead of an update loop, which will fail if new threads are - // added to the list. - self.state.fetch_byte_sub(QUEUE_LOCKED, Release); + // Try to release the queue lock. We need to check the state again since another + // thread might have acquired the lock and requested a downgrade. + let next = state.mask(!QUEUE_LOCKED); + if let Err(new) = self.state.compare_exchange_weak(state, next, Release, Acquire) { + // Undo the tail modification above, so that we can find the tail again above. + // As mentioned above, we have exclusive control over the queue, so no other + // thread could have noticed the change. + unsafe { + to_node(state).as_ref().tail.set(Some(tail)); + } + state = new; + continue; + } - // The tail was split off and the lock released. Mark the node as - // completed. + // The tail was split off and the lock was released. Mark the node as completed. unsafe { return Node::complete(tail); } } else { - // The next waiter is a reader or the queue only consists of one - // waiter. Just wake all threads. - - // The lock cannot be locked (checked above), so mark it as - // unlocked to reset the queue. - if let Err(new) = - self.state.compare_exchange_weak(state, UNLOCKED, Release, Acquire) - { + // We are either downgrading, the next waiter is a reader, or the queue only + // consists of one waiter. In any case, just wake all threads. + + // Clear the queue. + let next = + if downgrade { ptr::without_provenance_mut(SINGLE | LOCKED) } else { UNLOCKED }; + if let Err(new) = self.state.compare_exchange_weak(state, next, Release, Acquire) { state = new; continue; } - let mut current = tail; - loop { - let prev = unsafe { current.as_ref().prev.get() }; - unsafe { - Node::complete(current); - } - match prev { - Some(prev) => current = prev, - None => return, - } + // SAFETY: we computed `tail` above, and no new nodes can have been added since + // (otherwise the CAS above would have failed). + // Thus we have complete control over the whole queue. + unsafe { + return complete_all(tail); } } }