//! There are a bunch of these as their handling is relatively error-prone so they are split out
//! here. See also the chanmon_fail_consistency fuzz test.

-use crate::chain::channelmonitor::{ChannelMonitor, ANTI_REORG_DELAY};
+use crate::chain::chainmonitor::ChainMonitor;
+use crate::chain::channelmonitor::{ChannelMonitor, ANTI_REORG_DELAY, MonitorEvent};
+use crate::chain::transaction::OutPoint;
use crate::chain::{ChannelMonitorUpdateStatus, Listen, Watch};
use crate::events::{ClosureReason, Event, HTLCHandlingFailureType, PaymentPurpose};
use crate::ln::channel::AnnouncementSigsState;
@@ -22,6 +24,13 @@ use crate::ln::msgs::{
	BaseMessageHandler, ChannelMessageHandler, MessageSendEvent, RoutingMessageHandler,
};
use crate::ln::types::ChannelId;
+use crate::sign::NodeSigner;
+use crate::util::native_async::FutureQueue;
+use crate::util::persist::{
+	CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
+	CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, MonitorName,
+	MonitorUpdatingPersisterAsync,
+};
use crate::util::ser::{ReadableArgs, Writeable};
use crate::util::test_channel_signer::TestChannelSigner;
use crate::util::test_utils::TestBroadcaster;
@@ -4847,3 +4856,201 @@ fn test_single_channel_multiple_mpp() {
	nodes[7].node.handle_revoke_and_ack(node_i_id, &raa);
	check_added_monitors(&nodes[7], 1);
}
+
+#[test]
+fn native_async_persist() {
+	// Test ChainMonitor::new_async_beta and the backing MonitorUpdatingPersisterAsync.
+	//
+	// Because our test utils aren't really set up for these async utilities, we simply test them
+	// directly, first spinning up some nodes to create a `ChannelMonitor` and some
+	// `ChannelMonitorUpdate`s we can apply.
+	let (monitor, updates);
+	let mut chanmon_cfgs = create_chanmon_cfgs(2);
+	let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+	let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
+	let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
+
+	let (_, _, chan_id, funding_tx) = create_announced_chan_between_nodes(&nodes, 0, 1);
+
+	monitor = get_monitor!(nodes[0], chan_id).clone();
+	send_payment(&nodes[0], &[&nodes[1]], 1_000_000);
+	let mon_updates =
+		nodes[0].chain_monitor.monitor_updates.lock().unwrap().remove(&chan_id).unwrap();
+	updates = mon_updates.into_iter().collect::<Vec<_>>();
+	assert!(updates.len() >= 4, "The test below needs at least four updates");
+
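+	// Drop the node structs so that node 0's underlying test utilities can be moved out of
+	// `chanmon_cfgs` below.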
+	core::mem::drop(nodes);
+	core::mem::drop(node_chanmgrs);
+	core::mem::drop(node_cfgs);
+
+	let node_0_utils = chanmon_cfgs.remove(0);
+	let (logger, keys_manager, tx_broadcaster, fee_estimator) = (
+		node_0_utils.logger,
+		node_0_utils.keys_manager,
+		node_0_utils.tx_broadcaster,
+		node_0_utils.fee_estimator,
+	);
+
+	// Now that we have some updates, build a new ChainMonitor with a backing async KVStore.
+	let logger = Arc::new(logger);
+	let keys_manager = Arc::new(keys_manager);
+	let tx_broadcaster = Arc::new(tx_broadcaster);
+	let fee_estimator = Arc::new(fee_estimator);
+
+	let kv_store = Arc::new(test_utils::TestStore::new(false));
+	let persist_futures = Arc::new(FutureQueue::new());
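+	// The `42` below is presumably the persister's `maximum_pending_updates` setting (how many
+	// `ChannelMonitorUpdate`s may be written before a full `ChannelMonitor` repersist); the exact
+	// value doesn't matter for this test.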
+	let native_async_persister = MonitorUpdatingPersisterAsync::new(
+		Arc::clone(&kv_store),
+		Arc::clone(&persist_futures),
+		Arc::clone(&logger),
+		42,
+		Arc::clone(&keys_manager),
+		Arc::clone(&keys_manager),
+		Arc::clone(&tx_broadcaster),
+		Arc::clone(&fee_estimator),
+	);
+	let chain_source = test_utils::TestChainSource::new(Network::Testnet);
+	let async_chain_monitor = ChainMonitor::new_async_beta(
+		Some(&chain_source),
+		tx_broadcaster,
+		logger,
+		fee_estimator,
+		native_async_persister,
+		Arc::clone(&keys_manager),
+		keys_manager.get_peer_storage_key(),
+	);
+
+	// Write the initial ChannelMonitor async, testing primarily that the `MonitorEvent::Completed`
+	// isn't returned until the write is completed (via `complete_all_async_writes`) and the future
+	// is `poll`ed (which a background spawn should do automatically in production, but which is
+	// needed to get the future completion through to the `ChainMonitor`).
+	let write_status = async_chain_monitor.watch_channel(chan_id, monitor).unwrap();
+	assert_eq!(write_status, ChannelMonitorUpdateStatus::InProgress);
+
+	// The write will remain pending until we call `complete_all_async_writes`, below.
+	assert_eq!(persist_futures.0.lock().unwrap().len(), 1);
+	persist_futures.poll_futures();
+	assert_eq!(persist_futures.0.lock().unwrap().len(), 1);
+
+	let funding_txo = OutPoint { txid: funding_tx.compute_txid(), index: 0 };
+	let key = MonitorName::V1Channel(funding_txo).to_string();
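+	// The full `ChannelMonitor` should be written under the monitor persistence namespaces,
+	// keyed by the `MonitorName` string derived from the funding outpoint.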
+	let pending_writes = kv_store.list_pending_async_writes(
+		CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
+		CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
+		&key,
+	);
+	assert_eq!(pending_writes.len(), 1);
+
+	// Once we complete the future, the write will still be pending until the future gets `poll`ed.
+	kv_store.complete_all_async_writes();
+	assert_eq!(persist_futures.0.lock().unwrap().len(), 1);
+	assert_eq!(async_chain_monitor.release_pending_monitor_events().len(), 0);
+
+	assert_eq!(persist_futures.0.lock().unwrap().len(), 1);
+	persist_futures.poll_futures();
+	assert_eq!(persist_futures.0.lock().unwrap().len(), 0);
+
+	let completed_persist = async_chain_monitor.release_pending_monitor_events();
+	assert_eq!(completed_persist.len(), 1);
+	assert_eq!(completed_persist[0].2.len(), 1);
+	assert!(matches!(completed_persist[0].2[0], MonitorEvent::Completed { .. }));
+
+	// Now test two async `ChannelMonitorUpdate`s in flight at once, completing them in-order but
+	// separately.
+	let update_status = async_chain_monitor.update_channel(chan_id, &updates[0]);
+	assert_eq!(update_status, ChannelMonitorUpdateStatus::InProgress);
+
+	let update_status = async_chain_monitor.update_channel(chan_id, &updates[1]);
+	assert_eq!(update_status, ChannelMonitorUpdateStatus::InProgress);
+
+	persist_futures.poll_futures();
+	assert_eq!(async_chain_monitor.release_pending_monitor_events().len(), 0);
+
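+	// Each `ChannelMonitorUpdate` should be written under the update persistence namespace, with
+	// the monitor's name as the secondary namespace and its update id ("1", "2", ...) as the key.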
+	let pending_writes = kv_store.list_pending_async_writes(
+		CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
+		&key,
+		"1",
+	);
+	assert_eq!(pending_writes.len(), 1);
+	let pending_writes = kv_store.list_pending_async_writes(
+		CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
+		&key,
+		"2",
+	);
+	assert_eq!(pending_writes.len(), 1);
+
+	kv_store.complete_async_writes_through(
+		CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
+		&key,
+		"1",
+		usize::MAX,
+	);
+	persist_futures.poll_futures();
+	// While the `ChainMonitor` could return a `MonitorEvent::Completed` here, it currently
+	// doesn't. If that ever changes we should validate that the `Completed` event has the correct
+	// `monitor_update_id` (1).
+	assert!(async_chain_monitor.release_pending_monitor_events().is_empty());
+
+	kv_store.complete_async_writes_through(
+		CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
+		&key,
+		"2",
+		usize::MAX,
+	);
+	persist_futures.poll_futures();
+	let completed_persist = async_chain_monitor.release_pending_monitor_events();
+	assert_eq!(completed_persist.len(), 1);
+	assert_eq!(completed_persist[0].2.len(), 1);
+	assert!(matches!(completed_persist[0].2[0], MonitorEvent::Completed { .. }));
+
+	// Finally, test two async `ChannelMonitorUpdate`s in flight at once, completing them
+	// out-of-order and ensuring that no `MonitorEvent::Completed` is generated until they are both
+	// completed (and that it marks both as completed when it is generated).
+	let update_status = async_chain_monitor.update_channel(chan_id, &updates[2]);
+	assert_eq!(update_status, ChannelMonitorUpdateStatus::InProgress);
+
+	let update_status = async_chain_monitor.update_channel(chan_id, &updates[3]);
+	assert_eq!(update_status, ChannelMonitorUpdateStatus::InProgress);
+
+	persist_futures.poll_futures();
+	assert_eq!(async_chain_monitor.release_pending_monitor_events().len(), 0);
+
+	let pending_writes = kv_store.list_pending_async_writes(
+		CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
+		&key,
+		"3",
+	);
+	assert_eq!(pending_writes.len(), 1);
+	let pending_writes = kv_store.list_pending_async_writes(
+		CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
+		&key,
+		"4",
+	);
+	assert_eq!(pending_writes.len(), 1);
+
+	kv_store.complete_async_writes_through(
+		CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
+		&key,
+		"4",
+		usize::MAX,
+	);
+	persist_futures.poll_futures();
+	assert_eq!(async_chain_monitor.release_pending_monitor_events().len(), 0);
+
+	kv_store.complete_async_writes_through(
+		CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
+		&key,
+		"3",
+		usize::MAX,
+	);
+	persist_futures.poll_futures();
+	let completed_persist = async_chain_monitor.release_pending_monitor_events();
+	assert_eq!(completed_persist.len(), 1);
+	assert_eq!(completed_persist[0].2.len(), 1);
+	if let MonitorEvent::Completed { monitor_update_id, .. } = &completed_persist[0].2[0] {
+		assert_eq!(*monitor_update_id, 4);
+	} else {
+		panic!();
+	}
+}