Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(en): Allow recovery from specific snapshot #2137

Merged
merged 7 commits into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 10 additions & 17 deletions core/bin/external_node/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ use zksync_node_api_server::{
use zksync_protobuf_config::proto;
use zksync_snapshots_applier::SnapshotsApplierConfig;
use zksync_types::{
api::BridgeAddresses, commitment::L1BatchCommitmentMode, url::SensitiveUrl, Address, L1ChainId,
L2ChainId, ETHEREUM_ADDRESS,
api::BridgeAddresses, commitment::L1BatchCommitmentMode, url::SensitiveUrl, Address,
L1BatchNumber, L1ChainId, L2ChainId, ETHEREUM_ADDRESS,
};
use zksync_web3_decl::{
client::{DynClient, L2},
Expand Down Expand Up @@ -746,6 +746,8 @@ pub(crate) struct ExperimentalENConfig {
pub state_keeper_db_max_open_files: Option<NonZeroU32>,

// Snapshot recovery
/// L1 batch number of the snapshot to use during recovery. Specifying this parameter is mostly useful for testing.
pub snapshots_recovery_l1_batch: Option<L1BatchNumber>,
/// Approximate chunk size (measured in the number of entries) to recover in a single iteration.
/// Reasonable values are order of 100,000 (meaning an iteration takes several seconds).
///
Expand Down Expand Up @@ -775,6 +777,7 @@ impl ExperimentalENConfig {
state_keeper_db_block_cache_capacity_mb:
Self::default_state_keeper_db_block_cache_capacity_mb(),
state_keeper_db_max_open_files: None,
snapshots_recovery_l1_batch: None,
snapshots_recovery_tree_chunk_size: Self::default_snapshots_recovery_tree_chunk_size(),
commitment_generator_max_parallelism: None,
}
Expand Down Expand Up @@ -807,21 +810,11 @@ pub(crate) fn read_consensus_config() -> anyhow::Result<Option<ConsensusConfig>>
))
}

/// Configuration for snapshot recovery. Loaded optionally, only if snapshot recovery is enabled.
#[derive(Debug)]
pub(crate) struct SnapshotsRecoveryConfig {
pub snapshots_object_store: ObjectStoreConfig,
}

impl SnapshotsRecoveryConfig {
pub fn new() -> anyhow::Result<Self> {
let snapshots_object_store = envy::prefixed("EN_SNAPSHOTS_OBJECT_STORE_")
.from_env::<ObjectStoreConfig>()
.context("failed loading snapshot object store config from env variables")?;
Ok(Self {
snapshots_object_store,
})
}
/// Configuration for snapshot recovery. Should be loaded optionally, only if snapshot recovery is enabled.
pub(crate) fn snapshot_recovery_object_store_config() -> anyhow::Result<ObjectStoreConfig> {
envy::prefixed("EN_SNAPSHOTS_OBJECT_STORE_")
.from_env::<ObjectStoreConfig>()
.context("failed loading snapshot object store config from env variables")
}

#[derive(Debug, Deserialize)]
Expand Down
30 changes: 21 additions & 9 deletions core/bin/external_node/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,13 @@ use zksync_snapshots_applier::{SnapshotsApplierConfig, SnapshotsApplierTask};
use zksync_types::{L1BatchNumber, L2ChainId};
use zksync_web3_decl::client::{DynClient, L2};

use crate::config::SnapshotsRecoveryConfig;
use crate::config::snapshot_recovery_object_store_config;

#[derive(Debug)]
pub(crate) struct SnapshotRecoveryConfig {
/// If not specified, the latest snapshot will be used.
pub snapshot_l1_batch_override: Option<L1BatchNumber>,
}

#[derive(Debug)]
enum InitDecision {
Expand All @@ -27,7 +33,7 @@ pub(crate) async fn ensure_storage_initialized(
main_node_client: Box<DynClient<L2>>,
app_health: &AppHealthCheck,
l2_chain_id: L2ChainId,
consider_snapshot_recovery: bool,
recovery_config: Option<SnapshotRecoveryConfig>,
) -> anyhow::Result<()> {
let mut storage = pool.connection_tagged("en").await?;
let genesis_l1_batch = storage
Expand Down Expand Up @@ -57,7 +63,7 @@ pub(crate) async fn ensure_storage_initialized(
}
(None, None) => {
tracing::info!("Node has neither genesis L1 batch, nor snapshot recovery info");
if consider_snapshot_recovery {
if recovery_config.is_some() {
InitDecision::SnapshotRecovery
} else {
InitDecision::Genesis
Expand All @@ -78,25 +84,31 @@ pub(crate) async fn ensure_storage_initialized(
.context("performing genesis failed")?;
}
InitDecision::SnapshotRecovery => {
anyhow::ensure!(
consider_snapshot_recovery,
let recovery_config = recovery_config.context(
"Snapshot recovery is required to proceed, but it is not enabled. Enable by setting \
`EN_SNAPSHOTS_RECOVERY_ENABLED=true` env variable to the node binary, or use a Postgres dump for recovery"
);
)?;

tracing::warn!("Proceeding with snapshot recovery. This is an experimental feature; use at your own risk");
let recovery_config = SnapshotsRecoveryConfig::new()?;
let blob_store = ObjectStoreFactory::new(recovery_config.snapshots_object_store)
let object_store_config = snapshot_recovery_object_store_config()?;
let blob_store = ObjectStoreFactory::new(object_store_config)
.create_store()
.await;

let config = SnapshotsApplierConfig::default();
let snapshots_applier_task = SnapshotsApplierTask::new(
let mut snapshots_applier_task = SnapshotsApplierTask::new(
config,
pool,
Box::new(main_node_client.for_component("snapshot_recovery")),
blob_store,
);
if let Some(snapshot_l1_batch) = recovery_config.snapshot_l1_batch_override {
tracing::info!(
"Using a specific snapshot with L1 batch #{snapshot_l1_batch}; this may not work \
if the snapshot is too old (order of several weeks old) or non-existent"
);
snapshots_applier_task.set_snapshot_l1_batch(snapshot_l1_batch);
}
app_health.insert_component(snapshots_applier_task.health_check())?;

let recovery_started_at = Instant::now();
Expand Down
11 changes: 9 additions & 2 deletions core/bin/external_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ use zksync_web3_decl::{
use crate::{
config::ExternalNodeConfig,
helpers::{MainNodeHealthCheck, ValidateChainIdsTask},
init::ensure_storage_initialized,
init::{ensure_storage_initialized, SnapshotRecoveryConfig},
metrics::RUST_METRICS,
};

Expand Down Expand Up @@ -908,12 +908,19 @@ async fn run_node(
task_handles.extend(prometheus_task);

// Make sure that the node storage is initialized either via genesis or snapshot recovery.
let recovery_config =
config
.optional
.snapshots_recovery_enabled
.then_some(SnapshotRecoveryConfig {
snapshot_l1_batch_override: config.experimental.snapshots_recovery_l1_batch,
});
ensure_storage_initialized(
connection_pool.clone(),
main_node_client.clone(),
&app_health,
config.required.l2_chain_id,
config.optional.snapshots_recovery_enabled,
recovery_config,
)
.await?;
let sigint_receiver = env.setup_sigint_handler();
Expand Down
66 changes: 51 additions & 15 deletions core/lib/snapshots_applier/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,14 @@ pub trait SnapshotsApplierMainNodeClient: fmt::Debug + Send + Sync {
number: L2BlockNumber,
) -> EnrichedClientResult<Option<api::BlockDetails>>;

async fn fetch_newest_snapshot(&self) -> EnrichedClientResult<Option<SnapshotHeader>>;
async fn fetch_newest_snapshot_l1_batch_number(
&self,
) -> EnrichedClientResult<Option<L1BatchNumber>>;

async fn fetch_snapshot(
&self,
l1_batch_number: L1BatchNumber,
) -> EnrichedClientResult<Option<SnapshotHeader>>;

async fn fetch_tokens(
&self,
Expand Down Expand Up @@ -153,17 +160,23 @@ impl SnapshotsApplierMainNodeClient for Box<DynClient<L2>> {
.await
}

async fn fetch_newest_snapshot(&self) -> EnrichedClientResult<Option<SnapshotHeader>> {
async fn fetch_newest_snapshot_l1_batch_number(
&self,
) -> EnrichedClientResult<Option<L1BatchNumber>> {
let snapshots = self
.get_all_snapshots()
.rpc_context("get_all_snapshots")
.await?;
let Some(newest_snapshot) = snapshots.snapshots_l1_batch_numbers.first() else {
return Ok(None);
};
self.get_snapshot_by_l1_batch_number(*newest_snapshot)
Ok(snapshots.snapshots_l1_batch_numbers.first().copied())
}

async fn fetch_snapshot(
&self,
l1_batch_number: L1BatchNumber,
) -> EnrichedClientResult<Option<SnapshotHeader>> {
self.get_snapshot_by_l1_batch_number(l1_batch_number)
.rpc_context("get_snapshot_by_l1_batch_number")
.with_arg("number", newest_snapshot)
.with_arg("number", &l1_batch_number)
.await
}

Expand All @@ -179,7 +192,7 @@ impl SnapshotsApplierMainNodeClient for Box<DynClient<L2>> {
}

/// Snapshot applier configuration options.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct SnapshotsApplierConfig {
/// Number of retries for transient errors before giving up on recovery (i.e., returning an error
/// from [`Self::run()`]).
Expand Down Expand Up @@ -223,6 +236,7 @@ pub struct SnapshotApplierTaskStats {

#[derive(Debug)]
pub struct SnapshotsApplierTask {
snapshot_l1_batch: Option<L1BatchNumber>,
config: SnapshotsApplierConfig,
health_updater: HealthUpdater,
connection_pool: ConnectionPool<Core>,
Expand All @@ -238,6 +252,7 @@ impl SnapshotsApplierTask {
blob_store: Arc<dyn ObjectStore>,
) -> Self {
Self {
snapshot_l1_batch: None,
config,
health_updater: ReactiveHealthCheck::new("snapshot_recovery").1,
connection_pool,
Expand All @@ -246,6 +261,11 @@ impl SnapshotsApplierTask {
}
}

/// Specifies the L1 batch to recover from. This setting is ignored if recovery is complete or resumed.
pub fn set_snapshot_l1_batch(&mut self, number: L1BatchNumber) {
self.snapshot_l1_batch = Some(number);
}

/// Returns the health check for snapshot recovery.
pub fn health_check(&self) -> ReactiveHealthCheck {
self.health_updater.subscribe()
Expand All @@ -270,6 +290,7 @@ impl SnapshotsApplierTask {
self.main_node_client.as_ref(),
&self.blob_store,
&self.health_updater,
self.snapshot_l1_batch,
self.config.max_concurrency.get(),
)
.await;
Expand Down Expand Up @@ -324,6 +345,7 @@ impl SnapshotRecoveryStrategy {
async fn new(
storage: &mut Connection<'_, Core>,
main_node_client: &dyn SnapshotsApplierMainNodeClient,
snapshot_l1_batch: Option<L1BatchNumber>,
) -> Result<(Self, SnapshotRecoveryStatus), SnapshotsApplierError> {
let latency =
METRICS.initial_stage_duration[&InitialStage::FetchMetadataFromMainNode].start();
Expand All @@ -350,7 +372,8 @@ impl SnapshotRecoveryStrategy {
return Err(SnapshotsApplierError::Fatal(err));
}

let recovery_status = Self::create_fresh_recovery_status(main_node_client).await?;
let recovery_status =
Self::create_fresh_recovery_status(main_node_client, snapshot_l1_batch).await?;

let storage_logs_count = storage
.storage_logs_dal()
Expand All @@ -373,12 +396,20 @@ impl SnapshotRecoveryStrategy {

async fn create_fresh_recovery_status(
main_node_client: &dyn SnapshotsApplierMainNodeClient,
snapshot_l1_batch: Option<L1BatchNumber>,
) -> Result<SnapshotRecoveryStatus, SnapshotsApplierError> {
let snapshot_response = main_node_client.fetch_newest_snapshot().await?;
let l1_batch_number = match snapshot_l1_batch {
Some(num) => num,
None => main_node_client
.fetch_newest_snapshot_l1_batch_number()
.await?
.context("no snapshots on main node; snapshot recovery is impossible")?,
};
let snapshot_response = main_node_client.fetch_snapshot(l1_batch_number).await?;

let snapshot = snapshot_response
.context("no snapshots on main node; snapshot recovery is impossible")?;
let l1_batch_number = snapshot.l1_batch_number;
let snapshot = snapshot_response.with_context(|| {
format!("snapshot for L1 batch #{l1_batch_number} is not present on main node")
})?;
let l2_block_number = snapshot.l2_block_number;
tracing::info!(
"Found snapshot with data up to L1 batch #{l1_batch_number}, L2 block #{l2_block_number}, \
Expand Down Expand Up @@ -461,6 +492,7 @@ impl<'a> SnapshotsApplier<'a> {
main_node_client: &'a dyn SnapshotsApplierMainNodeClient,
blob_store: &'a dyn ObjectStore,
health_updater: &'a HealthUpdater,
snapshot_l1_batch: Option<L1BatchNumber>,
max_concurrency: usize,
) -> Result<(SnapshotRecoveryStrategy, SnapshotRecoveryStatus), SnapshotsApplierError> {
// While the recovery is in progress, the node is healthy (no error has occurred),
Expand All @@ -472,8 +504,12 @@ impl<'a> SnapshotsApplier<'a> {
.await?;
let mut storage_transaction = storage.start_transaction().await?;

let (strategy, applied_snapshot_status) =
SnapshotRecoveryStrategy::new(&mut storage_transaction, main_node_client).await?;
let (strategy, applied_snapshot_status) = SnapshotRecoveryStrategy::new(
&mut storage_transaction,
main_node_client,
snapshot_l1_batch,
)
.await?;
tracing::info!("Chosen snapshot recovery strategy: {strategy:?} with status: {applied_snapshot_status:?}");
let created_from_scratch = match strategy {
SnapshotRecoveryStrategy::Completed => return Ok((strategy, applied_snapshot_status)),
Expand Down
Loading
Loading