@@ -15,7 +15,6 @@ use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
1515use std:: sync:: { Arc , Mutex } ;
1616use std:: time:: Duration ;
1717
18- use bdk_chain:: Merge ;
1918use bitcoin:: hashes:: { sha256, Hash , HashEngine , Hmac , HmacEngine } ;
2019use lightning:: impl_writeable_tlv_based_enum;
2120use lightning:: io:: { self , Error , ErrorKind } ;
@@ -244,11 +243,15 @@ impl KVStoreSync for VssStore {
244243 primary_namespace,
245244 secondary_namespace,
246245 key,
247- lazy,
248246 )
249247 . await
250248 } ;
251- tokio:: task:: block_in_place ( move || internal_runtime. block_on ( fut) )
249+ if lazy {
250+ internal_runtime. spawn ( async { fut. await } ) ;
251+ Ok ( ( ) )
252+ } else {
253+ tokio:: task:: block_in_place ( move || internal_runtime. block_on ( fut) )
254+ }
252255 }
253256
254257 fn list ( & self , primary_namespace : & str , secondary_namespace : & str ) -> io:: Result < Vec < String > > {
@@ -316,7 +319,7 @@ impl KVStore for VssStore {
316319 let secondary_namespace = secondary_namespace. to_string ( ) ;
317320 let key = key. to_string ( ) ;
318321 let inner = Arc :: clone ( & self . inner ) ;
319- Box :: pin ( async move {
322+ let fut = async move {
320323 inner
321324 . remove_internal (
322325 & inner. async_client ,
@@ -326,10 +329,15 @@ impl KVStore for VssStore {
326329 primary_namespace,
327330 secondary_namespace,
328331 key,
329- lazy,
330332 )
331333 . await
332- } )
334+ } ;
335+ if lazy {
336+ tokio:: task:: spawn ( async { fut. await } ) ;
337+ Box :: pin ( async { Ok ( ( ) ) } )
338+ } else {
339+ Box :: pin ( async { fut. await } )
340+ }
333341 }
334342 fn list (
335343 & self , primary_namespace : & str , secondary_namespace : & str ,
@@ -362,7 +370,6 @@ struct VssStoreInner {
362370 // Per-key locks that ensures that we don't have concurrent writes to the same namespace/key.
363371 // The lock also encapsulates the latest written version per key.
364372 locks : Mutex < HashMap < String , Arc < tokio:: sync:: Mutex < u64 > > > > ,
365- pending_lazy_deletes : Mutex < Vec < KeyValue > > ,
366373}
367374
368375impl VssStoreInner {
@@ -372,7 +379,6 @@ impl VssStoreInner {
372379 data_encryption_key : [ u8 ; 32 ] , key_obfuscator : KeyObfuscator ,
373380 ) -> Self {
374381 let locks = Mutex :: new ( HashMap :: new ( ) ) ;
375- let pending_lazy_deletes = Mutex :: new ( Vec :: new ( ) ) ;
376382 Self {
377383 schema_version,
378384 blocking_client,
@@ -381,7 +387,6 @@ impl VssStoreInner {
381387 data_encryption_key,
382388 key_obfuscator,
383389 locks,
384- pending_lazy_deletes,
385390 }
386391 }
387392
@@ -520,12 +525,6 @@ impl VssStoreInner {
520525 "write" ,
521526 ) ?;
522527
523- let delete_items = self
524- . pending_lazy_deletes
525- . try_lock ( )
526- . ok ( )
527- . and_then ( |mut guard| guard. take ( ) )
528- . unwrap_or_default ( ) ;
529528 let store_key = self . build_obfuscated_key ( & primary_namespace, & secondary_namespace, & key) ;
530529 let vss_version = -1 ;
531530 let storable_builder = StorableBuilder :: new ( RandEntropySource ) ;
@@ -541,16 +540,11 @@ impl VssStoreInner {
541540 version: vss_version,
542541 value: storable. encode_to_vec( ) ,
543542 } ] ,
544- delete_items : delete_items . clone ( ) ,
543+ delete_items : vec ! [ ] ,
545544 } ;
546545
547546 self . execute_locked_write ( inner_lock_ref, locking_key, version, async move || {
548547 client. put_object ( & request) . await . map_err ( |e| {
549- // Restore delete items so they'll be retried on next write.
550- if !delete_items. is_empty ( ) {
551- self . pending_lazy_deletes . lock ( ) . unwrap ( ) . extend ( delete_items) ;
552- }
553-
554548 let msg = format ! (
555549 "Failed to write to key {}/{}/{}: {}" ,
556550 primary_namespace, secondary_namespace, key, e
@@ -566,7 +560,7 @@ impl VssStoreInner {
566560 async fn remove_internal (
567561 & self , client : & VssClient < CustomRetryPolicy > , inner_lock_ref : Arc < tokio:: sync:: Mutex < u64 > > ,
568562 locking_key : String , version : u64 , primary_namespace : String , secondary_namespace : String ,
569- key : String , lazy : bool ,
563+ key : String ,
570564 ) -> io:: Result < ( ) > {
571565 check_namespace_key_validity (
572566 & primary_namespace,
@@ -579,12 +573,6 @@ impl VssStoreInner {
579573 self . build_obfuscated_key ( & primary_namespace, & secondary_namespace, & key) ;
580574
581575 let key_value = KeyValue { key : obfuscated_key, version : -1 , value : vec ! [ ] } ;
582- if lazy {
583- let mut pending_lazy_deletes = self . pending_lazy_deletes . lock ( ) . unwrap ( ) ;
584- pending_lazy_deletes. push ( key_value) ;
585- return Ok ( ( ) ) ;
586- }
587-
588576 self . execute_locked_write ( inner_lock_ref, locking_key, version, async move || {
589577 let request =
590578 DeleteObjectRequest { store_id : self . store_id . clone ( ) , key_value : Some ( key_value) } ;
@@ -851,85 +839,4 @@ mod tests {
851839 do_read_write_remove_list_persist ( & vss_store) ;
852840 drop ( vss_store)
853841 }
854-
855- #[ tokio:: test( flavor = "multi_thread" , worker_threads = 1 ) ]
856- async fn vss_lazy_delete ( ) {
857- let vss_base_url = std:: env:: var ( "TEST_VSS_BASE_URL" ) . unwrap ( ) ;
858- let mut rng = rng ( ) ;
859- let rand_store_id: String = ( 0 ..7 ) . map ( |_| rng. sample ( Alphanumeric ) as char ) . collect ( ) ;
860- let mut vss_seed = [ 0u8 ; 32 ] ;
861- rng. fill_bytes ( & mut vss_seed) ;
862- let header_provider = Arc :: new ( FixedHeaders :: new ( HashMap :: new ( ) ) ) ;
863- let vss_store =
864- VssStore :: new ( vss_base_url, rand_store_id, vss_seed, header_provider) . unwrap ( ) ;
865-
866- let primary_namespace = "test_namespace" ;
867- let secondary_namespace = "" ;
868- let key_to_delete = "key_to_delete" ;
869- let key_for_trigger = "key_for_trigger" ;
870- let data_to_delete = b"data_to_delete" . to_vec ( ) ;
871- let trigger_data = b"trigger_data" . to_vec ( ) ;
872-
873- // Write the key that we'll later lazily delete
874- KVStore :: write (
875- & vss_store,
876- primary_namespace,
877- secondary_namespace,
878- key_to_delete,
879- data_to_delete. clone ( ) ,
880- )
881- . await
882- . unwrap ( ) ;
883-
884- // Verify the key exists
885- let read_data =
886- KVStore :: read ( & vss_store, primary_namespace, secondary_namespace, key_to_delete)
887- . await
888- . unwrap ( ) ;
889- assert_eq ! ( read_data, data_to_delete) ;
890-
891- // Perform a lazy delete
892- KVStore :: remove ( & vss_store, primary_namespace, secondary_namespace, key_to_delete, true )
893- . await
894- . unwrap ( ) ;
895-
896- // Verify the key still exists (lazy delete doesn't immediately remove it)
897- let read_data =
898- KVStore :: read ( & vss_store, primary_namespace, secondary_namespace, key_to_delete)
899- . await
900- . unwrap ( ) ;
901- assert_eq ! ( read_data, data_to_delete) ;
902-
903- // Verify the key is still in the list
904- let keys = KVStore :: list ( & vss_store, primary_namespace, secondary_namespace) . await . unwrap ( ) ;
905- assert ! ( keys. contains( & key_to_delete. to_string( ) ) ) ;
906-
907- // Trigger the actual deletion by performing a write operation
908- KVStore :: write (
909- & vss_store,
910- primary_namespace,
911- secondary_namespace,
912- key_for_trigger,
913- trigger_data. clone ( ) ,
914- )
915- . await
916- . unwrap ( ) ;
917-
918- // Now verify the key is actually deleted
919- let read_result =
920- KVStore :: read ( & vss_store, primary_namespace, secondary_namespace, key_to_delete) . await ;
921- assert ! ( read_result. is_err( ) ) ;
922- assert_eq ! ( read_result. unwrap_err( ) . kind( ) , ErrorKind :: NotFound ) ;
923-
924- // Verify the key is no longer in the list
925- let keys = KVStore :: list ( & vss_store, primary_namespace, secondary_namespace) . await . unwrap ( ) ;
926- assert ! ( !keys. contains( & key_to_delete. to_string( ) ) ) ;
927-
928- // Verify the trigger key still exists
929- let read_data =
930- KVStore :: read ( & vss_store, primary_namespace, secondary_namespace, key_for_trigger)
931- . await
932- . unwrap ( ) ;
933- assert_eq ! ( read_data, trigger_data) ;
934- }
935842}
0 commit comments