Skip to content

Commit f91c4fe

Browse files
committed
Remove Completed from Watch trait, making updates always-async
The Watch trait previously shared ChannelMonitorUpdateStatus (with Completed, InProgress, UnrecoverableError variants) with the Persist trait. This meant ChannelManager had to handle both synchronous completion and asynchronous completion, including a runtime check (monitor_update_type atomic) to ensure they weren't mixed. This commit makes the Watch trait always-async: watch_channel returns Result<(), ()> and update_channel returns (). ChainMonitor maps Persist::Completed onto an immediately-queued MonitorEvent::Completed so the channel unblocks on the next event processing round. When Persist returns Completed but prior async updates are still pending, no event is emitted since the prior updates' eventual completion via channel_monitor_updated will cover this update too. This allows removing from ChannelManager: - The WatchUpdateStatus enum (and former ChannelMonitorUpdateStatus usage in Watch) - The monitor_update_type atomic and its mode-mixing checks - handle_monitor_update_res (was just a log, inlined) - handle_post_close_monitor_update (trivial wrapper, inlined) - handle_new_monitor_update_with_status (sync completion path) AI tools were used in preparing this commit.
1 parent ec03159 commit f91c4fe

File tree

5 files changed

+185
-442
lines changed

5 files changed

+185
-442
lines changed

lightning/src/chain/chainmonitor.rs

Lines changed: 62 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1032,11 +1032,13 @@ where
10321032
/// See the release notes for LDK 0.1 for more information on this requirement.
10331033
///
10341034
/// [`ChannelMonitor`]s which do not need to be persisted (i.e. were last written by LDK 0.1 or
1035-
/// later) will be loaded without persistence and this method will return
1036-
/// [`ChannelMonitorUpdateStatus::Completed`].
1035+
/// later) will be loaded without persistence and a [`MonitorEvent::Completed`] will be
1036+
/// immediately queued.
1037+
///
1038+
/// [`MonitorEvent::Completed`]: channelmonitor::MonitorEvent::Completed
10371039
pub fn load_existing_monitor(
10381040
&self, channel_id: ChannelId, monitor: ChannelMonitor<ChannelSigner>,
1039-
) -> Result<ChannelMonitorUpdateStatus, ()> {
1041+
) -> Result<(), ()> {
10401042
if !monitor.written_by_0_1_or_later() {
10411043
return chain::Watch::watch_channel(self, channel_id, monitor);
10421044
}
@@ -1054,9 +1056,20 @@ where
10541056
if let Some(ref chain_source) = self.chain_source {
10551057
monitor.load_outputs_to_watch(chain_source, &self.logger);
10561058
}
1059+
// The monitor is already persisted, so generate MonitorEvent::Completed immediately.
1060+
let funding_txo = monitor.get_funding_txo();
1061+
let counterparty_node_id = monitor.get_counterparty_node_id();
1062+
let update_id = monitor.get_latest_update_id();
10571063
entry.insert(MonitorHolder { monitor, pending_monitor_updates: Mutex::new(Vec::new()) });
1064+
self.pending_monitor_events.lock().unwrap().push((
1065+
funding_txo,
1066+
channel_id,
1067+
vec![MonitorEvent::Completed { funding_txo, channel_id, monitor_update_id: update_id }],
1068+
counterparty_node_id,
1069+
));
1070+
self.event_notifier.notify();
10581071

1059-
Ok(ChannelMonitorUpdateStatus::Completed)
1072+
Ok(())
10601073
}
10611074
}
10621075

@@ -1271,7 +1284,7 @@ where
12711284
{
12721285
fn watch_channel(
12731286
&self, channel_id: ChannelId, monitor: ChannelMonitor<ChannelSigner>,
1274-
) -> Result<ChannelMonitorUpdateStatus, ()> {
1287+
) -> Result<(), ()> {
12751288
let logger = WithChannelMonitor::from(&self.logger, &monitor, None);
12761289
let mut monitors = self.monitors.write().unwrap();
12771290
let entry = match monitors.entry(channel_id) {
@@ -1285,33 +1298,52 @@ where
12851298
let update_id = monitor.get_latest_update_id();
12861299
let mut pending_monitor_updates = Vec::new();
12871300
let persist_res = self.persister.persist_new_channel(monitor.persistence_key(), &monitor);
1288-
match persist_res {
1301+
let persist_completed = match persist_res {
12891302
ChannelMonitorUpdateStatus::InProgress => {
12901303
log_info!(logger, "Persistence of new ChannelMonitor in progress",);
12911304
pending_monitor_updates.push(update_id);
1305+
false
12921306
},
12931307
ChannelMonitorUpdateStatus::Completed => {
12941308
log_info!(logger, "Persistence of new ChannelMonitor completed",);
1309+
true
12951310
},
12961311
ChannelMonitorUpdateStatus::UnrecoverableError => {
12971312
let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
12981313
log_error!(logger, "{}", err_str);
12991314
panic!("{}", err_str);
13001315
},
1301-
}
1316+
};
13021317
if let Some(ref chain_source) = self.chain_source {
13031318
monitor.load_outputs_to_watch(chain_source, &self.logger);
13041319
}
1320+
// Capture monitor info before moving it into the map.
1321+
let funding_txo = monitor.get_funding_txo();
1322+
let counterparty_node_id = monitor.get_counterparty_node_id();
13051323
entry.insert(MonitorHolder {
13061324
monitor,
13071325
pending_monitor_updates: Mutex::new(pending_monitor_updates),
13081326
});
1309-
Ok(persist_res)
1327+
if persist_completed {
1328+
// Persist returned Completed, so generate MonitorEvent::Completed immediately.
1329+
// We can't call channel_monitor_updated here because we hold the monitors write
1330+
// lock. Instead, push directly to pending_monitor_events which is a separate Mutex.
1331+
self.pending_monitor_events.lock().unwrap().push((
1332+
funding_txo,
1333+
channel_id,
1334+
vec![MonitorEvent::Completed {
1335+
funding_txo,
1336+
channel_id,
1337+
monitor_update_id: update_id,
1338+
}],
1339+
counterparty_node_id,
1340+
));
1341+
self.event_notifier.notify();
1342+
}
1343+
Ok(())
13101344
}
13111345

1312-
fn update_channel(
1313-
&self, channel_id: ChannelId, update: &ChannelMonitorUpdate,
1314-
) -> ChannelMonitorUpdateStatus {
1346+
fn update_channel(&self, channel_id: ChannelId, update: &ChannelMonitorUpdate) {
13151347
// `ChannelMonitorUpdate`'s `channel_id` is `None` prior to 0.0.121 and all channels in those
13161348
// versions are V1-established. For 0.0.121+ the `channel_id` fields is always `Some`.
13171349
debug_assert_eq!(update.channel_id.unwrap(), channel_id);
@@ -1328,7 +1360,7 @@ where
13281360
#[cfg(debug_assertions)]
13291361
panic!("ChannelManager generated a channel update for a channel that was not yet registered!");
13301362
#[cfg(not(debug_assertions))]
1331-
ChannelMonitorUpdateStatus::InProgress
1363+
return;
13321364
},
13331365
Some(monitor_state) => {
13341366
let monitor = &monitor_state.monitor;
@@ -1382,6 +1414,24 @@ where
13821414
"Persistence of ChannelMonitorUpdate id {:?} completed",
13831415
update_id,
13841416
);
1417+
// If no prior async updates are pending, we can immediately signal
1418+
// completion. Otherwise, completion of those prior updates via
1419+
// channel_monitor_updated will eventually generate the event (using
1420+
// monitor.get_latest_update_id() which covers this update too).
1421+
if !monitor_state.has_pending_updates(&pending_monitor_updates) {
1422+
let funding_txo = monitor.get_funding_txo();
1423+
self.pending_monitor_events.lock().unwrap().push((
1424+
funding_txo,
1425+
channel_id,
1426+
vec![MonitorEvent::Completed {
1427+
funding_txo,
1428+
channel_id,
1429+
monitor_update_id: monitor.get_latest_update_id(),
1430+
}],
1431+
monitor.get_counterparty_node_id(),
1432+
));
1433+
self.event_notifier.notify();
1434+
}
13851435
},
13861436
ChannelMonitorUpdateStatus::UnrecoverableError => {
13871437
// Take the monitors lock for writing so that we poison it and any future
@@ -1413,12 +1463,6 @@ where
14131463
});
14141464
}
14151465
}
1416-
1417-
if update_res.is_err() {
1418-
ChannelMonitorUpdateStatus::InProgress
1419-
} else {
1420-
persist_res
1421-
}
14221466
},
14231467
}
14241468
}

lightning/src/chain/mod.rs

Lines changed: 23 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ pub trait Confirm {
207207

208208
/// An enum representing the status of a channel monitor update persistence.
209209
///
210-
/// These are generally used as the return value for an implementation of [`Persist`] which is used
210+
/// These are used as the return value for an implementation of [`Persist`] which is used
211211
/// as the storage layer for a [`ChainMonitor`]. See the docs on [`Persist`] for a high-level
212212
/// explanation of how to handle different cases.
213213
///
@@ -234,7 +234,7 @@ pub enum ChannelMonitorUpdateStatus {
234234
/// be available on restart even if the application crashes.
235235
///
236236
/// If you return this variant, you cannot later return [`InProgress`] from the same instance of
237-
/// [`Persist`]/[`Watch`] without first restarting.
237+
/// [`Persist`] without first restarting.
238238
///
239239
/// [`InProgress`]: ChannelMonitorUpdateStatus::InProgress
240240
/// [`Persist`]: chainmonitor::Persist
@@ -264,7 +264,7 @@ pub enum ChannelMonitorUpdateStatus {
264264
/// remaining cases are fixed, in rare cases, *using this feature may lead to funds loss*.
265265
///
266266
/// If you return this variant, you cannot later return [`Completed`] from the same instance of
267-
/// [`Persist`]/[`Watch`] without first restarting.
267+
/// [`Persist`] without first restarting.
268268
///
269269
/// [`InProgress`]: ChannelMonitorUpdateStatus::InProgress
270270
/// [`Completed`]: ChannelMonitorUpdateStatus::Completed
@@ -293,7 +293,8 @@ pub enum ChannelMonitorUpdateStatus {
293293
/// persisted to disk to ensure that the latest [`ChannelMonitor`] state can be reloaded if the
294294
/// application crashes.
295295
///
296-
/// See method documentation and [`ChannelMonitorUpdateStatus`] for specific requirements.
296+
/// Updates are always considered in-progress until completion is signaled asynchronously via
297+
/// [`MonitorEvent::Completed`] in [`Watch::release_pending_monitor_events`].
297298
pub trait Watch<ChannelSigner: EcdsaChannelSigner> {
298299
/// Watches a channel identified by `channel_id` using `monitor`.
299300
///
@@ -312,36 +313,37 @@ pub trait Watch<ChannelSigner: EcdsaChannelSigner> {
312313
/// [`blocks_disconnected`]: channelmonitor::ChannelMonitor::blocks_disconnected
313314
fn watch_channel(
314315
&self, channel_id: ChannelId, monitor: ChannelMonitor<ChannelSigner>,
315-
) -> Result<ChannelMonitorUpdateStatus, ()>;
316+
) -> Result<(), ()>;
316317

317318
/// Updates a channel identified by `channel_id` by applying `update` to its monitor.
318319
///
319320
/// Implementations must call [`ChannelMonitor::update_monitor`] with the given update. This
320-
/// may fail (returning an `Err(())`), in which case this should return
321-
/// [`ChannelMonitorUpdateStatus::InProgress`] (and the update should never complete). This
321+
/// may fail (returning an `Err(())`), in which case the update should never complete. This
322322
/// generally implies the channel has been closed (either by the funding outpoint being spent
323323
/// on-chain or the [`ChannelMonitor`] having decided to do so and broadcasted a transaction),
324324
/// and the [`ChannelManager`] state will be updated once it sees the funding spend on-chain.
325325
///
326-
/// In general, persistence failures should be retried after returning
327-
/// [`ChannelMonitorUpdateStatus::InProgress`] and eventually complete. If a failure truly
328-
/// cannot be retried, the node should shut down immediately after returning
329-
/// [`ChannelMonitorUpdateStatus::UnrecoverableError`], see its documentation for more info.
326+
/// The update is considered in-progress until a [`MonitorEvent::Completed`] is provided via
327+
/// [`Watch::release_pending_monitor_events`]. While in-progress, the channel will be
328+
/// "frozen", preventing us from revoking old states or submitting a new commitment
329+
/// transaction to the counterparty.
330+
///
331+
/// Even when a channel has been "frozen", updates to the [`ChannelMonitor`] can continue to
332+
/// occur (e.g. if an inbound HTLC which we forwarded was claimed upstream, resulting in us
333+
/// attempting to claim it on this channel) and those updates must still be persisted.
334+
///
335+
/// In general, persistence failures should be retried in the background and eventually
336+
/// complete. If a failure truly cannot be retried, the node should shut down.
330337
///
331338
/// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
332-
fn update_channel(
333-
&self, channel_id: ChannelId, update: &ChannelMonitorUpdate,
334-
) -> ChannelMonitorUpdateStatus;
339+
fn update_channel(&self, channel_id: ChannelId, update: &ChannelMonitorUpdate);
335340

336341
/// Returns any monitor events since the last call. Subsequent calls must only return new
337342
/// events.
338343
///
339344
/// Note that after any block- or transaction-connection calls to a [`ChannelMonitor`], no
340345
/// further events may be returned here until the [`ChannelMonitor`] has been fully persisted
341346
/// to disk.
342-
///
343-
/// For details on asynchronous [`ChannelMonitor`] updating and returning
344-
/// [`MonitorEvent::Completed`] here, see [`ChannelMonitorUpdateStatus::InProgress`].
345347
fn release_pending_monitor_events(
346348
&self,
347349
) -> Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, PublicKey)>;
@@ -352,13 +354,11 @@ impl<ChannelSigner: EcdsaChannelSigner, T: Watch<ChannelSigner> + ?Sized, W: Der
352354
{
353355
fn watch_channel(
354356
&self, channel_id: ChannelId, monitor: ChannelMonitor<ChannelSigner>,
355-
) -> Result<ChannelMonitorUpdateStatus, ()> {
357+
) -> Result<(), ()> {
356358
self.deref().watch_channel(channel_id, monitor)
357359
}
358360

359-
fn update_channel(
360-
&self, channel_id: ChannelId, update: &ChannelMonitorUpdate,
361-
) -> ChannelMonitorUpdateStatus {
361+
fn update_channel(&self, channel_id: ChannelId, update: &ChannelMonitorUpdate) {
362362
self.deref().update_channel(channel_id, update)
363363
}
364364

@@ -384,9 +384,8 @@ impl<ChannelSigner: EcdsaChannelSigner, T: Watch<ChannelSigner> + ?Sized, W: Der
384384
/// Note that use as part of a [`Watch`] implementation involves reentrancy. Therefore, the `Filter`
385385
/// should not block on I/O. Implementations should instead queue the newly monitored data to be
386386
/// processed later. Then, in order to block until the data has been processed, any [`Watch`]
387-
/// invocation that has called the `Filter` must return [`InProgress`].
388-
///
389-
/// [`InProgress`]: ChannelMonitorUpdateStatus::InProgress
387+
/// invocation that has called the `Filter` should delay its [`MonitorEvent::Completed`] until
388+
/// processing finishes.
390389
/// [BIP 157]: https://github.com/bitcoin/bips/blob/master/bip-0157.mediawiki
391390
/// [BIP 158]: https://github.com/bitcoin/bips/blob/master/bip-0158.mediawiki
392391
pub trait Filter {

lightning/src/ln/channel.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9193,8 +9193,6 @@ where
91939193
/// [`Self::monitor_updating_restored`] is called.
91949194
///
91959195
/// [`ChannelManager`]: super::channelmanager::ChannelManager
9196-
/// [`chain::Watch`]: crate::chain::Watch
9197-
/// [`ChannelMonitorUpdateStatus::InProgress`]: crate::chain::ChannelMonitorUpdateStatus::InProgress
91989196
fn monitor_updating_paused<L: Logger>(
91999197
&mut self, resend_raa: bool, resend_commitment: bool, resend_channel_ready: bool,
92009198
pending_forwards: Vec<(PendingHTLCInfo, u64)>,

0 commit comments

Comments
 (0)