
Commit 0dd0f20

[Turbopack] refactor persistent caching from log based to cow approach (#76234)
### What?

Instead of collecting all modifications to the task graph in a log structure (which ends up taking a lot of memory), use a modified flag and a snapshot flag to avoid using more memory than needed. This way we don't store any extra data before a snapshot is requested, and once a snapshot is captured we store at most twice the memory, and only for tasks that were modified.

It works this way:

Idle Phase (before a snapshot is requested):
* When a task is modified, we only set the modified flag on that task.

Capturing a snapshot should avoid locking all tasks, so it first switches to the Snapshot Phase.

Snapshot Phase (after a snapshot has been requested):
* When a task is modified:
  * If there is already a snapshot stored for the task, skip it.
  * If the task has the modified flag set, clone the task state and store that as the snapshot.
  * If the task doesn't have the modified flag set, store a `None` snapshot.

To actually capture the snapshot, go over all modified or snapshotted tasks (tracked in a separate map):
* If the task has a snapshot (not `None`) stored, use that.
* If the task only has the modified flag set, read the task state and capture a snapshot from it.
  * There is a race condition here when the task is modified in between; we handle that with the snapshot flag as well.

All snapshots are captured now. To leave the Snapshot Phase, we:
* First unset all modified flags.
* Switch to the Idle Phase.
* Change all stored snapshots to modified.
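To make the flag/phase interplay concrete, here is a minimal single-threaded sketch of the copy-on-write idea described above. All names (`CowStorage`, `TaskState`, `modify`, `take_snapshot`) are hypothetical and greatly simplified; the real implementation lives in `Storage`/`InnerStorage` of `turbo-tasks-backend` and additionally deals with concurrency, sharding, and persistence.

```rust
use std::collections::HashMap;

#[derive(Clone, Debug, Default)]
struct TaskState {
    // Simplified stand-in for the real per-task data items.
    data: Vec<u32>,
    // Set in the Idle Phase when the task changes.
    modified: bool,
}

#[derive(Default)]
struct CowStorage {
    tasks: HashMap<u64, TaskState>,
    snapshot_phase: bool,
    // Tasks touched while a snapshot is in progress:
    // Some(state) = pre-modification copy, None = task was clean before.
    snapshots: HashMap<u64, Option<TaskState>>,
}

impl CowStorage {
    fn modify(&mut self, id: u64, f: impl FnOnce(&mut TaskState)) {
        let task = self.tasks.entry(id).or_default();
        if self.snapshot_phase && !self.snapshots.contains_key(&id) {
            // Snapshot Phase: preserve the pre-modification state (copy-on-write)
            // if the task was already modified, otherwise record a None marker.
            let snapshot = task.modified.then(|| task.clone());
            self.snapshots.insert(id, snapshot);
        }
        f(task);
        task.modified = true;
    }

    fn take_snapshot(&mut self) -> Vec<(u64, TaskState)> {
        // Switch to the Snapshot Phase so concurrent modifications would start
        // preserving pre-modification copies (this sketch is single-threaded,
        // so the `snapshots` map stays empty during the loop below).
        self.snapshot_phase = true;
        let mut result = Vec::new();
        for (&id, task) in &self.tasks {
            match self.snapshots.get(&id) {
                Some(Some(saved)) => result.push((id, saved.clone())),
                Some(None) => {} // task was clean when the snapshot started
                None if task.modified => result.push((id, task.clone())),
                None => {}
            }
        }
        // Leave the Snapshot Phase: unset modified flags; tasks modified during
        // the snapshot keep their flag so the next snapshot picks them up.
        for (&id, task) in self.tasks.iter_mut() {
            task.modified = self.snapshots.contains_key(&id);
        }
        self.snapshots.clear();
        self.snapshot_phase = false;
        result
    }
}

fn main() {
    let mut storage = CowStorage::default();
    storage.modify(1, |t| t.data.push(42));
    let snapshot = storage.take_snapshot();
    assert_eq!(snapshot.len(), 1);
    println!("captured {} task snapshot(s)", snapshot.len());
}
```

The property this sketch mirrors is that no copies are made in the Idle Phase; only tasks modified while a snapshot is in flight pay for a one-time clone of their pre-modification state.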
1 parent e9982de · commit 0dd0f20

19 files changed: +931 -832 lines changed

Cargo.lock

Lines changed: 100 additions & 79 deletions

Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -332,6 +332,7 @@ async-compression = { version = "0.3.13", default-features = false, features = [
 ] }
 async-trait = "0.1.64"
 atty = "0.2.14"
+bitfield = "0.18.0"
 bytes = "1.1.0"
 chrono = "0.4.23"
 clap = { version = "4.5.2", features = ["derive"] }

turbopack/crates/turbo-tasks-backend/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -26,6 +26,7 @@ anyhow = { workspace = true }
 arc-swap = { version = "1.7.1" }
 async-trait = { workspace = true }
 auto-hash-map = { workspace = true }
+bitfield = { workspace = true }
 byteorder = "1.5.0"
 dashmap = { workspace = true, features = ["raw-api"]}
 either = { workspace = true }

turbopack/crates/turbo-tasks-backend/src/backend/dynamic_storage.rs

Lines changed: 16 additions & 0 deletions
@@ -6,6 +6,7 @@ use crate::data::{
     CachedDataItemValue, CachedDataItemValueRef, CachedDataItemValueRefMut,
 };
 
+#[derive(Debug, Clone)]
 pub struct DynamicStorage {
     map: Vec<CachedDataItemStorage>,
 }
@@ -162,9 +163,24 @@ impl DynamicStorage {
         }
     }
 
+    pub fn len(&self) -> usize {
+        self.map.iter().map(|m| m.len()).sum()
+    }
+
     pub fn shrink_to_fit(&mut self, ty: CachedDataItemType) {
         if let Some(map) = self.get_map_mut(ty) {
             map.shrink_to_fit();
         }
     }
+
+    pub fn snapshot_for_persisting(&self) -> Self {
+        Self {
+            map: self
+                .map
+                .iter()
+                .filter(|m| m.ty().is_persistent())
+                .cloned()
+                .collect(),
+        }
+    }
 }

turbopack/crates/turbo-tasks-backend/src/backend/mod.rs

Lines changed: 126 additions & 59 deletions
@@ -1,6 +1,5 @@
 mod dynamic_storage;
 mod operation;
-mod persisted_storage_log;
 mod storage;
 
 use std::{
@@ -22,7 +21,7 @@ use auto_hash_map::{AutoMap, AutoSet};
 use indexmap::IndexSet;
 use parking_lot::{Condvar, Mutex};
 use rustc_hash::{FxHashMap, FxHashSet, FxHasher};
-use smallvec::smallvec;
+use smallvec::{smallvec, SmallVec};
 use tokio::time::{Duration, Instant};
 use turbo_tasks::{
     backend::{
@@ -34,8 +33,8 @@ use turbo_tasks::{
     task_statistics::TaskStatisticsApi,
     trace::TraceRawVcs,
     util::IdFactoryWithReuse,
-    CellId, FunctionId, FxDashMap, RawVc, ReadCellOptions, ReadConsistency, SessionId, TaskId,
-    TraitTypeId, TurboTasksBackendApi, ValueTypeId, TRANSIENT_TASK_BIT,
+    CellId, FunctionId, FxDashMap, KeyValuePair, RawVc, ReadCellOptions, ReadConsistency,
+    SessionId, TaskId, TraitTypeId, TurboTasksBackendApi, ValueTypeId, TRANSIENT_TASK_BIT,
 };
 
 pub use self::{operation::AnyOperation, storage::TaskDataCategory};
@@ -49,17 +48,21 @@ use crate::{
             CleanupOldEdgesOperation, ConnectChildOperation, ExecuteContext, ExecuteContextImpl,
             Operation, OutdatedEdge, TaskGuard,
         },
-        persisted_storage_log::PersistedStorageLog,
-        storage::{get, get_many, get_mut, get_mut_or_insert_with, iter_many, remove, Storage},
+        storage::{
+            get, get_many, get_mut, get_mut_or_insert_with, iter_many, remove,
+            InnerStorageSnapshot, Storage,
+        },
     },
     backing_storage::BackingStorage,
     data::{
         ActivenessState, AggregationNumber, CachedDataItem, CachedDataItemKey, CachedDataItemType,
-        CachedDataItemValue, CachedDataItemValueRef, CachedDataUpdate, CellRef, CollectibleRef,
-        CollectiblesRef, DirtyState, InProgressCellState, InProgressState, InProgressStateInner,
-        OutputValue, RootType,
+        CachedDataItemValue, CachedDataItemValueRef, CellRef, CollectibleRef, CollectiblesRef,
+        DirtyState, InProgressCellState, InProgressState, InProgressStateInner, OutputValue,
+        RootType,
+    },
+    utils::{
+        bi_map::BiMap, chunked_vec::ChunkedVec, ptr_eq_arc::PtrEqArc, sharded::Sharded, swap_retain,
     },
-    utils::{bi_map::BiMap, chunked_vec::ChunkedVec, ptr_eq_arc::PtrEqArc, sharded::Sharded},
 };
 
 const BACKEND_JOB_INITIAL_SNAPSHOT: BackendJobId = unsafe { BackendJobId::new_unchecked(1) };
@@ -163,8 +166,6 @@ struct TurboTasksBackendInner<B: BackingStorage> {
     task_cache: BiMap<Arc<CachedTaskType>, TaskId>,
     transient_tasks: FxDashMap<TaskId, Arc<TransientTask>>,
 
-    persisted_storage_data_log: Option<PersistedStorageLog>,
-    persisted_storage_meta_log: Option<PersistedStorageLog>,
     storage: Storage,
 
     /// Number of executing operations + Highest bit is set when snapshot is
@@ -227,8 +228,6 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
             persisted_task_cache_log: need_log.then(|| Sharded::new(shard_amount)),
             task_cache: BiMap::new(),
             transient_tasks: FxDashMap::default(),
-            persisted_storage_data_log: need_log.then(|| PersistedStorageLog::new(shard_amount)),
-            persisted_storage_meta_log: need_log.then(|| PersistedStorageLog::new(shard_amount)),
             storage: Storage::new(),
             in_progress_operations: AtomicUsize::new(0),
             snapshot_request: Mutex::new(SnapshotRequest::new()),
@@ -333,15 +332,6 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
         }
     }
 
-    fn persisted_storage_log(&self, category: TaskDataCategory) -> Option<&PersistedStorageLog> {
-        match category {
-            TaskDataCategory::Data => &self.persisted_storage_data_log,
-            TaskDataCategory::Meta => &self.persisted_storage_meta_log,
-            TaskDataCategory::All => unreachable!(),
-        }
-        .as_ref()
-    }
-
     fn should_persist(&self) -> bool {
         matches!(self.options.storage_mode, Some(StorageMode::ReadWrite))
     }
@@ -827,16 +817,12 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
             .map(|op| op.arc().clone())
            .collect::<Vec<_>>();
         drop(snapshot_request);
-        fn take_from_log(log: &Option<PersistedStorageLog>) -> Vec<ChunkedVec<CachedDataUpdate>> {
-            log.as_ref().map(|l| l.take()).unwrap_or_default()
-        }
-        let persisted_storage_meta_log = take_from_log(&self.persisted_storage_meta_log);
-        let persisted_storage_data_log = take_from_log(&self.persisted_storage_data_log);
-        let persisted_task_cache_log = self
+        let mut persisted_task_cache_log = self
            .persisted_task_cache_log
            .as_ref()
            .map(|l| l.take(|i| i))
            .unwrap_or_default();
+        self.storage.start_snapshot();
         let mut snapshot_request = self.snapshot_request.lock();
         snapshot_request.snapshot_requested = false;
         self.in_progress_operations
@@ -845,50 +831,131 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
         let snapshot_time = Instant::now();
         drop(snapshot_request);
 
-        // TODO track which items are persisting
-        // TODO This is very inefficient, maybe the BackingStorage could compute that since it need
-        // to iterate items anyway.
-        // let mut counts: FxHashMap<TaskId, u32> =
-        //     FxHashMap::with_capacity_and_hasher(); for log in persisted_storage_meta_log
-        //     .iter()
-        //     .chain(persisted_storage_data_log.iter())
-        // {
-        //     for CachedDataUpdate { task, .. } in log.iter() {
-        //         *counts.entry(*task).or_default() += 1;
-        //     }
-        // }
+        let preprocess = |task_id: TaskId, inner: &storage::InnerStorage| {
+            if task_id.is_transient() {
+                return (None, None);
+            }
+            let len = inner.len();
+            let mut meta = Vec::with_capacity(len);
+            let mut data = Vec::with_capacity(len);
+            for (key, value) in inner.iter_all() {
+                if key.is_persistent() && value.is_persistent() {
+                    match key.category() {
+                        TaskDataCategory::Meta => {
+                            meta.push(CachedDataItem::from_key_and_value_ref(key, value))
+                        }
+                        TaskDataCategory::Data => {
+                            data.push(CachedDataItem::from_key_and_value_ref(key, value))
+                        }
+                        _ => {}
+                    }
+                }
+            }
 
-        let mut new_items = false;
+            (
+                inner.state().meta_restored().then_some(meta),
+                inner.state().data_restored().then_some(data),
+            )
+        };
+        let process = |task_id: TaskId, (meta, data): (Option<Vec<_>>, Option<Vec<_>>)| {
+            (
+                task_id,
+                meta.map(|d| B::serialize(task_id, &d)),
+                data.map(|d| B::serialize(task_id, &d)),
+            )
+        };
+        let process_snapshot = |task_id: TaskId, inner: Box<InnerStorageSnapshot>| {
+            if task_id.is_transient() {
+                return (task_id, None, None);
+            }
+            let len = inner.len();
+            let mut meta = Vec::with_capacity(len);
+            let mut data = Vec::with_capacity(len);
+            for (key, value) in inner.iter_all() {
+                if key.is_persistent() && value.is_persistent() {
+                    match key.category() {
+                        TaskDataCategory::Meta => {
+                            meta.push(CachedDataItem::from_key_and_value_ref(key, value))
+                        }
+                        TaskDataCategory::Data => {
+                            data.push(CachedDataItem::from_key_and_value_ref(key, value))
+                        }
+                        _ => {}
+                    }
+                }
+            }
+            (
+                task_id,
+                inner.meta_restored.then(|| B::serialize(task_id, &meta)),
+                inner.data_restored.then(|| B::serialize(task_id, &data)),
+            )
+        };
 
-        fn shards_empty<T>(shards: &[ChunkedVec<T>]) -> bool {
-            shards.iter().all(|shard| shard.is_empty())
-        }
+        let snapshot = {
+            let _span = tracing::trace_span!("take snapshot");
+            self.storage
+                .take_snapshot(&preprocess, &process, &process_snapshot)
+        };
 
-        if !shards_empty(&persisted_task_cache_log)
-            || !shards_empty(&persisted_storage_meta_log)
-            || !shards_empty(&persisted_storage_data_log)
-        {
+        let task_snapshots = snapshot
+            .into_iter()
+            .filter_map(|iter| {
+                let mut iter = iter
+                    .filter_map(
+                        |(task_id, meta, data): (
+                            _,
+                            Option<Result<SmallVec<_>>>,
+                            Option<Result<SmallVec<_>>>,
+                        )| {
+                            let meta = match meta {
+                                Some(Ok(meta)) => Some(meta),
+                                None => None,
+                                Some(Err(err)) => {
+                                    println!(
+                                        "Serializing task {} failed (meta): {:?}",
+                                        self.get_task_description(task_id),
+                                        err
+                                    );
+                                    None
+                                }
+                            };
+                            let data = match data {
+                                Some(Ok(data)) => Some(data),
+                                None => None,
+                                Some(Err(err)) => {
+                                    println!(
+                                        "Serializing task {} failed (data): {:?}",
+                                        self.get_task_description(task_id),
+                                        err
+                                    );
+                                    None
+                                }
+                            };
+                            (meta.is_some() || data.is_some()).then_some((task_id, meta, data))
+                        },
+                    )
+                    .peekable();
+                iter.peek().is_some().then_some(iter)
+            })
+            .collect::<Vec<_>>();
+
+        swap_retain(&mut persisted_task_cache_log, |shard| !shard.is_empty());
+
+        let mut new_items = false;
+
+        if !persisted_task_cache_log.is_empty() || !task_snapshots.is_empty() {
             new_items = true;
             if let Err(err) = self.backing_storage.save_snapshot(
                 self.session_id,
                 suspended_operations,
                 persisted_task_cache_log,
-                persisted_storage_meta_log,
-                persisted_storage_data_log,
+                task_snapshots,
             ) {
                 println!("Persisting failed: {:?}", err);
                 return None;
             }
         }
 
-        // TODO add when we need to track persisted items
-        // for (task_id, count) in counts {
-        //     self.storage
-        //         .access_mut(task_id)
-        //         .persistance_state_mut()
-        //         .finish_persisting_items(count);
-        // }
-
         Some((snapshot_time, new_items))
     }
 
turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs

Lines changed: 2 additions & 27 deletions
@@ -30,7 +30,7 @@ use crate::{
         ActivenessState, AggregationNumber, CachedDataItem, CachedDataItemKey, CollectibleRef,
         DirtyContainerCount,
     },
-    utils::deque_set::DequeSet,
+    utils::{deque_set::DequeSet, swap_retain},
 };
 
 pub const LEAF_NUMBER: u32 = 16;
@@ -1526,6 +1526,7 @@ impl AggregationUpdateQueue {
         let mut upper_upper_ids_with_new_follower = SmallVec::new();
         let mut tasks_for_which_increment_active_count = SmallVec::new();
         let mut is_active = false;
+
         swap_retain(&mut upper_ids, |&mut upper_id| {
             let mut upper = ctx.task(
                 upper_id,
@@ -2355,29 +2356,3 @@ impl Operation for AggregationUpdateQueue {
         }
     }
 }
-
-fn swap_retain<T, const N: usize>(vec: &mut SmallVec<[T; N]>, mut f: impl FnMut(&mut T) -> bool) {
-    let mut i = 0;
-    while i < vec.len() {
-        if !f(&mut vec[i]) {
-            vec.swap_remove(i);
-        } else {
-            i += 1;
-        }
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use smallvec::{smallvec, SmallVec};
-
-    use crate::backend::operation::aggregation_update::swap_retain;
-
-    #[test]
-    fn test_swap_retain() {
-        let mut vec: SmallVec<[i32; 4]> = smallvec![1, 2, 3, 4, 5];
-        swap_retain(&mut vec, |a| *a % 2 != 0);
-        let expected: SmallVec<[i32; 4]> = smallvec![1, 5, 3];
-        assert_eq!(vec, expected);
-    }
-}
