Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Turbopack] remove exceeding cells #69059

Closed
wants to merge 49 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
9d1592f
add new backend
sokra Jul 26, 2024
023f16e
new backend
sokra Jul 29, 2024
32b447a
add more backend method implementations
sokra Aug 2, 2024
fcea2fd
very basic operation working
sokra Aug 7, 2024
c6f4e70
add own cells reading
sokra Aug 7, 2024
db0bc43
call transient tasks
sokra Aug 7, 2024
ca6cbb4
call persistent tasks
sokra Aug 7, 2024
da0f04c
run tests multiple times to test caching
sokra Aug 7, 2024
41d716c
add cell/output reading and creating cached tasks
sokra Aug 7, 2024
f573143
serialization fixup
sokra Aug 9, 2024
5029e1c
remove unused stuff
sokra Aug 9, 2024
ed8b087
clippy
sokra Aug 9, 2024
2fb17f3
remove things not yet used (persistence)
sokra Aug 9, 2024
0d8a666
clippy
sokra Aug 9, 2024
37df4cc
add more tests for new backend
sokra Aug 9, 2024
eb0b6c9
add support for connect_child
sokra Aug 13, 2024
78af7a5
set root type on root tasks
sokra Aug 9, 2024
ffc9e21
remove old edges when task recomputation completes
sokra Aug 12, 2024
c448018
add try_get_function_id
sokra Aug 14, 2024
b0977cb
Revert "remove things not yet used (persistence)"
sokra Aug 9, 2024
dd40246
add lmdb persisting and restoring
sokra Aug 8, 2024
bdec3c3
pass test name to test_config to construct db name
sokra Aug 9, 2024
2cce8d9
continue uncompleted operations
sokra Aug 9, 2024
d3fcc67
create dir and logging
sokra Aug 12, 2024
024c2f1
improve error messages
sokra Aug 13, 2024
95e3f2c
handle keys larger than 511 bytes
sokra Aug 13, 2024
6dac853
set root type on root tasks
sokra Aug 9, 2024
4d5b42e
move backend impl into type alias
sokra Aug 12, 2024
733d42a
add new backend feature
sokra Aug 12, 2024
c736b0d
add missing import
sokra Aug 14, 2024
36b118c
initial aggregation update
sokra Aug 13, 2024
03abe58
enable new backend
sokra Aug 13, 2024
686c198
WIP: remove todos
sokra Aug 13, 2024
e126a75
more aggregation operations
sokra Aug 13, 2024
bc62ead
handle state serialization
sokra Aug 14, 2024
aa5ec04
validate serialization and improve errors
sokra Aug 14, 2024
c39ec7d
avoid some serde skip and untagged
sokra Aug 14, 2024
40aab63
use TransientState
sokra Aug 14, 2024
1aa5572
validate serialization and improve errors
sokra Aug 14, 2024
4835ae1
store project options in state
sokra Aug 14, 2024
44e902a
show lookup error
sokra Aug 14, 2024
d4f7561
log restored db entries
sokra Aug 14, 2024
26b7652
improve error
sokra Aug 14, 2024
2a0dd5a
improve benchmark_file_io for persistent cache
sokra Aug 14, 2024
4866761
WIP: print new tasks
sokra Aug 14, 2024
6db46f3
gracefully stop turbo-tasks to allow persisting to complete
sokra Aug 14, 2024
a96143d
avoid storing transient tasks
sokra Aug 14, 2024
92aff18
WIP: logging
sokra Aug 14, 2024
15950e6
remove exceeding cells
sokra Aug 15, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
new backend
  • Loading branch information
sokra committed Sep 4, 2024
commit 023f16e3b9036a2667c0e8c8c10372e261105058
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions turbopack/crates/turbo-tasks-backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ workspace = true

[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
auto-hash-map = { workspace = true }
dashmap = { workspace = true }
once_cell = { workspace = true }
Expand Down
319 changes: 310 additions & 9 deletions turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
Original file line number Diff line number Diff line change
@@ -1,37 +1,338 @@
mod operation;
mod storage;

use std::{collections::VecDeque, sync::Arc};
use std::{
borrow::Cow,
collections::HashSet,
future::Future,
hash::BuildHasherDefault,
pin::Pin,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::Duration,
};

use parking_lot::Mutex;
use turbo_tasks::{backend::CachedTaskType, TaskId};
use anyhow::Result;
use auto_hash_map::{AutoMap, AutoSet};
use parking_lot::{Condvar, Mutex};
use rustc_hash::FxHasher;
use turbo_tasks::{
backend::{
Backend, BackendJobId, CachedTaskType, CellContent, TaskExecutionSpec, TransientTaskType,
TypedCellContent,
},
event::EventListener,
util::IdFactoryWithReuse,
CellId, FunctionId, RawVc, ReadConsistency, TaskId, TraitTypeId, TurboTasksBackendApi,
ValueTypeId, TRANSIENT_TASK_BIT,
};

use self::{operation::Operation, storage::Storage};
use self::{
operation::{AnyOperation, ExecuteContext, Operation},
storage::Storage,
};
use crate::{
data::{CachedDataItem, CachedDataUpdate},
utils::{bi_map::BiMap, chunked_vec::ChunkedVec},
utils::{bi_map::BiMap, chunked_vec::ChunkedVec, ptr_eq_arc::PtrEqArc},
};

const SNAPSHOT_REQUESTED_BIT: usize = 1 << 63;

struct SnapshotRequest {
snapshot_requested: bool,
suspended_operations: HashSet<PtrEqArc<AnyOperation>>,
}

impl SnapshotRequest {
fn new() -> Self {
Self {
snapshot_requested: false,
suspended_operations: HashSet::new(),
}
}
}

pub struct TurboTasksBackend {
persisted_task_id_factory: IdFactoryWithReuse<TaskId>,
transient_task_id_factory: IdFactoryWithReuse<TaskId>,

persisted_task_cache_log: Mutex<ChunkedVec<(Arc<CachedTaskType>, TaskId)>>,
task_cache: BiMap<Arc<CachedTaskType>, TaskId>,

persisted_storage_log: Mutex<ChunkedVec<CachedDataUpdate>>,
storage: Storage<TaskId, CachedDataItem>,
operations: Mutex<VecDeque<Box<dyn Operation>>>,

/// Number of executing operations + Highest bit is set when snapshot is
/// requested. When that bit is set, operations should pause until the
/// snapshot is completed. When the bit is set and in progress counter
/// reaches zero, `operations_completed_when_snapshot_requested` is
/// triggered.
in_progress_operations: AtomicUsize,

snapshot_request: Mutex<SnapshotRequest>,
/// Condition Variable that is triggered when `in_progress_operations`
/// reaches zero while snapshot is requested. All operations are either
/// completed or suspended.
operations_suspended: Condvar,
/// Condition Variable that is triggered when a snapshot is completed and
/// operations can continue.
snapshot_completed: Condvar,
}

impl TurboTasksBackend {
pub fn new() -> Self {
Self {
persisted_task_id_factory: IdFactoryWithReuse::new(1, (TRANSIENT_TASK_BIT - 1) as u64),
transient_task_id_factory: IdFactoryWithReuse::new(
TRANSIENT_TASK_BIT as u64,
u32::MAX as u64,
),
persisted_task_cache_log: Mutex::new(ChunkedVec::new()),
task_cache: BiMap::new(),
persisted_storage_log: Mutex::new(ChunkedVec::new()),
storage: Storage::new(),
operations: Mutex::new(VecDeque::new()),
in_progress_operations: AtomicUsize::new(0),
snapshot_request: Mutex::new(SnapshotRequest::new()),
operations_suspended: Condvar::new(),
snapshot_completed: Condvar::new(),
}
}

fn run_operation(&self, operation: Box<dyn Operation>) {
self.operations.lock().push_back(operation);
fn run_operation(
&self,
operation: impl Operation,
turbo_tasks: &dyn TurboTasksBackendApi<Self>,
) {
operation.execute(ExecuteContext::new(self, turbo_tasks));
}

fn operation_suspend_point(&self, suspend: impl FnOnce() -> AnyOperation) {
if (self.in_progress_operations.load(Ordering::Relaxed) & SNAPSHOT_REQUESTED_BIT) != 0 {
let operation = Arc::new(suspend());
let mut snapshot_request = self.snapshot_request.lock();
if snapshot_request.snapshot_requested {
snapshot_request
.suspended_operations
.insert(operation.clone().into());
let value = self.in_progress_operations.fetch_sub(1, Ordering::AcqRel);
assert!((value & SNAPSHOT_REQUESTED_BIT) != 0);
if value == SNAPSHOT_REQUESTED_BIT {
self.operations_suspended.notify_all();
}
self.snapshot_completed
.wait_while(&mut snapshot_request, |snapshot_request| {
snapshot_request.snapshot_requested
});
self.in_progress_operations.fetch_add(1, Ordering::AcqRel);
snapshot_request
.suspended_operations
.remove(&operation.into());
}
}
}
}

// Operations
impl TurboTasksBackend {
pub fn connect_child(
&self,
parent_task: TaskId,
child_task: TaskId,
turbo_tasks: &dyn TurboTasksBackendApi<Self>,
) {
self.run_operation(
operation::ConnectChildOperation::new(parent_task, child_task),
turbo_tasks,
);
}
}

impl Backend for TurboTasksBackend {
fn get_or_create_persistent_task(
&self,
task_type: CachedTaskType,
parent_task: TaskId,
turbo_tasks: &dyn TurboTasksBackendApi<Self>,
) -> TaskId {
if let Some(task_id) = self.task_cache.lookup_forward(&task_type) {
self.connect_child(parent_task, task_id, turbo_tasks);
return task_id;
}

let task_type = Arc::new(task_type);
let task_id = self.persisted_task_id_factory.get();
if let Err(existing_task_id) = self.task_cache.try_insert(task_type.clone(), task_id) {
// Safety: We just created the id and failed to insert it.
unsafe {
self.persisted_task_id_factory.reuse(task_id);
}
self.connect_child(parent_task, existing_task_id, turbo_tasks);
return existing_task_id;
}
self.persisted_task_cache_log
.lock()
.push((task_type, task_id));

self.connect_child(parent_task, task_id, turbo_tasks);

task_id
}

fn invalidate_task(&self, _: TaskId, _: &dyn TurboTasksBackendApi<Self>) {
todo!()
}
fn invalidate_tasks(&self, _: &[TaskId], _: &dyn TurboTasksBackendApi<Self>) {
todo!()
}
fn invalidate_tasks_set(
&self,
_: &AutoSet<TaskId, BuildHasherDefault<FxHasher>, 2>,
_: &dyn TurboTasksBackendApi<Self>,
) {
todo!()
}
fn get_task_description(&self, _: TaskId) -> std::string::String {
todo!()
}
fn try_get_function_id(&self, _: TaskId) -> Option<FunctionId> {
todo!()
}
type ExecutionScopeFuture<T: Future<Output = Result<(), turbo_tasks::Error>>> = Pin<Box<dyn Future<Output = Result<()>> + Send + Sync>> where T: Send + 'static;
fn execution_scope<T: Future<Output = Result<(), turbo_tasks::Error>>>(
&self,
_: TaskId,
_: T,
) -> Self::ExecutionScopeFuture<T>
where
T: Send + 'static,
{
todo!()
}
fn try_start_task_execution(
&self,
_: TaskId,
_: &dyn TurboTasksBackendApi<Self>,
) -> std::option::Option<TaskExecutionSpec<'_>> {
todo!()
}
fn task_execution_result(
&self,
_: TaskId,
_: Result<Result<RawVc, turbo_tasks::Error>, std::option::Option<Cow<'static, str>>>,
_: &dyn TurboTasksBackendApi<Self>,
) {
todo!()
}
fn task_execution_completed(
&self,
_: TaskId,
_: Duration,
_: usize,
_: &AutoMap<ValueTypeId, u32, BuildHasherDefault<FxHasher>, 8>,
_: bool,
_: &dyn TurboTasksBackendApi<Self>,
) -> bool {
todo!()
}
fn run_backend_job(
&self,
_: BackendJobId,
_: &dyn TurboTasksBackendApi<Self>,
) -> Pin<Box<(dyn Future<Output = ()> + Send + 'static)>> {
todo!()
}
fn try_read_task_output(
&self,
_: TaskId,
_: TaskId,
_: ReadConsistency,
_: &dyn TurboTasksBackendApi<Self>,
) -> Result<Result<RawVc, EventListener>, turbo_tasks::Error> {
todo!()
}
fn try_read_task_output_untracked(
&self,
_: TaskId,
_: ReadConsistency,
_: &dyn TurboTasksBackendApi<Self>,
) -> Result<Result<RawVc, EventListener>, turbo_tasks::Error> {
todo!()
}
fn try_read_task_cell(
&self,
_: TaskId,
_: CellId,
_: TaskId,
_: &dyn TurboTasksBackendApi<Self>,
) -> Result<Result<TypedCellContent, EventListener>, turbo_tasks::Error> {
todo!()
}
fn try_read_task_cell_untracked(
&self,
_: TaskId,
_: CellId,
_: &dyn TurboTasksBackendApi<Self>,
) -> Result<Result<TypedCellContent, EventListener>, turbo_tasks::Error> {
todo!()
}
fn read_task_collectibles(
&self,
_: TaskId,
_: TraitTypeId,
_: TaskId,
_: &dyn TurboTasksBackendApi<Self>,
) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
todo!()
}
fn emit_collectible(
&self,
_: TraitTypeId,
_: RawVc,
_: TaskId,
_: &dyn TurboTasksBackendApi<Self>,
) {
todo!()
}
fn unemit_collectible(
&self,
_: TraitTypeId,
_: RawVc,
_: u32,
_: TaskId,
_: &dyn TurboTasksBackendApi<Self>,
) {
todo!()
}
fn update_task_cell(
&self,
_: TaskId,
_: CellId,
_: CellContent,
_: &dyn TurboTasksBackendApi<Self>,
) {
todo!()
}
fn get_or_create_transient_task(
&self,
_: CachedTaskType,
_: TaskId,
_: &dyn TurboTasksBackendApi<Self>,
) -> TaskId {
todo!()
}
fn connect_task(&self, _: TaskId, _: TaskId, _: &dyn TurboTasksBackendApi<Self>) {
todo!()
}
fn create_transient_task(
&self,
_: TransientTaskType,
_: &dyn TurboTasksBackendApi<Self>,
) -> TaskId {
todo!()
}
fn dispose_root_task(&self, _: TaskId, _: &dyn TurboTasksBackendApi<Self>) {
todo!()
}
}
Loading