Replace deprecated compare_and_swap with compare_exchange #3353

Closed
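Background for reviewers: `compare_and_swap` was deprecated in Rust 1.50 in favor of `compare_exchange` / `compare_exchange_weak`, which return a `Result` and take separate success and failure orderings. A minimal standalone sketch of the migration pattern this PR applies throughout (illustrative code, not from the tokio tree):

```rust
use std::sync::atomic::{AtomicUsize, Ordering::{AcqRel, Acquire}};

fn main() {
    let a = AtomicUsize::new(5);

    // Deprecated since Rust 1.50:
    //     let prev = a.compare_and_swap(5, 10, AcqRel);
    //     let swapped = prev == 5;

    // Replacement: compare_exchange returns Ok(previous) on success and
    // Err(actual) on failure, and takes a second ordering that applies
    // to the load performed when the comparison fails.
    let swapped = a.compare_exchange(5, 10, AcqRel, Acquire).is_ok();

    assert!(swapped);
    assert_eq!(a.load(Acquire), 10);
}
```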
6 changes: 5 additions & 1 deletion tokio/src/io/split.rs

@@ -131,7 +131,11 @@ impl<T: AsyncWrite> AsyncWrite for WriteHalf<T> {

 impl<T> Inner<T> {
     fn poll_lock(&self, cx: &mut Context<'_>) -> Poll<Guard<'_, T>> {
-        if !self.locked.compare_and_swap(false, true, Acquire) {
+        if self
+            .locked
+            .compare_exchange(false, true, Acquire, Acquire)
+            .is_ok()
+        {
             Poll::Ready(Guard { inner: self })
         } else {
             // Spin... but investigate a better strategy
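One note on the failure ordering kept here: since `.is_ok()` discards the value loaded on failure, a `Relaxed` failure ordering would arguably suffice as well. A standalone try-lock sketch in that style (my illustration, not part of this diff):

```rust
use std::sync::atomic::{AtomicBool, Ordering::{Acquire, Relaxed, Release}};

// Try-lock in the style of poll_lock: the failure value is unused,
// so the failed load can be Relaxed without weakening the lock.
fn try_lock(locked: &AtomicBool) -> bool {
    locked.compare_exchange(false, true, Acquire, Relaxed).is_ok()
}

fn unlock(locked: &AtomicBool) {
    locked.store(false, Release);
}

fn main() {
    let locked = AtomicBool::new(false);
    assert!(try_lock(&locked));  // acquired
    assert!(!try_lock(&locked)); // already held
    unlock(&locked);
    assert!(try_lock(&locked));  // reacquired after release
}
```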
5 changes: 3 additions & 2 deletions tokio/src/runtime/queue.rs

@@ -194,13 +194,14 @@ impl<T> Local<T> {
         // work. This is because all tasks are pushed into the queue from the
         // current thread (or memory has been acquired if the local queue handle
         // moved).
-        let actual = self.inner.head.compare_and_swap(
+        let actual = self.inner.head.compare_exchange(
             prev,
             pack(head.wrapping_add(n), head.wrapping_add(n)),
             Release,
+            Relaxed,
         );
 
-        if actual != prev {
+        if actual.is_err() {
             // We failed to claim the tasks, losing the race. Return out of
             // this function and try the full `push` routine again. The queue
             // may not be full anymore.
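For context on the hunk above: `pack` folds the queue's two head halves (the steal index and the real index) into a single word so one `compare_exchange` claims both at once. A toy sketch of that packing, assuming two `u32` halves in a `u64` (the real tokio field widths differ):

```rust
// Fold two u32 indices into one u64 word so both halves can be
// updated by a single compare_exchange on an AtomicU64.
fn pack(steal: u32, real: u32) -> u64 {
    ((steal as u64) << 32) | (real as u64)
}

// Recover the two halves.
fn unpack(word: u64) -> (u32, u32) {
    ((word >> 32) as u32, word as u32)
}

fn main() {
    let word = pack(7, 42);
    assert_eq!(unpack(word), (7, 42));
}
```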
37 changes: 17 additions & 20 deletions tokio/src/sync/mpsc/block.rs

@@ -258,17 +258,16 @@ impl<T> Block<T> {
     pub(crate) unsafe fn try_push(
         &self,
         block: &mut NonNull<Block<T>>,
-        ordering: Ordering,
Contributor Author
Searching the codebase, every caller of `try_push` passes `AcqRel`, so I think we can simply remove the `ordering` parameter.

     ) -> Result<(), NonNull<Block<T>>> {
         block.as_mut().start_index = self.start_index.wrapping_add(BLOCK_CAP);
 
         let next_ptr = self
             .next
-            .compare_and_swap(ptr::null_mut(), block.as_ptr(), ordering);
+            .compare_exchange(ptr::null_mut(), block.as_ptr(), AcqRel, Acquire);
 
-        match NonNull::new(next_ptr) {
-            Some(next_ptr) => Err(next_ptr),
-            None => Ok(()),
+        match next_ptr {
+            Ok(_) => Ok(()),
+            Err(v) => Err(NonNull::new_unchecked(v)),
         }
     }

@@ -306,20 +305,18 @@ impl<T> Block<T> {
         //
         // `Release` ensures that the newly allocated block is available to
         // other threads acquiring the next pointer.
-        let next = NonNull::new(self.next.compare_and_swap(
-            ptr::null_mut(),
-            new_block.as_ptr(),
-            AcqRel,
-        ));
-
-        let next = match next {
-            Some(next) => next,
-            None => {
-                // The compare-and-swap succeeded and the newly allocated block
-                // is successfully pushed.
-                return new_block;
-            }
-        };
+        let next =
+            match self
+                .next
+                .compare_exchange(ptr::null_mut(), new_block.as_ptr(), AcqRel, Acquire)
+            {
+                Ok(_) => {
+                    // The compare-and-swap succeeded and the newly allocated block
+                    // is successfully pushed.
+                    return new_block;
+                }
+                Err(v) => unsafe { NonNull::new_unchecked(v) },
+            };
 
         // There already is a next block in the linked list. The newly allocated
         // block could be dropped and the discovered next block returned;
@@ -333,7 +330,7 @@

         // TODO: Should this iteration be capped?
         loop {
-            let actual = unsafe { curr.as_ref().try_push(&mut new_block, AcqRel) };
+            let actual = unsafe { curr.as_ref().try_push(&mut new_block) };
 
             curr = match actual {
                 Ok(_) => {
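The `grow`/`try_push` interplay above is the classic lock-free list append: CAS the null `next` pointer to the new block, and on failure hop to the block that won the race and retry. A self-contained sketch of that pattern under simplified types (nodes deliberately leaked for brevity; not the tokio implementation):

```rust
use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering::{AcqRel, Acquire}};

struct Node {
    value: u64,
    next: AtomicPtr<Node>,
}

fn node(value: u64) -> Box<Node> {
    Box::new(Node { value, next: AtomicPtr::new(ptr::null_mut()) })
}

// Walk from `start`, trying to CAS each null `next` pointer to the new
// node; on failure the CAS returns the pointer that won the race, and
// the walk continues from there (the Block::grow / try_push pattern).
fn append(start: &Node, new: Box<Node>) {
    let new = Box::into_raw(new); // leaked to keep the sketch short
    let mut curr = start;
    loop {
        match curr.next.compare_exchange(ptr::null_mut(), new, AcqRel, Acquire) {
            Ok(_) => return,
            Err(winner) => curr = unsafe { &*winner },
        }
    }
}

fn main() {
    let head = node(0);
    append(&head, node(1));
    append(&head, node(2));
    let first = unsafe { &*head.next.load(Acquire) };
    assert_eq!(first.value, 1);
    assert_eq!(unsafe { (*first.next.load(Acquire)).value }, 2);
}
```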
17 changes: 10 additions & 7 deletions tokio/src/sync/mpsc/list.rs

@@ -6,7 +6,7 @@ use crate::sync::mpsc::block::{self, Block};

 use std::fmt;
 use std::ptr::NonNull;
-use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
+use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};
 
 /// List queue transmit handle
 pub(crate) struct Tx<T> {
@@ -140,11 +140,14 @@ impl<T> Tx<T> {
             //
             // Acquire is not needed as any "actual" value is not accessed.
             // At this point, the linked list is walked to acquire blocks.
-            let actual =
-                self.block_tail
-                    .compare_and_swap(block_ptr, next_block.as_ptr(), Release);
-
-            if actual == block_ptr {
+            let actual = self.block_tail.compare_exchange(
+                block_ptr,
+                next_block.as_ptr(),
+                Release,
+                Relaxed,
+            );
+
+            if actual.is_ok() {
                 // Synchronize with any senders
                 let tail_position = self.tail_position.fetch_add(0, Release);

@@ -191,7 +194,7 @@ impl<T> Tx<T> {

         // TODO: Unify this logic with Block::grow
         for _ in 0..3 {
-            match curr.as_ref().try_push(&mut block, AcqRel) {
+            match curr.as_ref().try_push(&mut block) {
                 Ok(_) => {
                     reused = true;
                     break;
11 changes: 7 additions & 4 deletions tokio/src/sync/task/atomic_waker.rs

@@ -171,8 +171,11 @@ impl AtomicWaker {
     where
         W: WakerRef,
     {
-        match self.state.compare_and_swap(WAITING, REGISTERING, Acquire) {
-            WAITING => {
+        match self
+            .state
+            .compare_exchange(WAITING, REGISTERING, Acquire, Acquire)
+        {
+            Ok(WAITING) => {
                 unsafe {
                     // Lock acquired, update the waker cell
                     self.waker.with_mut(|t| *t = Some(waker.into_waker()));
@@ -212,7 +215,7 @@ impl AtomicWaker {
                     }
                 }
             }
-            WAKING => {
+            Err(WAKING) => {
                 // Currently in the process of waking the task, i.e.,
                 // `wake` is currently being called on the old waker.
                 // So, we call wake on the new waker.
@@ -221,7 +224,7 @@
                 // This is equivalent to a spin lock, so use a spin hint.
                 atomic::spin_loop_hint();
             }
-            state => {
+            Err(state) => {
                 // In this case, a concurrent thread is holding the
                 // "registering" lock. This probably indicates a bug in the
                 // caller's code as racing to call `register` doesn't make much
                 // sense.