Skip to content

Commit

Permalink
Split out a scan_height...() function
Browse files Browse the repository at this point in the history
  • Loading branch information
teor2345 committed Dec 5, 2023
1 parent 3e7f9df commit 9a1414a
Showing 1 changed file with 127 additions and 87 deletions.
214 changes: 127 additions & 87 deletions zebra-scan/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use zcash_primitives::{
};

use zebra_chain::{
block::Block,
block::{Block, Height},
chain_tip::ChainTip,
diagnostic::task::WaitForPanics,
parameters::Network,
Expand Down Expand Up @@ -55,7 +55,7 @@ const INFO_LOG_INTERVAL: u32 = 100_000;
/// Start a scan task that reads blocks from `state`, scans them with the configured keys in
/// `storage`, and then writes the results to `storage`.
pub async fn start(
mut state: State,
state: State,
chain_tip_change: ChainTipChange,
storage: Storage,
) -> Result<(), Report> {
Expand All @@ -67,6 +67,7 @@ pub async fn start(
let key_birthdays = tokio::task::spawn_blocking(move || key_storage.sapling_keys())
.wait_for_panics()
.await;
let key_birthdays = Arc::new(key_birthdays);

// Parse and convert keys once, then use them to scan all blocks.
// There is some cryptography here, but it should be fast even with thousands of keys.
Expand All @@ -80,103 +81,142 @@ pub async fn start(
Ok::<_, Report>((key.clone(), parsed_keys))
})
.try_collect()?;
let parsed_keys = Arc::new(parsed_keys);

// Give empty states time to verify some blocks before we start scanning.
tokio::time::sleep(INITIAL_WAIT).await;

loop {
// Get a block from the state.
// We can't use ServiceExt::oneshot() here, because it causes lifetime errors in init().
let block = state
.ready()
.await
.map_err(|e| eyre!(e))?
.call(zebra_state::Request::Block(height.into()))
.await
.map_err(|e| eyre!(e))?;

let block = match block {
zebra_state::Response::Block(Some(block)) => block,
zebra_state::Response::Block(None) => {
// If we've reached the tip, sleep for a while then try and get the same block.
tokio::time::sleep(CHECK_INTERVAL).await;
continue;
}
_ => unreachable!("unmatched response to a state::Tip request"),
};

// Only log at info level every 100,000 blocks
let is_info_log =
height == storage.min_sapling_birthday_height() || height.0 % INFO_LOG_INTERVAL == 0;

// TODO: add debug logs?
let scanned_height = scan_height_and_store_results(
height,
state.clone(),
chain_tip_change.clone(),
storage.clone(),
key_birthdays.clone(),
parsed_keys.clone(),
)
.await?;

// If we've reached the tip, sleep for a while then try and get the same block.
if scanned_height.is_none() {
tokio::time::sleep(CHECK_INTERVAL).await;
continue;
}

height = height
.next()
.expect("a valid blockchain never reaches the max height");
}
}

/// Get the block at `height` from `state`, scan it with the keys in `parsed_keys`, and store the
/// results in `storage`. If `height` is lower than the `key_birthdays` for that key, skip it.
///
/// Returns:
/// - `Ok(Some(height))` if the height was scanned,
/// - `Ok(None)` if the height was not in the database, and
/// - `Err(error)` on fatal errors.
pub async fn scan_height_and_store_results(
height: Height,
mut state: State,
chain_tip_change: ChainTipChange,
storage: Storage,
key_birthdays: Arc<HashMap<SaplingScanningKey, Height>>,
parsed_keys: Arc<
HashMap<SaplingScanningKey, (Vec<DiversifiableFullViewingKey>, Vec<SaplingIvk>)>,
>,
) -> Result<Option<Height>, Report> {
let network = storage.network();

// Only log at info level every 100,000 blocks.
//
// TODO: also log progress every 5 minutes once we reach the tip?
let is_info_log =
height == storage.min_sapling_birthday_height() || height.0 % INFO_LOG_INTERVAL == 0;

// TODO: add debug logs?
if is_info_log {
info!(
"Scanning the blockchain: now at block {:?}, current tip {:?}",
height,
chain_tip_change
.latest_chain_tip()
.best_tip_height_and_hash(),
);
}

// Get a block from the state.
// We can't use ServiceExt::oneshot() here, because it causes lifetime errors in init().
let block = state
.ready()
.await
.map_err(|e| eyre!(e))?
.call(zebra_state::Request::Block(height.into()))
.await
.map_err(|e| eyre!(e))?;

let block = match block {
zebra_state::Response::Block(Some(block)) => block,
zebra_state::Response::Block(None) => return Ok(None),
_ => unreachable!("unmatched response to a state::Tip request"),
};

// Scan it with all the keys.
//
// TODO: scan each key in parallel (after MVP?)
for (key_num, (sapling_key, birthday_height)) in key_birthdays.iter().enumerate() {
// # Security
//
// We can't log `sapling_key` here because it is a private viewing key. Anyone who reads
// the logs could use the key to view those transactions.
if is_info_log {
info!(
"Scanning the blockchain: now at block {:?}, current tip {:?}",
height,
chain_tip_change
.latest_chain_tip()
.best_tip_height_and_hash(),
"Scanning the blockchain for key {}, started at block {:?}",
key_num, birthday_height,
);
}

for (key_num, (sapling_key, birthday_height)) in key_birthdays.iter().enumerate() {
// # Security
//
// We can't log `sapling_key` here because it is a private viewing key. Anyone who reads
// the logs could use the key to view those transactions.
if is_info_log {
info!(
"Scanning the blockchain for key {}, started at block {:?}",
key_num, birthday_height,
);
}

// Get the pre-parsed keys for this configured key.
let (dfvks, ivks) = parsed_keys.get(sapling_key).cloned().unwrap_or_default();

// Scan the block, which blocks async execution until the scan is complete.
//
// TODO: skip scanning before birthday height (#8022)
// TODO: scan each key in parallel (after MVP?)
let sapling_key = sapling_key.clone();
let block = block.clone();
let mut storage = storage.clone();

// We use a dummy size of the Sapling note commitment tree.
//
// We can't set the size to zero, because the underlying scanning function would return
// `zcash_client_backeng::scanning::ScanError::TreeSizeUnknown`.
//
// And we can't set them close to 0, because the scanner subtracts the number of notes
// in the block, and panics with "attempt to subtract with overflow". The number of
// notes in a block must be less than this value, this is a consensus rule.
//
// TODO: use the real sapling tree size: `zs::Response::SaplingTree().position() + 1`
let sapling_tree_size = 1 << 16;

tokio::task::spawn_blocking(move || {
let dfvk_res =
scan_block(network, &block, sapling_tree_size, &dfvks).map_err(|e| eyre!(e))?;
let ivk_res =
scan_block(network, &block, sapling_tree_size, &ivks).map_err(|e| eyre!(e))?;

let dfvk_res = scanned_block_to_db_result(dfvk_res);
let ivk_res = scanned_block_to_db_result(ivk_res);

storage.add_sapling_result(sapling_key.clone(), height, dfvk_res);
storage.add_sapling_result(sapling_key, height, ivk_res);

Ok::<_, Report>(())
})
.wait_for_panics()
.await?;
}
// Get the pre-parsed keys for this configured key.
let (dfvks, ivks) = parsed_keys.get(sapling_key).cloned().unwrap_or_default();

height = height
.next()
.expect("a valid blockchain never reaches the max height");
// Scan the block, which blocks async execution until the scan is complete.
//
// TODO: skip scanning before birthday height (#8022)
let sapling_key = sapling_key.clone();
let block = block.clone();
let mut storage = storage.clone();

// We use a dummy size of the Sapling note commitment tree.
//
// We can't set the size to zero, because the underlying scanning function would return
// `zcash_client_backeng::scanning::ScanError::TreeSizeUnknown`.
//
// And we can't set them close to 0, because the scanner subtracts the number of notes
// in the block, and panics with "attempt to subtract with overflow". The number of
// notes in a block must be less than this value, this is a consensus rule.
//
// TODO: use the real sapling tree size: `zs::Response::SaplingTree().position() + 1`
let sapling_tree_size = 1 << 16;

tokio::task::spawn_blocking(move || {
let dfvk_res =
scan_block(network, &block, sapling_tree_size, &dfvks).map_err(|e| eyre!(e))?;
let ivk_res =
scan_block(network, &block, sapling_tree_size, &ivks).map_err(|e| eyre!(e))?;

let dfvk_res = scanned_block_to_db_result(dfvk_res);
let ivk_res = scanned_block_to_db_result(ivk_res);

storage.add_sapling_result(sapling_key.clone(), height, dfvk_res);
storage.add_sapling_result(sapling_key, height, ivk_res);

Ok::<_, Report>(())
})
.wait_for_panics()
.await?;
}

Ok(Some(height))
}

/// Returns the transactions from `block` belonging to the given `scanning_keys`.
Expand Down

0 comments on commit 9a1414a

Please sign in to comment.