@@ -17,6 +17,7 @@ use bitcoin::hashes::hex::FromHex;
1717use bitcoin:: { BlockHash , Txid } ;
1818
1919use core:: future:: Future ;
20+ use core:: mem;
2021use core:: ops:: Deref ;
2122use core:: pin:: Pin ;
2223use core:: str:: FromStr ;
@@ -32,7 +33,8 @@ use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate};
3233use crate :: chain:: transaction:: OutPoint ;
3334use crate :: ln:: types:: ChannelId ;
3435use crate :: sign:: { ecdsa:: EcdsaChannelSigner , EntropySource , SignerProvider } ;
35- use crate :: util:: async_poll:: dummy_waker;
36+ use crate :: sync:: Mutex ;
37+ use crate :: util:: async_poll:: { dummy_waker, MaybeSend , MaybeSync } ;
3638use crate :: util:: logger:: Logger ;
3739use crate :: util:: ser:: { Readable , ReadableArgs , Writeable } ;
3840
@@ -409,6 +411,21 @@ where
409411 Ok ( res)
410412}
411413
414+ /// A generic trait which is able to spawn futures in the background.
415+ pub trait FutureSpawner : Send + Sync + ' static {
416+ /// Spawns the given future as a background task.
417+ ///
418+ /// This method MUST NOT block on the given future immediately.
419+ fn spawn < T : Future < Output = ( ) > + MaybeSend + ' static > ( & self , future : T ) ;
420+ }
421+
422+ struct PanicingSpawner ;
423+ impl FutureSpawner for PanicingSpawner {
424+ fn spawn < T : Future < Output = ( ) > + MaybeSend + ' static > ( & self , _: T ) {
425+ unreachable ! ( ) ;
426+ }
427+ }
428+
412429fn poll_sync_future < F : Future > ( future : F ) -> F :: Output {
413430 let mut waker = dummy_waker ( ) ;
414431 let mut ctx = task:: Context :: from_waker ( & mut waker) ;
@@ -507,7 +524,7 @@ fn poll_sync_future<F: Future>(future: F) -> F::Output {
507524/// would like to get rid of them, consider using the
508525/// [`MonitorUpdatingPersister::cleanup_stale_updates`] function.
509526pub struct MonitorUpdatingPersister < K : Deref , L : Deref , ES : Deref , SP : Deref , BI : Deref , FE : Deref >
510- ( MonitorUpdatingPersisterAsync < KVStoreSyncWrapper < K > , L , ES , SP , BI , FE > )
527+ ( MonitorUpdatingPersisterAsync < KVStoreSyncWrapper < K > , PanicingSpawner , L , ES , SP , BI , FE > )
511528where
512529 K :: Target : KVStoreSync ,
513530 L :: Target : Logger ,
@@ -552,6 +569,7 @@ where
552569 ) -> Self {
553570 MonitorUpdatingPersister ( MonitorUpdatingPersisterAsync :: new (
554571 KVStoreSyncWrapper ( kv_store) ,
572+ PanicingSpawner ,
555573 logger,
556574 maximum_pending_updates,
557575 entropy_source,
@@ -664,7 +682,8 @@ where
664682 & self , monitor_name : MonitorName , update : Option < & ChannelMonitorUpdate > ,
665683 monitor : & ChannelMonitor < ChannelSigner > ,
666684 ) -> chain:: ChannelMonitorUpdateStatus {
667- let res = poll_sync_future ( self . 0 . 0 . update_persisted_channel ( monitor_name, update, monitor) ) ;
685+ let inner = Arc :: clone ( & self . 0 . 0 ) ;
686+ let res = poll_sync_future ( inner. update_persisted_channel ( monitor_name, update, monitor) ) ;
668687 match res {
669688 Ok ( ( ) ) => chain:: ChannelMonitorUpdateStatus :: Completed ,
670689 Err ( e) => {
@@ -689,8 +708,11 @@ where
689708/// async versions of the public accessors.
690709///
691710/// Note that async monitor updating is considered beta, and bugs may be triggered by its use.
692- pub struct MonitorUpdatingPersisterAsync < K : Deref , L : Deref , ES : Deref , SP : Deref , BI : Deref , FE : Deref >
693- ( Arc < MonitorUpdatingPersisterAsyncInner < K , L , ES , SP , BI , FE > > )
711+ ///
712+ /// Unlike [`MonitorUpdatingPersister`], this does not implement [`Persist`], but is instead used
713+ /// directly by the [`ChainMonitor`].
714+ pub struct MonitorUpdatingPersisterAsync < K : Deref , S : FutureSpawner , L : Deref , ES : Deref , SP : Deref , BI : Deref , FE : Deref >
715+ ( Arc < MonitorUpdatingPersisterAsyncInner < K , S , L , ES , SP , BI , FE > > )
694716where
695717 K :: Target : KVStore ,
696718 L :: Target : Logger ,
@@ -699,7 +721,7 @@ where
699721 BI :: Target : BroadcasterInterface ,
700722 FE :: Target : FeeEstimator ;
701723
702- struct MonitorUpdatingPersisterAsyncInner < K : Deref , L : Deref , ES : Deref , SP : Deref , BI : Deref , FE : Deref >
724+ struct MonitorUpdatingPersisterAsyncInner < K : Deref , S : FutureSpawner , L : Deref , ES : Deref , SP : Deref , BI : Deref , FE : Deref >
703725where
704726 K :: Target : KVStore ,
705727 L :: Target : Logger ,
@@ -709,6 +731,7 @@ where
709731 FE :: Target : FeeEstimator ,
710732{
711733 kv_store : K ,
734+ future_spawner : S ,
712735 logger : L ,
713736 maximum_pending_updates : u64 ,
714737 entropy_source : ES ,
@@ -717,8 +740,8 @@ where
717740 fee_estimator : FE ,
718741}
719742
720- impl < K : Deref , L : Deref , ES : Deref , SP : Deref , BI : Deref , FE : Deref >
721- MonitorUpdatingPersisterAsync < K , L , ES , SP , BI , FE >
743+ impl < K : Deref , S : FutureSpawner , L : Deref , ES : Deref , SP : Deref , BI : Deref , FE : Deref >
744+ MonitorUpdatingPersisterAsync < K , S , L , ES , SP , BI , FE >
722745where
723746 K :: Target : KVStore ,
724747 L :: Target : Logger ,
@@ -731,11 +754,12 @@ where
731754 ///
732755 /// See [`MonitorUpdatingPersister::new`] for more info.
733756 pub fn new (
734- kv_store : K , logger : L , maximum_pending_updates : u64 , entropy_source : ES ,
735- signer_provider : SP , broadcaster : BI , fee_estimator : FE ,
757+ kv_store : K , future_spawner : S , logger : L , maximum_pending_updates : u64 ,
758+ entropy_source : ES , signer_provider : SP , broadcaster : BI , fee_estimator : FE ,
736759 ) -> Self {
737760 MonitorUpdatingPersisterAsync ( Arc :: new ( MonitorUpdatingPersisterAsyncInner {
738761 kv_store,
762+ future_spawner,
739763 logger,
740764 maximum_pending_updates,
741765 entropy_source,
@@ -805,9 +829,70 @@ where
805829 }
806830}
807831
832+ impl < K : Deref + MaybeSend + MaybeSync + ' static , S : FutureSpawner , L : Deref + MaybeSend + MaybeSync + ' static , ES : Deref + MaybeSend + MaybeSync + ' static , SP : Deref + MaybeSend + MaybeSync + ' static , BI : Deref + MaybeSend + MaybeSync + ' static , FE : Deref + MaybeSend + MaybeSync + ' static >
833+ MonitorUpdatingPersisterAsync < K , S , L , ES , SP , BI , FE >
834+ where
835+ K :: Target : KVStore + MaybeSync ,
836+ L :: Target : Logger ,
837+ ES :: Target : EntropySource + Sized ,
838+ SP :: Target : SignerProvider + Sized ,
839+ BI :: Target : BroadcasterInterface ,
840+ FE :: Target : FeeEstimator ,
841+ <SP :: Target as SignerProvider >:: EcdsaSigner : MaybeSend + ' static ,
842+ {
843+ pub ( crate ) fn spawn_async_persist_new_channel (
844+ & self , monitor_name : MonitorName , monitor : & ChannelMonitor < <SP :: Target as SignerProvider >:: EcdsaSigner > ,
845+ ) {
846+ let inner = Arc :: clone ( & self . 0 ) ;
847+ let future = inner. persist_new_channel ( monitor_name, monitor) ;
848+ let channel_id = monitor. channel_id ( ) ;
849+ self . 0 . future_spawner . spawn ( async move {
850+ match future. await {
851+ Ok ( ( ) ) => { } , // TODO: expose completions
852+ Err ( e) => {
853+ log_error ! (
854+ inner. logger,
855+ "Failed to persist new ChannelMonitor {channel_id}: {e}. The node will now likely stall as this channel will not be able to make progress. You should restart as soon as possible." ,
856+ ) ;
857+ } ,
858+ }
859+ } ) ;
860+ }
808861
809- impl < K : Deref , L : Deref , ES : Deref , SP : Deref , BI : Deref , FE : Deref >
810- MonitorUpdatingPersisterAsyncInner < K , L , ES , SP , BI , FE >
862+ pub ( crate ) fn spawn_async_update_persisted_channel (
863+ & self , monitor_name : MonitorName , update : Option < & ChannelMonitorUpdate > ,
864+ monitor : & ChannelMonitor < <SP :: Target as SignerProvider >:: EcdsaSigner > ,
865+ ) {
866+ let inner = Arc :: clone ( & self . 0 ) ;
867+ let future = inner. update_persisted_channel ( monitor_name, update, monitor) ;
868+ let channel_id = monitor. channel_id ( ) ;
869+ let inner = Arc :: clone ( & self . 0 ) ;
870+ self . 0 . future_spawner . spawn ( async move {
871+ match future. await {
872+ Ok ( ( ) ) => { } , // TODO: expose completions
873+ Err ( e) => {
874+ log_error ! (
875+ inner. logger,
876+ "Failed to persist new ChannelMonitor {channel_id}: {e}. The node will now likely stall as this channel will not be able to make progress. You should restart as soon as possible." ,
877+ ) ;
878+ } ,
879+ }
880+ } ) ;
881+ }
882+
883+ pub ( crate ) fn spawn_async_archive_persisted_channel (
884+ & self , monitor_name : MonitorName ,
885+ ) {
886+ let inner = Arc :: clone ( & self . 0 ) ;
887+ self . 0 . future_spawner . spawn ( async move {
888+ inner. archive_persisted_channel ( monitor_name) . await ;
889+ } ) ;
890+ }
891+ }
892+
893+
894+ impl < K : Deref , S : FutureSpawner , L : Deref , ES : Deref , SP : Deref , BI : Deref , FE : Deref >
895+ MonitorUpdatingPersisterAsyncInner < K , S , L , ES , SP , BI , FE >
811896where
812897 K :: Target : KVStore ,
813898 L :: Target : Logger ,
@@ -931,7 +1016,7 @@ where
9311016 let monitor_name = MonitorName :: from_str ( & monitor_key) ?;
9321017 let ( _, current_monitor) = self . read_monitor ( & monitor_name, & monitor_key) . await ?;
9331018 let latest_update_id = current_monitor. get_latest_update_id ( ) ;
934- self . cleanup_stale_updates_for_monitor_to ( & monitor_key, latest_update_id, lazy) . await ;
1019+ self . cleanup_stale_updates_for_monitor_to ( & monitor_key, latest_update_id, lazy) . await ? ;
9351020 }
9361021 Ok ( ( ) )
9371022 }
@@ -958,9 +1043,9 @@ where
9581043 Ok ( ( ) )
9591044 }
9601045
961- async fn persist_new_channel < ChannelSigner : EcdsaChannelSigner > (
1046+ fn persist_new_channel < ChannelSigner : EcdsaChannelSigner > (
9621047 & self , monitor_name : MonitorName , monitor : & ChannelMonitor < ChannelSigner > ,
963- ) -> Result < ( ) , io:: Error > {
1048+ ) -> impl Future < Output = Result < ( ) , io:: Error > > {
9641049 // Determine the proper key for this monitor
9651050 let monitor_key = monitor_name. to_string ( ) ;
9661051 // Serialize and write the new monitor
@@ -974,55 +1059,77 @@ where
9741059 monitor_bytes. extend_from_slice ( MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL ) ;
9751060 }
9761061 monitor. write ( & mut monitor_bytes) . unwrap ( ) ;
1062+ // Note that this is NOT an async function, but rather calls the *sync* KVStore write
1063+ // method, allowing it to do its queueing immediately, and then return a future for the
1064+ // completion of the write. This ensures monitor persistence ordering is preserved.
9771065 self . kv_store . write (
9781066 CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE ,
9791067 CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE ,
9801068 monitor_key. as_str ( ) ,
9811069 monitor_bytes,
982- ) . await
1070+ )
9831071 }
9841072
985- async fn update_persisted_channel < ChannelSigner : EcdsaChannelSigner > (
986- & self , monitor_name : MonitorName , update : Option < & ChannelMonitorUpdate > ,
1073+ fn update_persisted_channel < ' a , ChannelSigner : EcdsaChannelSigner + ' a > (
1074+ self : Arc < Self > , monitor_name : MonitorName , update : Option < & ChannelMonitorUpdate > ,
9871075 monitor : & ChannelMonitor < ChannelSigner > ,
988- ) -> Result < ( ) , io:: Error > {
1076+ ) -> impl Future < Output = Result < ( ) , io:: Error > > + ' a where Self : ' a {
9891077 const LEGACY_CLOSED_CHANNEL_UPDATE_ID : u64 = u64:: MAX ;
1078+ let mut res_a = None ;
1079+ let mut res_b = None ;
1080+ let mut res_c = None ;
9901081 if let Some ( update) = update {
9911082 let persist_update = update. update_id != LEGACY_CLOSED_CHANNEL_UPDATE_ID
9921083 && self . maximum_pending_updates != 0
9931084 && update. update_id % self . maximum_pending_updates != 0 ;
9941085 if persist_update {
9951086 let monitor_key = monitor_name. to_string ( ) ;
9961087 let update_name = UpdateName :: from ( update. update_id ) ;
997- self . kv_store . write (
1088+ res_a = Some ( self . kv_store . write (
9981089 CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE ,
9991090 monitor_key. as_str ( ) ,
10001091 update_name. as_str ( ) ,
10011092 update. encode ( ) ,
1002- ) . await
1093+ ) ) ;
10031094 } else {
10041095 // We could write this update, but it meets criteria of our design that calls for a full monitor write.
1005- let write_status = self . persist_new_channel ( monitor_name, monitor) . await ;
1006-
1007- if let Ok ( ( ) ) = write_status {
1008- let channel_closed_legacy =
1009- monitor. get_latest_update_id ( ) == LEGACY_CLOSED_CHANNEL_UPDATE_ID ;
1010- let latest_update_id = monitor. get_latest_update_id ( ) ;
1011- if channel_closed_legacy {
1012- let monitor_key = monitor_name. to_string ( ) ;
1013- self . cleanup_stale_updates_for_monitor_to ( & monitor_key, latest_update_id, true ) . await ;
1014- } else {
1015- let end = latest_update_id;
1016- let start = end. saturating_sub ( self . maximum_pending_updates ) ;
1017- self . cleanup_in_range ( monitor_name, start, end) . await ;
1096+ let write_fut = self . persist_new_channel ( monitor_name, monitor) ;
1097+ let latest_update_id = monitor. get_latest_update_id ( ) ;
1098+
1099+ res_b = Some ( async move {
1100+ let write_status = write_fut. await ;
1101+ if let Ok ( ( ) ) = write_status {
1102+ if latest_update_id == LEGACY_CLOSED_CHANNEL_UPDATE_ID {
1103+ let monitor_key = monitor_name. to_string ( ) ;
1104+ self . cleanup_stale_updates_for_monitor_to ( & monitor_key, latest_update_id, true ) . await ?;
1105+ } else {
1106+ let end = latest_update_id;
1107+ let start = end. saturating_sub ( self . maximum_pending_updates ) ;
1108+ self . cleanup_in_range ( monitor_name, start, end) . await ;
1109+ }
10181110 }
1019- }
10201111
1021- write_status
1112+ write_status
1113+ } ) ;
10221114 }
10231115 } else {
10241116 // There is no update given, so we must persist a new monitor.
1025- self . persist_new_channel ( monitor_name, monitor) . await
1117+ // Note that this is NOT an async function, but rather calls the *sync* KVStore write
1118+ // method, allowing it to do its queueing immediately, and then return a future for the
1119+ // completion of the write. This ensures monitor persistence ordering is preserved.
1120+ res_c = Some ( self . persist_new_channel ( monitor_name, monitor) ) ;
1121+ }
1122+ async move {
1123+ if let Some ( a) = res_a {
1124+ a. await ?;
1125+ }
1126+ if let Some ( b) = res_b {
1127+ b. await ?;
1128+ }
1129+ if let Some ( c) = res_c {
1130+ c. await ?;
1131+ }
1132+ Ok ( ( ) )
10261133 }
10271134 }
10281135
0 commit comments