@@ -5,8 +5,6 @@ use alloc::collections::VecDeque;
55use alloc:: vec:: Vec ;
66
77use core:: future:: Future ;
8- #[ cfg( debug_assertions) ]
9- use core:: sync:: atomic:: { AtomicU8 , Ordering } ;
108use core:: task:: { Poll , Waker } ;
119
1210/// The maximum queue size we allow before starting to drop events.
@@ -17,8 +15,6 @@ pub(crate) struct EventQueue {
1715 waker : Arc < Mutex < Option < Waker > > > ,
1816 #[ cfg( feature = "std" ) ]
1917 condvar : Arc < crate :: sync:: Condvar > ,
20- #[ cfg( debug_assertions) ]
21- num_held_notifier_guards : Arc < AtomicU8 > ,
2218}
2319
2420impl EventQueue {
@@ -30,25 +26,6 @@ impl EventQueue {
3026 waker,
3127 #[ cfg( feature = "std" ) ]
3228 condvar : Arc :: new ( crate :: sync:: Condvar :: new ( ) ) ,
33- #[ cfg( debug_assertions) ]
34- num_held_notifier_guards : Arc :: new ( AtomicU8 :: new ( 0 ) ) ,
35- }
36- }
37-
38- pub fn enqueue < E : Into < LiquidityEvent > > ( & self , event : E ) {
39- #[ cfg( debug_assertions) ]
40- {
41- let num_held_notifier_guards = self . num_held_notifier_guards . load ( Ordering :: Relaxed ) ;
42- debug_assert ! (
43- num_held_notifier_guards > 0 ,
44- "We should be holding at least one notifier guard whenever enqueuing new events"
45- ) ;
46- }
47- let mut queue = self . queue . lock ( ) . unwrap ( ) ;
48- if queue. len ( ) < MAX_EVENT_QUEUE_SIZE {
49- queue. push_back ( event. into ( ) ) ;
50- } else {
51- return ;
5229 }
5330 }
5431
@@ -91,76 +68,36 @@ impl EventQueue {
9168
9269 // Returns an [`EventQueueNotifierGuard`] that will notify about new event when dropped.
9370 pub fn notifier ( & self ) -> EventQueueNotifierGuard {
94- #[ cfg( debug_assertions) ]
95- {
96- self . num_held_notifier_guards . fetch_add ( 1 , Ordering :: Relaxed ) ;
97- }
98- EventQueueNotifierGuard {
99- queue : Arc :: clone ( & self . queue ) ,
100- waker : Arc :: clone ( & self . waker ) ,
101- #[ cfg( feature = "std" ) ]
102- condvar : Arc :: clone ( & self . condvar ) ,
103- #[ cfg( debug_assertions) ]
104- num_held_notifier_guards : Arc :: clone ( & self . num_held_notifier_guards ) ,
105- }
106- }
107- }
108-
109- impl Drop for EventQueue {
110- fn drop ( & mut self ) {
111- #[ cfg( debug_assertions) ]
112- {
113- let num_held_notifier_guards = self . num_held_notifier_guards . load ( Ordering :: Relaxed ) ;
114- debug_assert ! (
115- num_held_notifier_guards == 0 ,
116- "We should not be holding any notifier guards when the event queue is dropped"
117- ) ;
118- }
71+ EventQueueNotifierGuard ( self )
11972 }
12073}
12174
12275// A guard type that will notify about new events when dropped.
12376#[ must_use]
124- pub ( crate ) struct EventQueueNotifierGuard {
125- queue : Arc < Mutex < VecDeque < LiquidityEvent > > > ,
126- waker : Arc < Mutex < Option < Waker > > > ,
127- #[ cfg( feature = "std" ) ]
128- condvar : Arc < crate :: sync:: Condvar > ,
129- #[ cfg( debug_assertions) ]
130- num_held_notifier_guards : Arc < AtomicU8 > ,
77+ pub ( crate ) struct EventQueueNotifierGuard < ' a > ( & ' a EventQueue ) ;
78+
79+ impl < ' a > EventQueueNotifierGuard < ' a > {
80+ pub fn enqueue < E : Into < LiquidityEvent > > ( & self , event : E ) {
81+ let mut queue = self . 0 . queue . lock ( ) . unwrap ( ) ;
82+ if queue. len ( ) < MAX_EVENT_QUEUE_SIZE {
83+ queue. push_back ( event. into ( ) ) ;
84+ } else {
85+ return ;
86+ }
87+ }
13188}
13289
133- impl Drop for EventQueueNotifierGuard {
90+ impl < ' a > Drop for EventQueueNotifierGuard < ' a > {
13491 fn drop ( & mut self ) {
135- let should_notify = !self . queue . lock ( ) . unwrap ( ) . is_empty ( ) ;
92+ let should_notify = !self . 0 . queue . lock ( ) . unwrap ( ) . is_empty ( ) ;
13693
13794 if should_notify {
138- if let Some ( waker) = self . waker . lock ( ) . unwrap ( ) . take ( ) {
95+ if let Some ( waker) = self . 0 . waker . lock ( ) . unwrap ( ) . take ( ) {
13996 waker. wake ( ) ;
14097 }
14198
14299 #[ cfg( feature = "std" ) ]
143- self . condvar . notify_one ( ) ;
144- }
145-
146- #[ cfg( debug_assertions) ]
147- {
148- let res = self . num_held_notifier_guards . fetch_update (
149- Ordering :: Relaxed ,
150- Ordering :: Relaxed ,
151- |x| Some ( x. saturating_sub ( 1 ) ) ,
152- ) ;
153- match res {
154- Ok ( previous_value) if previous_value == 0 => debug_assert ! (
155- false ,
156- "num_held_notifier_guards counter out-of-sync! This should never happen!"
157- ) ,
158- Err ( _) => debug_assert ! (
159- false ,
160- "num_held_notifier_guards counter out-of-sync! This should never happen!"
161- ) ,
162- _ => { } ,
163- }
100+ self . 0 . condvar . notify_one ( ) ;
164101 }
165102 }
166103}
@@ -209,8 +146,8 @@ mod tests {
209146 } ) ;
210147
211148 for _ in 0 ..3 {
212- let _guard = event_queue. notifier ( ) ;
213- event_queue . enqueue ( expected_event. clone ( ) ) ;
149+ let guard = event_queue. notifier ( ) ;
150+ guard . enqueue ( expected_event. clone ( ) ) ;
214151 }
215152
216153 assert_eq ! ( event_queue. wait_next_event( ) , expected_event) ;
@@ -235,25 +172,25 @@ mod tests {
235172 let mut delayed_enqueue = false ;
236173
237174 for _ in 0 ..25 {
238- let _guard = event_queue. notifier ( ) ;
239- event_queue . enqueue ( expected_event. clone ( ) ) ;
175+ let guard = event_queue. notifier ( ) ;
176+ guard . enqueue ( expected_event. clone ( ) ) ;
240177 enqueued_events. fetch_add ( 1 , Ordering :: SeqCst ) ;
241178 }
242179
243180 loop {
244181 tokio:: select! {
245182 _ = tokio:: time:: sleep( Duration :: from_millis( 10 ) ) , if !delayed_enqueue => {
246- let _guard = event_queue. notifier( ) ;
247- event_queue . enqueue( expected_event. clone( ) ) ;
183+ let guard = event_queue. notifier( ) ;
184+ guard . enqueue( expected_event. clone( ) ) ;
248185 enqueued_events. fetch_add( 1 , Ordering :: SeqCst ) ;
249186 delayed_enqueue = true ;
250187 }
251188 e = event_queue. next_event_async( ) => {
252189 assert_eq!( e, expected_event) ;
253190 received_events. fetch_add( 1 , Ordering :: SeqCst ) ;
254191
255- let _guard = event_queue. notifier( ) ;
256- event_queue . enqueue( expected_event. clone( ) ) ;
192+ let guard = event_queue. notifier( ) ;
193+ guard . enqueue( expected_event. clone( ) ) ;
257194 enqueued_events. fetch_add( 1 , Ordering :: SeqCst ) ;
258195 }
259196 e = event_queue. next_event_async( ) => {
@@ -285,9 +222,9 @@ mod tests {
285222 std:: thread:: spawn ( move || {
286223 // Sleep a bit before we enqueue the events everybody is waiting for.
287224 std:: thread:: sleep ( Duration :: from_millis ( 20 ) ) ;
288- let _guard = thread_queue. notifier ( ) ;
289- thread_queue . enqueue ( thread_event. clone ( ) ) ;
290- thread_queue . enqueue ( thread_event. clone ( ) ) ;
225+ let guard = thread_queue. notifier ( ) ;
226+ guard . enqueue ( thread_event. clone ( ) ) ;
227+ guard . enqueue ( thread_event. clone ( ) ) ;
291228 } ) ;
292229
293230 let e = event_queue. next_event_async ( ) . await ;
0 commit comments