
Commit

Rename Atomic -> ConcurrentPush, fix tail locking in recv_ref
vnetserg committed Jan 18, 2024
1 parent 90b1c0e commit 4b45a20
Showing 3 changed files with 68 additions and 63 deletions.
82 changes: 38 additions & 44 deletions tokio/src/sync/broadcast.rs
@@ -119,7 +119,7 @@
use crate::loom::cell::UnsafeCell;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
use crate::util::linked_list::{self, AtomicLinkedList, GuardedLinkedList};
use crate::util::linked_list::{self, ConcurrentPushLinkedList, GuardedLinkedList};
use crate::util::WakeList;

use std::fmt;
@@ -329,7 +329,7 @@ struct Tail {
closed: bool,

/// Receivers waiting for a value.
waiters: AtomicLinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
waiters: ConcurrentPushLinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
}

/// Slot in the buffer.
@@ -526,7 +526,7 @@ impl<T> Sender<T> {
pos: 0,
rx_cnt: receiver_count,
closed: false,
waiters: AtomicLinkedList::new(),
waiters: ConcurrentPushLinkedList::new(),
}),
num_tx: AtomicUsize::new(1),
});
@@ -851,7 +851,7 @@ impl<'a, T> Drop for WaitersList<'a, T> {

impl<'a, T> WaitersList<'a, T> {
fn new(
unguarded_list: AtomicLinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
unguarded_list: ConcurrentPushLinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
guard: Pin<&'a Waiter>,
shared: &'a Shared<T>,
) -> Self {
@@ -1075,24 +1075,46 @@ impl<T> Receiver<T> {

if slot.pos != self.next {
// Release the `slot` lock before attempting to acquire the `tail`
// lock. This is required because `send2` acquires the tail lock
// lock. This is required because `send` acquires the tail lock
// first followed by the slot lock. Acquiring the locks in reverse
// order here would result in a potential deadlock: `recv_ref`
// acquires the `slot` lock and attempts to acquire the `tail` lock
// while `send2` acquired the `tail` lock and attempts to acquire
// while `send` acquired the `tail` lock and attempts to acquire
// the slot lock.
drop(slot);

let mut old_waker = None;

let tail = self.shared.tail.read().unwrap();
let queued = waiter
.map(|(waiter, _)| {
waiter.with(|ptr| {
// Safety: waiter.queued is atomic.
// Acquire is needed to synchronize with `Shared::notify_rx`.
unsafe { (*ptr).queued.load(Acquire) }
})
})
.unwrap_or(false);

// If `queued` is false, then we are the sole owner of the waiter,
// so read lock on tail suffices.
// If `queued` is true, the waiter might be accessed concurrently,
// so we need a write lock.
let mut tail_read = None;
let mut tail_write = None;
let tail = if queued {
tail_write = Some(self.shared.tail.write().unwrap());
tail_write.as_deref().unwrap()
} else {
tail_read = Some(self.shared.tail.read().unwrap());
tail_read.as_deref().unwrap()
};

// Acquire slot lock again
slot = self.shared.buffer[idx].read().unwrap();

// Make sure the position did not change. This could happen in the
// unlikely event that the buffer is wrapped between dropping the
// read lock and acquiring the tail lock.
// slot lock and acquiring the tail lock.
if slot.pos != self.next {
let next_pos = slot.pos.wrapping_add(self.shared.buffer.len() as u64);

@@ -1104,36 +1126,8 @@ impl<T> Receiver<T> {
return Err(TryRecvError::Closed);
}

// We might want to upgrade to a write lock.
let mut tail_read = Some(tail);
let mut tail_write = None;

// Store the waker
if let Some((waiter, waker)) = waiter {
let queued = waiter.with(|ptr| {
// Safety: waiter.queued is atomic.
// Acquire is needed to synchronize with `Shared::notify_rx`.
unsafe { (*ptr).queued.load(Acquire) }
});

// Release the slot lock before reacquiring tail locks
// to avoid a deadlock.
drop(slot);

// If waiter is already queued, then a write lock on tail is required
// since other threads may try to mutate the waiter concurrently.
// If the waiter is not queued, we are the only owner now and
// read lock suffices.
let tail_ref: &Tail = if queued {
// TODO: this is sketchy, need to make sure that
// it is safe to drop all the locks here...
tail_read = None;
tail_write = Some(self.shared.tail.write().unwrap());
tail_write.as_deref().unwrap()
} else {
tail_read.as_deref().unwrap()
};

// Safety: called while holding a lock on tail.
// If waiter is not queued, then we hold a read lock
// on tail and can safely mutate `waiter` since we
@@ -1156,25 +1150,24 @@ impl<T> Receiver<T> {
}
}

// If the waiter is already queued, don't do anything.
// If not, enqueue it.
// Relaxed memory order suffices because, if `waiter`
// is shared, then we hold a write lock on tail.
// If the waiter is not already queued, enqueue it.
// Relaxed memory order suffices because, if `queued`
// is `false`, then we are the sole owner of the waiter,
// and all future accesses will happen with tail lock held.
if !(*ptr).queued.swap(true, Relaxed) {
// Safety:
// - `waiter` is not already queued,
// - calling `recv_ref` with a waiter implies ownership
// of it's `Recv`. As such, this waiter cannot be pushed
// concurrently by some other thread.
tail_ref
.waiters
.push_front(NonNull::new_unchecked(&mut *ptr));
tail.waiters.push_front(NonNull::new_unchecked(&mut *ptr));
}
});
}
}

// Drop the old waker after releasing the locks.
drop(slot);
drop(tail_read);
drop(tail_write);
drop(old_waker);
@@ -1191,7 +1184,8 @@ impl<T> Receiver<T> {

let missed = next.wrapping_sub(self.next);

drop(tail);
drop(tail_read);
drop(tail_write);

// The receiver is slow but no values have been missed
if missed == 0 {
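
The core of the broadcast.rs change is the decision made right before reacquiring the tail: `recv_ref` now reads the waiter's atomic `queued` flag first, takes a read lock on the tail when it is the sole owner of the waiter, and takes a write lock only when the waiter may already be shared through the waiters list. The following standalone sketch illustrates that decision; `Waiter`, `Tail`, and `register_waiter` here are simplified stand-ins for illustration, not tokio's actual types.

use std::sync::atomic::{AtomicBool, Ordering::Acquire};
use std::sync::RwLock;

struct Waiter {
    // Set once the waiter has been pushed onto the waiters list.
    queued: AtomicBool,
    // waker and intrusive-list pointers elided
}

struct Tail {
    // waiters: ConcurrentPushLinkedList<...> in the real code
}

fn register_waiter(tail: &RwLock<Tail>, waiter: &Waiter) {
    // Acquire synchronizes with the thread that last updated the flag.
    if waiter.queued.load(Acquire) {
        // The waiter may be touched concurrently by other threads,
        // so exclusive access to the tail is required.
        let _tail = tail.write().unwrap();
        // ... update the waker while holding the write lock ...
    } else {
        // Sole owner of the waiter: a shared lock suffices, because
        // pushing onto the waiters list only needs `&self`.
        let _tail = tail.read().unwrap();
        // ... store the waker and push the waiter onto the list ...
    }
}

fn main() {
    let tail = RwLock::new(Tail {});
    let waiter = Waiter { queued: AtomicBool::new(false) };
    register_waiter(&tail, &waiter);
}
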
6 changes: 3 additions & 3 deletions tokio/src/util/linked_list.rs
@@ -7,9 +7,9 @@
//! specified node is actually contained by the list.

#[cfg(feature = "sync")]
mod atomic;
mod concurrent_push;
#[cfg(feature = "sync")]
pub(crate) use self::atomic::AtomicLinkedList;
pub(crate) use self::concurrent_push::ConcurrentPushLinkedList;

use core::cell::UnsafeCell;
use core::fmt;
@@ -115,7 +115,7 @@ unsafe impl<T: Sync> Sync for Pointers<T> {}

// ===== LinkedListBase =====

// Common methods between LinkedList and AtomicLinkedList.
// Common methods between LinkedList and ConcurrentPushLinkedList.
trait LinkedListBase<L: Link> {
// NB: exclusive reference is important for AtomicLinkedList safety guarantees.
fn head(&mut self) -> Option<NonNull<L::Target>>;
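
The NB in the trait comment above is the crux of the shared `LinkedListBase` trait: because every method takes an exclusive reference, even the concurrent-push list can read its atomic head without synchronization there. A minimal sketch of that idea (the `head_snapshot` helper is hypothetical, not part of tokio):

use std::ptr;
use std::sync::atomic::AtomicPtr;

// With `&mut`, no other thread can be pushing concurrently, so the
// atomic head can be read as a plain value via `get_mut`, mirroring
// how ConcurrentPushLinkedList implements the trait's `head()`.
fn head_snapshot<T>(head: &mut AtomicPtr<T>) -> *mut T {
    *head.get_mut()
}

fn main() {
    let mut head: AtomicPtr<u32> = AtomicPtr::new(ptr::null_mut());
    assert!(head_snapshot(&mut head).is_null());
}
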
43 changes: 27 additions & 16 deletions tokio/src/util/linked_list/{atomic.rs → concurrent_push.rs}
@@ -9,9 +9,13 @@ use core::sync::atomic::{
Ordering::{AcqRel, Relaxed},
};

/// An atomic intrusive linked list. It allows pushing new nodes concurrently.
/// Removing nodes still requires an exclusive reference.
pub(crate) struct AtomicLinkedList<L, T> {
/// A linked list that supports adding new nodes concurrently.
/// Note that all other operations, e.g. node removals,
/// require external synchronization.
/// The simplest way to achieve this is to use an RwLock:
/// pushing nodes only requires a read lock,
/// while removing nodes requires a write lock.
pub(crate) struct ConcurrentPushLinkedList<L, T> {
/// Linked list head.
head: AtomicPtr<T>,

@@ -22,26 +26,26 @@ pub(crate) struct AtomicLinkedList<L, T> {
_marker: PhantomData<*const L>,
}

unsafe impl<L: Link> Send for AtomicLinkedList<L, L::Target> where L::Target: Send {}
unsafe impl<L: Link> Sync for AtomicLinkedList<L, L::Target> where L::Target: Sync {}
unsafe impl<L: Link> Send for ConcurrentPushLinkedList<L, L::Target> where L::Target: Send {}
unsafe impl<L: Link> Sync for ConcurrentPushLinkedList<L, L::Target> where L::Target: Sync {}

impl<L: Link> Default for AtomicLinkedList<L, L::Target> {
impl<L: Link> Default for ConcurrentPushLinkedList<L, L::Target> {
fn default() -> Self {
Self::new()
}
}

impl<L, T> AtomicLinkedList<L, T> {
/// Creates an empty atomic linked list.
pub(crate) const fn new() -> AtomicLinkedList<L, T> {
AtomicLinkedList {
impl<L, T> ConcurrentPushLinkedList<L, T> {
/// Creates an empty concurrent push linked list.
pub(crate) const fn new() -> ConcurrentPushLinkedList<L, T> {
ConcurrentPushLinkedList {
head: AtomicPtr::new(core::ptr::null_mut()),
tail: UnsafeCell::new(None),
_marker: PhantomData,
}
}

/// Convert an atomic linked list into a non-atomic version.
/// Converts a concurrent-push linked list into a regular `LinkedList`.
pub(crate) fn into_list(mut self) -> LinkedList<L, T> {
LinkedList {
head: NonNull::new(*self.head.get_mut()),
@@ -51,7 +55,7 @@ impl<L, T> AtomicLinkedList<L, T> {
}
}

impl<L: Link> LinkedListBase<L> for AtomicLinkedList<L, L::Target> {
impl<L: Link> LinkedListBase<L> for ConcurrentPushLinkedList<L, L::Target> {
fn head(&mut self) -> Option<NonNull<L::Target>> {
NonNull::new(*self.head.get_mut())
}
@@ -72,9 +76,9 @@ impl<L: Link> LinkedListBase<L> for AtomicLinkedList<L, L::Target> {
}
}

impl<L: Link> AtomicLinkedList<L, L::Target> {
impl<L: Link> ConcurrentPushLinkedList<L, L::Target> {
/// Atomically adds an element first in the list.
/// This method can be called concurrently from multiple threads.
/// This method can be called concurrently by multiple threads.
///
/// # Safety
///
@@ -135,6 +139,12 @@ impl<L: Link> AtomicLinkedList<L, L::Target> {
}

/// See [LinkedList::remove].
///
/// Note that `&mut self` implies that this call is externally
/// synchronized with `push_front` (e.g. via an RwLock).
/// In terms of the memory model, there has to be an established
/// happens-before relationship between any given `push_front`
/// and any given `remove`; the relationship may be established
/// in either direction.
pub(crate) unsafe fn remove(&mut self, node: NonNull<L::Target>) -> Option<L::Handle> {
LinkedListBase::remove(self, node)
}
@@ -150,8 +160,9 @@ pub(crate) mod tests {
use std::sync::Arc;

#[test]
fn atomic_push_front() {
let atomic_list = Arc::new(AtomicLinkedList::<&Entry, <&Entry as Link>::Target>::new());
fn concurrent_push_front() {
let atomic_list =
Arc::new(ConcurrentPushLinkedList::<&Entry, <&Entry as Link>::Target>::new());

let _entries = [5, 7]
.into_iter()
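
To make the synchronization model described in the new doc comments concrete, here is a self-contained, non-intrusive sketch of the same idea: pushes go through a CAS loop on an atomic head and need only `&self`, while removal (here, draining) needs `&mut self`, so wrapping the list in an RwLock gives "push under a read lock, remove under a write lock". The `PushList` type and its methods are illustrative only; tokio's list is intrusive and unsafe, and this is not its API.

use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering::{AcqRel, Acquire, Relaxed}};

struct Node<T> {
    value: T,
    next: *mut Node<T>,
}

struct PushList<T> {
    head: AtomicPtr<Node<T>>,
}

impl<T> PushList<T> {
    fn new() -> Self {
        PushList { head: AtomicPtr::new(ptr::null_mut()) }
    }

    // Lock-free push: callable concurrently from many threads through `&self`.
    fn push_front(&self, value: T) {
        let node = Box::into_raw(Box::new(Node { value, next: ptr::null_mut() }));
        let mut head = self.head.load(Relaxed);
        loop {
            // Safety: `node` is uniquely owned until the CAS below publishes it.
            unsafe { (*node).next = head };
            match self.head.compare_exchange_weak(head, node, AcqRel, Acquire) {
                Ok(_) => return,
                Err(actual) => head = actual,
            }
        }
    }

    // Removal needs `&mut self`: callers must synchronize with pushers,
    // e.g. pushers hold a read lock while this holds the write lock.
    // (A real implementation would also need a Drop impl to free
    // any nodes that were never drained.)
    fn drain(&mut self) -> Vec<T> {
        let mut out = Vec::new();
        let mut cur = *self.head.get_mut();
        *self.head.get_mut() = ptr::null_mut();
        while !cur.is_null() {
            // Safety: every node was created by Box::into_raw in push_front.
            let node = unsafe { *Box::from_raw(cur) };
            cur = node.next;
            out.push(node.value);
        }
        out
    }
}

fn main() {
    use std::sync::{Arc, RwLock};
    use std::thread;

    let list = Arc::new(RwLock::new(PushList::new()));
    let handles: Vec<_> = (0..4)
        .map(|i| {
            let list = Arc::clone(&list);
            // Read lock: many threads may push at the same time.
            thread::spawn(move || list.read().unwrap().push_front(i))
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    // Write lock: removing values requires exclusive access.
    let mut values = list.write().unwrap().drain();
    values.sort();
    assert_eq!(values, vec![0, 1, 2, 3]);
}
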
