Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions turbopack/crates/turbo-persistence/src/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,18 @@ pub fn compress_into_buffer(
_long_term: bool,
buffer: &mut Vec<u8>,
) -> Result<()> {
let mut compressor = if let Some(dict) = dict {
lzzzz::lz4::Compressor::with_dict(dict)
if let Some(dict) = dict {
// Dictionary compression requires the streaming API to load the dict.
let mut compressor =
lzzzz::lz4::Compressor::with_dict(dict).context("LZ4 compressor creation failed")?;
compressor
.next_to_vec(block, buffer, ACC_LEVEL_DEFAULT)
.context("Compression failed")?;
} else {
lzzzz::lz4::Compressor::new()
// Without a dictionary, use the block API which reuses a thread-local compression
// state, avoiding per-call allocations.
lzzzz::lz4::compress_to_vec(block, buffer, ACC_LEVEL_DEFAULT)
.context("Compression failed")?;
}
.context("LZ4 compressor creation failed")?;
let acc_factor = ACC_LEVEL_DEFAULT;
compressor
.next_to_vec(block, buffer, acc_factor)
.context("Compression failed")?;
Ok(())
}
25 changes: 13 additions & 12 deletions turbopack/crates/turbo-persistence/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ impl<S: ParallelScheduler, const FAMILIES: usize> TurboPersistence<S, FAMILIES>
let mut new_meta_files = self
.parallel_scheduler
.parallel_map_collect_owned::<_, _, Result<Vec<_>>>(new_meta_files, |(seq, file)| {
file.sync_all()?;
file.sync_data()?;
let meta_file = MetaFile::open(&self.path, seq)?;
Ok(meta_file)
})?;
Expand All @@ -542,15 +542,16 @@ impl<S: ParallelScheduler, const FAMILIES: usize> TurboPersistence<S, FAMILIES>
sst_filter.apply_filter(meta_file);
}

self.parallel_scheduler.block_in_place(|| {
for (_, file) in new_sst_files.iter() {
file.sync_all()?;
}
for (_, file) in new_blob_files.iter() {
file.sync_all()?;
}
anyhow::Ok(())
})?;
self.parallel_scheduler
.try_parallel_for_each(&new_sst_files, |(_, file)| {
file.sync_data()?;
anyhow::Ok(())
})?;
self.parallel_scheduler
.try_parallel_for_each(&new_blob_files, |(_, file)| {
file.sync_data()?;
anyhow::Ok(())
Comment on lines +545 to +553
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just concatenate the two iterators and do a single parallel for-each over both file lists?

})?;
drop(sync_span);

let new_meta_info = new_meta_files
Expand Down Expand Up @@ -628,7 +629,7 @@ impl<S: ParallelScheduler, const FAMILIES: usize> TurboPersistence<S, FAMILIES>
}
let mut file = File::create(self.path.join(format!("{seq:08}.del")))?;
file.write_all(&buf)?;
file.sync_all()?;
file.sync_data()?;
}

let mut current_file = OpenOptions::new()
Expand All @@ -637,7 +638,7 @@ impl<S: ParallelScheduler, const FAMILIES: usize> TurboPersistence<S, FAMILIES>
.read(false)
.open(self.path.join("CURRENT"))?;
current_file.write_u32::<BE>(seq)?;
current_file.sync_all()?;
current_file.sync_data()?;

for seq in sst_seq_numbers_to_delete.iter() {
fs::remove_file(self.path.join(format!("{seq:08}.sst")))?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{
borrow::Cow,
cmp::min,
fs::File,
io::{BufWriter, Seek, Write},
io::{BufWriter, Write},
path::Path,
};

Expand Down Expand Up @@ -120,7 +120,7 @@ pub fn write_static_stored_file<E: Entry>(
) -> Result<(StaticSortedFileBuilderMeta<'static>, File)> {
debug_assert!(entries.iter().map(|e| e.key_hash()).is_sorted());

let mut file = BufWriter::new(File::create(file)?);
let mut file = BufWriter::with_capacity(256 * 1024, File::create(file)?);

let capacity = get_compression_buffer_capacity(total_key_size);
// We use a shared buffer for all operations to avoid excessive allocations
Expand Down Expand Up @@ -150,6 +150,12 @@ pub fn write_static_stored_file<E: Entry>(
let max_hash = entries.last().map_or(0, |e| e.key_hash());

let block_count = block_writer.block_count();
let block_data_size = block_writer
.block_offsets
.last()
.copied()
.unwrap_or_default() as u64;
let offset_table_size = (block_writer.block_offsets.len() * 4) as u64;
for offset in &block_writer.block_offsets {
file.write_u32::<BE>(*offset)
.context("Failed to write block offset")?;
Expand All @@ -161,7 +167,7 @@ pub fn write_static_stored_file<E: Entry>(
amqf: Cow::Owned(amqf.into_vec()),
key_compression_dictionary_length: key_dict.len().try_into().unwrap(),
block_count,
size: file.stream_position()?,
size: key_dict.len() as u64 + block_data_size + offset_table_size,
flags,
entries: entries.len() as u64,
};
Expand Down Expand Up @@ -520,7 +526,8 @@ fn write_key_blocks_and_compute_amqf(
{
let entry_count = i - current_block_start;
let has_hash = use_hash(current_block_max_key_len);
let mut block = KeyBlockBuilder::new(buffer, entry_count as u32, has_hash);
let mut block =
KeyBlockBuilder::new(buffer, entry_count as u32, has_hash, current_block_size);
for j in current_block_start..i {
let entry = &entries[j];
let value_location = &value_locations[j];
Expand All @@ -546,7 +553,8 @@ fn write_key_blocks_and_compute_amqf(
if current_block_size > 0 {
let entry_count = entries.len() - current_block_start;
let has_hash = use_hash(current_block_max_key_len);
let mut block = KeyBlockBuilder::new(buffer, entry_count as u32, has_hash);
let mut block =
KeyBlockBuilder::new(buffer, entry_count as u32, has_hash, current_block_size);
for j in current_block_start..entries.len() {
let entry = &entries[j];
let value_location = &value_locations[j];
Expand Down Expand Up @@ -594,11 +602,16 @@ const KEY_BLOCK_HEADER_SIZE: usize = 4;

impl<'l> KeyBlockBuilder<'l> {
/// Creates a new key block builder for the number of entries.
pub fn new(buffer: &'l mut Vec<u8>, entry_count: u32, has_hash: bool) -> Self {
/// `estimated_content_size` should be the sum of key lengths + per-entry metadata overhead.
pub fn new(
buffer: &'l mut Vec<u8>,
entry_count: u32,
has_hash: bool,
estimated_content_size: usize,
) -> Self {
debug_assert!(entry_count < (1 << 24));

const ESTIMATED_KEY_SIZE: usize = 16;
buffer.reserve(entry_count as usize * ESTIMATED_KEY_SIZE);
buffer.reserve(KEY_BLOCK_HEADER_SIZE + entry_count as usize * 4 + estimated_content_size);
let block_type = if has_hash {
BLOCK_TYPE_KEY_WITH_HASH
} else {
Expand Down
Loading