Fuzz reloading with a stale monitor in chanmon_consistency #3113

Merged · 2 commits · Jun 12, 2024
256 changes: 116 additions & 140 deletions fuzz/src/chanmon_consistency.rs
@@ -142,17 +142,29 @@ impl Writer for VecWriter {
}
}

/// The LDK API requires that once we tell it we're done persisting a `ChannelMonitor[Update]`,
/// we never pass it an older `ChannelMonitor` as the "latest" one on startup. However, we can
/// pass out-of-date monitors as long as we never told LDK we finished persisting them, which we
/// do by storing both old `ChannelMonitor`s and ones that are "being persisted" here.
///
/// Note that such "being persisted" `ChannelMonitor`s are stored in `ChannelManager` and will
/// simply be replayed on startup.
struct LatestMonitorState {
/// The latest monitor id which we told LDK we've persisted
persisted_monitor_id: u64,
/// The latest serialized `ChannelMonitor` that we told LDK we persisted.
persisted_monitor: Vec<u8>,
/// A set of (monitor id, serialized `ChannelMonitor`)s which we're currently "persisting",
/// from LDK's perspective.
pending_monitors: Vec<(u64, Vec<u8>)>,
}
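
// --- Editor's illustrative sketch, not part of this PR ---
// A minimal, self-contained model of the bookkeeping described above. The names
// (`record_update`, `complete`) are hypothetical; they only illustrate how a
// `Completed` persistence status advances the persisted copy, how an `InProgress`
// status merely queues the update, and why completing queued updates out of order
// must never move the persisted copy backwards.
#[allow(dead_code)]
mod latest_monitor_state_sketch {
	enum PersistStatus { Completed, InProgress }

	#[derive(Default)]
	struct State {
		persisted_monitor_id: u64,
		persisted_monitor: Vec<u8>,
		pending_monitors: Vec<(u64, Vec<u8>)>,
	}

	impl State {
		/// Record a freshly-applied, serialized monitor update.
		fn record_update(&mut self, id: u64, ser: Vec<u8>, status: PersistStatus) {
			match status {
				PersistStatus::Completed => {
					self.persisted_monitor_id = id;
					self.persisted_monitor = ser;
				},
				PersistStatus::InProgress => self.pending_monitors.push((id, ser)),
			}
		}

		/// Mark a queued update as persisted. Out-of-order completion is fine;
		/// the persisted copy only ever moves forward.
		fn complete(&mut self, id: u64) {
			if let Some(pos) = self.pending_monitors.iter().position(|(i, _)| *i == id) {
				let (id, ser) = self.pending_monitors.remove(pos);
				if id > self.persisted_monitor_id {
					self.persisted_monitor_id = id;
					self.persisted_monitor = ser;
				}
			}
		}
	}
}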

struct TestChainMonitor {
pub logger: Arc<dyn Logger>,
pub keys: Arc<KeyProvider>,
pub persister: Arc<TestPersister>,
pub chain_monitor: Arc<chainmonitor::ChainMonitor<TestChannelSigner, Arc<dyn chain::Filter>, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>, Arc<TestPersister>>>,
// If we reload a node with an old copy of ChannelMonitors, the ChannelManager deserialization
// logic will automatically force-close our channels for us (as we don't have an up-to-date
// monitor implying we are not able to punish misbehaving counterparties). Because this test
// "fails" if we ever force-close a channel, we avoid doing so, always saving the latest
// fully-serialized monitor state here, as well as the corresponding update_id.
pub latest_monitors: Mutex<HashMap<OutPoint, (u64, Vec<u8>)>>,
pub latest_monitors: Mutex<HashMap<OutPoint, LatestMonitorState>>,
}
impl TestChainMonitor {
pub fn new(broadcaster: Arc<TestBroadcaster>, logger: Arc<dyn Logger>, feeest: Arc<FuzzEstimator>, persister: Arc<TestPersister>, keys: Arc<KeyProvider>) -> Self {
@@ -169,22 +181,47 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
fn watch_channel(&self, funding_txo: OutPoint, monitor: channelmonitor::ChannelMonitor<TestChannelSigner>) -> Result<chain::ChannelMonitorUpdateStatus, ()> {
let mut ser = VecWriter(Vec::new());
monitor.write(&mut ser).unwrap();
if let Some(_) = self.latest_monitors.lock().unwrap().insert(funding_txo, (monitor.get_latest_update_id(), ser.0)) {
let monitor_id = monitor.get_latest_update_id();
let res = self.chain_monitor.watch_channel(funding_txo, monitor);
let state = match res {
Ok(chain::ChannelMonitorUpdateStatus::Completed) => {
LatestMonitorState {
persisted_monitor_id: monitor_id, persisted_monitor: ser.0,
pending_monitors: Vec::new(),
}
},
Ok(chain::ChannelMonitorUpdateStatus::InProgress) =>
panic!("The test currently doesn't test initial-persistence via the async pipeline"),
Ok(chain::ChannelMonitorUpdateStatus::UnrecoverableError) => panic!(),
Err(()) => panic!(),
};
if self.latest_monitors.lock().unwrap().insert(funding_txo, state).is_some() {
panic!("Already had monitor pre-watch_channel");
}
self.chain_monitor.watch_channel(funding_txo, monitor)
res
}

fn update_channel(&self, funding_txo: OutPoint, update: &channelmonitor::ChannelMonitorUpdate) -> chain::ChannelMonitorUpdateStatus {
let mut map_lock = self.latest_monitors.lock().unwrap();
let map_entry = map_lock.get_mut(&funding_txo).expect("Didn't have monitor on update call");
let latest_monitor_data = map_entry.pending_monitors.last().as_ref().map(|(_, data)| data).unwrap_or(&map_entry.persisted_monitor);
let deserialized_monitor = <(BlockHash, channelmonitor::ChannelMonitor<TestChannelSigner>)>::
read(&mut Cursor::new(&map_entry.1), (&*self.keys, &*self.keys)).unwrap().1;
read(&mut Cursor::new(&latest_monitor_data), (&*self.keys, &*self.keys)).unwrap().1;
deserialized_monitor.update_monitor(update, &&TestBroadcaster{}, &&FuzzEstimator { ret_val: atomic::AtomicU32::new(253) }, &self.logger).unwrap();
let mut ser = VecWriter(Vec::new());
deserialized_monitor.write(&mut ser).unwrap();
*map_entry = (update.update_id, ser.0);
self.chain_monitor.update_channel(funding_txo, update)
let res = self.chain_monitor.update_channel(funding_txo, update);
match res {
chain::ChannelMonitorUpdateStatus::Completed => {
map_entry.persisted_monitor_id = update.update_id;
map_entry.persisted_monitor = ser.0;
},
chain::ChannelMonitorUpdateStatus::InProgress => {
map_entry.pending_monitors.push((update.update_id, ser.0));
},
chain::ChannelMonitorUpdateStatus::UnrecoverableError => panic!(),
}
res
}
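
// --- Editor's illustrative note, not part of this PR ---
// `update_channel` always applies the new update on top of the newest copy it knows
// about, whether or not that copy has finished persisting. For example, with
// `persisted_monitor_id == 4` and `pending_monitors` holding ids [5, 6], an incoming
// update with id 7 is applied to the serialized monitor stored for id 6, and the
// result is then either recorded as persisted (`Completed`) or appended to the
// pending queue (`InProgress`).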

fn release_pending_monitor_events(&self) -> Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, Option<PublicKey>)> {
@@ -511,9 +548,15 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {

let mut monitors = new_hash_map();
let mut old_monitors = $old_monitors.latest_monitors.lock().unwrap();
for (outpoint, (update_id, monitor_ser)) in old_monitors.drain() {
monitors.insert(outpoint, <(BlockHash, ChannelMonitor<TestChannelSigner>)>::read(&mut Cursor::new(&monitor_ser), (&*$keys_manager, &*$keys_manager)).expect("Failed to read monitor").1);
chain_monitor.latest_monitors.lock().unwrap().insert(outpoint, (update_id, monitor_ser));
for (outpoint, mut prev_state) in old_monitors.drain() {
monitors.insert(outpoint, <(BlockHash, ChannelMonitor<TestChannelSigner>)>::read(
&mut Cursor::new(&prev_state.persisted_monitor), (&*$keys_manager, &*$keys_manager)
).expect("Failed to read monitor").1);
// Wipe any `ChannelMonitor`s which we never told LDK we finished persisting,
// considering them discarded. LDK should replay these for us as they're stored in
// the `ChannelManager`.
prev_state.pending_monitors.clear();
chain_monitor.latest_monitors.lock().unwrap().insert(outpoint, prev_state);
}
let mut monitor_refs = new_hash_map();
for (outpoint, monitor) in monitors.iter_mut() {
@@ -1040,6 +1083,43 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
} }
}

let complete_first = |v: &mut Vec<_>| if !v.is_empty() { Some(v.remove(0)) } else { None };
let complete_second = |v: &mut Vec<_>| if v.len() > 1 { Some(v.remove(1)) } else { None };
let complete_monitor_update = |
monitor: &Arc<TestChainMonitor>, chan_funding,
compl_selector: &dyn Fn(&mut Vec<(u64, Vec<u8>)>) -> Option<(u64, Vec<u8>)>,
| {
if let Some(state) = monitor.latest_monitors.lock().unwrap().get_mut(chan_funding) {
assert!(
state.pending_monitors.windows(2).all(|pair| pair[0].0 < pair[1].0),
"updates should be sorted by id"
);
if let Some((id, data)) = compl_selector(&mut state.pending_monitors) {
monitor.chain_monitor.channel_monitor_updated(*chan_funding, id).unwrap();
if id > state.persisted_monitor_id {
state.persisted_monitor_id = id;
state.persisted_monitor = data;
}
}
}
};

let complete_all_monitor_updates = |monitor: &Arc<TestChainMonitor>, chan_funding| {
if let Some(state) = monitor.latest_monitors.lock().unwrap().get_mut(chan_funding) {
assert!(
state.pending_monitors.windows(2).all(|pair| pair[0].0 < pair[1].0),
"updates should be sorted by id"
);
for (id, data) in state.pending_monitors.drain(..) {
monitor.chain_monitor.channel_monitor_updated(*chan_funding, id).unwrap();
if id > state.persisted_monitor_id {
state.persisted_monitor_id = id;
state.persisted_monitor = data;
}
}
}
};
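
// --- Editor's illustrative note, not part of this PR ---
// How the selectors behave on a hypothetical pending queue holding serialized
// monitors for update ids 5, 6 and 7 (the byte strings stand in for real
// monitor serializations):
//
//     let mut pending = vec![(5, b"m5".to_vec()), (6, b"m6".to_vec()), (7, b"m7".to_vec())];
//     assert_eq!(complete_first(&mut pending).unwrap().0, 5);  // oldest queued update
//     assert_eq!(complete_second(&mut pending).unwrap().0, 7); // index 1 of the remaining [6, 7]
//     assert_eq!(pending.pop().unwrap().0, 6);                 // `Vec::pop` takes the newest left
//
// Completing a newer update before an older one is fine: `complete_monitor_update`
// only advances `persisted_monitor_id` when the completed id is strictly greater
// than what was already recorded as persisted.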

let v = get_slice!(1)[0];
out.locked_write(format!("READ A BYTE! HANDLING INPUT {:x}...........\n", v).as_bytes());
match v {
@@ -1054,30 +1134,10 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
0x05 => *monitor_b.persister.update_ret.lock().unwrap() = ChannelMonitorUpdateStatus::Completed,
0x06 => *monitor_c.persister.update_ret.lock().unwrap() = ChannelMonitorUpdateStatus::Completed,

0x08 => {
if let Some((id, _)) = monitor_a.latest_monitors.lock().unwrap().get(&chan_1_funding) {
monitor_a.chain_monitor.force_channel_monitor_updated(chan_1_funding, *id);
nodes[0].process_monitor_events();
}
},
0x09 => {
if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_1_funding) {
monitor_b.chain_monitor.force_channel_monitor_updated(chan_1_funding, *id);
nodes[1].process_monitor_events();
}
},
0x0a => {
if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_2_funding) {
monitor_b.chain_monitor.force_channel_monitor_updated(chan_2_funding, *id);
nodes[1].process_monitor_events();
}
},
0x0b => {
if let Some((id, _)) = monitor_c.latest_monitors.lock().unwrap().get(&chan_2_funding) {
monitor_c.chain_monitor.force_channel_monitor_updated(chan_2_funding, *id);
nodes[2].process_monitor_events();
}
},
0x08 => complete_all_monitor_updates(&monitor_a, &chan_1_funding),
0x09 => complete_all_monitor_updates(&monitor_b, &chan_1_funding),
0x0a => complete_all_monitor_updates(&monitor_b, &chan_2_funding),
0x0b => complete_all_monitor_updates(&monitor_c, &chan_2_funding),

0x0c => {
if !chan_a_disconnected {
@@ -1285,119 +1345,35 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
},
0x89 => { fee_est_c.ret_val.store(253, atomic::Ordering::Release); nodes[2].maybe_update_chan_fees(); },

0xf0 => {
let pending_updates = monitor_a.chain_monitor.list_pending_monitor_updates().remove(&chan_1_funding).unwrap();
if let Some(id) = pending_updates.get(0) {
monitor_a.chain_monitor.channel_monitor_updated(chan_1_funding, *id).unwrap();
}
nodes[0].process_monitor_events();
}
0xf1 => {
let pending_updates = monitor_a.chain_monitor.list_pending_monitor_updates().remove(&chan_1_funding).unwrap();
if let Some(id) = pending_updates.get(1) {
monitor_a.chain_monitor.channel_monitor_updated(chan_1_funding, *id).unwrap();
}
nodes[0].process_monitor_events();
}
0xf2 => {
let pending_updates = monitor_a.chain_monitor.list_pending_monitor_updates().remove(&chan_1_funding).unwrap();
if let Some(id) = pending_updates.last() {
monitor_a.chain_monitor.channel_monitor_updated(chan_1_funding, *id).unwrap();
}
nodes[0].process_monitor_events();
}
0xf0 => complete_monitor_update(&monitor_a, &chan_1_funding, &complete_first),
0xf1 => complete_monitor_update(&monitor_a, &chan_1_funding, &complete_second),
0xf2 => complete_monitor_update(&monitor_a, &chan_1_funding, &Vec::pop),

0xf4 => {
let pending_updates = monitor_b.chain_monitor.list_pending_monitor_updates().remove(&chan_1_funding).unwrap();
if let Some(id) = pending_updates.get(0) {
monitor_b.chain_monitor.channel_monitor_updated(chan_1_funding, *id).unwrap();
}
nodes[1].process_monitor_events();
}
0xf5 => {
let pending_updates = monitor_b.chain_monitor.list_pending_monitor_updates().remove(&chan_1_funding).unwrap();
if let Some(id) = pending_updates.get(1) {
monitor_b.chain_monitor.channel_monitor_updated(chan_1_funding, *id).unwrap();
}
nodes[1].process_monitor_events();
}
0xf6 => {
let pending_updates = monitor_b.chain_monitor.list_pending_monitor_updates().remove(&chan_1_funding).unwrap();
if let Some(id) = pending_updates.last() {
monitor_b.chain_monitor.channel_monitor_updated(chan_1_funding, *id).unwrap();
}
nodes[1].process_monitor_events();
}
0xf4 => complete_monitor_update(&monitor_b, &chan_1_funding, &complete_first),
0xf5 => complete_monitor_update(&monitor_b, &chan_1_funding, &complete_second),
0xf6 => complete_monitor_update(&monitor_b, &chan_1_funding, &Vec::pop),

0xf8 => {
let pending_updates = monitor_b.chain_monitor.list_pending_monitor_updates().remove(&chan_2_funding).unwrap();
if let Some(id) = pending_updates.get(0) {
monitor_b.chain_monitor.channel_monitor_updated(chan_2_funding, *id).unwrap();
}
nodes[1].process_monitor_events();
}
0xf9 => {
let pending_updates = monitor_b.chain_monitor.list_pending_monitor_updates().remove(&chan_2_funding).unwrap();
if let Some(id) = pending_updates.get(1) {
monitor_b.chain_monitor.channel_monitor_updated(chan_2_funding, *id).unwrap();
}
nodes[1].process_monitor_events();
}
0xfa => {
let pending_updates = monitor_b.chain_monitor.list_pending_monitor_updates().remove(&chan_2_funding).unwrap();
if let Some(id) = pending_updates.last() {
monitor_b.chain_monitor.channel_monitor_updated(chan_2_funding, *id).unwrap();
}
nodes[1].process_monitor_events();
}
0xf8 => complete_monitor_update(&monitor_b, &chan_2_funding, &complete_first),
0xf9 => complete_monitor_update(&monitor_b, &chan_2_funding, &complete_second),
0xfa => complete_monitor_update(&monitor_b, &chan_2_funding, &Vec::pop),

0xfc => {
let pending_updates = monitor_c.chain_monitor.list_pending_monitor_updates().remove(&chan_2_funding).unwrap();
if let Some(id) = pending_updates.get(0) {
monitor_c.chain_monitor.channel_monitor_updated(chan_2_funding, *id).unwrap();
}
nodes[2].process_monitor_events();
}
0xfd => {
let pending_updates = monitor_c.chain_monitor.list_pending_monitor_updates().remove(&chan_2_funding).unwrap();
if let Some(id) = pending_updates.get(1) {
monitor_c.chain_monitor.channel_monitor_updated(chan_2_funding, *id).unwrap();
}
nodes[2].process_monitor_events();
}
0xfe => {
let pending_updates = monitor_c.chain_monitor.list_pending_monitor_updates().remove(&chan_2_funding).unwrap();
if let Some(id) = pending_updates.last() {
monitor_c.chain_monitor.channel_monitor_updated(chan_2_funding, *id).unwrap();
}
nodes[2].process_monitor_events();
}
0xfc => complete_monitor_update(&monitor_c, &chan_2_funding, &complete_first),
0xfd => complete_monitor_update(&monitor_c, &chan_2_funding, &complete_second),
0xfe => complete_monitor_update(&monitor_c, &chan_2_funding, &Vec::pop),

0xff => {
// Test that no channel is in a stuck state where neither party can send funds even
// after we resolve all pending events.
// First make sure there are no pending monitor updates, resetting the error state
// and calling force_channel_monitor_updated for each monitor.
// First make sure there are no pending monitor updates and further update
// operations complete.
*monitor_a.persister.update_ret.lock().unwrap() = ChannelMonitorUpdateStatus::Completed;
*monitor_b.persister.update_ret.lock().unwrap() = ChannelMonitorUpdateStatus::Completed;
*monitor_c.persister.update_ret.lock().unwrap() = ChannelMonitorUpdateStatus::Completed;

if let Some((id, _)) = monitor_a.latest_monitors.lock().unwrap().get(&chan_1_funding) {
monitor_a.chain_monitor.force_channel_monitor_updated(chan_1_funding, *id);
nodes[0].process_monitor_events();
}
if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_1_funding) {
monitor_b.chain_monitor.force_channel_monitor_updated(chan_1_funding, *id);
nodes[1].process_monitor_events();
}
if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_2_funding) {
monitor_b.chain_monitor.force_channel_monitor_updated(chan_2_funding, *id);
nodes[1].process_monitor_events();
}
if let Some((id, _)) = monitor_c.latest_monitors.lock().unwrap().get(&chan_2_funding) {
monitor_c.chain_monitor.force_channel_monitor_updated(chan_2_funding, *id);
nodes[2].process_monitor_events();
}
complete_all_monitor_updates(&monitor_a, &chan_1_funding);
complete_all_monitor_updates(&monitor_b, &chan_1_funding);
complete_all_monitor_updates(&monitor_b, &chan_2_funding);
complete_all_monitor_updates(&monitor_c, &chan_2_funding);

// Next, make sure peers are all connected to each other
if chan_a_disconnected {