Skip to content

Commit 4dce1e6

Browse files
committed
Introduce RpcImportParams to make rpc sync more efficient
`RpcImportParams` keeps track of the scriptPubKey derivation index to start from for the next call to `importdescripts`/`importmulti`, thus avoiding re-importing into Bitcoin Core.
1 parent 6fd25bd commit 4dce1e6

File tree

3 files changed

+103
-16
lines changed

3 files changed

+103
-16
lines changed

src/blockchain/esplora/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ impl fmt::Display for EsploraError {
9797
}
9898

9999
/// Configuration for an [`EsploraBlockchain`]
100-
#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq)]
100+
#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq, Eq)]
101101
pub struct EsploraBlockchainConfig {
102102
/// Base URL of the esplora service
103103
///

src/blockchain/rpc.rs

Lines changed: 96 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
//! network: bdk::bitcoin::Network::Testnet,
2828
//! wallet_name: "wallet_name".to_string(),
2929
//! sync_params: None,
30+
//! import_params: None,
3031
//! };
3132
//! let blockchain = RpcBlockchain::from_config(&config);
3233
//! ```
@@ -47,8 +48,9 @@ use bitcoincore_rpc::json::{
4748
use bitcoincore_rpc::jsonrpc::serde_json::{json, Value};
4849
use bitcoincore_rpc::Auth as RpcAuth;
4950
use bitcoincore_rpc::{Client, RpcApi};
50-
use log::{debug, info};
51+
use log::{debug, info, warn};
5152
use serde::{Deserialize, Serialize};
53+
use std::cell::RefCell;
5254
use std::collections::{HashMap, HashSet};
5355
use std::ops::Deref;
5456
use std::path::PathBuf;
@@ -66,6 +68,8 @@ pub struct RpcBlockchain {
6668
capabilities: HashSet<Capability>,
6769
/// Sync parameters.
6870
sync_params: RpcSyncParams,
71+
/// Import parameters.
72+
import_params: RefCell<RpcImportParams>,
6973
}
7074

7175
impl Deref for RpcBlockchain {
@@ -89,6 +93,10 @@ pub struct RpcConfig {
8993
pub wallet_name: String,
9094
/// Sync parameters
9195
pub sync_params: Option<RpcSyncParams>,
96+
/// Initial `scriptPubKey` import parameters. [`RpcImportParams`] will be mutated in
97+
/// [`RpcBlockchain`] for every sync to reflect what was imported. The updated params can be
98+
/// obtained via [`RpcBlockchain::import_parameters`].
99+
pub import_params: Option<RpcImportParams>,
92100
}
93101

94102
/// Sync parameters for Bitcoin Core RPC.
@@ -120,6 +128,18 @@ impl Default for RpcSyncParams {
120128
}
121129
}
122130

131+
/// `scriptPubKey` import parameters for Bitcoin Core RPC.
132+
///
133+
/// This defines which derivation index(es) to start importing from so that BDK can avoid
134+
/// re-importing `scriptPubKey`s into the Core wallet.
135+
#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
136+
pub struct RpcImportParams {
137+
/// External derivation index to start import from.
138+
pub external_start_index: usize,
139+
/// Internal derivation index to start import from.
140+
pub internal_start_index: usize,
141+
}
142+
123143
/// This struct is equivalent to [bitcoincore_rpc::Auth] but it implements [serde::Serialize]
124144
/// To be removed once upstream equivalent is implementing Serialize (json serialization format
125145
/// should be the same), see [rust-bitcoincore-rpc/pull/181](https://github.com/rust-bitcoin/rust-bitcoincore-rpc/pull/181)
@@ -153,6 +173,18 @@ impl From<Auth> for RpcAuth {
153173
}
154174
}
155175

176+
impl RpcBlockchain {
177+
/// Returns the current import parameters.
178+
pub fn get_import_parameters(&self) -> RpcImportParams {
179+
*self.import_params.borrow()
180+
}
181+
182+
/// Replaces the import parameters and returns the old value.
183+
pub fn replace_import_parameters(&self, params: RpcImportParams) -> RpcImportParams {
184+
self.import_params.replace(params)
185+
}
186+
}
187+
156188
impl Blockchain for RpcBlockchain {
157189
fn get_capabilities(&self) -> HashSet<Capability> {
158190
self.capabilities.clone()
@@ -198,7 +230,11 @@ impl WalletSync for RpcBlockchain {
198230
D: BatchDatabase,
199231
{
200232
let batch = DbState::new(db, &self.sync_params, &*prog)?
201-
.sync_with_core(&self.client, self.is_descriptors)?
233+
.sync_with_core(
234+
&self.client,
235+
&mut *self.import_params.borrow_mut(),
236+
self.is_descriptors,
237+
)?
202238
.as_db_batch()?;
203239

204240
db.commit_batch(batch)
@@ -274,6 +310,7 @@ impl ConfigurableBlockchain for RpcBlockchain {
274310
capabilities,
275311
is_descriptors,
276312
sync_params: config.sync_params.clone().unwrap_or_default(),
313+
import_params: RefCell::new(config.import_params.unwrap_or_default()),
277314
})
278315
}
279316
}
@@ -314,7 +351,11 @@ struct DbState<'a, D> {
314351

315352
impl<'a, D: BatchDatabase> DbState<'a, D> {
316353
/// Obtain [DbState] from [crate::database::Database].
317-
fn new(db: &'a D, params: &'a RpcSyncParams, prog: &'a dyn Progress) -> Result<Self, Error> {
354+
fn new(
355+
db: &'a D,
356+
sync_params: &'a RpcSyncParams,
357+
prog: &'a dyn Progress,
358+
) -> Result<Self, Error> {
318359
let ext_spks = db.iter_script_pubkeys(Some(KeychainKind::External))?;
319360
let int_spks = db.iter_script_pubkeys(Some(KeychainKind::Internal))?;
320361

@@ -325,10 +366,10 @@ impl<'a, D: BatchDatabase> DbState<'a, D> {
325366

326367
// If at least one descriptor is derivable, we need to ensure scriptPubKeys are sufficiently
327368
// cached.
328-
if has_derivable && last_count < params.start_script_count {
369+
if has_derivable && last_count < sync_params.start_script_count {
329370
let inner_err = MissingCachedScripts {
330371
last_count,
331-
missing_count: params.start_script_count - last_count,
372+
missing_count: sync_params.start_script_count - last_count,
332373
};
333374
debug!("requesting more spks with: {:?}", inner_err);
334375
return Err(Error::MissingCachedScripts(inner_err));
@@ -359,7 +400,7 @@ impl<'a, D: BatchDatabase> DbState<'a, D> {
359400

360401
Ok(Self {
361402
db,
362-
params,
403+
params: sync_params,
363404
prog,
364405
ext_spks,
365406
int_spks,
@@ -374,7 +415,12 @@ impl<'a, D: BatchDatabase> DbState<'a, D> {
374415

375416
/// Sync states of [BatchDatabase] and Core wallet.
376417
/// First we import all `scriptPubKey`s from database into core wallet
377-
fn sync_with_core(&mut self, client: &Client, use_desc: bool) -> Result<&mut Self, Error> {
418+
fn sync_with_core(
419+
&mut self,
420+
client: &Client,
421+
import_params: &mut RpcImportParams,
422+
use_desc: bool,
423+
) -> Result<&mut Self, Error> {
378424
// this tells Core wallet where to sync from for imported scripts
379425
let start_epoch = if self.params.force_start_time {
380426
self.params.start_time
@@ -384,23 +430,55 @@ impl<'a, D: BatchDatabase> DbState<'a, D> {
384430
.map_or(self.params.start_time, |st| st.block_time.timestamp)
385431
};
386432

387-
// sync scriptPubKeys from Database to Core wallet
388-
let scripts_iter = self.ext_spks.iter().chain(&self.int_spks);
389-
if use_desc {
390-
import_descriptors(client, start_epoch, scripts_iter)?;
391-
} else {
392-
import_multi(client, start_epoch, scripts_iter)?;
433+
// sync scriptPubKeys from Database into Core wallet, starting from derivation indexes
434+
// defined in `import_params`
435+
let (scripts_count, scripts_iter) = {
436+
let ext_spks = self
437+
.ext_spks
438+
.iter()
439+
.skip(import_params.external_start_index);
440+
let int_spks = self
441+
.int_spks
442+
.iter()
443+
.skip(import_params.internal_start_index);
444+
445+
let scripts_count = ext_spks.len() + int_spks.len();
446+
let scripts_iter = ext_spks.chain(int_spks);
447+
println!("scripts count: {}", scripts_count);
448+
449+
(scripts_count, scripts_iter)
450+
};
451+
452+
if scripts_count > 0 {
453+
if use_desc {
454+
import_descriptors(client, start_epoch, scripts_iter)?;
455+
} else {
456+
import_multi(client, start_epoch, scripts_iter)?;
457+
}
393458
}
394459

395-
// wait for Core wallet to rescan (TODO: maybe make this async)
396460
await_wallet_scan(client, self.params.poll_rate_sec, self.prog)?;
397461

462+
// update import_params
463+
import_params.external_start_index = self.ext_spks.len();
464+
import_params.internal_start_index = self.int_spks.len();
465+
398466
// obtain iterator of pagenated `listtransactions` RPC calls
399467
const LIST_TX_PAGE_SIZE: usize = 100; // item count per page
400468
let tx_iter = list_transactions(client, LIST_TX_PAGE_SIZE)?.filter(|item| {
401469
// filter out conflicting transactions - only accept transactions that are already
402470
// confirmed, or exists in mempool
403-
item.info.confirmations > 0 || client.get_mempool_entry(&item.info.txid).is_ok()
471+
let confirmed = item.info.confirmations > 0;
472+
let in_mempool = client.get_mempool_entry(&item.info.txid).is_ok();
473+
474+
let keep = confirmed || in_mempool;
475+
if !keep {
476+
warn!(
477+
"transaction {} is skipped: confirmed={}, in_mempool={}",
478+
item.info.txid, confirmed, in_mempool
479+
);
480+
}
481+
keep
404482
});
405483

406484
// iterate through chronological results of `listtransactions`
@@ -865,6 +943,8 @@ impl BlockchainFactory for RpcBlockchainFactory {
865943
checksum
866944
),
867945
sync_params: self.sync_params.clone(),
946+
// TODO @evanlinjin: How can be set this individually for each build?
947+
import_params: Default::default(),
868948
})
869949
}
870950
}
@@ -892,6 +972,7 @@ mod test {
892972
network: Network::Regtest,
893973
wallet_name: format!("client-wallet-test-{}", std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos() ),
894974
sync_params: None,
975+
import_params: None,
895976
};
896977
RpcBlockchain::from_config(&config).unwrap()
897978
}

src/testutils/blockchain_tests.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -755,6 +755,12 @@ macro_rules! bdk_blockchain_tests {
755755

756756
blockchain.broadcast(&tx1).expect("broadcasting first");
757757
blockchain.broadcast(&tx2).expect("broadcasting replacement");
758+
759+
// TODO @evanlinjin: Core's `listtransactions` RPC call does not return conflicting
760+
// unconfirmed transactions (unless we re-import associated scriptPubKey/descriptor)
761+
#[cfg(feature = "rpc")]
762+
blockchain.replace_import_parameters(Default::default());
763+
758764
receiver_wallet.sync(&blockchain, SyncOptions::default()).expect("syncing receiver");
759765
assert_eq!(receiver_wallet.get_balance().expect("balance").untrusted_pending, 49_000, "should have received coins once and only once");
760766
}

0 commit comments

Comments
 (0)