diff --git a/CHANGELOG.md b/CHANGELOG.md index cca8ce9c5a7..0997bd36604 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,9 +11,10 @@ ### Non-protocol Changes -* Node can sync State from S3. [#8789](https://github.com/near/nearcore/pull/8789) * The contract runtime switched to using our fork of wasmer, with various improvements. * undo-block tool to reset the chain head from current head to its prev block. Use the tool by running: `./target/release/neard --home {path_to_config_directory} undo-block`. [#8681](https://github.com/near/nearcore/pull/8681) +* Node can sync State from S3. [#8789](https://github.com/near/nearcore/pull/8789) +* Node can sync State from local filesystem. [#8789](https://github.com/near/nearcore/pull/8789) * Add per shard granularity for chunks in validator info metric. [#8934](https://github.com/near/nearcore/pull/8934) * Add prometheus metrics for expected number of blocks/chunks at the end of the epoch. [#8759](https://github.com/near/nearcore/pull/8759) diff --git a/Cargo.lock b/Cargo.lock index 246aa0b30c6..7ce7a542dd3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3229,6 +3229,7 @@ dependencies = [ "actix", "actix-rt", "ansi_term", + "anyhow", "assert_matches", "async-trait", "borsh 0.10.2", @@ -3260,6 +3261,7 @@ dependencies = [ "once_cell", "rand 0.8.5", "reed-solomon-erasure", + "regex", "rust-s3", "serde_json", "strum", diff --git a/chain/client-primitives/src/types.rs b/chain/client-primitives/src/types.rs index ac9a77d2c74..80a4829d328 100644 --- a/chain/client-primitives/src/types.rs +++ b/chain/client-primitives/src/types.rs @@ -50,7 +50,6 @@ pub enum AccountOrPeerIdOrHash { AccountId(AccountId), PeerId(PeerId), Hash(CryptoHash), - ExternalStorage, } #[derive(Debug, serde::Serialize)] @@ -63,7 +62,8 @@ pub struct DownloadStatus { pub state_requests_count: u64, pub last_target: Option, #[serde(skip_serializing, skip_deserializing)] - pub response: Arc), String>>>>, + // Use type `String` as an error to avoid a dependency on the `rust-s3` or `anyhow` crates. + pub response: Arc, String>>>>, } impl DownloadStatus { diff --git a/chain/client/Cargo.toml b/chain/client/Cargo.toml index cfc3eeb9a2e..da2ee825b9a 100644 --- a/chain/client/Cargo.toml +++ b/chain/client/Cargo.toml @@ -9,6 +9,7 @@ edition.workspace = true actix-rt.workspace = true actix.workspace = true ansi_term.workspace = true +anyhow.workspace = true async-trait.workspace = true borsh.workspace = true chrono.workspace = true @@ -20,6 +21,7 @@ num-rational.workspace = true once_cell.workspace = true rand.workspace = true reed-solomon-erasure.workspace = true +regex.workspace = true rust-s3.workspace = true serde_json.workspace = true strum.workspace = true @@ -28,24 +30,24 @@ thiserror.workspace = true tokio.workspace = true tracing.workspace = true +delay-detector.workspace = true near-async.workspace = true -near-chain-primitives.workspace = true -near-crypto.workspace = true -near-primitives.workspace = true -near-store.workspace = true near-chain-configs.workspace = true +near-chain-primitives.workspace = true near-chain.workspace = true +near-chunks.workspace = true near-client-primitives.workspace = true +near-crypto.workspace = true near-dyn-configs.workspace = true +near-epoch-manager.workspace = true near-network.workspace = true -near-pool.workspace = true -near-chunks.workspace = true -near-telemetry.workspace = true near-o11y.workspace = true -near-performance-metrics.workspace = true near-performance-metrics-macros.workspace = true -near-epoch-manager.workspace = true -delay-detector.workspace = true +near-performance-metrics.workspace = true +near-pool.workspace = true +near-primitives.workspace = true +near-store.workspace = true +near-telemetry.workspace = true [dev-dependencies] assert_matches.workspace = true diff --git a/chain/client/src/client.rs b/chain/client/src/client.rs index 788ab27fe4b..c36ef263bd0 100644 --- a/chain/client/src/client.rs +++ b/chain/client/src/client.rs @@ -252,10 +252,7 @@ impl Client { network_adapter.clone(), config.state_sync_timeout, &config.chain_id, - config.state_sync_from_s3_enabled, - &config.state_sync_s3_bucket, - &config.state_sync_s3_region, - config.state_sync_num_concurrent_s3_requests, + &config.state_sync.sync, ); let num_block_producer_seats = config.num_block_producer_seats as usize; let data_parts = runtime_adapter.num_data_parts(); @@ -2131,10 +2128,7 @@ impl Client { network_adapter1, state_sync_timeout, &self.config.chain_id, - self.config.state_sync_from_s3_enabled, - &self.config.state_sync_s3_bucket, - &self.config.state_sync_s3_region, - self.config.state_sync_num_concurrent_s3_requests, + &self.config.state_sync.sync, ), new_shard_sync, BlocksCatchUpState::new(sync_hash, epoch_id), diff --git a/chain/client/src/client_actor.rs b/chain/client/src/client_actor.rs index fa62f7c7cf5..f1aa62ef3a5 100644 --- a/chain/client/src/client_actor.rs +++ b/chain/client/src/client_actor.rs @@ -700,7 +700,11 @@ impl Handler> for ClientActor { sync_status: format!( "{} ({})", self.client.sync_status.as_variant_name().to_string(), - display_sync_status(&self.client.sync_status, &self.client.chain.head()?,), + display_sync_status( + &self.client.sync_status, + &self.client.chain.head()?, + &self.client.config.state_sync.sync, + ), ), catchup_status: self.client.get_catchup_status()?, current_head_status: head.clone().into(), diff --git a/chain/client/src/info.rs b/chain/client/src/info.rs index 1e9eb0b0e39..409211eb4bf 100644 --- a/chain/client/src/info.rs +++ b/chain/client/src/info.rs @@ -2,7 +2,7 @@ use crate::config_updater::ConfigUpdater; use crate::{metrics, SyncStatus}; use actix::Addr; use itertools::Itertools; -use near_chain_configs::{ClientConfig, LogSummaryStyle}; +use near_chain_configs::{ClientConfig, LogSummaryStyle, SyncConfig}; use near_network::types::NetworkInfo; use near_primitives::block::Tip; use near_primitives::network::PeerId; @@ -343,10 +343,9 @@ impl InfoHelper { let s = |num| if num == 1 { "" } else { "s" }; - let sync_status_log = Some(display_sync_status(sync_status, head)); - + let sync_status_log = + Some(display_sync_status(sync_status, head, &client_config.state_sync.sync)); let catchup_status_log = display_catchup_status(catchup_status); - let validator_info_log = validator_info.as_ref().map(|info| { format!( " {}{} validator{}", @@ -564,7 +563,11 @@ pub fn display_catchup_status(catchup_status: Vec) -> String .join("\n") } -pub fn display_sync_status(sync_status: &SyncStatus, head: &Tip) -> String { +pub fn display_sync_status( + sync_status: &SyncStatus, + head: &Tip, + state_sync_config: &SyncConfig, +) -> String { metrics::SYNC_STATUS.set(sync_status.repr() as i64); match sync_status { SyncStatus::AwaitingPeers => format!("#{:>8} Waiting for peers", head.height), @@ -609,14 +612,17 @@ pub fn display_sync_status(sync_status: &SyncStatus, head: &Tip) -> String { for (shard_id, shard_status) in shard_statuses { write!(res, "[{}: {}]", shard_id, shard_status.status.to_string(),).unwrap(); } - // TODO #8719 - tracing::warn!(target: "stats", - "The node is syncing its State. The current implementation of this mechanism is known to be unreliable. It may never complete, or fail randomly and corrupt the DB.\n\ - Suggestions:\n\ - * Download a recent data snapshot and restart the node.\n\ - * Disable state sync in the config. Add `\"state_sync_enabled\": false` to `config.json`.\n\ - \n\ - A better implementation of State Sync is work in progress."); + if matches!(state_sync_config, SyncConfig::Peers) { + // TODO #8719 + tracing::warn!( + target: "stats", + "The node is syncing its State. The current implementation of this mechanism is known to be unreliable. It may never complete, or fail randomly and corrupt the DB.\n\ + Suggestions:\n\ + * Download a recent data snapshot and restart the node.\n\ + * Disable state sync in the config. Add `\"state_sync_enabled\": false` to `config.json`.\n\ + \n\ + A better implementation of State Sync is work in progress."); + } res } SyncStatus::StateSyncDone => "State sync done".to_string(), diff --git a/chain/client/src/metrics.rs b/chain/client/src/metrics.rs index faac8cb09e3..0eff9ade609 100644 --- a/chain/client/src/metrics.rs +++ b/chain/client/src/metrics.rs @@ -341,7 +341,7 @@ pub(crate) static NODE_PROTOCOL_UPGRADE_VOTING_START: Lazy = Lazy::new .unwrap() }); -pub static PRODUCE_CHUNK_TIME: Lazy = Lazy::new(|| { +pub(crate) static PRODUCE_CHUNK_TIME: Lazy = Lazy::new(|| { try_create_histogram_vec( "near_produce_chunk_time", "Time taken to produce a chunk", @@ -351,7 +351,7 @@ pub static PRODUCE_CHUNK_TIME: Lazy = Lazy::ne .unwrap() }); -pub static VIEW_CLIENT_MESSAGE_TIME: Lazy = Lazy::new(|| { +pub(crate) static VIEW_CLIENT_MESSAGE_TIME: Lazy = Lazy::new(|| { try_create_histogram_vec( "near_view_client_messages_processing_time", "Time that view client takes to handle different messages", @@ -361,16 +361,15 @@ pub static VIEW_CLIENT_MESSAGE_TIME: Lazy = La .unwrap() }); -pub static PRODUCE_AND_DISTRIBUTE_CHUNK_TIME: Lazy = - Lazy::new(|| { - try_create_histogram_vec( - "near_produce_and_distribute_chunk_time", - "Time to produce a chunk and distribute it to peers", - &["shard_id"], - Some(exponential_buckets(0.001, 2.0, 16).unwrap()), - ) - .unwrap() - }); +pub(crate) static PRODUCE_AND_DISTRIBUTE_CHUNK_TIME: Lazy = Lazy::new(|| { + try_create_histogram_vec( + "near_produce_and_distribute_chunk_time", + "Time to produce a chunk and distribute it to peers", + &["shard_id"], + Some(exponential_buckets(0.001, 2.0, 16).unwrap()), + ) + .unwrap() +}); /// Exports neard, protocol and database versions via Prometheus metrics. /// /// Sets metrics which export node’s max supported protocol version, used @@ -391,7 +390,7 @@ pub(crate) fn export_version(neard_version: &near_primitives::version::Version) .inc(); } -pub static STATE_SYNC_STAGE: Lazy = Lazy::new(|| { +pub(crate) static STATE_SYNC_STAGE: Lazy = Lazy::new(|| { try_create_int_gauge_vec( "near_state_sync_stage", "Stage of state sync per shard", @@ -400,7 +399,7 @@ pub static STATE_SYNC_STAGE: Lazy = Lazy::new(| .unwrap() }); -pub static STATE_SYNC_RETRY_PART: Lazy = Lazy::new(|| { +pub(crate) static STATE_SYNC_RETRY_PART: Lazy = Lazy::new(|| { try_create_int_counter_vec( "near_state_sync_retry_part_total", "Number of part requests retried", @@ -409,7 +408,7 @@ pub static STATE_SYNC_RETRY_PART: Lazy = Lazy .unwrap() }); -pub static STATE_SYNC_PARTS_DONE: Lazy = Lazy::new(|| { +pub(crate) static STATE_SYNC_PARTS_DONE: Lazy = Lazy::new(|| { try_create_int_gauge_vec( "near_state_sync_parts_done", "Number of parts downloaded", @@ -418,16 +417,16 @@ pub static STATE_SYNC_PARTS_DONE: Lazy = Lazy:: .unwrap() }); -pub static STATE_SYNC_PARTS_TOTAL: Lazy = Lazy::new(|| { +pub(crate) static STATE_SYNC_PARTS_TOTAL: Lazy = Lazy::new(|| { try_create_int_gauge_vec( "near_state_sync_parts_per_shard", - "Number of parts that need to be downloaded for the shard", + "Number of parts in the shard", &["shard_id"], ) .unwrap() }); -pub static STATE_SYNC_DISCARD_PARTS: Lazy = Lazy::new(|| { +pub(crate) static STATE_SYNC_DISCARD_PARTS: Lazy = Lazy::new(|| { try_create_int_counter_vec( "near_state_sync_discard_parts_total", "Number of times all downloaded parts were discarded to try again", @@ -436,54 +435,50 @@ pub static STATE_SYNC_DISCARD_PARTS: Lazy = L .unwrap() }); -pub static STATE_SYNC_EXTERNAL_PARTS_DONE: Lazy = - Lazy::new(|| { - try_create_int_counter_vec( - "near_state_sync_external_parts_done_total", - "Number of parts successfully retrieved from an external storage", - &["shard_id"], - ) - .unwrap() - }); - -pub static STATE_SYNC_EXTERNAL_PARTS_FAILED: Lazy = - Lazy::new(|| { - try_create_int_counter_vec( - "near_state_sync_external_parts_failed_total", - "Number of parts failed attempts to retrieve parts from an external storage", - &["shard_id"], - ) - .unwrap() - }); +pub(crate) static STATE_SYNC_EXTERNAL_PARTS_DONE: Lazy = Lazy::new(|| { + try_create_int_counter_vec( + "near_state_sync_external_parts_done_total", + "Number of parts retrieved from external storage", + &["shard_id"], + ) + .unwrap() +}); -pub static STATE_SYNC_EXTERNAL_PARTS_SCHEDULING_DELAY: Lazy = - Lazy::new(|| { - try_create_histogram_vec( - "near_state_sync_external_parts_scheduling_delay_sec", - "Delay for a request for parts from an external storage", - &["shard_id"], - Some(exponential_buckets(0.001, 2.0, 20).unwrap()), - ) - .unwrap() - }); +pub(crate) static STATE_SYNC_EXTERNAL_PARTS_FAILED: Lazy = Lazy::new(|| { + try_create_int_counter_vec( + "near_state_sync_external_parts_failed_total", + "Failed retrieval attempts from external storage", + &["shard_id"], + ) + .unwrap() +}); -pub static STATE_SYNC_EXTERNAL_PARTS_REQUEST_DELAY: Lazy = - Lazy::new(|| { - try_create_histogram_vec( - "near_state_sync_external_parts_request_delay_sec", - "Latency of state part requests to external storage", - &["shard_id"], - Some(exponential_buckets(0.001, 2.0, 20).unwrap()), - ) - .unwrap() - }); +pub(crate) static STATE_SYNC_EXTERNAL_PARTS_REQUEST_DELAY: Lazy = Lazy::new(|| { + try_create_histogram_vec( + "near_state_sync_external_parts_request_delay_sec", + "Latency of state part requests to external storage", + &["shard_id"], + Some(exponential_buckets(0.001, 2.0, 20).unwrap()), + ) + .unwrap() +}); -pub static STATE_SYNC_EXTERNAL_PARTS_SIZE_DOWNLOADED: Lazy = +pub(crate) static STATE_SYNC_EXTERNAL_PARTS_SIZE_DOWNLOADED: Lazy = Lazy::new(|| { try_create_int_counter_vec( "near_state_sync_external_parts_size_downloaded_bytes_total", - "Amount of bytes downloaded from an external storage when requesting state parts for a shard", + "Bytes downloaded from an external storage", &["shard_id"], ) - .unwrap() + .unwrap() }); + +pub(crate) static STATE_SYNC_DUMP_PUT_OBJECT_ELAPSED: Lazy = Lazy::new(|| { + try_create_histogram_vec( + "near_state_sync_dump_put_object_elapsed_sec", + "Latency of writes to external storage", + &["shard_id"], + Some(exponential_buckets(0.001, 1.6, 25).unwrap()), + ) + .unwrap() +}); diff --git a/chain/client/src/sync/state.rs b/chain/client/src/sync/state.rs index 89d8d5263b3..675e185ea98 100644 --- a/chain/client/src/sync/state.rs +++ b/chain/client/src/sync/state.rs @@ -29,6 +29,7 @@ use near_async::messaging::CanSendAsync; use near_chain::chain::{ApplyStatePartsRequest, StateSplitRequest}; use near_chain::near_chain_primitives; use near_chain::{Chain, RuntimeWithEpochManagerAdapter}; +use near_chain_configs::{ExternalStorageConfig, ExternalStorageLocation, SyncConfig}; use near_client_primitives::types::{ DownloadStatus, ShardSyncDownload, ShardSyncStatus, StateSplitApplyingStatus, }; @@ -46,8 +47,10 @@ use near_primitives::types::{AccountId, EpochHeight, ShardId, StateRoot}; use rand::seq::SliceRandom; use rand::{thread_rng, Rng}; use std::collections::HashMap; +use std::io::Write; use std::ops::Add; -use std::sync::atomic::{AtomicI64, Ordering}; +use std::path::PathBuf; +use std::sync::atomic::{AtomicI32, Ordering}; use std::sync::Arc; use std::time::Duration as TimeDuration; @@ -109,14 +112,74 @@ enum StateSyncInner { PartsFromExternal { /// Chain ID. chain_id: String, - /// Connection to the external storage. - bucket: Arc, /// The number of requests for state parts from external storage that are /// allowed to be started for this shard. - requests_remaining: Arc, + requests_remaining: Arc, + /// Connection to the external storage. + external: ExternalConnection, }, } +/// Connection to the external storage. +#[derive(Clone)] +pub enum ExternalConnection { + S3 { bucket: Arc }, + Filesystem { root_dir: PathBuf }, +} + +impl ExternalConnection { + async fn get_part(self, shard_id: ShardId, location: &str) -> Result, anyhow::Error> { + let _timer = metrics::STATE_SYNC_EXTERNAL_PARTS_REQUEST_DELAY + .with_label_values(&[&shard_id.to_string()]) + .start_timer(); + match self { + ExternalConnection::S3 { bucket } => { + let response = bucket.get_object(location).await?; + tracing::debug!(target: "sync", %shard_id, location, response_code = response.status_code(), num_bytes = response.bytes().len(), "S3 request finished"); + if response.status_code() == 200 { + Ok(response.bytes().to_vec()) + } else { + Err(anyhow::anyhow!("Bad response status code: {}", response.status_code())) + } + } + ExternalConnection::Filesystem { root_dir } => { + let path = root_dir.join(location); + tracing::debug!(target: "sync", %shard_id, ?path, "Reading a file"); + let data = std::fs::read(&path)?; + Ok(data) + } + } + } + + pub async fn put_state_part( + &self, + state_part: &[u8], + shard_id: ShardId, + location: &str, + ) -> Result<(), anyhow::Error> { + let _timer = metrics::STATE_SYNC_DUMP_PUT_OBJECT_ELAPSED + .with_label_values(&[&shard_id.to_string()]) + .start_timer(); + match self { + ExternalConnection::S3 { bucket } => { + bucket.put_object(&location, state_part).await?; + tracing::debug!(target: "state_sync_dump", shard_id, part_length = state_part.len(), ?location, "Wrote a state part to S3"); + Ok(()) + } + ExternalConnection::Filesystem { root_dir } => { + let path = root_dir.join(location); + if let Some(parent_dir) = path.parent() { + std::fs::create_dir_all(parent_dir)?; + } + let mut file = std::fs::OpenOptions::new().write(true).create(true).open(&path)?; + file.write_all(state_part)?; + tracing::debug!(target: "state_sync_dump", shard_id, part_length = state_part.len(), ?location, "Wrote a state part to a file"); + Ok(()) + } + } + } +} + /// Helper to track state sync. pub struct StateSync { /// How to retrieve the state data. @@ -145,25 +208,34 @@ impl StateSync { network_adapter: PeerManagerAdapter, timeout: TimeDuration, chain_id: &str, - state_sync_from_s3_enabled: bool, - s3_bucket: &str, - s3_region: &str, - num_s3_requests_per_shard: u64, + sync_config: &SyncConfig, ) -> Self { - let inner = if state_sync_from_s3_enabled { - let bucket = create_bucket(s3_bucket, s3_region, timeout); - if let Err(err) = bucket { - panic!("Failed to create an S3 bucket: {}", err); - } - StateSyncInner::PartsFromExternal { - chain_id: chain_id.to_string(), - bucket: Arc::new(bucket.unwrap()), - requests_remaining: Arc::new(AtomicI64::new(num_s3_requests_per_shard as i64)), - } - } else { - StateSyncInner::Peers { + let inner = match sync_config { + SyncConfig::Peers => StateSyncInner::Peers { last_part_id_requested: Default::default(), requested_target: lru::LruCache::new(MAX_PENDING_PART as usize), + }, + SyncConfig::ExternalStorage(ExternalStorageConfig { + location, + num_concurrent_requests, + }) => { + let external = match location { + ExternalStorageLocation::S3 { bucket, region } => { + let bucket = create_bucket(&bucket, ®ion, timeout); + if let Err(err) = bucket { + panic!("Failed to create an S3 bucket: {}", err); + } + ExternalConnection::S3 { bucket: Arc::new(bucket.unwrap()) } + } + ExternalStorageLocation::Filesystem { root_dir } => { + ExternalConnection::Filesystem { root_dir: root_dir.clone() } + } + }; + StateSyncInner::PartsFromExternal { + chain_id: chain_id.to_string(), + requests_remaining: Arc::new(AtomicI32::new(*num_concurrent_requests as i32)), + external, + } } }; let timeout = Duration::from_std(timeout).unwrap(); @@ -640,7 +712,7 @@ impl StateSync { ); } } - StateSyncInner::PartsFromExternal { chain_id, bucket, requests_remaining } => { + StateSyncInner::PartsFromExternal { chain_id, requests_remaining, external } => { let sync_block_header = chain.get_block_header(&sync_hash).unwrap(); let epoch_id = sync_block_header.epoch_id(); let epoch_info = chain.runtime_adapter.get_epoch_info(epoch_id).unwrap(); @@ -658,8 +730,8 @@ impl StateSync { epoch_height, state_num_parts, &chain_id.clone(), - bucket.clone(), requests_remaining.clone(), + external.clone(), ); } } @@ -878,8 +950,7 @@ impl StateSync { let part_timeout = now - prev > self.timeout; // Retry parts that failed. if part_timeout || part_download.error { download_timeout |= part_timeout; - if part_timeout || - part_download.last_target != Some(near_client_primitives::types::AccountOrPeerIdOrHash::ExternalStorage) { + if part_timeout || part_download.last_target.is_some() { // Don't immediately retry failed requests from external // storage. Most often error is a state part not // available. That error doesn't get fixed by retrying, @@ -901,6 +972,7 @@ impl StateSync { num_parts_done += 1; } } + tracing::debug!(target: "sync", %shard_id, %sync_hash, num_parts_done, parts_done); metrics::STATE_SYNC_PARTS_DONE .with_label_values(&[&shard_id.to_string()]) .set(num_parts_done); @@ -1104,8 +1176,8 @@ fn request_part_from_external_storage( epoch_height: EpochHeight, num_parts: u64, chain_id: &str, - bucket: Arc, - requests_remaining: Arc, + requests_remaining: Arc, + external: ExternalConnection, ) { if !allow_request(&requests_remaining) { return; @@ -1118,40 +1190,14 @@ fn request_part_from_external_storage( download.state_requests_count += 1; download.last_target = None; - let location = s3_location(chain_id, epoch_height, shard_id, part_id, num_parts); + let location = external_storage_location(chain_id, epoch_height, shard_id, part_id, num_parts); let download_response = download.response.clone(); - let scheduled = StaticClock::utc(); near_performance_metrics::actix::spawn("StateSync", { async move { - tracing::info!(target: "sync", %shard_id, part_id, location, "Getting an object from the external storage"); - let started = StaticClock::utc(); - metrics::STATE_SYNC_EXTERNAL_PARTS_SCHEDULING_DELAY - .with_label_values(&[&shard_id.to_string()]) - .observe( - started.signed_duration_since(scheduled).num_nanoseconds().unwrap_or(0) as f64 - / 1e9, - ); - let result = bucket.get_object(location.clone()).await; - let completed = StaticClock::utc(); + let result = external.get_part(shard_id, &location).await; finished_request(&requests_remaining); - metrics::STATE_SYNC_EXTERNAL_PARTS_REQUEST_DELAY - .with_label_values(&[&shard_id.to_string()]) - .observe( - completed.signed_duration_since(started).num_nanoseconds().unwrap_or(0) as f64 - / 1e9, - ); - match result { - Ok(response) => { - tracing::info!(target: "sync", %shard_id, part_id, location, response_code = response.status_code(), num_bytes = response.bytes().len(), "S3 request finished"); - let mut lock = download_response.lock().unwrap(); - *lock = Some(Ok((response.status_code(), response.bytes().to_vec()))); - } - Err(err) => { - tracing::info!(target: "sync", %shard_id, part_id, location, ?err, "S3 request failed"); - let mut lock = download_response.lock().unwrap(); - *lock = Some(Err(err.to_string())); - } - } + let mut lock = download_response.lock().unwrap(); + *lock = Some(result.map_err(|err| err.to_string())); } }); } @@ -1210,7 +1256,7 @@ fn sent_request_part( } /// Verifies that one more concurrent request can be started. -fn allow_request(requests_remaining: &AtomicI64) -> bool { +fn allow_request(requests_remaining: &AtomicI32) -> bool { let remaining = requests_remaining.fetch_sub(1, Ordering::SeqCst); if remaining <= 0 { requests_remaining.fetch_add(1, Ordering::SeqCst); @@ -1220,7 +1266,7 @@ fn allow_request(requests_remaining: &AtomicI64) -> bool { } } -fn finished_request(requests_remaining: &AtomicI64) { +fn finished_request(requests_remaining: &AtomicI32) { requests_remaining.fetch_add(1, Ordering::SeqCst); } @@ -1255,8 +1301,7 @@ fn check_external_storage_part_response( let mut err_to_retry = None; match external_storage_response { // HTTP status code 200 means success. - Ok((200, data)) => { - tracing::debug!(target: "sync", %shard_id, part_id, "Got 200 response from external storage"); + Ok(data) => { match chain.set_state_part( shard_id, sync_hash, @@ -1283,11 +1328,6 @@ fn check_external_storage_part_response( } } } - // Other HTTP status codes are considered errors. - Ok((status_code, _)) => { - tracing::debug!(target: "sync", %shard_id, %sync_hash, part_id, status_code, "Wrong response code, expected 200"); - err_to_retry = Some(near_chain::Error::Other(format!("status_code: {}", status_code))); - } // The request failed without reaching the external storage. Err(err) => { err_to_retry = Some(near_chain::Error::Other(err)); @@ -1301,20 +1341,6 @@ fn check_external_storage_part_response( true } -/// Construct a location on the external storage. -pub fn s3_location( - chain_id: &str, - epoch_height: u64, - shard_id: u64, - part_id: u64, - num_parts: u64, -) -> String { - format!( - "chain_id={}/epoch_height={}/shard_id={}/state_part_{:06}_of_{:06}", - chain_id, epoch_height, shard_id, part_id, num_parts - ) -} - /// Applies style if `use_colour` is enabled. fn paint(s: &str, style: Style, use_style: bool) -> String { if use_style { @@ -1426,13 +1452,56 @@ impl Iterator for SamplerLimited { } } +/// Construct a location on the external storage. +pub fn external_storage_location( + chain_id: &str, + epoch_height: u64, + shard_id: u64, + part_id: u64, + num_parts: u64, +) -> String { + format!( + "{}/{}", + location_prefix(chain_id, epoch_height, shard_id), + part_filename(part_id, num_parts) + ) +} + +pub fn location_prefix(chain_id: &str, epoch_height: u64, shard_id: u64) -> String { + format!("chain_id={}/epoch_height={}/shard_id={}", chain_id, epoch_height, shard_id) +} + +pub fn part_filename(part_id: u64, num_parts: u64) -> String { + format!("state_part_{:06}_of_{:06}", part_id, num_parts) +} + +pub fn match_filename(s: &str) -> Option { + let re = regex::Regex::new(r"^state_part_(\d{6})_of_(\d{6})$").unwrap(); + re.captures(s) +} + +pub fn is_part_filename(s: &str) -> bool { + match_filename(s).is_some() +} + +pub fn get_num_parts_from_filename(s: &str) -> Option { + if let Some(captures) = match_filename(s) { + if let Some(num_parts) = captures.get(2) { + if let Ok(num_parts) = num_parts.as_str().parse::() { + return Some(num_parts); + } + } + } + None +} + #[cfg(test)] mod test { - + use super::*; use actix::System; use near_actix_test_utils::run_actix; + use near_chain::test_utils; use near_chain::{test_utils::process_block_sync, BlockProcessingArtifact, Provenance}; - use near_epoch_manager::EpochManagerAdapter; use near_network::test_utils::MockPeerManagerAdapter; use near_primitives::{ @@ -1441,10 +1510,6 @@ mod test { types::EpochId, }; - use near_chain::test_utils; - - use super::*; - #[test] // Start a new state sync - and check that it asks for a header. fn test_ask_for_header() { @@ -1453,10 +1518,7 @@ mod test { mock_peer_manager.clone().into(), TimeDuration::from_secs(1), "chain_id", - false, - "", - "", - 100, + &SyncConfig::Peers, ); let mut new_shard_sync = HashMap::new(); @@ -1567,4 +1629,14 @@ mod test { System::current().stop() }); } + + #[test] + fn test_match_filename() { + let filename = part_filename(5, 15); + assert!(is_part_filename(&filename)); + assert!(!is_part_filename("123123")); + + assert_eq!(get_num_parts_from_filename(&filename), Some(15)); + assert_eq!(get_num_parts_from_filename("123123"), None); + } } diff --git a/core/chain-configs/src/client_config.rs b/core/chain-configs/src/client_config.rs index 440b26281b6..fe67ead2893 100644 --- a/core/chain-configs/src/client_config.rs +++ b/core/chain-configs/src/client_config.rs @@ -5,6 +5,7 @@ use near_primitives::types::{ }; use near_primitives::version::Version; use std::cmp::{max, min}; +use std::path::PathBuf; use std::time::Duration; pub const TEST_STATE_SYNC_TIMEOUT: u64 = 5; @@ -23,6 +24,9 @@ pub const MIN_GC_NUM_EPOCHS_TO_KEEP: u64 = 3; /// Default number of epochs for which we keep store data pub const DEFAULT_GC_NUM_EPOCHS_TO_KEEP: u64 = 5; +/// Default number of concurrent requests to external storage to fetch state parts. +pub const DEFAULT_STATE_SYNC_NUM_CONCURRENT_REQUESTS_EXTERNAL: u32 = 25; + /// Configuration for garbage collection. #[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq)] pub struct GCConfig { @@ -69,6 +73,80 @@ impl GCConfig { } } +fn default_num_concurrent_requests() -> u32 { + DEFAULT_STATE_SYNC_NUM_CONCURRENT_REQUESTS_EXTERNAL +} + +#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)] +pub struct ExternalStorageConfig { + /// Location of state parts. + pub location: ExternalStorageLocation, + /// When fetching state parts from external storage, throttle fetch requests + /// to this many concurrent requests per shard. + #[serde(default = "default_num_concurrent_requests")] + pub num_concurrent_requests: u32, +} + +#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)] +pub enum ExternalStorageLocation { + S3 { + /// Location of state dumps on S3. + bucket: String, + /// Data may only be available in certain locations. + region: String, + }, + Filesystem { + root_dir: PathBuf, + }, +} + +/// Configures how to dump state to external storage. +#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)] +pub struct DumpConfig { + /// Specifies where to write the obtained state parts. + pub location: ExternalStorageLocation, + /// Use in case a node that dumps state to the external storage + /// gets in trouble. + #[serde(skip_serializing_if = "Option::is_none")] + pub restart_dump_for_shards: Option>, + /// How often to check if a new epoch has started. + /// Feel free to set to `None`, defaults are sensible. + #[serde(skip_serializing_if = "Option::is_none")] + pub iteration_delay: Option, +} + +/// Configures how to fetch state parts during state sync. +#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)] +pub enum SyncConfig { + /// Syncs state from the peers without reading anything from external storage. + Peers, + /// Expects parts to be available in external storage. + ExternalStorage(ExternalStorageConfig), +} + +impl Default for SyncConfig { + fn default() -> Self { + Self::Peers + } +} + +#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default)] +/// Options for dumping state to S3. +pub struct StateSyncConfig { + #[serde(skip_serializing_if = "Option::is_none")] + /// `none` value disables state dump to external storage. + pub dump: Option, + #[serde(skip_serializing_if = "SyncConfig::is_default", default = "SyncConfig::default")] + pub sync: SyncConfig, +} + +impl SyncConfig { + /// Checks whether the object equals its default value. + fn is_default(&self) -> bool { + matches!(self, Self::Peers) + } +} + /// ClientConfig where some fields can be updated at runtime. #[derive(Clone, serde::Serialize)] pub struct ClientConfig { @@ -168,23 +246,11 @@ pub struct ClientConfig { pub flat_storage_creation_enabled: bool, /// Duration to perform background flat storage creation step. pub flat_storage_creation_period: Duration, - /// If enabled, will dump state of every epoch to external storage. - pub state_sync_dump_enabled: bool, - /// S3 bucket for storing state dumps. - pub state_sync_s3_bucket: String, - /// S3 region for storing state dumps. - pub state_sync_s3_region: String, - /// Restart dumping state of selected shards. - /// Use for troubleshooting of the state dumping process. - pub state_sync_restart_dump_for_shards: Vec, - /// Whether to enable state sync from S3. - /// If disabled will perform state sync from the peers. - pub state_sync_from_s3_enabled: bool, - /// Number of parallel in-flight requests allowed per shard. - pub state_sync_num_concurrent_s3_requests: u64, /// Whether to use the State Sync mechanism. /// If disabled, the node will do Block Sync instead of State Sync. pub state_sync_enabled: bool, + /// Options for syncing state. + pub state_sync: StateSyncConfig, } impl ClientConfig { @@ -255,13 +321,8 @@ impl ClientConfig { client_background_migration_threads: 1, flat_storage_creation_enabled: true, flat_storage_creation_period: Duration::from_secs(1), - state_sync_dump_enabled: false, - state_sync_s3_bucket: String::new(), - state_sync_s3_region: String::new(), - state_sync_restart_dump_for_shards: vec![], - state_sync_from_s3_enabled: false, - state_sync_num_concurrent_s3_requests: 10, state_sync_enabled: false, + state_sync: StateSyncConfig::default(), } } } diff --git a/core/chain-configs/src/lib.rs b/core/chain-configs/src/lib.rs index 77925f77c37..ec2dd1010f7 100644 --- a/core/chain-configs/src/lib.rs +++ b/core/chain-configs/src/lib.rs @@ -5,7 +5,8 @@ mod metrics; mod updateable_config; pub use client_config::{ - ClientConfig, GCConfig, LogSummaryStyle, DEFAULT_GC_NUM_EPOCHS_TO_KEEP, + ClientConfig, DumpConfig, ExternalStorageConfig, ExternalStorageLocation, GCConfig, + LogSummaryStyle, StateSyncConfig, SyncConfig, DEFAULT_GC_NUM_EPOCHS_TO_KEEP, MIN_GC_NUM_EPOCHS_TO_KEEP, TEST_STATE_SYNC_TIMEOUT, }; pub use genesis_config::{ diff --git a/core/dyn-configs/src/lib.rs b/core/dyn-configs/src/lib.rs index 196bc3ddd51..400a59e1f8e 100644 --- a/core/dyn-configs/src/lib.rs +++ b/core/dyn-configs/src/lib.rs @@ -46,7 +46,6 @@ impl UpdateableConfigLoader { updateable_configs: UpdateableConfigs, tx: Sender>>, ) -> Self { - near_o11y::reload_log_config(updateable_configs.log_config.as_ref()); let mut result = Self { tx: Some(tx) }; result.reload(Ok(updateable_configs)); result diff --git a/docs/misc/state_sync_dump.md b/docs/misc/state_sync_dump.md index 08bb1e96c2a..8fbca1e1d84 100644 --- a/docs/misc/state_sync_dump.md +++ b/docs/misc/state_sync_dump.md @@ -25,9 +25,14 @@ To enable, add this to your `config.json` file: ```json "state_sync": { - "s3_bucket": "my-bucket", - "s3_region": "eu-central-1", - "dump_enabled": true + "dump": { + "location": { + "S3": { + "bucket": "my-aws-bucket", + "region": "my-aws-region" + } + } + } } ``` @@ -37,6 +42,27 @@ And run your node with environment variables `AWS_ACCESS_KEY_ID` and AWS_ACCESS_KEY_ID="MY_ACCESS_KEY" AWS_SECRET_ACCESS_KEY="MY_AWS_SECRET_ACCESS_KEY" ./neard run ``` +## Dump to a local filesystem + +Add this to your `config.json` file to dump state of every epoch to local filesystem: + +```json +"state_sync": { + "dump": { + "location": { + "Filesystem": { + "root_dir": "/tmp/state-dump" + } + } + } +} +``` + +In this case you don't need any extra environment variables. Simply run your node: +```shell +./neard run +``` + ## Implementation Details The experimental option spawns a thread for each of the shards tracked by a node. diff --git a/docs/misc/state_sync_from_s3.md b/docs/misc/state_sync_from_s3.md index 22eb70d6d4e..99c9ee0e6fc 100644 --- a/docs/misc/state_sync_from_s3.md +++ b/docs/misc/state_sync_from_s3.md @@ -24,20 +24,52 @@ To enable, add this to your `config.json` file: ```json "state_sync_enabled": true, "state_sync": { - "s3_bucket": "my-bucket", - "s3_region": "eu-central-1", - "sync_from_s3_enabled": true + "sync": { + "ExternalStorage": { + "location": { + "S3": { + "bucket": "my-aws-bucket", + "region": "my-aws-region" + } + } + } + } } ``` -And run your node with environment variables `AWS_ACCESS_KEY_ID` and -`AWS_SECRET_ACCESS_KEY`: +Then run the `neard` binary and it will access S3 anonymously: ```shell -AWS_ACCESS_KEY_ID="MY_ACCESS_KEY" AWS_SECRET_ACCESS_KEY="MY_AWS_SECRET_ACCESS_KEY" ./neard run +./neard run +``` + +## Sync from a local filesystem + +To enable, add this to your `config.json` file: + +```json +"state_sync_enabled": true, +"state_sync": { + "sync": { + "ExternalStorage": { + "location": { + "Filesystem": { + "root_dir": "/tmp/state-parts" + } + } + } + } +} +``` + +Then run the `neard` binary: +```shell +./neard run ``` ## Implementation Details +The experimental option replaces how a node fetches state parts. +The legacy implementation asks peer nodes to create and share a state part over network. The new implementation expects to find state parts as files on an S3 storage. The sync mechanism proceeds to download state parts mostly-sequentially from S3. diff --git a/integration-tests/src/tests/nearcore/sync_state_nodes.rs b/integration-tests/src/tests/nearcore/sync_state_nodes.rs index adfca6c6ce7..94f44b4f998 100644 --- a/integration-tests/src/tests/nearcore/sync_state_nodes.rs +++ b/integration-tests/src/tests/nearcore/sync_state_nodes.rs @@ -1,18 +1,18 @@ -use std::sync::{Arc, RwLock}; -use std::time::Duration; - +use crate::test_helpers::heavy_test; use actix::{Actor, System}; use futures::{future, FutureExt}; - -use crate::test_helpers::heavy_test; use near_actix_test_utils::run_actix; -use near_chain_configs::Genesis; +use near_chain_configs::ExternalStorageLocation::Filesystem; +use near_chain_configs::{DumpConfig, ExternalStorageConfig, Genesis, SyncConfig}; use near_client::GetBlock; use near_network::tcp; -use near_network::test_utils::{convert_boot_nodes, WaitOrTimeoutActor}; +use near_network::test_utils::{convert_boot_nodes, wait_or_timeout, WaitOrTimeoutActor}; use near_o11y::testonly::init_integration_logger; use near_o11y::WithSpanContextExt; use nearcore::{config::GenesisExt, load_test_config, start_with_config}; +use std::ops::ControlFlow; +use std::sync::{Arc, RwLock}; +use std::time::Duration; /// One client is in front, another must sync to it using state (fast) sync. #[test] @@ -289,6 +289,8 @@ fn sync_empty_state() { run_actix(async move { let (port1, port2) = (tcp::ListenerAddr::reserve_for_test(), tcp::ListenerAddr::reserve_for_test()); + // State sync triggers when header head is two epochs in the future. + // Produce more blocks to make sure that state sync gets triggered when the second node starts. let state_sync_horizon = 10; let block_header_fetch_horizon = 1; let block_fetch_horizon = 1; @@ -395,3 +397,136 @@ fn sync_empty_state() { }); }); } + +/// Runs one node for some time, which dumps state to a temp directory. +/// Start the second node which gets state parts from that temp directory. +#[test] +#[cfg_attr(not(feature = "expensive_tests"), ignore)] +fn sync_state_dump() { + heavy_test(|| { + init_integration_logger(); + + let mut genesis = Genesis::test_sharded_new_version( + vec!["test1".parse().unwrap(), "test2".parse().unwrap()], + 1, + vec![1, 1, 1, 1], + ); + // Needs to be long enough to give enough time to the second node to + // start, sync headers and find a dump of state. + genesis.config.epoch_length = 30; + + run_actix(async move { + let (port1, port2) = + (tcp::ListenerAddr::reserve_for_test(), tcp::ListenerAddr::reserve_for_test()); + // Produce more blocks to make sure that state sync gets triggered when the second node starts. + let state_sync_horizon = 50; + let block_header_fetch_horizon = 1; + let block_fetch_horizon = 1; + + let mut near1 = load_test_config("test1", port1, genesis.clone()); + near1.client_config.min_num_peers = 0; + // An epoch passes in about 9 seconds. + near1.client_config.min_block_production_delay = Duration::from_millis(300); + near1.client_config.max_block_production_delay = Duration::from_millis(600); + near1.client_config.epoch_sync_enabled = false; + let dump_dir = tempfile::Builder::new().prefix("state_dump_1").tempdir().unwrap(); + near1.client_config.state_sync.dump = Some(DumpConfig { + location: Filesystem { root_dir: dump_dir.path().to_path_buf() }, + restart_dump_for_shards: None, + iteration_delay: Some(Duration::from_millis(100)), + }); + + let dir1 = tempfile::Builder::new().prefix("sync_nodes_1").tempdir().unwrap(); + let nearcore::NearNode { + view_client: view_client1, + state_sync_dump_handle: _state_sync_dump_handle, + .. + } = start_with_config(dir1.path(), near1).expect("start_with_config"); + let dir2 = tempfile::Builder::new().prefix("sync_nodes_2").tempdir().unwrap(); + + let view_client2_holder = Arc::new(RwLock::new(None)); + let arbiters_holder = Arc::new(RwLock::new(vec![])); + let arbiters_holder2 = arbiters_holder; + + wait_or_timeout(100, 60000, || async { + if view_client2_holder.read().unwrap().is_none() { + let view_client2_holder2 = view_client2_holder.clone(); + let arbiters_holder2 = arbiters_holder2.clone(); + let genesis2 = genesis.clone(); + + match view_client1.send(GetBlock::latest().with_span_context()).await { + Ok(Ok(b)) if b.header.height >= genesis.config.epoch_length + 2 => { + let mut view_client2_holder2 = view_client2_holder2.write().unwrap(); + let mut arbiters_holder2 = arbiters_holder2.write().unwrap(); + + if view_client2_holder2.is_none() { + let mut near2 = load_test_config("test2", port2, genesis2); + near2.network_config.peer_store.boot_nodes = + convert_boot_nodes(vec![("test1", *port1)]); + near2.client_config.min_num_peers = 1; + near2.client_config.min_block_production_delay = + Duration::from_millis(300); + near2.client_config.max_block_production_delay = + Duration::from_millis(600); + near2.client_config.state_fetch_horizon = state_sync_horizon; + near2.client_config.block_header_fetch_horizon = + block_header_fetch_horizon; + near2.client_config.block_fetch_horizon = block_fetch_horizon; + near2.client_config.tracked_shards = vec![0, 1, 2, 3]; + near2.client_config.epoch_sync_enabled = false; + near2.client_config.state_sync_enabled = true; + near2.client_config.state_sync_timeout = Duration::from_secs(1); + near2.client_config.state_sync.sync = + SyncConfig::ExternalStorage(ExternalStorageConfig { + location: Filesystem { + root_dir: dump_dir.path().to_path_buf(), + }, + num_concurrent_requests: 10, + }); + + let nearcore::NearNode { + view_client: view_client2, arbiters, .. + } = start_with_config(dir2.path(), near2) + .expect("start_with_config"); + *view_client2_holder2 = Some(view_client2); + *arbiters_holder2 = arbiters; + } + } + Ok(Ok(b)) if b.header.height <= state_sync_horizon => { + tracing::info!("FIRST STAGE {}", b.header.height); + } + Err(_) => {} + _ => {} + }; + return ControlFlow::Continue(()); + } + + if let Some(view_client2) = &*view_client2_holder.write().unwrap() { + match view_client2.send(GetBlock::latest().with_span_context()).await { + Ok(Ok(b)) if b.header.height >= 40 => { + return ControlFlow::Break(()); + } + Ok(Ok(b)) if b.header.height < 40 => { + tracing::info!("SECOND STAGE {}", b.header.height) + } + Ok(Err(e)) => { + tracing::info!("SECOND STAGE ERROR1: {:?}", e); + } + Err(e) => { + tracing::info!("SECOND STAGE ERROR2: {:?}", e); + } + _ => { + assert!(false); + } + }; + return ControlFlow::Continue(()); + } + + panic!("Unexpected"); + }) + .await + .unwrap(); + System::current().stop(); + }); + }); +} diff --git a/nearcore/src/config.rs b/nearcore/src/config.rs index 7bf5d860aa5..f34367dae1b 100644 --- a/nearcore/src/config.rs +++ b/nearcore/src/config.rs @@ -2,7 +2,7 @@ use crate::download_file::{run_download_file, FileDownloadError}; use anyhow::{anyhow, bail, Context}; use near_chain_configs::{ get_initial_supply, ClientConfig, GCConfig, Genesis, GenesisConfig, GenesisValidationMode, - LogSummaryStyle, MutableConfigValue, + LogSummaryStyle, MutableConfigValue, StateSyncConfig, }; use near_config_utils::{ValidationError, ValidationErrors}; use near_crypto::{InMemorySigner, KeyFile, KeyType, PublicKey, Signer}; @@ -321,19 +321,19 @@ pub struct Config { /// This feature is under development, do not use in production. #[serde(default, skip_serializing_if = "Option::is_none")] pub cold_store: Option, - /// Configuration for the + /// Configuration for the split storage. #[serde(default, skip_serializing_if = "Option::is_none")] pub split_storage: Option, /// The node will stop after the head exceeds this height. /// The node usually stops within several seconds after reaching the target height. #[serde(default, skip_serializing_if = "Option::is_none")] pub expected_shutdown: Option, - /// Options for dumping state of every epoch to S3. - #[serde(skip_serializing_if = "Option::is_none")] - pub state_sync: Option, /// Whether to use state sync (unreliable and corrupts the DB if fails) or do a block sync instead. #[serde(skip_serializing_if = "Option::is_none")] pub state_sync_enabled: Option, + /// Options for syncing state. + #[serde(skip_serializing_if = "Option::is_none")] + pub state_sync: Option, } fn is_false(value: &bool) -> bool { @@ -678,41 +678,8 @@ impl NearConfig { client_background_migration_threads: config.store.background_migration_threads, flat_storage_creation_enabled: config.store.flat_storage_creation_enabled, flat_storage_creation_period: config.store.flat_storage_creation_period, - state_sync_dump_enabled: config - .state_sync - .as_ref() - .map(|x| x.dump_enabled) - .flatten() - .unwrap_or(false), - state_sync_s3_bucket: config - .state_sync - .as_ref() - .map(|x| x.s3_bucket.clone()) - .unwrap_or(String::new()), - state_sync_s3_region: config - .state_sync - .as_ref() - .map(|x| x.s3_region.clone()) - .unwrap_or(String::new()), - state_sync_restart_dump_for_shards: config - .state_sync - .as_ref() - .map(|x| x.restart_dump_for_shards.clone()) - .flatten() - .unwrap_or(vec![]), - state_sync_from_s3_enabled: config - .state_sync - .as_ref() - .map(|x| x.sync_from_s3_enabled) - .flatten() - .unwrap_or(false), - state_sync_num_concurrent_s3_requests: config - .state_sync - .as_ref() - .map(|x| x.num_concurrent_s3_requests) - .flatten() - .unwrap_or(100), state_sync_enabled: config.state_sync_enabled.unwrap_or(false), + state_sync: config.state_sync.unwrap_or_default(), }, network_config: NetworkConfig::new( config.network, @@ -1535,30 +1502,6 @@ pub fn load_test_config(seed: &str, addr: tcp::ListenerAddr, genesis: Genesis) - NearConfig::new(config, genesis, signer.into(), validator_signer).unwrap() } -#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default)] -/// Options for dumping state to S3. -pub struct StateSyncConfig { - /// Location of state dumps on S3. - pub s3_bucket: String, - /// Region is very important on S3. - pub s3_region: String, - /// Whether a node should dump state of each epoch to the external storage. - #[serde(skip_serializing_if = "Option::is_none")] - pub dump_enabled: Option, - /// Use carefully in case a node that dumps state to the external storage - /// gets in trouble. - #[serde(skip_serializing_if = "Option::is_none")] - pub restart_dump_for_shards: Option>, - /// If enabled, will download state parts from external storage and not from - /// the peers. - #[serde(skip_serializing_if = "Option::is_none")] - pub sync_from_s3_enabled: Option, - /// When syncing state from S3, throttle requests to this many concurrent - /// requests per shard. - #[serde(skip_serializing_if = "Option::is_none")] - pub num_concurrent_s3_requests: Option, -} - #[test] fn test_init_config_localnet() { // Check that we can initialize the config with multiple shards. diff --git a/nearcore/src/config_validate.rs b/nearcore/src/config_validate.rs index d56db3d8942..3c1fb58d574 100644 --- a/nearcore/src/config_validate.rs +++ b/nearcore/src/config_validate.rs @@ -1,4 +1,7 @@ +use near_chain_configs::{ExternalStorageLocation, SyncConfig}; use near_config_utils::{ValidationError, ValidationErrors}; +use std::collections::HashSet; +use std::path::Path; use crate::config::Config; @@ -77,16 +80,51 @@ impl<'a> ConfigValidator<'a> { } if let Some(state_sync) = &self.config.state_sync { - if state_sync.dump_enabled.unwrap_or(false) { - if state_sync.s3_bucket.is_empty() || state_sync.s3_region.is_empty() { - let error_message = format!("'config.state_sync.s3_bucket' and 'config.state_sync.s3_region' need to be specified when 'config.state_sync.dump_enabled' is enabled."); - self.validation_errors.push_config_semantics_error(error_message); + if let Some(dump_config) = &state_sync.dump { + if let Some(restart_dump_for_shards) = &dump_config.restart_dump_for_shards { + let unique_values: HashSet<_> = restart_dump_for_shards.iter().collect(); + if unique_values.len() != restart_dump_for_shards.len() { + let error_message = format!("'config.state_sync.dump.restart_dump_for_shards' contains duplicate values."); + self.validation_errors.push_config_semantics_error(error_message); + } + } + + match &dump_config.location { + ExternalStorageLocation::S3 { bucket, region } => { + if bucket.is_empty() || region.is_empty() { + let error_message = format!("'config.state_sync.dump.location.S3.bucket' and 'config.state_sync.dump.location.S3.region' need to be specified when 'config.state_sync.dump.location.S3' is present."); + self.validation_errors.push_config_semantics_error(error_message); + } + } + ExternalStorageLocation::Filesystem { root_dir } => { + if root_dir.as_path() == Path::new("") { + let error_message = format!("'config.state_sync.dump.location.Filesystem.root_dir' needs to be specified when 'config.state_sync.dump.location.Filesystem' is present."); + self.validation_errors.push_config_semantics_error(error_message); + } + } } } - if state_sync.sync_from_s3_enabled.unwrap_or(false) { - if state_sync.s3_bucket.is_empty() || state_sync.s3_region.is_empty() { - let error_message = format!("'config.state_sync.s3_bucket' and 'config.state_sync.s3_region' need to be specified when 'config.state_sync.sync_from_s3_enabled' is enabled."); - self.validation_errors.push_config_semantics_error(error_message); + match &state_sync.sync { + SyncConfig::Peers => {} + SyncConfig::ExternalStorage(config) => { + match &config.location { + ExternalStorageLocation::S3 { bucket, region } => { + if bucket.is_empty() || region.is_empty() { + let error_message = format!("'config.state_sync.sync.ExternalStorage.location.S3.bucket' and 'config.state_sync.sync.ExternalStorage.location.S3.region' need to be specified when 'config.state_sync.sync.ExternalStorage.location.S3' is present."); + self.validation_errors.push_config_semantics_error(error_message); + } + } + ExternalStorageLocation::Filesystem { root_dir } => { + if root_dir.as_path() == Path::new("") { + let error_message = format!("'config.state_sync.sync.ExternalStorage.location.Filesystem.root_dir' needs to be specified when 'config.state_sync.sync.ExternalStorage.location.Filesystem' is present."); + self.validation_errors.push_config_semantics_error(error_message); + } + } + } + if config.num_concurrent_requests == 0 { + let error_message = format!("'config.state_sync.sync.ExternalStorage.num_concurrent_requests' needs to be greater than 0"); + self.validation_errors.push_config_semantics_error(error_message); + } } } } diff --git a/nearcore/src/lib.rs b/nearcore/src/lib.rs index 1c01d2f17da..06fd8bea803 100644 --- a/nearcore/src/lib.rs +++ b/nearcore/src/lib.rs @@ -33,7 +33,7 @@ pub mod dyn_config; mod metrics; pub mod migrations; mod runtime; -mod state_sync; +pub mod state_sync; pub fn get_default_home() -> PathBuf { if let Ok(near_home) = std::env::var("NEAR_HOME") { @@ -273,7 +273,12 @@ pub fn start_with_config_and_synchronization( ); shards_manager_adapter.bind(shards_manager_actor); - let state_sync_dump_handle = spawn_state_sync_dump(&config, chain_genesis, runtime)?; + let state_sync_dump_handle = spawn_state_sync_dump( + &config.client_config, + chain_genesis, + runtime, + config.validator_signer.as_ref().map(|signer| signer.validator_id().clone()), + )?; #[allow(unused_mut)] let mut rpc_servers = Vec::new(); diff --git a/nearcore/src/metrics.rs b/nearcore/src/metrics.rs index 125cd923d52..bfadbe9c6a2 100644 --- a/nearcore/src/metrics.rs +++ b/nearcore/src/metrics.rs @@ -5,7 +5,7 @@ use near_o11y::metrics::{ }; use once_cell::sync::Lazy; -pub static APPLY_CHUNK_DELAY: Lazy = Lazy::new(|| { +pub(crate) static APPLY_CHUNK_DELAY: Lazy = Lazy::new(|| { try_create_histogram_vec( "near_apply_chunk_delay_seconds", "Time to process a chunk. Gas used by the chunk is a metric label, rounded up to 100 teragas.", @@ -41,15 +41,7 @@ pub(crate) static STATE_SYNC_DUMP_ITERATION_ELAPSED: Lazy = Lazy:: ) .unwrap() }); -pub(crate) static STATE_SYNC_DUMP_PUT_OBJECT_ELAPSED: Lazy = Lazy::new(|| { - try_create_histogram_vec( - "near_state_sync_dump_put_object_elapsed_sec", - "Time needed to write a part", - &["shard_id"], - Some(exponential_buckets(0.001, 1.6, 25).unwrap()), - ) - .unwrap() -}); + pub(crate) static STATE_SYNC_DUMP_NUM_PARTS_TOTAL: Lazy = Lazy::new(|| { try_create_int_gauge_vec( "near_state_sync_dump_num_parts_total", @@ -58,6 +50,7 @@ pub(crate) static STATE_SYNC_DUMP_NUM_PARTS_TOTAL: Lazy = Lazy::new ) .unwrap() }); + pub(crate) static STATE_SYNC_DUMP_NUM_PARTS_DUMPED: Lazy = Lazy::new(|| { try_create_int_gauge_vec( "near_state_sync_dump_num_parts_dumped", @@ -66,6 +59,7 @@ pub(crate) static STATE_SYNC_DUMP_NUM_PARTS_DUMPED: Lazy = Lazy::ne ) .unwrap() }); + pub(crate) static STATE_SYNC_DUMP_SIZE_TOTAL: Lazy = Lazy::new(|| { try_create_int_counter_vec( "near_state_sync_dump_size_total", @@ -74,6 +68,7 @@ pub(crate) static STATE_SYNC_DUMP_SIZE_TOTAL: Lazy = Lazy::new(|| ) .unwrap() }); + pub(crate) static STATE_SYNC_DUMP_EPOCH_HEIGHT: Lazy = Lazy::new(|| { try_create_int_gauge_vec( "near_state_sync_dump_epoch_height", @@ -82,21 +77,25 @@ pub(crate) static STATE_SYNC_DUMP_EPOCH_HEIGHT: Lazy = Lazy::new(|| ) .unwrap() }); -pub static STATE_SYNC_APPLY_PART_DELAY: Lazy = Lazy::new(|| { - try_create_histogram_vec( - "near_state_sync_apply_part_delay_sec", - "Latency of applying a state part", - &["shard_id"], - Some(exponential_buckets(0.001, 2.0, 20).unwrap()), - ) - .unwrap() -}); -pub static STATE_SYNC_OBTAIN_PART_DELAY: Lazy = Lazy::new(|| { - try_create_histogram_vec( - "near_state_sync_obtain_part_delay_sec", - "Latency of applying a state part", - &["shard_id"], - Some(exponential_buckets(0.001, 2.0, 20).unwrap()), - ) - .unwrap() -}); + +pub(crate) static STATE_SYNC_APPLY_PART_DELAY: Lazy = + Lazy::new(|| { + try_create_histogram_vec( + "near_state_sync_apply_part_delay_sec", + "Latency of applying a state part", + &["shard_id"], + Some(exponential_buckets(0.001, 2.0, 20).unwrap()), + ) + .unwrap() + }); + +pub(crate) static STATE_SYNC_OBTAIN_PART_DELAY: Lazy = + Lazy::new(|| { + try_create_histogram_vec( + "near_state_sync_obtain_part_delay_sec", + "Latency of applying a state part", + &["shard_id"], + Some(exponential_buckets(0.001, 2.0, 20).unwrap()), + ) + .unwrap() + }); diff --git a/nearcore/src/state_sync.rs b/nearcore/src/state_sync.rs index 1c865ff2856..e1d2f5f88fb 100644 --- a/nearcore/src/state_sync.rs +++ b/nearcore/src/state_sync.rs @@ -1,49 +1,56 @@ -use crate::{metrics, NearConfig, NightshadeRuntime}; +use crate::metrics; use borsh::BorshSerialize; -use near_chain::types::RuntimeAdapter; -use near_chain::{Chain, ChainGenesis, ChainStoreAccess, DoomslugThresholdMode, Error}; -use near_chain_configs::ClientConfig; -use near_client::sync::state::{s3_location, StateSync}; -use near_epoch_manager::EpochManagerAdapter; +use near_chain::{ + Chain, ChainGenesis, ChainStoreAccess, DoomslugThresholdMode, Error, + RuntimeWithEpochManagerAdapter, +}; +use near_chain_configs::{ClientConfig, ExternalStorageLocation}; +use near_client::sync::state::{external_storage_location, ExternalConnection, StateSync}; use near_primitives::hash::CryptoHash; use near_primitives::state_part::PartId; use near_primitives::syncing::{get_num_state_parts, StatePartKey, StateSyncDumpProgress}; -use near_primitives::types::{EpochHeight, EpochId, ShardId, StateRoot}; +use near_primitives::types::{AccountId, EpochHeight, EpochId, ShardId, StateRoot}; use near_store::DBCol; +use std::sync::atomic::AtomicBool; use std::sync::Arc; +use std::time::Duration; /// Starts one a thread per tracked shard. /// Each started thread will be dumping state parts of a single epoch to external storage. pub fn spawn_state_sync_dump( - config: &NearConfig, + client_config: &ClientConfig, chain_genesis: ChainGenesis, - runtime: Arc, + runtime: Arc, + account_id: Option, ) -> anyhow::Result> { - if !config.client_config.state_sync_dump_enabled { + let dump_config = if let Some(dump_config) = client_config.state_sync.dump.clone() { + dump_config + } else { + // Dump is not configured, and therefore not enabled. + tracing::debug!(target: "state_sync_dump", "Not spawning the state sync dump loop"); return Ok(None); - } - if config.client_config.state_sync_s3_bucket.is_empty() - || config.client_config.state_sync_s3_region.is_empty() - { - panic!("Enabled dumps of state to external storage. Please specify state_sync.s3_bucket and state_sync.s3_region"); - } + }; tracing::info!(target: "state_sync_dump", "Spawning the state sync dump loop"); - // Create a connection to S3. - let s3_bucket = config.client_config.state_sync_s3_bucket.clone(); - let s3_region = config.client_config.state_sync_s3_region.clone(); - - // Credentials to establish a connection are taken from environment variables: AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY. - let bucket = s3::Bucket::new( - &s3_bucket, - s3_region - .parse::() - .map_err(>::into)?, - s3::creds::Credentials::default().map_err(|err| { - tracing::error!(target: "state_sync_dump", "Failed to create a connection to S3. Did you provide environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY?"); - >::into(err) - })?, - ).map_err(>::into)?; + let external = match dump_config.location { + ExternalStorageLocation::S3 { bucket, region } => { + // Credentials to establish a connection are taken from environment variables: + // * `AWS_ACCESS_KEY_ID` + // * `AWS_SECRET_ACCESS_KEY` + let creds = match s3::creds::Credentials::default() { + Ok(creds) => creds, + Err(err) => { + tracing::error!(target: "state_sync_dump", "Failed to create a connection to S3. Did you provide environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY?"); + return Err(err.into()); + } + }; + let bucket = s3::Bucket::new(&bucket, region.parse::()?, creds)?; + ExternalConnection::S3 { bucket: Arc::new(bucket) } + } + ExternalStorageLocation::Filesystem { root_dir } => { + ExternalConnection::Filesystem { root_dir } + } + }; // Determine how many threads to start. // TODO: Handle the case of changing the shard layout. @@ -59,10 +66,11 @@ pub fn spawn_state_sync_dump( runtime.num_shards(&epoch_id) }?; + let chain_id = client_config.chain_id.clone(); + let keep_running = Arc::new(AtomicBool::new(true)); // Start a thread for each shard. let handles = (0..num_shards as usize) .map(|shard_id| { - let client_config = config.client_config.clone(); let runtime = runtime.clone(); let chain_genesis = chain_genesis.clone(); let chain = Chain::new_for_view_client( @@ -77,19 +85,24 @@ pub fn spawn_state_sync_dump( shard_id as ShardId, chain, runtime, - client_config, - bucket.clone(), + chain_id.clone(), + dump_config.restart_dump_for_shards.clone().unwrap_or_default(), + external.clone(), + dump_config.iteration_delay.unwrap_or(Duration::from_secs(10)), + account_id.clone(), + keep_running.clone(), ))); arbiter_handle }) .collect(); - Ok(Some(StateSyncDumpHandle { handles })) + Ok(Some(StateSyncDumpHandle { handles, keep_running })) } /// Holds arbiter handles controlling the lifetime of the spawned threads. pub struct StateSyncDumpHandle { pub handles: Vec, + keep_running: Arc, } impl Drop for StateSyncDumpHandle { @@ -100,7 +113,10 @@ impl Drop for StateSyncDumpHandle { impl StateSyncDumpHandle { pub fn stop(&self) { - let _: Vec = self.handles.iter().map(|handle| handle.stop()).collect(); + self.keep_running.store(false, std::sync::atomic::Ordering::Relaxed); + self.handles.iter().for_each(|handle| { + handle.stop(); + }); } } @@ -110,21 +126,22 @@ impl StateSyncDumpHandle { async fn state_sync_dump( shard_id: ShardId, chain: Chain, - runtime: Arc, - config: ClientConfig, - bucket: s3::Bucket, + runtime: Arc, + chain_id: String, + restart_dump_for_shards: Vec, + external: ExternalConnection, + iteration_delay: Duration, + account_id: Option, + keep_running: Arc, ) { tracing::info!(target: "state_sync_dump", shard_id, "Running StateSyncDump loop"); - if config.state_sync_restart_dump_for_shards.contains(&shard_id) { + if restart_dump_for_shards.contains(&shard_id) { tracing::debug!(target: "state_sync_dump", shard_id, "Dropped existing progress"); chain.store().set_state_sync_dump_progress(shard_id, None).unwrap(); } - loop { - // Avoid a busy-loop when there is nothing to do. - std::thread::sleep(std::time::Duration::from_secs(10)); - + while keep_running.load(std::sync::atomic::Ordering::Relaxed) { let progress = chain.store().get_state_sync_dump_progress(shard_id); tracing::debug!(target: "state_sync_dump", shard_id, ?progress, "Running StateSyncDump loop iteration"); // The `match` returns the next state of the state machine. @@ -138,11 +155,12 @@ async fn state_sync_dump( shard_id, &chain, &runtime, + &account_id, ) } Err(Error::DBNotFoundErr(_)) | Ok(None) => { // First invocation of this state-machine. See if at least one epoch is available for dumping. - check_new_epoch(None, None, None, shard_id, &chain, &runtime) + check_new_epoch(None, None, None, shard_id, &chain, &runtime, &account_id) } Err(err) => { // Something went wrong, let's retry. @@ -158,16 +176,12 @@ async fn state_sync_dump( sync_hash, parts_dumped, })) => { - let state_header = chain.get_state_response_header(shard_id, sync_hash); - match state_header { - Ok(state_header) => { - let state_root = state_header.chunk_prev_state_root(); - let num_parts = - get_num_state_parts(state_header.state_root_node().memory_usage); - - let mut res = None; + let in_progress_data = get_in_progress_data(shard_id, sync_hash, &chain); + let mut res = None; + match in_progress_data { + Ok((state_root, num_parts, sync_prev_hash)) => { // The actual dumping of state to S3. - tracing::info!(target: "state_sync_dump", shard_id, ?epoch_id, epoch_height, %sync_hash, parts_dumped, "Creating parts and dumping them"); + tracing::info!(target: "state_sync_dump", shard_id, ?epoch_id, epoch_height, %sync_hash, ?state_root, parts_dumped, "Creating parts and dumping them"); for part_id in parts_dumped..num_parts { // Dump parts sequentially synchronously. // TODO: How to make it possible to dump state more effectively using multiple nodes? @@ -177,8 +191,9 @@ async fn state_sync_dump( let state_part = match obtain_and_store_state_part( &runtime, - &shard_id, - &sync_hash, + shard_id, + sync_hash, + &sync_prev_hash, &state_root, part_id, num_parts, @@ -190,17 +205,17 @@ async fn state_sync_dump( break; } }; - let location = s3_location( - &config.chain_id, + let location = external_storage_location( + &chain_id, epoch_height, shard_id, part_id, num_parts, ); if let Err(err) = - put_state_part(&location, &state_part, &shard_id, &bucket).await + external.put_state_part(&state_part, shard_id, &location).await { - res = Some(err); + res = Some(Error::Other(err.to_string())); break; } update_progress( @@ -213,6 +228,13 @@ async fn state_sync_dump( state_part.len(), &chain, ); + + // Stop if the node is stopped. + // Note that without this check the state dumping thread is unstoppable, i.e. non-interruptable. + if !keep_running.load(std::sync::atomic::Ordering::Relaxed) { + res = Some(Error::Other("Stopped".to_owned())); + break; + } } if let Some(err) = res { Err(err) @@ -230,42 +252,51 @@ async fn state_sync_dump( }; // Record the next state of the state machine. - match next_state { + let has_progress = match next_state { Ok(Some(next_state)) => { tracing::debug!(target: "state_sync_dump", shard_id, ?next_state); match chain.store().set_state_sync_dump_progress(shard_id, Some(next_state)) { - Ok(_) => {} + Ok(_) => true, Err(err) => { // This will be retried. tracing::debug!(target: "state_sync_dump", shard_id, ?err, "Failed to set progress"); + false } } } Ok(None) => { // Do nothing. tracing::debug!(target: "state_sync_dump", shard_id, "Idle"); + false } Err(err) => { // Will retry. tracing::debug!(target: "state_sync_dump", shard_id, ?err, "Failed to determine what to do"); + false } + }; + + if !has_progress { + // Avoid a busy-loop when there is nothing to do. + actix_rt::time::sleep(tokio::time::Duration::from(iteration_delay)).await; } } + tracing::debug!(target: "state_sync_dump", shard_id, "Stopped state dump thread"); } -async fn put_state_part( - location: &str, - state_part: &[u8], - shard_id: &ShardId, - bucket: &s3::Bucket, -) -> Result { - let _timer = metrics::STATE_SYNC_DUMP_PUT_OBJECT_ELAPSED - .with_label_values(&[&shard_id.to_string()]) - .start_timer(); - let put = - bucket.put_object(&location, state_part).await.map_err(|err| Error::Other(err.to_string())); - tracing::debug!(target: "state_sync_dump", shard_id, part_length = state_part.len(), ?location, "Wrote a state part to S3"); - put +// Extracts extra data needed for obtaining state parts. +fn get_in_progress_data( + shard_id: ShardId, + sync_hash: CryptoHash, + chain: &Chain, +) -> Result<(StateRoot, u64, CryptoHash), Error> { + let state_header = chain.get_state_response_header(shard_id, sync_hash)?; + let state_root = state_header.chunk_prev_state_root(); + let num_parts = get_num_state_parts(state_header.state_root_node().memory_usage); + + let sync_block = chain.get_block(&sync_hash)?; + let sync_prev_hash = sync_block.header().prev_hash(); + Ok((state_root, num_parts, *sync_prev_hash)) } fn update_progress( @@ -332,22 +363,23 @@ fn set_metrics( /// Obtains and then saves the part data. fn obtain_and_store_state_part( - runtime: &Arc, - shard_id: &ShardId, - sync_hash: &CryptoHash, + runtime: &Arc, + shard_id: ShardId, + sync_hash: CryptoHash, + sync_prev_hash: &CryptoHash, state_root: &StateRoot, part_id: u64, num_parts: u64, chain: &Chain, ) -> Result, Error> { let state_part = runtime.obtain_state_part( - *shard_id, - sync_hash, + shard_id, + sync_prev_hash, state_root, PartId::new(part_id, num_parts), )?; - let key = StatePartKey(*sync_hash, *shard_id, part_id).try_to_vec()?; + let key = StatePartKey(sync_hash, shard_id, part_id).try_to_vec()?; let mut store_update = chain.store().store().store_update(); store_update.set(DBCol::StateParts, &key, &state_part); store_update.commit()?; @@ -360,17 +392,26 @@ fn start_dumping( sync_hash: CryptoHash, shard_id: ShardId, chain: &Chain, - runtime: &Arc, + runtime: &Arc, + account_id: &Option, ) -> Result, Error> { let epoch_info = runtime.get_epoch_info(&epoch_id)?; let epoch_height = epoch_info.epoch_height(); - let sync_prev_header = chain.get_block_header(&sync_hash)?; - let sync_prev_hash = sync_prev_header.hash(); + + let sync_header = chain.get_block_header(&sync_hash)?; + let sync_prev_hash = sync_header.prev_hash(); + let sync_prev_header = chain.get_block_header(&sync_prev_hash)?; + // Need to check if the completed epoch had a shard this account cares about. + // sync_hash is the first block of the next epoch. + // `cares_about_shard()` accepts `parent_hash`, therefore we need prev-prev-hash, + // and its next-hash will be prev-hash. That is the last block of the completed epoch, + // which is what we wanted. + let sync_prev_prev_hash = sync_prev_header.prev_hash(); let state_header = chain.get_state_response_header(shard_id, sync_hash)?; let num_parts = get_num_state_parts(state_header.state_root_node().memory_usage); - if runtime.cares_about_shard(None, sync_prev_hash, shard_id, false) { - tracing::debug!(target: "state_sync_dump", shard_id, ?epoch_id, %sync_prev_hash, %sync_hash, "Initialize dumping state of Epoch"); + if runtime.cares_about_shard(account_id.as_ref(), sync_prev_prev_hash, shard_id, true) { + tracing::info!(target: "state_sync_dump", shard_id, ?epoch_id, %sync_prev_hash, %sync_hash, "Initialize dumping state of Epoch"); // Note that first the state of the state machines gets changes to // `InProgress` and it starts dumping state after a short interval. set_metrics(&shard_id, Some(0), Some(num_parts), Some(epoch_height)); @@ -381,7 +422,7 @@ fn start_dumping( parts_dumped: 0, })) } else { - tracing::debug!(target: "state_sync_dump", shard_id, ?epoch_id, %sync_prev_hash, %sync_hash, "Shard is not tracked, skip the epoch"); + tracing::info!(target: "state_sync_dump", shard_id, ?epoch_id, %sync_hash, "Shard is not tracked, skip the epoch"); Ok(Some(StateSyncDumpProgress::AllDumped { epoch_id, epoch_height, num_parts: Some(0) })) } } @@ -394,7 +435,8 @@ fn check_new_epoch( num_parts: Option, shard_id: ShardId, chain: &Chain, - runtime: &Arc, + runtime: &Arc, + account_id: &Option, ) -> Result, Error> { let head = chain.head()?; if Some(&head.epoch_id) == epoch_id.as_ref() { @@ -402,7 +444,7 @@ fn check_new_epoch( Ok(None) } else { // Check if the final block is now in the next epoch. - tracing::info!(target: "state_sync_dump", shard_id, ?epoch_id, "Check if a new complete epoch is available"); + tracing::debug!(target: "state_sync_dump", shard_id, ?epoch_id, "Check if a new complete epoch is available"); let hash = head.last_block_hash; let header = chain.get_block_header(&hash)?; let final_hash = header.last_final_block(); @@ -412,7 +454,96 @@ fn check_new_epoch( // Still in the latest dumped epoch. Do nothing. Ok(None) } else { - start_dumping(head.epoch_id, sync_hash, shard_id, chain, runtime) + start_dumping(head.epoch_id, sync_hash, shard_id, chain, runtime, account_id) } } } + +#[cfg(test)] +mod tests { + use crate::state_sync::spawn_state_sync_dump; + use near_chain::{ChainGenesis, Provenance}; + use near_chain_configs::{DumpConfig, ExternalStorageLocation}; + use near_client::sync::state::external_storage_location; + use near_client::test_utils::TestEnv; + use near_network::test_utils::wait_or_timeout; + use near_o11y::testonly::init_test_logger; + use near_primitives::types::BlockHeight; + use std::ops::ControlFlow; + use std::time::Duration; + + #[test] + /// Produce several blocks, wait for the state dump thread to notice and + /// write files to a temp dir. + fn test_state_dump() { + init_test_logger(); + + let mut chain_genesis = ChainGenesis::test(); + chain_genesis.epoch_length = 5; + let mut env = TestEnv::builder(chain_genesis.clone()).build(); + let chain = &env.clients[0].chain; + let runtime = chain.runtime_adapter(); + let mut config = env.clients[0].config.clone(); + let root_dir = tempfile::Builder::new().prefix("state_dump").tempdir().unwrap(); + config.state_sync.dump = Some(DumpConfig { + location: ExternalStorageLocation::Filesystem { + root_dir: root_dir.path().to_path_buf(), + }, + restart_dump_for_shards: None, + iteration_delay: Some(Duration::from_millis(250)), + }); + + const MAX_HEIGHT: BlockHeight = 15; + + near_actix_test_utils::run_actix(async move { + let _state_sync_dump_handle = spawn_state_sync_dump( + &config, + chain_genesis, + runtime.clone(), + Some("test0".parse().unwrap()), + ) + .unwrap(); + let mut last_block_hash = None; + for i in 1..=MAX_HEIGHT { + let block = env.clients[0].produce_block(i as u64).unwrap().unwrap(); + last_block_hash = Some(*block.hash()); + env.process_block(0, block, Provenance::PRODUCED); + } + let epoch_id = runtime.get_epoch_id(last_block_hash.as_ref().unwrap()).unwrap(); + let epoch_info = runtime.get_epoch_info(&epoch_id).unwrap(); + let epoch_height = epoch_info.epoch_height(); + + wait_or_timeout(100, 10000, || async { + let mut all_parts_present = true; + + let num_shards = runtime.num_shards(&epoch_id).unwrap(); + assert_ne!(num_shards, 0); + + for shard_id in 0..num_shards { + let num_parts = 3; + for part_id in 0..num_parts { + let path = root_dir.path().join(external_storage_location( + "unittest", + epoch_height, + shard_id, + part_id, + num_parts, + )); + if std::fs::read(&path).is_err() { + println!("Missing {:?}", path); + all_parts_present = false; + } + } + } + if all_parts_present { + ControlFlow::Break(()) + } else { + ControlFlow::Continue(()) + } + }) + .await + .unwrap(); + actix_rt::System::current().stop(); + }); + } +} diff --git a/nightly/expensive.txt b/nightly/expensive.txt index 5066349fef9..e47a122b39e 100644 --- a/nightly/expensive.txt +++ b/nightly/expensive.txt @@ -167,6 +167,8 @@ expensive integration-tests integration_tests tests::nearcore::sync_state_nodes: expensive integration-tests integration_tests tests::nearcore::sync_state_nodes::sync_state_nodes --features nightly expensive integration-tests integration_tests tests::nearcore::sync_state_nodes::sync_state_nodes_multishard expensive integration-tests integration_tests tests::nearcore::sync_state_nodes::sync_state_nodes_multishard --features nightly +expensive integration-tests integration_tests tests::nearcore::sync_state_nodes::sync_state_dump +expensive integration-tests integration_tests tests::nearcore::sync_state_nodes::sync_state_dump --features nightly expensive integration-tests integration_tests tests::nearcore::rpc_error_structs::test_block_unknown_block_error expensive integration-tests integration_tests tests::nearcore::rpc_error_structs::test_block_unknown_block_error --features nightly diff --git a/tools/state-viewer/src/cli.rs b/tools/state-viewer/src/cli.rs index 6346ba12781..268958751b6 100644 --- a/tools/state-viewer/src/cli.rs +++ b/tools/state-viewer/src/cli.rs @@ -492,6 +492,38 @@ impl StateChangesCmd { } } +#[derive(clap::Parser)] +pub struct StatePartsCmd { + /// Shard id. + #[clap(long)] + shard_id: ShardId, + /// Location of serialized state parts. + #[clap(long)] + root_dir: Option, + /// Store state parts in an S3 bucket. + #[clap(long)] + s3_bucket: Option, + /// Store state parts in an S3 bucket. + #[clap(long)] + s3_region: Option, + /// Dump or Apply state parts. + #[clap(subcommand)] + command: crate::state_parts::StatePartsSubCommand, +} + +impl StatePartsCmd { + pub fn run(self, home_dir: &Path, near_config: NearConfig, store: Store) { + self.command.run( + self.shard_id, + self.root_dir, + self.s3_bucket, + self.s3_region, + home_dir, + near_config, + store, + ); + } +} #[derive(clap::Parser)] pub struct ViewChainCmd { #[clap(long)] @@ -571,36 +603,3 @@ impl ViewTrieCmd { } } } - -#[derive(clap::Parser)] -pub struct StatePartsCmd { - /// Shard id. - #[clap(long)] - shard_id: ShardId, - /// Location of serialized state parts. - #[clap(long)] - root_dir: Option, - /// Store state parts in an S3 bucket. - #[clap(long)] - s3_bucket: Option, - /// Store state parts in an S3 bucket. - #[clap(long)] - s3_region: Option, - /// Dump or Apply state parts. - #[clap(subcommand)] - command: crate::state_parts::StatePartsSubCommand, -} - -impl StatePartsCmd { - pub fn run(self, home_dir: &Path, near_config: NearConfig, store: Store) { - self.command.run( - self.shard_id, - self.root_dir, - self.s3_bucket, - self.s3_region, - home_dir, - near_config, - store, - ); - } -} diff --git a/tools/state-viewer/src/state_parts.rs b/tools/state-viewer/src/state_parts.rs index 5e0917b0bf4..d985da5668d 100644 --- a/tools/state-viewer/src/state_parts.rs +++ b/tools/state-viewer/src/state_parts.rs @@ -1,7 +1,9 @@ use crate::epoch_info::iterate_and_filter; use borsh::BorshDeserialize; use near_chain::{Chain, ChainGenesis, ChainStoreAccess, DoomslugThresholdMode}; -use near_client::sync::state::StateSync; +use near_client::sync::state::{ + get_num_parts_from_filename, is_part_filename, location_prefix, part_filename, StateSync, +}; use near_primitives::epoch_manager::epoch_info::EpochInfo; use near_primitives::state_part::PartId; use near_primitives::state_record::StateRecord; @@ -314,6 +316,8 @@ fn dump_state_parts( let epoch = chain.runtime_adapter.get_epoch_info(&epoch_id).unwrap(); let sync_hash = get_any_block_hash_of_epoch(&epoch, chain); let sync_hash = StateSync::get_epoch_start_sync_hash(chain, &sync_hash).unwrap(); + let sync_block = chain.get_block_header(&sync_hash).unwrap(); + let sync_prev_hash = sync_block.prev_hash(); let state_header = chain.compute_state_response_header(shard_id, sync_hash).unwrap(); let state_root = state_header.chunk_prev_state_root(); @@ -340,7 +344,12 @@ fn dump_state_parts( assert!(part_id < num_parts, "part_id: {}, num_parts: {}", part_id, num_parts); let state_part = chain .runtime_adapter - .obtain_state_part(shard_id, &sync_hash, &state_root, PartId::new(part_id, num_parts)) + .obtain_state_part( + shard_id, + &sync_prev_hash, + &state_root, + PartId::new(part_id, num_parts), + ) .unwrap(); part_storage.write(&state_part, part_id, num_parts); let elapsed_sec = timer.elapsed().as_secs_f64(); @@ -390,35 +399,6 @@ fn get_part_ids(part_from: Option, part_to: Option, num_parts: u64) -> part_from.unwrap_or(0)..part_to.unwrap_or(num_parts) } -// Needs to be in sync with `fn s3_location()`. -fn location_prefix(chain_id: &str, epoch_height: u64, shard_id: u64) -> String { - format!("chain_id={}/epoch_height={}/shard_id={}", chain_id, epoch_height, shard_id) -} - -fn match_filename(s: &str) -> Option { - let re = regex::Regex::new(r"^state_part_(\d{6})_of_(\d{6})$").unwrap(); - re.captures(s) -} - -fn is_part_filename(s: &str) -> bool { - match_filename(s).is_some() -} - -fn get_num_parts_from_filename(s: &str) -> Option { - if let Some(captures) = match_filename(s) { - if let Some(num_parts) = captures.get(2) { - if let Ok(num_parts) = num_parts.as_str().parse::() { - return Some(num_parts); - } - } - } - None -} - -fn part_filename(part_id: u64, num_parts: u64) -> String { - format!("state_part_{:06}_of_{:06}", part_id, num_parts) -} - trait StatePartWriter { fn write(&self, state_part: &[u8], part_id: u64, num_parts: u64); }