
Commit 61aef9b

Fuzz reloading with a stale monitor in chanmon_consistency
Now that we are gearing up to support fully async monitor storage, we really need to fuzz the case where monitor updates do not complete before a reload, which we do here in the `chanmon_consistency` fuzzer. While there are more parts of async monitor updating that we need to fuzz, this at least gets us started by covering the basic async-restart cases. In the future, we should extend this to check that some basic properties (e.g. claim/balance consistency) remain true across `chanmon_consistency` runs.
1 parent 1d0c6c6
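To make the idea concrete, here is a minimal, self-contained sketch of the per-channel state this change introduces: the last monitor we know was persisted, plus any serialized updates whose async persistence has not yet completed. The struct and field names mirror `LatestMonitorState` in the diff below; the `reload` helper and the driver in `main` are hypothetical, purely for illustration.

// Hypothetical standalone model of the reload-with-a-stale-monitor idea;
// not the fuzzer itself.
struct LatestMonitorState {
    persisted_monitor_id: u64,
    persisted_monitor: Vec<u8>,
    pending_monitor_updates: Vec<(u64, Vec<u8>)>,
}

impl LatestMonitorState {
    // Simulate a restart: anything not yet persisted is lost, and the node
    // comes back up from the last fully-persisted monitor.
    fn reload(&mut self) -> &[u8] {
        self.pending_monitor_updates.clear();
        &self.persisted_monitor
    }
}

fn main() {
    let mut state = LatestMonitorState {
        persisted_monitor_id: 1,
        persisted_monitor: vec![0xaa],
        pending_monitor_updates: vec![(2, vec![0xbb]), (3, vec![0xcc])],
    };
    // Updates 2 and 3 never completed persistence; a reload discards them,
    // leaving the node on the stale monitor from update 1.
    assert_eq!(state.reload(), &[0xaa][..]);
    assert_eq!(state.persisted_monitor_id, 1);
    assert!(state.pending_monitor_updates.is_empty());
}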

1 file changed, 130 insertions(+), 138 deletions(-)


fuzz/src/chanmon_consistency.rs

@@ -142,6 +142,12 @@ impl Writer for VecWriter {
     }
 }
 
+struct LatestMonitorState {
+    persisted_monitor_id: u64,
+    persisted_monitor: Vec<u8>,
+    pending_monitor_updates: Vec<(u64, Vec<u8>)>,
+}
+
 struct TestChainMonitor {
     pub logger: Arc<dyn Logger>,
     pub keys: Arc<KeyProvider>,
@@ -152,7 +158,10 @@ struct TestChainMonitor {
     // monitor implying we are not able to punish misbehaving counterparties). Because this test
     // "fails" if we ever force-close a channel, we avoid doing so, always saving the latest
     // fully-serialized monitor state here, as well as the corresponding update_id.
-    pub latest_monitors: Mutex<HashMap<OutPoint, (u64, Vec<u8>)>>,
+    //
+    // Note that this doesn't apply to monitors which are pending persistence, so we store the
+    // latest pending monitor separately.
+    pub latest_monitors: Mutex<HashMap<OutPoint, LatestMonitorState>>,
 }
 impl TestChainMonitor {
     pub fn new(broadcaster: Arc<TestBroadcaster>, logger: Arc<dyn Logger>, feeest: Arc<FuzzEstimator>, persister: Arc<TestPersister>, keys: Arc<KeyProvider>) -> Self {
@@ -169,22 +178,47 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
     fn watch_channel(&self, funding_txo: OutPoint, monitor: channelmonitor::ChannelMonitor<TestChannelSigner>) -> Result<chain::ChannelMonitorUpdateStatus, ()> {
         let mut ser = VecWriter(Vec::new());
         monitor.write(&mut ser).unwrap();
-        if let Some(_) = self.latest_monitors.lock().unwrap().insert(funding_txo, (monitor.get_latest_update_id(), ser.0)) {
+        let monitor_id = monitor.get_latest_update_id();
+        let res = self.chain_monitor.watch_channel(funding_txo, monitor);
+        let state = match res {
+            Ok(chain::ChannelMonitorUpdateStatus::Completed) => {
+                LatestMonitorState {
+                    persisted_monitor_id: monitor_id, persisted_monitor: ser.0,
+                    pending_monitor_updates: Vec::new(),
+                }
+            },
+            Ok(chain::ChannelMonitorUpdateStatus::InProgress) =>
+                panic!("The test currently doesn't test initial-persistence via the async pipeline"),
+            Ok(chain::ChannelMonitorUpdateStatus::UnrecoverableError) => panic!(),
+            Err(()) => panic!(),
+        };
+        if self.latest_monitors.lock().unwrap().insert(funding_txo, state).is_some() {
             panic!("Already had monitor pre-watch_channel");
         }
-        self.chain_monitor.watch_channel(funding_txo, monitor)
+        res
     }
 
     fn update_channel(&self, funding_txo: OutPoint, update: &channelmonitor::ChannelMonitorUpdate) -> chain::ChannelMonitorUpdateStatus {
         let mut map_lock = self.latest_monitors.lock().unwrap();
         let map_entry = map_lock.get_mut(&funding_txo).expect("Didn't have monitor on update call");
+        let latest_monitor_data = map_entry.pending_monitor_updates.last().as_ref().map(|(_, data)| data).unwrap_or(&map_entry.persisted_monitor);
         let deserialized_monitor = <(BlockHash, channelmonitor::ChannelMonitor<TestChannelSigner>)>::
-            read(&mut Cursor::new(&map_entry.1), (&*self.keys, &*self.keys)).unwrap().1;
+            read(&mut Cursor::new(&latest_monitor_data), (&*self.keys, &*self.keys)).unwrap().1;
         deserialized_monitor.update_monitor(update, &&TestBroadcaster{}, &&FuzzEstimator { ret_val: atomic::AtomicU32::new(253) }, &self.logger).unwrap();
         let mut ser = VecWriter(Vec::new());
         deserialized_monitor.write(&mut ser).unwrap();
-        *map_entry = (update.update_id, ser.0);
-        self.chain_monitor.update_channel(funding_txo, update)
+        let res = self.chain_monitor.update_channel(funding_txo, update);
+        match res {
+            chain::ChannelMonitorUpdateStatus::Completed => {
+                map_entry.persisted_monitor_id = update.update_id;
+                map_entry.persisted_monitor = ser.0;
+            },
+            chain::ChannelMonitorUpdateStatus::InProgress => {
+                map_entry.pending_monitor_updates.push((update.update_id, ser.0));
+            },
+            chain::ChannelMonitorUpdateStatus::UnrecoverableError => panic!(),
+        }
+        res
     }
 
     fn release_pending_monitor_events(&self) -> Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, Option<PublicKey>)> {
@@ -511,9 +545,12 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
 
             let mut monitors = new_hash_map();
             let mut old_monitors = $old_monitors.latest_monitors.lock().unwrap();
-            for (outpoint, (update_id, monitor_ser)) in old_monitors.drain() {
-                monitors.insert(outpoint, <(BlockHash, ChannelMonitor<TestChannelSigner>)>::read(&mut Cursor::new(&monitor_ser), (&*$keys_manager, &*$keys_manager)).expect("Failed to read monitor").1);
-                chain_monitor.latest_monitors.lock().unwrap().insert(outpoint, (update_id, monitor_ser));
+            for (outpoint, mut prev_state) in old_monitors.drain() {
+                monitors.insert(outpoint, <(BlockHash, ChannelMonitor<TestChannelSigner>)>::read(
+                    &mut Cursor::new(&prev_state.persisted_monitor), (&*$keys_manager, &*$keys_manager)
+                ).expect("Failed to read monitor").1);
+                prev_state.pending_monitor_updates.clear();
+                chain_monitor.latest_monitors.lock().unwrap().insert(outpoint, prev_state);
             }
             let mut monitor_refs = new_hash_map();
             for (outpoint, monitor) in monitors.iter_mut() {
@@ -1040,6 +1077,41 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
         } }
     }
 
+    let complete_monitor_update = |
+        monitor: &Arc<TestChainMonitor>, chan_funding,
+        compl_selector: &dyn Fn(&mut Vec<(u64, Vec<u8>)>) -> Option<(u64, Vec<u8>)>,
+    | {
+        if let Some(state) = monitor.latest_monitors.lock().unwrap().get_mut(chan_funding) {
+            assert!(
+                state.pending_monitor_updates.windows(2).all(|pair| pair[0].0 < pair[1].0),
+                "updates should be sorted by id"
+            );
+            if let Some((id, data)) = compl_selector(&mut state.pending_monitor_updates) {
+                monitor.chain_monitor.channel_monitor_updated(*chan_funding, id).unwrap();
+                if id > state.persisted_monitor_id {
+                    state.persisted_monitor_id = id;
+                    state.persisted_monitor = data;
+                }
+            }
+        }
+    };
+
+    let complete_all_monitor_updates = |monitor: &Arc<TestChainMonitor>, chan_funding| {
+        if let Some(state) = monitor.latest_monitors.lock().unwrap().get_mut(chan_funding) {
+            assert!(
+                state.pending_monitor_updates.windows(2).all(|pair| pair[0].0 < pair[1].0),
+                "updates should be sorted by id"
+            );
+            for (id, data) in state.pending_monitor_updates.drain(..) {
+                monitor.chain_monitor.channel_monitor_updated(*chan_funding, id).unwrap();
+                if id > state.persisted_monitor_id {
+                    state.persisted_monitor_id = id;
+                    state.persisted_monitor = data;
+                }
+            }
+        }
+    };
+
     let v = get_slice!(1)[0];
     out.locked_write(format!("READ A BYTE! HANDLING INPUT {:x}...........\n", v).as_bytes());
     match v {
@@ -1054,30 +1126,10 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
         0x05 => *monitor_b.persister.update_ret.lock().unwrap() = ChannelMonitorUpdateStatus::Completed,
         0x06 => *monitor_c.persister.update_ret.lock().unwrap() = ChannelMonitorUpdateStatus::Completed,
 
-        0x08 => {
-            if let Some((id, _)) = monitor_a.latest_monitors.lock().unwrap().get(&chan_1_funding) {
-                monitor_a.chain_monitor.force_channel_monitor_updated(chan_1_funding, *id);
-                nodes[0].process_monitor_events();
-            }
-        },
-        0x09 => {
-            if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_1_funding) {
-                monitor_b.chain_monitor.force_channel_monitor_updated(chan_1_funding, *id);
-                nodes[1].process_monitor_events();
-            }
-        },
-        0x0a => {
-            if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_2_funding) {
-                monitor_b.chain_monitor.force_channel_monitor_updated(chan_2_funding, *id);
-                nodes[1].process_monitor_events();
-            }
-        },
-        0x0b => {
-            if let Some((id, _)) = monitor_c.latest_monitors.lock().unwrap().get(&chan_2_funding) {
-                monitor_c.chain_monitor.force_channel_monitor_updated(chan_2_funding, *id);
-                nodes[2].process_monitor_events();
-            }
-        },
+        0x08 => complete_all_monitor_updates(&monitor_a, &chan_1_funding),
+        0x09 => complete_all_monitor_updates(&monitor_b, &chan_1_funding),
+        0x0a => complete_all_monitor_updates(&monitor_b, &chan_2_funding),
+        0x0b => complete_all_monitor_updates(&monitor_c, &chan_2_funding),
 
         0x0c => {
             if !chan_a_disconnected {
@@ -1285,119 +1337,59 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
         },
         0x89 => { fee_est_c.ret_val.store(253, atomic::Ordering::Release); nodes[2].maybe_update_chan_fees(); },
 
-        0xf0 => {
-            let pending_updates = monitor_a.chain_monitor.list_pending_monitor_updates().remove(&chan_1_funding).unwrap();
-            if let Some(id) = pending_updates.get(0) {
-                monitor_a.chain_monitor.channel_monitor_updated(chan_1_funding, *id).unwrap();
-            }
-            nodes[0].process_monitor_events();
-        }
-        0xf1 => {
-            let pending_updates = monitor_a.chain_monitor.list_pending_monitor_updates().remove(&chan_1_funding).unwrap();
-            if let Some(id) = pending_updates.get(1) {
-                monitor_a.chain_monitor.channel_monitor_updated(chan_1_funding, *id).unwrap();
-            }
-            nodes[0].process_monitor_events();
-        }
-        0xf2 => {
-            let pending_updates = monitor_a.chain_monitor.list_pending_monitor_updates().remove(&chan_1_funding).unwrap();
-            if let Some(id) = pending_updates.last() {
-                monitor_a.chain_monitor.channel_monitor_updated(chan_1_funding, *id).unwrap();
-            }
-            nodes[0].process_monitor_events();
-        }
-
-        0xf4 => {
-            let pending_updates = monitor_b.chain_monitor.list_pending_monitor_updates().remove(&chan_1_funding).unwrap();
-            if let Some(id) = pending_updates.get(0) {
-                monitor_b.chain_monitor.channel_monitor_updated(chan_1_funding, *id).unwrap();
-            }
-            nodes[1].process_monitor_events();
-        }
-        0xf5 => {
-            let pending_updates = monitor_b.chain_monitor.list_pending_monitor_updates().remove(&chan_1_funding).unwrap();
-            if let Some(id) = pending_updates.get(1) {
-                monitor_b.chain_monitor.channel_monitor_updated(chan_1_funding, *id).unwrap();
-            }
-            nodes[1].process_monitor_events();
-        }
-        0xf6 => {
-            let pending_updates = monitor_b.chain_monitor.list_pending_monitor_updates().remove(&chan_1_funding).unwrap();
-            if let Some(id) = pending_updates.last() {
-                monitor_b.chain_monitor.channel_monitor_updated(chan_1_funding, *id).unwrap();
-            }
-            nodes[1].process_monitor_events();
-        }
-
-        0xf8 => {
-            let pending_updates = monitor_b.chain_monitor.list_pending_monitor_updates().remove(&chan_2_funding).unwrap();
-            if let Some(id) = pending_updates.get(0) {
-                monitor_b.chain_monitor.channel_monitor_updated(chan_2_funding, *id).unwrap();
-            }
-            nodes[1].process_monitor_events();
-        }
-        0xf9 => {
-            let pending_updates = monitor_b.chain_monitor.list_pending_monitor_updates().remove(&chan_2_funding).unwrap();
-            if let Some(id) = pending_updates.get(1) {
-                monitor_b.chain_monitor.channel_monitor_updated(chan_2_funding, *id).unwrap();
-            }
-            nodes[1].process_monitor_events();
-        }
-        0xfa => {
-            let pending_updates = monitor_b.chain_monitor.list_pending_monitor_updates().remove(&chan_2_funding).unwrap();
-            if let Some(id) = pending_updates.last() {
-                monitor_b.chain_monitor.channel_monitor_updated(chan_2_funding, *id).unwrap();
-            }
-            nodes[1].process_monitor_events();
-        }
-
-        0xfc => {
-            let pending_updates = monitor_c.chain_monitor.list_pending_monitor_updates().remove(&chan_2_funding).unwrap();
-            if let Some(id) = pending_updates.get(0) {
-                monitor_c.chain_monitor.channel_monitor_updated(chan_2_funding, *id).unwrap();
-            }
-            nodes[2].process_monitor_events();
-        }
-        0xfd => {
-            let pending_updates = monitor_c.chain_monitor.list_pending_monitor_updates().remove(&chan_2_funding).unwrap();
-            if let Some(id) = pending_updates.get(1) {
-                monitor_c.chain_monitor.channel_monitor_updated(chan_2_funding, *id).unwrap();
-            }
-            nodes[2].process_monitor_events();
-        }
-        0xfe => {
-            let pending_updates = monitor_c.chain_monitor.list_pending_monitor_updates().remove(&chan_2_funding).unwrap();
-            if let Some(id) = pending_updates.last() {
-                monitor_c.chain_monitor.channel_monitor_updated(chan_2_funding, *id).unwrap();
-            }
-            nodes[2].process_monitor_events();
-        }
+        0xf0 =>
+            complete_monitor_update(&monitor_a, &chan_1_funding,
+                &|v: &mut Vec<_>| if !v.is_empty() { Some(v.remove(0)) } else { None }),
+        0xf1 =>
+            complete_monitor_update(&monitor_a, &chan_1_funding,
+                &|v: &mut Vec<_>| if v.len() > 1 { Some(v.remove(1)) } else { None }),
+        0xf2 =>
+            complete_monitor_update(&monitor_a, &chan_1_funding,
+                &|v: &mut Vec<_>| v.pop()),
+
+        0xf4 =>
+            complete_monitor_update(&monitor_b, &chan_1_funding,
+                &|v: &mut Vec<_>| if !v.is_empty() { Some(v.remove(0)) } else { None }),
+        0xf5 =>
+            complete_monitor_update(&monitor_b, &chan_1_funding,
+                &|v: &mut Vec<_>| if v.len() > 1 { Some(v.remove(1)) } else { None }),
+        0xf6 =>
+            complete_monitor_update(&monitor_b, &chan_1_funding,
+                &|v: &mut Vec<_>| v.pop()),
+
+        0xf8 =>
+            complete_monitor_update(&monitor_b, &chan_2_funding,
+                &|v: &mut Vec<_>| if !v.is_empty() { Some(v.remove(0)) } else { None }),
+        0xf9 =>
+            complete_monitor_update(&monitor_b, &chan_2_funding,
+                &|v: &mut Vec<_>| if v.len() > 1 { Some(v.remove(1)) } else { None }),
+        0xfa =>
+            complete_monitor_update(&monitor_b, &chan_2_funding,
+                &|v: &mut Vec<_>| v.pop()),
+
+        0xfc =>
+            complete_monitor_update(&monitor_c, &chan_2_funding,
+                &|v: &mut Vec<_>| if !v.is_empty() { Some(v.remove(0)) } else { None }),
+        0xfd =>
+            complete_monitor_update(&monitor_c, &chan_2_funding,
+                &|v: &mut Vec<_>| if v.len() > 1 { Some(v.remove(1)) } else { None }),
+        0xfe =>
+            complete_monitor_update(&monitor_c, &chan_2_funding,
+                &|v: &mut Vec<_>| v.pop()),
 
         0xff => {
             // Test that no channel is in a stuck state where neither party can send funds even
             // after we resolve all pending events.
-            // First make sure there are no pending monitor updates, resetting the error state
-            // and calling force_channel_monitor_updated for each monitor.
+            // First make sure there are no pending monitor updates and further update
+            // operations complete.
             *monitor_a.persister.update_ret.lock().unwrap() = ChannelMonitorUpdateStatus::Completed;
             *monitor_b.persister.update_ret.lock().unwrap() = ChannelMonitorUpdateStatus::Completed;
             *monitor_c.persister.update_ret.lock().unwrap() = ChannelMonitorUpdateStatus::Completed;
 
-            if let Some((id, _)) = monitor_a.latest_monitors.lock().unwrap().get(&chan_1_funding) {
-                monitor_a.chain_monitor.force_channel_monitor_updated(chan_1_funding, *id);
-                nodes[0].process_monitor_events();
-            }
-            if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_1_funding) {
-                monitor_b.chain_monitor.force_channel_monitor_updated(chan_1_funding, *id);
-                nodes[1].process_monitor_events();
-            }
-            if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_2_funding) {
-                monitor_b.chain_monitor.force_channel_monitor_updated(chan_2_funding, *id);
-                nodes[1].process_monitor_events();
-            }
-            if let Some((id, _)) = monitor_c.latest_monitors.lock().unwrap().get(&chan_2_funding) {
-                monitor_c.chain_monitor.force_channel_monitor_updated(chan_2_funding, *id);
-                nodes[2].process_monitor_events();
-            }
+            complete_all_monitor_updates(&monitor_a, &chan_1_funding);
+            complete_all_monitor_updates(&monitor_b, &chan_1_funding);
+            complete_all_monitor_updates(&monitor_b, &chan_2_funding);
+            complete_all_monitor_updates(&monitor_c, &chan_2_funding);
 
             // Next, make sure peers are all connected to each other
             if chan_a_disconnected {

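A note on the completion bookkeeping in `complete_monitor_update` above: completing a pending update only advances the persisted snapshot when its update_id is newer than what is already persisted, so updates completed out of order (say, update 3 before update 2) can never roll the stored monitor back. Below is a standalone sketch of just that invariant; the `State` tuple and `complete` helper are illustrative, not part of the fuzzer.

// Illustrative model: (persisted_id, persisted_bytes, pending updates).
type State = (u64, Vec<u8>, Vec<(u64, Vec<u8>)>);

// Complete whichever pending update the selector picks, advancing the
// persisted snapshot only if the completed id is newer.
fn complete(
    state: &mut State,
    selector: &dyn Fn(&mut Vec<(u64, Vec<u8>)>) -> Option<(u64, Vec<u8>)>,
) {
    if let Some((id, data)) = selector(&mut state.2) {
        if id > state.0 {
            state.0 = id;
            state.1 = data;
        }
    }
}

fn main() {
    let mut state: State = (1, vec![1], vec![(2, vec![2]), (3, vec![3])]);
    // Complete the newest pending update first...
    complete(&mut state, &|v| v.pop());
    // ...then the older one; it must not roll the snapshot back to update 2.
    complete(&mut state, &|v| if v.is_empty() { None } else { Some(v.remove(0)) });
    assert_eq!(state.0, 3);
    assert_eq!(state.1, vec![3]);
}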