Skip to content

Commit 33f2fbd

Browse files
committed
address review and fix clippy failure
1 parent 470811b commit 33f2fbd

File tree

2 files changed

+22
-25
lines changed

2 files changed

+22
-25
lines changed

tokio-util/tests/mpsc.rs

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -339,19 +339,14 @@ async fn actor_weak_sender() {
339339

340340
async fn run(&mut self) {
341341
let mut i = 0;
342-
loop {
343-
match self.receiver.recv().await {
344-
Some(msg) => {
345-
self.handle_message(msg);
346-
}
347-
None => {
348-
break;
349-
}
350-
}
342+
while let Some(msg) = self.receiver.recv().await {
343+
self.handle_message(msg);
344+
351345
if i == 0 {
352346
self.send_message_to_self().await;
353347
}
354-
i += 1;
348+
349+
i += 1
355350
}
356351

357352
assert!(self.received_self_msg);

tokio/src/sync/mpsc/chan.rs

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::loom::cell::UnsafeCell;
22
use crate::loom::future::AtomicWaker;
33
use crate::loom::sync::atomic::AtomicUsize;
4-
use crate::loom::sync::{Arc, Weak};
4+
use crate::loom::sync::Arc;
55
use crate::park::thread::CachedParkThread;
66
use crate::park::Park;
77
use crate::sync::mpsc::error::TryRecvError;
@@ -11,9 +11,11 @@ use crate::sync::notify::Notify;
1111
use std::fmt;
1212
use std::mem;
1313
use std::process;
14-
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, SeqCst};
14+
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
15+
use std::sync::Weak;
1516
use std::task::Poll::{Pending, Ready};
1617
use std::task::{Context, Poll};
18+
use std::usize;
1719

1820
/// Channel sender.
1921
pub(crate) struct Tx<T, S> {
@@ -181,28 +183,31 @@ impl<T, S> TxWeak<T, S> {
181183
// even though the channel might have been closed in the meantime.
182184
// Need to check here whether the channel was actually closed.
183185

184-
let mut tx_count = inner.tx_count.load(Relaxed);
186+
let mut tx_count = inner.tx_count.load(Acquire);
187+
188+
if tx_count == 0 {
189+
// channel is closed
190+
mem::drop(inner);
191+
return None;
192+
}
193+
185194
loop {
186-
// FIXME Haven't thought the orderings on the CAS through yet
187195
match inner
188196
.tx_count
189-
.compare_exchange(tx_count, tx_count + 1, SeqCst, SeqCst)
197+
.compare_exchange(tx_count, tx_count + 1, AcqRel, Acquire)
190198
{
191199
Ok(prev_count) => {
192-
if prev_count == 0 {
193-
mem::drop(inner);
194-
return None;
195-
}
200+
assert!(prev_count != 0);
196201

197202
return Some(Tx::new(inner));
198203
}
199-
Err(count) => {
200-
if count == 0 {
204+
Err(prev_count) => {
205+
if prev_count == 0 {
201206
mem::drop(inner);
202207
return None;
203208
}
204209

205-
tx_count = count;
210+
tx_count = prev_count;
206211
}
207212
}
208213
}
@@ -441,9 +446,6 @@ impl Semaphore for (crate::sync::batch_semaphore::Semaphore, usize) {
441446

442447
// ===== impl Semaphore for AtomicUsize =====
443448

444-
use std::sync::atomic::Ordering::Release;
445-
use std::usize;
446-
447449
impl Semaphore for AtomicUsize {
448450
fn add_permit(&self) {
449451
let prev = self.fetch_sub(2, Release);

0 commit comments

Comments
 (0)