Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Turbopack] make filesystem tasks session dependent instead of invalidating on startup #70945

Merged
merged 14 commits into from
Oct 11, 2024
Merged
Prev Previous commit
Next Next commit
schedule tasks when they are added as inner to a aggregate root
  • Loading branch information
sokra committed Oct 10, 2024
commit a7ba338d2b18d32739b7a5a680b118868b994af1
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,14 @@ impl AggregationUpdateQueue {
new_follower_ids: followers,
});
}

if upper.has_key(&CachedDataItemKey::AggregateRoot {}) {
// If the upper node is an `AggregateRoot` we need to schedule the
// dirty tasks in the new dirty container
self.push(AggregationUpdateJob::FindAndScheduleDirty {
task_ids: vec![task_id],
});
}
}

// notify uppers about lost follower
Expand Down Expand Up @@ -716,6 +724,7 @@ impl AggregationUpdateQueue {
get_aggregation_number(&follower)
};
let mut upper_ids_as_follower = Vec::new();
let mut is_aggregate_root = false;
upper_ids.retain(|&upper_id| {
let upper = ctx.task(upper_id, TaskDataCategory::Meta);
// decide if it should be an inner or follower
Expand All @@ -729,6 +738,9 @@ impl AggregationUpdateQueue {
false
} else {
// It's an inner node, continue with the list
if upper.has_key(&CachedDataItemKey::AggregateRoot {}) {
is_aggregate_root = true;
}
true
}
});
Expand Down Expand Up @@ -782,6 +794,11 @@ impl AggregationUpdateQueue {
1
)
});
if is_aggregate_root {
self.push(AggregationUpdateJob::FindAndScheduleDirty {
task_ids: vec![new_follower_id],
});
}
if !upper_ids_as_follower.is_empty() {
self.push(AggregationUpdateJob::InnerOfUppersHasNewFollower {
upper_ids: upper_ids_as_follower,
Expand All @@ -805,8 +822,10 @@ impl AggregationUpdateQueue {
.collect::<Vec<_>>();

let mut followers_of_upper = Vec::new();
let is_aggregate_root;
{
let upper = ctx.task(upper_id, TaskDataCategory::Meta);
is_aggregate_root = upper.has_key(&CachedDataItemKey::AggregateRoot {});
// decide if it should be an inner or follower
let upper_aggregation_number = get_aggregation_number(&upper);

Expand All @@ -828,7 +847,7 @@ impl AggregationUpdateQueue {

let mut upper_data_updates = Vec::new();
let mut upper_new_followers = Vec::new();
for (follower_id, _) in followers_with_aggregation_number {
for &(follower_id, _) in followers_with_aggregation_number.iter() {
let mut follower = ctx.task(follower_id, TaskDataCategory::Meta);
if update_count!(follower, Upper { task: upper_id }, 1) {
// It's a new upper
Expand Down Expand Up @@ -876,6 +895,14 @@ impl AggregationUpdateQueue {
});
}
}
if is_aggregate_root {
self.push(AggregationUpdateJob::FindAndScheduleDirty {
task_ids: followers_with_aggregation_number
.into_iter()
.map(|(id, _)| id)
.collect(),
});
}
if !followers_of_upper.is_empty() {
let mut upper = ctx.task(upper_id, TaskDataCategory::Meta);
followers_of_upper
Expand Down