 //! There are a bunch of these as their handling is relatively error-prone so they are split out
 //! here. See also the chanmon_fail_consistency fuzz test.
 
-use crate::chain::channelmonitor::{ChannelMonitor, ANTI_REORG_DELAY};
+use crate::chain::chainmonitor::ChainMonitor;
+use crate::chain::channelmonitor::{ChannelMonitor, MonitorEvent, ANTI_REORG_DELAY};
+use crate::chain::transaction::OutPoint;
 use crate::chain::{ChannelMonitorUpdateStatus, Listen, Watch};
 use crate::events::{ClosureReason, Event, HTLCHandlingFailureType, PaymentPurpose};
 use crate::ln::channel::AnnouncementSigsState;
@@ -22,6 +24,13 @@ use crate::ln::msgs::{
     BaseMessageHandler, ChannelMessageHandler, MessageSendEvent, RoutingMessageHandler,
 };
 use crate::ln::types::ChannelId;
+use crate::sign::NodeSigner;
+use crate::util::native_async::FutureQueue;
+use crate::util::persist::{
+    MonitorName, MonitorUpdatingPersisterAsync, CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
+    CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
+    CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
+};
 use crate::util::ser::{ReadableArgs, Writeable};
 use crate::util::test_channel_signer::TestChannelSigner;
 use crate::util::test_utils::TestBroadcaster;
@@ -4847,3 +4856,200 @@ fn test_single_channel_multiple_mpp() {
     nodes[7].node.handle_revoke_and_ack(node_i_id, &raa);
     check_added_monitors(&nodes[7], 1);
 }
+
+#[test]
+fn native_async_persist() {
+    // Test ChainMonitor::new_async_beta and the backing MonitorUpdatingPersisterAsync.
+    //
+    // Because our test utils aren't really set up for these APIs, we simply test them directly,
+    // first spinning up some nodes to create a `ChannelMonitor` and some `ChannelMonitorUpdate`s
+    // we can apply.
+    let (monitor, updates);
+    let mut chanmon_cfgs = create_chanmon_cfgs(2);
+    let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+    let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
+    let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
+
+    let (_, _, chan_id, funding_tx) = create_announced_chan_between_nodes(&nodes, 0, 1);
+
+    monitor = get_monitor!(nodes[0], chan_id).clone();
+    send_payment(&nodes[0], &[&nodes[1]], 1_000_000);
+    let mon_updates =
+        nodes[0].chain_monitor.monitor_updates.lock().unwrap().remove(&chan_id).unwrap();
+    updates = mon_updates.into_iter().collect::<Vec<_>>();
+    assert!(updates.len() >= 4, "The test below needs at least four updates");
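+    // (The two dual-in-flight scenarios below each consume two of these updates.)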
+
+    core::mem::drop(nodes);
+    core::mem::drop(node_chanmgrs);
+    core::mem::drop(node_cfgs);
+
+    let node_0_utils = chanmon_cfgs.remove(0);
+    let (logger, keys_manager, tx_broadcaster, fee_estimator) = (
+        node_0_utils.logger,
+        node_0_utils.keys_manager,
+        node_0_utils.tx_broadcaster,
+        node_0_utils.fee_estimator,
+    );
+
+    // Now that we have some updates, build a new ChainMonitor with a backing async KVStore.
+    let logger = Arc::new(logger);
+    let keys_manager = Arc::new(keys_manager);
+    let tx_broadcaster = Arc::new(tx_broadcaster);
+    let fee_estimator = Arc::new(fee_estimator);
+
+    let kv_store = Arc::new(test_utils::TestStore::new(false));
+    let persist_futures = Arc::new(FutureQueue::new());
+    let native_async_persister = MonitorUpdatingPersisterAsync::new(
+        Arc::clone(&kv_store),
+        Arc::clone(&persist_futures),
+        Arc::clone(&logger),
+        42,
+        Arc::clone(&keys_manager),
+        Arc::clone(&keys_manager),
+        Arc::clone(&tx_broadcaster),
+        Arc::clone(&fee_estimator),
+    );
+    let chain_source = test_utils::TestChainSource::new(Network::Testnet);
+    let async_chain_monitor = ChainMonitor::new_async_beta(
+        Some(&chain_source),
+        tx_broadcaster,
+        logger,
+        fee_estimator,
+        native_async_persister,
+        Arc::clone(&keys_manager),
+        keys_manager.get_peer_storage_key(),
+    );
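+    // From here on we drive persistence by hand: completing writes on the test KVStore and
+    // polling `persist_futures` stands in for the background task a real node would run.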
+
+    // Write the initial ChannelMonitor async, testing primarily that the `MonitorEvent::Completed`
+    // isn't returned until the write is completed (via `complete_all_async_writes`) and the future
+    // is `poll`ed (which a background spawn should do automatically in production, but which is
+    // needed to get the future completion through to the `ChainMonitor`).
+    let write_status = async_chain_monitor.watch_channel(chan_id, monitor).unwrap();
+    assert_eq!(write_status, ChannelMonitorUpdateStatus::InProgress);
+
+    // The write will remain pending until we call `complete_all_async_writes`, below.
+    assert_eq!(persist_futures.0.lock().unwrap().len(), 1);
+    persist_futures.poll_futures();
+    assert_eq!(persist_futures.0.lock().unwrap().len(), 1);
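+    // Polling alone doesn't finish the persist: the KVStore is still holding the write pending,
+    // so the future remains queued.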
+
+    let funding_txo = OutPoint { txid: funding_tx.compute_txid(), index: 0 };
+    let key = MonitorName::V1Channel(funding_txo).to_string();
+    let pending_writes = kv_store.list_pending_async_writes(
+        CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
+        CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
+        &key,
+    );
+    assert_eq!(pending_writes.len(), 1);
+
+    // Once we complete the write at the KVStore layer, the persist is still considered
+    // in-progress until the future gets `poll`ed.
+    kv_store.complete_all_async_writes();
+    assert_eq!(persist_futures.0.lock().unwrap().len(), 1);
+    assert_eq!(async_chain_monitor.release_pending_monitor_events().len(), 0);
+
+    assert_eq!(persist_futures.0.lock().unwrap().len(), 1);
+    persist_futures.poll_futures();
+    assert_eq!(persist_futures.0.lock().unwrap().len(), 0);
+
+    let completed_persist = async_chain_monitor.release_pending_monitor_events();
+    assert_eq!(completed_persist.len(), 1);
+    assert_eq!(completed_persist[0].2.len(), 1);
+    assert!(matches!(completed_persist[0].2[0], MonitorEvent::Completed { .. }));
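+    // The initial monitor persist has now completed end-to-end and surfaced a single
+    // `Completed` event.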
+
+    // Now test two async `ChannelMonitorUpdate`s in flight at once, completing them in-order but
+    // separately.
+    let update_status = async_chain_monitor.update_channel(chan_id, &updates[0]);
+    assert_eq!(update_status, ChannelMonitorUpdateStatus::InProgress);
+
+    let update_status = async_chain_monitor.update_channel(chan_id, &updates[1]);
+    assert_eq!(update_status, ChannelMonitorUpdateStatus::InProgress);
+
+    persist_futures.poll_futures();
+    assert_eq!(async_chain_monitor.release_pending_monitor_events().len(), 0);
+
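+    // Each update is written in the update-persistence namespace, keyed by the monitor's name
+    // (as the secondary namespace) and the update id ("1" and "2" here).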
+    let pending_writes = kv_store.list_pending_async_writes(
+        CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
+        &key,
+        "1",
+    );
+    assert_eq!(pending_writes.len(), 1);
+    let pending_writes = kv_store.list_pending_async_writes(
+        CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
+        &key,
+        "2",
+    );
+    assert_eq!(pending_writes.len(), 1);
+
+    kv_store.complete_async_writes_through(
+        CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
+        &key,
+        "1",
+        usize::MAX,
+    );
+    persist_futures.poll_futures();
+    // While the `ChainMonitor` could return a `MonitorEvent::Completed` here, it currently
+    // doesn't. If that ever changes we should validate that the `Completed` event has the correct
+    // `monitor_update_id` (1).
+    assert!(async_chain_monitor.release_pending_monitor_events().is_empty());
+
+    kv_store.complete_async_writes_through(
+        CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
+        &key,
+        "2",
+        usize::MAX,
+    );
+    persist_futures.poll_futures();
+    let completed_persist = async_chain_monitor.release_pending_monitor_events();
+    assert_eq!(completed_persist.len(), 1);
+    assert_eq!(completed_persist[0].2.len(), 1);
+    assert!(matches!(completed_persist[0].2[0], MonitorEvent::Completed { .. }));
+
+    // Finally, test two async `ChannelMonitorUpdate`s in flight at once, completing them
+    // out-of-order and ensuring that no `MonitorEvent::Completed` is generated until they are both
+    // completed (and that it marks both as completed when it is generated).
+    let update_status = async_chain_monitor.update_channel(chan_id, &updates[2]);
+    assert_eq!(update_status, ChannelMonitorUpdateStatus::InProgress);
+
+    let update_status = async_chain_monitor.update_channel(chan_id, &updates[3]);
+    assert_eq!(update_status, ChannelMonitorUpdateStatus::InProgress);
+
+    persist_futures.poll_futures();
+    assert_eq!(async_chain_monitor.release_pending_monitor_events().len(), 0);
+
+    let pending_writes = kv_store.list_pending_async_writes(
+        CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
+        &key,
+        "3",
+    );
+    assert_eq!(pending_writes.len(), 1);
+    let pending_writes = kv_store.list_pending_async_writes(
+        CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
+        &key,
+        "4",
+    );
+    assert_eq!(pending_writes.len(), 1);
+
+    kv_store.complete_async_writes_through(
+        CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
+        &key,
+        "4",
+        usize::MAX,
+    );
+    persist_futures.poll_futures();
+    assert_eq!(async_chain_monitor.release_pending_monitor_events().len(), 0);
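+    // Update 4's write has finished, but update 3 is still pending, so no `Completed` event yet.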
+
+    kv_store.complete_async_writes_through(
+        CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
+        &key,
+        "3",
+        usize::MAX,
+    );
+    persist_futures.poll_futures();
+    let completed_persist = async_chain_monitor.release_pending_monitor_events();
+    assert_eq!(completed_persist.len(), 1);
+    assert_eq!(completed_persist[0].2.len(), 1);
+    if let MonitorEvent::Completed { monitor_update_id, .. } = &completed_persist[0].2[0] {
+        assert_eq!(*monitor_update_id, 4);
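+        // Even though update 3's write finished last, the single `Completed` event carries
+        // update id 4, marking both in-flight updates as persisted.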
+    } else {
+        panic!();
+    }
+}