Skip to content

[Turbopack] refactor persistent caching from log based to cow approach #76234

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Apr 19, 2025
179 changes: 100 additions & 79 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ async-compression = { version = "0.3.13", default-features = false, features = [
] }
async-trait = "0.1.64"
atty = "0.2.14"
bitfield = "0.18.0"
bytes = "1.1.0"
chrono = "0.4.23"
clap = { version = "4.5.2", features = ["derive"] }
Expand Down
1 change: 1 addition & 0 deletions turbopack/crates/turbo-tasks-backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ anyhow = { workspace = true }
arc-swap = { version = "1.7.1" }
async-trait = { workspace = true }
auto-hash-map = { workspace = true }
bitfield = { workspace = true }
byteorder = "1.5.0"
dashmap = { workspace = true, features = ["raw-api"]}
either = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::data::{
CachedDataItemValue, CachedDataItemValueRef, CachedDataItemValueRefMut,
};

#[derive(Debug, Clone)]
pub struct DynamicStorage {
map: Vec<CachedDataItemStorage>,
}
Expand Down Expand Up @@ -162,9 +163,24 @@ impl DynamicStorage {
}
}

pub fn len(&self) -> usize {
self.map.iter().map(|m| m.len()).sum()
}

pub fn shrink_to_fit(&mut self, ty: CachedDataItemType) {
if let Some(map) = self.get_map_mut(ty) {
map.shrink_to_fit();
}
}

pub fn snapshot_for_persisting(&self) -> Self {
Self {
map: self
.map
.iter()
.filter(|m| m.ty().is_persistent())
.cloned()
.collect(),
}
}
}
185 changes: 126 additions & 59 deletions turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
mod dynamic_storage;
mod operation;
mod persisted_storage_log;
mod storage;

use std::{
Expand All @@ -22,7 +21,7 @@ use auto_hash_map::{AutoMap, AutoSet};
use indexmap::IndexSet;
use parking_lot::{Condvar, Mutex};
use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
use smallvec::smallvec;
use smallvec::{smallvec, SmallVec};
use tokio::time::{Duration, Instant};
use turbo_tasks::{
backend::{
Expand All @@ -34,8 +33,8 @@ use turbo_tasks::{
task_statistics::TaskStatisticsApi,
trace::TraceRawVcs,
util::IdFactoryWithReuse,
CellId, FunctionId, FxDashMap, RawVc, ReadCellOptions, ReadConsistency, SessionId, TaskId,
TraitTypeId, TurboTasksBackendApi, ValueTypeId, TRANSIENT_TASK_BIT,
CellId, FunctionId, FxDashMap, KeyValuePair, RawVc, ReadCellOptions, ReadConsistency,
SessionId, TaskId, TraitTypeId, TurboTasksBackendApi, ValueTypeId, TRANSIENT_TASK_BIT,
};

pub use self::{operation::AnyOperation, storage::TaskDataCategory};
Expand All @@ -49,17 +48,21 @@ use crate::{
CleanupOldEdgesOperation, ConnectChildOperation, ExecuteContext, ExecuteContextImpl,
Operation, OutdatedEdge, TaskGuard,
},
persisted_storage_log::PersistedStorageLog,
storage::{get, get_many, get_mut, get_mut_or_insert_with, iter_many, remove, Storage},
storage::{
get, get_many, get_mut, get_mut_or_insert_with, iter_many, remove,
InnerStorageSnapshot, Storage,
},
},
backing_storage::BackingStorage,
data::{
ActivenessState, AggregationNumber, CachedDataItem, CachedDataItemKey, CachedDataItemType,
CachedDataItemValue, CachedDataItemValueRef, CachedDataUpdate, CellRef, CollectibleRef,
CollectiblesRef, DirtyState, InProgressCellState, InProgressState, InProgressStateInner,
OutputValue, RootType,
CachedDataItemValue, CachedDataItemValueRef, CellRef, CollectibleRef, CollectiblesRef,
DirtyState, InProgressCellState, InProgressState, InProgressStateInner, OutputValue,
RootType,
},
utils::{
bi_map::BiMap, chunked_vec::ChunkedVec, ptr_eq_arc::PtrEqArc, sharded::Sharded, swap_retain,
},
utils::{bi_map::BiMap, chunked_vec::ChunkedVec, ptr_eq_arc::PtrEqArc, sharded::Sharded},
};

const BACKEND_JOB_INITIAL_SNAPSHOT: BackendJobId = unsafe { BackendJobId::new_unchecked(1) };
Expand Down Expand Up @@ -163,8 +166,6 @@ struct TurboTasksBackendInner<B: BackingStorage> {
task_cache: BiMap<Arc<CachedTaskType>, TaskId>,
transient_tasks: FxDashMap<TaskId, Arc<TransientTask>>,

persisted_storage_data_log: Option<PersistedStorageLog>,
persisted_storage_meta_log: Option<PersistedStorageLog>,
storage: Storage,

/// Number of executing operations + Highest bit is set when snapshot is
Expand Down Expand Up @@ -227,8 +228,6 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
persisted_task_cache_log: need_log.then(|| Sharded::new(shard_amount)),
task_cache: BiMap::new(),
transient_tasks: FxDashMap::default(),
persisted_storage_data_log: need_log.then(|| PersistedStorageLog::new(shard_amount)),
persisted_storage_meta_log: need_log.then(|| PersistedStorageLog::new(shard_amount)),
storage: Storage::new(),
in_progress_operations: AtomicUsize::new(0),
snapshot_request: Mutex::new(SnapshotRequest::new()),
Expand Down Expand Up @@ -333,15 +332,6 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
}
}

fn persisted_storage_log(&self, category: TaskDataCategory) -> Option<&PersistedStorageLog> {
match category {
TaskDataCategory::Data => &self.persisted_storage_data_log,
TaskDataCategory::Meta => &self.persisted_storage_meta_log,
TaskDataCategory::All => unreachable!(),
}
.as_ref()
}

fn should_persist(&self) -> bool {
matches!(self.options.storage_mode, Some(StorageMode::ReadWrite))
}
Expand Down Expand Up @@ -827,16 +817,12 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
.map(|op| op.arc().clone())
.collect::<Vec<_>>();
drop(snapshot_request);
fn take_from_log(log: &Option<PersistedStorageLog>) -> Vec<ChunkedVec<CachedDataUpdate>> {
log.as_ref().map(|l| l.take()).unwrap_or_default()
}
let persisted_storage_meta_log = take_from_log(&self.persisted_storage_meta_log);
let persisted_storage_data_log = take_from_log(&self.persisted_storage_data_log);
let persisted_task_cache_log = self
let mut persisted_task_cache_log = self
.persisted_task_cache_log
.as_ref()
.map(|l| l.take(|i| i))
.unwrap_or_default();
self.storage.start_snapshot();
let mut snapshot_request = self.snapshot_request.lock();
snapshot_request.snapshot_requested = false;
self.in_progress_operations
Expand All @@ -845,50 +831,131 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
let snapshot_time = Instant::now();
drop(snapshot_request);

// TODO track which items are persisting
// TODO This is very inefficient, maybe the BackingStorage could compute that since it need
// to iterate items anyway.
// let mut counts: FxHashMap<TaskId, u32> =
// FxHashMap::with_capacity_and_hasher(); for log in persisted_storage_meta_log
// .iter()
// .chain(persisted_storage_data_log.iter())
// {
// for CachedDataUpdate { task, .. } in log.iter() {
// *counts.entry(*task).or_default() += 1;
// }
// }
let preprocess = |task_id: TaskId, inner: &storage::InnerStorage| {
if task_id.is_transient() {
return (None, None);
}
let len = inner.len();
let mut meta = Vec::with_capacity(len);
let mut data = Vec::with_capacity(len);
for (key, value) in inner.iter_all() {
if key.is_persistent() && value.is_persistent() {
match key.category() {
TaskDataCategory::Meta => {
meta.push(CachedDataItem::from_key_and_value_ref(key, value))
}
TaskDataCategory::Data => {
data.push(CachedDataItem::from_key_and_value_ref(key, value))
}
_ => {}
}
}
}

let mut new_items = false;
(
inner.state().meta_restored().then_some(meta),
inner.state().data_restored().then_some(data),
)
};
let process = |task_id: TaskId, (meta, data): (Option<Vec<_>>, Option<Vec<_>>)| {
(
task_id,
meta.map(|d| B::serialize(task_id, &d)),
data.map(|d| B::serialize(task_id, &d)),
)
};
let process_snapshot = |task_id: TaskId, inner: Box<InnerStorageSnapshot>| {
if task_id.is_transient() {
return (task_id, None, None);
}
let len = inner.len();
let mut meta = Vec::with_capacity(len);
let mut data = Vec::with_capacity(len);
for (key, value) in inner.iter_all() {
if key.is_persistent() && value.is_persistent() {
match key.category() {
TaskDataCategory::Meta => {
meta.push(CachedDataItem::from_key_and_value_ref(key, value))
}
TaskDataCategory::Data => {
data.push(CachedDataItem::from_key_and_value_ref(key, value))
}
_ => {}
}
}
}
(
task_id,
inner.meta_restored.then(|| B::serialize(task_id, &meta)),
inner.data_restored.then(|| B::serialize(task_id, &data)),
)
};

fn shards_empty<T>(shards: &[ChunkedVec<T>]) -> bool {
shards.iter().all(|shard| shard.is_empty())
}
let snapshot = {
let _span = tracing::trace_span!("take snapshot");
self.storage
.take_snapshot(&preprocess, &process, &process_snapshot)
};

if !shards_empty(&persisted_task_cache_log)
|| !shards_empty(&persisted_storage_meta_log)
|| !shards_empty(&persisted_storage_data_log)
{
let task_snapshots = snapshot
.into_iter()
.filter_map(|iter| {
let mut iter = iter
.filter_map(
|(task_id, meta, data): (
_,
Option<Result<SmallVec<_>>>,
Option<Result<SmallVec<_>>>,
)| {
let meta = match meta {
Some(Ok(meta)) => Some(meta),
None => None,
Some(Err(err)) => {
println!(
"Serializing task {} failed (meta): {:?}",
self.get_task_description(task_id),
err
);
None
}
};
let data = match data {
Some(Ok(data)) => Some(data),
None => None,
Some(Err(err)) => {
println!(
"Serializing task {} failed (data): {:?}",
self.get_task_description(task_id),
err
);
None
}
};
(meta.is_some() || data.is_some()).then_some((task_id, meta, data))
},
)
.peekable();
iter.peek().is_some().then_some(iter)
})
.collect::<Vec<_>>();

swap_retain(&mut persisted_task_cache_log, |shard| !shard.is_empty());

let mut new_items = false;

if !persisted_task_cache_log.is_empty() || !task_snapshots.is_empty() {
new_items = true;
if let Err(err) = self.backing_storage.save_snapshot(
self.session_id,
suspended_operations,
persisted_task_cache_log,
persisted_storage_meta_log,
persisted_storage_data_log,
task_snapshots,
) {
println!("Persisting failed: {:?}", err);
return None;
}
}

// TODO add when we need to track persisted items
// for (task_id, count) in counts {
// self.storage
// .access_mut(task_id)
// .persistance_state_mut()
// .finish_persisting_items(count);
// }

Some((snapshot_time, new_items))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::{
ActivenessState, AggregationNumber, CachedDataItem, CachedDataItemKey, CollectibleRef,
DirtyContainerCount,
},
utils::deque_set::DequeSet,
utils::{deque_set::DequeSet, swap_retain},
};

pub const LEAF_NUMBER: u32 = 16;
Expand Down Expand Up @@ -1526,6 +1526,7 @@ impl AggregationUpdateQueue {
let mut upper_upper_ids_with_new_follower = SmallVec::new();
let mut tasks_for_which_increment_active_count = SmallVec::new();
let mut is_active = false;

swap_retain(&mut upper_ids, |&mut upper_id| {
let mut upper = ctx.task(
upper_id,
Expand Down Expand Up @@ -2355,29 +2356,3 @@ impl Operation for AggregationUpdateQueue {
}
}
}

fn swap_retain<T, const N: usize>(vec: &mut SmallVec<[T; N]>, mut f: impl FnMut(&mut T) -> bool) {
let mut i = 0;
while i < vec.len() {
if !f(&mut vec[i]) {
vec.swap_remove(i);
} else {
i += 1;
}
}
}

#[cfg(test)]
mod tests {
use smallvec::{smallvec, SmallVec};

use crate::backend::operation::aggregation_update::swap_retain;

#[test]
fn test_swap_retain() {
let mut vec: SmallVec<[i32; 4]> = smallvec![1, 2, 3, 4, 5];
swap_retain(&mut vec, |a| *a % 2 != 0);
let expected: SmallVec<[i32; 4]> = smallvec![1, 5, 3];
assert_eq!(vec, expected);
}
}
Loading
Loading