Skip to content

Commit 7e8bdc2

Browse files
committed
Migrate to the upstreamed OutputSweeper
1 parent caddab6 commit 7e8bdc2

File tree

8 files changed

+237
-555
lines changed

8 files changed

+237
-555
lines changed

src/balance.rs

Lines changed: 41 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
1+
use crate::sweep::value_satoshis_from_descriptor;
2+
13
use lightning::chain::channelmonitor::Balance as LdkBalance;
24
use lightning::ln::{ChannelId, PaymentHash, PaymentPreimage};
5+
use lightning::util::sweep::{OutputSpendStatus, TrackedSpendableOutput};
36

47
use bitcoin::secp256k1::PublicKey;
58
use bitcoin::{BlockHash, Txid};
69

7-
use crate::sweep::SpendableOutputInfo;
8-
910
/// Details of the known available balances returned by [`Node::list_balances`].
1011
///
1112
/// [`Node::list_balances`]: crate::Node::list_balances
@@ -258,46 +259,45 @@ pub enum PendingSweepBalance {
258259
}
259260

260261
impl PendingSweepBalance {
261-
pub(crate) fn from_tracked_spendable_output(output_info: SpendableOutputInfo) -> Self {
262-
if let Some(confirmation_hash) = output_info.confirmation_hash {
263-
debug_assert!(output_info.confirmation_height.is_some());
264-
debug_assert!(output_info.latest_spending_tx.is_some());
265-
let channel_id = output_info.channel_id;
266-
let confirmation_height = output_info
267-
.confirmation_height
268-
.expect("Height must be set if the output is confirmed");
269-
let latest_spending_txid = output_info
270-
.latest_spending_tx
271-
.as_ref()
272-
.expect("Spending tx must be set if the output is confirmed")
273-
.txid();
274-
let amount_satoshis = output_info.value_satoshis();
275-
Self::AwaitingThresholdConfirmations {
276-
channel_id,
277-
latest_spending_txid,
278-
confirmation_hash,
279-
confirmation_height,
280-
amount_satoshis,
281-
}
282-
} else if let Some(latest_broadcast_height) = output_info.latest_broadcast_height {
283-
debug_assert!(output_info.latest_spending_tx.is_some());
284-
let channel_id = output_info.channel_id;
285-
let latest_spending_txid = output_info
286-
.latest_spending_tx
287-
.as_ref()
288-
.expect("Spending tx must be set if the spend was broadcast")
289-
.txid();
290-
let amount_satoshis = output_info.value_satoshis();
291-
Self::BroadcastAwaitingConfirmation {
292-
channel_id,
262+
pub(crate) fn from_tracked_spendable_output(output_info: TrackedSpendableOutput) -> Self {
263+
match output_info.status {
264+
OutputSpendStatus::PendingInitialBroadcast { .. } => {
265+
let channel_id = output_info.channel_id;
266+
let amount_satoshis = value_satoshis_from_descriptor(&output_info.descriptor);
267+
Self::PendingBroadcast { channel_id, amount_satoshis }
268+
},
269+
OutputSpendStatus::PendingFirstConfirmation {
293270
latest_broadcast_height,
294-
latest_spending_txid,
295-
amount_satoshis,
296-
}
297-
} else {
298-
let channel_id = output_info.channel_id;
299-
let amount_satoshis = output_info.value_satoshis();
300-
Self::PendingBroadcast { channel_id, amount_satoshis }
271+
latest_spending_tx,
272+
..
273+
} => {
274+
let channel_id = output_info.channel_id;
275+
let amount_satoshis = value_satoshis_from_descriptor(&output_info.descriptor);
276+
let latest_spending_txid = latest_spending_tx.txid();
277+
Self::BroadcastAwaitingConfirmation {
278+
channel_id,
279+
latest_broadcast_height,
280+
latest_spending_txid,
281+
amount_satoshis,
282+
}
283+
},
284+
OutputSpendStatus::PendingThresholdConfirmations {
285+
latest_spending_tx,
286+
confirmation_height,
287+
confirmation_hash,
288+
..
289+
} => {
290+
let channel_id = output_info.channel_id;
291+
let amount_satoshis = value_satoshis_from_descriptor(&output_info.descriptor);
292+
let latest_spending_txid = latest_spending_tx.txid();
293+
Self::AwaitingThresholdConfirmations {
294+
channel_id,
295+
latest_spending_txid,
296+
confirmation_hash,
297+
confirmation_height,
298+
amount_satoshis,
299+
}
300+
},
301301
}
302302
}
303303
}

src/builder.rs

Lines changed: 43 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,10 @@ use crate::gossip::GossipSource;
88
use crate::io;
99
use crate::io::sqlite_store::SqliteStore;
1010
use crate::liquidity::LiquiditySource;
11-
use crate::logger::{log_error, FilesystemLogger, Logger};
11+
use crate::logger::{log_error, log_info, FilesystemLogger, Logger};
1212
use crate::message_handler::NodeCustomMessageHandler;
1313
use crate::payment_store::PaymentStore;
1414
use crate::peer_store::PeerStore;
15-
use crate::sweep::OutputSweeper;
1615
use crate::tx_broadcaster::TransactionBroadcaster;
1716
use crate::types::{
1817
ChainMonitor, ChannelManager, GossipSync, KeysManager, MessageRouter, NetworkGraph,
@@ -37,6 +36,7 @@ use lightning::util::persist::{
3736
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
3837
};
3938
use lightning::util::ser::ReadableArgs;
39+
use lightning::util::sweep::OutputSweeper;
4040

4141
use lightning_persister::fs_store::FilesystemStore;
4242

@@ -895,6 +895,47 @@ fn build_with_store_internal<K: KVStore + Sync + Send + 'static>(
895895

896896
liquidity_source.as_ref().map(|l| l.set_peer_manager(Arc::clone(&peer_manager)));
897897

898+
let output_sweeper = match io::utils::read_output_sweeper(
899+
Arc::clone(&tx_broadcaster),
900+
Arc::clone(&fee_estimator),
901+
Arc::clone(&tx_sync),
902+
Arc::clone(&keys_manager),
903+
Arc::clone(&kv_store),
904+
Arc::clone(&logger),
905+
) {
906+
Ok(output_sweeper) => Arc::new(output_sweeper),
907+
Err(e) => {
908+
if e.kind() == std::io::ErrorKind::NotFound {
909+
Arc::new(OutputSweeper::new(
910+
channel_manager.current_best_block(),
911+
Arc::clone(&tx_broadcaster),
912+
Arc::clone(&fee_estimator),
913+
Some(Arc::clone(&tx_sync)),
914+
Arc::clone(&keys_manager),
915+
Arc::clone(&keys_manager),
916+
Arc::clone(&kv_store),
917+
Arc::clone(&logger),
918+
))
919+
} else {
920+
return Err(BuildError::ReadFailed);
921+
}
922+
},
923+
};
924+
925+
match io::utils::migrate_deprecated_spendable_outputs(
926+
Arc::clone(&output_sweeper),
927+
Arc::clone(&kv_store),
928+
Arc::clone(&logger),
929+
) {
930+
Ok(()) => {
931+
log_info!(logger, "Successfully migrated OutputSweeper data.");
932+
},
933+
Err(e) => {
934+
log_error!(logger, "Failed to migrate OutputSweeper data: {}", e);
935+
return Err(BuildError::ReadFailed);
936+
},
937+
}
938+
898939
// Init payment info storage
899940
let payment_store = match io::utils::read_payments(Arc::clone(&kv_store), Arc::clone(&logger)) {
900941
Ok(payments) => {
@@ -928,25 +969,6 @@ fn build_with_store_internal<K: KVStore + Sync + Send + 'static>(
928969
},
929970
};
930971

931-
let best_block = channel_manager.current_best_block();
932-
let output_sweeper =
933-
match io::utils::read_spendable_outputs(Arc::clone(&kv_store), Arc::clone(&logger)) {
934-
Ok(outputs) => Arc::new(OutputSweeper::new(
935-
outputs,
936-
Arc::clone(&wallet),
937-
Arc::clone(&tx_broadcaster),
938-
Arc::clone(&fee_estimator),
939-
Arc::clone(&keys_manager),
940-
Arc::clone(&kv_store),
941-
best_block,
942-
Some(Arc::clone(&tx_sync)),
943-
Arc::clone(&logger),
944-
)),
945-
Err(_) => {
946-
return Err(BuildError::ReadFailed);
947-
},
948-
};
949-
950972
let (stop_sender, _) = tokio::sync::watch::channel(());
951973

952974
let is_listening = Arc::new(AtomicBool::new(false));

src/event.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -708,7 +708,7 @@ where
708708
}
709709
},
710710
LdkEvent::SpendableOutputs { outputs, channel_id } => {
711-
self.output_sweeper.add_outputs(outputs, channel_id)
711+
self.output_sweeper.track_spendable_outputs(outputs, channel_id, true, None)
712712
},
713713
LdkEvent::OpenChannelRequest {
714714
temporary_channel_id,

src/io/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,10 @@ pub(crate) const PEER_INFO_PERSISTENCE_KEY: &str = "peers";
2121
pub(crate) const PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE: &str = "payments";
2222
pub(crate) const PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";
2323

24-
/// The spendable output information will be persisted under this prefix.
25-
pub(crate) const SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE: &str = "spendable_outputs";
26-
pub(crate) const SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";
24+
/// The spendable output information used to persisted under this prefix until LDK Node v0.3.0.
25+
pub(crate) const DEPRECATED_SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE: &str =
26+
"spendable_outputs";
27+
pub(crate) const DEPRECATED_SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";
2728

2829
/// RapidGossipSync's `latest_sync_timestamp` will be persisted under this key.
2930
pub(crate) const LATEST_RGS_SYNC_TIMESTAMP_PRIMARY_NAMESPACE: &str = "";

src/io/utils.rs

Lines changed: 92 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
use super::*;
22
use crate::config::WALLET_KEYS_SEED_LEN;
33

4-
use crate::logger::log_error;
4+
use crate::logger::{log_error, FilesystemLogger};
55
use crate::peer_store::PeerStore;
6-
use crate::sweep::SpendableOutputInfo;
6+
use crate::sweep::DeprecatedSpendableOutputInfo;
7+
use crate::types::{Broadcaster, ChainSource, FeeEstimator, KeysManager, Sweeper};
78
use crate::{Error, EventQueue, PaymentDetails};
89

910
use lightning::routing::gossip::NetworkGraph;
@@ -12,13 +13,16 @@ use lightning::util::logger::Logger;
1213
use lightning::util::persist::{
1314
KVStore, KVSTORE_NAMESPACE_KEY_ALPHABET, KVSTORE_NAMESPACE_KEY_MAX_LEN,
1415
NETWORK_GRAPH_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
15-
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY,
16-
SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
16+
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, OUTPUT_SWEEPER_PERSISTENCE_KEY,
17+
OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE,
18+
SCORER_PERSISTENCE_KEY, SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
19+
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
1720
};
1821
use lightning::util::ser::{Readable, ReadableArgs, Writeable};
1922
use lightning::util::string::PrintableString;
2023

2124
use bip39::Mnemonic;
25+
use lightning::util::sweep::{OutputSpendStatus, OutputSweeper};
2226
use rand::{thread_rng, RngCore};
2327

2428
use std::fs;
@@ -200,34 +204,107 @@ where
200204
Ok(res)
201205
}
202206

207+
/// Read `OutputSweeper` state from the store.
208+
pub(crate) fn read_output_sweeper<K: KVStore + Send + Sync>(
209+
broadcaster: Arc<Broadcaster>, fee_estimator: Arc<FeeEstimator>,
210+
chain_data_source: Arc<ChainSource>, keys_manager: Arc<KeysManager>, kv_store: Arc<K>,
211+
logger: Arc<FilesystemLogger>,
212+
) -> Result<Sweeper<K>, std::io::Error> {
213+
let mut reader = Cursor::new(kv_store.read(
214+
OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE,
215+
OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE,
216+
OUTPUT_SWEEPER_PERSISTENCE_KEY,
217+
)?);
218+
let args = (
219+
broadcaster,
220+
fee_estimator,
221+
Some(chain_data_source),
222+
Arc::clone(&keys_manager),
223+
keys_manager,
224+
kv_store,
225+
logger.clone(),
226+
);
227+
OutputSweeper::read(&mut reader, args).map_err(|e| {
228+
log_error!(logger, "Failed to deserialize OutputSweeper: {}", e);
229+
std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize OutputSweeper")
230+
})
231+
}
232+
203233
/// Read previously persisted spendable output information from the store.
204-
pub(crate) fn read_spendable_outputs<K: KVStore + Sync + Send, L: Deref>(
205-
kv_store: Arc<K>, logger: L,
206-
) -> Result<Vec<SpendableOutputInfo>, std::io::Error>
234+
pub(crate) fn migrate_deprecated_spendable_outputs<K: KVStore + Sync + Send, L: Deref>(
235+
sweeper: Arc<Sweeper<K>>, kv_store: Arc<K>, logger: L,
236+
) -> Result<(), std::io::Error>
207237
where
208238
L::Target: Logger,
209239
{
210-
let mut res = Vec::new();
240+
let best_block = sweeper.current_best_block();
211241

212242
for stored_key in kv_store.list(
213-
SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
214-
SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
243+
DEPRECATED_SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
244+
DEPRECATED_SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
215245
)? {
216246
let mut reader = Cursor::new(kv_store.read(
217-
SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
218-
SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
247+
DEPRECATED_SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
248+
DEPRECATED_SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
219249
&stored_key,
220250
)?);
221-
let output = SpendableOutputInfo::read(&mut reader).map_err(|e| {
251+
let output = DeprecatedSpendableOutputInfo::read(&mut reader).map_err(|e| {
222252
log_error!(logger, "Failed to deserialize SpendableOutputInfo: {}", e);
223253
std::io::Error::new(
224254
std::io::ErrorKind::InvalidData,
225255
"Failed to deserialize SpendableOutputInfo",
226256
)
227257
})?;
228-
res.push(output);
258+
let descriptors = vec![output.descriptor.clone()];
259+
let spend_delay = Some(best_block.height + 2);
260+
sweeper.track_spendable_outputs(descriptors, output.channel_id, true, spend_delay);
261+
if let Some(tracked_spendable_output) =
262+
sweeper.tracked_spendable_outputs().iter().find(|o| o.descriptor == output.descriptor)
263+
{
264+
match tracked_spendable_output.status {
265+
OutputSpendStatus::PendingInitialBroadcast { delayed_until_height } => {
266+
if delayed_until_height == spend_delay {
267+
kv_store.remove(
268+
DEPRECATED_SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
269+
DEPRECATED_SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
270+
&stored_key,
271+
false,
272+
)?;
273+
} else {
274+
debug_assert!(false, "Unexpected status in OutputSweeper migration.");
275+
log_error!(logger, "Unexpected status in OutputSweeper migration.");
276+
return Err(std::io::Error::new(
277+
std::io::ErrorKind::Other,
278+
"Failed to migrate OutputSweeper state.",
279+
));
280+
}
281+
},
282+
_ => {
283+
debug_assert!(false, "Unexpected status in OutputSweeper migration.");
284+
log_error!(logger, "Unexpected status in OutputSweeper migration.");
285+
return Err(std::io::Error::new(
286+
std::io::ErrorKind::Other,
287+
"Failed to migrate OutputSweeper state.",
288+
));
289+
},
290+
}
291+
} else {
292+
debug_assert!(
293+
false,
294+
"OutputSweeper failed to track and persist outputs during migration."
295+
);
296+
log_error!(
297+
logger,
298+
"OutputSweeper failed to track and persist outputs during migration."
299+
);
300+
return Err(std::io::Error::new(
301+
std::io::ErrorKind::Other,
302+
"Failed to migrate OutputSweeper state.",
303+
));
304+
}
229305
}
230-
Ok(res)
306+
307+
Ok(())
231308
}
232309

233310
pub(crate) fn read_latest_rgs_sync_timestamp<K: KVStore + Sync + Send, L: Deref>(

0 commit comments

Comments
 (0)