Skip to content

Commit a9cf57a

Browse files
committed
Implement incremental Esplora syncing for the on-chain wallet
1 parent 778bc91 commit a9cf57a

File tree

3 files changed

+80
-69
lines changed

3 files changed

+80
-69
lines changed

src/chain/mod.rs

Lines changed: 74 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -228,10 +228,71 @@ impl ChainSource {
228228
})?;
229229
}
230230

231-
let res =
232-
{
233-
let full_scan_request = onchain_wallet.get_full_scan_request();
231+
let res = {
232+
// If this is our first sync, do a full scan with the configured gap limit.
233+
// Otherwise just do an incremental sync.
234+
let incremental_sync =
235+
latest_onchain_wallet_sync_timestamp.read().unwrap().is_some();
236+
237+
macro_rules! get_and_apply_wallet_update {
238+
($sync_future: expr) => {{
239+
let now = Instant::now();
240+
match $sync_future.await {
241+
Ok(res) => match res {
242+
Ok(update) => match onchain_wallet.apply_update(update) {
243+
Ok(()) => {
244+
log_info!(
245+
logger,
246+
"{} of on-chain wallet finished in {}ms.",
247+
if incremental_sync { "Incremental sync" } else { "Sync" },
248+
now.elapsed().as_millis()
249+
);
250+
let unix_time_secs_opt = SystemTime::now()
251+
.duration_since(UNIX_EPOCH)
252+
.ok()
253+
.map(|d| d.as_secs());
254+
*latest_onchain_wallet_sync_timestamp.write().unwrap() =
255+
unix_time_secs_opt;
256+
Ok(())
257+
},
258+
Err(e) => Err(e),
259+
},
260+
Err(e) => match *e {
261+
esplora_client::Error::Reqwest(he) => {
262+
log_error!(
263+
logger,
264+
"{} of on-chain wallet failed due to HTTP connection error: {}",
265+
if incremental_sync { "Incremental sync" } else { "Sync" },
266+
he
267+
);
268+
Err(Error::WalletOperationFailed)
269+
},
270+
_ => {
271+
log_error!(
272+
logger,
273+
"{} of on-chain wallet failed due to Esplora error: {}",
274+
if incremental_sync { "Incremental sync" } else { "Sync" },
275+
e
276+
);
277+
Err(Error::WalletOperationFailed)
278+
},
279+
},
280+
},
281+
Err(e) => {
282+
log_error!(
283+
logger,
284+
"{} of on-chain wallet timed out: {}",
285+
if incremental_sync { "Incremental sync" } else { "Sync" },
286+
e
287+
);
288+
Err(Error::WalletOperationTimeout)
289+
},
290+
}
291+
}}
292+
}
234293

294+
if incremental_sync {
295+
let full_scan_request = onchain_wallet.get_full_scan_request();
235296
let wallet_sync_timeout_fut = tokio::time::timeout(
236297
Duration::from_secs(BDK_WALLET_SYNC_TIMEOUT_SECS),
237298
esplora_client.full_scan(
@@ -240,48 +301,16 @@ impl ChainSource {
240301
BDK_CLIENT_CONCURRENCY,
241302
),
242303
);
243-
244-
let now = Instant::now();
245-
match wallet_sync_timeout_fut.await {
246-
Ok(res) => match res {
247-
Ok(update) => match onchain_wallet.apply_update(update) {
248-
Ok(()) => {
249-
log_info!(
250-
logger,
251-
"Sync of on-chain wallet finished in {}ms.",
252-
now.elapsed().as_millis()
253-
);
254-
let unix_time_secs_opt = SystemTime::now()
255-
.duration_since(UNIX_EPOCH)
256-
.ok()
257-
.map(|d| d.as_secs());
258-
*latest_onchain_wallet_sync_timestamp.write().unwrap() =
259-
unix_time_secs_opt;
260-
Ok(())
261-
},
262-
Err(e) => Err(e),
263-
},
264-
Err(e) => match *e {
265-
esplora_client::Error::Reqwest(he) => {
266-
log_error!(
267-
logger,
268-
"Sync failed due to HTTP connection error: {}",
269-
he
270-
);
271-
Err(Error::WalletOperationFailed)
272-
},
273-
_ => {
274-
log_error!(logger, "Sync of on-chain wallet failed due to Esplora error: {}", e);
275-
Err(Error::WalletOperationFailed)
276-
},
277-
},
278-
},
279-
Err(e) => {
280-
log_error!(logger, "On-chain wallet sync timed out: {}", e);
281-
Err(Error::WalletOperationTimeout)
282-
},
283-
}
284-
};
304+
get_and_apply_wallet_update!(wallet_sync_timeout_fut)
305+
} else {
306+
let sync_request = onchain_wallet.get_incremental_sync_request();
307+
let wallet_sync_timeout_fut = tokio::time::timeout(
308+
Duration::from_secs(BDK_WALLET_SYNC_TIMEOUT_SECS),
309+
esplora_client.sync(sync_request, BDK_CLIENT_CONCURRENCY),
310+
);
311+
get_and_apply_wallet_update!(wallet_sync_timeout_fut)
312+
}
313+
};
285314

286315
onchain_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res);
287316

src/lib.rs

Lines changed: 1 addition & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -240,31 +240,9 @@ impl Node {
240240

241241
// Block to ensure we update our fee rate cache once on startup
242242
let chain_source = Arc::clone(&self.chain_source);
243-
let sync_logger = Arc::clone(&self.logger);
244-
let sync_fee_rate_update_timestamp =
245-
Arc::clone(&self.latest_fee_rate_cache_update_timestamp);
246243
let runtime_ref = &runtime;
247244
tokio::task::block_in_place(move || {
248-
runtime_ref.block_on(async move {
249-
let now = Instant::now();
250-
match chain_source.update_fee_rate_estimates().await {
251-
Ok(()) => {
252-
log_info!(
253-
sync_logger,
254-
"Initial fee rate cache update finished in {}ms.",
255-
now.elapsed().as_millis()
256-
);
257-
let unix_time_secs_opt =
258-
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
259-
*sync_fee_rate_update_timestamp.write().unwrap() = unix_time_secs_opt;
260-
Ok(())
261-
},
262-
Err(e) => {
263-
log_error!(sync_logger, "Initial fee rate cache update failed: {}", e,);
264-
Err(e)
265-
},
266-
}
267-
})
245+
runtime_ref.block_on(async move { chain_source.update_fee_rate_estimates().await })
268246
})?;
269247

270248
// Spawn background task continuously syncing onchain, lightning, and fee rate cache.

src/wallet/mod.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use lightning::sign::{
2525
use lightning::util::message_signing;
2626
use lightning_invoice::RawBolt11Invoice;
2727

28-
use bdk_chain::spk_client::FullScanRequest;
28+
use bdk_chain::spk_client::{FullScanRequest, SyncRequest};
2929
use bdk_chain::ChainPosition;
3030
use bdk_wallet::{KeychainKind, PersistedWallet, SignOptions, Update};
3131

@@ -80,6 +80,10 @@ where
8080
self.inner.lock().unwrap().start_full_scan().build()
8181
}
8282

83+
pub(crate) fn get_incremental_sync_request(&self) -> SyncRequest<(KeychainKind, u32)> {
84+
self.inner.lock().unwrap().start_sync_with_revealed_spks().build()
85+
}
86+
8387
pub(crate) fn apply_update(&self, update: impl Into<Update>) -> Result<(), Error> {
8488
let mut locked_wallet = self.inner.lock().unwrap();
8589
match locked_wallet.apply_update(update) {

0 commit comments

Comments
 (0)