Skip to content

Commit d8b4012

Browse files
committed
parallelize processing tasks updates
1 parent 0c74d36 commit d8b4012

File tree

1 file changed

+167
-176
lines changed

1 file changed

+167
-176
lines changed

turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs

Lines changed: 167 additions & 176 deletions
Original file line number | Diff line number | Diff line change
@@ -8,7 +8,7 @@ use anyhow::{anyhow, Context, Result};
88
use rayon::iter::{IntoParallelIterator, ParallelIterator};
99
use rustc_hash::FxHashMap;
1010
use tracing::Span;
11-
use turbo_tasks::{backend::CachedTaskType, KeyValuePair, SessionId, TaskId};
11+
use turbo_tasks::{backend::CachedTaskType, turbo_tasks_scope, KeyValuePair, SessionId, TaskId};
1212

1313
use crate::{
1414
backend::{AnyOperation, TaskDataCategory},
@@ -129,48 +129,12 @@ impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorage
129129
turbo_tasks::scope(|s| {
130130
// Start organizing the updates in parallel
131131
s.spawn(|_| {
132-
let task_meta_updates = {
133-
let _span = tracing::trace_span!(
134-
"organize task meta",
135-
updates = meta_updates.iter().map(|m| m.len()).sum::<usize>()
136-
)
137-
.entered();
138-
organize_task_data(meta_updates)
139-
};
140-
let items_result = {
141-
let _span = tracing::trace_span!(
142-
"restore task meta",
143-
tasks = task_meta_updates.iter().map(|m| m.len()).sum::<usize>()
144-
)
145-
.entered();
146-
restore_task_data(&self.database, KeySpace::TaskMeta, task_meta_updates)
147-
};
148-
task_meta_items_result = items_result.and_then(|items| {
149-
let _span = tracing::trace_span!("serialize task meta").entered();
150-
serialize_task_data(items)
151-
});
132+
task_meta_items_result =
133+
process_task_data(&self.database, KeySpace::TaskMeta, meta_updates);
152134
});
153135
s.spawn(|_| {
154-
let task_data_updates = {
155-
let _span = tracing::trace_span!(
156-
"organize task data",
157-
updates = data_updates.iter().map(|m| m.len()).sum::<usize>()
158-
)
159-
.entered();
160-
organize_task_data(data_updates)
161-
};
162-
let items_result = {
163-
let _span = tracing::trace_span!(
164-
"restore task data",
165-
tasks = task_data_updates.iter().map(|m| m.len()).sum::<usize>()
166-
)
167-
.entered();
168-
restore_task_data(&self.database, KeySpace::TaskData, task_data_updates)
169-
};
170-
task_data_items_result = items_result.and_then(|items| {
171-
let _span = tracing::trace_span!("serialize task data").entered();
172-
serialize_task_data(items)
173-
});
136+
task_data_items_result =
137+
process_task_data(&self.database, KeySpace::TaskData, data_updates);
174138
});
175139

176140
{
@@ -385,155 +349,182 @@ impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorage
385349
}
386350
}
387351

388-
type OrganizedTaskData = FxHashMap<
389-
TaskId,
390-
FxHashMap<CachedDataItemKey, (Option<CachedDataItemValue>, Option<CachedDataItemValue>)>,
391-
>;
392-
type ShardedOrganizedTaskData = Vec<OrganizedTaskData>;
393-
394-
fn organize_task_data(updates: Vec<ChunkedVec<CachedDataUpdate>>) -> ShardedOrganizedTaskData {
352+
fn process_task_data(
353+
database: &(impl KeyValueDatabase + Sync),
354+
key_space: KeySpace,
355+
updates: Vec<ChunkedVec<CachedDataUpdate>>,
356+
) -> Result<Vec<Vec<(TaskId, Vec<u8>)>>> {
395357
let span = Span::current();
358+
let turbo_tasks = turbo_tasks::turbo_tasks();
359+
let handle = tokio::runtime::Handle::current();
396360
updates
397361
.into_par_iter()
398362
.map(|updates| {
399363
let _span = span.clone().entered();
400-
let mut task_updates: OrganizedTaskData =
401-
FxHashMap::with_capacity_and_hasher(updates.len() / 10, Default::default());
402-
for CachedDataUpdate {
403-
task,
404-
key,
405-
value,
406-
old_value,
407-
} in updates.into_iter()
408-
{
409-
let data = task_updates.entry(task).or_default();
410-
match data.entry(key) {
411-
Entry::Occupied(mut entry) => {
412-
entry.get_mut().1 = value;
413-
}
414-
Entry::Vacant(entry) => {
415-
entry.insert((old_value, value));
364+
let _guard = handle.clone().enter();
365+
turbo_tasks_scope(turbo_tasks.clone(), || {
366+
let mut task_updates: FxHashMap<
367+
TaskId,
368+
FxHashMap<
369+
CachedDataItemKey,
370+
(Option<CachedDataItemValue>, Option<CachedDataItemValue>),
371+
>,
372+
> = FxHashMap::with_capacity_and_hasher(updates.len(), Default::default());
373+
374+
{
375+
let span = tracing::trace_span!(
376+
"organize updates",
377+
updates = updates.len(),
378+
tasks = tracing::field::Empty
379+
)
380+
.entered();
381+
382+
// Organize the updates by task
383+
for CachedDataUpdate {
384+
task,
385+
key,
386+
value,
387+
old_value,
388+
} in updates.into_iter()
389+
{
390+
let data = task_updates.entry(task).or_default();
391+
match data.entry(key) {
392+
Entry::Occupied(mut entry) => {
393+
entry.get_mut().1 = value;
394+
}
395+
Entry::Vacant(entry) => {
396+
entry.insert((old_value, value));
397+
}
398+
}
416399
}
400+
401+
span.record("tasks", task_updates.len());
417402
}
418-
}
419-
task_updates.retain(|_, data| {
420-
data.retain(|_, (old_value, value)| *old_value != *value);
421-
!data.is_empty()
422-
});
423-
task_updates
424-
})
425-
.collect()
426-
}
427403

428-
fn restore_task_data(
429-
database: &(impl KeyValueDatabase + Sync),
430-
key_space: KeySpace,
431-
task_updates: ShardedOrganizedTaskData,
432-
) -> Result<Vec<Vec<(TaskId, Vec<CachedDataItem>)>>> {
433-
task_updates
434-
.into_par_iter()
435-
.map(|task_updates| {
436-
let tx = database.begin_read_transaction()?;
437-
let mut result = Vec::with_capacity(task_updates.len());
438-
let mut map = FxHashMap::with_capacity_and_hasher(128, Default::default());
439-
for (task, updates) in task_updates {
440-
if let Some(old_data) = database.get(&tx, key_space, IntKey::new(*task).as_ref())? {
441-
let old_data: Vec<CachedDataItem> = match pot::from_slice(old_data.borrow()) {
442-
Ok(d) => d,
443-
Err(_) => serde_path_to_error::deserialize(
444-
&mut pot::de::SymbolList::new()
445-
.deserializer_for_slice(old_data.borrow())?,
446-
)
447-
.with_context(|| {
448-
let old_data: &[u8] = old_data.borrow();
449-
anyhow!("Unable to deserialize old value of {task}: {old_data:?}")
450-
})?,
451-
};
452-
map.extend(old_data.into_iter().map(|item| item.into_key_and_value()));
404+
{
405+
let span = tracing::trace_span!(
406+
"dedupe updates",
407+
before = task_updates.len(),
408+
after = tracing::field::Empty
409+
)
410+
.entered();
411+
412+
// Remove no-op task updates (so we have fewer tasks to restore)
413+
task_updates.retain(|_, data| {
414+
data.retain(|_, (old_value, value)| *old_value != *value);
415+
!data.is_empty()
416+
});
417+
418+
span.record("after", task_updates.len());
453419
}
454-
for (key, (_, value)) in updates {
455-
if let Some(value) = value {
456-
map.insert(key, value);
457-
} else {
458-
map.remove(&key);
420+
421+
let tx = database.begin_read_transaction()?;
422+
423+
let span = tracing::trace_span!(
424+
"restore, update and serialize",
425+
tasks = task_updates.len(),
426+
restored_tasks = tracing::field::Empty
427+
)
428+
.entered();
429+
let mut restored_tasks = 0;
430+
431+
// Restore the old task data, apply the updates and serialize the new data
432+
let mut tasks = Vec::with_capacity(task_updates.len());
433+
let mut map = FxHashMap::with_capacity_and_hasher(128, Default::default());
434+
for (task, updates) in task_updates {
435+
// Restore the old task data
436+
if let Some(old_data) =
437+
database.get(&tx, key_space, IntKey::new(*task).as_ref())?
438+
{
439+
let old_data: Vec<CachedDataItem> = match pot::from_slice(old_data.borrow())
440+
{
441+
Ok(d) => d,
442+
Err(_) => serde_path_to_error::deserialize(
443+
&mut pot::de::SymbolList::new()
444+
.deserializer_for_slice(old_data.borrow())?,
445+
)
446+
.with_context(|| {
447+
let old_data: &[u8] = old_data.borrow();
448+
anyhow!("Unable to deserialize old value of {task}: {old_data:?}")
449+
})?,
450+
};
451+
map.extend(old_data.into_iter().map(|item| item.into_key_and_value()));
452+
restored_tasks += 1;
453+
}
454+
455+
// Apply update
456+
for (key, (_, value)) in updates {
457+
if let Some(value) = value {
458+
map.insert(key, value);
459+
} else {
460+
map.remove(&key);
461+
}
459462
}
463+
464+
// Get new data
465+
let data = map
466+
.drain()
467+
.map(|(key, value)| CachedDataItem::from_key_and_value(key, value))
468+
.collect::<Vec<_>>();
469+
470+
// Serialize new data
471+
let value = serialize(task, data)?;
472+
473+
// Store the new task data
474+
tasks.push((task, value));
460475
}
461-
let vec = map
462-
.drain()
463-
.map(|(key, value)| CachedDataItem::from_key_and_value(key, value))
464-
.collect();
465-
result.push((task, vec));
466-
}
467-
Ok(result)
476+
477+
span.record("restored_tasks", restored_tasks);
478+
Ok(tasks)
479+
})
468480
})
469481
.collect::<Result<Vec<_>>>()
470482
}
471483

472-
fn serialize_task_data(
473-
tasks: Vec<Vec<(TaskId, Vec<CachedDataItem>)>>,
474-
) -> Result<Vec<Vec<(TaskId, Vec<u8>)>>> {
475-
tasks
476-
.into_par_iter()
477-
.map(|tasks| {
478-
tasks
479-
.into_iter()
480-
.map(|(task_id, mut data)| {
481-
let value = match pot::to_vec(&data) {
482-
#[cfg(not(feature = "verify_serialization"))]
483-
Ok(value) => value,
484-
_ => {
485-
let mut error = Ok(());
486-
data.retain(|item| {
487-
let mut buf = Vec::<u8>::new();
488-
let mut symbol_map = pot::ser::SymbolMap::new();
489-
let mut serializer = symbol_map.serializer_for(&mut buf).unwrap();
490-
if let Err(err) =
491-
serde_path_to_error::serialize(item, &mut serializer)
492-
{
493-
if item.is_optional() {
494-
#[cfg(feature = "verify_serialization")]
495-
println!(
496-
"Skipping non-serializable optional item: {item:?}"
497-
);
498-
} else {
499-
error = Err(err).context({
500-
anyhow!(
501-
"Unable to serialize data item for {task_id}: \
502-
{item:#?}"
503-
)
504-
});
505-
}
506-
false
507-
} else {
508-
#[cfg(feature = "verify_serialization")]
509-
{
510-
let deserialize: Result<CachedDataItem, _> =
511-
serde_path_to_error::deserialize(
512-
&mut pot::de::SymbolList::new()
513-
.deserializer_for_slice(&buf)
514-
.unwrap(),
515-
);
516-
if let Err(err) = deserialize {
517-
println!(
518-
"Data item would not be deserializable {task_id}: \
519-
{err:?}\n{item:#?}"
520-
);
521-
return false;
522-
}
523-
}
524-
true
525-
}
526-
});
527-
error?;
528-
529-
pot::to_vec(&data).with_context(|| {
530-
anyhow!("Unable to serialize data items for {task_id}: {data:#?}")
531-
})?
484+
fn serialize(task: TaskId, mut data: Vec<CachedDataItem>) -> Result<Vec<u8>> {
485+
Ok(match pot::to_vec(&data) {
486+
#[cfg(not(feature = "verify_serialization"))]
487+
Ok(value) => value,
488+
_ => {
489+
let mut error = Ok(());
490+
data.retain(|item| {
491+
let mut buf = Vec::<u8>::new();
492+
let mut symbol_map = pot::ser::SymbolMap::new();
493+
let mut serializer = symbol_map.serializer_for(&mut buf).unwrap();
494+
if let Err(err) = serde_path_to_error::serialize(item, &mut serializer) {
495+
if item.is_optional() {
496+
#[cfg(feature = "verify_serialization")]
497+
println!("Skipping non-serializable optional item: {item:?}");
498+
} else {
499+
error = Err(err).context({
500+
anyhow!("Unable to serialize data item for {task}: {item:#?}")
501+
});
502+
}
503+
false
504+
} else {
505+
#[cfg(feature = "verify_serialization")]
506+
{
507+
let deserialize: Result<CachedDataItem, _> =
508+
serde_path_to_error::deserialize(
509+
&mut pot::de::SymbolList::new()
510+
.deserializer_for_slice(&buf)
511+
.unwrap(),
512+
);
513+
if let Err(err) = deserialize {
514+
println!(
515+
"Data item would not be deserializable {task}: \
516+
{err:?}\n{item:#?}"
517+
);
518+
return false;
532519
}
533-
};
534-
Ok((task_id, value))
535-
})
536-
.collect::<Result<Vec<_>>>()
537-
})
538-
.collect::<Result<Vec<_>>>()
520+
}
521+
true
522+
}
523+
});
524+
error?;
525+
526+
pot::to_vec(&data)
527+
.with_context(|| anyhow!("Unable to serialize data items for {task}: {data:#?}"))?
528+
}
529+
})
539530
}

0 commit comments

Comments
 (0)