@@ -14,7 +14,6 @@ use std::{
14
14
Arc ,
15
15
} ,
16
16
thread:: available_parallelism,
17
- time:: { Duration , Instant } ,
18
17
} ;
19
18
20
19
use anyhow:: { bail, Result } ;
@@ -23,6 +22,7 @@ use dashmap::DashMap;
23
22
use parking_lot:: { Condvar , Mutex } ;
24
23
use rustc_hash:: FxHasher ;
25
24
use smallvec:: smallvec;
25
+ use tokio:: time:: { Duration , Instant } ;
26
26
use turbo_tasks:: {
27
27
backend:: {
28
28
Backend , BackendJobId , CachedTaskType , CellContent , TaskExecutionSpec , TransientTaskRoot ,
@@ -54,6 +54,9 @@ use crate::{
54
54
utils:: { bi_map:: BiMap , chunked_vec:: ChunkedVec , ptr_eq_arc:: PtrEqArc , sharded:: Sharded } ,
55
55
} ;
56
56
57
+ const BACKEND_JOB_INITIAL_SNAPSHOT : BackendJobId = unsafe { BackendJobId :: new_unchecked ( 1 ) } ;
58
+ const BACKEND_JOB_FOLLOW_UP_SNAPSHOT : BackendJobId = unsafe { BackendJobId :: new_unchecked ( 2 ) } ;
59
+
57
60
const SNAPSHOT_REQUESTED_BIT : usize = 1 << ( usize:: BITS - 1 ) ;
58
61
59
62
struct SnapshotRequest {
@@ -123,7 +126,7 @@ struct TurboTasksBackendInner {
123
126
/// Condition Variable that is triggered when a snapshot is completed and
124
127
/// operations can continue.
125
128
snapshot_completed : Condvar ,
126
- /// The timestamp of the last started snapshot.
129
+ /// The timestamp of the last started snapshot since [`Self::start_time`] .
127
130
last_snapshot : AtomicU64 ,
128
131
129
132
stopping : AtomicBool ,
@@ -291,10 +294,11 @@ impl TurboTasksBackendInner {
291
294
child_task : TaskId ,
292
295
turbo_tasks : & dyn TurboTasksBackendApi < TurboTasksBackend > ,
293
296
) {
294
- operation:: ConnectChildOperation :: run ( parent_task, child_task, unsafe {
295
- // Safety: Passing `None` is safe.
296
- self . execute_context_with_tx ( None , turbo_tasks)
297
- } ) ;
297
+ operation:: ConnectChildOperation :: run (
298
+ parent_task,
299
+ child_task,
300
+ self . execute_context ( turbo_tasks) ,
301
+ ) ;
298
302
}
299
303
300
304
fn try_read_task_output (
@@ -541,13 +545,11 @@ impl TurboTasksBackendInner {
541
545
snapshot_request. snapshot_requested = true ;
542
546
let active_operations = self
543
547
. in_progress_operations
544
- . fetch_or ( SNAPSHOT_REQUESTED_BIT , std :: sync :: atomic :: Ordering :: Relaxed ) ;
548
+ . fetch_or ( SNAPSHOT_REQUESTED_BIT , Ordering :: Relaxed ) ;
545
549
if active_operations != 0 {
546
550
self . operations_suspended
547
551
. wait_while ( & mut snapshot_request, |_| {
548
- self . in_progress_operations
549
- . load ( std:: sync:: atomic:: Ordering :: Relaxed )
550
- != SNAPSHOT_REQUESTED_BIT
552
+ self . in_progress_operations . load ( Ordering :: Relaxed ) != SNAPSHOT_REQUESTED_BIT
551
553
} ) ;
552
554
}
553
555
let suspended_operations = snapshot_request
@@ -562,7 +564,7 @@ impl TurboTasksBackendInner {
562
564
let mut snapshot_request = self . snapshot_request . lock ( ) ;
563
565
snapshot_request. snapshot_requested = false ;
564
566
self . in_progress_operations
565
- . fetch_sub ( SNAPSHOT_REQUESTED_BIT , std :: sync :: atomic :: Ordering :: Relaxed ) ;
567
+ . fetch_sub ( SNAPSHOT_REQUESTED_BIT , Ordering :: Relaxed ) ;
566
568
self . snapshot_completed . notify_all ( ) ;
567
569
let snapshot_time = Instant :: now ( ) ;
568
570
drop ( snapshot_request) ;
@@ -622,7 +624,7 @@ impl TurboTasksBackendInner {
622
624
}
623
625
624
626
// Schedule the snapshot job
625
- turbo_tasks. schedule_backend_background_job ( BackendJobId :: from ( 1 ) ) ;
627
+ turbo_tasks. schedule_backend_background_job ( BACKEND_JOB_INITIAL_SNAPSHOT ) ;
626
628
}
627
629
628
630
fn stopping ( & self ) {
@@ -1157,15 +1159,15 @@ impl TurboTasksBackendInner {
1157
1159
turbo_tasks : & ' a dyn TurboTasksBackendApi < TurboTasksBackend > ,
1158
1160
) -> Pin < Box < dyn Future < Output = ( ) > + Send + ' a > > {
1159
1161
Box :: pin ( async move {
1160
- if * id == 1 || * id == 2 {
1162
+ if id == BACKEND_JOB_INITIAL_SNAPSHOT || id == BACKEND_JOB_FOLLOW_UP_SNAPSHOT {
1161
1163
let last_snapshot = self . last_snapshot . load ( Ordering :: Relaxed ) ;
1162
1164
let mut last_snapshot = self . start_time + Duration :: from_millis ( last_snapshot) ;
1163
1165
loop {
1164
1166
const FIRST_SNAPSHOT_WAIT : Duration = Duration :: from_secs ( 30 ) ;
1165
1167
const SNAPSHOT_INTERVAL : Duration = Duration :: from_secs ( 15 ) ;
1166
1168
const IDLE_TIMEOUT : Duration = Duration :: from_secs ( 1 ) ;
1167
1169
1168
- let time = if * id == 1 {
1170
+ let time = if id == BACKEND_JOB_INITIAL_SNAPSHOT {
1169
1171
FIRST_SNAPSHOT_WAIT
1170
1172
} else {
1171
1173
SNAPSHOT_INTERVAL
@@ -1177,7 +1179,11 @@ impl TurboTasksBackendInner {
1177
1179
if !self . stopping . load ( Ordering :: Acquire ) {
1178
1180
let mut idle_start_listener = self . idle_start_event . listen ( ) ;
1179
1181
let mut idle_end_listener = self . idle_end_event . listen ( ) ;
1180
- let mut idle_time = until + IDLE_TIMEOUT ;
1182
+ let mut idle_time = if turbo_tasks. is_idle ( ) {
1183
+ Instant :: now ( ) + IDLE_TIMEOUT
1184
+ } else {
1185
+ far_future ( )
1186
+ } ;
1181
1187
loop {
1182
1188
tokio:: select! {
1183
1189
_ = & mut stop_listener => {
@@ -1191,10 +1197,10 @@ impl TurboTasksBackendInner {
1191
1197
idle_time = until + IDLE_TIMEOUT ;
1192
1198
idle_end_listener = self . idle_end_event. listen( )
1193
1199
} ,
1194
- _ = tokio:: time:: sleep_until( until. into ( ) ) => {
1200
+ _ = tokio:: time:: sleep_until( until) => {
1195
1201
break ;
1196
1202
} ,
1197
- _ = tokio:: time:: sleep_until( idle_time. into ( ) ) => {
1203
+ _ = tokio:: time:: sleep_until( idle_time) => {
1198
1204
if turbo_tasks. is_idle( ) {
1199
1205
break ;
1200
1206
}
@@ -1212,10 +1218,12 @@ impl TurboTasksBackendInner {
1212
1218
continue ;
1213
1219
}
1214
1220
let last_snapshot = last_snapshot. duration_since ( self . start_time ) ;
1215
- self . last_snapshot
1216
- . store ( last_snapshot. as_millis ( ) as u64 , Ordering :: Relaxed ) ;
1221
+ self . last_snapshot . store (
1222
+ last_snapshot. as_millis ( ) . try_into ( ) . unwrap ( ) ,
1223
+ Ordering :: Relaxed ,
1224
+ ) ;
1217
1225
1218
- turbo_tasks. schedule_backend_background_job ( BackendJobId :: from ( 2 ) ) ;
1226
+ turbo_tasks. schedule_backend_background_job ( BACKEND_JOB_FOLLOW_UP_SNAPSHOT ) ;
1219
1227
return ;
1220
1228
}
1221
1229
}
@@ -1525,3 +1533,12 @@ impl Backend for TurboTasksBackend {
1525
1533
todo ! ( )
1526
1534
}
1527
1535
}
1536
+
1537
+ // from https://github.com/tokio-rs/tokio/blob/29cd6ec1ec6f90a7ee1ad641c03e0e00badbcb0e/tokio/src/time/instant.rs#L57-L63
1538
+ fn far_future ( ) -> Instant {
1539
+ // Roughly 30 years from now.
1540
+ // API does not provide a way to obtain max `Instant`
1541
+ // or convert specific date in the future to instant.
1542
+ // 1000 years overflows on macOS, 100 years overflows on FreeBSD.
1543
+ Instant :: now ( ) + Duration :: from_secs ( 86400 * 365 * 30 )
1544
+ }
0 commit comments