From 4b45a200fbf454d9124bf6480c0e2896e7f52564 Mon Sep 17 00:00:00 2001
From: Sergei Fomin
Date: Thu, 18 Jan 2024 08:19:05 +0300
Subject: [PATCH] Rename Atomic -> ConcurrentPush, fix tail locking in recv_ref

---
 tokio/src/sync/broadcast.rs                   | 82 +++++++++----------
 tokio/src/util/linked_list.rs                 |  6 +-
 .../{atomic.rs => concurrent_push.rs}         | 43 ++++++----
 3 files changed, 68 insertions(+), 63 deletions(-)
 rename tokio/src/util/linked_list/{atomic.rs => concurrent_push.rs} (77%)

diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs
index 3852e7ebab8..dab58ad49a5 100644
--- a/tokio/src/sync/broadcast.rs
+++ b/tokio/src/sync/broadcast.rs
@@ -119,7 +119,7 @@
 use crate::loom::cell::UnsafeCell;
 use crate::loom::sync::atomic::AtomicUsize;
 use crate::loom::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
-use crate::util::linked_list::{self, AtomicLinkedList, GuardedLinkedList};
+use crate::util::linked_list::{self, ConcurrentPushLinkedList, GuardedLinkedList};
 use crate::util::WakeList;
 
 use std::fmt;
@@ -329,7 +329,7 @@ struct Tail {
     closed: bool,
 
     /// Receivers waiting for a value.
-    waiters: AtomicLinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
+    waiters: ConcurrentPushLinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
 }
 
 /// Slot in the buffer.
@@ -526,7 +526,7 @@ impl<T> Sender<T> {
                 pos: 0,
                 rx_cnt: receiver_count,
                 closed: false,
-                waiters: AtomicLinkedList::new(),
+                waiters: ConcurrentPushLinkedList::new(),
             }),
             num_tx: AtomicUsize::new(1),
         });
@@ -851,7 +851,7 @@ impl<'a, T> Drop for WaitersList<'a, T> {
 
 impl<'a, T> WaitersList<'a, T> {
     fn new(
-        unguarded_list: AtomicLinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
+        unguarded_list: ConcurrentPushLinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
         guard: Pin<&'a Waiter>,
         shared: &'a Shared<T>,
     ) -> Self {
@@ -1075,24 +1075,46 @@ impl<T> Receiver<T> {
 
         if slot.pos != self.next {
             // Release the `slot` lock before attempting to acquire the `tail`
-            // lock. This is required because `send2` acquires the tail lock
+            // lock. This is required because `send` acquires the tail lock
             // first followed by the slot lock. Acquiring the locks in reverse
             // order here would result in a potential deadlock: `recv_ref`
             // acquires the `slot` lock and attempts to acquire the `tail` lock
-            // while `send2` acquired the `tail` lock and attempts to acquire
+            // while `send` acquired the `tail` lock and attempts to acquire
             // the slot lock.
             drop(slot);
 
             let mut old_waker = None;
 
-            let tail = self.shared.tail.read().unwrap();
+            let queued = waiter
+                .map(|(waiter, _)| {
+                    waiter.with(|ptr| {
+                        // Safety: waiter.queued is atomic.
+                        // Acquire is needed to synchronize with `Shared::notify_rx`.
+                        unsafe { (*ptr).queued.load(Acquire) }
+                    })
+                })
+                .unwrap_or(false);
+
+            // If `queued` is false, then we are the sole owner of the waiter,
+            // so a read lock on the tail suffices.
+            // If `queued` is true, the waiter might be accessed concurrently,
+            // so we need a write lock.
+            let mut tail_read = None;
+            let mut tail_write = None;
+            let tail = if queued {
+                tail_write = Some(self.shared.tail.write().unwrap());
+                tail_write.as_deref().unwrap()
+            } else {
+                tail_read = Some(self.shared.tail.read().unwrap());
+                tail_read.as_deref().unwrap()
+            };
 
             // Acquire slot lock again
             slot = self.shared.buffer[idx].read().unwrap();
 
             // Make sure the position did not change. This could happen in the
             // unlikely event that the buffer is wrapped between dropping the
-            // read lock and acquiring the tail lock.
+            // slot lock and acquiring the tail lock.
             if slot.pos != self.next {
                 let next_pos = slot.pos.wrapping_add(self.shared.buffer.len() as u64);
 
@@ -1104,36 +1126,8 @@ impl<T> Receiver<T> {
                         return Err(TryRecvError::Closed);
                     }
 
-                    // We will might want to upgrade to a write lock.
-                    let mut tail_read = Some(tail);
-                    let mut tail_write = None;
-
                     // Store the waker
                     if let Some((waiter, waker)) = waiter {
-                        let queued = waiter.with(|ptr| {
-                            // Safety: waiter.queued is atomic.
-                            // Acquire is needed to synchronize with `Shared::notify_rx`.
-                            unsafe { (*ptr).queued.load(Acquire) }
-                        });
-
-                        // Release the slot lock before reacquiring tail locks
-                        // to avoid a deadlock.
-                        drop(slot);
-
-                        // If waiter is already queued, then a write lock on tail is required
-                        // since other threads may try to mutate the waiter concurrently.
-                        // If the waiter is not queued, we are the only owner now and
-                        // read lock suffices.
-                        let tail_ref: &Tail = if queued {
-                            // TODO: this is sketchy, need do make sure that
-                            // it is safe to drop all the locks here...
-                            tail_read = None;
-                            tail_write = Some(self.shared.tail.write().unwrap());
-                            tail_write.as_deref().unwrap()
-                        } else {
-                            tail_read.as_deref().unwrap()
-                        };
-
                         // Safety: called while holding a lock on tail.
                         // If waiter is not queued, then we hold a read lock
                         // on tail and can safely mutate `waiter` since we
@@ -1156,25 +1150,24 @@ impl<T> Receiver<T> {
                                     }
                                 }
 
-                                // If the waiter is already queued, don't do anything.
-                                // If not, enqueue it.
-                                // Relaxed memory order suffices because, if `waiter`
-                                // is shared, then we hold a write lock on tail.
+                                // If the waiter is not already queued, enqueue it.
+                                // Relaxed memory order suffices because, if `queued`
+                                // is `false`, then we are the sole owner of the waiter,
+                                // and all future accesses will happen with the tail lock held.
                                 if !(*ptr).queued.swap(true, Relaxed) {
                                     // Safety:
                                     // - `waiter` is not already queued,
                                     // - calling `recv_ref` with a waiter implies ownership
                                     //   of it's `Recv`. As such, this waiter cannot be pushed
                                     //   concurrently by some other thread.
-                                    tail_ref
-                                        .waiters
-                                        .push_front(NonNull::new_unchecked(&mut *ptr));
+                                    tail.waiters.push_front(NonNull::new_unchecked(&mut *ptr));
                                 }
                             });
                         }
                     }
 
                     // Drop the old waker after releasing the locks.
+                    drop(slot);
                     drop(tail_read);
                     drop(tail_write);
                     drop(old_waker);
@@ -1191,7 +1184,8 @@ impl<T> Receiver<T> {
 
                 let missed = next.wrapping_sub(self.next);
 
-                drop(tail);
+                drop(tail_read);
+                drop(tail_write);
 
                 // The receiver is slow but no values have been missed
                 if missed == 0 {
diff --git a/tokio/src/util/linked_list.rs b/tokio/src/util/linked_list.rs
index a742279339c..400c42573ca 100644
--- a/tokio/src/util/linked_list.rs
+++ b/tokio/src/util/linked_list.rs
@@ -7,9 +7,9 @@
 //! specified node is actually contained by the list.
 
 #[cfg(feature = "sync")]
-mod atomic;
+mod concurrent_push;
 #[cfg(feature = "sync")]
-pub(crate) use self::atomic::AtomicLinkedList;
+pub(crate) use self::concurrent_push::ConcurrentPushLinkedList;
 
 use core::cell::UnsafeCell;
 use core::fmt;
@@ -115,7 +115,7 @@ unsafe impl<T: Sync> Sync for Pointers<T> {}
 
 // ===== LinkedListBase =====
 
-// Common methods between LinkedList and AtomicLinkedList.
+// Common methods between LinkedList and ConcurrentPushLinkedList.
 trait LinkedListBase<L: Link> {
     // NB: exclusive reference is important for AtomicLinkedList safety guarantees.
     fn head(&mut self) -> Option<NonNull<L::Target>>;
diff --git a/tokio/src/util/linked_list/atomic.rs b/tokio/src/util/linked_list/concurrent_push.rs
similarity index 77%
rename from tokio/src/util/linked_list/atomic.rs
rename to tokio/src/util/linked_list/concurrent_push.rs
index 3636ee7e104..74fe56bae5c 100644
--- a/tokio/src/util/linked_list/atomic.rs
+++ b/tokio/src/util/linked_list/concurrent_push.rs
@@ -9,9 +9,13 @@ use core::sync::atomic::{
     Ordering::{AcqRel, Relaxed},
 };
 
-/// An atomic intrusive linked list. It allows pushing new nodes concurrently.
-/// Removing nodes still requires an exclusive reference.
-pub(crate) struct AtomicLinkedList<L, T> {
+/// A linked list that supports adding new nodes concurrently.
+/// Note that all other operations, e.g. node removals,
+/// require external synchronization.
+/// The simplest way to achieve this is with an RwLock:
+/// pushing nodes only requires a read lock,
+/// while removing nodes requires a write lock.
+pub(crate) struct ConcurrentPushLinkedList<L, T> {
     /// Linked list head.
     head: AtomicPtr<T>,
 
@@ -22,26 +26,26 @@ pub(crate) struct ConcurrentPushLinkedList<L, T> {
     _marker: PhantomData<*const L>,
 }
 
-unsafe impl<L: Link> Send for AtomicLinkedList<L, L::Target> where L::Target: Send {}
-unsafe impl<L: Link> Sync for AtomicLinkedList<L, L::Target> where L::Target: Sync {}
+unsafe impl<L: Link> Send for ConcurrentPushLinkedList<L, L::Target> where L::Target: Send {}
+unsafe impl<L: Link> Sync for ConcurrentPushLinkedList<L, L::Target> where L::Target: Sync {}
 
-impl<L: Link> Default for AtomicLinkedList<L, L::Target> {
+impl<L: Link> Default for ConcurrentPushLinkedList<L, L::Target> {
     fn default() -> Self {
         Self::new()
     }
 }
 
-impl<L: Link> AtomicLinkedList<L, L::Target> {
-    /// Creates an empty atomic linked list.
-    pub(crate) const fn new() -> AtomicLinkedList<L, L::Target> {
-        AtomicLinkedList {
+impl<L: Link> ConcurrentPushLinkedList<L, L::Target> {
+    /// Creates an empty concurrent push linked list.
+    pub(crate) const fn new() -> ConcurrentPushLinkedList<L, L::Target> {
+        ConcurrentPushLinkedList {
             head: AtomicPtr::new(core::ptr::null_mut()),
             tail: UnsafeCell::new(None),
             _marker: PhantomData,
         }
     }
 
-    /// Convert an atomic linked list into a non-atomic version.
+    /// Converts a concurrent-push list into a regular `LinkedList`.
     pub(crate) fn into_list(mut self) -> LinkedList<L, L::Target> {
         LinkedList {
             head: NonNull::new(*self.head.get_mut()),
@@ -51,7 +55,7 @@ impl<L: Link> AtomicLinkedList<L, L::Target> {
     }
 }
 
-impl<L: Link> LinkedListBase<L> for AtomicLinkedList<L, L::Target> {
+impl<L: Link> LinkedListBase<L> for ConcurrentPushLinkedList<L, L::Target> {
     fn head(&mut self) -> Option<NonNull<L::Target>> {
         NonNull::new(*self.head.get_mut())
     }
@@ -72,9 +76,9 @@ impl<L: Link> LinkedListBase<L> for AtomicLinkedList<L, L::Target> {
     }
 }
 
-impl<L: Link> AtomicLinkedList<L, L::Target> {
+impl<L: Link> ConcurrentPushLinkedList<L, L::Target> {
     /// Atomically adds an element first in the list.
-    /// This method can be called concurrently from multiple threads.
+    /// This method can be called concurrently by multiple threads.
     ///
     /// # Safety
     ///
@@ -135,6 +139,12 @@ impl<L: Link> AtomicLinkedList<L, L::Target> {
     }
 
     /// See [LinkedList::remove].
+    ///
+    /// Note that `&mut self` implies that this call must be externally
+    /// synchronized with `push_front` (e.g. via an RwLock).
+    /// In terms of the memory model, there has to be an established
+    /// happens-before relationship between any given `push_front`
+    /// and any given `remove`, in either direction.
     pub(crate) unsafe fn remove(&mut self, node: NonNull<L::Target>) -> Option<L::Handle> {
         LinkedListBase::remove(self, node)
     }
@@ -150,8 +160,9 @@ pub(crate) mod tests {
     use std::sync::Arc;
 
     #[test]
-    fn atomic_push_front() {
-        let atomic_list = Arc::new(AtomicLinkedList::<&Entry, <&Entry as Link>::Target>::new());
+    fn concurrent_push_front() {
+        let atomic_list =
+            Arc::new(ConcurrentPushLinkedList::<&Entry, <&Entry as Link>::Target>::new());
 
         let _entries = [5, 7]
             .into_iter()
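
A minimal standalone sketch (not part of the patch) of the synchronization contract described in the new `ConcurrentPushLinkedList` docs: concurrent pushes run under an `RwLock` read guard, while removal/draining takes the write guard. The `PushList` type below is a hypothetical, simplified stand-in with a CAS-based `push_front`, not tokio's intrusive list.

use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering::{AcqRel, Acquire, Relaxed}};
use std::sync::{Arc, RwLock};
use std::thread;

// Simplified owning node; the real list is intrusive, this is just for illustration.
struct Node {
    value: u64,
    next: *mut Node,
}

// Stand-in list: only `push_front` is safe to call concurrently (via `&self`);
// everything else requires exclusive access (`&mut self`).
struct PushList {
    head: AtomicPtr<Node>,
}

impl PushList {
    fn new() -> Self {
        PushList { head: AtomicPtr::new(ptr::null_mut()) }
    }

    // Concurrent push: CAS loop on the atomic head.
    fn push_front(&self, value: u64) {
        let node = Box::into_raw(Box::new(Node { value, next: ptr::null_mut() }));
        let mut head = self.head.load(Relaxed);
        loop {
            // Link the new node in front of the current head before publishing it.
            unsafe { (*node).next = head };
            match self.head.compare_exchange_weak(head, node, AcqRel, Relaxed) {
                Ok(_) => return,
                Err(actual) => head = actual,
            }
        }
    }

    // Removal/draining needs `&mut self`, i.e. external synchronization.
    fn drain(&mut self) -> Vec<u64> {
        let mut out = Vec::new();
        let mut cur = self.head.swap(ptr::null_mut(), Acquire);
        while !cur.is_null() {
            let boxed = unsafe { Box::from_raw(cur) };
            out.push(boxed.value);
            cur = boxed.next;
        }
        out
    }
}

fn main() {
    // The RwLock provides the contract from the doc comment:
    // pushers hold the read lock, removal holds the write lock.
    let list = Arc::new(RwLock::new(PushList::new()));

    let handles: Vec<_> = (0..4u64)
        .map(|i| {
            let list = Arc::clone(&list);
            thread::spawn(move || {
                // Read lock: many pushers may run at the same time.
                list.read().unwrap().push_front(i);
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }

    // Write lock: exclusive access, so mutating the whole list is safe.
    let mut drained = list.write().unwrap().drain();
    drained.sort_unstable();
    assert_eq!(drained, vec![0, 1, 2, 3]);
}

This mirrors why `recv_ref` can stay on the tail read lock when the waiter is not yet queued: pushing only touches the atomic head, and exclusive (write-locked) access is reserved for operations that mutate shared nodes.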