Skip to content

Commit 96575bc

Browse files
authored
[Turbopack] lmdb Backing Storage performance improvments (#71250)
### What? Creating transactions is pretty expensive in LMDB so we use a thread local storage to cache them. This cache is reset after we store a new snapshot, to allow LMDB to free unused storage. Also adds an optimization that skips querying the DB when we know it's empty.
1 parent 1dd4695 commit 96575bc

File tree

3 files changed

+112
-14
lines changed

3 files changed

+112
-14
lines changed

Cargo.lock

Lines changed: 10 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

turbopack/crates/turbo-tasks-backend/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ verify_serialization = []
1818

1919
[dependencies]
2020
anyhow = { workspace = true }
21+
arc-swap = { version = "1.7.1" }
2122
async-trait = { workspace = true }
2223
auto-hash-map = { workspace = true }
2324
byteorder = "1.5.0"
@@ -38,6 +39,7 @@ smallvec = { workspace = true }
3839
tokio = { workspace = true }
3940
tokio-scoped = "0.2.0"
4041
tracing = { workspace = true }
42+
thread_local = { version = "1.1.8" }
4143
turbo-prehash = { workspace = true }
4244
turbo-tasks = { workspace = true }
4345
turbo-tasks-hash = { workspace = true }

turbopack/crates/turbo-tasks-backend/src/lmdb_backing_storage.rs

Lines changed: 100 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
mod extended_key;
22

33
use std::{
4+
cell::UnsafeCell,
45
collections::{hash_map::Entry, HashMap},
56
env,
67
error::Error,
7-
fs::{create_dir_all, metadata, read_dir, remove_dir_all},
8+
fs::{self, create_dir_all, metadata, read_dir, remove_dir_all},
89
mem::{transmute, ManuallyDrop},
910
path::Path,
1011
sync::Arc,
@@ -13,10 +14,13 @@ use std::{
1314
};
1415

1516
use anyhow::{anyhow, Context, Result};
17+
use arc_swap::ArcSwap;
1618
use lmdb::{
1719
Database, DatabaseFlags, Environment, EnvironmentFlags, RoTransaction, Transaction, WriteFlags,
1820
};
1921
use rayon::iter::{IntoParallelIterator, ParallelIterator};
22+
use smallvec::SmallVec;
23+
use thread_local::ThreadLocal;
2024
use tracing::Span;
2125
use turbo_tasks::{backend::CachedTaskType, KeyValuePair, SessionId, TaskId};
2226

@@ -56,13 +60,41 @@ fn as_u32<E: Error + Send + Sync + 'static>(result: Result<&[u8], E>) -> Result<
5660
Ok(n)
5761
}
5862

63+
struct ThreadLocalReadTransactionsContainer(UnsafeCell<SmallVec<[RoTransaction<'static>; 4]>>);
64+
65+
impl ThreadLocalReadTransactionsContainer {
66+
unsafe fn pop(&self) -> Option<RoTransaction<'static>> {
67+
// Safety: Access only happens via `push` and `pop`, and
68+
// `ThreadLocalReadTransactionsContainer` is not `Sync`, so we can know we can know this
69+
// block is the only one currently reading or writing to the cell.
70+
let vec = unsafe { &mut *self.0.get() };
71+
vec.pop()
72+
}
73+
74+
unsafe fn push(&self, tx: RoTransaction<'static>) {
75+
// Safety: Access only happens via `push` and `pop`, and
76+
// `ThreadLocalReadTransactionsContainer` is not `Sync`, so we can know we can know this
77+
// block is the only one currently reading or writing to the cell.
78+
let vec = unsafe { &mut *self.0.get() };
79+
vec.push(tx)
80+
}
81+
}
82+
83+
// Safety: It's safe to send `RoTransaction` between threads as we construct `Environment` with
84+
// `EnvironmentFlags::NO_TLS`, but the types don't allow that.
85+
unsafe impl Send for ThreadLocalReadTransactionsContainer {}
86+
5987
pub struct LmdbBackingStorage {
88+
// Safety: `read_transactions_cache` need to be dropped before `env` since it will end the
89+
// transactions.
90+
read_transactions_cache: ArcSwap<ThreadLocal<ThreadLocalReadTransactionsContainer>>,
6091
env: Environment,
6192
infra_db: Database,
6293
data_db: Database,
6394
meta_db: Database,
6495
forward_task_cache_db: Database,
6596
reverse_task_cache_db: Database,
97+
fresh_db: bool,
6698
}
6799

68100
impl LmdbBackingStorage {
@@ -144,7 +176,10 @@ impl LmdbBackingStorage {
144176
let _ = remove_dir_all(base_path);
145177
path = base_path.join("temp");
146178
}
147-
create_dir_all(&path).context("Creating database directory failed")?;
179+
let fresh_db = fs::exists(&path).map_or(false, |exists| !exists);
180+
if fresh_db {
181+
create_dir_all(&path).context("Creating database directory failed")?;
182+
}
148183

149184
#[cfg(target_arch = "x86")]
150185
const MAP_SIZE: usize = usize::MAX;
@@ -175,6 +210,8 @@ impl LmdbBackingStorage {
175210
meta_db,
176211
forward_task_cache_db,
177212
reverse_task_cache_db,
213+
fresh_db,
214+
read_transactions_cache: ArcSwap::new(Arc::new(ThreadLocal::new())),
178215
})
179216
}
180217

@@ -186,6 +223,33 @@ impl LmdbBackingStorage {
186223
}
187224
}
188225

226+
fn begin_read_transaction(&self) -> Result<RoTransaction<'_>> {
227+
let guard = self.read_transactions_cache.load();
228+
let container = guard
229+
.get_or(|| ThreadLocalReadTransactionsContainer(UnsafeCell::new(Default::default())));
230+
// Safety: Since it's a thread local it's safe to take from the container
231+
Ok(if let Some(tx) = unsafe { container.pop() } {
232+
tx
233+
} else {
234+
self.env.begin_ro_txn()?
235+
})
236+
}
237+
238+
fn end_read_transaction(&self, tx: RoTransaction<'_>) {
239+
let guard = self.read_transactions_cache.load();
240+
let container = guard
241+
.get_or(|| ThreadLocalReadTransactionsContainer(UnsafeCell::new(Default::default())));
242+
// Safety: We cast it to 'static lifetime, but it will be casted back to 'env when
243+
// taken. It's safe since this will not outlive the environment. We need to
244+
// be careful with Drop, but `read_transactions_cache` is before `env` in the
245+
// LmdbBackingStorage struct, so it's fine.
246+
let tx = unsafe { transmute::<RoTransaction<'_>, RoTransaction<'static>>(tx) };
247+
// Safety: It's safe to put it back since it's a thread local
248+
unsafe {
249+
container.push(tx);
250+
}
251+
}
252+
189253
fn to_tx(&self, tx: ReadTransaction) -> ManuallyDrop<RoTransaction<'_>> {
190254
ManuallyDrop::new(unsafe { transmute::<*const (), RoTransaction<'_>>(tx.0) })
191255
}
@@ -203,9 +267,9 @@ impl LmdbBackingStorage {
203267
let tx = self.to_tx(tx);
204268
f(&tx)
205269
} else {
206-
let tx = self.env.begin_ro_txn()?;
270+
let tx = self.begin_read_transaction()?;
207271
let r = f(&tx)?;
208-
tx.commit()?;
272+
self.end_read_transaction(tx);
209273
Ok(r)
210274
}
211275
}
@@ -233,10 +297,11 @@ impl BackingStorage for LmdbBackingStorage {
233297

234298
fn uncompleted_operations(&self) -> Vec<AnyOperation> {
235299
fn get(this: &LmdbBackingStorage) -> Result<Vec<AnyOperation>> {
236-
let tx = this.env.begin_ro_txn()?;
237-
let operations = tx.get(this.infra_db, &IntKey::new(META_KEY_OPERATIONS))?;
238-
let operations = pot::from_slice(operations)?;
239-
Ok(operations)
300+
this.with_tx(None, |tx| {
301+
let operations = tx.get(this.infra_db, &IntKey::new(META_KEY_OPERATIONS))?;
302+
let operations = pot::from_slice(operations)?;
303+
Ok(operations)
304+
})
240305
}
241306
get(self).unwrap_or_default()
242307
}
@@ -411,25 +476,36 @@ impl BackingStorage for LmdbBackingStorage {
411476
tx.commit()
412477
.with_context(|| anyhow!("Unable to commit operations"))?;
413478
}
479+
{
480+
let _span = tracing::trace_span!("swap read transactions").entered();
481+
// This resets the thread local storage for read transactions, read transactions are
482+
// eventually dropped, allowing DB to free up unused storage.
483+
self.read_transactions_cache
484+
.store(Arc::new(ThreadLocal::new()));
485+
}
414486
span.record("db_operation_count", op_count);
415487
Ok(())
416488
}
417489

418490
fn start_read_transaction(&self) -> Option<ReadTransaction> {
419-
Some(Self::from_tx(self.env.begin_ro_txn().ok()?))
491+
Some(Self::from_tx(self.begin_read_transaction().ok()?))
420492
}
421493

422494
unsafe fn end_read_transaction(&self, transaction: ReadTransaction) {
423-
ManuallyDrop::into_inner(self.to_tx(transaction))
424-
.commit()
425-
.unwrap();
495+
self.end_read_transaction(ManuallyDrop::into_inner(Self::to_tx(self, transaction)));
426496
}
427497

428498
unsafe fn forward_lookup_task_cache(
429499
&self,
430500
tx: Option<ReadTransaction>,
431501
task_type: &CachedTaskType,
432502
) -> Option<TaskId> {
503+
// Performance optimization when the database was empty
504+
// It's assumed that no cache entries are removed from the memory cache, but we might change
505+
// that in future.
506+
if self.fresh_db {
507+
return None;
508+
}
433509
fn lookup(
434510
this: &LmdbBackingStorage,
435511
tx: &RoTransaction<'_>,
@@ -462,6 +538,12 @@ impl BackingStorage for LmdbBackingStorage {
462538
tx: Option<ReadTransaction>,
463539
task_id: TaskId,
464540
) -> Option<Arc<CachedTaskType>> {
541+
// Performance optimization when the database was empty
542+
// It's assumed that no cache entries are removed from the memory cache, but we might change
543+
// that in future.
544+
if self.fresh_db {
545+
return None;
546+
}
465547
fn lookup(
466548
this: &LmdbBackingStorage,
467549
tx: &RoTransaction<'_>,
@@ -492,6 +574,12 @@ impl BackingStorage for LmdbBackingStorage {
492574
task_id: TaskId,
493575
category: TaskDataCategory,
494576
) -> Vec<CachedDataItem> {
577+
// Performance optimization when the database was empty
578+
// It's assumed that no cache entries are removed from the memory cache, but we might change
579+
// that in future.
580+
if self.fresh_db {
581+
return Vec::new();
582+
}
495583
fn lookup(
496584
this: &LmdbBackingStorage,
497585
tx: &RoTransaction<'_>,

0 commit comments

Comments
 (0)