Skip to content

Commit ff398ec

Browse files
committed
Introduce pagenated import logic to RpcBlockchain
* Add `RpcSyncParams::page_size` that restricts req/resp array count for various RPC calls. * Add `pagenated_import` function.
1 parent 5319816 commit ff398ec

File tree

1 file changed

+49
-20
lines changed

1 file changed

+49
-20
lines changed

src/blockchain/rpc.rs

Lines changed: 49 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,8 @@ pub struct RpcSyncParams {
115115
pub force_start_time: bool,
116116
/// RPC poll rate (in seconds) to get state updates.
117117
pub poll_rate_sec: u64,
118+
/// Page size for RPC calls (`importdescriptors`, `importmulti` and `listtransactions`).
119+
pub page_size: usize,
118120
}
119121

120122
impl Default for RpcSyncParams {
@@ -123,7 +125,8 @@ impl Default for RpcSyncParams {
123125
start_script_count: 100,
124126
start_time: 0,
125127
force_start_time: false,
126-
poll_rate_sec: 3,
128+
poll_rate_sec: 2,
129+
page_size: 200,
127130
}
128131
}
129132
}
@@ -432,7 +435,7 @@ impl<'a, D: BatchDatabase> DbState<'a, D> {
432435

433436
// sync scriptPubKeys from Database into Core wallet, starting from derivation indexes
434437
// defined in `import_params`
435-
let (scripts_count, scripts_iter) = {
438+
let scripts_iter = {
436439
let ext_spks = self
437440
.ext_spks
438441
.iter()
@@ -441,31 +444,25 @@ impl<'a, D: BatchDatabase> DbState<'a, D> {
441444
.int_spks
442445
.iter()
443446
.skip(import_params.internal_start_index);
444-
445-
let scripts_count = ext_spks.len() + int_spks.len();
446-
let scripts_iter = ext_spks.chain(int_spks);
447-
println!("scripts count: {}", scripts_count);
448-
449-
(scripts_count, scripts_iter)
447+
ext_spks.chain(int_spks)
450448
};
451-
452-
if scripts_count > 0 {
453-
if use_desc {
454-
import_descriptors(client, start_epoch, scripts_iter)?;
455-
} else {
456-
import_multi(client, start_epoch, scripts_iter)?;
457-
}
458-
}
459-
460-
await_wallet_scan(client, self.params.poll_rate_sec, self.prog)?;
449+
pagenated_import(
450+
client,
451+
use_desc,
452+
start_epoch,
453+
self.params.poll_rate_sec,
454+
self.params.page_size,
455+
scripts_iter,
456+
self.prog,
457+
)?;
458+
// await_wallet_scan(client, self.params.poll_rate_sec, self.prog)?;
461459

462460
// update import_params
463461
import_params.external_start_index = self.ext_spks.len();
464462
import_params.internal_start_index = self.int_spks.len();
465463

466464
// obtain iterator of pagenated `listtransactions` RPC calls
467-
const LIST_TX_PAGE_SIZE: usize = 100; // item count per page
468-
let tx_iter = list_transactions(client, LIST_TX_PAGE_SIZE)?.filter(|item| {
465+
let tx_iter = list_transactions(client, self.params.page_size)?.filter(|item| {
469466
// filter out conflicting transactions - only accept transactions that are already
470467
// confirmed, or exists in mempool
471468
let confirmed = item.info.confirmations > 0;
@@ -789,6 +786,38 @@ where
789786
Ok(())
790787
}
791788

789+
fn pagenated_import<'a, S>(
790+
client: &Client,
791+
use_desc: bool,
792+
start_epoch: u64,
793+
poll_rate_sec: u64,
794+
page_size: usize,
795+
scripts_iter: S,
796+
progress: &dyn Progress,
797+
) -> Result<(), Error>
798+
where
799+
S: Iterator<Item = &'a Script> + Clone,
800+
{
801+
(0_usize..)
802+
.map(|page_index| {
803+
scripts_iter
804+
.clone()
805+
.skip(page_index * page_size)
806+
.take(page_size)
807+
.cloned()
808+
.collect::<Vec<_>>()
809+
})
810+
.take_while(|scripts| !scripts.is_empty())
811+
.try_for_each(|scripts| {
812+
if use_desc {
813+
import_descriptors(client, start_epoch, scripts.iter())?;
814+
} else {
815+
import_multi(client, start_epoch, scripts.iter())?;
816+
}
817+
await_wallet_scan(client, poll_rate_sec, progress)
818+
})
819+
}
820+
792821
/// Calls the `listtransactions` RPC method in `page_size`s and returns iterator of the tx results
793822
/// in chronological order.
794823
///

0 commit comments

Comments
 (0)