-
Notifications
You must be signed in to change notification settings - Fork 411
Process updates before archiving monitors. #3276
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -400,28 +400,34 @@ where | |
/// If you have many stale updates stored (such as after a crash with pending lazy deletes), and | ||
/// would like to get rid of them, consider using the | ||
/// [`MonitorUpdatingPersister::cleanup_stale_updates`] function. | ||
pub struct MonitorUpdatingPersister<K: Deref, L: Deref, ES: Deref, SP: Deref> | ||
pub struct MonitorUpdatingPersister<K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref> | ||
where | ||
K::Target: KVStore, | ||
L::Target: Logger, | ||
ES::Target: EntropySource + Sized, | ||
SP::Target: SignerProvider + Sized, | ||
BI::Target: BroadcasterInterface, | ||
FE::Target: FeeEstimator | ||
{ | ||
kv_store: K, | ||
logger: L, | ||
maximum_pending_updates: u64, | ||
entropy_source: ES, | ||
signer_provider: SP, | ||
broadcaster: BI, | ||
fee_estimator: FE | ||
} | ||
|
||
#[allow(dead_code)] | ||
impl<K: Deref, L: Deref, ES: Deref, SP: Deref> | ||
MonitorUpdatingPersister<K, L, ES, SP> | ||
impl<K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref> | ||
MonitorUpdatingPersister<K, L, ES, SP, BI, FE> | ||
where | ||
K::Target: KVStore, | ||
L::Target: Logger, | ||
ES::Target: EntropySource + Sized, | ||
SP::Target: SignerProvider + Sized, | ||
BI::Target: BroadcasterInterface, | ||
FE::Target: FeeEstimator | ||
{ | ||
/// Constructs a new [`MonitorUpdatingPersister`]. | ||
/// | ||
|
@@ -441,14 +447,16 @@ where | |
/// [`MonitorUpdatingPersister::cleanup_stale_updates`]. | ||
pub fn new( | ||
kv_store: K, logger: L, maximum_pending_updates: u64, entropy_source: ES, | ||
signer_provider: SP, | ||
signer_provider: SP, broadcaster: BI, fee_estimator: FE | ||
) -> Self { | ||
MonitorUpdatingPersister { | ||
kv_store, | ||
logger, | ||
maximum_pending_updates, | ||
entropy_source, | ||
signer_provider, | ||
broadcaster, | ||
fee_estimator | ||
} | ||
} | ||
|
||
|
@@ -457,24 +465,14 @@ where | |
/// It is extremely important that your [`KVStore::read`] implementation uses the | ||
/// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the | ||
/// documentation for [`MonitorUpdatingPersister`]. | ||
pub fn read_all_channel_monitors_with_updates<B: Deref, F: Deref>( | ||
&self, broadcaster: &B, fee_estimator: &F, | ||
) -> Result<Vec<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>, io::Error> | ||
where | ||
B::Target: BroadcasterInterface, | ||
F::Target: FeeEstimator, | ||
{ | ||
pub fn read_all_channel_monitors_with_updates(&self) -> Result<Vec<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>, io::Error> { | ||
let monitor_list = self.kv_store.list( | ||
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, | ||
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, | ||
)?; | ||
let mut res = Vec::with_capacity(monitor_list.len()); | ||
for monitor_key in monitor_list { | ||
res.push(self.read_channel_monitor_with_updates( | ||
broadcaster, | ||
fee_estimator, | ||
monitor_key, | ||
)?) | ||
res.push(self.read_channel_monitor_with_updates(monitor_key)?) | ||
} | ||
Ok(res) | ||
} | ||
|
@@ -496,13 +494,9 @@ where | |
/// | ||
/// Loading a large number of monitors will be faster if done in parallel. You can use this | ||
/// function to accomplish this. Take care to limit the number of parallel readers. | ||
pub fn read_channel_monitor_with_updates<B: Deref, F: Deref>( | ||
&self, broadcaster: &B, fee_estimator: &F, monitor_key: String, | ||
) -> Result<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>), io::Error> | ||
where | ||
B::Target: BroadcasterInterface, | ||
F::Target: FeeEstimator, | ||
{ | ||
pub fn read_channel_monitor_with_updates( | ||
&self, monitor_key: String, | ||
) -> Result<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>), io::Error> { | ||
let monitor_name = MonitorName::new(monitor_key)?; | ||
let (block_hash, monitor) = self.read_monitor(&monitor_name)?; | ||
let mut current_update_id = monitor.get_latest_update_id(); | ||
|
@@ -521,7 +515,7 @@ where | |
Err(err) => return Err(err), | ||
}; | ||
|
||
monitor.update_monitor(&update, broadcaster, fee_estimator, &self.logger) | ||
monitor.update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger) | ||
.map_err(|e| { | ||
log_error!( | ||
self.logger, | ||
|
@@ -639,13 +633,15 @@ where | |
} | ||
} | ||
|
||
impl<ChannelSigner: EcdsaChannelSigner, K: Deref, L: Deref, ES: Deref, SP: Deref> | ||
Persist<ChannelSigner> for MonitorUpdatingPersister<K, L, ES, SP> | ||
impl<ChannelSigner: EcdsaChannelSigner, K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref> | ||
Persist<ChannelSigner> for MonitorUpdatingPersister<K, L, ES, SP, BI, FE> | ||
where | ||
K::Target: KVStore, | ||
L::Target: Logger, | ||
ES::Target: EntropySource + Sized, | ||
SP::Target: SignerProvider + Sized, | ||
BI::Target: BroadcasterInterface, | ||
FE::Target: FeeEstimator | ||
{ | ||
/// Persists a new channel. This means writing the entire monitor to the | ||
/// parametrized [`KVStore`]. | ||
|
@@ -766,17 +762,18 @@ where | |
|
||
fn archive_persisted_channel(&self, funding_txo: OutPoint) { | ||
let monitor_name = MonitorName::from(funding_txo); | ||
let monitor = match self.read_monitor(&monitor_name) { | ||
let monitor_key = monitor_name.as_str().to_string(); | ||
let monitor = match self.read_channel_monitor_with_updates(monitor_key) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. FWIW, I'm not sure this bug is reachable at the moment because I think LDK will persist the full monitor on each block connection, and we don't archive until ~4k blocks have passed post-channel close. Still seems like a reasonable fix though in case things change. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, that caveat is addressed in the issue this is fixing. I think it primarily affects mobile clients. |
||
Ok((_block_hash, monitor)) => monitor, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Pre-existing, but I think the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point. Separately, there's also the issue of applying monitor updates requiring a fee estimator and a broadcaster in the first place, which should be avoidable. I'll create issues for both. |
||
Err(_) => return | ||
}; | ||
match self.kv_store.write( | ||
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, | ||
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, | ||
monitor_name.as_str(), | ||
&monitor.encode() | ||
&monitor.encode(), | ||
) { | ||
Ok(()) => {}, | ||
Ok(()) => {} | ||
Err(_e) => return, | ||
}; | ||
let _ = self.kv_store.remove( | ||
|
@@ -788,12 +785,14 @@ where | |
} | ||
} | ||
|
||
impl<K: Deref, L: Deref, ES: Deref, SP: Deref> MonitorUpdatingPersister<K, L, ES, SP> | ||
impl<K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref> MonitorUpdatingPersister<K, L, ES, SP, BI, FE> | ||
where | ||
ES::Target: EntropySource + Sized, | ||
K::Target: KVStore, | ||
L::Target: Logger, | ||
SP::Target: SignerProvider + Sized | ||
SP::Target: SignerProvider + Sized, | ||
BI::Target: BroadcasterInterface, | ||
FE::Target: FeeEstimator | ||
{ | ||
// Cleans up monitor updates for given monitor in range `start..=end`. | ||
fn cleanup_in_range(&self, monitor_name: MonitorName, start: u64, end: u64) { | ||
|
@@ -962,13 +961,17 @@ mod tests { | |
maximum_pending_updates: persister_0_max_pending_updates, | ||
entropy_source: &chanmon_cfgs[0].keys_manager, | ||
signer_provider: &chanmon_cfgs[0].keys_manager, | ||
broadcaster: &chanmon_cfgs[0].tx_broadcaster, | ||
fee_estimator: &chanmon_cfgs[0].fee_estimator, | ||
}; | ||
let persister_1 = MonitorUpdatingPersister { | ||
kv_store: &TestStore::new(false), | ||
logger: &TestLogger::new(), | ||
maximum_pending_updates: persister_1_max_pending_updates, | ||
entropy_source: &chanmon_cfgs[1].keys_manager, | ||
signer_provider: &chanmon_cfgs[1].keys_manager, | ||
broadcaster: &chanmon_cfgs[1].tx_broadcaster, | ||
fee_estimator: &chanmon_cfgs[1].fee_estimator, | ||
}; | ||
let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs); | ||
let chain_mon_0 = test_utils::TestChainMonitor::new( | ||
|
@@ -991,23 +994,18 @@ mod tests { | |
node_cfgs[1].chain_monitor = chain_mon_1; | ||
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); | ||
let nodes = create_network(2, &node_cfgs, &node_chanmgrs); | ||
let broadcaster_0 = &chanmon_cfgs[2].tx_broadcaster; | ||
let broadcaster_1 = &chanmon_cfgs[3].tx_broadcaster; | ||
|
||
// Check that the persisted channel data is empty before any channels are | ||
// open. | ||
let mut persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates( | ||
&broadcaster_0, &&chanmon_cfgs[0].fee_estimator).unwrap(); | ||
let mut persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates().unwrap(); | ||
assert_eq!(persisted_chan_data_0.len(), 0); | ||
let mut persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates( | ||
&broadcaster_1, &&chanmon_cfgs[1].fee_estimator).unwrap(); | ||
let mut persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates().unwrap(); | ||
assert_eq!(persisted_chan_data_1.len(), 0); | ||
|
||
// Helper to make sure the channel is on the expected update ID. | ||
macro_rules! check_persisted_data { | ||
($expected_update_id: expr) => { | ||
persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates( | ||
&broadcaster_0, &&chanmon_cfgs[0].fee_estimator).unwrap(); | ||
persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates().unwrap(); | ||
// check that we stored only one monitor | ||
assert_eq!(persisted_chan_data_0.len(), 1); | ||
for (_, mon) in persisted_chan_data_0.iter() { | ||
|
@@ -1026,8 +1024,7 @@ mod tests { | |
); | ||
} | ||
} | ||
persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates( | ||
&broadcaster_1, &&chanmon_cfgs[1].fee_estimator).unwrap(); | ||
persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates().unwrap(); | ||
assert_eq!(persisted_chan_data_1.len(), 1); | ||
for (_, mon) in persisted_chan_data_1.iter() { | ||
assert_eq!(mon.get_latest_update_id(), $expected_update_id); | ||
|
@@ -1095,7 +1092,7 @@ mod tests { | |
check_persisted_data!(CLOSED_CHANNEL_UPDATE_ID); | ||
|
||
// Make sure the expected number of stale updates is present. | ||
let persisted_chan_data = persister_0.read_all_channel_monitors_with_updates(&broadcaster_0, &&chanmon_cfgs[0].fee_estimator).unwrap(); | ||
let persisted_chan_data = persister_0.read_all_channel_monitors_with_updates().unwrap(); | ||
let (_, monitor) = &persisted_chan_data[0]; | ||
let monitor_name = MonitorName::from(monitor.get_funding_txo().0); | ||
// The channel should have 0 updates, as it wrote a full monitor and consolidated. | ||
|
@@ -1129,6 +1126,8 @@ mod tests { | |
maximum_pending_updates: 11, | ||
entropy_source: node_cfgs[0].keys_manager, | ||
signer_provider: node_cfgs[0].keys_manager, | ||
broadcaster: node_cfgs[0].tx_broadcaster, | ||
fee_estimator: node_cfgs[0].fee_estimator, | ||
}; | ||
match ro_persister.persist_new_channel(test_txo, &added_monitors[0].1) { | ||
ChannelMonitorUpdateStatus::UnrecoverableError => { | ||
|
@@ -1168,13 +1167,17 @@ mod tests { | |
maximum_pending_updates: test_max_pending_updates, | ||
entropy_source: &chanmon_cfgs[0].keys_manager, | ||
signer_provider: &chanmon_cfgs[0].keys_manager, | ||
broadcaster: &chanmon_cfgs[0].tx_broadcaster, | ||
fee_estimator: &chanmon_cfgs[0].fee_estimator, | ||
}; | ||
let persister_1 = MonitorUpdatingPersister { | ||
kv_store: &TestStore::new(false), | ||
logger: &TestLogger::new(), | ||
maximum_pending_updates: test_max_pending_updates, | ||
entropy_source: &chanmon_cfgs[1].keys_manager, | ||
signer_provider: &chanmon_cfgs[1].keys_manager, | ||
broadcaster: &chanmon_cfgs[1].tx_broadcaster, | ||
fee_estimator: &chanmon_cfgs[1].fee_estimator, | ||
}; | ||
let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs); | ||
let chain_mon_0 = test_utils::TestChainMonitor::new( | ||
|
@@ -1198,11 +1201,9 @@ mod tests { | |
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); | ||
let nodes = create_network(2, &node_cfgs, &node_chanmgrs); | ||
|
||
let broadcaster_0 = &chanmon_cfgs[2].tx_broadcaster; | ||
|
||
// Check that the persisted channel data is empty before any channels are | ||
// open. | ||
let persisted_chan_data = persister_0.read_all_channel_monitors_with_updates(&broadcaster_0, &&chanmon_cfgs[0].fee_estimator).unwrap(); | ||
let persisted_chan_data = persister_0.read_all_channel_monitors_with_updates().unwrap(); | ||
assert_eq!(persisted_chan_data.len(), 0); | ||
|
||
// Create some initial channel | ||
|
@@ -1213,7 +1214,7 @@ mod tests { | |
send_payment(&nodes[1], &vec![&nodes[0]][..], 4_000_000); | ||
|
||
// Get the monitor and make a fake stale update at update_id=1 (lowest height of an update possible) | ||
let persisted_chan_data = persister_0.read_all_channel_monitors_with_updates(&broadcaster_0, &&chanmon_cfgs[0].fee_estimator).unwrap(); | ||
let persisted_chan_data = persister_0.read_all_channel_monitors_with_updates().unwrap(); | ||
let (_, monitor) = &persisted_chan_data[0]; | ||
let monitor_name = MonitorName::from(monitor.get_funding_txo().0); | ||
persister_0 | ||
|
Uh oh!
There was an error while loading. Please reload this page.