diff --git a/components/chainhook-cli/src/cli/mod.rs b/components/chainhook-cli/src/cli/mod.rs index 958fded1e..13baf7ff7 100644 --- a/components/chainhook-cli/src/cli/mod.rs +++ b/components/chainhook-cli/src/cli/mod.rs @@ -11,8 +11,8 @@ use crate::storage::{ delete_confirmed_entry_from_stacks_blocks, delete_unconfirmed_entry_from_stacks_blocks, get_last_block_height_inserted, get_last_unconfirmed_block_height_inserted, get_stacks_block_at_block_height, insert_unconfirmed_entry_in_stacks_blocks, - is_stacks_block_present, open_readonly_stacks_db_conn, open_readwrite_stacks_db_conn, - set_last_confirmed_insert_key, + is_stacks_block_present, open_readonly_stacks_db_conn, open_readonly_stacks_db_conn_with_retry, + open_readwrite_stacks_db_conn, set_last_confirmed_insert_key, }; use chainhook_sdk::chainhooks::bitcoin::BitcoinChainhookSpecification; use chainhook_sdk::chainhooks::bitcoin::BitcoinChainhookSpecificationNetworkMap; @@ -543,16 +543,22 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> { } }; match open_readonly_stacks_db_conn(&config.expected_cache_path(), &ctx) { - Ok(db_conn) => { + Ok(_) => { let _ = consolidate_local_stacks_chainstate_using_csv( &mut config, &ctx, ) .await; + // Refresh DB connection so it picks up recent changes made by TSV consolidation. + let new_conn = open_readonly_stacks_db_conn_with_retry( + &config.expected_cache_path(), + 5, + &ctx, + )?; scan_stacks_chainstate_via_rocksdb_using_predicate( &predicate_spec, None, - &db_conn, + &new_conn, &config, None, &ctx, diff --git a/components/chainhook-cli/src/scan/stacks.rs b/components/chainhook-cli/src/scan/stacks.rs index 57dd2d04b..06d32f7fc 100644 --- a/components/chainhook-cli/src/scan/stacks.rs +++ b/components/chainhook-cli/src/scan/stacks.rs @@ -1,5 +1,7 @@ use std::{ collections::{HashMap, VecDeque}, + fs::File, + io::{BufRead, BufReader}, sync::{Arc, RwLock}, }; @@ -66,11 +68,13 @@ pub enum RecordKind { AttachmentReceived, } +/// Calculates the canonical chain of Stacks blocks based on a Stacks node events TSV file. Returns a `VecDeque` structure of +/// block hashes along with the line number where we can find the entire block message within the TSV. pub async fn get_canonical_fork_from_tsv( config: &mut Config, start_block: Option, ctx: &Context, -) -> Result, String> { +) -> Result, String> { let seed_tsv_path = config.expected_local_stacks_tsv_file()?.clone(); let (record_tx, record_rx) = std::sync::mpsc::channel(); @@ -89,10 +93,12 @@ pub async fn get_canonical_fork_from_tsv( .from_path(&seed_tsv_path) .expect("unable to create csv reader"); + let mut line: u64 = 0; for result in reader_builder.deserialize() { + line += 1; let record: Record = result.unwrap(); match &record.kind { - RecordKind::StacksBlockReceived => match record_tx.send(Some(record)) { + RecordKind::StacksBlockReceived => match record_tx.send(Some((record, line))) { Err(_e) => { break; } @@ -108,9 +114,9 @@ pub async fn get_canonical_fork_from_tsv( let stacks_db = open_readonly_stacks_db_conn_with_retry(&config.expected_cache_path(), 3, ctx)?; let canonical_fork = { let mut cursor = BlockIdentifier::default(); - let mut dump = HashMap::new(); + let mut tsv_new_blocks = HashMap::new(); - while let Ok(Some(mut record)) = record_rx.recv() { + while let Ok(Some((record, line))) = record_rx.recv() { let (block_identifier, parent_block_identifier) = match (&record.kind, &record.blob) { (RecordKind::StacksBlockReceived, Some(blob)) => { match standardize_stacks_serialized_block_header(&blob) { @@ -141,23 +147,28 @@ pub async fn get_canonical_fork_from_tsv( } if block_identifier.index > cursor.index { - cursor = block_identifier.clone(); // todo(lgalabru) + cursor = block_identifier.clone(); } - dump.insert( - block_identifier, - (parent_block_identifier, record.blob.take().unwrap()), - ); + tsv_new_blocks.insert(block_identifier, (parent_block_identifier, line)); } let mut canonical_fork = VecDeque::new(); while cursor.index > 0 { - let (block_identifer, (parent_block_identifier, blob)) = - match dump.remove_entry(&cursor) { + let (block_identifer, (parent_block_identifier, line)) = + match tsv_new_blocks.remove_entry(&cursor) { Some(entry) => entry, - None => break, + None => { + warn!( + ctx.expect_logger(), + "Unable to find block {} with index block hash {} in TSV", + cursor.index, + cursor.hash + ); + break; + } }; - cursor = parent_block_identifier.clone(); // todo(lgalabru) - canonical_fork.push_front((block_identifer, parent_block_identifier, blob)); + cursor = parent_block_identifier.clone(); + canonical_fork.push_front((block_identifer, parent_block_identifier, line)); } canonical_fork }; @@ -474,7 +485,10 @@ pub async fn scan_stacks_chainstate_via_csv_using_predicate( ); let mut last_block_scanned = BlockIdentifier::default(); let mut err_count = 0; - for (block_identifier, _parent_block_identifier, blob) in canonical_fork.drain(..) { + let tsv_path = config.expected_local_stacks_tsv_file()?.clone(); + let mut tsv_reader = BufReader::new(File::open(tsv_path).map_err(|e| e.to_string())?); + let mut tsv_current_line = 0; + for (block_identifier, _parent_block_identifier, tsv_line_number) in canonical_fork.drain(..) { if block_identifier.index < start_block { continue; } @@ -484,11 +498,25 @@ pub async fn scan_stacks_chainstate_via_csv_using_predicate( } } + // Seek to required line from TSV and retrieve its block payload. + let mut tsv_line = String::new(); + while tsv_current_line < tsv_line_number { + tsv_line.clear(); + let bytes_read = tsv_reader.read_line(&mut tsv_line).map_err(|e| e.to_string())?; + if bytes_read == 0 { + return Err("Unexpected EOF when reading TSV".to_string()); + } + tsv_current_line += 1; + } + let Some(serialized_block) = tsv_line.split('\t').last() else { + return Err("Unable to retrieve serialized block from TSV line".to_string()); + }; + last_block_scanned = block_identifier; blocks_scanned += 1; let block_data = match indexer::stacks::standardize_stacks_serialized_block( &indexer.config, - &blob, + serialized_block, &mut indexer.stacks_context, ctx, ) { @@ -555,12 +583,12 @@ pub async fn consolidate_local_stacks_chainstate_using_csv( ); let downloaded_new_dataset = download_stacks_dataset_if_required(config, ctx).await?; - if downloaded_new_dataset { let stacks_db = open_readonly_stacks_db_conn_with_retry(&config.expected_cache_path(), 3, ctx)?; let confirmed_tip = get_last_block_height_inserted(&stacks_db, &ctx); - let mut canonical_fork = get_canonical_fork_from_tsv(config, confirmed_tip, ctx).await?; + let mut canonical_fork: VecDeque<(BlockIdentifier, BlockIdentifier, u64)> = + get_canonical_fork_from_tsv(config, confirmed_tip, ctx).await?; let mut indexer = Indexer::new(config.network.clone()); let mut blocks_inserted = 0; @@ -571,7 +599,14 @@ pub async fn consolidate_local_stacks_chainstate_using_csv( ctx.expect_logger(), "Beginning import of {} Stacks blocks into rocks db", blocks_to_insert ); - for (block_identifier, _parent_block_identifier, blob) in canonical_fork.drain(..) { + // TODO: To avoid repeating code with `scan_stacks_chainstate_via_csv_using_predicate`, we should move this block + // retrieval code into a reusable function. + let tsv_path = config.expected_local_stacks_tsv_file()?.clone(); + let mut tsv_reader = BufReader::new(File::open(tsv_path).map_err(|e| e.to_string())?); + let mut tsv_current_line = 0; + for (block_identifier, _parent_block_identifier, tsv_line_number) in + canonical_fork.drain(..) + { blocks_read += 1; // If blocks already stored, move on @@ -580,9 +615,23 @@ pub async fn consolidate_local_stacks_chainstate_using_csv( } blocks_inserted += 1; + // Seek to required line from TSV and retrieve its block payload. + let mut tsv_line = String::new(); + while tsv_current_line < tsv_line_number { + tsv_line.clear(); + let bytes_read = tsv_reader.read_line(&mut tsv_line).map_err(|e| e.to_string())?; + if bytes_read == 0 { + return Err("Unexpected EOF when reading TSV".to_string()); + } + tsv_current_line += 1; + } + let Some(serialized_block) = tsv_line.split('\t').last() else { + return Err("Unable to retrieve serialized block from TSV line".to_string()); + }; + let block_data = match indexer::stacks::standardize_stacks_serialized_block( &indexer.config, - &blob, + serialized_block, &mut indexer.stacks_context, ctx, ) {