Process updates before archiving monitors. #3276

Merged
93 changes: 47 additions & 46 deletions lightning/src/util/persist.rs
@@ -400,28 +400,34 @@ where
/// If you have many stale updates stored (such as after a crash with pending lazy deletes), and
/// would like to get rid of them, consider using the
/// [`MonitorUpdatingPersister::cleanup_stale_updates`] function.
pub struct MonitorUpdatingPersister<K: Deref, L: Deref, ES: Deref, SP: Deref>
pub struct MonitorUpdatingPersister<K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
where
K::Target: KVStore,
L::Target: Logger,
ES::Target: EntropySource + Sized,
SP::Target: SignerProvider + Sized,
BI::Target: BroadcasterInterface,
FE::Target: FeeEstimator
{
kv_store: K,
logger: L,
maximum_pending_updates: u64,
entropy_source: ES,
signer_provider: SP,
broadcaster: BI,
fee_estimator: FE
}

#[allow(dead_code)]
impl<K: Deref, L: Deref, ES: Deref, SP: Deref>
MonitorUpdatingPersister<K, L, ES, SP>
impl<K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
MonitorUpdatingPersister<K, L, ES, SP, BI, FE>
where
K::Target: KVStore,
L::Target: Logger,
ES::Target: EntropySource + Sized,
SP::Target: SignerProvider + Sized,
BI::Target: BroadcasterInterface,
FE::Target: FeeEstimator
{
/// Constructs a new [`MonitorUpdatingPersister`].
///
@@ -441,14 +447,16 @@ where
/// [`MonitorUpdatingPersister::cleanup_stale_updates`].
pub fn new(
kv_store: K, logger: L, maximum_pending_updates: u64, entropy_source: ES,
signer_provider: SP,
signer_provider: SP, broadcaster: BI, fee_estimator: FE
) -> Self {
MonitorUpdatingPersister {
kv_store,
logger,
maximum_pending_updates,
entropy_source,
signer_provider,
broadcaster,
fee_estimator
}
}

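For downstream callers, the constructor change above means the broadcaster and fee estimator are now supplied once, at construction time. A minimal sketch of the updated call, assuming the caller already holds the same handles it previously passed to each read call (every variable name below is a placeholder, not defined in this PR):

// Sketch only: kv_store, logger, entropy_source, signer_provider,
// tx_broadcaster and fee_estimator are assumed to be whatever the caller
// already owns; they are not introduced by this PR.
let persister = MonitorUpdatingPersister::new(
    kv_store,        // K: Deref<Target: KVStore>
    logger,          // L: Deref<Target: Logger>
    5,               // maximum_pending_updates
    entropy_source,  // ES: Deref<Target: EntropySource>
    signer_provider, // SP: Deref<Target: SignerProvider>
    tx_broadcaster,  // new parameter: BI: Deref<Target: BroadcasterInterface>
    fee_estimator,   // new parameter: FE: Deref<Target: FeeEstimator>
);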
@@ -457,24 +465,14 @@ where
/// It is extremely important that your [`KVStore::read`] implementation uses the
/// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
/// documentation for [`MonitorUpdatingPersister`].
pub fn read_all_channel_monitors_with_updates<B: Deref, F: Deref>(
&self, broadcaster: &B, fee_estimator: &F,
) -> Result<Vec<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>, io::Error>
where
B::Target: BroadcasterInterface,
F::Target: FeeEstimator,
{
pub fn read_all_channel_monitors_with_updates(&self) -> Result<Vec<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>, io::Error> {
let monitor_list = self.kv_store.list(
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
)?;
let mut res = Vec::with_capacity(monitor_list.len());
for monitor_key in monitor_list {
res.push(self.read_channel_monitor_with_updates(
broadcaster,
fee_estimator,
monitor_key,
)?)
res.push(self.read_channel_monitor_with_updates(monitor_key)?)
}
Ok(res)
}
@@ -496,13 +494,9 @@ where
///
/// Loading a large number of monitors will be faster if done in parallel. You can use this
/// function to accomplish this. Take care to limit the number of parallel readers.
pub fn read_channel_monitor_with_updates<B: Deref, F: Deref>(
&self, broadcaster: &B, fee_estimator: &F, monitor_key: String,
) -> Result<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>), io::Error>
where
B::Target: BroadcasterInterface,
F::Target: FeeEstimator,
{
pub fn read_channel_monitor_with_updates(
&self, monitor_key: String,
) -> Result<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>), io::Error> {
let monitor_name = MonitorName::new(monitor_key)?;
let (block_hash, monitor) = self.read_monitor(&monitor_name)?;
let mut current_update_id = monitor.get_latest_update_id();
@@ -521,7 +515,7 @@ where
Err(err) => return Err(err),
};

monitor.update_monitor(&update, broadcaster, fee_estimator, &self.logger)
monitor.update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger)
.map_err(|e| {
log_error!(
self.logger,
@@ -639,13 +633,15 @@
}
}

impl<ChannelSigner: EcdsaChannelSigner, K: Deref, L: Deref, ES: Deref, SP: Deref>
Persist<ChannelSigner> for MonitorUpdatingPersister<K, L, ES, SP>
impl<ChannelSigner: EcdsaChannelSigner, K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
Persist<ChannelSigner> for MonitorUpdatingPersister<K, L, ES, SP, BI, FE>
where
K::Target: KVStore,
L::Target: Logger,
ES::Target: EntropySource + Sized,
SP::Target: SignerProvider + Sized,
BI::Target: BroadcasterInterface,
FE::Target: FeeEstimator
{
/// Persists a new channel. This means writing the entire monitor to the
/// parametrized [`KVStore`].
@@ -766,17 +762,18 @@ where

fn archive_persisted_channel(&self, funding_txo: OutPoint) {
let monitor_name = MonitorName::from(funding_txo);
let monitor = match self.read_monitor(&monitor_name) {
let monitor_key = monitor_name.as_str().to_string();
let monitor = match self.read_channel_monitor_with_updates(monitor_key) {
Contributor:
FWIW, I'm not sure this bug is reachable at the moment because I think LDK will persist the full monitor on each block connection, and we don't archive until ~4k blocks have passed post-channel close. Still seems like a reasonable fix though in case things change.

Contributor Author:
Yeah, that caveat is addressed in the issue this is fixing. I think it primarily affects mobile clients.

Ok((_block_hash, monitor)) => monitor,
Contributor:
Pre-existing, but I think the read_monitor method could use a rename and maybe some docs since it basically reads an old monitor, which seems worth a call-out. No need to do it in this PR though.

Contributor Author:
Good point. Separately, there's also the issue of applying monitor updates requiring a fee estimator and a broadcaster in the first place, which should be avoidable. I'll create issues for both.

Err(_) => return
};
match self.kv_store.write(
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
monitor_name.as_str(),
&monitor.encode()
&monitor.encode(),
) {
Ok(()) => {},
Ok(()) => {}
Err(_e) => return,
};
let _ = self.kv_store.remove(
@@ -788,12 +785,14 @@
}
}
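The behavioral fix lands in archive_persisted_channel above: the monitor is now read back through read_channel_monitor_with_updates, so pending ChannelMonitorUpdates are replayed before the monitor is written to the archive namespace. A hedged sketch of reading a single monitor with the new signature, mirroring the test code further down (everything other than the method and type names taken from this diff is a placeholder):

// Sketch: derive the storage key from a monitor's funding outpoint, then
// re-read that one monitor with any pending updates applied.
let monitor_name = MonitorName::from(monitor.get_funding_txo().0);
let (_block_hash, updated_monitor) =
    persister.read_channel_monitor_with_updates(monitor_name.as_str().to_string())?;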

impl<K: Deref, L: Deref, ES: Deref, SP: Deref> MonitorUpdatingPersister<K, L, ES, SP>
impl<K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref> MonitorUpdatingPersister<K, L, ES, SP, BI, FE>
where
ES::Target: EntropySource + Sized,
K::Target: KVStore,
L::Target: Logger,
SP::Target: SignerProvider + Sized
SP::Target: SignerProvider + Sized,
BI::Target: BroadcasterInterface,
FE::Target: FeeEstimator
{
// Cleans up monitor updates for the given monitor in the range `start..=end`.
fn cleanup_in_range(&self, monitor_name: MonitorName, start: u64, end: u64) {
@@ -962,13 +961,17 @@ mod tests {
maximum_pending_updates: persister_0_max_pending_updates,
entropy_source: &chanmon_cfgs[0].keys_manager,
signer_provider: &chanmon_cfgs[0].keys_manager,
broadcaster: &chanmon_cfgs[0].tx_broadcaster,
fee_estimator: &chanmon_cfgs[0].fee_estimator,
};
let persister_1 = MonitorUpdatingPersister {
kv_store: &TestStore::new(false),
logger: &TestLogger::new(),
maximum_pending_updates: persister_1_max_pending_updates,
entropy_source: &chanmon_cfgs[1].keys_manager,
signer_provider: &chanmon_cfgs[1].keys_manager,
broadcaster: &chanmon_cfgs[1].tx_broadcaster,
fee_estimator: &chanmon_cfgs[1].fee_estimator,
};
let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
let chain_mon_0 = test_utils::TestChainMonitor::new(
@@ -991,23 +994,18 @@
node_cfgs[1].chain_monitor = chain_mon_1;
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
let broadcaster_0 = &chanmon_cfgs[2].tx_broadcaster;
let broadcaster_1 = &chanmon_cfgs[3].tx_broadcaster;

// Check that the persisted channel data is empty before any channels are
// open.
let mut persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates(
&broadcaster_0, &&chanmon_cfgs[0].fee_estimator).unwrap();
let mut persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates().unwrap();
assert_eq!(persisted_chan_data_0.len(), 0);
let mut persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates(
&broadcaster_1, &&chanmon_cfgs[1].fee_estimator).unwrap();
let mut persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates().unwrap();
assert_eq!(persisted_chan_data_1.len(), 0);

// Helper to make sure the channel is on the expected update ID.
macro_rules! check_persisted_data {
($expected_update_id: expr) => {
persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates(
&broadcaster_0, &&chanmon_cfgs[0].fee_estimator).unwrap();
persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates().unwrap();
// check that we stored only one monitor
assert_eq!(persisted_chan_data_0.len(), 1);
for (_, mon) in persisted_chan_data_0.iter() {
@@ -1026,8 +1024,7 @@
);
}
}
persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates(
&broadcaster_1, &&chanmon_cfgs[1].fee_estimator).unwrap();
persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates().unwrap();
assert_eq!(persisted_chan_data_1.len(), 1);
for (_, mon) in persisted_chan_data_1.iter() {
assert_eq!(mon.get_latest_update_id(), $expected_update_id);
@@ -1095,7 +1092,7 @@ mod tests {
check_persisted_data!(CLOSED_CHANNEL_UPDATE_ID);

// Make sure the expected number of stale updates is present.
let persisted_chan_data = persister_0.read_all_channel_monitors_with_updates(&broadcaster_0, &&chanmon_cfgs[0].fee_estimator).unwrap();
let persisted_chan_data = persister_0.read_all_channel_monitors_with_updates().unwrap();
let (_, monitor) = &persisted_chan_data[0];
let monitor_name = MonitorName::from(monitor.get_funding_txo().0);
// The channel should have 0 updates, as it wrote a full monitor and consolidated.
@@ -1129,6 +1126,8 @@ mod tests {
maximum_pending_updates: 11,
entropy_source: node_cfgs[0].keys_manager,
signer_provider: node_cfgs[0].keys_manager,
broadcaster: node_cfgs[0].tx_broadcaster,
fee_estimator: node_cfgs[0].fee_estimator,
};
match ro_persister.persist_new_channel(test_txo, &added_monitors[0].1) {
ChannelMonitorUpdateStatus::UnrecoverableError => {
@@ -1168,13 +1167,17 @@ mod tests {
maximum_pending_updates: test_max_pending_updates,
entropy_source: &chanmon_cfgs[0].keys_manager,
signer_provider: &chanmon_cfgs[0].keys_manager,
broadcaster: &chanmon_cfgs[0].tx_broadcaster,
fee_estimator: &chanmon_cfgs[0].fee_estimator,
};
let persister_1 = MonitorUpdatingPersister {
kv_store: &TestStore::new(false),
logger: &TestLogger::new(),
maximum_pending_updates: test_max_pending_updates,
entropy_source: &chanmon_cfgs[1].keys_manager,
signer_provider: &chanmon_cfgs[1].keys_manager,
broadcaster: &chanmon_cfgs[1].tx_broadcaster,
fee_estimator: &chanmon_cfgs[1].fee_estimator,
};
let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
let chain_mon_0 = test_utils::TestChainMonitor::new(
@@ -1198,11 +1201,9 @@
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);

let broadcaster_0 = &chanmon_cfgs[2].tx_broadcaster;

// Check that the persisted channel data is empty before any channels are
// open.
let persisted_chan_data = persister_0.read_all_channel_monitors_with_updates(&broadcaster_0, &&chanmon_cfgs[0].fee_estimator).unwrap();
let persisted_chan_data = persister_0.read_all_channel_monitors_with_updates().unwrap();
assert_eq!(persisted_chan_data.len(), 0);

// Create some initial channel
@@ -1213,7 +1214,7 @@
send_payment(&nodes[1], &vec![&nodes[0]][..], 4_000_000);

// Get the monitor and make a fake stale update at update_id=1 (lowest height of an update possible)
let persisted_chan_data = persister_0.read_all_channel_monitors_with_updates(&broadcaster_0, &&chanmon_cfgs[0].fee_estimator).unwrap();
let persisted_chan_data = persister_0.read_all_channel_monitors_with_updates().unwrap();
let (_, monitor) = &persisted_chan_data[0];
let monitor_name = MonitorName::from(monitor.get_funding_txo().0);
persister_0