@@ -89,6 +89,7 @@ use core::future::Future;
8989use core:: mem;
9090use core:: pin:: Pin ;
9191use core:: sync:: atomic:: { AtomicBool , AtomicUsize , Ordering } ;
92+ use core:: task:: { Context , Poll , Waker } ;
9293use core:: time:: Duration ;
9394
9495use bitcoin:: psbt:: Psbt ;
@@ -856,15 +857,80 @@ impl<Signer: sign::ecdsa::EcdsaChannelSigner> Persist<Signer> for TestPersister
856857 }
857858}
858859
860+ type SPSCKVChannelState = Arc < Mutex < ( Option < Result < ( ) , io:: Error > > , Option < Waker > ) > > ;
861+ struct SPSCKVChannel ( SPSCKVChannelState ) ;
862+ impl Future for SPSCKVChannel {
863+ type Output = Result < ( ) , io:: Error > ;
864+ fn poll ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) , io:: Error > > {
865+ let mut state = self . 0 . lock ( ) . unwrap ( ) ;
866+ state. 0 . take ( ) . map ( |res| Poll :: Ready ( res) ) . unwrap_or_else ( || {
867+ state. 1 = Some ( cx. waker ( ) . clone ( ) ) ;
868+ Poll :: Pending
869+ } )
870+ }
871+ }
872+
859873pub struct TestStore {
874+ pending_async_writes : Mutex < HashMap < String , Vec < ( usize , SPSCKVChannelState , Vec < u8 > ) > > > ,
860875 persisted_bytes : Mutex < HashMap < String , HashMap < String , Vec < u8 > > > > ,
861876 read_only : bool ,
862877}
863878
864879impl TestStore {
865880 pub fn new ( read_only : bool ) -> Self {
881+ let pending_async_writes = Mutex :: new ( new_hash_map ( ) ) ;
866882 let persisted_bytes = Mutex :: new ( new_hash_map ( ) ) ;
867- Self { persisted_bytes, read_only }
883+ Self { pending_async_writes, persisted_bytes, read_only }
884+ }
885+
886+ pub fn list_pending_async_writes (
887+ & self , primary_namespace : & str , secondary_namespace : & str , key : & str ,
888+ ) -> Vec < usize > {
889+ let key = format ! ( "{primary_namespace}/{secondary_namespace}/{key}" ) ;
890+ let writes_lock = self . pending_async_writes . lock ( ) . unwrap ( ) ;
891+ writes_lock
892+ . get ( & key)
893+ . map ( |v| v. iter ( ) . map ( |( id, _, _) | * id) . collect ( ) )
894+ . unwrap_or ( Vec :: new ( ) )
895+ }
896+
897+ pub fn complete_async_writes_through (
898+ & self , primary_namespace : & str , secondary_namespace : & str , key : & str , write_id : usize ,
899+ ) {
900+ let prefix = format ! ( "{primary_namespace}/{secondary_namespace}" ) ;
901+ let key = format ! ( "{primary_namespace}/{secondary_namespace}/{key}" ) ;
902+
903+ let mut persisted_lock = self . persisted_bytes . lock ( ) . unwrap ( ) ;
904+ let mut writes_lock = self . pending_async_writes . lock ( ) . unwrap ( ) ;
905+
906+ let pending_writes = writes_lock. get_mut ( & key) . unwrap ( ) ;
907+ pending_writes. retain ( |( id, res, data) | {
908+ if * id <= write_id {
909+ let namespace = persisted_lock. entry ( prefix. clone ( ) ) . or_insert ( new_hash_map ( ) ) ;
910+ * namespace. entry ( key. to_string ( ) ) . or_default ( ) = data. clone ( ) ;
911+ let mut future_state = res. lock ( ) . unwrap ( ) ;
912+ future_state. 0 = Some ( Ok ( ( ) ) ) ;
913+ if let Some ( waker) = future_state. 1 . take ( ) {
914+ waker. wake ( ) ;
915+ }
916+ false
917+ } else {
918+ true
919+ }
920+ } ) ;
921+ }
922+
923+ pub fn complete_all_async_writes ( & self ) {
924+ let pending_writes: Vec < String > =
925+ self . pending_async_writes . lock ( ) . unwrap ( ) . keys ( ) . cloned ( ) . collect ( ) ;
926+ for key in pending_writes {
927+ let mut levels = key. split ( "/" ) ;
928+ let primary = levels. next ( ) . unwrap ( ) ;
929+ let secondary = levels. next ( ) . unwrap ( ) ;
930+ let key = levels. next ( ) . unwrap ( ) ;
931+ assert ! ( levels. next( ) . is_none( ) ) ;
932+ self . complete_async_writes_through ( primary, secondary, key, usize:: MAX ) ;
933+ }
868934 }
869935
870936 fn read_internal (
@@ -885,23 +951,6 @@ impl TestStore {
885951 }
886952 }
887953
888- fn write_internal (
889- & self , primary_namespace : & str , secondary_namespace : & str , key : & str , buf : Vec < u8 > ,
890- ) -> io:: Result < ( ) > {
891- if self . read_only {
892- return Err ( io:: Error :: new (
893- io:: ErrorKind :: PermissionDenied ,
894- "Cannot modify read-only store" ,
895- ) ) ;
896- }
897- let mut persisted_lock = self . persisted_bytes . lock ( ) . unwrap ( ) ;
898-
899- let prefixed = format ! ( "{primary_namespace}/{secondary_namespace}" ) ;
900- let outer_e = persisted_lock. entry ( prefixed) . or_insert ( new_hash_map ( ) ) ;
901- outer_e. insert ( key. to_string ( ) , buf) ;
902- Ok ( ( ) )
903- }
904-
905954 fn remove_internal (
906955 & self , primary_namespace : & str , secondary_namespace : & str , key : & str , _lazy : bool ,
907956 ) -> io:: Result < ( ) > {
@@ -913,12 +962,23 @@ impl TestStore {
913962 }
914963
915964 let mut persisted_lock = self . persisted_bytes . lock ( ) . unwrap ( ) ;
965+ let mut async_writes_lock = self . pending_async_writes . lock ( ) . unwrap ( ) ;
916966
917967 let prefixed = format ! ( "{primary_namespace}/{secondary_namespace}" ) ;
918968 if let Some ( outer_ref) = persisted_lock. get_mut ( & prefixed) {
919969 outer_ref. remove ( & key. to_string ( ) ) ;
920970 }
921971
972+ if let Some ( pending_writes) = async_writes_lock. remove ( & format ! ( "{prefixed}/{key}" ) ) {
973+ for ( _, future, _) in pending_writes {
974+ let mut future_lock = future. lock ( ) . unwrap ( ) ;
975+ future_lock. 0 = Some ( Ok ( ( ) ) ) ;
976+ if let Some ( waker) = future_lock. 1 . take ( ) {
977+ waker. wake ( ) ;
978+ }
979+ }
980+ }
981+
922982 Ok ( ( ) )
923983 }
924984
@@ -945,8 +1005,15 @@ impl KVStore for TestStore {
9451005 fn write (
9461006 & self , primary_namespace : & str , secondary_namespace : & str , key : & str , buf : Vec < u8 > ,
9471007 ) -> Pin < Box < dyn Future < Output = Result < ( ) , io:: Error > > + ' static + Send > > {
948- let res = self . write_internal ( & primary_namespace, & secondary_namespace, & key, buf) ;
949- Box :: pin ( async move { res } )
1008+ let path = format ! ( "{primary_namespace}/{secondary_namespace}/{key}" ) ;
1009+ let future = Arc :: new ( Mutex :: new ( ( None , None ) ) ) ;
1010+
1011+ let mut async_writes_lock = self . pending_async_writes . lock ( ) . unwrap ( ) ;
1012+ let pending_writes = async_writes_lock. entry ( path) . or_insert ( Vec :: new ( ) ) ;
1013+ let new_id = pending_writes. last ( ) . map ( |( id, _, _) | id + 1 ) . unwrap_or ( 0 ) ;
1014+ pending_writes. push ( ( new_id, Arc :: clone ( & future) , buf) ) ;
1015+
1016+ Box :: pin ( SPSCKVChannel ( future) )
9501017 }
9511018 fn remove (
9521019 & self , primary_namespace : & str , secondary_namespace : & str , key : & str , lazy : bool ,
@@ -972,7 +1039,30 @@ impl KVStoreSync for TestStore {
9721039 fn write (
9731040 & self , primary_namespace : & str , secondary_namespace : & str , key : & str , buf : Vec < u8 > ,
9741041 ) -> io:: Result < ( ) > {
975- self . write_internal ( primary_namespace, secondary_namespace, key, buf)
1042+ if self . read_only {
1043+ return Err ( io:: Error :: new (
1044+ io:: ErrorKind :: PermissionDenied ,
1045+ "Cannot modify read-only store" ,
1046+ ) ) ;
1047+ }
1048+ let mut persisted_lock = self . persisted_bytes . lock ( ) . unwrap ( ) ;
1049+ let mut async_writes_lock = self . pending_async_writes . lock ( ) . unwrap ( ) ;
1050+
1051+ let prefixed = format ! ( "{primary_namespace}/{secondary_namespace}" ) ;
1052+ let async_writes_pending = async_writes_lock. remove ( & format ! ( "{prefixed}/{key}" ) ) ;
1053+ let outer_e = persisted_lock. entry ( prefixed) . or_insert ( new_hash_map ( ) ) ;
1054+ outer_e. insert ( key. to_string ( ) , buf) ;
1055+
1056+ if let Some ( pending_writes) = async_writes_pending {
1057+ for ( _, future, _) in pending_writes {
1058+ let mut future_lock = future. lock ( ) . unwrap ( ) ;
1059+ future_lock. 0 = Some ( Ok ( ( ) ) ) ;
1060+ if let Some ( waker) = future_lock. 1 . take ( ) {
1061+ waker. wake ( ) ;
1062+ }
1063+ }
1064+ }
1065+ Ok ( ( ) )
9761066 }
9771067
9781068 fn remove (
0 commit comments