@@ -5779,35 +5779,40 @@ where
5779
5779
pub async fn process_pending_events_async < Future : core:: future:: Future , H : Fn ( Event ) -> Future > (
5780
5780
& self , handler : H
5781
5781
) {
5782
- if self . pending_events_processor . compare_exchange ( false , true , Ordering :: Acquire , Ordering :: Relaxed ) . is_err ( ) {
5783
- return ;
5784
- }
5782
+ let mut processed_all_events = false ;
5783
+ while !processed_all_events {
5784
+ if self . pending_events_processor . compare_exchange ( false , true , Ordering :: Acquire , Ordering :: Relaxed ) . is_err ( ) {
5785
+ return ;
5786
+ }
5785
5787
5786
- let mut result = NotifyOption :: SkipPersist ;
5788
+ let mut result = NotifyOption :: SkipPersist ;
5787
5789
5788
- // TODO: This behavior should be documented. It's unintuitive that we query
5789
- // ChannelMonitors when clearing other events.
5790
- if self . process_pending_monitor_events ( ) {
5791
- result = NotifyOption :: DoPersist ;
5792
- }
5790
+ // TODO: This behavior should be documented. It's unintuitive that we query
5791
+ // ChannelMonitors when clearing other events.
5792
+ if self . process_pending_monitor_events ( ) {
5793
+ result = NotifyOption :: DoPersist ;
5794
+ }
5793
5795
5794
- let pending_events = self . pending_events . lock ( ) . unwrap ( ) . clone ( ) ;
5795
- let num_events = pending_events. len ( ) ;
5796
- if !pending_events. is_empty ( ) {
5797
- result = NotifyOption :: DoPersist ;
5798
- }
5796
+ let pending_events = self . pending_events . lock ( ) . unwrap ( ) . clone ( ) ;
5797
+ let num_events = pending_events. len ( ) ;
5798
+ if !pending_events. is_empty ( ) {
5799
+ result = NotifyOption :: DoPersist ;
5800
+ }
5799
5801
5800
- for event in pending_events {
5801
- handler ( event) . await ;
5802
- }
5802
+ for event in pending_events {
5803
+ handler ( event) . await ;
5804
+ }
5803
5805
5804
- self . pending_events . lock ( ) . unwrap ( ) . drain ( ..num_events) ;
5806
+ let mut pending_events = self . pending_events . lock ( ) . unwrap ( ) . clone ( ) ;
5807
+ pending_events. drain ( ..num_events) ;
5808
+ processed_all_events = pending_events. is_empty ( ) ;
5805
5809
5806
- if result == NotifyOption :: DoPersist {
5807
- self . persistence_notifier . notify ( ) ;
5808
- }
5810
+ if result == NotifyOption :: DoPersist {
5811
+ self . persistence_notifier . notify ( ) ;
5812
+ }
5809
5813
5810
- self . pending_events_processor . store ( false , Ordering :: Release ) ;
5814
+ self . pending_events_processor . store ( false , Ordering :: Release ) ;
5815
+ }
5811
5816
}
5812
5817
}
5813
5818
0 commit comments