Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion turbopack/crates/turbo-persistence/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ memmap2 = "0.9.5"
nohash-hasher = { workspace = true }
parking_lot = { workspace = true }
pot = "3.0.0"
qfilter = { version = "0.2.4", features = ["serde"] }
qfilter = { git = "https://github.com/lukesandberg/qfilter", branch = "lukesandberg/qfilterref", features = ["serde"] }
quick_cache = { workspace = true }
rustc-hash = { workspace = true }
smallvec = { workspace = true }
Expand Down
15 changes: 7 additions & 8 deletions turbopack/crates/turbo-persistence/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -942,7 +942,7 @@ impl<S: ParallelScheduler, const FAMILIES: usize> TurboPersistence<S, FAMILIES>
// during the merge loop. Empty filters (from commits with no
// reads) are discarded.
let used_key_hashes: Option<qfilter::Filter> = {
let filters: Vec<qfilter::Filter> = meta_files
let filter_refs: Vec<qfilter::FilterRef<'_>> = meta_files
.iter()
.filter(|m| m.family() == family)
.filter_map(|meta_file| {
Expand All @@ -952,19 +952,18 @@ impl<S: ParallelScheduler, const FAMILIES: usize> TurboPersistence<S, FAMILIES>
.into_iter()
.filter(|amqf| !amqf.is_empty())
.collect();
if filters.is_empty() {
if filter_refs.is_empty() {
None
} else if filters.len() == 1 {
// Just directly use the single item
filters.into_iter().next()
} else if filter_refs.len() == 1 {
Some(filter_refs[0].to_owned())
} else {
let total_len: u64 = filters.iter().map(|f| f.len()).sum();
let total_len: u64 = filter_refs.iter().map(|f| f.len()).sum();
let mut merged =
qfilter::Filter::with_fingerprint_size(total_len, u64::BITS as u8)
.expect("Failed to create merged AMQF filter");
for filter in &filters {
for filter_ref in &filter_refs {
merged
.merge(false, filter)
.merge(false, &filter_ref.to_owned())
.expect("Failed to merge AMQF filters");
}
merged.shrink_to_fit();
Expand Down
131 changes: 84 additions & 47 deletions turbopack/crates/turbo-persistence/src/meta_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,14 @@
fmt::Display,
fs::File,
io::{BufReader, Seek},
ops::Deref,
path::{Path, PathBuf},
sync::OnceLock,
};

use anyhow::{Context, Result, bail};
use bincode::{Decode, Encode};
use bitfield::bitfield;
use byteorder::{BE, ReadBytesExt};
use memmap2::{Mmap, MmapOptions};
use turbo_bincode::turbo_bincode_decode;

use crate::{
QueryKey,
Expand Down Expand Up @@ -50,14 +47,6 @@
}
}

/// A wrapper around [`qfilter::Filter`] that implements [`Encode`] and [`Decode`].
#[derive(Encode, Decode)]
pub struct AmqfBincodeWrapper(
// this annotation can be replaced with `#[bincode(serde)]` once
// <https://github.com/arthurprs/qfilter/issues/13> is resolved
#[bincode(with = "turbo_bincode::serde_self_describing")] pub qfilter::Filter,
);

pub struct MetaEntry {
/// The metadata for the static sorted file.
sst_data: StaticSortedFileMetaData,
Expand All @@ -77,9 +66,15 @@
/// The offset of the end of the AMQF data in the the meta file relative to the end of the
/// header.
end_of_amqf_data_offset: u32,
/// The AMQF filter of this file. This is only used if the range is very large. Smaller ranges
/// use the AMQF cache instead.
amqf: OnceLock<qfilter::Filter>,
/// The AMQF filter, deserialized zero-copy from the memory-mapped file.
///
/// # Safety
///
/// This is actually a `FilterRef<'mmap>` that borrows from the `MetaFile`'s mmap.
/// It is stored as `'static` because `MetaEntry` and the mmap are co-owned by `MetaFile`,
/// and Rust drops struct fields in declaration order, so `entries` (containing this field)
/// is dropped before `mmap`.
amqf: qfilter::FilterRef<'static>,
/// The static sorted file that is lazily loaded
sst: OnceLock<StaticSortedFile>,
}
Expand Down Expand Up @@ -107,24 +102,8 @@
.expect("AMQF data out of bounds")
}

pub fn deserialize_amqf(&self, meta: &MetaFile) -> Result<qfilter::Filter> {
let amqf = self.raw_amqf(meta.amqf_data());
Ok(turbo_bincode_decode::<AmqfBincodeWrapper>(amqf)
.with_context(|| {
format!(
"Failed to deserialize AMQF from {:08}.meta for {:08}.sst",
meta.sequence_number,
self.sequence_number()
)
})?
.0)
}

pub fn amqf(&self, meta: &MetaFile) -> Result<impl Deref<Target = qfilter::Filter>> {
self.amqf.get_or_try_init(|| {
let amqf = self.deserialize_amqf(meta)?;
anyhow::Ok(amqf)
})
pub fn amqf(&self) -> qfilter::FilterRef<'_> {
self.amqf
}

pub fn sst(&self, meta: &MetaFile) -> Result<&StaticSortedFile> {
Expand Down Expand Up @@ -219,7 +198,9 @@
sequence_number: u32,
/// The key family of the SST files in this meta file.
family: u32,
/// The entries of the file.
/// The entries of the file. Each entry's `amqf` field borrows from `mmap` via a
/// transmuted `'static` lifetime. This field MUST be declared before `mmap` so that
/// it is dropped first. See [`assert_entries_drop_before_mmap`].
entries: Vec<MetaEntry>,
/// The entries that have been marked as obsolete.
obsolete_entries: Vec<u32>,
Expand All @@ -231,10 +212,19 @@
/// The offset of the end of the "used keys" AMQF data in the the meta file relative to the end
/// of the header.
end_of_used_keys_amqf_data_offset: u32,
/// The memory mapped file.
/// The memory mapped file. This field MUST be declared after `entries` so that
/// `entries` is dropped first (Rust drops fields in declaration order).
mmap: Mmap,
}

/// Compile-time assertion that `entries` is laid out before `mmap` in `MetaFile`,
/// guaranteeing that entries (containing `FilterRef`s borrowing from the mmap)
/// are dropped before the mmap itself.
const _: () = assert!(

Check failure on line 223 in turbopack/crates/turbo-persistence/src/meta_file.rs

View workflow job for this annotation

GitHub Actions / build-native-windows / build

evaluation panicked: MetaFile::entries must be declared before MetaFile::mmap for drop-order safety
std::mem::offset_of!(MetaFile, entries) < std::mem::offset_of!(MetaFile, mmap),
"MetaFile::entries must be declared before MetaFile::mmap for drop-order safety"
);

impl MetaFile {
/// Opens a meta file at the given path. This memory maps the file, but does not read it yet.
/// It's lazy read on demand.
Expand All @@ -258,39 +248,88 @@
let obsolete_sst = file.read_u32::<BE>()?;
obsolete_sst_files.push(obsolete_sst);
}

// First pass: read entry headers from the file (before the mmap is available).
struct EntryHeader {
sst_data: StaticSortedFileMetaData,
min_hash: u64,
max_hash: u64,
size: u64,
flags: MetaEntryFlags,
start_of_amqf_data_offset: u32,
end_of_amqf_data_offset: u32,
}
let count = file.read_u32::<BE>()?;
let mut entries = Vec::with_capacity(count as usize);
let mut entry_headers = Vec::with_capacity(count as usize);
let mut start_of_amqf_data_offset = 0;
for _ in 0..count {
let entry = MetaEntry {
let header = EntryHeader {
sst_data: StaticSortedFileMetaData {
sequence_number: file.read_u32::<BE>()?,
key_compression_dictionary_length: file.read_u16::<BE>()?,
block_count: file.read_u16::<BE>()?,
},
family,
min_hash: file.read_u64::<BE>()?,
max_hash: file.read_u64::<BE>()?,
size: file.read_u64::<BE>()?,
flags: MetaEntryFlags(file.read_u32::<BE>()?),
start_of_amqf_data_offset,
end_of_amqf_data_offset: file.read_u32::<BE>()?,
amqf: OnceLock::new(),
sst: OnceLock::new(),
};
start_of_amqf_data_offset = entry.end_of_amqf_data_offset;
entries.push(entry);
start_of_amqf_data_offset = header.end_of_amqf_data_offset;
entry_headers.push(header);
}
let start_of_used_keys_amqf_data_offset = start_of_amqf_data_offset;
let end_of_used_keys_amqf_data_offset = file.read_u32::<BE>()?;

// Create the mmap over the AMQF data region.
let offset = file.stream_position()?;
let file = file.into_inner();
let mut options = MmapOptions::new();
options.offset(offset);
let mmap = unsafe { options.map(&file)? };
#[cfg(unix)]
mmap.advise(memmap2::Advice::Random)?;
{
mmap.advise(memmap2::Advice::Random)?;
mmap.advise(memmap2::Advice::WillNeed)?;
}

// Second pass: build MetaEntry structs, eagerly deserializing each AMQF filter
// zero-copy from the mmap.
let amqf_data: &[u8] = &mmap;
let entries = entry_headers
.into_iter()
.map(|h| {
let raw_amqf = amqf_data
.get(h.start_of_amqf_data_offset as usize..h.end_of_amqf_data_offset as usize)
.expect("AMQF data out of bounds");
let amqf: qfilter::FilterRef<'_> =
pot::from_slice(raw_amqf).with_context(|| {
format!(
"Failed to deserialize AMQF from {:08}.meta for {:08}.sst",
sequence_number, h.sst_data.sequence_number
)
})?;
// SAFETY: The FilterRef borrows from `mmap` which is owned by the same
// MetaFile struct. Rust drops struct fields in declaration order, so
// `entries` (which contains these FilterRefs) is dropped before `mmap`.
// FilterRef is Copy and has no Drop impl.
let amqf: qfilter::FilterRef<'static> = unsafe { std::mem::transmute(amqf) };
anyhow::Ok(MetaEntry {
sst_data: h.sst_data,
family,
min_hash: h.min_hash,
max_hash: h.max_hash,
size: h.size,
flags: h.flags,
start_of_amqf_data_offset: h.start_of_amqf_data_offset,
end_of_amqf_data_offset: h.end_of_amqf_data_offset,
amqf,
sst: OnceLock::new(),
})
})
.collect::<Result<Vec<_>>>()?;

let file = Self {
db_path,
sequence_number,
Expand All @@ -307,15 +346,13 @@

pub fn clear_cache(&mut self) {
for entry in self.entries.iter_mut() {
entry.amqf.take();
entry.sst.take();
}
}

pub fn prepare_sst_cache(&self) {
for entry in self.entries.iter() {
let _ = entry.sst(self);
let _ = entry.amqf(self);
}
}

Expand All @@ -340,7 +377,7 @@
&self.mmap
}

pub fn deserialize_used_key_hashes_amqf(&self) -> Result<Option<qfilter::Filter>> {
pub fn deserialize_used_key_hashes_amqf(&self) -> Result<Option<qfilter::FilterRef<'_>>> {
if self.start_of_used_keys_amqf_data_offset == self.end_of_used_keys_amqf_data_offset {
return Ok(None);
}
Expand Down Expand Up @@ -396,7 +433,7 @@
continue;
}
{
let amqf = entry.amqf(self)?;
let amqf = entry.amqf();
if !amqf.contains_fingerprint(key_hash) {
miss_result = MetaLookupResult::QuickFilterMiss;
continue;
Expand Down Expand Up @@ -468,7 +505,7 @@
}
continue;
}
let amqf = entry.amqf(self)?;
let amqf = entry.amqf();
for (hash, index, result) in &mut cells[start_index..=end_index] {
debug_assert!(
*hash >= entry.min_hash && *hash <= entry.max_hash,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@ use std::{

use anyhow::{Context, Result};
use byteorder::{BE, ByteOrder, WriteBytesExt};
use turbo_bincode::{TurboBincodeBuffer, turbo_bincode_encode};

use crate::{
compression::compress_into_buffer,
constants::{MAX_INLINE_VALUE_SIZE, MIN_SMALL_VALUE_BLOCK_SIZE},
meta_file::{AmqfBincodeWrapper, MetaEntryFlags},
meta_file::MetaEntryFlags,
static_sorted_file::{
BLOCK_TYPE_INDEX, BLOCK_TYPE_KEY_NO_HASH, BLOCK_TYPE_KEY_WITH_HASH,
KEY_BLOCK_ENTRY_TYPE_BLOB, KEY_BLOCK_ENTRY_TYPE_DELETED, KEY_BLOCK_ENTRY_TYPE_INLINE_MIN,
Expand Down Expand Up @@ -158,7 +157,7 @@ pub fn write_static_stored_file<E: Entry>(
let meta = StaticSortedFileBuilderMeta {
min_hash,
max_hash,
amqf: Cow::Owned(amqf.into_vec()),
amqf: Cow::Owned(amqf),
key_compression_dictionary_length: key_dict.len().try_into().unwrap(),
block_count,
size: file.stream_position()?,
Expand Down Expand Up @@ -460,7 +459,7 @@ fn write_key_blocks_and_compute_amqf(
key_compression_dictionary: &[u8],
writer: &mut BlockWriter<'_>,
buffer: &mut Vec<u8>,
) -> Result<TurboBincodeBuffer> {
) -> Result<Vec<u8>> {
let mut filter = qfilter::Filter::new(entries.len() as u64, AMQF_FALSE_POSITIVE_RATE)
// This won't fail as we limit the number of entries per SST file
.expect("Filter can't be constructed");
Expand Down Expand Up @@ -578,7 +577,7 @@ fn write_key_blocks_and_compute_amqf(
writer.write_index_block(buffer)?;
buffer.clear();

Ok(turbo_bincode_encode(&AmqfBincodeWrapper(filter)).expect("AMQF serialization failed"))
Ok(pot::to_vec(&filter).expect("AMQF serialization failed"))
}

/// Builder for a single key block
Expand Down
Loading