@@ -142,17 +142,29 @@ impl Writer for VecWriter {
142
142
}
143
143
}
144
144
145
+ /// The LDK API requires that any time we tell it we're done persisting a `ChannelMonitor[Update]`
146
+ /// we never pass it in as the "latest" `ChannelMonitor` on startup. However, we can pass
147
+ /// out-of-date monitors as long as we never told LDK we finished persisting them, which we do by
148
+ /// storing both old `ChannelMonitor`s and ones that are "being persisted" here.
149
+ ///
150
+ /// Note that such "being persisted" `ChannelMonitor`s are stored in `ChannelManager` and will
151
+ /// simply be replayed on startup.
152
+ struct LatestMonitorState {
153
+ /// The latest monitor id which we told LDK we've persisted
154
+ persisted_monitor_id : u64 ,
155
+ /// The latest serialized `ChannelMonitor` that we told LDK we persisted.
156
+ persisted_monitor : Vec < u8 > ,
157
+ /// A set of (monitor id, serialized `ChannelMonitor`)s which we're currently "persisting",
158
+ /// from LDK's perspective.
159
+ pending_monitors : Vec < ( u64 , Vec < u8 > ) > ,
160
+ }
161
+
145
162
struct TestChainMonitor {
146
163
pub logger : Arc < dyn Logger > ,
147
164
pub keys : Arc < KeyProvider > ,
148
165
pub persister : Arc < TestPersister > ,
149
166
pub chain_monitor : Arc < chainmonitor:: ChainMonitor < TestChannelSigner , Arc < dyn chain:: Filter > , Arc < TestBroadcaster > , Arc < FuzzEstimator > , Arc < dyn Logger > , Arc < TestPersister > > > ,
150
- // If we reload a node with an old copy of ChannelMonitors, the ChannelManager deserialization
151
- // logic will automatically force-close our channels for us (as we don't have an up-to-date
152
- // monitor implying we are not able to punish misbehaving counterparties). Because this test
153
- // "fails" if we ever force-close a channel, we avoid doing so, always saving the latest
154
- // fully-serialized monitor state here, as well as the corresponding update_id.
155
- pub latest_monitors : Mutex < HashMap < OutPoint , ( u64 , Vec < u8 > ) > > ,
167
+ pub latest_monitors : Mutex < HashMap < OutPoint , LatestMonitorState > > ,
156
168
}
157
169
impl TestChainMonitor {
158
170
pub fn new ( broadcaster : Arc < TestBroadcaster > , logger : Arc < dyn Logger > , feeest : Arc < FuzzEstimator > , persister : Arc < TestPersister > , keys : Arc < KeyProvider > ) -> Self {
@@ -169,22 +181,47 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
169
181
fn watch_channel ( & self , funding_txo : OutPoint , monitor : channelmonitor:: ChannelMonitor < TestChannelSigner > ) -> Result < chain:: ChannelMonitorUpdateStatus , ( ) > {
170
182
let mut ser = VecWriter ( Vec :: new ( ) ) ;
171
183
monitor. write ( & mut ser) . unwrap ( ) ;
172
- if let Some ( _) = self . latest_monitors . lock ( ) . unwrap ( ) . insert ( funding_txo, ( monitor. get_latest_update_id ( ) , ser. 0 ) ) {
184
+ let monitor_id = monitor. get_latest_update_id ( ) ;
185
+ let res = self . chain_monitor . watch_channel ( funding_txo, monitor) ;
186
+ let state = match res {
187
+ Ok ( chain:: ChannelMonitorUpdateStatus :: Completed ) => {
188
+ LatestMonitorState {
189
+ persisted_monitor_id : monitor_id, persisted_monitor : ser. 0 ,
190
+ pending_monitors : Vec :: new ( ) ,
191
+ }
192
+ } ,
193
+ Ok ( chain:: ChannelMonitorUpdateStatus :: InProgress ) =>
194
+ panic ! ( "The test currently doesn't test initial-persistence via the async pipeline" ) ,
195
+ Ok ( chain:: ChannelMonitorUpdateStatus :: UnrecoverableError ) => panic ! ( ) ,
196
+ Err ( ( ) ) => panic ! ( ) ,
197
+ } ;
198
+ if self . latest_monitors . lock ( ) . unwrap ( ) . insert ( funding_txo, state) . is_some ( ) {
173
199
panic ! ( "Already had monitor pre-watch_channel" ) ;
174
200
}
175
- self . chain_monitor . watch_channel ( funding_txo , monitor )
201
+ res
176
202
}
177
203
178
204
fn update_channel ( & self , funding_txo : OutPoint , update : & channelmonitor:: ChannelMonitorUpdate ) -> chain:: ChannelMonitorUpdateStatus {
179
205
let mut map_lock = self . latest_monitors . lock ( ) . unwrap ( ) ;
180
206
let map_entry = map_lock. get_mut ( & funding_txo) . expect ( "Didn't have monitor on update call" ) ;
207
+ let latest_monitor_data = map_entry. pending_monitors . last ( ) . as_ref ( ) . map ( |( _, data) | data) . unwrap_or ( & map_entry. persisted_monitor ) ;
181
208
let deserialized_monitor = <( BlockHash , channelmonitor:: ChannelMonitor < TestChannelSigner > ) >::
182
- read ( & mut Cursor :: new ( & map_entry . 1 ) , ( & * self . keys , & * self . keys ) ) . unwrap ( ) . 1 ;
209
+ read ( & mut Cursor :: new ( & latest_monitor_data ) , ( & * self . keys , & * self . keys ) ) . unwrap ( ) . 1 ;
183
210
deserialized_monitor. update_monitor ( update, & & TestBroadcaster { } , & & FuzzEstimator { ret_val : atomic:: AtomicU32 :: new ( 253 ) } , & self . logger ) . unwrap ( ) ;
184
211
let mut ser = VecWriter ( Vec :: new ( ) ) ;
185
212
deserialized_monitor. write ( & mut ser) . unwrap ( ) ;
186
- * map_entry = ( update. update_id , ser. 0 ) ;
187
- self . chain_monitor . update_channel ( funding_txo, update)
213
+ let res = self . chain_monitor . update_channel ( funding_txo, update) ;
214
+ match res {
215
+ chain:: ChannelMonitorUpdateStatus :: Completed => {
216
+ map_entry. persisted_monitor_id = update. update_id ;
217
+ map_entry. persisted_monitor = ser. 0 ;
218
+ } ,
219
+ chain:: ChannelMonitorUpdateStatus :: InProgress => {
220
+ map_entry. pending_monitors . push ( ( update. update_id , ser. 0 ) ) ;
221
+ } ,
222
+ chain:: ChannelMonitorUpdateStatus :: UnrecoverableError => panic ! ( ) ,
223
+ }
224
+ res
188
225
}
189
226
190
227
fn release_pending_monitor_events ( & self ) -> Vec < ( OutPoint , ChannelId , Vec < MonitorEvent > , Option < PublicKey > ) > {
@@ -511,9 +548,15 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
511
548
512
549
let mut monitors = new_hash_map( ) ;
513
550
let mut old_monitors = $old_monitors. latest_monitors. lock( ) . unwrap( ) ;
514
- for ( outpoint, ( update_id, monitor_ser) ) in old_monitors. drain( ) {
515
- monitors. insert( outpoint, <( BlockHash , ChannelMonitor <TestChannelSigner >) >:: read( & mut Cursor :: new( & monitor_ser) , ( & * $keys_manager, & * $keys_manager) ) . expect( "Failed to read monitor" ) . 1 ) ;
516
- chain_monitor. latest_monitors. lock( ) . unwrap( ) . insert( outpoint, ( update_id, monitor_ser) ) ;
551
+ for ( outpoint, mut prev_state) in old_monitors. drain( ) {
552
+ monitors. insert( outpoint, <( BlockHash , ChannelMonitor <TestChannelSigner >) >:: read(
553
+ & mut Cursor :: new( & prev_state. persisted_monitor) , ( & * $keys_manager, & * $keys_manager)
554
+ ) . expect( "Failed to read monitor" ) . 1 ) ;
555
+ // Wipe any `ChannelMonitor`s which we never told LDK we finished persisting,
556
+ // considering them discarded. LDK should replay these for us as they're stored in
557
+ // the `ChannelManager`.
558
+ prev_state. pending_monitors. clear( ) ;
559
+ chain_monitor. latest_monitors. lock( ) . unwrap( ) . insert( outpoint, prev_state) ;
517
560
}
518
561
let mut monitor_refs = new_hash_map( ) ;
519
562
for ( outpoint, monitor) in monitors. iter_mut( ) {
@@ -1040,6 +1083,43 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
1040
1083
} }
1041
1084
}
1042
1085
1086
+ let complete_first = |v : & mut Vec < _ > | if !v. is_empty ( ) { Some ( v. remove ( 0 ) ) } else { None } ;
1087
+ let complete_second = |v : & mut Vec < _ > | if v. len ( ) > 1 { Some ( v. remove ( 1 ) ) } else { None } ;
1088
+ let complete_monitor_update = |
1089
+ monitor : & Arc < TestChainMonitor > , chan_funding,
1090
+ compl_selector : & dyn Fn ( & mut Vec < ( u64 , Vec < u8 > ) > ) -> Option < ( u64 , Vec < u8 > ) > ,
1091
+ | {
1092
+ if let Some ( state) = monitor. latest_monitors. lock( ) . unwrap( ) . get_mut( chan_funding) {
1093
+ assert ! (
1094
+ state. pending_monitors. windows( 2 ) . all( |pair| pair[ 0 ] . 0 < pair[ 1 ] . 0 ) ,
1095
+ "updates should be sorted by id"
1096
+ ) ;
1097
+ if let Some ( ( id, data) ) = compl_selector( & mut state. pending_monitors) {
1098
+ monitor. chain_monitor. channel_monitor_updated( * chan_funding, id) . unwrap( ) ;
1099
+ if id > state. persisted_monitor_id {
1100
+ state. persisted_monitor_id = id;
1101
+ state. persisted_monitor = data;
1102
+ }
1103
+ }
1104
+ }
1105
+ } ;
1106
+
1107
+ let complete_all_monitor_updates = |monitor: & Arc < TestChainMonitor > , chan_funding| {
1108
+ if let Some ( state) = monitor. latest_monitors. lock( ) . unwrap( ) . get_mut( chan_funding) {
1109
+ assert ! (
1110
+ state. pending_monitors. windows( 2 ) . all( |pair| pair[ 0 ] . 0 < pair[ 1 ] . 0 ) ,
1111
+ "updates should be sorted by id"
1112
+ ) ;
1113
+ for ( id, data) in state. pending_monitors. drain( ..) {
1114
+ monitor. chain_monitor. channel_monitor_updated( * chan_funding, id) . unwrap( ) ;
1115
+ if id > state. persisted_monitor_id {
1116
+ state. persisted_monitor_id = id;
1117
+ state. persisted_monitor = data;
1118
+ }
1119
+ }
1120
+ }
1121
+ } ;
1122
+
1043
1123
let v = get_slice!( 1 ) [ 0 ] ;
1044
1124
out. locked_write( format!( "READ A BYTE! HANDLING INPUT {:x}...........\n " , v) . as_bytes( ) ) ;
1045
1125
match v {
@@ -1054,30 +1134,10 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
1054
1134
0x05 => * monitor_b. persister. update_ret. lock( ) . unwrap( ) = ChannelMonitorUpdateStatus :: Completed ,
1055
1135
0x06 => * monitor_c. persister. update_ret. lock( ) . unwrap( ) = ChannelMonitorUpdateStatus :: Completed ,
1056
1136
1057
- 0x08 => {
1058
- if let Some ( ( id, _) ) = monitor_a. latest_monitors . lock ( ) . unwrap ( ) . get ( & chan_1_funding) {
1059
- monitor_a. chain_monitor . force_channel_monitor_updated ( chan_1_funding, * id) ;
1060
- nodes[ 0 ] . process_monitor_events ( ) ;
1061
- }
1062
- } ,
1063
- 0x09 => {
1064
- if let Some ( ( id, _) ) = monitor_b. latest_monitors . lock ( ) . unwrap ( ) . get ( & chan_1_funding) {
1065
- monitor_b. chain_monitor . force_channel_monitor_updated ( chan_1_funding, * id) ;
1066
- nodes[ 1 ] . process_monitor_events ( ) ;
1067
- }
1068
- } ,
1069
- 0x0a => {
1070
- if let Some ( ( id, _) ) = monitor_b. latest_monitors . lock ( ) . unwrap ( ) . get ( & chan_2_funding) {
1071
- monitor_b. chain_monitor . force_channel_monitor_updated ( chan_2_funding, * id) ;
1072
- nodes[ 1 ] . process_monitor_events ( ) ;
1073
- }
1074
- } ,
1075
- 0x0b => {
1076
- if let Some ( ( id, _) ) = monitor_c. latest_monitors . lock ( ) . unwrap ( ) . get ( & chan_2_funding) {
1077
- monitor_c. chain_monitor . force_channel_monitor_updated ( chan_2_funding, * id) ;
1078
- nodes[ 2 ] . process_monitor_events ( ) ;
1079
- }
1080
- } ,
1137
+ 0x08 => complete_all_monitor_updates( & monitor_a, & chan_1_funding) ,
1138
+ 0x09 => complete_all_monitor_updates( & monitor_b, & chan_1_funding) ,
1139
+ 0x0a => complete_all_monitor_updates( & monitor_b, & chan_2_funding) ,
1140
+ 0x0b => complete_all_monitor_updates( & monitor_c, & chan_2_funding) ,
1081
1141
1082
1142
0x0c => {
1083
1143
if !chan_a_disconnected {
@@ -1285,119 +1345,35 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
1285
1345
} ,
1286
1346
0x89 => { fee_est_c. ret_val. store( 253 , atomic:: Ordering :: Release ) ; nodes[ 2 ] . maybe_update_chan_fees( ) ; } ,
1287
1347
1288
- 0xf0 => {
1289
- let pending_updates = monitor_a. chain_monitor . list_pending_monitor_updates ( ) . remove ( & chan_1_funding) . unwrap ( ) ;
1290
- if let Some ( id) = pending_updates. get ( 0 ) {
1291
- monitor_a. chain_monitor . channel_monitor_updated ( chan_1_funding, * id) . unwrap ( ) ;
1292
- }
1293
- nodes[ 0 ] . process_monitor_events ( ) ;
1294
- }
1295
- 0xf1 => {
1296
- let pending_updates = monitor_a. chain_monitor . list_pending_monitor_updates ( ) . remove ( & chan_1_funding) . unwrap ( ) ;
1297
- if let Some ( id) = pending_updates. get ( 1 ) {
1298
- monitor_a. chain_monitor . channel_monitor_updated ( chan_1_funding, * id) . unwrap ( ) ;
1299
- }
1300
- nodes[ 0 ] . process_monitor_events ( ) ;
1301
- }
1302
- 0xf2 => {
1303
- let pending_updates = monitor_a. chain_monitor . list_pending_monitor_updates ( ) . remove ( & chan_1_funding) . unwrap ( ) ;
1304
- if let Some ( id) = pending_updates. last ( ) {
1305
- monitor_a. chain_monitor . channel_monitor_updated ( chan_1_funding, * id) . unwrap ( ) ;
1306
- }
1307
- nodes[ 0 ] . process_monitor_events ( ) ;
1308
- }
1348
+ 0xf0 => complete_monitor_update( & monitor_a, & chan_1_funding, & complete_first) ,
1349
+ 0xf1 => complete_monitor_update( & monitor_a, & chan_1_funding, & complete_second) ,
1350
+ 0xf2 => complete_monitor_update( & monitor_a, & chan_1_funding, & Vec :: pop) ,
1309
1351
1310
- 0xf4 => {
1311
- let pending_updates = monitor_b. chain_monitor . list_pending_monitor_updates ( ) . remove ( & chan_1_funding) . unwrap ( ) ;
1312
- if let Some ( id) = pending_updates. get ( 0 ) {
1313
- monitor_b. chain_monitor . channel_monitor_updated ( chan_1_funding, * id) . unwrap ( ) ;
1314
- }
1315
- nodes[ 1 ] . process_monitor_events ( ) ;
1316
- }
1317
- 0xf5 => {
1318
- let pending_updates = monitor_b. chain_monitor . list_pending_monitor_updates ( ) . remove ( & chan_1_funding) . unwrap ( ) ;
1319
- if let Some ( id) = pending_updates. get ( 1 ) {
1320
- monitor_b. chain_monitor . channel_monitor_updated ( chan_1_funding, * id) . unwrap ( ) ;
1321
- }
1322
- nodes[ 1 ] . process_monitor_events ( ) ;
1323
- }
1324
- 0xf6 => {
1325
- let pending_updates = monitor_b. chain_monitor . list_pending_monitor_updates ( ) . remove ( & chan_1_funding) . unwrap ( ) ;
1326
- if let Some ( id) = pending_updates. last ( ) {
1327
- monitor_b. chain_monitor . channel_monitor_updated ( chan_1_funding, * id) . unwrap ( ) ;
1328
- }
1329
- nodes[ 1 ] . process_monitor_events ( ) ;
1330
- }
1352
+ 0xf4 => complete_monitor_update( & monitor_b, & chan_1_funding, & complete_first) ,
1353
+ 0xf5 => complete_monitor_update( & monitor_b, & chan_1_funding, & complete_second) ,
1354
+ 0xf6 => complete_monitor_update( & monitor_b, & chan_1_funding, & Vec :: pop) ,
1331
1355
1332
- 0xf8 => {
1333
- let pending_updates = monitor_b. chain_monitor . list_pending_monitor_updates ( ) . remove ( & chan_2_funding) . unwrap ( ) ;
1334
- if let Some ( id) = pending_updates. get ( 0 ) {
1335
- monitor_b. chain_monitor . channel_monitor_updated ( chan_2_funding, * id) . unwrap ( ) ;
1336
- }
1337
- nodes[ 1 ] . process_monitor_events ( ) ;
1338
- }
1339
- 0xf9 => {
1340
- let pending_updates = monitor_b. chain_monitor . list_pending_monitor_updates ( ) . remove ( & chan_2_funding) . unwrap ( ) ;
1341
- if let Some ( id) = pending_updates. get ( 1 ) {
1342
- monitor_b. chain_monitor . channel_monitor_updated ( chan_2_funding, * id) . unwrap ( ) ;
1343
- }
1344
- nodes[ 1 ] . process_monitor_events ( ) ;
1345
- }
1346
- 0xfa => {
1347
- let pending_updates = monitor_b. chain_monitor . list_pending_monitor_updates ( ) . remove ( & chan_2_funding) . unwrap ( ) ;
1348
- if let Some ( id) = pending_updates. last ( ) {
1349
- monitor_b. chain_monitor . channel_monitor_updated ( chan_2_funding, * id) . unwrap ( ) ;
1350
- }
1351
- nodes[ 1 ] . process_monitor_events ( ) ;
1352
- }
1356
+ 0xf8 => complete_monitor_update( & monitor_b, & chan_2_funding, & complete_first) ,
1357
+ 0xf9 => complete_monitor_update( & monitor_b, & chan_2_funding, & complete_second) ,
1358
+ 0xfa => complete_monitor_update( & monitor_b, & chan_2_funding, & Vec :: pop) ,
1353
1359
1354
- 0xfc => {
1355
- let pending_updates = monitor_c. chain_monitor . list_pending_monitor_updates ( ) . remove ( & chan_2_funding) . unwrap ( ) ;
1356
- if let Some ( id) = pending_updates. get ( 0 ) {
1357
- monitor_c. chain_monitor . channel_monitor_updated ( chan_2_funding, * id) . unwrap ( ) ;
1358
- }
1359
- nodes[ 2 ] . process_monitor_events ( ) ;
1360
- }
1361
- 0xfd => {
1362
- let pending_updates = monitor_c. chain_monitor . list_pending_monitor_updates ( ) . remove ( & chan_2_funding) . unwrap ( ) ;
1363
- if let Some ( id) = pending_updates. get ( 1 ) {
1364
- monitor_c. chain_monitor . channel_monitor_updated ( chan_2_funding, * id) . unwrap ( ) ;
1365
- }
1366
- nodes[ 2 ] . process_monitor_events ( ) ;
1367
- }
1368
- 0xfe => {
1369
- let pending_updates = monitor_c. chain_monitor . list_pending_monitor_updates ( ) . remove ( & chan_2_funding) . unwrap ( ) ;
1370
- if let Some ( id) = pending_updates. last ( ) {
1371
- monitor_c. chain_monitor . channel_monitor_updated ( chan_2_funding, * id) . unwrap ( ) ;
1372
- }
1373
- nodes[ 2 ] . process_monitor_events ( ) ;
1374
- }
1360
+ 0xfc => complete_monitor_update( & monitor_c, & chan_2_funding, & complete_first) ,
1361
+ 0xfd => complete_monitor_update( & monitor_c, & chan_2_funding, & complete_second) ,
1362
+ 0xfe => complete_monitor_update( & monitor_c, & chan_2_funding, & Vec :: pop) ,
1375
1363
1376
1364
0xff => {
1377
1365
// Test that no channel is in a stuck state where neither party can send funds even
1378
1366
// after we resolve all pending events.
1379
- // First make sure there are no pending monitor updates, resetting the error state
1380
- // and calling force_channel_monitor_updated for each monitor .
1367
+ // First make sure there are no pending monitor updates and further update
1368
+ // operations complete .
1381
1369
* monitor_a. persister. update_ret. lock( ) . unwrap( ) = ChannelMonitorUpdateStatus :: Completed ;
1382
1370
* monitor_b. persister. update_ret. lock( ) . unwrap( ) = ChannelMonitorUpdateStatus :: Completed ;
1383
1371
* monitor_c. persister. update_ret. lock( ) . unwrap( ) = ChannelMonitorUpdateStatus :: Completed ;
1384
1372
1385
- if let Some ( ( id, _) ) = monitor_a. latest_monitors . lock ( ) . unwrap ( ) . get ( & chan_1_funding) {
1386
- monitor_a. chain_monitor . force_channel_monitor_updated ( chan_1_funding, * id) ;
1387
- nodes[ 0 ] . process_monitor_events ( ) ;
1388
- }
1389
- if let Some ( ( id, _) ) = monitor_b. latest_monitors . lock ( ) . unwrap ( ) . get ( & chan_1_funding) {
1390
- monitor_b. chain_monitor . force_channel_monitor_updated ( chan_1_funding, * id) ;
1391
- nodes[ 1 ] . process_monitor_events ( ) ;
1392
- }
1393
- if let Some ( ( id, _) ) = monitor_b. latest_monitors . lock ( ) . unwrap ( ) . get ( & chan_2_funding) {
1394
- monitor_b. chain_monitor . force_channel_monitor_updated ( chan_2_funding, * id) ;
1395
- nodes[ 1 ] . process_monitor_events ( ) ;
1396
- }
1397
- if let Some ( ( id, _) ) = monitor_c. latest_monitors . lock ( ) . unwrap ( ) . get ( & chan_2_funding) {
1398
- monitor_c. chain_monitor . force_channel_monitor_updated ( chan_2_funding, * id) ;
1399
- nodes[ 2 ] . process_monitor_events ( ) ;
1400
- }
1373
+ complete_all_monitor_updates( & monitor_a, & chan_1_funding) ;
1374
+ complete_all_monitor_updates( & monitor_b, & chan_1_funding) ;
1375
+ complete_all_monitor_updates( & monitor_b, & chan_2_funding) ;
1376
+ complete_all_monitor_updates( & monitor_c, & chan_2_funding) ;
1401
1377
1402
1378
// Next, make sure peers are all connected to each other
1403
1379
if chan_a_disconnected {
0 commit comments