From ac61fedb5756ed700e35f231a364b9c933423ab8 Mon Sep 17 00:00:00 2001
From: Alex Ostrovski
Date: Wed, 5 Jun 2024 10:15:27 +0300
Subject: [PATCH] feat(en): Allow recovery from specific snapshot (#2137)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## What ❔

Allows recovering a node from a specific snapshot specified at the start of recovery (by default, the newest available snapshot is used).

## Why ❔

Useful at least for testing recovery and pruning end-to-end on the testnet. There, L1 batches are produced very slowly, so it makes sense to recover from an earlier snapshot in order to meaningfully test pruning.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.
---
 core/bin/external_node/src/config/mod.rs      |  27 ++--
 core/bin/external_node/src/init.rs            |  30 ++--
 core/bin/external_node/src/main.rs            |  11 +-
 core/lib/snapshots_applier/src/lib.rs         |  66 +++++++--
 core/lib/snapshots_applier/src/tests/mod.rs   | 138 +++++++++++++++++-
 core/lib/snapshots_applier/src/tests/utils.rs |  99 +++++++++++--
 6 files changed, 314 insertions(+), 57 deletions(-)

diff --git a/core/bin/external_node/src/config/mod.rs b/core/bin/external_node/src/config/mod.rs
index 08fd955297e..3d94e833217 100644
--- a/core/bin/external_node/src/config/mod.rs
+++ b/core/bin/external_node/src/config/mod.rs
@@ -25,8 +25,8 @@ use zksync_node_api_server::{
 use zksync_protobuf_config::proto;
 use zksync_snapshots_applier::SnapshotsApplierConfig;
 use zksync_types::{
-    api::BridgeAddresses, commitment::L1BatchCommitmentMode, url::SensitiveUrl, Address, L1ChainId,
-    L2ChainId, ETHEREUM_ADDRESS,
+    api::BridgeAddresses, commitment::L1BatchCommitmentMode, url::SensitiveUrl, Address,
+    L1BatchNumber, L1ChainId, L2ChainId, ETHEREUM_ADDRESS,
 };
 use zksync_web3_decl::{
     client::{DynClient, L2},
@@ -746,6 +746,8 @@ pub(crate) struct ExperimentalENConfig {
     pub state_keeper_db_max_open_files: Option<NonZeroU32>,
 
     // Snapshot recovery
+    /// L1 batch number of the snapshot to use during recovery. Specifying this parameter is mostly useful for testing.
+    pub snapshots_recovery_l1_batch: Option<L1BatchNumber>,
     /// Approximate chunk size (measured in the number of entries) to recover in a single iteration.
     /// Reasonable values are order of 100,000 (meaning an iteration takes several seconds).
     ///
@@ -775,6 +777,7 @@ impl ExperimentalENConfig {
             state_keeper_db_block_cache_capacity_mb:
                 Self::default_state_keeper_db_block_cache_capacity_mb(),
             state_keeper_db_max_open_files: None,
+            snapshots_recovery_l1_batch: None,
             snapshots_recovery_tree_chunk_size: Self::default_snapshots_recovery_tree_chunk_size(),
             commitment_generator_max_parallelism: None,
         }
@@ -807,21 +810,11 @@ pub(crate) fn read_consensus_config() -> anyhow::Result<Option<ConsensusConfig>
     ))
 }
 
-/// Configuration for snapshot recovery. Loaded optionally, only if snapshot recovery is enabled.
-#[derive(Debug)]
-pub(crate) struct SnapshotsRecoveryConfig {
-    pub snapshots_object_store: ObjectStoreConfig,
-}
-
-impl SnapshotsRecoveryConfig {
-    pub fn new() -> anyhow::Result<Self> {
-        let snapshots_object_store = envy::prefixed("EN_SNAPSHOTS_OBJECT_STORE_")
-            .from_env::<ObjectStoreConfig>()
-            .context("failed loading snapshot object store config from env variables")?;
-        Ok(Self {
-            snapshots_object_store,
-        })
-    }
+/// Configuration for snapshot recovery. Should be loaded optionally, only if snapshot recovery is enabled.
+pub(crate) fn snapshot_recovery_object_store_config() -> anyhow::Result<ObjectStoreConfig> {
+    envy::prefixed("EN_SNAPSHOTS_OBJECT_STORE_")
+        .from_env::<ObjectStoreConfig>()
+        .context("failed loading snapshot object store config from env variables")
 }
 
 #[derive(Debug, Deserialize)]
diff --git a/core/bin/external_node/src/init.rs b/core/bin/external_node/src/init.rs
index 0f4ae9a8036..fb30628e389 100644
--- a/core/bin/external_node/src/init.rs
+++ b/core/bin/external_node/src/init.rs
@@ -12,7 +12,13 @@ use zksync_snapshots_applier::{SnapshotsApplierConfig, SnapshotsApplierTask};
 use zksync_types::{L1BatchNumber, L2ChainId};
 use zksync_web3_decl::client::{DynClient, L2};
 
-use crate::config::SnapshotsRecoveryConfig;
+use crate::config::snapshot_recovery_object_store_config;
+
+#[derive(Debug)]
+pub(crate) struct SnapshotRecoveryConfig {
+    /// If not specified, the latest snapshot will be used.
+    pub snapshot_l1_batch_override: Option<L1BatchNumber>,
+}
 
 #[derive(Debug)]
 enum InitDecision {
@@ -27,7 +33,7 @@ pub(crate) async fn ensure_storage_initialized(
     main_node_client: Box<DynClient<L2>>,
     app_health: &AppHealthCheck,
     l2_chain_id: L2ChainId,
-    consider_snapshot_recovery: bool,
+    recovery_config: Option<SnapshotRecoveryConfig>,
 ) -> anyhow::Result<()> {
     let mut storage = pool.connection_tagged("en").await?;
     let genesis_l1_batch = storage
@@ -57,7 +63,7 @@ pub(crate) async fn ensure_storage_initialized(
         }
         (None, None) => {
             tracing::info!("Node has neither genesis L1 batch, nor snapshot recovery info");
-            if consider_snapshot_recovery {
+            if recovery_config.is_some() {
                 InitDecision::SnapshotRecovery
             } else {
                 InitDecision::Genesis
@@ -78,25 +84,31 @@ pub(crate) async fn ensure_storage_initialized(
                 .context("performing genesis failed")?;
         }
         InitDecision::SnapshotRecovery => {
-            anyhow::ensure!(
-                consider_snapshot_recovery,
+            let recovery_config = recovery_config.context(
                 "Snapshot recovery is required to proceed, but it is not enabled. Enable by setting \
                  `EN_SNAPSHOTS_RECOVERY_ENABLED=true` env variable to the node binary, or use a Postgres dump for recovery"
-            );
+            )?;
 
             tracing::warn!("Proceeding with snapshot recovery. This is an experimental feature; use at your own risk");
-            let recovery_config = SnapshotsRecoveryConfig::new()?;
-            let blob_store = ObjectStoreFactory::new(recovery_config.snapshots_object_store)
+            let object_store_config = snapshot_recovery_object_store_config()?;
+            let blob_store = ObjectStoreFactory::new(object_store_config)
                 .create_store()
                 .await;
 
             let config = SnapshotsApplierConfig::default();
-            let snapshots_applier_task = SnapshotsApplierTask::new(
+            let mut snapshots_applier_task = SnapshotsApplierTask::new(
                 config,
                 pool,
                 Box::new(main_node_client.for_component("snapshot_recovery")),
                 blob_store,
             );
+            if let Some(snapshot_l1_batch) = recovery_config.snapshot_l1_batch_override {
+                tracing::info!(
+                    "Using a specific snapshot with L1 batch #{snapshot_l1_batch}; this may not work \
+                     if the snapshot is too old (order of several weeks old) or non-existent"
+                );
+                snapshots_applier_task.set_snapshot_l1_batch(snapshot_l1_batch);
+            }
             app_health.insert_component(snapshots_applier_task.health_check())?;
 
             let recovery_started_at = Instant::now();
diff --git a/core/bin/external_node/src/main.rs b/core/bin/external_node/src/main.rs
index 584356e755b..05f4b2ba9d4 100644
--- a/core/bin/external_node/src/main.rs
+++ b/core/bin/external_node/src/main.rs
@@ -55,7 +55,7 @@ use zksync_web3_decl::{
 use crate::{
     config::ExternalNodeConfig,
     helpers::{MainNodeHealthCheck, ValidateChainIdsTask},
-    init::ensure_storage_initialized,
+    init::{ensure_storage_initialized, SnapshotRecoveryConfig},
     metrics::RUST_METRICS,
 };
 
@@ -908,12 +908,19 @@ async fn run_node(
     task_handles.extend(prometheus_task);
 
     // Make sure that the node storage is initialized either via genesis or snapshot recovery.
+    let recovery_config =
+        config
+            .optional
+            .snapshots_recovery_enabled
+            .then_some(SnapshotRecoveryConfig {
+                snapshot_l1_batch_override: config.experimental.snapshots_recovery_l1_batch,
+            });
     ensure_storage_initialized(
         connection_pool.clone(),
         main_node_client.clone(),
         &app_health,
         config.required.l2_chain_id,
-        config.optional.snapshots_recovery_enabled,
+        recovery_config,
     )
     .await?;
     let sigint_receiver = env.setup_sigint_handler();
diff --git a/core/lib/snapshots_applier/src/lib.rs b/core/lib/snapshots_applier/src/lib.rs
index bcf4b3c1432..b0024f78433 100644
--- a/core/lib/snapshots_applier/src/lib.rs
+++ b/core/lib/snapshots_applier/src/lib.rs
@@ -123,7 +123,14 @@ pub trait SnapshotsApplierMainNodeClient: fmt::Debug + Send + Sync {
         number: L2BlockNumber,
     ) -> EnrichedClientResult<Option<api::BlockDetails>>;
 
-    async fn fetch_newest_snapshot(&self) -> EnrichedClientResult<Option<SnapshotHeader>>;
+    async fn fetch_newest_snapshot_l1_batch_number(
+        &self,
+    ) -> EnrichedClientResult<Option<L1BatchNumber>>;
+
+    async fn fetch_snapshot(
+        &self,
+        l1_batch_number: L1BatchNumber,
+    ) -> EnrichedClientResult<Option<SnapshotHeader>>;
 
     async fn fetch_tokens(
         &self,
@@ -153,17 +160,23 @@ impl SnapshotsApplierMainNodeClient for Box<DynClient<L2>> {
             .await
     }
 
-    async fn fetch_newest_snapshot(&self) -> EnrichedClientResult<Option<SnapshotHeader>> {
+    async fn fetch_newest_snapshot_l1_batch_number(
+        &self,
+    ) -> EnrichedClientResult<Option<L1BatchNumber>> {
         let snapshots = self
             .get_all_snapshots()
             .rpc_context("get_all_snapshots")
             .await?;
-        let Some(newest_snapshot) = snapshots.snapshots_l1_batch_numbers.first() else {
-            return Ok(None);
-        };
-        self.get_snapshot_by_l1_batch_number(*newest_snapshot)
+        Ok(snapshots.snapshots_l1_batch_numbers.first().copied())
+    }
+
+    async fn fetch_snapshot(
+        &self,
+        l1_batch_number: L1BatchNumber,
+    ) -> EnrichedClientResult<Option<SnapshotHeader>> {
+        self.get_snapshot_by_l1_batch_number(l1_batch_number)
             .rpc_context("get_snapshot_by_l1_batch_number")
-            .with_arg("number", newest_snapshot)
+            .with_arg("number", &l1_batch_number)
             .await
     }
 
@@ -179,7 +192,7 @@ impl SnapshotsApplierMainNodeClient for Box<DynClient<L2>> {
 }
 
 /// Snapshot applier configuration options.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct SnapshotsApplierConfig {
     /// Number of retries for transient errors before giving up on recovery (i.e., returning an error
     /// from [`Self::run()`]).
@@ -223,6 +236,7 @@ pub struct SnapshotApplierTaskStats {
 
 #[derive(Debug)]
 pub struct SnapshotsApplierTask {
+    snapshot_l1_batch: Option<L1BatchNumber>,
     config: SnapshotsApplierConfig,
     health_updater: HealthUpdater,
     connection_pool: ConnectionPool<Core>,
@@ -238,6 +252,7 @@ impl SnapshotsApplierTask {
         blob_store: Arc<dyn ObjectStore>,
     ) -> Self {
         Self {
+            snapshot_l1_batch: None,
             config,
             health_updater: ReactiveHealthCheck::new("snapshot_recovery").1,
             connection_pool,
@@ -246,6 +261,11 @@ impl SnapshotsApplierTask {
         }
     }
 
+    /// Specifies the L1 batch to recover from. This setting is ignored if recovery is complete or resumed.
+    pub fn set_snapshot_l1_batch(&mut self, number: L1BatchNumber) {
+        self.snapshot_l1_batch = Some(number);
+    }
+
     /// Returns the health check for snapshot recovery.
     pub fn health_check(&self) -> ReactiveHealthCheck {
         self.health_updater.subscribe()
@@ -270,6 +290,7 @@ impl SnapshotsApplierTask {
             self.main_node_client.as_ref(),
             &self.blob_store,
             &self.health_updater,
+            self.snapshot_l1_batch,
             self.config.max_concurrency.get(),
         )
         .await;
@@ -324,6 +345,7 @@ impl SnapshotRecoveryStrategy {
     async fn new(
         storage: &mut Connection<'_, Core>,
         main_node_client: &dyn SnapshotsApplierMainNodeClient,
+        snapshot_l1_batch: Option<L1BatchNumber>,
     ) -> Result<(Self, SnapshotRecoveryStatus), SnapshotsApplierError> {
         let latency =
             METRICS.initial_stage_duration[&InitialStage::FetchMetadataFromMainNode].start();
@@ -350,7 +372,8 @@ impl SnapshotRecoveryStrategy {
                 return Err(SnapshotsApplierError::Fatal(err));
             }
 
-            let recovery_status = Self::create_fresh_recovery_status(main_node_client).await?;
+            let recovery_status =
+                Self::create_fresh_recovery_status(main_node_client, snapshot_l1_batch).await?;
 
             let storage_logs_count = storage
                 .storage_logs_dal()
@@ -373,12 +396,20 @@ impl SnapshotRecoveryStrategy {
 
     async fn create_fresh_recovery_status(
         main_node_client: &dyn SnapshotsApplierMainNodeClient,
+        snapshot_l1_batch: Option<L1BatchNumber>,
     ) -> Result<SnapshotRecoveryStatus, SnapshotsApplierError> {
-        let snapshot_response = main_node_client.fetch_newest_snapshot().await?;
+        let l1_batch_number = match snapshot_l1_batch {
+            Some(num) => num,
+            None => main_node_client
+                .fetch_newest_snapshot_l1_batch_number()
+                .await?
+                .context("no snapshots on main node; snapshot recovery is impossible")?,
+        };
+        let snapshot_response = main_node_client.fetch_snapshot(l1_batch_number).await?;
 
-        let snapshot = snapshot_response
-            .context("no snapshots on main node; snapshot recovery is impossible")?;
-        let l1_batch_number = snapshot.l1_batch_number;
+        let snapshot = snapshot_response.with_context(|| {
+            format!("snapshot for L1 batch #{l1_batch_number} is not present on main node")
+        })?;
         let l2_block_number = snapshot.l2_block_number;
         tracing::info!(
             "Found snapshot with data up to L1 batch #{l1_batch_number}, L2 block #{l2_block_number}, \
@@ -461,6 +492,7 @@ impl<'a> SnapshotsApplier<'a> {
         main_node_client: &'a dyn SnapshotsApplierMainNodeClient,
         blob_store: &'a dyn ObjectStore,
         health_updater: &'a HealthUpdater,
+        snapshot_l1_batch: Option<L1BatchNumber>,
         max_concurrency: usize,
     ) -> Result<(SnapshotRecoveryStrategy, SnapshotRecoveryStatus), SnapshotsApplierError> {
         // While the recovery is in progress, the node is healthy (no error has occurred),
@@ -472,8 +504,12 @@ impl<'a> SnapshotsApplier<'a> {
             .await?;
         let mut storage_transaction = storage.start_transaction().await?;
 
-        let (strategy, applied_snapshot_status) =
-            SnapshotRecoveryStrategy::new(&mut storage_transaction, main_node_client).await?;
+        let (strategy, applied_snapshot_status) = SnapshotRecoveryStrategy::new(
+            &mut storage_transaction,
+            main_node_client,
+            snapshot_l1_batch,
+        )
+        .await?;
         tracing::info!("Chosen snapshot recovery strategy: {strategy:?} with status: {applied_snapshot_status:?}");
         let created_from_scratch = match strategy {
             SnapshotRecoveryStrategy::Completed => return Ok((strategy, applied_snapshot_status)),
diff --git a/core/lib/snapshots_applier/src/tests/mod.rs b/core/lib/snapshots_applier/src/tests/mod.rs
index 59a95792c1c..e61f7645537 100644
--- a/core/lib/snapshots_applier/src/tests/mod.rs
+++ b/core/lib/snapshots_applier/src/tests/mod.rs
@@ -21,6 +21,7 @@ use self::utils::{
     random_storage_logs, MockMainNodeClient, ObjectStoreWithErrors,
 };
 use super::*;
+use crate::tests::utils::HangingObjectStore;
 
 mod utils;
 
@@ -142,6 +143,131 @@ async fn snapshots_creator_can_successfully_recover_db(
     assert!(!stats.done_work);
 }
 
+#[tokio::test]
+async fn applier_recovers_explicitly_specified_snapshot() {
+    let pool = ConnectionPool::<Core>::test_pool().await;
+    let expected_status = mock_recovery_status();
+    let storage_logs = random_storage_logs(expected_status.l1_batch_number, 200);
+    let (object_store, client) = prepare_clients(&expected_status, &storage_logs).await;
+
+    let mut task = SnapshotsApplierTask::new(
+        SnapshotsApplierConfig::for_tests(),
+        pool.clone(),
+        Box::new(client),
+        object_store,
+    );
+    task.set_snapshot_l1_batch(expected_status.l1_batch_number);
+    let stats = task.run().await.unwrap();
+    assert!(stats.done_work);
+
+    let mut storage = pool.connection().await.unwrap();
+    let all_storage_logs = storage
+        .storage_logs_dal()
+        .dump_all_storage_logs_for_tests()
+        .await;
+    assert_eq!(all_storage_logs.len(), storage_logs.len());
+}
+
+#[tokio::test]
+async fn applier_error_for_missing_explicitly_specified_snapshot() {
+    let pool = ConnectionPool::<Core>::test_pool().await;
+    let expected_status = mock_recovery_status();
+    let storage_logs = random_storage_logs(expected_status.l1_batch_number, 200);
+    let (object_store, client) = prepare_clients(&expected_status, &storage_logs).await;
+
+    let mut task = SnapshotsApplierTask::new(
+        SnapshotsApplierConfig::for_tests(),
+        pool,
+        Box::new(client),
+        object_store,
+    );
+    task.set_snapshot_l1_batch(expected_status.l1_batch_number + 1);
+
+    let err = task.run().await.unwrap_err();
+    assert!(
+        format!("{err:#}").contains("not present on main node"),
+        "{err:#}"
+    );
+}
+
+#[tokio::test]
+async fn snapshot_applier_recovers_after_stopping() {
+    let pool = ConnectionPool::<Core>::test_pool().await;
+    let mut expected_status = mock_recovery_status();
+    expected_status.storage_logs_chunks_processed = vec![true; 10];
+    let storage_logs = random_storage_logs(expected_status.l1_batch_number, 200);
+    let (object_store, client) = prepare_clients(&expected_status, &storage_logs).await;
+    let (stopping_object_store, mut stop_receiver) =
+        HangingObjectStore::new(object_store.clone(), 1);
+
+    let mut config = SnapshotsApplierConfig::for_tests();
+    config.max_concurrency = NonZeroUsize::new(1).unwrap();
+    let task = SnapshotsApplierTask::new(
+        config.clone(),
+        pool.clone(),
+        Box::new(client.clone()),
+        Arc::new(stopping_object_store),
+    );
+    let task_handle = tokio::spawn(task.run());
+
+    // Wait until the first storage logs chunk is requested (the object store hangs up at this point)
+    stop_receiver.wait_for(|&count| count > 1).await.unwrap();
+    assert!(!task_handle.is_finished());
+    task_handle.abort();
+
+    // Check that factory deps have been persisted, but no storage logs.
+    let mut storage = pool.connection().await.unwrap();
+    let all_factory_deps = storage
+        .factory_deps_dal()
+        .dump_all_factory_deps_for_tests()
+        .await;
+    assert!(!all_factory_deps.is_empty());
+    let all_storage_logs = storage
+        .storage_logs_dal()
+        .dump_all_storage_logs_for_tests()
+        .await;
+    assert!(all_storage_logs.is_empty(), "{all_storage_logs:?}");
+
+    // Recover 3 storage log chunks and stop again
+    let (stopping_object_store, mut stop_receiver) =
+        HangingObjectStore::new(object_store.clone(), 3);
+
+    let task = SnapshotsApplierTask::new(
+        config.clone(),
+        pool.clone(),
+        Box::new(client.clone()),
+        Arc::new(stopping_object_store),
+    );
+    let task_handle = tokio::spawn(task.run());
+
+    stop_receiver.wait_for(|&count| count > 3).await.unwrap();
+    assert!(!task_handle.is_finished());
+    task_handle.abort();
+
+    let all_storage_logs = storage
+        .storage_logs_dal()
+        .dump_all_storage_logs_for_tests()
+        .await;
+    assert!(all_storage_logs.len() < storage_logs.len());
+
+    // Recover remaining 7 (10 - 3) storage log chunks.
+    let (stopping_object_store, _) = HangingObjectStore::new(object_store.clone(), 7);
+    let mut task = SnapshotsApplierTask::new(
+        config,
+        pool.clone(),
+        Box::new(client),
+        Arc::new(stopping_object_store),
+    );
+    task.set_snapshot_l1_batch(expected_status.l1_batch_number); // check that this works fine
+    task.run().await.unwrap();
+
+    let all_storage_logs = storage
+        .storage_logs_dal()
+        .dump_all_storage_logs_for_tests()
+        .await;
+    assert_eq!(all_storage_logs.len(), storage_logs.len());
+}
+
 #[tokio::test]
 async fn health_status_immediately_after_task_start() {
     #[derive(Debug, Clone)]
@@ -165,7 +291,17 @@ async fn health_status_immediately_after_task_start() {
             future::pending().await
         }
 
-        async fn fetch_newest_snapshot(&self) -> EnrichedClientResult<Option<SnapshotHeader>> {
+        async fn fetch_newest_snapshot_l1_batch_number(
+            &self,
+        ) -> EnrichedClientResult<Option<L1BatchNumber>> {
+            self.0.wait().await;
+            future::pending().await
+        }
+
+        async fn fetch_snapshot(
+            &self,
+            _l1_batch_number: L1BatchNumber,
+        ) -> EnrichedClientResult<Option<SnapshotHeader>> {
             self.0.wait().await;
             future::pending().await
         }
diff --git a/core/lib/snapshots_applier/src/tests/utils.rs b/core/lib/snapshots_applier/src/tests/utils.rs
index 4629d8c0a2f..c853481ab53 100644
--- a/core/lib/snapshots_applier/src/tests/utils.rs
+++ b/core/lib/snapshots_applier/src/tests/utils.rs
@@ -1,8 +1,9 @@
 //! Test utils.
 
-use std::{collections::HashMap, fmt, sync::Arc};
+use std::{collections::HashMap, fmt, future, sync::Arc};
 
 use async_trait::async_trait;
+use tokio::sync::watch;
 use zksync_object_store::{Bucket, ObjectStore, ObjectStoreError, ObjectStoreFactory};
 use zksync_types::{
     api,
@@ -45,8 +46,23 @@ impl SnapshotsApplierMainNodeClient for MockMainNodeClient {
         Ok(self.fetch_l2_block_responses.get(&number).cloned())
     }
 
-    async fn fetch_newest_snapshot(&self) -> EnrichedClientResult<Option<SnapshotHeader>> {
-        Ok(self.fetch_newest_snapshot_response.clone())
+    async fn fetch_newest_snapshot_l1_batch_number(
+        &self,
+    ) -> EnrichedClientResult<Option<L1BatchNumber>> {
+        Ok(self
+            .fetch_newest_snapshot_response
+            .as_ref()
+            .map(|response| response.l1_batch_number))
+    }
+
+    async fn fetch_snapshot(
+        &self,
+        l1_batch_number: L1BatchNumber,
+    ) -> EnrichedClientResult<Option<SnapshotHeader>> {
+        Ok(self
+            .fetch_newest_snapshot_response
+            .clone()
+            .filter(|response| response.l1_batch_number == l1_batch_number))
     }
 
     async fn fetch_tokens(
         &self,
@@ -223,16 +239,12 @@ pub(super) fn mock_snapshot_header(status: &SnapshotRecoveryStatus) -> SnapshotH
         version: SnapshotVersion::Version0.into(),
         l1_batch_number: status.l1_batch_number,
         l2_block_number: status.l2_block_number,
-        storage_logs_chunks: vec![
-            SnapshotStorageLogsChunkMetadata {
-                chunk_id: 0,
-                filepath: "file0".to_string(),
-            },
-            SnapshotStorageLogsChunkMetadata {
-                chunk_id: 1,
-                filepath: "file1".to_string(),
-            },
-        ],
+        storage_logs_chunks: (0..status.storage_logs_chunks_processed.len() as u64)
+            .map(|chunk_id| SnapshotStorageLogsChunkMetadata {
+                chunk_id,
+                filepath: format!("file{chunk_id}"),
+            })
+            .collect(),
         factory_deps_filepath: "some_filepath".to_string(),
     }
 }
@@ -289,3 +301,64 @@ pub(super) async fn prepare_clients(
     );
     (object_store, client)
 }
+
+/// Object store wrapper that hangs up after processing the specified number of requests.
+/// Used to emulate the snapshot applier being restarted since, if it's configured to have concurrency 1,
+/// the applier will request an object from the store strictly after fully processing all previously requested objects.
+#[derive(Debug)]
+pub(super) struct HangingObjectStore {
+    inner: Arc<dyn ObjectStore>,
+    stop_after_count: usize,
+    count_sender: watch::Sender<usize>,
+}
+
+impl HangingObjectStore {
+    pub fn new(
+        inner: Arc<dyn ObjectStore>,
+        stop_after_count: usize,
+    ) -> (Self, watch::Receiver<usize>) {
+        let (count_sender, count_receiver) = watch::channel(0);
+        let this = Self {
+            inner,
+            stop_after_count,
+            count_sender,
+        };
+        (this, count_receiver)
+    }
+}
+
+#[async_trait]
+impl ObjectStore for HangingObjectStore {
+    async fn get_raw(&self, bucket: Bucket, key: &str) -> Result<Vec<u8>, ObjectStoreError> {
+        let mut should_proceed = true;
+        self.count_sender.send_modify(|count| {
+            *count += 1;
+            if *count > self.stop_after_count {
+                should_proceed = false;
+            }
+        });
+
+        if should_proceed {
+            self.inner.get_raw(bucket, key).await
+        } else {
+            future::pending().await // Hang up the snapshot applier task
+        }
+    }
+
+    async fn put_raw(
+        &self,
+        _bucket: Bucket,
+        _key: &str,
+        _value: Vec<u8>,
+    ) -> Result<(), ObjectStoreError> {
+        unreachable!("Should not be used in snapshot applier")
+    }
+
+    async fn remove_raw(&self, _bucket: Bucket, _key: &str) -> Result<(), ObjectStoreError> {
+        unreachable!("Should not be used in snapshot applier")
+    }
+
+    fn storage_prefix_raw(&self, bucket: Bucket) -> String {
+        self.inner.storage_prefix_raw(bucket)
+    }
+}
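
---

A minimal usage sketch of the new API, not part of the patch: it shows how a caller pins recovery to a specific snapshot before running the applier task. The `pool`, `main_node_client`, and `blob_store` values are assumed to be constructed the same way as in `init.rs` above, and the batch number is an arbitrary example.

```rust
use zksync_types::L1BatchNumber;

// Assumes `pool`, `main_node_client`, and `blob_store` are built as in
// `ensure_storage_initialized` (see init.rs above); this is a sketch, not
// a verbatim excerpt from the PR.
let mut task = SnapshotsApplierTask::new(
    SnapshotsApplierConfig::default(),
    pool,
    Box::new(main_node_client.for_component("snapshot_recovery")),
    blob_store,
);
// Pin recovery to a specific snapshot instead of the newest one. Per the
// `set_snapshot_l1_batch` docs, the override is ignored if recovery is
// already complete or is being resumed.
task.set_snapshot_l1_batch(L1BatchNumber(212_115)); // example batch number
let stats = task.run().await?;
anyhow::ensure!(stats.done_work, "expected recovery to perform work");
```

On a running external node, the same override is surfaced through the `snapshots_recovery_l1_batch` field of `ExperimentalENConfig`; assuming the experimental config keeps the `EN_EXPERIMENTAL_` env prefix, that corresponds to setting `EN_EXPERIMENTAL_SNAPSHOTS_RECOVERY_L1_BATCH` alongside `EN_SNAPSHOTS_RECOVERY_ENABLED=true`.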