Skip to content

Commit b9fca42

Browse files
committed
add a startup cache which caches the first 100MB of accessed data
1 parent 669bb87 commit b9fca42

File tree

3 files changed

+358
-3
lines changed

3 files changed

+358
-3
lines changed

turbopack/crates/turbo-tasks-backend/src/database/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@ pub mod key_value_database;
44
pub mod lmdb;
55
pub mod noop_kv;
66
pub mod read_transaction_cache;
7+
mod startup_cache;
78

89
pub use db_versioning::handle_db_versioning;
910
pub use fresh_db_optimization::{is_fresh, FreshDbOptimization};
1011
#[allow(unused_imports)]
1112
pub use noop_kv::NoopKvDb;
1213
pub use read_transaction_cache::ReadTransactionCache;
14+
pub use startup_cache::StartupCacheLayer;
Lines changed: 351 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,351 @@
1+
use std::{
2+
borrow::{Borrow, Cow},
3+
fs::{self, File},
4+
hash::BuildHasherDefault,
5+
io::{BufWriter, Read, Write},
6+
mem::transmute,
7+
path::PathBuf,
8+
sync::atomic::{AtomicUsize, Ordering},
9+
};
10+
11+
use anyhow::{Ok, Result};
12+
use byteorder::WriteBytesExt;
13+
use dashmap::DashMap;
14+
use rustc_hash::{FxHashMap, FxHasher};
15+
16+
use crate::database::key_value_database::{KeySpace, KeyValueDatabase, WriteBatch};
17+
18+
const CACHE_SIZE_LIMIT: usize = 100 * 1024 * 1024;
19+
const PAIR_HEADER_SIZE: usize = 9;
20+
21+
pub struct ByKeySpace<T> {
22+
infra: T,
23+
task_meta: T,
24+
task_data: T,
25+
forward_task_cache: T,
26+
reverse_task_cache: T,
27+
}
28+
29+
impl<T> ByKeySpace<T> {
30+
pub fn new(mut factory: impl FnMut(KeySpace) -> T) -> Self {
31+
Self {
32+
infra: factory(KeySpace::Infra),
33+
task_meta: factory(KeySpace::TaskMeta),
34+
task_data: factory(KeySpace::TaskData),
35+
forward_task_cache: factory(KeySpace::ForwardTaskCache),
36+
reverse_task_cache: factory(KeySpace::ReverseTaskCache),
37+
}
38+
}
39+
40+
pub fn get(&self, key_space: KeySpace) -> &T {
41+
match key_space {
42+
KeySpace::Infra => &self.infra,
43+
KeySpace::TaskMeta => &self.task_meta,
44+
KeySpace::TaskData => &self.task_data,
45+
KeySpace::ForwardTaskCache => &self.forward_task_cache,
46+
KeySpace::ReverseTaskCache => &self.reverse_task_cache,
47+
}
48+
}
49+
50+
pub fn get_mut(&mut self, key_space: KeySpace) -> &mut T {
51+
match key_space {
52+
KeySpace::Infra => &mut self.infra,
53+
KeySpace::TaskMeta => &mut self.task_meta,
54+
KeySpace::TaskData => &mut self.task_data,
55+
KeySpace::ForwardTaskCache => &mut self.forward_task_cache,
56+
KeySpace::ReverseTaskCache => &mut self.reverse_task_cache,
57+
}
58+
}
59+
60+
pub fn iter(&self) -> impl Iterator<Item = (KeySpace, &T)> {
61+
[
62+
(KeySpace::Infra, &self.infra),
63+
(KeySpace::TaskMeta, &self.task_meta),
64+
(KeySpace::TaskData, &self.task_data),
65+
(KeySpace::ForwardTaskCache, &self.forward_task_cache),
66+
(KeySpace::ReverseTaskCache, &self.reverse_task_cache),
67+
]
68+
.into_iter()
69+
}
70+
}
71+
72+
pub enum ValueBuffer<'l, T: KeyValueDatabase>
73+
where
74+
T: 'l,
75+
{
76+
Database(T::ValueBuffer<'l>),
77+
Cached(&'l [u8]),
78+
}
79+
80+
impl<T: KeyValueDatabase> Borrow<[u8]> for ValueBuffer<'_, T> {
81+
fn borrow(&self) -> &[u8] {
82+
match self {
83+
ValueBuffer::Database(value) => value.borrow(),
84+
ValueBuffer::Cached(value) => value,
85+
}
86+
}
87+
}
88+
89+
/// A layer on top of a [`KeyValueDatabase`] that persists the first
/// `CACHE_SIZE_LIMIT` bytes of accessed key/value pairs into a single cache
/// file, so the next startup can serve those reads from one sequential file
/// instead of many individual database lookups.
pub struct StartupCacheLayer<T: KeyValueDatabase> {
    // The wrapped database; all operations are ultimately forwarded to it.
    database: T,
    // Location of the startup cache file on disk.
    path: PathBuf,
    // When the database is freshly created there is nothing worth caching;
    // the layer then acts as a plain pass-through.
    fresh_db: bool,
    // Approximate total of bytes recorded in `cache` (header + key + value
    // per pair); used to stop caching once `CACHE_SIZE_LIMIT` is reached.
    cache_size: AtomicUsize,
    // Pairs accessed or written during this run; `None` marks a deletion
    // (tombstone) so stale restored values are not written back.
    cache: ByKeySpace<DashMap<Vec<u8>, Option<Vec<u8>>, BuildHasherDefault<FxHasher>>>,
    // Lookup maps into `restored`. The `'static` lifetimes are a lie: these
    // are self-references into the `restored` buffer below and are only valid
    // while that buffer is alive (see the `unsafe` block in `new`).
    restored_map: ByKeySpace<FxHashMap<&'static [u8], &'static [u8]>>,
    // Raw contents of the cache file read at startup; backing storage for
    // the slices in `restored_map`.
    restored: Vec<u8>,
}
99+
impl<T: KeyValueDatabase> StartupCacheLayer<T> {
    /// Wraps `database`, restoring a previously written startup cache from
    /// `path` unless `fresh_db` is set (in which case the cache file is
    /// ignored and the layer becomes a pass-through).
    ///
    /// Returns an error when an existing cache file cannot be read or parsed.
    pub fn new(database: T, path: PathBuf, fresh_db: bool) -> Result<Self> {
        let mut restored = Vec::new();
        let mut restored_map = ByKeySpace::new(|_| FxHashMap::default());
        if !fresh_db {
            // A missing cache file is not an error: it simply means there is
            // nothing to restore (e.g. first run, or the file was removed
            // before the last commit completed).
            if let Result::Ok(mut cache_file) = File::open(&path) {
                cache_file.read_to_end(&mut restored)?;
                drop(cache_file);
                // Parse the buffer as a sequence of (key_space, key, value)
                // records and index them per key space.
                let mut pos = 0;
                while pos < restored.len() {
                    let (key_space, key, value) = read_key_value_pair(&restored, &mut pos)?;
                    let map = restored_map.get_mut(key_space);
                    unsafe {
                        // Safety: This is a self reference, it's valid as long the `restored`
                        // buffer is alive
                        map.insert(
                            transmute::<&'_ [u8], &'static [u8]>(key),
                            transmute::<&'_ [u8], &'static [u8]>(value),
                        );
                    }
                }
            }
        }
        Ok(Self {
            database,
            path,
            fresh_db,
            cache_size: AtomicUsize::new(0),
            // Pre-size the per-key-space maps: the task/cache key spaces are
            // expected to hold many entries, while Infra only stores a few.
            cache: ByKeySpace::new(|key_space| {
                DashMap::with_capacity_and_hasher(
                    match key_space {
                        KeySpace::Infra => 8,
                        KeySpace::TaskMeta => 1024 * 1024,
                        KeySpace::TaskData => 1024 * 1024,
                        KeySpace::ForwardTaskCache => 1024 * 1024,
                        KeySpace::ReverseTaskCache => 1024 * 1024,
                    },
                    Default::default(),
                )
            }),
            restored,
            restored_map,
        })
    }
}
144+
145+
impl<T: KeyValueDatabase> KeyValueDatabase for StartupCacheLayer<T> {
    // Transactions are handled entirely by the wrapped database; this layer
    // only intercepts reads to record them for the startup cache.
    type ReadTransaction<'l>
        = T::ReadTransaction<'l>
    where
        Self: 'l;

    fn lower_read_transaction<'l: 'i + 'r, 'i: 'r, 'r>(
        tx: &'r Self::ReadTransaction<'l>,
    ) -> &'r Self::ReadTransaction<'i> {
        T::lower_read_transaction(tx)
    }

    fn begin_read_transaction<'l>(&'l self) -> Result<Self::ReadTransaction<'l>> {
        self.database.begin_read_transaction()
    }

    type ValueBuffer<'l>
        = ValueBuffer<'l, T>
    where
        Self: 'l;

    /// Reads a value, preferring the data restored from the startup cache
    /// file over a database lookup. Every value returned (from either source)
    /// is also recorded in the in-memory cache — until `CACHE_SIZE_LIMIT` is
    /// reached — so it can be written to the next cache file on commit.
    fn get<'l, 'db: 'l>(
        &'l self,
        transaction: &'l Self::ReadTransaction<'db>,
        key_space: KeySpace,
        key: &[u8],
    ) -> Result<Option<Self::ValueBuffer<'l>>> {
        if self.fresh_db {
            // Nothing was restored and nothing needs to be recorded; act as a
            // plain pass-through.
            return Ok(self
                .database
                .get(transaction, key_space, key)?
                .map(ValueBuffer::Database));
        }
        let value = {
            if let Some(value) = self.restored_map.get(key_space).get(key) {
                Some(ValueBuffer::Cached(*value))
            } else {
                self.database
                    .get(transaction, key_space, key)?
                    .map(ValueBuffer::Database)
            }
        };
        if let Some(value) = value.as_ref() {
            let value: &[u8] = value.borrow();
            // `fetch_add` both reserves space and yields the previous total
            // for the limit check; `Relaxed` suffices since this is only an
            // approximate size counter.
            let size = self.cache_size.fetch_add(
                key.len() + value.len() + PAIR_HEADER_SIZE,
                Ordering::Relaxed,
            );
            if size < CACHE_SIZE_LIMIT {
                // `or_insert_with` keeps an existing entry (including a
                // deletion tombstone) in place.
                self.cache
                    .get(key_space)
                    .entry(key.to_vec())
                    .or_insert_with(|| Some(value.to_vec()));
            }
        }
        Ok(value)
    }

    type WriteBatch<'l>
        = StartupCacheWriteBatch<'l, T>
    where
        Self: 'l;

    fn write_batch(&self) -> Result<Self::WriteBatch<'_>> {
        Ok(StartupCacheWriteBatch {
            batch: self.database.write_batch()?,
            this: self,
        })
    }
}
215+
216+
/// Write batch that forwards all operations to the wrapped database's batch
/// while mirroring puts/deletes into the layer's in-memory cache, so the
/// startup cache file written on commit reflects the committed state.
pub struct StartupCacheWriteBatch<'a, T: KeyValueDatabase> {
    // The wrapped database's write batch.
    batch: T::WriteBatch<'a>,
    // Back-reference to the owning layer (cache, restored data, file path).
    this: &'a StartupCacheLayer<T>,
}
220+
221+
impl<'a, T: KeyValueDatabase> WriteBatch<'a> for StartupCacheWriteBatch<'a, T> {
222+
fn put(&mut self, key_space: KeySpace, key: Cow<[u8]>, value: Cow<[u8]>) -> Result<()> {
223+
if !self.this.fresh_db {
224+
let cache = self.this.cache.get(key_space);
225+
cache.insert(key.to_vec(), Some(value.to_vec()));
226+
}
227+
self.batch.put(key_space, key, value)
228+
}
229+
230+
type ValueBuffer<'l>
231+
= <T::WriteBatch<'a> as WriteBatch<'a>>::ValueBuffer<'l>
232+
where
233+
Self: 'l,
234+
'a: 'l;
235+
236+
fn get<'l>(&'l self, key_space: KeySpace, key: &[u8]) -> Result<Option<Self::ValueBuffer<'l>>>
237+
where
238+
'a: 'l,
239+
{
240+
self.batch.get(key_space, key)
241+
}
242+
243+
fn delete(&mut self, key_space: KeySpace, key: Cow<[u8]>) -> Result<()> {
244+
if !self.this.fresh_db {
245+
let cache = self.this.cache.get(key_space);
246+
cache.insert(key.to_vec(), None);
247+
}
248+
self.batch.delete(key_space, key)
249+
}
250+
251+
fn commit(self) -> Result<()> {
252+
if !self.this.fresh_db {
253+
// Remove file before writing the new snapshot to database to avoid inconsistency
254+
let _ = fs::remove_file(&self.this.path);
255+
}
256+
self.batch.commit()?;
257+
if !self.this.fresh_db {
258+
// write cache to a temp file to avoid corrupted file
259+
let temp_path = self.this.path.with_extension("cache.tmp");
260+
let mut writer = BufWriter::new(File::create(&temp_path)?);
261+
let mut size_buffer = [0u8; 4];
262+
let mut pos = 0;
263+
for (key_space, cache) in self.this.cache.iter() {
264+
for entry in cache.iter() {
265+
if let (key, Some(value)) = entry.pair() {
266+
pos += write_key_value_pair(
267+
&mut writer,
268+
key_space,
269+
key,
270+
value,
271+
&mut size_buffer,
272+
)?;
273+
}
274+
}
275+
}
276+
for (key_space, map) in self.this.restored_map.iter() {
277+
let cache = self.this.cache.get(key_space);
278+
for (key, value) in map.iter() {
279+
if !cache.contains_key(*key) {
280+
let size = key.len() + value.len() + PAIR_HEADER_SIZE;
281+
if pos + size < CACHE_SIZE_LIMIT {
282+
pos += write_key_value_pair(
283+
&mut writer,
284+
key_space,
285+
*key,
286+
*value,
287+
&mut size_buffer,
288+
)?;
289+
if pos + 24 >= CACHE_SIZE_LIMIT {
290+
break;
291+
}
292+
}
293+
}
294+
}
295+
}
296+
297+
// move temp file to the final location
298+
fs::rename(temp_path, &self.this.path)?;
299+
}
300+
Ok(())
301+
}
302+
}
303+
304+
fn write_key_value_pair(
305+
writer: &mut BufWriter<File>,
306+
key_space: KeySpace,
307+
key: &[u8],
308+
value: &[u8],
309+
size_buffer: &mut [u8; 4],
310+
) -> Result<usize> {
311+
writer.write_u8(match key_space {
312+
KeySpace::Infra => 0,
313+
KeySpace::TaskMeta => 1,
314+
KeySpace::TaskData => 2,
315+
KeySpace::ForwardTaskCache => 3,
316+
KeySpace::ReverseTaskCache => 4,
317+
})?;
318+
let key_len = key.len();
319+
size_buffer.copy_from_slice(&(key_len as u32).to_be_bytes());
320+
writer.write_all(&*size_buffer)?;
321+
let value_len = value.len();
322+
size_buffer.copy_from_slice(&(value_len as u32).to_be_bytes());
323+
writer.write_all(&*size_buffer)?;
324+
writer.write_all(&key)?;
325+
writer.write_all(&value)?;
326+
Ok(9 + key_len + value_len)
327+
}
328+
329+
fn read_key_value_pair<'l>(
330+
buffer: &'l [u8],
331+
pos: &mut usize,
332+
) -> Result<(KeySpace, &'l [u8], &'l [u8])> {
333+
let key_space = match buffer[*pos] {
334+
0 => KeySpace::Infra,
335+
1 => KeySpace::TaskMeta,
336+
2 => KeySpace::TaskData,
337+
3 => KeySpace::ForwardTaskCache,
338+
4 => KeySpace::ReverseTaskCache,
339+
_ => return Err(anyhow::anyhow!("Invalid key space")),
340+
};
341+
*pos += 1;
342+
let key_len = u32::from_be_bytes(buffer[*pos..*pos + 4].try_into()?);
343+
*pos += 4;
344+
let value_len = u32::from_be_bytes(buffer[*pos..*pos + 4].try_into()?);
345+
*pos += 4;
346+
let key = &buffer[*pos..*pos + key_len as usize];
347+
*pos += key_len as usize;
348+
let value = &buffer[*pos..*pos + value_len as usize];
349+
*pos += value_len as usize;
350+
Ok((key_space, key, value))
351+
}

turbopack/crates/turbo-tasks-backend/src/lib.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,19 @@ use anyhow::Result;
1414
pub use self::{backend::TurboTasksBackend, kv_backing_storage::KeyValueDatabaseBackingStorage};
1515
use crate::database::{
1616
handle_db_versioning, is_fresh, lmdb::LmbdKeyValueDatabase, FreshDbOptimization, NoopKvDb,
17-
ReadTransactionCache,
17+
ReadTransactionCache, StartupCacheLayer,
1818
};
1919

20-
pub type LmdbBackingStorage =
21-
KeyValueDatabaseBackingStorage<ReadTransactionCache<FreshDbOptimization<LmbdKeyValueDatabase>>>;
20+
pub type LmdbBackingStorage = KeyValueDatabaseBackingStorage<
21+
ReadTransactionCache<StartupCacheLayer<FreshDbOptimization<LmbdKeyValueDatabase>>>,
22+
>;
2223

2324
/// Builds the layered LMDB-backed storage stack rooted at `path`.
pub fn lmdb_backing_storage(path: &Path) -> Result<LmdbBackingStorage> {
    // NOTE(review): presumably resolves a version-specific database
    // directory under `path` — confirm against `handle_db_versioning`.
    let path = handle_db_versioning(path)?;
    let fresh_db = is_fresh(&path);
    let database = LmbdKeyValueDatabase::new(&path)?;
    // Layers that take `fresh_db` skip their extra work for a brand-new
    // database, where there is nothing cached or persisted yet.
    let database = FreshDbOptimization::new(database, fresh_db);
    // Caches the first ~100MB of accessed data in a single startup cache
    // file for faster subsequent startups.
    let database = StartupCacheLayer::new(database, path.join("startup.cache"), fresh_db)?;
    let database = ReadTransactionCache::new(database);
    Ok(KeyValueDatabaseBackingStorage::new(database))
}

0 commit comments

Comments
 (0)