[Turbopack] make filesystem tasks session dependent instead of invalidating on startup #70945

Merged · 14 commits · Oct 11, 2024

8 changes: 6 additions & 2 deletions crates/next-api/src/project.rs
@@ -223,7 +223,9 @@ impl ProjectContainer
         let project = self.project();
         let project_fs = project.project_fs().strongly_consistent().await?;
         if watch.enable {
-            project_fs.start_watching_with_invalidation_reason(watch.poll_interval)?;
+            project_fs
+                .start_watching_with_invalidation_reason(watch.poll_interval)
+                .await?;
         } else {
             project_fs.invalidate_with_reason();
         }
@@ -304,7 +306,9 @@ impl ProjectContainer
         if !ReadRef::ptr_eq(&prev_project_fs, &project_fs) {
             if watch.enable {
                 // TODO stop watching: prev_project_fs.stop_watching()?;
-                project_fs.start_watching_with_invalidation_reason(watch.poll_interval)?;
+                project_fs
+                    .start_watching_with_invalidation_reason(watch.poll_interval)
+                    .await?;
             } else {
                 project_fs.invalidate_with_reason();
             }

2 changes: 1 addition & 1 deletion turbopack/crates/node-file-trace/src/lib.rs
@@ -190,7 +190,7 @@ impl Args
 async fn create_fs(name: &str, root: &str, watch: bool) -> Result<Vc<Box<dyn FileSystem>>> {
     let fs = DiskFileSystem::new(name.into(), root.into(), vec![]);
     if watch {
-        fs.await?.start_watching(None)?;
+        fs.await?.start_watching(None).await?;
     } else {
         fs.await?.invalidate_with_reason();
     }
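
Both call sites above change the same way: start_watching and start_watching_with_invalidation_reason are now async, so every caller gains an .await before the ?. A minimal sketch of the pattern with a stand-in type (WatchedFs and its body are illustrative, not Turbopack's actual implementation; presumably the watcher now does async startup work, such as restoring session state, before it becomes active):

    use std::time::Duration;

    use anyhow::Result;

    struct WatchedFs;

    impl WatchedFs {
        // Before: fn start_watching(&self, _: Option<Duration>) -> Result<()>.
        // Now the call returns a future, so error handling becomes `.await?`.
        async fn start_watching(&self, _poll_interval: Option<Duration>) -> Result<()> {
            Ok(())
        }
    }

    async fn create_fs(watch: bool) -> Result<WatchedFs> {
        let fs = WatchedFs;
        if watch {
            // `?` still propagates errors; `.await` is the new part.
            fs.start_watching(None).await?;
        }
        Ok(fs)
    }
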
381 changes: 245 additions & 136 deletions turbopack/crates/turbo-tasks-backend/src/backend/mod.rs

Large diffs are not rendered by default.

@@ -45,20 +45,14 @@ pub enum OutdatedEdge
 }
 
 impl CleanupOldEdgesOperation {
-    pub fn run(
-        task_id: TaskId,
-        outdated: Vec<OutdatedEdge>,
-        data_update: Option<AggregationUpdateJob>,
-        mut ctx: ExecuteContext<'_>,
-    ) {
-        let mut queue = AggregationUpdateQueue::new();
-        queue.extend(data_update);
+    pub fn run(task_id: TaskId, outdated: Vec<OutdatedEdge>, ctx: &mut ExecuteContext<'_>) {
+        let queue = AggregationUpdateQueue::new();
         CleanupOldEdgesOperation::RemoveEdges {
             task_id,
             outdated,
             queue,
         }
-        .execute(&mut ctx);
+        .execute(ctx);
     }
 }
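
Two things change in this signature: run now borrows the context (ctx: &mut ExecuteContext<'_>) instead of taking it by value, and the data_update parameter is gone (presumably folded into the large backend/mod.rs rewrite above). Borrowing lets a caller drive several operations over one context; a generic sketch of the difference (Ctx is a stand-in, not the real ExecuteContext):

    struct Ctx;

    // Before: takes ownership, so the caller gives up the context after one call.
    fn run_owned(_ctx: Ctx) {}

    // After: borrows mutably, so the same context can drive several operations.
    fn run_borrowed(_ctx: &mut Ctx) {}

    fn caller() {
        let mut ctx = Ctx;
        run_borrowed(&mut ctx);
        run_borrowed(&mut ctx); // still usable; run_owned(ctx) would move it
    }
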
@@ -31,19 +31,19 @@ pub enum ConnectChildOperation
 impl ConnectChildOperation {
     pub fn run(parent_task_id: TaskId, child_task_id: TaskId, mut ctx: ExecuteContext<'_>) {
         let mut parent_task = ctx.task(parent_task_id, TaskDataCategory::All);
-        parent_task.remove(&CachedDataItemKey::OutdatedChild {
-            task: child_task_id,
-        });
+        // Quick skip if the child was already connected before
+        if parent_task
+            .remove(&CachedDataItemKey::OutdatedChild {
+                task: child_task_id,
+            })
+            .is_some()
+        {
+            return;
+        }
         if parent_task.add(CachedDataItem::Child {
             task: child_task_id,
             value: (),
         }) {
-            // When task is added to a AggregateRoot is need to be scheduled,
-            // indirect connections are handled by the aggregation update.
-            let mut should_schedule = false;
-            if parent_task.has_key(&CachedDataItemKey::AggregateRoot {}) {
-                should_schedule = true;
-            }
             // Update the task aggregation
             let mut queue = AggregationUpdateQueue::new();
 
@@ -110,15 +110,15 @@ impl ConnectChildOperation
 
         {
             let mut task = ctx.task(child_task_id, TaskDataCategory::Data);
-            should_schedule = should_schedule || !task.has_key(&CachedDataItemKey::Output {});
-            if should_schedule {
+            if !task.has_key(&CachedDataItemKey::Output {}) {
                 let description = ctx.backend.get_task_desc_fn(child_task_id);
-                should_schedule = task.add(CachedDataItem::new_scheduled(description));
+                let should_schedule = task.add(CachedDataItem::new_scheduled(description));
+                drop(task);
+                if should_schedule {
+                    ctx.schedule(child_task_id);
+                }
             }
         }
-        if should_schedule {
-            ctx.schedule(child_task_id);
-        }
 
         ConnectChildOperation::UpdateAggregation {
             aggregation_update: queue,
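
Note the new drop(task) before ctx.schedule(child_task_id): the guard returned by ctx.task() holds exclusive access to the child task's storage, and the rewrite releases it before scheduling rather than holding it across the side effect. A small sketch of that pattern under the assumption that the guard behaves like a lock (Mutex here is a stand-in for the real TaskGuard):

    use std::sync::Mutex;

    struct Task {
        scheduled: bool,
    }

    fn connect(task_slot: &Mutex<Task>, schedule: impl FnOnce()) {
        let mut task = task_slot.lock().unwrap();
        let should_schedule = !task.scheduled;
        task.scheduled = true;
        // Release the guard before the side effect so the scheduled work
        // (or anyone else touching this task) can acquire it immediately.
        drop(task);
        if should_schedule {
            schedule();
        }
    }
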
@@ -13,7 +13,7 @@
 use crate::{
     backend::{
         storage::{get, get_mut},
         TaskDataCategory,
     },
-    data::{CachedDataItem, CachedDataItemKey, InProgressState},
+    data::{CachedDataItem, CachedDataItemKey, CachedDataItemValue, DirtyState, InProgressState},
 };
 
 #[derive(Serialize, Deserialize, Clone, Default)]
@@ -91,22 +91,54 @@ pub fn make_task_dirty_internal
             *stale = true;
         }
     }
-    if task.add(CachedDataItem::Dirty { value: () }) {
-        let dirty_container = get!(task, AggregatedDirtyContainerCount)
-            .copied()
-            .unwrap_or_default();
-        if dirty_container == 0 {
-            queue.extend(AggregationUpdateJob::data_update(
-                task,
-                AggregatedDataUpdate::new().dirty_container(task_id),
-            ));
-        }
-        let root = task.has_key(&CachedDataItemKey::AggregateRoot {});
-        if root {
-            let description = ctx.backend.get_task_desc_fn(task_id);
-            if task.add(CachedDataItem::new_scheduled(description)) {
-                ctx.schedule(task_id);
-            }
-        }
-    }
-}
+    let old = task.insert(CachedDataItem::Dirty {
+        value: DirtyState {
+            clean_in_session: None,
+        },
+    });
+    let mut dirty_container = match old {
+        Some(CachedDataItemValue::Dirty {
+            value: DirtyState {
+                clean_in_session: None,
+            },
+        }) => {
+            // already dirty
+            return;
+        }
+        Some(CachedDataItemValue::Dirty {
+            value: DirtyState {
+                clean_in_session: Some(session_id),
+            },
+        }) => {
+            // Got dirty in that one session only
+            let mut dirty_container = get!(task, AggregatedDirtyContainerCount)
+                .copied()
+                .unwrap_or_default();
+            dirty_container.update_session_dependent(session_id, 1);
+            dirty_container
+        }
+        None => {
+            // Get dirty for all sessions
+            get!(task, AggregatedDirtyContainerCount)
+                .copied()
+                .unwrap_or_default()
+        }
+        _ => unreachable!(),
+    };
+    let aggregated_update = dirty_container.update_with_dirty_state(&DirtyState {
+        clean_in_session: None,
+    });
+    if !aggregated_update.is_default() {
+        queue.extend(AggregationUpdateJob::data_update(
+            task,
+            AggregatedDataUpdate::new().dirty_container_update(task_id, aggregated_update),
+        ));
+    }
+    let root = task.has_key(&CachedDataItemKey::AggregateRoot {});
+    if root {
+        let description = ctx.backend.get_task_desc_fn(task_id);
+        if task.add(CachedDataItem::new_scheduled(description)) {
+            ctx.schedule(task_id);
+        }
+    }
+}
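
This hunk carries the core idea of the PR: the Dirty item's value goes from () to DirtyState { clean_in_session: Option<SessionId> }. None means dirty in every session; Some(id) means the task was already made clean in session id and only counts as dirty for other sessions, which is presumably what lets filesystem tasks survive a restart instead of being invalidated on startup. A minimal model of that three-way distinction (types simplified from the diff; is_dirty_in is an illustrative helper, not part of the codebase):

    type SessionId = u32;

    #[derive(Clone, Copy)]
    struct DirtyState {
        // None: dirty in all sessions.
        // Some(id): already cleaned in session `id`, dirty everywhere else.
        clean_in_session: Option<SessionId>,
    }

    // Is the task dirty from the point of view of `session`?
    fn is_dirty_in(state: Option<DirtyState>, session: SessionId) -> bool {
        match state {
            // No Dirty item at all: the task is clean everywhere.
            None => false,
            // Dirty for every session.
            Some(DirtyState { clean_in_session: None }) => true,
            // Clean only for the session that already recomputed it.
            Some(DirtyState { clean_in_session: Some(clean) }) => clean != session,
        }
    }

    fn main() {
        let dirty_everywhere = Some(DirtyState { clean_in_session: None });
        let cleaned_in_1 = Some(DirtyState { clean_in_session: Some(1) });
        assert!(is_dirty_in(dirty_everywhere, 1));
        assert!(!is_dirty_in(cleaned_in_1, 1)); // this session already saw it clean
        assert!(is_dirty_in(cleaned_in_1, 2)); // a new session must treat it as dirty
    }
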
@@ -12,7 +12,7 @@ use std::{
 };
 
 use serde::{Deserialize, Serialize};
-use turbo_tasks::{KeyValuePair, TaskId, TurboTasksBackendApi};
+use turbo_tasks::{KeyValuePair, SessionId, TaskId, TurboTasksBackendApi};
 
 use crate::{
     backend::{
@@ -97,6 +97,10 @@ impl<'a> ExecuteContext<'a>
         }
     }
 
+    pub fn session_id(&self) -> SessionId {
+        self.backend.session_id()
+    }
+
     pub fn task(&mut self, task_id: TaskId, category: TaskDataCategory) -> TaskGuard<'a> {
         let mut task = self.backend.storage.access_mut(task_id);
         if !task.persistance_state().is_restored(category) {

4 changes: 3 additions & 1 deletion turbopack/crates/turbo-tasks-backend/src/backing_storage.rs
@@ -1,7 +1,7 @@
 use std::sync::Arc;
 
 use anyhow::Result;
-use turbo_tasks::{backend::CachedTaskType, TaskId};
+use turbo_tasks::{backend::CachedTaskType, SessionId, TaskId};
 
 use crate::{
     backend::{AnyOperation, TaskDataCategory},
@@ -14,9 +14,11 @@
 pub struct ReadTransaction(pub *const ());
 
 pub trait BackingStorage {
     fn next_free_task_id(&self) -> TaskId;
+    fn next_session_id(&self) -> SessionId;
     fn uncompleted_operations(&self) -> Vec<AnyOperation>;
     fn save_snapshot(
         &self,
+        session_id: SessionId,
         operations: Vec<Arc<AnyOperation>>,
         task_cache_updates: Vec<ChunkedVec<(Arc<CachedTaskType>, TaskId)>>,
         meta_updates: Vec<ChunkedVec<CachedDataUpdate>>,
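
BackingStorage grows two session-related responsibilities: handing out the next SessionId on startup (the counterpart of next_free_task_id) and persisting the current one with each snapshot. A hedged sketch of how an implementation might satisfy the new methods (the in-memory store, field names, and numbering scheme are hypothetical, not the real persistent backend):

    use std::sync::atomic::{AtomicU32, Ordering};

    type SessionId = u32; // stand-in for turbo_tasks::SessionId

    // Hypothetical in-memory stand-in for a persistent store.
    struct MemoryBackingStorage {
        last_session_id: AtomicU32,
    }

    impl MemoryBackingStorage {
        // Each startup gets a fresh session id, one past the last persisted
        // one, so "clean in session" markers from earlier runs never match it.
        fn next_session_id(&self) -> SessionId {
            self.last_session_id.load(Ordering::SeqCst) + 1
        }

        // Snapshots record the session id alongside the task data, so the
        // next startup can continue the numbering.
        fn save_snapshot(&self, session_id: SessionId) {
            self.last_session_id.store(session_id, Ordering::SeqCst);
        }
    }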