11mod extended_key;
22
33use std:: {
4+ cell:: UnsafeCell ,
45 collections:: { hash_map:: Entry , HashMap } ,
56 env,
67 error:: Error ,
@@ -13,10 +14,13 @@ use std::{
1314} ;
1415
1516use anyhow:: { anyhow, Context , Result } ;
17+ use arc_swap:: ArcSwap ;
1618use lmdb:: {
1719 Database , DatabaseFlags , Environment , EnvironmentFlags , RoTransaction , Transaction , WriteFlags ,
1820} ;
1921use rayon:: iter:: { IntoParallelIterator , ParallelIterator } ;
22+ use smallvec:: SmallVec ;
23+ use thread_local:: ThreadLocal ;
2024use tracing:: Span ;
2125use turbo_tasks:: { backend:: CachedTaskType , KeyValuePair , SessionId , TaskId } ;
2226
@@ -56,7 +60,27 @@ fn as_u32<E: Error + Send + Sync + 'static>(result: Result<&[u8], E>) -> Result<
5660 Ok ( n)
5761}
5862
63+ struct ThreadLocalReadTransactionsContainer ( UnsafeCell < SmallVec < [ RoTransaction < ' static > ; 4 ] > > ) ;
64+
65+ impl ThreadLocalReadTransactionsContainer {
66+ unsafe fn pop ( & self ) -> Option < RoTransaction < ' static > > {
67+ let vec = unsafe { & mut * self . 0 . get ( ) } ;
68+ vec. pop ( )
69+ }
70+
71+ unsafe fn push ( & self , tx : RoTransaction < ' static > ) {
72+ let vec = unsafe { & mut * self . 0 . get ( ) } ;
73+ vec. push ( tx)
74+ }
75+ }
76+
77+ // Safety: It's safe to send RoTransaction between threads, but the types don't allow that.
78+ unsafe impl Send for ThreadLocalReadTransactionsContainer { }
79+
5980pub struct LmdbBackingStorage {
81+ // Safety: `read_transactions_cache` need to be dropped before `env` since it will end the
82+ // transactions.
83+ read_transactions_cache : ArcSwap < ThreadLocal < ThreadLocalReadTransactionsContainer > > ,
6084 env : Environment ,
6185 infra_db : Database ,
6286 data_db : Database ,
@@ -180,6 +204,7 @@ impl LmdbBackingStorage {
180204 forward_task_cache_db,
181205 reverse_task_cache_db,
182206 fresh_db,
207+ read_transactions_cache : ArcSwap :: new ( Arc :: new ( ThreadLocal :: new ( ) ) ) ,
183208 } )
184209 }
185210
@@ -191,6 +216,33 @@ impl LmdbBackingStorage {
191216 }
192217 }
193218
219+ fn begin_read_transaction ( & self ) -> Result < RoTransaction < ' _ > > {
220+ let guard = self . read_transactions_cache . load ( ) ;
221+ let container = guard
222+ . get_or ( || ThreadLocalReadTransactionsContainer ( UnsafeCell :: new ( Default :: default ( ) ) ) ) ;
223+ // Safety: Since it's a thread local it's safe to take from the container
224+ Ok ( if let Some ( tx) = unsafe { container. pop ( ) } {
225+ tx
226+ } else {
227+ self . env . begin_ro_txn ( ) ?
228+ } )
229+ }
230+
231+ fn end_read_transaction ( & self , tx : RoTransaction < ' _ > ) {
232+ let guard = self . read_transactions_cache . load ( ) ;
233+ let container = guard
234+ . get_or ( || ThreadLocalReadTransactionsContainer ( UnsafeCell :: new ( Default :: default ( ) ) ) ) ;
235+ // Safety: We cast it to 'static lifetime, but it will be casted back to 'env when
236+ // taken. It's safe since this will not outlive the environment. We need to
237+ // be careful with Drop, but `read_transactions_cache` is before `env` in the
238+ // LmdbBackingStorage struct, so it's fine.
239+ let tx = unsafe { transmute :: < RoTransaction < ' _ > , RoTransaction < ' static > > ( tx) } ;
240+ // Safety: It's safe to put it back since it's a thread local
241+ unsafe {
242+ container. push ( tx) ;
243+ }
244+ }
245+
194246 fn to_tx ( & self , tx : ReadTransaction ) -> ManuallyDrop < RoTransaction < ' _ > > {
195247 ManuallyDrop :: new ( unsafe { transmute :: < * const ( ) , RoTransaction < ' _ > > ( tx. 0 ) } )
196248 }
@@ -208,9 +260,9 @@ impl LmdbBackingStorage {
208260 let tx = self . to_tx ( tx) ;
209261 f ( & tx)
210262 } else {
211- let tx = self . env . begin_ro_txn ( ) ?;
263+ let tx = self . begin_read_transaction ( ) ?;
212264 let r = f ( & tx) ?;
213- tx . commit ( ) ? ;
265+ self . end_read_transaction ( tx ) ;
214266 Ok ( r)
215267 }
216268 }
@@ -238,10 +290,11 @@ impl BackingStorage for LmdbBackingStorage {
238290
239291 fn uncompleted_operations ( & self ) -> Vec < AnyOperation > {
240292 fn get ( this : & LmdbBackingStorage ) -> Result < Vec < AnyOperation > > {
241- let tx = this. env . begin_ro_txn ( ) ?;
242- let operations = tx. get ( this. infra_db , & IntKey :: new ( META_KEY_OPERATIONS ) ) ?;
243- let operations = pot:: from_slice ( operations) ?;
244- Ok ( operations)
293+ this. with_tx ( None , |tx| {
294+ let operations = tx. get ( this. infra_db , & IntKey :: new ( META_KEY_OPERATIONS ) ) ?;
295+ let operations = pot:: from_slice ( operations) ?;
296+ Ok ( operations)
297+ } )
245298 }
246299 get ( self ) . unwrap_or_default ( )
247300 }
@@ -416,18 +469,23 @@ impl BackingStorage for LmdbBackingStorage {
416469 tx. commit ( )
417470 . with_context ( || anyhow ! ( "Unable to commit operations" ) ) ?;
418471 }
472+ {
473+ let _span = tracing:: trace_span!( "swap read transactions" ) . entered ( ) ;
474+ // This resets the thread local storage for read transactions, read transactions are
475+ // eventually dropped, allowing DB to free up unused storage.
476+ self . read_transactions_cache
477+ . store ( Arc :: new ( ThreadLocal :: new ( ) ) ) ;
478+ }
419479 span. record ( "db_operation_count" , op_count) ;
420480 Ok ( ( ) )
421481 }
422482
423483 fn start_read_transaction ( & self ) -> Option < ReadTransaction > {
424- Some ( Self :: from_tx ( self . env . begin_ro_txn ( ) . ok ( ) ?) )
484+ Some ( Self :: from_tx ( self . begin_read_transaction ( ) . ok ( ) ?) )
425485 }
426486
427487 unsafe fn end_read_transaction ( & self , transaction : ReadTransaction ) {
428- ManuallyDrop :: into_inner ( self . to_tx ( transaction) )
429- . commit ( )
430- . unwrap ( ) ;
488+ self . end_read_transaction ( ManuallyDrop :: into_inner ( Self :: to_tx ( self , transaction) ) ) ;
431489 }
432490
433491 unsafe fn forward_lookup_task_cache (
0 commit comments