[Turbopack] improve aggregation implementation #69651

Closed
Changes from 1 commit: 1d8d5f8 (store task data and aggregation separately)

Commits in this pull request (88):
a47eabe  add lmdb persisting and restoring (sokra, Aug 9, 2024)
10b2688  pass test name to test_config to construct db name (sokra, Aug 9, 2024)
3320b98  continue uncompleted operations (sokra, Aug 9, 2024)
e389ce3  create dir and logging (sokra, Aug 12, 2024)
fba0487  improve error messages (sokra, Aug 13, 2024)
b6f946d  handle keys larger than 511 bytes (sokra, Aug 13, 2024)
d952e25  avoid storing transient tasks (sokra, Aug 14, 2024)
fdfe7e9  show lookup error (sokra, Aug 14, 2024)
3c3d2d2  handle state serialization (sokra, Aug 14, 2024)
4f6dffc  validate serialization and improve errors (sokra, Aug 14, 2024)
b1cff41  add turbo_tasks_backend to tracing, add tracing for restore (sokra, Aug 16, 2024)
d6a3594  disable TLS (sokra, Aug 16, 2024)
a16b7cb  print lookup error (sokra, Aug 16, 2024)
9338f2a  verify serialization (sokra, Aug 16, 2024)
f4aecd9  fix lookup deserialization (sokra, Aug 16, 2024)
de09ab8  replace bincode with pot (sokra, Aug 16, 2024)
7220910  fix restore data trace (sokra, Aug 17, 2024)
796656f  more tracing in db (sokra, Aug 17, 2024)
517f568  remove verify_serialization (sokra, Aug 19, 2024)
716fd63  fix race condition (sokra, Aug 17, 2024)
898a031  do not interrupt persisting while there is data (sokra, Aug 19, 2024)
def2e20  add persist trace (sokra, Aug 19, 2024)
82abf4f  improve task aggregation (sokra, Sep 3, 2024)
c58050e  restore task_pair (sokra, Sep 4, 2024)
b1093f5  verify persistent function only calls persistent functions (sokra, Aug 19, 2024)
40dde1d  clippy (sokra, Sep 6, 2024)
e6451d0  add more details to save_snapshot tracing (sokra, Sep 20, 2024)
1d8d5f8  store task data and aggregation separately (sokra, Sep 25, 2024)
21bcf0e  compare with old value before storing to db (sokra, Sep 24, 2024)
7f3b34f  Early stop snapshot interval when stopping (sokra, Sep 24, 2024)
3440d1b  parallel save_snapshot (sokra, Sep 24, 2024)
7bc39db  run snapshot in spawn_blocking (sokra, Sep 25, 2024)
957708d  save snapshot when starting idle (sokra, Sep 25, 2024)
9d0a7ed  wait until idle for a second before persisting (sokra, Sep 25, 2024)
909a230  increase max DB size (sokra, Sep 26, 2024)
cf9c9b0  remove db lookup tracing (sokra, Sep 26, 2024)
da79e6e  remove database println (sokra, Sep 26, 2024)
464adcc  sharded transaction log (sokra, Sep 26, 2024)
38f8d57  share read transition (sokra, Oct 4, 2024)
cfd75ba  remove aggregation update queue tracing (sokra, Sep 27, 2024)
2c2938e  improve messaging (sokra, Sep 27, 2024)
8ba023f  handle idle end event for idle detection in persisting (sokra, Sep 27, 2024)
291c838  improve imports (sokra, Oct 4, 2024)
d4d6ee6  clippy (sokra, Oct 4, 2024)
ff90deb  randomize port (sokra, Oct 4, 2024)
5aa1f81  add or_insert to AutoMap Entry (sokra, Oct 4, 2024)
d863284  add support for collectibles to new backend (sokra, Oct 4, 2024)
551df44  flag task as dirty when they get output set (sokra, Oct 4, 2024)
225a352  force root task when reading collectibles (sokra, Oct 4, 2024)
767cbc5  add collectibles test (sokra, Oct 4, 2024)
f111b51  make in progress tasks as stale (sokra, Oct 4, 2024)
df83503  skip updating output for stale tasks (sokra, Oct 4, 2024)
3e8b6f2  fix bug (sokra, Oct 4, 2024)
9a8148b  add remaining test cases (sokra, Oct 4, 2024)
0c58049  enable new backend (sokra, Aug 13, 2024)
5eb5fea  WIP: remove todos (sokra, Aug 13, 2024)
e11a7b0  print turbopack time (sokra, Sep 5, 2024)
053c178  add restore task tracing (sokra, Sep 26, 2024)
6a9f5c0  bolder time reporting (sokra, Sep 27, 2024)
adbd31b  avoid some serde skip and untagged (sokra, Aug 14, 2024)
12e6f34  log restored db entries (sokra, Aug 14, 2024)
7f777ff  WIP: print new tasks (sokra, Aug 14, 2024)
f6b2761  WIP: logging (sokra, Sep 6, 2024)
eedbc2f  clippy (sokra, Sep 6, 2024)
f2adfcb  invalidation tracing (sokra, Aug 16, 2024)
724a4f5  WIP: recompute from reading output tracing (sokra, Sep 5, 2024)
07a8e55  remove verbose tracing (sokra, Aug 19, 2024)
d37ca8b  WIP: logging (sokra, Sep 5, 2024)
26a9ca8  WIP (sokra, Sep 6, 2024)
565c16c  Revert "WIP" (sokra, Sep 20, 2024)
e13d82f  Revert "WIP: logging" (sokra, Sep 20, 2024)
3b08bce  Revert "WIP: logging" (sokra, Sep 20, 2024)
f802682  trace aggregation update (sokra, Sep 23, 2024)
26bea01  trace suspended operations (sokra, Sep 23, 2024)
eb54393  WIP: logging (sokra, Sep 23, 2024)
6378b96  Revert "WIP: logging" (sokra, Sep 23, 2024)
0212002  aggregation stats (sokra, Sep 23, 2024)
c2a8f1b  log tasks with many items (sokra, Sep 23, 2024)
9be0b98  WIP: logging (sokra, Sep 24, 2024)
4135634  reuse outdated children (sokra, Sep 24, 2024)
1264a17  Revert "WIP: logging" (sokra, Sep 24, 2024)
02c2e7c  Revert "reuse outdated children" (sokra, Sep 24, 2024)
ac28765  log tasks with many items (sokra, Sep 24, 2024)
9a6534d  improve once task handling (sokra, Aug 16, 2024)
b57490c  WIP: avoid overriden package.json (sokra, Aug 17, 2024)
ee27bc1  workaround for missing active tracking (sokra, Aug 19, 2024)
5be2ef5  Revert "workaround for missing active tracking" (sokra, Sep 19, 2024)
463aea0  strong reads (sokra, Sep 3, 2024)
Commit 1d8d5f8d29a85e7d6ce614949403ccbfc5aa9ddf: store task data and aggregation separately
sokra committed Oct 7, 2024
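The core of the commit is easiest to see in `backend/mod.rs` below: the single `persisted_storage_log` is split into a data log and a meta log, and a new `TaskDataCategory` selects between them. The following is a minimal, self-contained sketch of that routing pattern only; `Backend`, `CachedDataUpdate`, and the log types here are simplified stand-ins (plain `Vec` instead of `ChunkedVec`, a bare `u32` task id), not the actual turbo-tasks-backend API.

```rust
use std::sync::Mutex;

/// Which part of a task's persisted state an update belongs to
/// (mirrors the `TaskDataCategory` introduced in this commit).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TaskDataCategory {
    Data, // task outputs, cell contents, dependency edges
    Meta, // aggregation numbers, upper/follower edges, dirty flags
    All,  // only meaningful when reading, never when routing a write
}

/// Simplified stand-in for `CachedDataUpdate`.
#[derive(Debug)]
pub struct CachedDataUpdate {
    pub task: u32,
    pub key: String,
}

/// Simplified stand-in for the backend: one log per category instead of one shared log.
#[derive(Default)]
pub struct Backend {
    persisted_storage_data_log: Mutex<Vec<CachedDataUpdate>>,
    persisted_storage_meta_log: Mutex<Vec<CachedDataUpdate>>,
}

impl Backend {
    /// Pick the log for a category, like `persisted_storage_log()` in the diff.
    fn persisted_storage_log(&self, category: TaskDataCategory) -> &Mutex<Vec<CachedDataUpdate>> {
        match category {
            TaskDataCategory::Data => &self.persisted_storage_data_log,
            TaskDataCategory::Meta => &self.persisted_storage_meta_log,
            TaskDataCategory::All => unreachable!("writes are always categorized"),
        }
    }

    pub fn push_update(&self, category: TaskDataCategory, update: CachedDataUpdate) {
        self.persisted_storage_log(category).lock().unwrap().push(update);
    }
}

fn main() {
    let backend = Backend::default();
    backend.push_update(
        TaskDataCategory::Meta,
        CachedDataUpdate { task: 1, key: "AggregationNumber".into() },
    );
    backend.push_update(
        TaskDataCategory::Data,
        CachedDataUpdate { task: 1, key: "CellData".into() },
    );
    assert_eq!(backend.persisted_storage_meta_log.lock().unwrap().len(), 1);
    assert_eq!(backend.persisted_storage_data_log.lock().unwrap().len(), 1);
}
```

Keeping the two logs separate lets snapshotting persist aggregation bookkeeping and task data independently, and lets readers restore only the category they actually need.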
52 changes: 37 additions & 15 deletions turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
@@ -22,6 +22,7 @@ use dashmap::DashMap;
use parking_lot::{Condvar, Mutex};
use rustc_hash::FxHasher;
use smallvec::smallvec;
pub use storage::TaskDataCategory;
use turbo_tasks::{
backend::{
Backend, BackendJobId, CachedTaskType, CellContent, TaskExecutionSpec, TransientTaskRoot,
@@ -101,7 +102,8 @@ pub struct TurboTasksBackend {
task_cache: BiMap<Arc<CachedTaskType>, TaskId>,
transient_tasks: DashMap<TaskId, Arc<TransientTask>>,

persisted_storage_log: Mutex<ChunkedVec<CachedDataUpdate>>,
persisted_storage_data_log: Mutex<ChunkedVec<CachedDataUpdate>>,
persisted_storage_meta_log: Mutex<ChunkedVec<CachedDataUpdate>>,
storage: Storage<TaskId, CachedDataItem>,

/// Number of executing operations + Highest bit is set when snapshot is
@@ -140,7 +142,8 @@ impl TurboTasksBackend {
persisted_task_cache_log: Mutex::new(ChunkedVec::new()),
task_cache: BiMap::new(),
transient_tasks: DashMap::new(),
persisted_storage_log: Mutex::new(ChunkedVec::new()),
persisted_storage_data_log: Mutex::new(ChunkedVec::new()),
persisted_storage_meta_log: Mutex::new(ChunkedVec::new()),
storage: Storage::new(),
in_progress_operations: AtomicUsize::new(0),
snapshot_request: Mutex::new(SnapshotRequest::new()),
@@ -205,6 +208,17 @@ impl TurboTasksBackend {
}
OperationGuard { backend: self }
}

fn persisted_storage_log(
&self,
category: TaskDataCategory,
) -> &Mutex<ChunkedVec<CachedDataUpdate>> {
match category {
TaskDataCategory::Data => &self.persisted_storage_data_log,
TaskDataCategory::Meta => &self.persisted_storage_meta_log,
TaskDataCategory::All => unreachable!(),
}
}
}

pub(crate) struct OperationGuard<'a> {
@@ -246,7 +260,7 @@ impl TurboTasksBackend {
turbo_tasks: &dyn TurboTasksBackendApi<Self>,
) -> Result<Result<RawVc, EventListener>> {
let ctx = self.execute_context(turbo_tasks);
let mut task = ctx.task(task_id);
let mut task = ctx.task(task_id, TaskDataCategory::All);

if let Some(in_progress) = get!(task, InProgress) {
match in_progress {
@@ -281,7 +295,7 @@
},
&ctx,
);
task = ctx.task(task_id);
task = ctx.task(task_id, TaskDataCategory::All);
}

// Check the dirty count of the root node
@@ -328,7 +342,7 @@
});
drop(task);

let mut reader_task = ctx.task(reader);
let mut reader_task = ctx.task(reader, TaskDataCategory::Data);
if reader_task
.remove(&CachedDataItemKey::OutdatedOutputDependency { target: task_id })
.is_none()
@@ -370,7 +384,7 @@
turbo_tasks: &dyn TurboTasksBackendApi<Self>,
) -> Result<Result<TypedCellContent, EventListener>> {
let ctx = self.execute_context(turbo_tasks);
let mut task = ctx.task(task_id);
let mut task = ctx.task(task_id, TaskDataCategory::Data);
if let Some(content) = get!(task, CellData { cell }) {
let content = content.clone();
if let Some(reader) = reader {
@@ -381,7 +395,7 @@
});
drop(task);

let mut reader_task = ctx.task(reader);
let mut reader_task = ctx.task(reader, TaskDataCategory::Data);
let target = CellRef {
task: task_id,
cell,
@@ -494,7 +508,8 @@
.map(|op| op.arc().clone())
.collect::<Vec<_>>();
drop(snapshot_request);
let persisted_storage_log = take(&mut *self.persisted_storage_log.lock());
let persisted_storage_meta_log = take(&mut *self.persisted_storage_meta_log.lock());
let persisted_storage_data_log = take(&mut *self.persisted_storage_data_log.lock());
let persisted_task_cache_log = take(&mut *self.persisted_task_cache_log.lock());
let mut snapshot_request = self.snapshot_request.lock();
snapshot_request.snapshot_requested = false;
@@ -505,18 +520,25 @@
drop(snapshot_request);

let mut counts: HashMap<TaskId, u32> = HashMap::new();
for CachedDataUpdate { task, .. } in persisted_storage_log.iter() {
for CachedDataUpdate { task, .. } in persisted_storage_data_log
.iter()
.chain(persisted_storage_meta_log.iter())
{
*counts.entry(*task).or_default() += 1;
}

let mut new_items = false;

if !persisted_task_cache_log.is_empty() || !persisted_storage_log.is_empty() {
if !persisted_task_cache_log.is_empty()
|| !persisted_storage_meta_log.is_empty()
|| !persisted_storage_data_log.is_empty()
{
new_items = true;
if let Err(err) = self.backing_storage.save_snapshot(
suspended_operations,
persisted_task_cache_log,
persisted_storage_log,
persisted_storage_meta_log,
persisted_storage_data_log,
) {
println!("Persising failed: {:#?}", err);
return None;
@@ -655,7 +677,7 @@ impl Backend for TurboTasksBackend {
return;
}
let ctx = self.execute_context(turbo_tasks);
let mut task = ctx.task(task_id);
let mut task = ctx.task(task_id, TaskDataCategory::Data);
task.invalidate_serialization();
}

@@ -696,7 +718,7 @@
};
{
let ctx = self.execute_context(turbo_tasks);
let mut task = ctx.task(task_id);
let mut task = ctx.task(task_id, TaskDataCategory::Data);
let in_progress = remove!(task, InProgress)?;
let InProgressState::Scheduled { done_event } = in_progress else {
task.add_new(CachedDataItem::InProgress { value: in_progress });
@@ -887,7 +909,7 @@
turbo_tasks: &dyn TurboTasksBackendApi<Self>,
) -> bool {
let ctx = self.execute_context(turbo_tasks);
let mut task = ctx.task(task_id);
let mut task = ctx.task(task_id, TaskDataCategory::All);
let Some(CachedDataItemValue::InProgress { value: in_progress }) =
task.remove(&CachedDataItemKey::InProgress {})
else {
@@ -1126,7 +1148,7 @@
turbo_tasks: &dyn TurboTasksBackendApi<Self>,
) -> Result<TypedCellContent> {
let ctx = self.execute_context(turbo_tasks);
let task = ctx.task(task_id);
let task = ctx.task(task_id, TaskDataCategory::Data);
if let Some(content) = get!(task, CellData { cell }) {
Ok(CellContent(Some(content.1.clone())).into_typed(cell.type_id))
} else {
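In the snapshot path shown above, both logs are drained while holding each lock only briefly, per-task update counts are computed over the two logs chained together, and both are then handed to `save_snapshot` separately. The sketch below illustrates that drain-and-count step only, using plain `Vec` and `HashMap` stand-ins for `ChunkedVec` and the real update type rather than the actual backend code.

```rust
use std::collections::HashMap;
use std::mem::take;
use std::sync::Mutex;

struct CachedDataUpdate {
    task: u32,
}

fn drain_and_count(
    data_log: &Mutex<Vec<CachedDataUpdate>>,
    meta_log: &Mutex<Vec<CachedDataUpdate>>,
) -> (Vec<CachedDataUpdate>, Vec<CachedDataUpdate>, HashMap<u32, u32>) {
    // Take ownership of both logs, each under its own short-lived lock.
    let data = take(&mut *data_log.lock().unwrap());
    let meta = take(&mut *meta_log.lock().unwrap());

    // Count updates per task across both categories, as the snapshot code does
    // before handing the logs off for persisting.
    let mut counts: HashMap<u32, u32> = HashMap::new();
    for update in data.iter().chain(meta.iter()) {
        *counts.entry(update.task).or_default() += 1;
    }
    (data, meta, counts)
}

fn main() {
    let data_log = Mutex::new(vec![CachedDataUpdate { task: 1 }, CachedDataUpdate { task: 2 }]);
    let meta_log = Mutex::new(vec![CachedDataUpdate { task: 1 }]);
    let (data, meta, counts) = drain_and_count(&data_log, &meta_log);
    assert_eq!((data.len(), meta.len()), (2, 1));
    assert_eq!(counts[&1], 2);
    // Both drained vectors would then be passed to `save_snapshot` as separate arguments.
}
```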
turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs
@@ -7,6 +7,7 @@ use crate::{
backend::{
operation::{ExecuteContext, Operation, TaskGuard},
storage::{get, get_many, iter_many, remove, update, update_count},
TaskDataCategory,
},
data::{ActiveType, AggregationNumber, CachedDataItem, CachedDataItemKey, RootState},
};
@@ -385,7 +386,7 @@ impl AggregationUpdateQueue {
}

fn balance_edge(&mut self, ctx: &ExecuteContext, upper_id: TaskId, task_id: TaskId) {
let (mut upper, mut task) = ctx.task_pair(upper_id, task_id);
let (mut upper, mut task) = ctx.task_pair(upper_id, task_id, TaskDataCategory::Meta);
let upper_aggregation_number = get_aggregation_number(&upper);
let task_aggregation_number = get_aggregation_number(&task);

@@ -490,7 +491,7 @@
self.push(AggregationUpdateJob::FindAndScheduleDirty { task_ids });
}
if let Some(task_id) = popped {
let mut task = ctx.task(task_id);
let mut task = ctx.task(task_id, TaskDataCategory::Meta);
#[allow(clippy::collapsible_if, reason = "readablility")]
if task.has_key(&CachedDataItemKey::Dirty {}) {
let description = ctx.backend.get_task_desc_fn(task_id);
@@ -522,7 +523,7 @@
update: AggregatedDataUpdate,
) {
for upper_id in upper_ids {
let mut upper = ctx.task(upper_id);
let mut upper = ctx.task(upper_id, TaskDataCategory::Meta);
let diff = update.apply(&mut upper, self);
if !diff.is_empty() {
let upper_ids = get_uppers(&upper);
@@ -542,7 +543,7 @@
lost_follower_id: TaskId,
mut upper_ids: Vec<TaskId>,
) {
let mut follower = ctx.task(lost_follower_id);
let mut follower = ctx.task(lost_follower_id, TaskDataCategory::Meta);
let mut follower_in_upper_ids = Vec::new();
upper_ids.retain(|&upper_id| {
let mut keep_upper = false;
@@ -571,7 +572,7 @@
if !data.is_empty() {
for upper_id in upper_ids.iter() {
// remove data from upper
let mut upper = ctx.task(*upper_id);
let mut upper = ctx.task(*upper_id, TaskDataCategory::Meta);
let diff = data.apply(&mut upper, self);
if !diff.is_empty() {
let upper_ids = get_uppers(&upper);
@@ -593,7 +594,7 @@
}

for upper_id in follower_in_upper_ids {
let mut upper = ctx.task(upper_id);
let mut upper = ctx.task(upper_id, TaskDataCategory::Meta);
if update_count!(
upper,
Follower {
@@ -617,12 +618,12 @@
mut upper_ids: Vec<TaskId>,
) {
let follower_aggregation_number = {
let follower = ctx.task(new_follower_id);
let follower = ctx.task(new_follower_id, TaskDataCategory::Meta);
get_aggregation_number(&follower)
};
let mut upper_ids_as_follower = Vec::new();
upper_ids.retain(|&upper_id| {
let upper = ctx.task(upper_id);
let upper = ctx.task(upper_id, TaskDataCategory::Meta);
// decide if it should be an inner or follower
let upper_aggregation_number = get_aggregation_number(&upper);

@@ -638,7 +639,7 @@
}
});
if !upper_ids.is_empty() {
let mut follower = ctx.task(new_follower_id);
let mut follower = ctx.task(new_follower_id, TaskDataCategory::Meta);
upper_ids.retain(|&upper_id| {
if update_count!(follower, Upper { task: upper_id }, 1) {
// It's a new upper
@@ -656,7 +657,7 @@
if !data.is_empty() {
for upper_id in upper_ids.iter() {
// add data to upper
let mut upper = ctx.task(*upper_id);
let mut upper = ctx.task(*upper_id, TaskDataCategory::Meta);
let diff = data.apply(&mut upper, self);
if !diff.is_empty() {
let upper_ids = get_uppers(&upper);
@@ -678,7 +679,7 @@
}
}
upper_ids_as_follower.retain(|&upper_id| {
let mut upper = ctx.task(upper_id);
let mut upper = ctx.task(upper_id, TaskDataCategory::Meta);
update_count!(
upper,
Follower {
@@ -704,14 +705,14 @@
let mut followers_with_aggregation_number = new_follower_ids
.into_iter()
.map(|new_follower_id| {
let follower = ctx.task(new_follower_id);
let follower = ctx.task(new_follower_id, TaskDataCategory::Meta);
(new_follower_id, get_aggregation_number(&follower))
})
.collect::<Vec<_>>();

let mut followers_of_upper = Vec::new();
{
let upper = ctx.task(upper_id);
let upper = ctx.task(upper_id, TaskDataCategory::Meta);
// decide if it should be an inner or follower
let upper_aggregation_number = get_aggregation_number(&upper);

@@ -734,7 +735,7 @@
let mut upper_data_updates = Vec::new();
let mut upper_new_followers = Vec::new();
for (follower_id, _) in followers_with_aggregation_number {
let mut follower = ctx.task(follower_id);
let mut follower = ctx.task(follower_id, TaskDataCategory::Meta);
if update_count!(follower, Upper { task: upper_id }, 1) {
// It's a new upper
let data = AggregatedDataUpdate::from_task(&mut follower);
@@ -755,7 +756,7 @@
}
if !upper_data_updates.is_empty() {
// add data to upper
let mut upper = ctx.task(upper_id);
let mut upper = ctx.task(upper_id, TaskDataCategory::Meta);
let diffs = upper_data_updates
.into_iter()
.filter_map(|data| {
@@ -782,7 +783,7 @@
}
}
if !followers_of_upper.is_empty() {
let mut upper = ctx.task(upper_id);
let mut upper = ctx.task(upper_id, TaskDataCategory::Meta);
followers_of_upper
.retain(|follower_id| update_count!(upper, Follower { task: *follower_id }, 1));
if !followers_of_upper.is_empty() {
@@ -801,7 +802,7 @@
base_effective_distance: Option<std::num::NonZero<u32>>,
base_aggregation_number: u32,
) {
let mut task = ctx.task(task_id);
let mut task = ctx.task(task_id, TaskDataCategory::Meta);
let current = get!(task, AggregationNumber).copied().unwrap_or_default();
// The wanted new distance is either the provided one or the old distance
let distance = base_effective_distance.map_or(current.distance, |d| d.get());
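Throughout `AggregationUpdateQueue` above, every task lookup now passes `TaskDataCategory::Meta`, since edge balancing and aggregated-data propagation only touch aggregation bookkeeping, never cell contents. The sketch below illustrates why the category matters on the read side; it assumes, as a simplification, a context that lazily restores one category per task on first access, which is only an approximation of what the real `ExecuteContext` does against backing storage.

```rust
use std::collections::{HashMap, HashSet};

#[derive(Clone, Copy, PartialEq, Eq, Hash)]
enum TaskDataCategory {
    Data,
    Meta,
}

/// Hypothetical in-memory task state, split by category.
#[derive(Default)]
struct TaskState {
    restored: HashSet<TaskDataCategory>,
    items: HashMap<&'static str, u32>,
}

/// Hypothetical execution context; the real `ExecuteContext` restores
/// persisted items from the database instead of these stub loaders.
#[derive(Default)]
struct ExecuteContext {
    tasks: HashMap<u32, TaskState>,
}

impl ExecuteContext {
    /// Lock a task, restoring only the requested category from "storage".
    fn task(&mut self, id: u32, category: TaskDataCategory) -> &mut TaskState {
        let state = self.tasks.entry(id).or_default();
        if state.restored.insert(category) {
            match category {
                // Aggregation code only pays for this cheap restore...
                TaskDataCategory::Meta => {
                    state.items.insert("AggregationNumber", 4);
                }
                // ...and never forces deserialization of (potentially large) cell data.
                TaskDataCategory::Data => {
                    state.items.insert("CellData", 42);
                }
            }
        }
        state
    }
}

fn main() {
    let mut ctx = ExecuteContext::default();
    let task = ctx.task(7, TaskDataCategory::Meta);
    assert!(task.items.contains_key("AggregationNumber"));
    assert!(!task.items.contains_key("CellData")); // the data half was never loaded
}
```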
turbopack/crates/turbo-tasks-backend/src/backend/operation/cleanup_old_edges.rs
@@ -4,13 +4,16 @@ use serde::{Deserialize, Serialize};
use turbo_tasks::TaskId;

use crate::{
backend::operation::{
aggregation_update::{
get_aggregation_number, get_uppers, is_aggregating_node, AggregationUpdateJob,
AggregationUpdateQueue,
backend::{
operation::{
aggregation_update::{
get_aggregation_number, get_uppers, is_aggregating_node, AggregationUpdateJob,
AggregationUpdateQueue,
},
invalidate::make_task_dirty,
ExecuteContext, Operation,
},
invalidate::make_task_dirty,
ExecuteContext, Operation,
TaskDataCategory,
},
data::{CachedDataItemKey, CellRef},
};
@@ -69,7 +72,7 @@ impl Operation for CleanupOldEdgesOperation {
if let Some(edge) = outdated.pop() {
match edge {
OutdatedEdge::Child(child_id) => {
let mut task = ctx.task(task_id);
let mut task = ctx.task(task_id, TaskDataCategory::All);
task.remove(&CachedDataItemKey::Child { task: child_id });
if is_aggregating_node(get_aggregation_number(&task)) {
queue.push(AggregationUpdateJob::InnerLostFollower {
@@ -89,14 +92,14 @@
cell,
}) => {
{
let mut task = ctx.task(cell_task_id);
let mut task = ctx.task(cell_task_id, TaskDataCategory::Data);
task.remove(&CachedDataItemKey::CellDependent {
cell,
task: task_id,
});
}
{
let mut task = ctx.task(task_id);
let mut task = ctx.task(task_id, TaskDataCategory::Data);
task.remove(&CachedDataItemKey::CellDependency {
target: CellRef {
task: cell_task_id,
@@ -107,13 +110,13 @@
}
OutdatedEdge::OutputDependency(output_task_id) => {
{
let mut task = ctx.task(output_task_id);
let mut task = ctx.task(output_task_id, TaskDataCategory::Data);
task.remove(&CachedDataItemKey::OutputDependent {
task: task_id,
});
}
{
let mut task = ctx.task(task_id);
let mut task = ctx.task(task_id, TaskDataCategory::Data);
task.remove(&CachedDataItemKey::OutputDependency {
target: output_task_id,
});