Skip to content

Commit c38f4d3

Browse files
committed
cache read transactions
1 parent 5425b62 commit c38f4d3

File tree

3 files changed

+80
-12
lines changed

3 files changed

+80
-12
lines changed

Cargo.lock

Lines changed: 10 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

turbopack/crates/turbo-tasks-backend/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ verify_serialization = []
1818

1919
[dependencies]
2020
anyhow = { workspace = true }
21+
arc-swap = { version = "1.7.1" }
2122
async-trait = { workspace = true }
2223
auto-hash-map = { workspace = true }
2324
byteorder = "1.5.0"
@@ -38,6 +39,7 @@ smallvec = { workspace = true }
3839
tokio = { workspace = true }
3940
tokio-scoped = "0.2.0"
4041
tracing = { workspace = true }
42+
thread_local = { version = "1.1.8" }
4143
turbo-prehash = { workspace = true }
4244
turbo-tasks = { workspace = true }
4345
turbo-tasks-hash = { workspace = true }

turbopack/crates/turbo-tasks-backend/src/lmdb_backing_storage.rs

Lines changed: 68 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
mod extended_key;
22

33
use std::{
4+
cell::UnsafeCell,
45
collections::{hash_map::Entry, HashMap},
56
env,
67
error::Error,
@@ -13,10 +14,13 @@ use std::{
1314
};
1415

1516
use anyhow::{anyhow, Context, Result};
17+
use arc_swap::ArcSwap;
1618
use lmdb::{
1719
Database, DatabaseFlags, Environment, EnvironmentFlags, RoTransaction, Transaction, WriteFlags,
1820
};
1921
use rayon::iter::{IntoParallelIterator, ParallelIterator};
22+
use smallvec::SmallVec;
23+
use thread_local::ThreadLocal;
2024
use tracing::Span;
2125
use turbo_tasks::{backend::CachedTaskType, KeyValuePair, SessionId, TaskId};
2226

@@ -56,7 +60,27 @@ fn as_u32<E: Error + Send + Sync + 'static>(result: Result<&[u8], E>) -> Result<
5660
Ok(n)
5761
}
5862

63+
struct ThreadLocalReadTransactionsContainer(UnsafeCell<SmallVec<[RoTransaction<'static>; 4]>>);
64+
65+
impl ThreadLocalReadTransactionsContainer {
66+
unsafe fn pop(&self) -> Option<RoTransaction<'static>> {
67+
let vec = unsafe { &mut *self.0.get() };
68+
vec.pop()
69+
}
70+
71+
unsafe fn push(&self, tx: RoTransaction<'static>) {
72+
let vec = unsafe { &mut *self.0.get() };
73+
vec.push(tx)
74+
}
75+
}
76+
77+
// Safety: It's safe to send RoTransaction between threads, but the types don't allow that.
78+
unsafe impl Send for ThreadLocalReadTransactionsContainer {}
79+
5980
pub struct LmdbBackingStorage {
81+
// Safety: `read_transactions_cache` need to be dropped before `env` since it will end the
82+
// transactions.
83+
read_transactions_cache: ArcSwap<ThreadLocal<ThreadLocalReadTransactionsContainer>>,
6084
env: Environment,
6185
infra_db: Database,
6286
data_db: Database,
@@ -180,6 +204,7 @@ impl LmdbBackingStorage {
180204
forward_task_cache_db,
181205
reverse_task_cache_db,
182206
fresh_db,
207+
read_transactions_cache: ArcSwap::new(Arc::new(ThreadLocal::new())),
183208
})
184209
}
185210

@@ -191,6 +216,33 @@ impl LmdbBackingStorage {
191216
}
192217
}
193218

219+
fn begin_read_transaction(&self) -> Result<RoTransaction<'_>> {
220+
let guard = self.read_transactions_cache.load();
221+
let container = guard
222+
.get_or(|| ThreadLocalReadTransactionsContainer(UnsafeCell::new(Default::default())));
223+
// Safety: Since it's a thread local it's safe to take from the container
224+
Ok(if let Some(tx) = unsafe { container.pop() } {
225+
tx
226+
} else {
227+
self.env.begin_ro_txn()?
228+
})
229+
}
230+
231+
fn end_read_transaction(&self, tx: RoTransaction<'_>) {
232+
let guard = self.read_transactions_cache.load();
233+
let container = guard
234+
.get_or(|| ThreadLocalReadTransactionsContainer(UnsafeCell::new(Default::default())));
235+
// Safety: We cast it to 'static lifetime, but it will be casted back to 'env when
236+
// taken. It's safe since this will not outlive the environment. We need to
237+
// be careful with Drop, but `read_transactions_cache` is before `env` in the
238+
// LmdbBackingStorage struct, so it's fine.
239+
let tx = unsafe { transmute::<RoTransaction<'_>, RoTransaction<'static>>(tx) };
240+
// Safety: It's safe to put it back since it's a thread local
241+
unsafe {
242+
container.push(tx);
243+
}
244+
}
245+
194246
fn to_tx(&self, tx: ReadTransaction) -> ManuallyDrop<RoTransaction<'_>> {
195247
ManuallyDrop::new(unsafe { transmute::<*const (), RoTransaction<'_>>(tx.0) })
196248
}
@@ -208,9 +260,9 @@ impl LmdbBackingStorage {
208260
let tx = self.to_tx(tx);
209261
f(&tx)
210262
} else {
211-
let tx = self.env.begin_ro_txn()?;
263+
let tx = self.begin_read_transaction()?;
212264
let r = f(&tx)?;
213-
tx.commit()?;
265+
self.end_read_transaction(tx);
214266
Ok(r)
215267
}
216268
}
@@ -238,10 +290,11 @@ impl BackingStorage for LmdbBackingStorage {
238290

239291
fn uncompleted_operations(&self) -> Vec<AnyOperation> {
240292
fn get(this: &LmdbBackingStorage) -> Result<Vec<AnyOperation>> {
241-
let tx = this.env.begin_ro_txn()?;
242-
let operations = tx.get(this.infra_db, &IntKey::new(META_KEY_OPERATIONS))?;
243-
let operations = pot::from_slice(operations)?;
244-
Ok(operations)
293+
this.with_tx(None, |tx| {
294+
let operations = tx.get(this.infra_db, &IntKey::new(META_KEY_OPERATIONS))?;
295+
let operations = pot::from_slice(operations)?;
296+
Ok(operations)
297+
})
245298
}
246299
get(self).unwrap_or_default()
247300
}
@@ -416,18 +469,23 @@ impl BackingStorage for LmdbBackingStorage {
416469
tx.commit()
417470
.with_context(|| anyhow!("Unable to commit operations"))?;
418471
}
472+
{
473+
let _span = tracing::trace_span!("swap read transactions").entered();
474+
// This resets the thread local storage for read transactions, read transactions are
475+
// eventually dropped, allowing DB to free up unused storage.
476+
self.read_transactions_cache
477+
.store(Arc::new(ThreadLocal::new()));
478+
}
419479
span.record("db_operation_count", op_count);
420480
Ok(())
421481
}
422482

423483
fn start_read_transaction(&self) -> Option<ReadTransaction> {
424-
Some(Self::from_tx(self.env.begin_ro_txn().ok()?))
484+
Some(Self::from_tx(self.begin_read_transaction().ok()?))
425485
}
426486

427487
unsafe fn end_read_transaction(&self, transaction: ReadTransaction) {
428-
ManuallyDrop::into_inner(self.to_tx(transaction))
429-
.commit()
430-
.unwrap();
488+
self.end_read_transaction(ManuallyDrop::into_inner(Self::to_tx(self, transaction)));
431489
}
432490

433491
unsafe fn forward_lookup_task_cache(

0 commit comments

Comments
 (0)