@@ -142,6 +142,12 @@ impl Writer for VecWriter {
142
142
}
143
143
}
144
144
145
+ struct LatestMonitorState {
146
+ persisted_monitor_id : u64 ,
147
+ persisted_monitor : Vec < u8 > ,
148
+ pending_monitor_updates : Vec < ( u64 , Vec < u8 > ) > ,
149
+ }
150
+
145
151
struct TestChainMonitor {
146
152
pub logger : Arc < dyn Logger > ,
147
153
pub keys : Arc < KeyProvider > ,
@@ -152,7 +158,10 @@ struct TestChainMonitor {
152
158
// monitor implying we are not able to punish misbehaving counterparties). Because this test
153
159
// "fails" if we ever force-close a channel, we avoid doing so, always saving the latest
154
160
// fully-serialized monitor state here, as well as the corresponding update_id.
155
- pub latest_monitors : Mutex < HashMap < OutPoint , ( u64 , Vec < u8 > ) > > ,
161
+ //
162
+ // Note that this doesn't apply to monitors which are pending persistence, so we store the
163
+ // latest pending monitor separately.
164
+ pub latest_monitors : Mutex < HashMap < OutPoint , LatestMonitorState > > ,
156
165
}
157
166
impl TestChainMonitor {
158
167
pub fn new ( broadcaster : Arc < TestBroadcaster > , logger : Arc < dyn Logger > , feeest : Arc < FuzzEstimator > , persister : Arc < TestPersister > , keys : Arc < KeyProvider > ) -> Self {
@@ -169,22 +178,48 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
169
178
fn watch_channel ( & self , funding_txo : OutPoint , monitor : channelmonitor:: ChannelMonitor < TestChannelSigner > ) -> Result < chain:: ChannelMonitorUpdateStatus , ( ) > {
170
179
let mut ser = VecWriter ( Vec :: new ( ) ) ;
171
180
monitor. write ( & mut ser) . unwrap ( ) ;
172
- if let Some ( _) = self . latest_monitors . lock ( ) . unwrap ( ) . insert ( funding_txo, ( monitor. get_latest_update_id ( ) , ser. 0 ) ) {
181
+ let monitor_id = monitor. get_latest_update_id ( ) ;
182
+ let res = self . chain_monitor . watch_channel ( funding_txo, monitor) ;
183
+ let state = match res {
184
+ Ok ( chain:: ChannelMonitorUpdateStatus :: Completed ) => {
185
+ LatestMonitorState {
186
+ persisted_monitor_id : monitor_id, persisted_monitor : ser. 0 ,
187
+ pending_monitor_updates : Vec :: new ( ) ,
188
+ }
189
+ } ,
190
+ Ok ( chain:: ChannelMonitorUpdateStatus :: InProgress ) =>
191
+ panic ! ( "The test currently doesn't test initial-persistence via the async pipeline" ) ,
192
+ Ok ( chain:: ChannelMonitorUpdateStatus :: UnrecoverableError ) => panic ! ( ) ,
193
+ Err ( ( ) ) => panic ! ( ) ,
194
+ } ;
195
+ if self . latest_monitors . lock ( ) . unwrap ( ) . insert ( funding_txo, state) . is_some ( ) {
173
196
panic ! ( "Already had monitor pre-watch_channel" ) ;
174
197
}
175
- self . chain_monitor . watch_channel ( funding_txo , monitor )
198
+ res
176
199
}
177
200
178
201
fn update_channel ( & self , funding_txo : OutPoint , update : & channelmonitor:: ChannelMonitorUpdate ) -> chain:: ChannelMonitorUpdateStatus {
179
202
let mut map_lock = self . latest_monitors . lock ( ) . unwrap ( ) ;
180
203
let map_entry = map_lock. get_mut ( & funding_txo) . expect ( "Didn't have monitor on update call" ) ;
204
+ let latest_monitor_data = map_entry. pending_monitor_updates . last ( ) . as_ref ( ) . map ( |( _, data) | data) . unwrap_or ( & map_entry. persisted_monitor ) ;
181
205
let deserialized_monitor = <( BlockHash , channelmonitor:: ChannelMonitor < TestChannelSigner > ) >::
182
- read ( & mut Cursor :: new ( & map_entry . 1 ) , ( & * self . keys , & * self . keys ) ) . unwrap ( ) . 1 ;
206
+ read ( & mut Cursor :: new ( & latest_monitor_data ) , ( & * self . keys , & * self . keys ) ) . unwrap ( ) . 1 ;
183
207
deserialized_monitor. update_monitor ( update, & & TestBroadcaster { } , & & FuzzEstimator { ret_val : atomic:: AtomicU32 :: new ( 253 ) } , & self . logger ) . unwrap ( ) ;
184
208
let mut ser = VecWriter ( Vec :: new ( ) ) ;
185
209
deserialized_monitor. write ( & mut ser) . unwrap ( ) ;
186
- * map_entry = ( update. update_id , ser. 0 ) ;
187
- self . chain_monitor . update_channel ( funding_txo, update)
210
+ let res = self . chain_monitor . update_channel ( funding_txo, update) ;
211
+ match res {
212
+ chain:: ChannelMonitorUpdateStatus :: Completed => {
213
+ map_entry. pending_monitor_updates . clear ( ) ;
214
+ map_entry. persisted_monitor_id = update. update_id ;
215
+ map_entry. persisted_monitor = ser. 0 ;
216
+ } ,
217
+ chain:: ChannelMonitorUpdateStatus :: InProgress => {
218
+ map_entry. pending_monitor_updates . push ( ( update. update_id , ser. 0 ) ) ;
219
+ } ,
220
+ chain:: ChannelMonitorUpdateStatus :: UnrecoverableError => panic ! ( ) ,
221
+ }
222
+ res
188
223
}
189
224
190
225
fn release_pending_monitor_events ( & self ) -> Vec < ( OutPoint , ChannelId , Vec < MonitorEvent > , Option < PublicKey > ) > {
@@ -511,9 +546,12 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
511
546
512
547
let mut monitors = new_hash_map( ) ;
513
548
let mut old_monitors = $old_monitors. latest_monitors. lock( ) . unwrap( ) ;
514
- for ( outpoint, ( update_id, monitor_ser) ) in old_monitors. drain( ) {
515
- monitors. insert( outpoint, <( BlockHash , ChannelMonitor <TestChannelSigner >) >:: read( & mut Cursor :: new( & monitor_ser) , ( & * $keys_manager, & * $keys_manager) ) . expect( "Failed to read monitor" ) . 1 ) ;
516
- chain_monitor. latest_monitors. lock( ) . unwrap( ) . insert( outpoint, ( update_id, monitor_ser) ) ;
549
+ for ( outpoint, mut prev_state) in old_monitors. drain( ) {
550
+ monitors. insert( outpoint, <( BlockHash , ChannelMonitor <TestChannelSigner >) >:: read(
551
+ & mut Cursor :: new( & prev_state. persisted_monitor) , ( & * $keys_manager, & * $keys_manager)
552
+ ) . expect( "Failed to read monitor" ) . 1 ) ;
553
+ prev_state. pending_monitor_updates. clear( ) ;
554
+ chain_monitor. latest_monitors. lock( ) . unwrap( ) . insert( outpoint, prev_state) ;
517
555
}
518
556
let mut monitor_refs = new_hash_map( ) ;
519
557
for ( outpoint, monitor) in monitors. iter_mut( ) {
@@ -1040,6 +1078,43 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
1040
1078
} }
1041
1079
}
1042
1080
1081
+ let complete_monitor_update = |
1082
+ node : & ChannelManager < _ , _ , _ , _ , _ , _ , _ , _ > , monitor : & Arc < TestChainMonitor > ,
1083
+ chan_funding, compl_selector : & dyn Fn ( & mut Vec < ( u64 , Vec < u8 > ) > ) -> Option < ( u64 , Vec < u8 > ) > ,
1084
+ | {
1085
+ if let Some ( state) = monitor. latest_monitors. lock( ) . unwrap( ) . get_mut( chan_funding) {
1086
+ assert ! (
1087
+ state. pending_monitor_updates. windows( 2 ) . all( |pair| pair[ 0 ] . 0 < pair[ 1 ] . 0 ) ,
1088
+ "updates should be sorted by id"
1089
+ ) ;
1090
+ if let Some ( ( id, data) ) = compl_selector( & mut state. pending_monitor_updates) {
1091
+ monitor. chain_monitor. channel_monitor_updated( * chan_funding, id) . unwrap( ) ;
1092
+ if id > state. persisted_monitor_id {
1093
+ state. persisted_monitor_id = id;
1094
+ state. persisted_monitor = data;
1095
+ }
1096
+ node. process_monitor_events( ) ;
1097
+ }
1098
+ }
1099
+ } ;
1100
+
1101
+ let complete_all_monitor_updates = |node: & ChannelManager < _ , _ , _ , _ , _ , _ , _ , _ > , monitor: & Arc < TestChainMonitor > , chan_funding| {
1102
+ if let Some ( state) = monitor. latest_monitors. lock( ) . unwrap( ) . get_mut( chan_funding) {
1103
+ assert ! (
1104
+ state. pending_monitor_updates. windows( 2 ) . all( |pair| pair[ 0 ] . 0 < pair[ 1 ] . 0 ) ,
1105
+ "updates should be sorted by id"
1106
+ ) ;
1107
+ for ( id, data) in state. pending_monitor_updates. drain( ..) {
1108
+ monitor. chain_monitor. channel_monitor_updated( * chan_funding, id) . unwrap( ) ;
1109
+ if id > state. persisted_monitor_id {
1110
+ state. persisted_monitor_id = id;
1111
+ state. persisted_monitor = data;
1112
+ }
1113
+ }
1114
+ node. process_monitor_events( ) ;
1115
+ }
1116
+ } ;
1117
+
1043
1118
let v = get_slice!( 1 ) [ 0 ] ;
1044
1119
out. locked_write( format!( "READ A BYTE! HANDLING INPUT {:x}...........\n " , v) . as_bytes( ) ) ;
1045
1120
match v {
@@ -1054,30 +1129,10 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
1054
1129
0x05 => * monitor_b. persister. update_ret. lock( ) . unwrap( ) = ChannelMonitorUpdateStatus :: Completed ,
1055
1130
0x06 => * monitor_c. persister. update_ret. lock( ) . unwrap( ) = ChannelMonitorUpdateStatus :: Completed ,
1056
1131
1057
- 0x08 => {
1058
- if let Some ( ( id, _) ) = monitor_a. latest_monitors . lock ( ) . unwrap ( ) . get ( & chan_1_funding) {
1059
- monitor_a. chain_monitor . force_channel_monitor_updated ( chan_1_funding, * id) ;
1060
- nodes[ 0 ] . process_monitor_events ( ) ;
1061
- }
1062
- } ,
1063
- 0x09 => {
1064
- if let Some ( ( id, _) ) = monitor_b. latest_monitors . lock ( ) . unwrap ( ) . get ( & chan_1_funding) {
1065
- monitor_b. chain_monitor . force_channel_monitor_updated ( chan_1_funding, * id) ;
1066
- nodes[ 1 ] . process_monitor_events ( ) ;
1067
- }
1068
- } ,
1069
- 0x0a => {
1070
- if let Some ( ( id, _) ) = monitor_b. latest_monitors . lock ( ) . unwrap ( ) . get ( & chan_2_funding) {
1071
- monitor_b. chain_monitor . force_channel_monitor_updated ( chan_2_funding, * id) ;
1072
- nodes[ 1 ] . process_monitor_events ( ) ;
1073
- }
1074
- } ,
1075
- 0x0b => {
1076
- if let Some ( ( id, _) ) = monitor_c. latest_monitors . lock ( ) . unwrap ( ) . get ( & chan_2_funding) {
1077
- monitor_c. chain_monitor . force_channel_monitor_updated ( chan_2_funding, * id) ;
1078
- nodes[ 2 ] . process_monitor_events ( ) ;
1079
- }
1080
- } ,
1132
+ 0x08 => complete_all_monitor_updates( & nodes[ 0 ] , & monitor_a, & chan_1_funding) ,
1133
+ 0x09 => complete_all_monitor_updates( & nodes[ 1 ] , & monitor_b, & chan_1_funding) ,
1134
+ 0x0a => complete_all_monitor_updates( & nodes[ 1 ] , & monitor_b, & chan_2_funding) ,
1135
+ 0x0b => complete_all_monitor_updates( & nodes[ 2 ] , & monitor_c, & chan_2_funding) ,
1081
1136
1082
1137
0x0c => {
1083
1138
if !chan_a_disconnected {
@@ -1285,119 +1340,59 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
1285
1340
} ,
1286
1341
0x89 => { fee_est_c. ret_val. store( 253 , atomic:: Ordering :: Release ) ; nodes[ 2 ] . maybe_update_chan_fees( ) ; } ,
1287
1342
1288
- 0xf0 => {
1289
- let pending_updates = monitor_a. chain_monitor . list_pending_monitor_updates ( ) . remove ( & chan_1_funding) . unwrap ( ) ;
1290
- if let Some ( id) = pending_updates. get ( 0 ) {
1291
- monitor_a. chain_monitor . channel_monitor_updated ( chan_1_funding, * id) . unwrap ( ) ;
1292
- }
1293
- nodes[ 0 ] . process_monitor_events ( ) ;
1294
- }
1295
- 0xf1 => {
1296
- let pending_updates = monitor_a. chain_monitor . list_pending_monitor_updates ( ) . remove ( & chan_1_funding) . unwrap ( ) ;
1297
- if let Some ( id) = pending_updates. get ( 1 ) {
1298
- monitor_a. chain_monitor . channel_monitor_updated ( chan_1_funding, * id) . unwrap ( ) ;
1299
- }
1300
- nodes[ 0 ] . process_monitor_events ( ) ;
1301
- }
1302
- 0xf2 => {
1303
- let pending_updates = monitor_a. chain_monitor . list_pending_monitor_updates ( ) . remove ( & chan_1_funding) . unwrap ( ) ;
1304
- if let Some ( id) = pending_updates. last ( ) {
1305
- monitor_a. chain_monitor . channel_monitor_updated ( chan_1_funding, * id) . unwrap ( ) ;
1306
- }
1307
- nodes[ 0 ] . process_monitor_events ( ) ;
1308
- }
1309
-
1310
- 0xf4 => {
1311
- let pending_updates = monitor_b. chain_monitor . list_pending_monitor_updates ( ) . remove ( & chan_1_funding) . unwrap ( ) ;
1312
- if let Some ( id) = pending_updates. get ( 0 ) {
1313
- monitor_b. chain_monitor . channel_monitor_updated ( chan_1_funding, * id) . unwrap ( ) ;
1314
- }
1315
- nodes[ 1 ] . process_monitor_events ( ) ;
1316
- }
1317
- 0xf5 => {
1318
- let pending_updates = monitor_b. chain_monitor . list_pending_monitor_updates ( ) . remove ( & chan_1_funding) . unwrap ( ) ;
1319
- if let Some ( id) = pending_updates. get ( 1 ) {
1320
- monitor_b. chain_monitor . channel_monitor_updated ( chan_1_funding, * id) . unwrap ( ) ;
1321
- }
1322
- nodes[ 1 ] . process_monitor_events ( ) ;
1323
- }
1324
- 0xf6 => {
1325
- let pending_updates = monitor_b. chain_monitor . list_pending_monitor_updates ( ) . remove ( & chan_1_funding) . unwrap ( ) ;
1326
- if let Some ( id) = pending_updates. last ( ) {
1327
- monitor_b. chain_monitor . channel_monitor_updated ( chan_1_funding, * id) . unwrap ( ) ;
1328
- }
1329
- nodes[ 1 ] . process_monitor_events ( ) ;
1330
- }
1331
-
1332
- 0xf8 => {
1333
- let pending_updates = monitor_b. chain_monitor . list_pending_monitor_updates ( ) . remove ( & chan_2_funding) . unwrap ( ) ;
1334
- if let Some ( id) = pending_updates. get ( 0 ) {
1335
- monitor_b. chain_monitor . channel_monitor_updated ( chan_2_funding, * id) . unwrap ( ) ;
1336
- }
1337
- nodes[ 1 ] . process_monitor_events ( ) ;
1338
- }
1339
- 0xf9 => {
1340
- let pending_updates = monitor_b. chain_monitor . list_pending_monitor_updates ( ) . remove ( & chan_2_funding) . unwrap ( ) ;
1341
- if let Some ( id) = pending_updates. get ( 1 ) {
1342
- monitor_b. chain_monitor . channel_monitor_updated ( chan_2_funding, * id) . unwrap ( ) ;
1343
- }
1344
- nodes[ 1 ] . process_monitor_events ( ) ;
1345
- }
1346
- 0xfa => {
1347
- let pending_updates = monitor_b. chain_monitor . list_pending_monitor_updates ( ) . remove ( & chan_2_funding) . unwrap ( ) ;
1348
- if let Some ( id) = pending_updates. last ( ) {
1349
- monitor_b. chain_monitor . channel_monitor_updated ( chan_2_funding, * id) . unwrap ( ) ;
1350
- }
1351
- nodes[ 1 ] . process_monitor_events ( ) ;
1352
- }
1353
-
1354
- 0xfc => {
1355
- let pending_updates = monitor_c. chain_monitor . list_pending_monitor_updates ( ) . remove ( & chan_2_funding) . unwrap ( ) ;
1356
- if let Some ( id) = pending_updates. get ( 0 ) {
1357
- monitor_c. chain_monitor . channel_monitor_updated ( chan_2_funding, * id) . unwrap ( ) ;
1358
- }
1359
- nodes[ 2 ] . process_monitor_events ( ) ;
1360
- }
1361
- 0xfd => {
1362
- let pending_updates = monitor_c. chain_monitor . list_pending_monitor_updates ( ) . remove ( & chan_2_funding) . unwrap ( ) ;
1363
- if let Some ( id) = pending_updates. get ( 1 ) {
1364
- monitor_c. chain_monitor . channel_monitor_updated ( chan_2_funding, * id) . unwrap ( ) ;
1365
- }
1366
- nodes[ 2 ] . process_monitor_events ( ) ;
1367
- }
1368
- 0xfe => {
1369
- let pending_updates = monitor_c. chain_monitor . list_pending_monitor_updates ( ) . remove ( & chan_2_funding) . unwrap ( ) ;
1370
- if let Some ( id) = pending_updates. last ( ) {
1371
- monitor_c. chain_monitor . channel_monitor_updated ( chan_2_funding, * id) . unwrap ( ) ;
1372
- }
1373
- nodes[ 2 ] . process_monitor_events ( ) ;
1374
- }
1343
+ 0xf0 =>
1344
+ complete_monitor_update( & nodes[ 0 ] , & monitor_a, & chan_1_funding,
1345
+ & |v: & mut Vec <_>| if !v. is_empty( ) { Some ( v. remove( 0 ) ) } else { None } ) ,
1346
+ 0xf1 =>
1347
+ complete_monitor_update( & nodes[ 0 ] , & monitor_a, & chan_1_funding,
1348
+ & |v: & mut Vec <_>| if v. len( ) > 1 { Some ( v. remove( 1 ) ) } else { None } ) ,
1349
+ 0xf2 =>
1350
+ complete_monitor_update( & nodes[ 0 ] , & monitor_a, & chan_1_funding,
1351
+ & |v: & mut Vec <_>| v. pop( ) ) ,
1352
+
1353
+ 0xf4 =>
1354
+ complete_monitor_update( & nodes[ 1 ] , & monitor_b, & chan_1_funding,
1355
+ & |v: & mut Vec <_>| if !v. is_empty( ) { Some ( v. remove( 0 ) ) } else { None } ) ,
1356
+ 0xf5 =>
1357
+ complete_monitor_update( & nodes[ 1 ] , & monitor_b, & chan_1_funding,
1358
+ & |v: & mut Vec <_>| if v. len( ) > 1 { Some ( v. remove( 1 ) ) } else { None } ) ,
1359
+ 0xf6 =>
1360
+ complete_monitor_update( & nodes[ 1 ] , & monitor_b, & chan_1_funding,
1361
+ & |v: & mut Vec <_>| v. pop( ) ) ,
1362
+
1363
+ 0xf8 =>
1364
+ complete_monitor_update( & nodes[ 1 ] , & monitor_b, & chan_2_funding,
1365
+ & |v: & mut Vec <_>| if !v. is_empty( ) { Some ( v. remove( 0 ) ) } else { None } ) ,
1366
+ 0xf9 =>
1367
+ complete_monitor_update( & nodes[ 1 ] , & monitor_b, & chan_2_funding,
1368
+ & |v: & mut Vec <_>| if v. len( ) > 1 { Some ( v. remove( 1 ) ) } else { None } ) ,
1369
+ 0xfa =>
1370
+ complete_monitor_update( & nodes[ 1 ] , & monitor_b, & chan_2_funding,
1371
+ & |v: & mut Vec <_>| v. pop( ) ) ,
1372
+
1373
+ 0xfc =>
1374
+ complete_monitor_update( & nodes[ 2 ] , & monitor_c, & chan_2_funding,
1375
+ & |v: & mut Vec <_>| if !v. is_empty( ) { Some ( v. remove( 0 ) ) } else { None } ) ,
1376
+ 0xfd =>
1377
+ complete_monitor_update( & nodes[ 2 ] , & monitor_c, & chan_2_funding,
1378
+ & |v: & mut Vec <_>| if v. len( ) > 1 { Some ( v. remove( 1 ) ) } else { None } ) ,
1379
+ 0xfe =>
1380
+ complete_monitor_update( & nodes[ 2 ] , & monitor_c, & chan_2_funding,
1381
+ & |v: & mut Vec <_>| v. pop( ) ) ,
1375
1382
1376
1383
0xff => {
1377
1384
// Test that no channel is in a stuck state where neither party can send funds even
1378
1385
// after we resolve all pending events.
1379
- // First make sure there are no pending monitor updates, resetting the error state
1380
- // and calling force_channel_monitor_updated for each monitor .
1386
+ // First make sure there are no pending monitor updates and further update
1387
+ // operations complete .
1381
1388
* monitor_a. persister. update_ret. lock( ) . unwrap( ) = ChannelMonitorUpdateStatus :: Completed ;
1382
1389
* monitor_b. persister. update_ret. lock( ) . unwrap( ) = ChannelMonitorUpdateStatus :: Completed ;
1383
1390
* monitor_c. persister. update_ret. lock( ) . unwrap( ) = ChannelMonitorUpdateStatus :: Completed ;
1384
1391
1385
- if let Some ( ( id, _) ) = monitor_a. latest_monitors . lock ( ) . unwrap ( ) . get ( & chan_1_funding) {
1386
- monitor_a. chain_monitor . force_channel_monitor_updated ( chan_1_funding, * id) ;
1387
- nodes[ 0 ] . process_monitor_events ( ) ;
1388
- }
1389
- if let Some ( ( id, _) ) = monitor_b. latest_monitors . lock ( ) . unwrap ( ) . get ( & chan_1_funding) {
1390
- monitor_b. chain_monitor . force_channel_monitor_updated ( chan_1_funding, * id) ;
1391
- nodes[ 1 ] . process_monitor_events ( ) ;
1392
- }
1393
- if let Some ( ( id, _) ) = monitor_b. latest_monitors . lock ( ) . unwrap ( ) . get ( & chan_2_funding) {
1394
- monitor_b. chain_monitor . force_channel_monitor_updated ( chan_2_funding, * id) ;
1395
- nodes[ 1 ] . process_monitor_events ( ) ;
1396
- }
1397
- if let Some ( ( id, _) ) = monitor_c. latest_monitors . lock ( ) . unwrap ( ) . get ( & chan_2_funding) {
1398
- monitor_c. chain_monitor . force_channel_monitor_updated ( chan_2_funding, * id) ;
1399
- nodes[ 2 ] . process_monitor_events ( ) ;
1400
- }
1392
+ complete_all_monitor_updates( & nodes[ 0 ] , & monitor_a, & chan_1_funding) ;
1393
+ complete_all_monitor_updates( & nodes[ 1 ] , & monitor_b, & chan_1_funding) ;
1394
+ complete_all_monitor_updates( & nodes[ 1 ] , & monitor_b, & chan_2_funding) ;
1395
+ complete_all_monitor_updates( & nodes[ 2 ] , & monitor_c, & chan_2_funding) ;
1401
1396
1402
1397
// Next, make sure peers are all connected to each other
1403
1398
if chan_a_disconnected {
0 commit comments