diff --git a/turbopack/crates/turbo-tasks-memory/src/memory_backend.rs b/turbopack/crates/turbo-tasks-memory/src/memory_backend.rs index 0458b7bd34f67..f7840d3f524fc 100644 --- a/turbopack/crates/turbo-tasks-memory/src/memory_backend.rs +++ b/turbopack/crates/turbo-tasks-memory/src/memory_backend.rs @@ -1,6 +1,5 @@ use std::{ borrow::{Borrow, Cow}, - cell::RefCell, future::Future, hash::{BuildHasher, BuildHasherDefault, Hash}, num::NonZeroU32, @@ -16,7 +15,6 @@ use anyhow::{anyhow, bail, Result}; use auto_hash_map::AutoMap; use dashmap::{mapref::entry::Entry, DashMap}; use rustc_hash::FxHasher; -use tokio::task::futures::TaskLocalFuture; use tracing::trace_span; use turbo_prehash::{BuildHasherExt, PassThroughHash, PreHashed}; use turbo_tasks::{ @@ -37,7 +35,7 @@ use crate::{ PERCENTAGE_MIN_IDLE_TARGET_MEMORY, PERCENTAGE_MIN_TARGET_MEMORY, }, output::Output, - task::{ReadCellError, Task, TaskType, DEPENDENCIES_TO_TRACK}, + task::{ReadCellError, Task, TaskType}, task_statistics::TaskStatisticsApi, }; @@ -45,6 +43,12 @@ fn prehash_task_type(task_type: CachedTaskType) -> PreHashed { BuildHasherDefault::::prehash(&Default::default(), task_type) } +pub struct TaskState { + /// Cells/Outputs/Collectibles that are read during task execution. These will be stored as + /// dependencies when the execution has finished. + pub dependencies_to_track: TaskEdgesSet, +} + pub struct MemoryBackend { persistent_tasks: NoMoveVec, transient_tasks: NoMoveVec, @@ -436,14 +440,11 @@ impl Backend for MemoryBackend { self.with_task(task, |task| task.get_description()) } - type ExecutionScopeFuture> + Send + 'static> = - TaskLocalFuture, T>; - fn execution_scope> + Send + 'static>( - &self, - _task: TaskId, - future: T, - ) -> Self::ExecutionScopeFuture { - DEPENDENCIES_TO_TRACK.scope(RefCell::new(TaskEdgesSet::new()), future) + type TaskState = TaskState; + fn new_task_state(&self, _task: TaskId) -> Self::TaskState { + TaskState { + dependencies_to_track: TaskEdgesSet::new(), + } } fn try_start_task_execution<'a>( @@ -529,7 +530,7 @@ impl Backend for MemoryBackend { move || format!("reading task output from {reader}"), turbo_tasks, |output| { - Task::add_dependency_to_current(TaskEdge::Output(task)); + Task::add_dependency_to_current(TaskEdge::Output(task), turbo_tasks); output.read(reader) }, ) @@ -564,7 +565,7 @@ impl Backend for MemoryBackend { }) .into_typed(index.type_id))) } else { - Task::add_dependency_to_current(TaskEdge::Cell(task_id, index)); + Task::add_dependency_to_current(TaskEdge::Cell(task_id, index), turbo_tasks); self.with_task(task_id, |task| { match task.read_cell( index, @@ -623,7 +624,7 @@ impl Backend for MemoryBackend { reader: TaskId, turbo_tasks: &dyn TurboTasksBackendApi, ) -> TaskCollectiblesMap { - Task::add_dependency_to_current(TaskEdge::Collectibles(id, trait_id)); + Task::add_dependency_to_current(TaskEdge::Collectibles(id, trait_id), turbo_tasks); Task::read_collectibles(id, trait_id, reader, self, turbo_tasks) } diff --git a/turbopack/crates/turbo-tasks-memory/src/task.rs b/turbopack/crates/turbo-tasks-memory/src/task.rs index c440728acfcee..5d2024dfbef37 100644 --- a/turbopack/crates/turbo-tasks-memory/src/task.rs +++ b/turbopack/crates/turbo-tasks-memory/src/task.rs @@ -1,6 +1,5 @@ use std::{ borrow::Cow, - cell::RefCell, fmt::{self, Debug, Display, Formatter}, future::Future, hash::{BuildHasherDefault, Hash}, @@ -17,14 +16,13 @@ use either::Either; use parking_lot::{Mutex, RwLock}; use rustc_hash::FxHasher; use smallvec::SmallVec; -use tokio::task_local; use tracing::Span; use turbo_prehash::PreHashed; use turbo_tasks::{ backend::{CachedTaskType, CellContent, TaskCollectiblesMap, TaskExecutionSpec}, event::{Event, EventListener}, get_invalidator, registry, CellId, Invalidator, RawVc, ReadConsistency, TaskId, TaskIdSet, - TraitTypeId, TurboTasksBackendApi, ValueTypeId, + TraitTypeId, TurboTasksBackendApi, TurboTasksBackendApiExt, ValueTypeId, }; use crate::{ @@ -45,12 +43,6 @@ pub type NativeTaskFn = Box NativeTaskFuture + Send + Sync>; mod aggregation; mod meta_state; -task_local! { - /// Cells/Outputs/Collectibles that are read during task execution - /// These will be stored as dependencies when the execution has finished - pub(crate) static DEPENDENCIES_TO_TRACK: RefCell; -} - type OnceTaskFn = Mutex> + Send + 'static>>>>; /// Different Task types @@ -966,7 +958,8 @@ impl Task { let mut change_job = None; let mut remove_job = None; let mut drained_cells = SmallVec::<[Cell; 8]>::new(); - let dependencies = DEPENDENCIES_TO_TRACK.with(|deps| deps.take()); + let dependencies = turbo_tasks + .write_task_state(|deps| std::mem::take(&mut deps.dependencies_to_track)); { let mut state = self.full_state_mut(); @@ -1343,11 +1336,13 @@ impl Task { } } - pub(crate) fn add_dependency_to_current(dep: TaskEdge) { - DEPENDENCIES_TO_TRACK.with(|list| { - let mut list = list.borrow_mut(); - list.insert(dep); - }) + pub(crate) fn add_dependency_to_current( + dep: TaskEdge, + turbo_tasks: &dyn TurboTasksBackendApi, + ) { + turbo_tasks.write_task_state(|ts| { + ts.dependencies_to_track.insert(dep); + }); } /// Get an [Invalidator] that can be used to invalidate the current [Task] diff --git a/turbopack/crates/turbo-tasks/src/backend.rs b/turbopack/crates/turbo-tasks/src/backend.rs index 38e95b2dfa87d..a272bbad83572 100644 --- a/turbopack/crates/turbo-tasks/src/backend.rs +++ b/turbopack/crates/turbo-tasks/src/backend.rs @@ -441,15 +441,28 @@ pub trait Backend: Sync + Send { fn get_task_description(&self, task: TaskId) -> String; - type ExecutionScopeFuture> + Send + 'static>: Future> - + Send - + 'static; + /// Task-local state that stored inside of [`TurboTasksBackendApi`]. Constructed with + /// [`Self::new_task_state`]. + /// + /// This value that can later be written to or read from using + /// [`crate::TurboTasksBackendApiExt::write_task_state`] or + /// [`crate::TurboTasksBackendApiExt::read_task_state`] + /// + /// This data may be shared across multiple threads (must be `Sync`) in order to support + /// detached futures ([`crate::TurboTasksApi::detached_for_testing`]) and [pseudo-tasks using + /// `local_cells`][crate::function]. A [`RwLock`][std::sync::RwLock] is used to provide + /// concurrent access. + type TaskState: Send + Sync + 'static; - fn execution_scope> + Send + 'static>( - &self, - task: TaskId, - future: T, - ) -> Self::ExecutionScopeFuture; + /// Constructs a new task-local [`Self::TaskState`] for the given `task_id`. + /// + /// If a task is re-executed (e.g. because it is invalidated), this function will be called + /// again with the same [`TaskId`]. + /// + /// This value can be written to or read from using + /// [`crate::TurboTasksBackendApiExt::write_task_state`] and + /// [`crate::TurboTasksBackendApiExt::read_task_state`] + fn new_task_state(&self, task: TaskId) -> Self::TaskState; fn try_start_task_execution<'a>( &'a self, diff --git a/turbopack/crates/turbo-tasks/src/lib.rs b/turbopack/crates/turbo-tasks/src/lib.rs index 26b484434db62..eb0c4497535d9 100644 --- a/turbopack/crates/turbo-tasks/src/lib.rs +++ b/turbopack/crates/turbo-tasks/src/lib.rs @@ -91,7 +91,8 @@ pub use manager::{ dynamic_call, dynamic_this_call, emit, get_invalidator, mark_finished, mark_stateful, prevent_gc, run_once, run_once_with_reason, spawn_blocking, spawn_thread, trait_call, turbo_tasks, CurrentCellRef, Invalidator, ReadConsistency, TaskPersistence, TurboTasks, - TurboTasksApi, TurboTasksBackendApi, TurboTasksCallApi, Unused, UpdateInfo, + TurboTasksApi, TurboTasksBackendApi, TurboTasksBackendApiExt, TurboTasksCallApi, Unused, + UpdateInfo, }; pub use native_function::{FunctionMeta, NativeFunction}; pub use raw_vc::{CellId, RawVc, ReadRawVcFuture, ResolveTypeError}; diff --git a/turbopack/crates/turbo-tasks/src/manager.rs b/turbopack/crates/turbo-tasks/src/manager.rs index 15de2eaa84b52..5b76872f120ce 100644 --- a/turbopack/crates/turbo-tasks/src/manager.rs +++ b/turbopack/crates/turbo-tasks/src/manager.rs @@ -1,4 +1,5 @@ use std::{ + any::Any, borrow::Cow, future::Future, hash::{BuildHasherDefault, Hash}, @@ -236,10 +237,55 @@ pub trait TurboTasksBackendApi: TurboTasksCallApi + Sync + /// Returns the duration from the start of the program to the given instant. fn program_duration_until(&self, instant: Instant) -> Duration; + + /// An untyped object-safe version of [`TurboTasksBackendApiExt::read_task_state`]. Callers + /// should prefer the extension trait's version of this method. + #[allow(clippy::type_complexity)] // Moving this to a typedef would make the docs confusing + fn read_task_state_boxed(&self, func: Box); + + /// An untyped object-safe version of [`TurboTasksBackendApiExt::write_task_state`]. Callers + /// should prefer the extension trait's version of this method. + #[allow(clippy::type_complexity)] + fn write_task_state_boxed(&self, func: Box); + /// Returns a reference to the backend. fn backend(&self) -> &B; } +/// An extension trait for methods of `TurboTasksBackendApi` that are not object-safe. This is +/// automatically implemented for all `TurboTasksBackendApi`s using a blanket impl. +pub trait TurboTasksBackendApiExt: TurboTasksBackendApi { + /// Allows modification of the [`Backend::TaskState`]. + /// + /// This function holds open a non-exclusive read lock that blocks writes, so `func` is expected + /// to execute quickly in order to release the lock. + fn read_task_state(&self, func: impl FnOnce(&B::TaskState) -> T) -> T; + + /// Allows modification of the [`Backend::TaskState`]. + /// + /// This function holds open a write lock, so `func` is expected to execute quickly in order to + /// release the lock. + fn write_task_state(&self, func: impl FnOnce(&mut B::TaskState) -> T) -> T; +} + +impl TurboTasksBackendApiExt for TT +where + TT: TurboTasksBackendApi + ?Sized, + B: Backend + 'static, +{ + fn read_task_state(&self, func: impl FnOnce(&B::TaskState) -> T) -> T { + let mut out = None; + self.read_task_state_boxed(Box::new(|ts| out = Some(func(ts)))); + out.expect("write_task_state_boxed must call `func`") + } + + fn write_task_state(&self, func: impl FnOnce(&mut B::TaskState) -> T) -> T { + let mut out = None; + self.write_task_state_boxed(Box::new(|ts| out = Some(func(ts)))); + out.expect("write_task_state_boxed must call `func`") + } +} + #[allow(clippy::manual_non_exhaustive)] pub struct UpdateInfo { pub duration: Duration, @@ -341,10 +387,12 @@ struct CurrentGlobalTaskState { /// Tracks currently running local tasks, and defers cleanup of the global task until those /// complete. local_task_tracker: TaskTracker, + + backend_state: Box, } impl CurrentGlobalTaskState { - fn new(task_id: TaskId) -> Self { + fn new(task_id: TaskId, backend_state: Box) -> Self { Self { task_id, tasks_to_notify: Vec::new(), @@ -352,6 +400,7 @@ impl CurrentGlobalTaskState { cell_counters: Some(AutoMap::default()), local_cells: Vec::new(), local_task_tracker: TaskTracker::new(), + backend_state, } } } @@ -689,7 +738,11 @@ impl TurboTasks { let future = async move { let mut schedule_again = true; while schedule_again { - let global_task_state = Arc::new(RwLock::new(CurrentGlobalTaskState::new(task_id))); + let backend_state = this.backend.new_task_state(task_id); + let global_task_state = Arc::new(RwLock::new(CurrentGlobalTaskState::new( + task_id, + Box::new(backend_state), + ))); let local_task_state = CurrentLocalTaskState::new( this.execution_id_factory.get(), this.backend @@ -754,9 +807,7 @@ impl TurboTasks { anyhow::Ok(()) }; - let future = TURBO_TASKS - .scope(self.pin(), self.backend.execution_scope(task_id, future)) - .in_current_span(); + let future = TURBO_TASKS.scope(self.pin(), future).in_current_span(); #[cfg(feature = "tokio_tracing")] tokio::task::Builder::new() @@ -1316,22 +1367,15 @@ impl TurboTasksApi for TurboTasks { // state as well. let global_task_state = CURRENT_GLOBAL_TASK_STATE.with(|ts| ts.clone()); let local_task_state = CURRENT_LOCAL_TASK_STATE.with(|ts| ts.clone()); - let (task_id, fut) = { + let tracked_fut = { let ts = global_task_state.read().unwrap(); - (ts.task_id, ts.local_task_tracker.track_future(fut)) + ts.local_task_tracker.track_future(fut) }; Box::pin(TURBO_TASKS.scope( turbo_tasks(), CURRENT_GLOBAL_TASK_STATE.scope( global_task_state, - CURRENT_LOCAL_TASK_STATE.scope( - local_task_state, - // TODO(bgw): This will create a new task-local in the backend, which is not - // what we want. Instead we should replace `execution_scope` with a more - // limited API that allows storing thread-local state in a way the manager can - // control. - self.backend.execution_scope(task_id, fut), - ), + CURRENT_LOCAL_TASK_STATE.scope(local_task_state, tracked_fut), ), )) } @@ -1450,6 +1494,16 @@ impl TurboTasksBackendApi for TurboTasks { unsafe fn reuse_transient_task_id(&self, id: Unused) { unsafe { self.transient_task_id_factory.reuse(id.into()) } } + + fn read_task_state_boxed(&self, func: Box) { + CURRENT_GLOBAL_TASK_STATE + .with(move |ts| func(ts.read().unwrap().backend_state.downcast_ref().unwrap())) + } + + fn write_task_state_boxed(&self, func: Box) { + CURRENT_GLOBAL_TASK_STATE + .with(move |ts| func(ts.write().unwrap().backend_state.downcast_mut().unwrap())) + } } pub(crate) fn current_task(from: &str) -> TaskId { @@ -1666,7 +1720,10 @@ pub fn with_turbo_tasks_for_testing( TURBO_TASKS.scope( tt, CURRENT_GLOBAL_TASK_STATE.scope( - Arc::new(RwLock::new(CurrentGlobalTaskState::new(current_task))), + Arc::new(RwLock::new(CurrentGlobalTaskState::new( + current_task, + Box::new(()), + ))), CURRENT_LOCAL_TASK_STATE.scope(CurrentLocalTaskState::new(execution_id, None), f), ), )