Skip to content

Commit

Permalink
Merge pull request #3554 from wpaulino/channel-monitor-by-channel-id
Browse files Browse the repository at this point in the history
Start tracking ChannelMonitors by channel ID in ChainMonitor and ChannelManager
  • Loading branch information
wpaulino authored Jan 31, 2025
2 parents 1434e9c + 717db82 commit 9b66e2c
Show file tree
Hide file tree
Showing 16 changed files with 499 additions and 469 deletions.
92 changes: 48 additions & 44 deletions fuzz/src/chanmon_consistency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ struct TestChainMonitor {
Arc<TestPersister>,
>,
>,
pub latest_monitors: Mutex<HashMap<OutPoint, LatestMonitorState>>,
pub latest_monitors: Mutex<HashMap<ChannelId, LatestMonitorState>>,
}
impl TestChainMonitor {
pub fn new(
Expand All @@ -213,12 +213,12 @@ impl TestChainMonitor {
}
impl chain::Watch<TestChannelSigner> for TestChainMonitor {
fn watch_channel(
&self, funding_txo: OutPoint, monitor: channelmonitor::ChannelMonitor<TestChannelSigner>,
&self, channel_id: ChannelId, monitor: channelmonitor::ChannelMonitor<TestChannelSigner>,
) -> Result<chain::ChannelMonitorUpdateStatus, ()> {
let mut ser = VecWriter(Vec::new());
monitor.write(&mut ser).unwrap();
let monitor_id = monitor.get_latest_update_id();
let res = self.chain_monitor.watch_channel(funding_txo, monitor);
let res = self.chain_monitor.watch_channel(channel_id, monitor);
let state = match res {
Ok(chain::ChannelMonitorUpdateStatus::Completed) => LatestMonitorState {
persisted_monitor_id: monitor_id,
Expand All @@ -231,17 +231,17 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
Ok(chain::ChannelMonitorUpdateStatus::UnrecoverableError) => panic!(),
Err(()) => panic!(),
};
if self.latest_monitors.lock().unwrap().insert(funding_txo, state).is_some() {
if self.latest_monitors.lock().unwrap().insert(channel_id, state).is_some() {
panic!("Already had monitor pre-watch_channel");
}
res
}

fn update_channel(
&self, funding_txo: OutPoint, update: &channelmonitor::ChannelMonitorUpdate,
&self, channel_id: ChannelId, update: &channelmonitor::ChannelMonitorUpdate,
) -> chain::ChannelMonitorUpdateStatus {
let mut map_lock = self.latest_monitors.lock().unwrap();
let map_entry = map_lock.get_mut(&funding_txo).expect("Didn't have monitor on update call");
let map_entry = map_lock.get_mut(&channel_id).expect("Didn't have monitor on update call");
let latest_monitor_data = map_entry
.pending_monitors
.last()
Expand All @@ -265,7 +265,7 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
.unwrap();
let mut ser = VecWriter(Vec::new());
deserialized_monitor.write(&mut ser).unwrap();
let res = self.chain_monitor.update_channel(funding_txo, update);
let res = self.chain_monitor.update_channel(channel_id, update);
match res {
chain::ChannelMonitorUpdateStatus::Completed => {
map_entry.persisted_monitor_id = update.update_id;
Expand Down Expand Up @@ -711,9 +711,9 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {

let mut monitors = new_hash_map();
let mut old_monitors = $old_monitors.latest_monitors.lock().unwrap();
for (outpoint, mut prev_state) in old_monitors.drain() {
for (channel_id, mut prev_state) in old_monitors.drain() {
monitors.insert(
outpoint,
channel_id,
<(BlockHash, ChannelMonitor<TestChannelSigner>)>::read(
&mut Cursor::new(&prev_state.persisted_monitor),
(&*$keys_manager, &*$keys_manager),
Expand All @@ -725,11 +725,11 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
// considering them discarded. LDK should replay these for us as they're stored in
// the `ChannelManager`.
prev_state.pending_monitors.clear();
chain_monitor.latest_monitors.lock().unwrap().insert(outpoint, prev_state);
chain_monitor.latest_monitors.lock().unwrap().insert(channel_id, prev_state);
}
let mut monitor_refs = new_hash_map();
for (outpoint, monitor) in monitors.iter() {
monitor_refs.insert(*outpoint, monitor);
for (channel_id, monitor) in monitors.iter() {
monitor_refs.insert(*channel_id, monitor);
}

let read_args = ChannelManagerReadArgs {
Expand All @@ -752,9 +752,9 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
.1,
chain_monitor.clone(),
);
for (funding_txo, mon) in monitors.drain() {
for (channel_id, mon) in monitors.drain() {
assert_eq!(
chain_monitor.chain_monitor.watch_channel(funding_txo, mon),
chain_monitor.chain_monitor.watch_channel(channel_id, mon),
Ok(ChannelMonitorUpdateStatus::Completed)
);
}
Expand Down Expand Up @@ -825,7 +825,6 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
};

$source.handle_accept_channel($dest.get_our_node_id(), &accept_channel);
let funding_output;
{
let mut events = $source.get_and_clear_pending_events();
assert_eq!(events.len(), 1);
Expand All @@ -845,7 +844,6 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
script_pubkey: output_script,
}],
};
funding_output = OutPoint { txid: tx.compute_txid(), index: 0 };
$source
.funding_transaction_generated(
temporary_channel_id,
Expand Down Expand Up @@ -890,13 +888,19 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
$source.handle_funding_signed($dest.get_our_node_id(), &funding_signed);
let events = $source.get_and_clear_pending_events();
assert_eq!(events.len(), 1);
if let events::Event::ChannelPending { ref counterparty_node_id, .. } = events[0] {
let channel_id = if let events::Event::ChannelPending {
ref counterparty_node_id,
ref channel_id,
..
} = events[0]
{
assert_eq!(counterparty_node_id, &$dest.get_our_node_id());
channel_id.clone()
} else {
panic!("Wrong event type");
}
};

funding_output
channel_id
}};
}

Expand Down Expand Up @@ -963,8 +967,8 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {

let mut nodes = [node_a, node_b, node_c];

let chan_1_funding = make_channel!(nodes[0], nodes[1], keys_manager_b, 0);
let chan_2_funding = make_channel!(nodes[1], nodes[2], keys_manager_c, 1);
let chan_1_id = make_channel!(nodes[0], nodes[1], keys_manager_b, 0);
let chan_2_id = make_channel!(nodes[1], nodes[2], keys_manager_c, 1);

for node in nodes.iter() {
confirm_txn!(node);
Expand Down Expand Up @@ -1363,14 +1367,14 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
}
};

let complete_all_monitor_updates = |monitor: &Arc<TestChainMonitor>, chan_funding| {
if let Some(state) = monitor.latest_monitors.lock().unwrap().get_mut(chan_funding) {
let complete_all_monitor_updates = |monitor: &Arc<TestChainMonitor>, chan_id| {
if let Some(state) = monitor.latest_monitors.lock().unwrap().get_mut(chan_id) {
assert!(
state.pending_monitors.windows(2).all(|pair| pair[0].0 < pair[1].0),
"updates should be sorted by id"
);
for (id, data) in state.pending_monitors.drain(..) {
monitor.chain_monitor.channel_monitor_updated(*chan_funding, id).unwrap();
monitor.chain_monitor.channel_monitor_updated(*chan_id, id).unwrap();
if id > state.persisted_monitor_id {
state.persisted_monitor_id = id;
state.persisted_monitor = data;
Expand Down Expand Up @@ -1410,10 +1414,10 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
ChannelMonitorUpdateStatus::Completed
},

0x08 => complete_all_monitor_updates(&monitor_a, &chan_1_funding),
0x09 => complete_all_monitor_updates(&monitor_b, &chan_1_funding),
0x0a => complete_all_monitor_updates(&monitor_b, &chan_2_funding),
0x0b => complete_all_monitor_updates(&monitor_c, &chan_2_funding),
0x08 => complete_all_monitor_updates(&monitor_a, &chan_1_id),
0x09 => complete_all_monitor_updates(&monitor_b, &chan_1_id),
0x0a => complete_all_monitor_updates(&monitor_b, &chan_2_id),
0x0b => complete_all_monitor_updates(&monitor_c, &chan_2_id),

0x0c => {
if !chan_a_disconnected {
Expand Down Expand Up @@ -1683,21 +1687,21 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
nodes[2].maybe_update_chan_fees();
},

0xf0 => complete_monitor_update(&monitor_a, &chan_1_funding, &complete_first),
0xf1 => complete_monitor_update(&monitor_a, &chan_1_funding, &complete_second),
0xf2 => complete_monitor_update(&monitor_a, &chan_1_funding, &Vec::pop),
0xf0 => complete_monitor_update(&monitor_a, &chan_1_id, &complete_first),
0xf1 => complete_monitor_update(&monitor_a, &chan_1_id, &complete_second),
0xf2 => complete_monitor_update(&monitor_a, &chan_1_id, &Vec::pop),

0xf4 => complete_monitor_update(&monitor_b, &chan_1_funding, &complete_first),
0xf5 => complete_monitor_update(&monitor_b, &chan_1_funding, &complete_second),
0xf6 => complete_monitor_update(&monitor_b, &chan_1_funding, &Vec::pop),
0xf4 => complete_monitor_update(&monitor_b, &chan_1_id, &complete_first),
0xf5 => complete_monitor_update(&monitor_b, &chan_1_id, &complete_second),
0xf6 => complete_monitor_update(&monitor_b, &chan_1_id, &Vec::pop),

0xf8 => complete_monitor_update(&monitor_b, &chan_2_funding, &complete_first),
0xf9 => complete_monitor_update(&monitor_b, &chan_2_funding, &complete_second),
0xfa => complete_monitor_update(&monitor_b, &chan_2_funding, &Vec::pop),
0xf8 => complete_monitor_update(&monitor_b, &chan_2_id, &complete_first),
0xf9 => complete_monitor_update(&monitor_b, &chan_2_id, &complete_second),
0xfa => complete_monitor_update(&monitor_b, &chan_2_id, &Vec::pop),

0xfc => complete_monitor_update(&monitor_c, &chan_2_funding, &complete_first),
0xfd => complete_monitor_update(&monitor_c, &chan_2_funding, &complete_second),
0xfe => complete_monitor_update(&monitor_c, &chan_2_funding, &Vec::pop),
0xfc => complete_monitor_update(&monitor_c, &chan_2_id, &complete_first),
0xfd => complete_monitor_update(&monitor_c, &chan_2_id, &complete_second),
0xfe => complete_monitor_update(&monitor_c, &chan_2_id, &Vec::pop),

0xff => {
// Test that no channel is in a stuck state where neither party can send funds even
Expand All @@ -1711,10 +1715,10 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
*monitor_c.persister.update_ret.lock().unwrap() =
ChannelMonitorUpdateStatus::Completed;

complete_all_monitor_updates(&monitor_a, &chan_1_funding);
complete_all_monitor_updates(&monitor_b, &chan_1_funding);
complete_all_monitor_updates(&monitor_b, &chan_2_funding);
complete_all_monitor_updates(&monitor_c, &chan_2_funding);
complete_all_monitor_updates(&monitor_a, &chan_1_id);
complete_all_monitor_updates(&monitor_b, &chan_1_id);
complete_all_monitor_updates(&monitor_b, &chan_2_id);
complete_all_monitor_updates(&monitor_c, &chan_2_id);

// Next, make sure peers are all connected to each other
if chan_a_disconnected {
Expand Down
2 changes: 1 addition & 1 deletion lightning-block-sync/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ where
///
/// // Allow the chain monitor to watch any channels.
/// let monitor = monitor_listener.0;
/// chain_monitor.watch_channel(monitor.get_funding_txo().0, monitor);
/// chain_monitor.watch_channel(monitor.channel_id(), monitor);
///
/// // Create an SPV client to notify the chain monitor and channel manager of block events.
/// let chain_poller = poll::ChainPoller::new(block_source, Network::Bitcoin);
Expand Down
Loading

0 comments on commit 9b66e2c

Please sign in to comment.