Skip to content

Commit

Permalink
chore: replace deprecated compare_and_swap with compare_exchange (#3331)
Browse files Browse the repository at this point in the history
  • Loading branch information
taiki-e authored Dec 27, 2020
1 parent c492926 commit 770044c
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 106 deletions.
6 changes: 5 additions & 1 deletion tokio/src/io/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,11 @@ impl<T: AsyncWrite> AsyncWrite for WriteHalf<T> {

impl<T> Inner<T> {
fn poll_lock(&self, cx: &mut Context<'_>) -> Poll<Guard<'_, T>> {
if !self.locked.compare_and_swap(false, true, Acquire) {
if self
.locked
.compare_exchange(false, true, Acquire, Acquire)
.is_ok()
{
Poll::Ready(Guard { inner: self })
} else {
// Spin... but investigate a better strategy
Expand Down
31 changes: 23 additions & 8 deletions tokio/src/loom/std/atomic_u64.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ mod imp {
}

impl AtomicU64 {
pub(crate) fn new(val: u64) -> AtomicU64 {
AtomicU64 {
pub(crate) fn new(val: u64) -> Self {
Self {
inner: Mutex::new(val),
}
}
Expand All @@ -45,16 +45,31 @@ mod imp {
prev
}

pub(crate) fn compare_and_swap(&self, old: u64, new: u64, _: Ordering) -> u64 {
pub(crate) fn compare_exchange(
&self,
current: u64,
new: u64,
_success: Ordering,
_failure: Ordering,
) -> Result<u64, u64> {
let mut lock = self.inner.lock().unwrap();
let prev = *lock;

if prev != old {
return prev;
if *lock == current {
*lock = new;
Ok(current)
} else {
Err(*lock)
}
}

*lock = new;
prev
pub(crate) fn compare_exchange_weak(
&self,
current: u64,
new: u64,
success: Ordering,
failure: Ordering,
) -> Result<u64, u64> {
self.compare_exchange(current, new, success, failure)
}
}
}
20 changes: 12 additions & 8 deletions tokio/src/runtime/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::runtime::task;
use std::marker::PhantomData;
use std::mem::MaybeUninit;
use std::ptr::{self, NonNull};
use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};

/// Producer handle. May only be used from a single thread.
pub(super) struct Local<T: 'static> {
Expand Down Expand Up @@ -194,13 +194,17 @@ impl<T> Local<T> {
// work. This is because all tasks are pushed into the queue from the
// current thread (or memory has been acquired if the local queue handle
// moved).
let actual = self.inner.head.compare_and_swap(
prev,
pack(head.wrapping_add(n), head.wrapping_add(n)),
Release,
);

if actual != prev {
if self
.inner
.head
.compare_exchange(
prev,
pack(head.wrapping_add(n), head.wrapping_add(n)),
Release,
Relaxed,
)
.is_err()
{
// We failed to claim the tasks, losing the race. Return out of
// this function and try the full `push` routine again. The queue
// may not be full anymore.
Expand Down
18 changes: 10 additions & 8 deletions tokio/src/sync/mpsc/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,13 +258,15 @@ impl<T> Block<T> {
pub(crate) unsafe fn try_push(
&self,
block: &mut NonNull<Block<T>>,
ordering: Ordering,
success: Ordering,
failure: Ordering,
) -> Result<(), NonNull<Block<T>>> {
block.as_mut().start_index = self.start_index.wrapping_add(BLOCK_CAP);

let next_ptr = self
.next
.compare_and_swap(ptr::null_mut(), block.as_ptr(), ordering);
.compare_exchange(ptr::null_mut(), block.as_ptr(), success, failure)
.unwrap_or_else(|x| x);

match NonNull::new(next_ptr) {
Some(next_ptr) => Err(next_ptr),
Expand Down Expand Up @@ -306,11 +308,11 @@ impl<T> Block<T> {
//
// `Release` ensures that the newly allocated block is available to
// other threads acquiring the next pointer.
let next = NonNull::new(self.next.compare_and_swap(
ptr::null_mut(),
new_block.as_ptr(),
AcqRel,
));
let next = NonNull::new(
self.next
.compare_exchange(ptr::null_mut(), new_block.as_ptr(), AcqRel, Acquire)
.unwrap_or_else(|x| x),
);

let next = match next {
Some(next) => next,
Expand All @@ -333,7 +335,7 @@ impl<T> Block<T> {

// TODO: Should this iteration be capped?
loop {
let actual = unsafe { curr.as_ref().try_push(&mut new_block, AcqRel) };
let actual = unsafe { curr.as_ref().try_push(&mut new_block, AcqRel, Acquire) };

curr = match actual {
Ok(_) => {
Expand Down
12 changes: 6 additions & 6 deletions tokio/src/sync/mpsc/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,11 @@ impl<T> Tx<T> {
//
// Acquire is not needed as any "actual" value is not accessed.
// At this point, the linked list is walked to acquire blocks.
let actual =
self.block_tail
.compare_and_swap(block_ptr, next_block.as_ptr(), Release);

if actual == block_ptr {
if self
.block_tail
.compare_exchange(block_ptr, next_block.as_ptr(), Release, Relaxed)
.is_ok()
{
// Synchronize with any senders
let tail_position = self.tail_position.fetch_add(0, Release);

Expand Down Expand Up @@ -191,7 +191,7 @@ impl<T> Tx<T> {

// TODO: Unify this logic with Block::grow
for _ in 0..3 {
match curr.as_ref().try_push(&mut block, AcqRel) {
match curr.as_ref().try_push(&mut block, AcqRel, Acquire) {
Ok(_) => {
reused = true;
break;
Expand Down
6 changes: 5 additions & 1 deletion tokio/src/sync/task/atomic_waker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,11 @@ impl AtomicWaker {
where
W: WakerRef,
{
match self.state.compare_and_swap(WAITING, REGISTERING, Acquire) {
match self
.state
.compare_exchange(WAITING, REGISTERING, Acquire, Acquire)
.unwrap_or_else(|x| x)
{
WAITING => {
unsafe {
// Locked acquired, update the waker cell
Expand Down
76 changes: 2 additions & 74 deletions tokio/src/time/driver/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
//! refuse to mark the timer as pending.

use crate::loom::cell::UnsafeCell;
use crate::loom::sync::atomic::AtomicU64;
use crate::loom::sync::atomic::Ordering;

use crate::sync::AtomicWaker;
Expand All @@ -71,79 +72,6 @@ const STATE_DEREGISTERED: u64 = u64::max_value();
const STATE_PENDING_FIRE: u64 = STATE_DEREGISTERED - 1;
const STATE_MIN_VALUE: u64 = STATE_PENDING_FIRE;

/// Not all platforms support 64-bit compare-and-swap. This hack replaces the
/// AtomicU64 with a mutex around a u64 on platforms that don't. This is slow,
/// unfortunately, but 32-bit platforms are a bit niche so it'll do for now.
///
/// Note: We use "x86 or 64-bit pointers" as the condition here because
/// target_has_atomic is not stable.
#[cfg(all(
not(tokio_force_time_entry_locked),
any(target_arch = "x86", target_pointer_width = "64")
))]
type AtomicU64 = crate::loom::sync::atomic::AtomicU64;

#[cfg(not(all(
not(tokio_force_time_entry_locked),
any(target_arch = "x86", target_pointer_width = "64")
)))]
#[derive(Debug)]
struct AtomicU64 {
inner: crate::loom::sync::Mutex<u64>,
}

#[cfg(not(all(
not(tokio_force_time_entry_locked),
any(target_arch = "x86", target_pointer_width = "64")
)))]
impl AtomicU64 {
fn new(v: u64) -> Self {
Self {
inner: crate::loom::sync::Mutex::new(v),
}
}

fn load(&self, _order: Ordering) -> u64 {
debug_assert_ne!(_order, Ordering::SeqCst); // we only provide AcqRel with the lock
*self.inner.lock()
}

fn store(&self, v: u64, _order: Ordering) {
debug_assert_ne!(_order, Ordering::SeqCst); // we only provide AcqRel with the lock
*self.inner.lock() = v;
}

fn compare_exchange(
&self,
current: u64,
new: u64,
_success: Ordering,
_failure: Ordering,
) -> Result<u64, u64> {
debug_assert_ne!(_success, Ordering::SeqCst); // we only provide AcqRel with the lock
debug_assert_ne!(_failure, Ordering::SeqCst);

let mut lock = self.inner.lock();

if *lock == current {
*lock = new;
Ok(current)
} else {
Err(*lock)
}
}

fn compare_exchange_weak(
&self,
current: u64,
new: u64,
success: Ordering,
failure: Ordering,
) -> Result<u64, u64> {
self.compare_exchange(current, new, success, failure)
}
}

/// This structure holds the current shared state of the timer - its scheduled
/// time (if registered), or otherwise the result of the timer completing, as
/// well as the registered waker.
Expand Down Expand Up @@ -300,7 +228,7 @@ impl StateCell {
/// expiration time.
///
/// While this function is memory-safe, it should only be called from a
/// context holding both `&mut TimerEntry` and the driver lock.
/// context holding both `&mut TimerEntry` and the driver lock.
fn set_expiration(&self, timestamp: u64) {
debug_assert!(timestamp < STATE_MIN_VALUE);

Expand Down

0 comments on commit 770044c

Please sign in to comment.