Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: replace deprecated compare_and_swap with compare_exchange #3331

Merged
merged 7 commits into from
Dec 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion tokio/src/io/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,11 @@ impl<T: AsyncWrite> AsyncWrite for WriteHalf<T> {

impl<T> Inner<T> {
fn poll_lock(&self, cx: &mut Context<'_>) -> Poll<Guard<'_, T>> {
if !self.locked.compare_and_swap(false, true, Acquire) {
if self
.locked
.compare_exchange(false, true, Acquire, Acquire)
.is_ok()
{
Poll::Ready(Guard { inner: self })
} else {
// Spin... but investigate a better strategy
Expand Down
31 changes: 23 additions & 8 deletions tokio/src/loom/std/atomic_u64.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ mod imp {
}

impl AtomicU64 {
pub(crate) fn new(val: u64) -> AtomicU64 {
AtomicU64 {
pub(crate) fn new(val: u64) -> Self {
Self {
inner: Mutex::new(val),
}
}
Expand All @@ -45,16 +45,31 @@ mod imp {
prev
}

pub(crate) fn compare_and_swap(&self, old: u64, new: u64, _: Ordering) -> u64 {
pub(crate) fn compare_exchange(
&self,
current: u64,
new: u64,
_success: Ordering,
_failure: Ordering,
) -> Result<u64, u64> {
let mut lock = self.inner.lock().unwrap();
let prev = *lock;

if prev != old {
return prev;
if *lock == current {
*lock = new;
Ok(current)
} else {
Err(*lock)
}
}

*lock = new;
prev
pub(crate) fn compare_exchange_weak(
&self,
current: u64,
new: u64,
success: Ordering,
failure: Ordering,
) -> Result<u64, u64> {
self.compare_exchange(current, new, success, failure)
}
}
}
20 changes: 12 additions & 8 deletions tokio/src/runtime/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::runtime::task;
use std::marker::PhantomData;
use std::mem::MaybeUninit;
use std::ptr::{self, NonNull};
use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};

/// Producer handle. May only be used from a single thread.
pub(super) struct Local<T: 'static> {
Expand Down Expand Up @@ -194,13 +194,17 @@ impl<T> Local<T> {
// work. This is because all tasks are pushed into the queue from the
// current thread (or memory has been acquired if the local queue handle
// moved).
let actual = self.inner.head.compare_and_swap(
prev,
pack(head.wrapping_add(n), head.wrapping_add(n)),
Release,
);

if actual != prev {
if self
.inner
.head
.compare_exchange(
prev,
pack(head.wrapping_add(n), head.wrapping_add(n)),
Release,
Relaxed,
)
.is_err()
{
// We failed to claim the tasks, losing the race. Return out of
// this function and try the full `push` routine again. The queue
// may not be full anymore.
Expand Down
18 changes: 10 additions & 8 deletions tokio/src/sync/mpsc/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,13 +258,15 @@ impl<T> Block<T> {
pub(crate) unsafe fn try_push(
&self,
block: &mut NonNull<Block<T>>,
ordering: Ordering,
success: Ordering,
failure: Ordering,
) -> Result<(), NonNull<Block<T>>> {
block.as_mut().start_index = self.start_index.wrapping_add(BLOCK_CAP);

let next_ptr = self
.next
.compare_and_swap(ptr::null_mut(), block.as_ptr(), ordering);
.compare_exchange(ptr::null_mut(), block.as_ptr(), success, failure)
.unwrap_or_else(|x| x);

match NonNull::new(next_ptr) {
Some(next_ptr) => Err(next_ptr),
Expand Down Expand Up @@ -306,11 +308,11 @@ impl<T> Block<T> {
//
// `Release` ensures that the newly allocated block is available to
// other threads acquiring the next pointer.
let next = NonNull::new(self.next.compare_and_swap(
ptr::null_mut(),
new_block.as_ptr(),
AcqRel,
));
let next = NonNull::new(
self.next
.compare_exchange(ptr::null_mut(), new_block.as_ptr(), AcqRel, Acquire)
.unwrap_or_else(|x| x),
);

let next = match next {
Some(next) => next,
Expand All @@ -333,7 +335,7 @@ impl<T> Block<T> {

// TODO: Should this iteration be capped?
loop {
let actual = unsafe { curr.as_ref().try_push(&mut new_block, AcqRel) };
let actual = unsafe { curr.as_ref().try_push(&mut new_block, AcqRel, Acquire) };

curr = match actual {
Ok(_) => {
Expand Down
12 changes: 6 additions & 6 deletions tokio/src/sync/mpsc/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,11 @@ impl<T> Tx<T> {
//
// Acquire is not needed as any "actual" value is not accessed.
// At this point, the linked list is walked to acquire blocks.
let actual =
self.block_tail
.compare_and_swap(block_ptr, next_block.as_ptr(), Release);

if actual == block_ptr {
if self
.block_tail
.compare_exchange(block_ptr, next_block.as_ptr(), Release, Relaxed)
.is_ok()
{
// Synchronize with any senders
let tail_position = self.tail_position.fetch_add(0, Release);

Expand Down Expand Up @@ -191,7 +191,7 @@ impl<T> Tx<T> {

// TODO: Unify this logic with Block::grow
for _ in 0..3 {
match curr.as_ref().try_push(&mut block, AcqRel) {
match curr.as_ref().try_push(&mut block, AcqRel, Acquire) {
Ok(_) => {
reused = true;
break;
Expand Down
6 changes: 5 additions & 1 deletion tokio/src/sync/task/atomic_waker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,11 @@ impl AtomicWaker {
where
W: WakerRef,
{
match self.state.compare_and_swap(WAITING, REGISTERING, Acquire) {
match self
.state
.compare_exchange(WAITING, REGISTERING, Acquire, Acquire)
.unwrap_or_else(|x| x)
{
WAITING => {
unsafe {
// Locked acquired, update the waker cell
Expand Down
76 changes: 2 additions & 74 deletions tokio/src/time/driver/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
//! refuse to mark the timer as pending.

use crate::loom::cell::UnsafeCell;
use crate::loom::sync::atomic::AtomicU64;
use crate::loom::sync::atomic::Ordering;

use crate::sync::AtomicWaker;
Expand All @@ -71,79 +72,6 @@ const STATE_DEREGISTERED: u64 = u64::max_value();
const STATE_PENDING_FIRE: u64 = STATE_DEREGISTERED - 1;
const STATE_MIN_VALUE: u64 = STATE_PENDING_FIRE;

/// Not all platforms support 64-bit compare-and-swap. This hack replaces the
/// AtomicU64 with a mutex around a u64 on platforms that don't. This is slow,
/// unfortunately, but 32-bit platforms are a bit niche so it'll do for now.
///
/// Note: We use "x86 or 64-bit pointers" as the condition here because
/// target_has_atomic is not stable.
#[cfg(all(
not(tokio_force_time_entry_locked),
any(target_arch = "x86", target_pointer_width = "64")
))]
type AtomicU64 = crate::loom::sync::atomic::AtomicU64;

#[cfg(not(all(
not(tokio_force_time_entry_locked),
any(target_arch = "x86", target_pointer_width = "64")
)))]
#[derive(Debug)]
struct AtomicU64 {
inner: crate::loom::sync::Mutex<u64>,
}

#[cfg(not(all(
not(tokio_force_time_entry_locked),
any(target_arch = "x86", target_pointer_width = "64")
)))]
impl AtomicU64 {
fn new(v: u64) -> Self {
Self {
inner: crate::loom::sync::Mutex::new(v),
}
}

fn load(&self, _order: Ordering) -> u64 {
debug_assert_ne!(_order, Ordering::SeqCst); // we only provide AcqRel with the lock
*self.inner.lock()
}

fn store(&self, v: u64, _order: Ordering) {
debug_assert_ne!(_order, Ordering::SeqCst); // we only provide AcqRel with the lock
*self.inner.lock() = v;
}

fn compare_exchange(
&self,
current: u64,
new: u64,
_success: Ordering,
_failure: Ordering,
) -> Result<u64, u64> {
debug_assert_ne!(_success, Ordering::SeqCst); // we only provide AcqRel with the lock
debug_assert_ne!(_failure, Ordering::SeqCst);

let mut lock = self.inner.lock();

if *lock == current {
*lock = new;
Ok(current)
} else {
Err(*lock)
}
}

fn compare_exchange_weak(
&self,
current: u64,
new: u64,
success: Ordering,
failure: Ordering,
) -> Result<u64, u64> {
self.compare_exchange(current, new, success, failure)
}
}

/// This structure holds the current shared state of the timer - its scheduled
/// time (if registered), or otherwise the result of the timer completing, as
/// well as the registered waker.
Expand Down Expand Up @@ -300,7 +228,7 @@ impl StateCell {
/// expiration time.
///
/// While this function is memory-safe, it should only be called from a
/// context holding both `&mut TimerEntry` and the driver lock.
/// context holding both `&mut TimerEntry` and the driver lock.
fn set_expiration(&self, timestamp: u64) {
debug_assert!(timestamp < STATE_MIN_VALUE);

Expand Down