@@ -6,10 +6,34 @@ use std::sync::atomic::{AtomicUsize, Ordering};
66use std:: sync:: { Condvar , Mutex } ;
77use std:: thread;
88use std:: usize;
9+ use DeadlockHandler ;
10+
/// Thread-accounting state for the pool, stored behind the `Sleep::data` mutex.
struct SleepData {
    /// Total number of worker threads in the thread pool.
    worker_count: usize,

    /// Worker threads that are currently running, i.e. neither sleeping
    /// nor blocked in user code.
    active_threads: usize,

    /// Worker threads that are blocked in user code. Threads that were put
    /// to sleep by this module are not counted here.
    blocked_threads: usize,
}
23+
24+ impl SleepData {
25+ /// Checks if the conditions for a deadlock holds and if so calls the deadlock handler
26+ #[ inline]
27+ pub fn deadlock_check ( & self , deadlock_handler : & Option < Box < DeadlockHandler > > ) {
28+ if self . active_threads == 0 && self . blocked_threads > 0 {
29+ ( deadlock_handler. as_ref ( ) . unwrap ( ) ) ( ) ;
30+ }
31+ }
32+ }
933
1034pub ( super ) struct Sleep {
1135 state : AtomicUsize ,
12- data : Mutex < ( ) > ,
36+ data : Mutex < SleepData > ,
1337 tickle : Condvar ,
1438}
1539
@@ -20,14 +44,42 @@ const ROUNDS_UNTIL_SLEEPY: usize = 32;
2044const ROUNDS_UNTIL_ASLEEP : usize = 64 ;
2145
2246impl Sleep {
23- pub ( super ) fn new ( ) -> Sleep {
47+ pub ( super ) fn new ( worker_count : usize ) -> Sleep {
2448 Sleep {
2549 state : AtomicUsize :: new ( AWAKE ) ,
26- data : Mutex :: new ( ( ) ) ,
50+ data : Mutex :: new ( SleepData {
51+ worker_count,
52+ active_threads : worker_count,
53+ blocked_threads : 0 ,
54+ } ) ,
2755 tickle : Condvar :: new ( ) ,
2856 }
2957 }
3058
59+ /// Mark a Rayon worker thread as blocked. This triggers the deadlock handler
60+ /// if no other worker thread is active
61+ #[ inline]
62+ pub fn mark_blocked ( & self , deadlock_handler : & Option < Box < DeadlockHandler > > ) {
63+ let mut data = self . data . lock ( ) . unwrap ( ) ;
64+ debug_assert ! ( data. active_threads > 0 ) ;
65+ debug_assert ! ( data. blocked_threads < data. worker_count) ;
66+ debug_assert ! ( data. active_threads > 0 ) ;
67+ data. active_threads -= 1 ;
68+ data. blocked_threads += 1 ;
69+
70+ data. deadlock_check ( deadlock_handler) ;
71+ }
72+
73+ /// Mark a previously blocked Rayon worker thread as unblocked
74+ #[ inline]
75+ pub fn mark_unblocked ( & self ) {
76+ let mut data = self . data . lock ( ) . unwrap ( ) ;
77+ debug_assert ! ( data. active_threads < data. worker_count) ;
78+ debug_assert ! ( data. blocked_threads > 0 ) ;
79+ data. active_threads += 1 ;
80+ data. blocked_threads -= 1 ;
81+ }
82+
3183 fn anyone_sleeping ( & self , state : usize ) -> bool {
3284 state & SLEEPING != 0
3385 }
@@ -61,7 +113,12 @@ impl Sleep {
61113 }
62114
63115 #[ inline]
64- pub ( super ) fn no_work_found ( & self , worker_index : usize , yields : usize ) -> usize {
116+ pub ( super ) fn no_work_found (
117+ & self ,
118+ worker_index : usize ,
119+ yields : usize ,
120+ deadlock_handler : & Option < Box < DeadlockHandler > > ,
121+ ) -> usize {
65122 log ! ( DidNotFindWork {
66123 worker: worker_index,
67124 yields: yields,
@@ -88,7 +145,7 @@ impl Sleep {
88145 }
89146 } else {
90147 debug_assert_eq ! ( yields, ROUNDS_UNTIL_ASLEEP ) ;
91- self . sleep ( worker_index) ;
148+ self . sleep ( worker_index, deadlock_handler ) ;
92149 0
93150 }
94151 }
@@ -122,7 +179,10 @@ impl Sleep {
122179 old_state: old_state,
123180 } ) ;
124181 if self . anyone_sleeping ( old_state) {
125- let _data = self . data . lock ( ) . unwrap ( ) ;
182+ let mut data = self . data . lock ( ) . unwrap ( ) ;
183+ // Set the active threads to the number of workers,
184+ // excluding threads blocked by the user since we won't wake those up
185+ data. active_threads = data. worker_count - data. blocked_threads ;
126186 self . tickle . notify_all ( ) ;
127187 }
128188 }
@@ -188,7 +248,7 @@ impl Sleep {
188248 self . worker_is_sleepy ( state, worker_index)
189249 }
190250
191- fn sleep ( & self , worker_index : usize ) {
251+ fn sleep ( & self , worker_index : usize , deadlock_handler : & Option < Box < DeadlockHandler > > ) {
192252 loop {
193253 // Acquire here suffices. If we observe that the current worker is still
194254 // sleepy, then in fact we know that no writes have occurred, and anyhow
@@ -235,7 +295,7 @@ impl Sleep {
235295 // reason for the `compare_exchange` to fail is if an
236296 // awaken comes, in which case the next cycle around
237297 // the loop will just return.
238- let data = self . data . lock ( ) . unwrap ( ) ;
298+ let mut data = self . data . lock ( ) . unwrap ( ) ;
239299
240300 // This must be SeqCst on success because we want to
241301 // ensure:
@@ -264,6 +324,11 @@ impl Sleep {
264324 log ! ( FellAsleep {
265325 worker: worker_index
266326 } ) ;
327+
328+ // Decrement the number of active threads and check for a deadlock
329+ data. active_threads -= 1 ;
330+ data. deadlock_check ( deadlock_handler) ;
331+
267332 let _ = self . tickle . wait ( data) . unwrap ( ) ;
268333 log ! ( GotAwoken {
269334 worker: worker_index
0 commit comments