Skip to content

Commit

Permalink
refactor(turbo-tasks) Add a higher-level task-local state API for the…
Browse files Browse the repository at this point in the history
… Backend trait

This replaces the `execution_scope` API, which doesn't work with detached tasks (or tasks with `local_cells`).
  • Loading branch information
bgw committed Aug 16, 2024
1 parent 14113a3 commit 94aee98
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 54 deletions.
29 changes: 15 additions & 14 deletions turbopack/crates/turbo-tasks-memory/src/memory_backend.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::{
borrow::{Borrow, Cow},
cell::RefCell,
future::Future,
hash::{BuildHasher, BuildHasherDefault, Hash},
num::NonZeroU32,
Expand All @@ -16,7 +15,6 @@ use anyhow::{anyhow, bail, Result};
use auto_hash_map::AutoMap;
use dashmap::{mapref::entry::Entry, DashMap};
use rustc_hash::FxHasher;
use tokio::task::futures::TaskLocalFuture;
use tracing::trace_span;
use turbo_prehash::{BuildHasherExt, PassThroughHash, PreHashed};
use turbo_tasks::{
Expand All @@ -37,14 +35,20 @@ use crate::{
PERCENTAGE_MIN_IDLE_TARGET_MEMORY, PERCENTAGE_MIN_TARGET_MEMORY,
},
output::Output,
task::{ReadCellError, Task, TaskType, DEPENDENCIES_TO_TRACK},
task::{ReadCellError, Task, TaskType},
task_statistics::TaskStatisticsApi,
};

fn prehash_task_type(task_type: CachedTaskType) -> PreHashed<CachedTaskType> {
BuildHasherDefault::<FxHasher>::prehash(&Default::default(), task_type)
}

pub struct TaskState {
/// Cells/Outputs/Collectibles that are read during task execution. These will be stored as
/// dependencies when the execution has finished.
pub dependencies_to_track: TaskEdgesSet,
}

pub struct MemoryBackend {
persistent_tasks: NoMoveVec<Task, 13>,
transient_tasks: NoMoveVec<Task, 10>,
Expand Down Expand Up @@ -436,14 +440,11 @@ impl Backend for MemoryBackend {
self.with_task(task, |task| task.get_description())
}

type ExecutionScopeFuture<T: Future<Output = Result<()>> + Send + 'static> =
TaskLocalFuture<RefCell<TaskEdgesSet>, T>;
fn execution_scope<T: Future<Output = Result<()>> + Send + 'static>(
&self,
_task: TaskId,
future: T,
) -> Self::ExecutionScopeFuture<T> {
DEPENDENCIES_TO_TRACK.scope(RefCell::new(TaskEdgesSet::new()), future)
type TaskState = TaskState;
fn new_task_state(&self, _task: TaskId) -> Self::TaskState {
TaskState {
dependencies_to_track: TaskEdgesSet::new(),
}
}

fn try_start_task_execution<'a>(
Expand Down Expand Up @@ -529,7 +530,7 @@ impl Backend for MemoryBackend {
move || format!("reading task output from {reader}"),
turbo_tasks,
|output| {
Task::add_dependency_to_current(TaskEdge::Output(task));
Task::add_dependency_to_current(TaskEdge::Output(task), turbo_tasks);
output.read(reader)
},
)
Expand Down Expand Up @@ -564,7 +565,7 @@ impl Backend for MemoryBackend {
})
.into_typed(index.type_id)))
} else {
Task::add_dependency_to_current(TaskEdge::Cell(task_id, index));
Task::add_dependency_to_current(TaskEdge::Cell(task_id, index), turbo_tasks);
self.with_task(task_id, |task| {
match task.read_cell(
index,
Expand Down Expand Up @@ -623,7 +624,7 @@ impl Backend for MemoryBackend {
reader: TaskId,
turbo_tasks: &dyn TurboTasksBackendApi<MemoryBackend>,
) -> TaskCollectiblesMap {
Task::add_dependency_to_current(TaskEdge::Collectibles(id, trait_id));
Task::add_dependency_to_current(TaskEdge::Collectibles(id, trait_id), turbo_tasks);
Task::read_collectibles(id, trait_id, reader, self, turbo_tasks)
}

Expand Down
25 changes: 10 additions & 15 deletions turbopack/crates/turbo-tasks-memory/src/task.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::{
borrow::Cow,
cell::RefCell,
fmt::{self, Debug, Display, Formatter},
future::Future,
hash::{BuildHasherDefault, Hash},
Expand All @@ -17,14 +16,13 @@ use either::Either;
use parking_lot::{Mutex, RwLock};
use rustc_hash::FxHasher;
use smallvec::SmallVec;
use tokio::task_local;
use tracing::Span;
use turbo_prehash::PreHashed;
use turbo_tasks::{
backend::{CachedTaskType, CellContent, TaskCollectiblesMap, TaskExecutionSpec},
event::{Event, EventListener},
get_invalidator, registry, CellId, Invalidator, RawVc, ReadConsistency, TaskId, TaskIdSet,
TraitTypeId, TurboTasksBackendApi, ValueTypeId,
TraitTypeId, TurboTasksBackendApi, TurboTasksBackendApiExt, ValueTypeId,
};

use crate::{
Expand All @@ -45,12 +43,6 @@ pub type NativeTaskFn = Box<dyn Fn() -> NativeTaskFuture + Send + Sync>;
mod aggregation;
mod meta_state;

task_local! {
/// Cells/Outputs/Collectibles that are read during task execution
/// These will be stored as dependencies when the execution has finished
pub(crate) static DEPENDENCIES_TO_TRACK: RefCell<TaskEdgesSet>;
}

type OnceTaskFn = Mutex<Option<Pin<Box<dyn Future<Output = Result<RawVc>> + Send + 'static>>>>;

/// Different Task types
Expand Down Expand Up @@ -966,7 +958,8 @@ impl Task {
let mut change_job = None;
let mut remove_job = None;
let mut drained_cells = SmallVec::<[Cell; 8]>::new();
let dependencies = DEPENDENCIES_TO_TRACK.with(|deps| deps.take());
let dependencies = turbo_tasks
.write_task_state(|deps| std::mem::take(&mut deps.dependencies_to_track));
{
let mut state = self.full_state_mut();

Expand Down Expand Up @@ -1343,11 +1336,13 @@ impl Task {
}
}

pub(crate) fn add_dependency_to_current(dep: TaskEdge) {
DEPENDENCIES_TO_TRACK.with(|list| {
let mut list = list.borrow_mut();
list.insert(dep);
})
pub(crate) fn add_dependency_to_current(
dep: TaskEdge,
turbo_tasks: &dyn TurboTasksBackendApi<MemoryBackend>,
) {
turbo_tasks.write_task_state(|ts| {
ts.dependencies_to_track.insert(dep);
});
}

/// Get an [Invalidator] that can be used to invalidate the current [Task]
Expand Down
29 changes: 21 additions & 8 deletions turbopack/crates/turbo-tasks/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,15 +441,28 @@ pub trait Backend: Sync + Send {

fn get_task_description(&self, task: TaskId) -> String;

type ExecutionScopeFuture<T: Future<Output = Result<()>> + Send + 'static>: Future<Output = Result<()>>
+ Send
+ 'static;
/// Task-local state that stored inside of [`TurboTasksBackendApi`]. Constructed with
/// [`Self::new_task_state`].
///
/// This value that can later be written to or read from using
/// [`crate::TurboTasksBackendApiExt::write_task_state`] or
/// [`crate::TurboTasksBackendApiExt::read_task_state`]
///
/// This data may be shared across multiple threads (must be `Sync`) in order to support
/// detached futures ([`crate::TurboTasksApi::detached_for_testing`]) and [pseudo-tasks using
/// `local_cells`][crate::function]. A [`RwLock`][std::sync::RwLock] is used to provide
/// concurrent access.
type TaskState: Send + Sync + 'static;

fn execution_scope<T: Future<Output = Result<()>> + Send + 'static>(
&self,
task: TaskId,
future: T,
) -> Self::ExecutionScopeFuture<T>;
/// Constructs a new task-local [`Self::TaskState`] for the given `task_id`.
///
/// If a task is re-executed (e.g. because it is invalidated), this function will be called
/// again with the same [`TaskId`].
///
/// This value can be written to or read from using
/// [`crate::TurboTasksBackendApiExt::write_task_state`] and
/// [`crate::TurboTasksBackendApiExt::read_task_state`]
fn new_task_state(&self, task: TaskId) -> Self::TaskState;

fn try_start_task_execution<'a>(
&'a self,
Expand Down
3 changes: 2 additions & 1 deletion turbopack/crates/turbo-tasks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ pub use manager::{
dynamic_call, dynamic_this_call, emit, get_invalidator, mark_finished, mark_stateful,
prevent_gc, run_once, run_once_with_reason, spawn_blocking, spawn_thread, trait_call,
turbo_tasks, CurrentCellRef, Invalidator, ReadConsistency, TaskPersistence, TurboTasks,
TurboTasksApi, TurboTasksBackendApi, TurboTasksCallApi, Unused, UpdateInfo,
TurboTasksApi, TurboTasksBackendApi, TurboTasksBackendApiExt, TurboTasksCallApi, Unused,
UpdateInfo,
};
pub use native_function::{FunctionMeta, NativeFunction};
pub use raw_vc::{CellId, RawVc, ReadRawVcFuture, ResolveTypeError};
Expand Down
89 changes: 73 additions & 16 deletions turbopack/crates/turbo-tasks/src/manager.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::{
any::Any,
borrow::Cow,
future::Future,
hash::{BuildHasherDefault, Hash},
Expand Down Expand Up @@ -236,10 +237,55 @@ pub trait TurboTasksBackendApi<B: Backend + 'static>: TurboTasksCallApi + Sync +

/// Returns the duration from the start of the program to the given instant.
fn program_duration_until(&self, instant: Instant) -> Duration;

/// An untyped object-safe version of [`TurboTasksBackendApiExt::read_task_state`]. Callers
/// should prefer the extension trait's version of this method.
#[allow(clippy::type_complexity)] // Moving this to a typedef would make the docs confusing
fn read_task_state_boxed(&self, func: Box<dyn FnOnce(&B::TaskState) + '_>);

/// An untyped object-safe version of [`TurboTasksBackendApiExt::write_task_state`]. Callers
/// should prefer the extension trait's version of this method.
#[allow(clippy::type_complexity)]
fn write_task_state_boxed(&self, func: Box<dyn FnOnce(&mut B::TaskState) + '_>);

/// Returns a reference to the backend.
fn backend(&self) -> &B;
}

/// An extension trait for methods of `TurboTasksBackendApi` that are not object-safe. This is
/// automatically implemented for all `TurboTasksBackendApi`s using a blanket impl.
pub trait TurboTasksBackendApiExt<B: Backend + 'static>: TurboTasksBackendApi<B> {
/// Allows modification of the [`Backend::TaskState`].
///
/// This function holds open a non-exclusive read lock that blocks writes, so `func` is expected
/// to execute quickly in order to release the lock.
fn read_task_state<T>(&self, func: impl FnOnce(&B::TaskState) -> T) -> T;

/// Allows modification of the [`Backend::TaskState`].
///
/// This function holds open a write lock, so `func` is expected to execute quickly in order to
/// release the lock.
fn write_task_state<T>(&self, func: impl FnOnce(&mut B::TaskState) -> T) -> T;
}

impl<TT, B> TurboTasksBackendApiExt<B> for TT
where
TT: TurboTasksBackendApi<B> + ?Sized,
B: Backend + 'static,
{
fn read_task_state<T>(&self, func: impl FnOnce(&B::TaskState) -> T) -> T {
let mut out = None;
self.read_task_state_boxed(Box::new(|ts| out = Some(func(ts))));
out.expect("write_task_state_boxed must call `func`")
}

fn write_task_state<T>(&self, func: impl FnOnce(&mut B::TaskState) -> T) -> T {
let mut out = None;
self.write_task_state_boxed(Box::new(|ts| out = Some(func(ts))));
out.expect("write_task_state_boxed must call `func`")
}
}

#[allow(clippy::manual_non_exhaustive)]
pub struct UpdateInfo {
pub duration: Duration,
Expand Down Expand Up @@ -341,17 +387,20 @@ struct CurrentGlobalTaskState {
/// Tracks currently running local tasks, and defers cleanup of the global task until those
/// complete.
local_task_tracker: TaskTracker,

backend_state: Box<dyn Any + Send + Sync>,
}

impl CurrentGlobalTaskState {
fn new(task_id: TaskId) -> Self {
fn new(task_id: TaskId, backend_state: Box<dyn Any + Send + Sync>) -> Self {
Self {
task_id,
tasks_to_notify: Vec::new(),
stateful: false,
cell_counters: Some(AutoMap::default()),
local_cells: Vec::new(),
local_task_tracker: TaskTracker::new(),
backend_state,
}
}
}
Expand Down Expand Up @@ -689,7 +738,11 @@ impl<B: Backend + 'static> TurboTasks<B> {
let future = async move {
let mut schedule_again = true;
while schedule_again {
let global_task_state = Arc::new(RwLock::new(CurrentGlobalTaskState::new(task_id)));
let backend_state = this.backend.new_task_state(task_id);
let global_task_state = Arc::new(RwLock::new(CurrentGlobalTaskState::new(
task_id,
Box::new(backend_state),
)));
let local_task_state = CurrentLocalTaskState::new(
this.execution_id_factory.get(),
this.backend
Expand Down Expand Up @@ -754,9 +807,7 @@ impl<B: Backend + 'static> TurboTasks<B> {
anyhow::Ok(())
};

let future = TURBO_TASKS
.scope(self.pin(), self.backend.execution_scope(task_id, future))
.in_current_span();
let future = TURBO_TASKS.scope(self.pin(), future).in_current_span();

#[cfg(feature = "tokio_tracing")]
tokio::task::Builder::new()
Expand Down Expand Up @@ -1316,22 +1367,15 @@ impl<B: Backend + 'static> TurboTasksApi for TurboTasks<B> {
// state as well.
let global_task_state = CURRENT_GLOBAL_TASK_STATE.with(|ts| ts.clone());
let local_task_state = CURRENT_LOCAL_TASK_STATE.with(|ts| ts.clone());
let (task_id, fut) = {
let tracked_fut = {
let ts = global_task_state.read().unwrap();
(ts.task_id, ts.local_task_tracker.track_future(fut))
ts.local_task_tracker.track_future(fut)
};
Box::pin(TURBO_TASKS.scope(
turbo_tasks(),
CURRENT_GLOBAL_TASK_STATE.scope(
global_task_state,
CURRENT_LOCAL_TASK_STATE.scope(
local_task_state,
// TODO(bgw): This will create a new task-local in the backend, which is not
// what we want. Instead we should replace `execution_scope` with a more
// limited API that allows storing thread-local state in a way the manager can
// control.
self.backend.execution_scope(task_id, fut),
),
CURRENT_LOCAL_TASK_STATE.scope(local_task_state, tracked_fut),
),
))
}
Expand Down Expand Up @@ -1450,6 +1494,16 @@ impl<B: Backend + 'static> TurboTasksBackendApi<B> for TurboTasks<B> {
unsafe fn reuse_transient_task_id(&self, id: Unused<TaskId>) {
unsafe { self.transient_task_id_factory.reuse(id.into()) }
}

fn read_task_state_boxed(&self, func: Box<dyn FnOnce(&B::TaskState) + '_>) {
CURRENT_GLOBAL_TASK_STATE
.with(move |ts| func(ts.read().unwrap().backend_state.downcast_ref().unwrap()))
}

fn write_task_state_boxed(&self, func: Box<dyn FnOnce(&mut B::TaskState) + '_>) {
CURRENT_GLOBAL_TASK_STATE
.with(move |ts| func(ts.write().unwrap().backend_state.downcast_mut().unwrap()))
}
}

pub(crate) fn current_task(from: &str) -> TaskId {
Expand Down Expand Up @@ -1666,7 +1720,10 @@ pub fn with_turbo_tasks_for_testing<T>(
TURBO_TASKS.scope(
tt,
CURRENT_GLOBAL_TASK_STATE.scope(
Arc::new(RwLock::new(CurrentGlobalTaskState::new(current_task))),
Arc::new(RwLock::new(CurrentGlobalTaskState::new(
current_task,
Box::new(()),
))),
CURRENT_LOCAL_TASK_STATE.scope(CurrentLocalTaskState::new(execution_id, None), f),
),
)
Expand Down

0 comments on commit 94aee98

Please sign in to comment.