From 0352621d8d2641b0bb5e16f17276116254efb8c5 Mon Sep 17 00:00:00 2001
From: nikurt <86772482+nikurt@users.noreply.github.com>
Date: Mon, 20 Mar 2023 11:36:13 +0100
Subject: [PATCH] Fix: Add --dry-run to the tool for applying state-parts
 (#8739)

Also rework the state parts command API to be a single command with two
subcommands instead of two separate commands.
---
 CHANGELOG.md                                |   7 +-
 tools/state-viewer/src/apply_chain_range.rs |  12 +-
 tools/state-viewer/src/cli.rs               | 122 ++++-------
 tools/state-viewer/src/state_parts.rs       | 213 +++++++++++++++-----
 4 files changed, 210 insertions(+), 144 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index fae58aee9bb..c0880133abb 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,10 +6,11 @@
 
 ### Non-protocol Changes
 
+* State-viewer tool to dump and apply state changes from/to a range of blocks. [#8628](https://github.com/near/nearcore/pull/8628)
 * Experimental option to dump state of every epoch to external storage. [#8661](https://github.com/near/nearcore/pull/8661)
-* State-viewer tool to dump and apply state changes from/to a range of blocks [#8628](https://github.com/near/nearcore/pull/8628)
-* Node can restart if State Sync gets interrupted [#8732](https://github.com/near/nearcore/pull/8732)
-* Add prometheus metrics for tracked shards, block height within epoch, if is block/chunk producer
+* Add prometheus metrics for tracked shards, block height within epoch, if is block/chunk producer. [#8728](https://github.com/near/nearcore/pull/8728)
+* Node can restart if State Sync gets interrupted. [#8732](https://github.com/near/nearcore/pull/8732)
+* Merged two `neard view-state` commands: `apply-state-parts` and `dump-state-parts` into a single `state-parts` command. [#8739](https://github.com/near/nearcore/pull/8739)
 
 ## 1.32.0
 
diff --git a/tools/state-viewer/src/apply_chain_range.rs b/tools/state-viewer/src/apply_chain_range.rs
index 9ed3ca26b20..3b85c24ce5b 100644
--- a/tools/state-viewer/src/apply_chain_range.rs
+++ b/tools/state-viewer/src/apply_chain_range.rs
@@ -1,10 +1,3 @@
-use std::fs::File;
-use std::io::Write;
-use std::sync::atomic::{AtomicU64, Ordering};
-use std::sync::Mutex;
-
-use rayon::iter::{IntoParallelIterator, ParallelIterator};
-
 use near_chain::chain::collect_receipts_from_response;
 use near_chain::migrations::check_if_block_is_first_with_chunk_of_version;
 use near_chain::types::ApplyTransactionResult;
@@ -19,6 +12,11 @@ use near_primitives::types::chunk_extra::ChunkExtra;
 use near_primitives::types::{BlockHeight, ShardId};
 use near_store::{get, DBCol, Store};
 use nearcore::NightshadeRuntime;
+use rayon::iter::{IntoParallelIterator, ParallelIterator};
+use std::fs::File;
+use std::io::Write;
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::sync::Mutex;
 
 fn timestamp_ms() -> u64 {
     use std::time::{SystemTime, UNIX_EPOCH};
diff --git a/tools/state-viewer/src/cli.rs b/tools/state-viewer/src/cli.rs
index aef577a7326..a3218119f07 100644
--- a/tools/state-viewer/src/cli.rs
+++ b/tools/state-viewer/src/cli.rs
@@ -1,7 +1,6 @@
 use crate::commands::*;
 use crate::contract_accounts::ContractAccountFilter;
 use crate::rocksdb_stats::get_rocksdb_stats;
-use crate::state_parts::{apply_state_parts, dump_state_parts};
 use near_chain_configs::{GenesisChangeConfig, GenesisValidationMode};
 use near_primitives::account::id::AccountId;
 use near_primitives::hash::CryptoHash;
@@ -27,8 +26,6 @@ pub enum StateViewerSubCommand {
     /// even if it's not included in any block on disk
     #[clap(alias = "apply_receipt")]
     ApplyReceipt(ApplyReceiptCmd),
-    ///
Apply all or a single state part of a shard. - ApplyStateParts(ApplyStatePartsCmd), /// Apply a transaction if it occurs in some chunk we know about, /// even if it's not included in any block on disk #[clap(alias = "apply_tx")] @@ -52,8 +49,6 @@ pub enum StateViewerSubCommand { /// Generate a genesis file from the current state of the DB. #[clap(alias = "dump_state")] DumpState(DumpStateCmd), - /// Dump all or a single state part of a shard. - DumpStateParts(DumpStatePartsCmd), /// Writes state to a remote redis server. #[clap(alias = "dump_state_redis")] DumpStateRedis(DumpStateRedisCmd), @@ -78,6 +73,8 @@ pub enum StateViewerSubCommand { /// Dumps or applies StateChanges. /// Experimental tool for shard shadowing development. StateChanges(StateChangesCmd), + /// Dump or apply state parts. + StateParts(StatePartsCmd), /// View head of the storage. #[clap(alias = "view_chain")] ViewChain(ViewChainCmd), @@ -117,7 +114,6 @@ impl StateViewerSubCommand { StateViewerSubCommand::ApplyChunk(cmd) => cmd.run(home_dir, near_config, store), StateViewerSubCommand::ApplyRange(cmd) => cmd.run(home_dir, near_config, store), StateViewerSubCommand::ApplyReceipt(cmd) => cmd.run(home_dir, near_config, store), - StateViewerSubCommand::ApplyStateParts(cmd) => cmd.run(home_dir, near_config, store), StateViewerSubCommand::ApplyTx(cmd) => cmd.run(home_dir, near_config, store), StateViewerSubCommand::Chain(cmd) => cmd.run(home_dir, near_config, store), StateViewerSubCommand::CheckBlock => check_block_chunk_existence(near_config, store), @@ -126,7 +122,6 @@ impl StateViewerSubCommand { StateViewerSubCommand::DumpAccountStorage(cmd) => cmd.run(home_dir, near_config, store), StateViewerSubCommand::DumpCode(cmd) => cmd.run(home_dir, near_config, store), StateViewerSubCommand::DumpState(cmd) => cmd.run(home_dir, near_config, store), - StateViewerSubCommand::DumpStateParts(cmd) => cmd.run(home_dir, near_config, store), StateViewerSubCommand::DumpStateRedis(cmd) => cmd.run(home_dir, near_config, store), StateViewerSubCommand::DumpTx(cmd) => cmd.run(home_dir, near_config, store), StateViewerSubCommand::EpochInfo(cmd) => cmd.run(home_dir, near_config, store), @@ -136,6 +131,7 @@ impl StateViewerSubCommand { StateViewerSubCommand::RocksDBStats(cmd) => cmd.run(store_opener.path()), StateViewerSubCommand::State => state(home_dir, near_config, store), StateViewerSubCommand::StateChanges(cmd) => cmd.run(home_dir, near_config, store), + StateViewerSubCommand::StateParts(cmd) => cmd.run(home_dir, near_config, store), StateViewerSubCommand::ViewChain(cmd) => cmd.run(near_config, store), StateViewerSubCommand::ViewTrie(cmd) => cmd.run(store), } @@ -219,42 +215,6 @@ impl ApplyReceiptCmd { } } -#[derive(clap::Parser)] -pub struct ApplyStatePartsCmd { - /// Selects an epoch. The dump will be of the state at the beginning of this epoch. - #[clap(subcommand)] - epoch_selection: crate::state_parts::EpochSelection, - /// Shard id. - #[clap(long)] - shard_id: ShardId, - /// State part id. Leave empty to go through every part in the shard. - #[clap(long)] - part_id: Option, - /// Where to write the state parts to. - #[clap(long)] - root_dir: Option, - /// Store state parts in an S3 bucket. - #[clap(long)] - s3_bucket: Option, - /// Store state parts in an S3 bucket. 
- #[clap(long)] - s3_region: Option, -} - -impl ApplyStatePartsCmd { - pub fn run(self, home_dir: &Path, near_config: NearConfig, store: Store) { - apply_state_parts( - self.epoch_selection, - self.shard_id, - self.part_id, - home_dir, - near_config, - store, - crate::state_parts::Location::new(self.root_dir, (self.s3_bucket, self.s3_region)), - ); - } -} - #[derive(clap::Parser)] pub struct ApplyTxCmd { #[clap(long)] @@ -402,49 +362,6 @@ impl DumpStateCmd { } } -#[derive(clap::Parser)] -pub struct DumpStatePartsCmd { - /// Selects an epoch. The dump will be of the state at the beginning of this epoch. - #[clap(subcommand)] - epoch_selection: crate::state_parts::EpochSelection, - /// Shard id. - #[clap(long)] - shard_id: ShardId, - /// Dump a single part id. - #[clap(long)] - part_id: Option, - /// Dump part ids starting from this part. - #[clap(long)] - part_from: Option, - /// Dump part ids up to this part (exclusive). - #[clap(long)] - part_to: Option, - /// Where to write the state parts to. - #[clap(long)] - root_dir: Option, - /// Store state parts in an S3 bucket. - #[clap(long)] - s3_bucket: Option, - /// S3 region to store state parts. - #[clap(long)] - s3_region: Option, -} - -impl DumpStatePartsCmd { - pub fn run(self, home_dir: &Path, near_config: NearConfig, store: Store) { - dump_state_parts( - self.epoch_selection, - self.shard_id, - self.part_from.or(self.part_id), - self.part_to.or(self.part_id.map(|x| x + 1)), - home_dir, - near_config, - store, - crate::state_parts::Location::new(self.root_dir, (self.s3_bucket, self.s3_region)), - ); - } -} - #[derive(clap::Parser)] pub struct DumpStateRedisCmd { /// Optionally, can specify at which height to dump state. @@ -655,3 +572,36 @@ impl ViewTrieCmd { } } } + +#[derive(clap::Parser)] +pub struct StatePartsCmd { + /// Shard id. + #[clap(long)] + shard_id: ShardId, + /// Location of serialized state parts. + #[clap(long)] + root_dir: Option, + /// Store state parts in an S3 bucket. + #[clap(long)] + s3_bucket: Option, + /// Store state parts in an S3 bucket. + #[clap(long)] + s3_region: Option, + /// Dump or Apply state parts. + #[clap(subcommand)] + command: crate::state_parts::StatePartsSubCommand, +} + +impl StatePartsCmd { + pub fn run(self, home_dir: &Path, near_config: NearConfig, store: Store) { + self.command.run( + self.shard_id, + self.root_dir, + self.s3_bucket, + self.s3_region, + home_dir, + near_config, + store, + ); + } +} diff --git a/tools/state-viewer/src/state_parts.rs b/tools/state-viewer/src/state_parts.rs index 934221ac61f..a0a15e8414a 100644 --- a/tools/state-viewer/src/state_parts.rs +++ b/tools/state-viewer/src/state_parts.rs @@ -1,12 +1,11 @@ use crate::epoch_info::iterate_and_filter; -use clap::Subcommand; use near_chain::types::RuntimeAdapter; use near_chain::{ChainStore, ChainStoreAccess}; use near_epoch_manager::EpochManager; use near_primitives::epoch_manager::epoch_info::EpochInfo; use near_primitives::state_part::PartId; use near_primitives::syncing::get_num_state_parts; -use near_primitives::types::EpochId; +use near_primitives::types::{EpochId, StateRoot}; use near_primitives_core::hash::CryptoHash; use near_primitives_core::types::{BlockHeight, EpochHeight, ShardId}; use near_store::Store; @@ -19,7 +18,81 @@ use std::str::FromStr; use std::sync::Arc; use std::time::Instant; -#[derive(Subcommand, Debug, Clone)] +#[derive(clap::Subcommand, Debug, Clone)] +pub(crate) enum StatePartsSubCommand { + /// Apply all or a single state part of a shard. 
+ Apply { + /// If true, validate the state part but don't write it to the DB. + #[clap(long)] + dry_run: bool, + /// If provided, this value will be used instead of looking it up in the headers. + /// Use if those headers or blocks are not available. + #[clap(long)] + state_root: Option, + /// Choose a single part id. + /// If None - affects all state parts. + #[clap(long)] + part_id: Option, + /// Select an epoch to work on. + #[clap(subcommand)] + epoch_selection: EpochSelection, + }, + /// Dump all or a single state part of a shard. + Dump { + /// Dump part ids starting from this part. + #[clap(long)] + part_from: Option, + /// Dump part ids up to this part (exclusive). + #[clap(long)] + part_to: Option, + /// Select an epoch to work on. + #[clap(subcommand)] + epoch_selection: EpochSelection, + }, +} + +impl StatePartsSubCommand { + pub(crate) fn run( + self, + shard_id: ShardId, + root_dir: Option, + s3_bucket: Option, + s3_region: Option, + home_dir: &Path, + near_config: NearConfig, + store: Store, + ) { + match self { + StatePartsSubCommand::Apply { dry_run, state_root, part_id, epoch_selection } => { + apply_state_parts( + epoch_selection, + shard_id, + part_id, + dry_run, + state_root, + home_dir, + near_config, + store, + Location::new(root_dir, (s3_bucket, s3_region)), + ); + } + StatePartsSubCommand::Dump { part_from, part_to, epoch_selection } => { + dump_state_parts( + epoch_selection, + shard_id, + part_from, + part_to, + home_dir, + near_config, + store, + Location::new(root_dir, (s3_bucket, s3_region)), + ); + } + } + } +} + +#[derive(clap::Subcommand, Debug, Clone)] pub(crate) enum EpochSelection { /// Current epoch. Current, @@ -34,7 +107,7 @@ pub(crate) enum EpochSelection { } impl EpochSelection { - pub fn to_epoch_id( + fn to_epoch_id( &self, store: Store, chain_store: &ChainStore, @@ -70,13 +143,13 @@ impl EpochSelection { } } -pub(crate) enum Location { +enum Location { Files(PathBuf), S3 { bucket: String, region: String }, } impl Location { - pub(crate) fn new( + fn new( root_dir: Option, s3_bucket_and_region: (Option, Option), ) -> Self { @@ -134,10 +207,12 @@ fn get_prev_hash_of_epoch( } } -pub(crate) fn apply_state_parts( +fn apply_state_parts( epoch_selection: EpochSelection, shard_id: ShardId, part_id: Option, + dry_run: bool, + maybe_state_root: Option, home_dir: &Path, near_config: NearConfig, store: Store, @@ -156,17 +231,23 @@ pub(crate) fn apply_state_parts( let epoch_id = epoch_selection.to_epoch_id(store, &chain_store, &epoch_manager); let epoch = epoch_manager.get_epoch_info(&epoch_id).unwrap(); - let sync_prev_hash = get_prev_hash_of_epoch(&epoch, &chain_store, &epoch_manager); - let sync_prev_block = chain_store.get_block(&sync_prev_hash).unwrap(); - assert!(epoch_manager.is_next_block_epoch_start(&sync_prev_hash).unwrap()); - assert!( - shard_id < sync_prev_block.chunks().len() as u64, - "shard_id: {}, #shards: {}", - shard_id, - sync_prev_block.chunks().len() - ); - let state_root = sync_prev_block.chunks()[shard_id as usize].prev_state_root(); + let (state_root, sync_prev_hash) = if let Some(state_root) = maybe_state_root { + (state_root, None) + } else { + let sync_prev_hash = get_prev_hash_of_epoch(&epoch, &chain_store, &epoch_manager); + let sync_prev_block = chain_store.get_block(&sync_prev_hash).unwrap(); + + assert!(epoch_manager.is_next_block_epoch_start(&sync_prev_hash).unwrap()); + assert!( + shard_id < sync_prev_block.chunks().len() as u64, + "shard_id: {}, #shards: {}", + shard_id, + sync_prev_block.chunks().len() + ); + let 
state_root = sync_prev_block.chunks()[shard_id as usize].prev_state_root(); + (state_root, Some(sync_prev_hash)) + }; let part_storage = get_state_part_reader( location, @@ -176,6 +257,7 @@ pub(crate) fn apply_state_parts( ); let num_parts = part_storage.num_parts(); + assert_ne!(num_parts, 0, "Too few num_parts: {}", num_parts); let part_ids = get_part_ids(part_id, part_id.map(|x| x + 1), num_parts); tracing::info!( target: "state-parts", @@ -192,22 +274,32 @@ pub(crate) fn apply_state_parts( for part_id in part_ids { let timer = Instant::now(); assert!(part_id < num_parts, "part_id: {}, num_parts: {}", part_id, num_parts); - let part = part_storage.read(part_id); - runtime_adapter - .apply_state_part( - shard_id, + let part = part_storage.read(part_id, num_parts); + + if dry_run { + assert!(runtime_adapter.validate_state_part( &state_root, PartId::new(part_id, num_parts), - &part, - &epoch_id, - ) - .unwrap(); - tracing::info!(target: "state-parts", part_id, part_length = part.len(), elapsed_sec = timer.elapsed().as_secs_f64(), "Applied a state part"); + &part + )); + tracing::info!(target: "state-parts", part_id, part_length = part.len(), elapsed_sec = timer.elapsed().as_secs_f64(), "Validated a state part"); + } else { + runtime_adapter + .apply_state_part( + shard_id, + &state_root, + PartId::new(part_id, num_parts), + &part, + &epoch_id, + ) + .unwrap(); + tracing::info!(target: "state-parts", part_id, part_length = part.len(), elapsed_sec = timer.elapsed().as_secs_f64(), "Applied a state part"); + } } tracing::info!(target: "state-parts", total_elapsed_sec = timer.elapsed().as_secs_f64(), "Applied all requested state parts"); } -pub(crate) fn dump_state_parts( +fn dump_state_parts( epoch_selection: EpochSelection, shard_id: ShardId, part_from: Option, @@ -277,7 +369,7 @@ pub(crate) fn dump_state_parts( PartId::new(part_id, num_parts), ) .unwrap(); - part_storage.write(&state_part, part_id); + part_storage.write(&state_part, part_id, num_parts); tracing::info!(target: "state-parts", part_id, part_length = state_part.len(), elapsed_sec = timer.elapsed().as_secs_f64(), "Wrote a state part"); } tracing::info!(target: "state-parts", total_elapsed_sec = timer.elapsed().as_secs_f64(), "Wrote all requested state parts"); @@ -291,21 +383,36 @@ fn location_prefix(chain_id: &str, epoch_height: u64, shard_id: u64) -> String { format!("chain_id={}/epoch_height={}/shard_id={}", chain_id, epoch_height, shard_id) } +fn match_filename(s: &str) -> Option { + let re = regex::Regex::new(r"^state_part_(\d{6})_of_(\d{6})$").unwrap(); + re.captures(s) +} + fn is_part_filename(s: &str) -> bool { - let re = regex::Regex::new(r"^state_part_(\d{6})$").unwrap(); - re.is_match(s) + match_filename(s).is_some() +} + +fn get_num_parts_from_filename(s: &str) -> Option { + if let Some(captures) = match_filename(s) { + if let Some(num_parts) = captures.get(2) { + if let Ok(num_parts) = num_parts.as_str().parse::() { + return Some(num_parts); + } + } + } + None } -fn part_filename(part_id: u64) -> String { - format!("state_part_{:06}", part_id) +fn part_filename(part_id: u64, num_parts: u64) -> String { + format!("state_part_{:06}_of_{:06}", part_id, num_parts) } trait StatePartWriter { - fn write(&self, state_part: &[u8], part_id: u64); + fn write(&self, state_part: &[u8], part_id: u64, num_parts: u64); } trait StatePartReader { - fn read(&self, part_id: u64) -> Vec; + fn read(&self, part_id: u64, num_parts: u64) -> Vec; fn num_parts(&self) -> u64; } @@ -362,22 +469,22 @@ impl FileSystemStorage { Self { 
state_parts_dir }
     }
 
-    fn get_location(&self, part_id: u64) -> PathBuf {
-        (&self.state_parts_dir).join(part_filename(part_id))
+    fn get_location(&self, part_id: u64, num_parts: u64) -> PathBuf {
+        (&self.state_parts_dir).join(part_filename(part_id, num_parts))
     }
 }
 
 impl StatePartWriter for FileSystemStorage {
-    fn write(&self, state_part: &[u8], part_id: u64) {
-        let filename = self.get_location(part_id);
+    fn write(&self, state_part: &[u8], part_id: u64, num_parts: u64) {
+        let filename = self.get_location(part_id, num_parts);
         std::fs::write(&filename, state_part).unwrap();
         tracing::info!(target: "state-parts", part_id, part_length = state_part.len(), ?filename, "Wrote a state part to disk");
     }
 }
 
 impl StatePartReader for FileSystemStorage {
-    fn read(&self, part_id: u64) -> Vec<u8> {
-        let filename = self.get_location(part_id);
+    fn read(&self, part_id: u64, num_parts: u64) -> Vec<u8> {
+        let filename = self.get_location(part_id, num_parts);
         let part = std::fs::read(filename).unwrap();
         part
     }
@@ -421,22 +528,22 @@ impl S3Storage {
         Self { location, bucket }
     }
 
-    fn get_location(&self, part_id: u64) -> String {
-        format!("{}/{}", self.location, part_filename(part_id))
+    fn get_location(&self, part_id: u64, num_parts: u64) -> String {
+        format!("{}/{}", self.location, part_filename(part_id, num_parts))
     }
 }
 
 impl StatePartWriter for S3Storage {
-    fn write(&self, state_part: &[u8], part_id: u64) {
-        let location = self.get_location(part_id);
+    fn write(&self, state_part: &[u8], part_id: u64, num_parts: u64) {
+        let location = self.get_location(part_id, num_parts);
         self.bucket.put_object_blocking(&location, &state_part).unwrap();
         tracing::info!(target: "state-parts", part_id, part_length = state_part.len(), ?location, "Wrote a state part to S3");
     }
 }
 
 impl StatePartReader for S3Storage {
-    fn read(&self, part_id: u64) -> Vec<u8> {
-        let location = self.get_location(part_id);
+    fn read(&self, part_id: u64, num_parts: u64) -> Vec<u8> {
+        let location = self.get_location(part_id, num_parts);
         let response = self.bucket.get_object_blocking(location.clone()).unwrap();
         tracing::info!(target: "state-parts", part_id, location, response_code = response.status_code(), "Got an object from S3");
         assert_eq!(response.status_code(), 200);
@@ -449,17 +556,27 @@ impl StatePartReader for S3Storage {
         let list: Vec<s3::serde_types::ListBucketResult> =
             self.bucket.list_blocking(location, Some("/".to_string())).unwrap();
         assert_eq!(list.len(), 1);
-        let num_parts = list[0]
+        let mut known_num_parts = None;
+        let num_objects = list[0]
             .contents
             .iter()
             .filter(|object| {
                 let filename = Path::new(&object.key);
                 let filename = filename.file_name().unwrap().to_str().unwrap();
                 tracing::debug!(target: "state-parts", object_key = ?object.key, ?filename);
+                if let Some(num_parts) = get_num_parts_from_filename(filename) {
+                    if let Some(known_num_parts) = known_num_parts {
+                        assert_eq!(known_num_parts, num_parts);
+                    }
+                    known_num_parts = Some(num_parts);
+                }
                 is_part_filename(filename)
             })
             .collect::<Vec<&s3::serde_types::Object>>()
             .len();
-        num_parts as u64
+        if let Some(known_num_parts) = known_num_parts {
+            assert_eq!(known_num_parts, num_objects as u64);
+        }
+        num_objects as u64
     }
 }
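
With this change the former `neard view-state` commands `dump-state-parts` and `apply-state-parts` become `dump` and `apply` subcommands of a single `state-parts` command, and `apply` gains a `--dry-run` flag that validates state parts without writing them to the DB. Dumped files are also renamed from `state_part_NNNNNN` to `state_part_NNNNNN_of_NNNNNN`, so the expected part count can be recovered and cross-checked directly from a storage listing (see `part_filename()`, `match_filename()` and `get_num_parts_from_filename()` above). Below is a minimal standalone sketch of that filename round trip, reusing the patch's format string and regex; it assumes only the `regex` crate, and the `main` function and variable names are illustrative rather than part of nearcore:

    use regex::Regex;

    fn main() {
        // Encode (part_id, num_parts) the same way as `part_filename()` does.
        let (part_id, num_parts) = (42u64, 100u64);
        let name = format!("state_part_{:06}_of_{:06}", part_id, num_parts);
        assert_eq!(name, "state_part_000042_of_000100");

        // Parse both fields back out with the same pattern `match_filename()` uses.
        let re = Regex::new(r"^state_part_(\d{6})_of_(\d{6})$").unwrap();
        let caps = re.captures(&name).expect("new-style part filename should match");
        let parsed: (u64, u64) = (caps[1].parse().unwrap(), caps[2].parse().unwrap());
        assert_eq!(parsed, (part_id, num_parts));

        // Old-style names (without the `_of_NNNNNN` suffix) no longer match, so
        // `num_parts()` skips them instead of counting them as parts.
        assert!(re.captures("state_part_000042").is_none());
    }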