Skip to content

Commit 37ca2f0

Browse files
authored
sync: remove inner mutex in SetOnce (#7554)
1 parent c8371d4 commit 37ca2f0

File tree

2 files changed

+53
-33
lines changed

2 files changed

+53
-33
lines changed

tokio/src/sync/notify.rs

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -741,12 +741,14 @@ impl Notify {
741741
/// }
742742
/// ```
743743
pub fn notify_waiters(&self) {
744-
let mut waiters = self.waiters.lock();
745-
746-
// The state must be loaded while the lock is held. The state may only
747-
// transition out of WAITING while the lock is held.
748-
let curr = self.state.load(SeqCst);
744+
self.lock_waiter_list().notify_waiters();
745+
}
749746

747+
fn inner_notify_waiters<'a>(
748+
&'a self,
749+
curr: usize,
750+
mut waiters: crate::loom::sync::MutexGuard<'a, LinkedList<Waiter, Waiter>>,
751+
) {
750752
if matches!(get_state(curr), EMPTY | NOTIFIED) {
751753
// There are no waiting tasks. All we need to do is increment the
752754
// number of times this method was called.
@@ -814,6 +816,20 @@ impl Notify {
814816

815817
wakers.wake_all();
816818
}
819+
820+
pub(crate) fn lock_waiter_list(&self) -> NotifyGuard<'_> {
821+
let guarded_waiters = self.waiters.lock();
822+
823+
// The state must be loaded while the lock is held. The state may only
824+
// transition out of WAITING while the lock is held.
825+
let current_state = self.state.load(SeqCst);
826+
827+
NotifyGuard {
828+
guarded_notify: self,
829+
guarded_waiters,
830+
current_state,
831+
}
832+
}
817833
}
818834

819835
impl Default for Notify {
@@ -1374,3 +1390,20 @@ unsafe impl linked_list::Link for Waiter {
13741390
}
13751391

13761392
fn is_unpin<T: Unpin>() {}
1393+
1394+
/// A guard that provides exclusive access to a `Notify`'s internal
1395+
/// waiters list.
1396+
///
1397+
/// While this guard is held, the `Notify` instance's waiter list is locked.
1398+
pub(crate) struct NotifyGuard<'a> {
1399+
guarded_notify: &'a Notify,
1400+
guarded_waiters: crate::loom::sync::MutexGuard<'a, WaitList>,
1401+
current_state: usize,
1402+
}
1403+
1404+
impl NotifyGuard<'_> {
1405+
pub(crate) fn notify_waiters(self) {
1406+
self.guarded_notify
1407+
.inner_notify_waiters(self.current_state, self.guarded_waiters);
1408+
}
1409+
}

tokio/src/sync/set_once.rs

Lines changed: 15 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
use super::Notify;
22

33
use crate::loom::cell::UnsafeCell;
4-
use crate::loom::sync::{atomic::AtomicBool, Mutex};
4+
use crate::loom::sync::atomic::AtomicBool;
55

66
use std::error::Error;
77
use std::fmt;
8+
use std::future::{poll_fn, Future};
89
use std::mem::MaybeUninit;
910
use std::ops::Drop;
1011
use std::ptr;
1112
use std::sync::atomic::Ordering;
13+
use std::task::Poll;
1214

1315
// This file contains an implementation of an SetOnce. The value of SetOnce
1416
// can only be modified once during initialization.
@@ -90,9 +92,6 @@ pub struct SetOnce<T> {
9092
value_set: AtomicBool,
9193
value: UnsafeCell<MaybeUninit<T>>,
9294
notify: Notify,
93-
// we lock the mutex inside set to ensure
94-
// only one caller of set can run at a time
95-
lock: Mutex<()>,
9695
}
9796

9897
impl<T> Default for SetOnce<T> {
@@ -140,7 +139,6 @@ impl<T> From<T> for SetOnce<T> {
140139
value_set: AtomicBool::new(true),
141140
value: UnsafeCell::new(MaybeUninit::new(value)),
142141
notify: Notify::new(),
143-
lock: Mutex::new(()),
144142
}
145143
}
146144
}
@@ -152,7 +150,6 @@ impl<T> SetOnce<T> {
152150
value_set: AtomicBool::new(false),
153151
value: UnsafeCell::new(MaybeUninit::uninit()),
154152
notify: Notify::new(),
155-
lock: Mutex::new(()),
156153
}
157154
}
158155

@@ -195,7 +192,6 @@ impl<T> SetOnce<T> {
195192
value_set: AtomicBool::new(false),
196193
value: UnsafeCell::new(MaybeUninit::uninit()),
197194
notify: Notify::const_new(),
198-
lock: Mutex::const_new(()),
199195
}
200196
}
201197

@@ -246,7 +242,6 @@ impl<T> SetOnce<T> {
246242
value_set: AtomicBool::new(true),
247243
value: UnsafeCell::new(MaybeUninit::new(value)),
248244
notify: Notify::const_new(),
249-
lock: Mutex::const_new(()),
250245
}
251246
}
252247

@@ -287,19 +282,16 @@ impl<T> SetOnce<T> {
287282
return Err(SetOnceError(value));
288283
}
289284

290-
// SAFETY: lock the mutex to ensure only one caller of set
285+
// SAFETY: lock notify to ensure only one caller of set
291286
// can run at a time.
292-
let guard = self.lock.lock();
287+
let guard = self.notify.lock_waiter_list();
293288

294289
if self.initialized() {
295-
// If the value is already set, we return an error
296-
drop(guard);
297-
298290
return Err(SetOnceError(value));
299291
}
300292

301293
// SAFETY: We have locked the mutex and checked if the value is
302-
// initalized or not, so we can safely write to the value
294+
// initialized or not, so we can safely write to the value
303295
unsafe {
304296
self.value.with_mut(|ptr| (*ptr).as_mut_ptr().write(value));
305297
}
@@ -308,10 +300,8 @@ impl<T> SetOnce<T> {
308300
// atomic is able to read the value we just stored.
309301
self.value_set.store(true, Ordering::Release);
310302

311-
drop(guard);
312-
313303
// notify the waiting wakers that the value is set
314-
self.notify.notify_waiters();
304+
guard.notify_waiters();
315305

316306
Ok(())
317307
}
@@ -353,20 +343,17 @@ impl<T> SetOnce<T> {
353343
}
354344

355345
let notify_fut = self.notify.notified();
356-
{
357-
// Taking the lock here ensures that a concurrent call to `set`
358-
// will see the creation of `notify_fut` in case the check
359-
// fails.
360-
let _guard = self.lock.lock();
346+
pin!(notify_fut);
361347

348+
poll_fn(|cx| {
349+
// Register under the notify's internal lock.
350+
let ret = notify_fut.as_mut().poll(cx);
362351
if self.value_set.load(Ordering::Relaxed) {
363-
// SAFETY: the state is initialized
364-
return unsafe { self.get_unchecked() };
352+
return Poll::Ready(());
365353
}
366-
}
367-
368-
// wait until the value is set
369-
notify_fut.await;
354+
ret
355+
})
356+
.await;
370357
}
371358
}
372359
}

0 commit comments

Comments
 (0)