@@ -11,11 +11,8 @@ use crate::utils;
 use rustwide::logging::{self, LogStorage};
 use rustwide::{BuildDirectory, Workspace};
 use std::collections::HashMap;
-use std::sync::Condvar;
-use std::sync::{
-    atomic::{AtomicBool, Ordering},
-    Mutex,
-};
+use std::sync::Mutex;
+use std::sync::{Arc, Condvar, OnceLock};
 use std::time::Duration;
 
 pub trait RecordProgress: Send + Sync {
@@ -51,8 +48,10 @@ pub(super) struct Worker<'a> {
     ex: &'a Experiment,
     config: &'a crate::config::Config,
     api: &'a dyn RecordProgress,
-    target_dir_cleanup: AtomicBool,
     next_crate: &'a (dyn Fn() -> Fallible<Option<Crate>> + Send + Sync),
+
+    // Called by the worker thread between crates, when no global state (namely caches) is in use.
+    between_crates: OnceLock<Box<dyn Fn() + Send + Sync + 'a>>,
 }
 
 impl<'a> Worker<'a> {
@@ -81,7 +80,8 @@ impl<'a> Worker<'a> {
             config,
             next_crate,
             api,
-            target_dir_cleanup: AtomicBool::new(false),
+
+            between_crates: OnceLock::new(),
         }
     }
 
@@ -167,10 +167,12 @@ impl<'a> Worker<'a> {
             return Ok(());
         };
 
-        self.maybe_cleanup_target_dir()?;
-
         info!("{} processing crate {}", self.name, krate);
 
+        if let Some(cb) = self.between_crates.get() {
+            cb();
+        }
+
         if !self.ex.ignore_blacklist && self.config.should_skip(&krate) {
             for tc in &self.ex.toolchains {
                 // If a skipped crate is somehow sent to the agent (for example, when a crate was
@@ -338,41 +340,57 @@ impl<'a> Worker<'a> {
             }
         }
     }
-
-    fn maybe_cleanup_target_dir(&self) -> Fallible<()> {
-        if !self.target_dir_cleanup.swap(false, Ordering::SeqCst) {
-            return Ok(());
-        }
-        info!("purging target dir for {}", self.name);
-        for dir in self.build_dir.values() {
-            dir.lock().unwrap().purge()?;
-        }
-
-        Ok(())
-    }
-
-    fn schedule_target_dir_cleanup(&self) {
-        self.target_dir_cleanup.store(true, Ordering::SeqCst);
-    }
 }
 
-pub(super) struct DiskSpaceWatcher<'a> {
+pub(super) struct DiskSpaceWatcher {
     interval: Duration,
     threshold: f32,
-    workers: &'a [Worker<'a>],
     should_stop: Mutex<bool>,
     waiter: Condvar,
+
+    // If the bool is true, that means we're waiting for the cache to reach zero, in which case
+    // workers will wait for it to be false before starting. This gives us a global 'is the cache
+    // in use' synchronization point.
+    cache_in_use: Mutex<(usize, bool)>,
+    cache_waiter: Condvar,
 }
 
-impl<'a> DiskSpaceWatcher<'a> {
-    pub(super) fn new(interval: Duration, threshold: f32, workers: &'a [Worker<'a>]) -> Self {
-        DiskSpaceWatcher {
+impl DiskSpaceWatcher {
+    pub(super) fn new(interval: Duration, threshold: f32, workers: &[Worker<'_>]) -> Arc<Self> {
+        let this = Arc::new(DiskSpaceWatcher {
             interval,
             threshold,
-            workers,
             should_stop: Mutex::new(false),
             waiter: Condvar::new(),
+
+            cache_in_use: Mutex::new((workers.len(), false)),
+            cache_waiter: Condvar::new(),
+        });
+
+        let worker_count = workers.len();
+        for worker in workers {
+            let this = Arc::clone(&this);
+            assert!(worker
+                .between_crates
+                .set(Box::new(move || {
+                    let mut guard = this.cache_in_use.lock().unwrap();
+                    // note that we're not running right now.
+                    guard.0 = guard.0.checked_sub(1).unwrap();
+                    // Wait until the cache doesn't need to be purged...
+                    if guard.1 {
+                        info!("waiting for cache to maybe be purged");
+                    }
+                    this.cache_waiter.notify_all();
+                    let mut guard = this.cache_waiter.wait_while(guard, |c| c.1).unwrap();
+                    // Then set ourselves as running.
+                    guard.0 += 1;
+                    this.cache_waiter.notify_all();
+                    assert!(guard.0 <= worker_count);
+                }))
+                .is_ok());
         }
+
+        this
     }
 
     pub(super) fn stop(&self) {
@@ -406,14 +424,30 @@ impl<'a> DiskSpaceWatcher<'a> {
         };
 
         if usage.is_threshold_reached(self.threshold) {
-            warn!("running the scheduled thread cleanup");
-            for worker in self.workers {
-                worker.schedule_target_dir_cleanup();
-            }
+            warn!("waiting for workers to finish");
+
+            // Set interest in cleaning caches and then wait for cache use to drain to zero.
+            let mut guard = self.cache_in_use.lock().unwrap();
+            guard.1 = true;
+
+            self.cache_waiter.notify_all();
+
+            let mut guard = self.cache_waiter.wait_while(guard, |c| c.0 > 0).unwrap();
+
+            // OK, purging caches, clear interest.
+            guard.1 = false;
+
+            self.cache_waiter.notify_all();
+
+            warn!("purging all build dirs and caches");
 
             if let Err(e) = workspace.purge_all_caches() {
                 warn!("failed to purge caches: {:?}", e);
             }
+
+            if let Err(e) = workspace.purge_all_build_dirs() {
+                warn!("failed to purge build directories: {:?}", e);
+            }
         }
     }
 }
0 commit comments