@@ -6,7 +6,7 @@ use crate::sync::once::ExclusiveState;
66use crate :: sys:: futex:: { futex_wait, futex_wake_all} ;
77
88// On some platforms, the OS is very nice and handles the waiter queue for us.
9- // This means we only need one atomic value with 5 states:
9+ // This means we only need one atomic value with 4 states:
1010
1111/// No initialization has run yet, and no thread is currently using the Once.
1212const INCOMPLETE : u32 = 0 ;
@@ -17,16 +17,20 @@ const POISONED: u32 = 1;
1717/// Some thread is currently attempting to run initialization. It may succeed,
1818/// so all future threads need to wait for it to finish.
1919const RUNNING : u32 = 2 ;
20- /// Some thread is currently attempting to run initialization and there are threads
21- /// waiting for it to finish.
22- const QUEUED : u32 = 3 ;
2320/// Initialization has completed and all future calls should finish immediately.
24- const COMPLETE : u32 = 4 ;
21+ const COMPLETE : u32 = 3 ;
2522
26- // Threads wait by setting the state to QUEUED and calling `futex_wait` on the state
23+ // An additional bit indicates whether there are waiting threads:
24+
25+ /// May only be set if the state is not COMPLETE.
26+ const QUEUED : u32 = 4 ;
27+
28+ // Threads wait by setting the QUEUED bit and calling `futex_wait` on the state
2729// variable. When the running thread finishes, it will wake all waiting threads using
2830// `futex_wake_all`.
2931
32+ const STATE_MASK : u32 = 0b11 ;
33+
3034pub struct OnceState {
3135 poisoned : bool ,
3236 set_state_to : Cell < u32 > ,
@@ -45,7 +49,7 @@ impl OnceState {
4549}
4650
4751struct CompletionGuard < ' a > {
48- state : & ' a AtomicU32 ,
52+ state_and_queued : & ' a AtomicU32 ,
4953 set_state_on_drop_to : u32 ,
5054}
5155
@@ -54,64 +58,106 @@ impl<'a> Drop for CompletionGuard<'a> {
5458 // Use release ordering to propagate changes to all threads checking
5559 // up on the Once. `futex_wake_all` does its own synchronization, hence
5660 // we do not need `AcqRel`.
57- if self . state . swap ( self . set_state_on_drop_to , Release ) == QUEUED {
58- futex_wake_all ( self . state ) ;
61+ if self . state_and_queued . swap ( self . set_state_on_drop_to , Release ) & QUEUED != 0 {
62+ futex_wake_all ( self . state_and_queued ) ;
5963 }
6064 }
6165}
6266
6367pub struct Once {
64- state : AtomicU32 ,
68+ state_and_queued : AtomicU32 ,
6569}
6670
6771impl Once {
6872 #[ inline]
6973 pub const fn new ( ) -> Once {
70- Once { state : AtomicU32 :: new ( INCOMPLETE ) }
74+ Once { state_and_queued : AtomicU32 :: new ( INCOMPLETE ) }
7175 }
7276
7377 #[ inline]
7478 pub fn is_completed ( & self ) -> bool {
7579 // Use acquire ordering to make all initialization changes visible to the
7680 // current thread.
77- self . state . load ( Acquire ) == COMPLETE
81+ self . state_and_queued . load ( Acquire ) == COMPLETE
7882 }
7983
8084 #[ inline]
8185 pub ( crate ) fn state ( & mut self ) -> ExclusiveState {
82- match * self . state . get_mut ( ) {
86+ match * self . state_and_queued . get_mut ( ) {
8387 INCOMPLETE => ExclusiveState :: Incomplete ,
8488 POISONED => ExclusiveState :: Poisoned ,
8589 COMPLETE => ExclusiveState :: Complete ,
8690 _ => unreachable ! ( "invalid Once state" ) ,
8791 }
8892 }
8993
90- // This uses FnMut to match the API of the generic implementation. As this
91- // implementation is quite light-weight, it is generic over the closure and
92- // so avoids the cost of dynamic dispatch.
9394 #[ cold]
9495 #[ track_caller]
95- pub fn call ( & self , ignore_poisoning : bool , f : & mut impl FnMut ( & public :: OnceState ) ) {
96- let mut state = self . state . load ( Acquire ) ;
96+ pub fn wait ( & self , ignore_poisoning : bool ) {
97+ let mut state_and_queued = self . state_and_queued . load ( Acquire ) ;
9798 loop {
99+ let state = state_and_queued & STATE_MASK ;
100+ let queued = state_and_queued & QUEUED != 0 ;
98101 match state {
102+ COMPLETE => return ,
103+ POISONED if !ignore_poisoning => {
104+ // Panic to propagate the poison.
105+ panic ! ( "Once instance has previously been poisoned" ) ;
106+ }
107+ _ => {
108+ // Set the QUEUED bit if it has not already been set.
109+ if !queued {
110+ state_and_queued += QUEUED ;
111+ if let Err ( new) = self . state_and_queued . compare_exchange_weak (
112+ state,
113+ state_and_queued,
114+ Relaxed ,
115+ Acquire ,
116+ ) {
117+ state_and_queued = new;
118+ continue ;
119+ }
120+ }
121+
122+ futex_wait ( & self . state_and_queued , state_and_queued, None ) ;
123+ state_and_queued = self . state_and_queued . load ( Acquire ) ;
124+ }
125+ }
126+ }
127+ }
128+
129+ #[ cold]
130+ #[ track_caller]
131+ pub fn call ( & self , ignore_poisoning : bool , f : & mut dyn FnMut ( & public:: OnceState ) ) {
132+ let mut state_and_queued = self . state_and_queued . load ( Acquire ) ;
133+ loop {
134+ let state = state_and_queued & STATE_MASK ;
135+ let queued = state_and_queued & QUEUED != 0 ;
136+ match state {
137+ COMPLETE => return ,
99138 POISONED if !ignore_poisoning => {
100139 // Panic to propagate the poison.
101140 panic ! ( "Once instance has previously been poisoned" ) ;
102141 }
103142 INCOMPLETE | POISONED => {
104143 // Try to register the current thread as the one running.
105- if let Err ( new) =
106- self . state . compare_exchange_weak ( state, RUNNING , Acquire , Acquire )
107- {
108- state = new;
144+ let next = RUNNING + if queued { QUEUED } else { 0 } ;
145+ if let Err ( new) = self . state_and_queued . compare_exchange_weak (
146+ state_and_queued,
147+ next,
148+ Acquire ,
149+ Acquire ,
150+ ) {
151+ state_and_queued = new;
109152 continue ;
110153 }
154+
111155 // `waiter_queue` will manage other waiting threads, and
112156 // wake them up on drop.
113- let mut waiter_queue =
114- CompletionGuard { state : & self . state , set_state_on_drop_to : POISONED } ;
157+ let mut waiter_queue = CompletionGuard {
158+ state_and_queued : & self . state_and_queued ,
159+ set_state_on_drop_to : POISONED ,
160+ } ;
115161 // Run the function, letting it know if we're poisoned or not.
116162 let f_state = public:: OnceState {
117163 inner : OnceState {
@@ -123,21 +169,27 @@ impl Once {
123169 waiter_queue. set_state_on_drop_to = f_state. inner . set_state_to . get ( ) ;
124170 return ;
125171 }
126- RUNNING | QUEUED => {
127- // Set the state to QUEUED if it is not already.
128- if state == RUNNING
129- && let Err ( new) =
130- self . state . compare_exchange_weak ( RUNNING , QUEUED , Relaxed , Acquire )
131- {
132- state = new;
133- continue ;
172+ _ => {
173+ // All other values must be RUNNING.
174+ assert ! ( state == RUNNING ) ;
175+
176+ // Set the QUEUED bit if it is not already set.
177+ if !queued {
178+ state_and_queued += QUEUED ;
179+ if let Err ( new) = self . state_and_queued . compare_exchange_weak (
180+ state,
181+ state_and_queued,
182+ Relaxed ,
183+ Acquire ,
184+ ) {
185+ state_and_queued = new;
186+ continue ;
187+ }
134188 }
135189
136- futex_wait ( & self . state , QUEUED , None ) ;
137- state = self . state . load ( Acquire ) ;
190+ futex_wait ( & self . state_and_queued , state_and_queued , None ) ;
191+ state_and_queued = self . state_and_queued . load ( Acquire ) ;
138192 }
139- COMPLETE => return ,
140- _ => unreachable ! ( "state is never set to invalid values" ) ,
141193 }
142194 }
143195 }
0 commit comments