feat: Unify and port node storage initialization (#2363)
## What ❔

- Introduces the `zksync_node_storage_init` crate: a unified approach to
  node storage initialization. Key points:
  - The storage initializer is a structure that makes sure that the storage
    is, in fact, initialized.
  - The initializer understands what "initialized storage" means, but defers
    any real initialization actions to an implementation of the `NodeRole`
    trait (see the sketch after this list). Currently we have two `NodeRole`
    implementations: `MainNodeRole` and `ExternalNodeRole`.
  - `MainNodeRole` can only perform genesis. It does not support snapshot
    recovery or automatic rollbacks.
  - `ExternalNodeRole` can perform either genesis or snapshot recovery; it
    can also detect a reorg and perform a rollback if required.
  - Framework integration consists of three parts: the `NodeRole` resource,
    the `NodeStorageInitializer` task, and the corresponding precondition.
  - The old genesis code for the main node is fully replaced with the
    framework.
  - The init code is integrated into the EN, but the old code is left in
    place for the time being.
- Makes snapshot recovery aware of stop signals (a stop-signal sketch follows
  the `init.rs` diff below). This isn't integrated into the old code, but I
  assume that since snapshot recovery happens before we set up the SIGINT
  handler, it implicitly works out of the box.
- Integrates `reorg_detector` into the EN via the framework.
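
For illustration, here is a minimal sketch of the division of responsibilities described above: the initializer only orchestrates, while the role performs the actual work. The trait shape, method names, and the `StorageInitializer` struct are assumptions made for this sketch and are not the crate's actual API.

```rust
use anyhow::Context as _;
use async_trait::async_trait;

/// Role-specific behavior; the initializer itself only orchestrates.
#[async_trait]
trait NodeRole {
    /// Reports whether the storage already looks initialized.
    async fn is_initialized(&self) -> anyhow::Result<bool>;
    /// Brings the storage into an initialized state (genesis, snapshot recovery, ...).
    async fn initialize(&self) -> anyhow::Result<()>;
}

/// Main-node flavor: genesis only, no snapshot recovery or automatic rollbacks.
struct MainNodeRole;

#[async_trait]
impl NodeRole for MainNodeRole {
    async fn is_initialized(&self) -> anyhow::Result<bool> {
        // The real implementation would query Postgres for the genesis batch.
        Ok(false)
    }

    async fn initialize(&self) -> anyhow::Result<()> {
        println!("performing genesis");
        Ok(())
    }
}

/// The initializer only ensures that *some* role brought the storage into an
/// initialized state; it contains no node-specific logic itself.
struct StorageInitializer<R: NodeRole> {
    role: R,
}

impl<R: NodeRole> StorageInitializer<R> {
    async fn ensure_initialized(&self) -> anyhow::Result<()> {
        if !self.role.is_initialized().await.context("is_initialized")? {
            self.role.initialize().await.context("initialize")?;
        }
        Ok(())
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let initializer = StorageInitializer { role: MainNodeRole };
    initializer.ensure_initialized().await
}
```

An `ExternalNodeRole` would implement the same trait, additionally handling snapshot recovery and reorg-driven rollbacks.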

## Why ❔

- Unify the codebases and the approach to storage initialization.
- Define the interfaces for future extensions.
- Part of porting the codebase to the framework.

## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [ ] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [ ] Tests for the changes have been added / updated.
- [ ] Documentation comments have been added / updated.
- [ ] Code has been formatted via `zk fmt` and `zk lint`.
popzxc authored Jul 9, 2024
1 parent 27fabaf commit 8ea9791
Showing 30 changed files with 1,210 additions and 136 deletions.
23 changes: 23 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
@@ -30,6 +30,7 @@ members = [
"core/node/consistency_checker",
"core/node/metadata_calculator",
"core/node/node_sync",
"core/node/node_storage_init",
"core/node/consensus",
"core/node/contract_verification_server",
"core/node/api_server",
@@ -277,6 +278,7 @@ zksync_reorg_detector = { path = "core/node/reorg_detector" }
zksync_consistency_checker = { path = "core/node/consistency_checker" }
zksync_metadata_calculator = { path = "core/node/metadata_calculator" }
zksync_node_sync = { path = "core/node/node_sync" }
zksync_node_storage_init = { path = "core/node/node_storage_init" }
zksync_node_consensus = { path = "core/node/consensus" }
zksync_contract_verification_server = { path = "core/node/contract_verification_server" }
zksync_node_api_server = { path = "core/node/api_server" }
8 changes: 7 additions & 1 deletion core/bin/external_node/src/init.rs
@@ -3,6 +3,7 @@
use std::time::Instant;

use anyhow::Context as _;
use tokio::sync::watch;
use zksync_config::ObjectStoreConfig;
use zksync_dal::{ConnectionPool, Core, CoreDal};
use zksync_health_check::AppHealthCheck;
@@ -30,6 +31,7 @@ enum InitDecision {
}

pub(crate) async fn ensure_storage_initialized(
stop_receiver: watch::Receiver<bool>,
pool: ConnectionPool<Core>,
main_node_client: Box<DynClient<L2>>,
app_health: &AppHealthCheck,
@@ -120,7 +122,7 @@ pub(crate) async fn ensure_storage_initialized(

let recovery_started_at = Instant::now();
let stats = snapshots_applier_task
.run()
.run(stop_receiver)
.await
.context("snapshot recovery failed")?;
if stats.done_work {
@@ -129,6 +131,10 @@
.set(latency);
tracing::info!("Recovered Postgres from snapshot in {latency:?}");
}
assert!(
!stats.canceled,
"Snapshot recovery task cannot be canceled in the current implementation"
);
}
}
Ok(())
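
The `stop_receiver` threading above follows the usual tokio `watch` pattern. Below is a minimal, self-contained sketch of how a long-running job can honor such a signal; the chunked loop and all names are illustrative, not the actual snapshot applier internals.

```rust
use std::time::Duration;

use tokio::sync::watch;

/// Runs "recovery" in small chunks, bailing out promptly when a stop signal arrives.
async fn run_recovery(mut stop_receiver: watch::Receiver<bool>) -> anyhow::Result<bool> {
    for chunk in 0..100 {
        let stop_requested = *stop_receiver.borrow();
        if stop_requested {
            println!("stop signal received; canceling recovery at chunk {chunk}");
            return Ok(false); // canceled
        }
        // One unit of work; `select!` lets the stop signal interrupt it mid-chunk.
        tokio::select! {
            _ = tokio::time::sleep(Duration::from_millis(10)) => { /* chunk applied */ }
            _ = stop_receiver.changed() => {
                println!("stop signal received mid-chunk; canceling recovery");
                return Ok(false);
            }
        }
    }
    Ok(true) // all chunks applied
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let (stop_sender, stop_receiver) = watch::channel(false);
    // In the node, the sender side is driven by the SIGINT handler.
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(250)).await;
        stop_sender.send_replace(true);
    });
    let done = run_recovery(stop_receiver).await?;
    println!("recovery finished, done = {done}");
    Ok(())
}
```
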
3 changes: 3 additions & 0 deletions core/bin/external_node/src/main.rs
@@ -976,7 +976,10 @@ async fn run_node(
.snapshots_recovery_drop_storage_key_preimages,
object_store_config: config.optional.snapshots_recovery_object_store.clone(),
});
// Note: while the stop receiver is passed here, it won't be respected, since we wait
// for this task to complete. This will be fixed after the migration to the node framework.
ensure_storage_initialized(
stop_receiver.clone(),
connection_pool.clone(),
main_node_client.clone(),
&app_health,
65 changes: 63 additions & 2 deletions core/bin/external_node/src/node_builder.rs
@@ -23,11 +23,16 @@ use zksync_node_framework::{
main_node_client::MainNodeClientLayer,
main_node_fee_params_fetcher::MainNodeFeeParamsFetcherLayer,
metadata_calculator::MetadataCalculatorLayer,
node_storage_init::{
external_node_strategy::{ExternalNodeInitStrategyLayer, SnapshotRecoveryConfig},
NodeStorageInitializerLayer,
},
pools_layer::PoolsLayerBuilder,
postgres_metrics::PostgresMetricsLayer,
prometheus_exporter::PrometheusExporterLayer,
pruning::PruningLayer,
query_eth_client::QueryEthClientLayer,
reorg_detector::ReorgDetectorLayer,
sigint::SigintHandlerLayer,
state_keeper::{
external_io::ExternalIOLayer, main_batch_executor::MainBatchExecutorLayer,
@@ -421,6 +426,49 @@ impl ExternalNodeBuilder {
Ok(self)
}

fn add_reorg_detector_layer(mut self) -> anyhow::Result<Self> {
self.node.add_layer(ReorgDetectorLayer);
Ok(self)
}

/// This layer makes sure that the database is initialized correctly, i.e.:
/// - genesis or snapshot recovery is performed if required;
/// - a storage rollback is performed if required (e.g. if a reorg is detected).
///
/// Depending on the `kind` provided, either a task or a precondition will be added.
///
/// *Important*: the task should be added by at most one component, because it
/// assumes unique control over the database. Multiple components adding this
/// layer in a distributed mode may result in database corruption.
///
/// This task works in tandem with the precondition, which must be present in every
/// component: the precondition prevents the node from starting until the database
/// is initialized.
fn add_storage_initialization_layer(mut self, kind: LayerKind) -> anyhow::Result<Self> {
let config = &self.config;
let snapshot_recovery_config =
config
.optional
.snapshots_recovery_enabled
.then_some(SnapshotRecoveryConfig {
snapshot_l1_batch_override: config.experimental.snapshots_recovery_l1_batch,
drop_storage_key_preimages: config
.experimental
.snapshots_recovery_drop_storage_key_preimages,
object_store_config: config.optional.snapshots_recovery_object_store.clone(),
});
self.node.add_layer(ExternalNodeInitStrategyLayer {
l2_chain_id: self.config.required.l2_chain_id,
snapshot_recovery_config,
});
let mut layer = NodeStorageInitializerLayer::new();
if matches!(kind, LayerKind::Precondition) {
layer = layer.as_precondition();
}
self.node.add_layer(layer);
Ok(self)
}

pub fn build(mut self, mut components: Vec<Component>) -> anyhow::Result<ZkStackService> {
// Add "base" layers
self = self
@@ -429,12 +477,14 @@ impl ExternalNodeBuilder {
.add_prometheus_exporter_layer()?
.add_pools_layer()?
.add_main_node_client_layer()?
.add_query_eth_client_layer()?;
.add_query_eth_client_layer()?
.add_reorg_detector_layer()?;

// Add preconditions for all the components.
self = self
.add_l1_batch_commitment_mode_validation_layer()?
.add_validate_chain_ids_layer()?;
.add_validate_chain_ids_layer()?
.add_storage_initialization_layer(LayerKind::Precondition)?;

// Sort the components so that those that may depend on each other are added in the correct order.
components.sort_unstable_by_key(|component| match component {
@@ -499,10 +549,21 @@ impl ExternalNodeBuilder {
.add_consistency_checker_layer()?
.add_commitment_generator_layer()?
.add_batch_status_updater_layer()?;

// We assign the storage initialization to the core, as it's considered to be
// the "main" component.
self = self.add_storage_initialization_layer(LayerKind::Task)?;
}
}
}

Ok(self.node.build()?)
}
}

/// Marker for layers that can add either a task or a precondition.
#[derive(Debug)]
enum LayerKind {
Task,
Precondition,
}
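
As a rough illustration of the task/precondition contract described in `add_storage_initialization_layer` above: exactly one component owns the task that mutates the database, while every component can safely gate its startup on the precondition. The method names (`run`, `wait_for_initialized_storage`) and the `watch` channel standing in for real database checks are assumptions of this sketch; the framework's actual `Task`/`Precondition` wiring is not shown.

```rust
use std::{sync::Arc, time::Duration};

use tokio::sync::watch;

/// Shared handle; the task and the precondition wrap the same initializer.
#[derive(Clone)]
struct NodeStorageInitializer {
    // Stand-in for "observe the database state"; the real thing would query Postgres.
    initialized: Arc<watch::Sender<bool>>,
}

impl NodeStorageInitializer {
    /// Task flavor: actually performs genesis / snapshot recovery / rollback.
    async fn run(&self) -> anyhow::Result<()> {
        tokio::time::sleep(Duration::from_millis(200)).await; // pretend to initialize
        self.initialized.send_replace(true);
        Ok(())
    }

    /// Precondition flavor: waits until the storage is initialized, never mutates it.
    async fn wait_for_initialized_storage(&self) -> anyhow::Result<()> {
        let mut receiver = self.initialized.subscribe();
        loop {
            let is_initialized = *receiver.borrow_and_update();
            if is_initialized {
                return Ok(());
            }
            receiver.changed().await?;
        }
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let (initialized, _rx) = watch::channel(false);
    let initializer = NodeStorageInitializer {
        initialized: Arc::new(initialized),
    };

    // Exactly one component runs the task that owns the database...
    let task = {
        let init = initializer.clone();
        tokio::spawn(async move { init.run().await })
    };
    // ...while every component waits on the precondition before starting.
    initializer.wait_for_initialized_storage().await?;
    println!("storage initialized; components may start");
    task.await??;
    Ok(())
}
```
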
63 changes: 8 additions & 55 deletions core/bin/zksync_server/src/main.rs
@@ -21,12 +21,10 @@ use zksync_config::{
SnapshotsCreatorConfig,
};
use zksync_core_leftovers::{
genesis_init, is_genesis_needed,
temp_config_store::{decode_yaml_repr, TempConfigStore},
Component, Components,
};
use zksync_env_config::FromEnv;
use zksync_eth_client::clients::Client;

use crate::node_builder::MainNodeBuilder;

@@ -42,9 +40,6 @@ struct Cli {
/// Generate genesis block for the first contract deployment using temporary DB.
#[arg(long)]
genesis: bool,
/// Rebuild tree.
#[arg(long)]
rebuild_tree: bool,
/// Comma-separated list of components to launch.
#[arg(
long,
@@ -180,65 +175,23 @@ fn main() -> anyhow::Result<()> {
}
};

run_genesis_if_needed(opt.genesis, &genesis, &contracts_config, &secrets)?;
if opt.genesis {
// If genesis is requested, we don't need to run the node.
return Ok(());
}

let components = if opt.rebuild_tree {
vec![Component::Tree]
} else {
opt.components.0
};

let node = MainNodeBuilder::new(
configs,
wallets,
genesis,
contracts_config,
secrets,
consensus,
)
.build(components)?;
node.run()?;
Ok(())
}
);

fn run_genesis_if_needed(
force_genesis: bool,
genesis: &GenesisConfig,
contracts_config: &ContractsConfig,
secrets: &Secrets,
) -> anyhow::Result<()> {
let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?;
tokio_runtime.block_on(async move {
let database_secrets = secrets.database.clone().context("DatabaseSecrets")?;
if force_genesis || is_genesis_needed(&database_secrets).await {
genesis_init(genesis.clone(), &database_secrets)
.await
.context("genesis_init")?;
if opt.genesis {
// If genesis is requested, we don't need to run the node.
node.only_genesis()?.run()?;
return Ok(());
}

if let Some(ecosystem_contracts) = &contracts_config.ecosystem_contracts {
let l1_secrets = secrets.l1.as_ref().context("l1_screts")?;
let query_client = Client::http(l1_secrets.l1_rpc_url.clone())
.context("Ethereum client")?
.for_network(genesis.l1_chain_id.into())
.build();
zksync_node_genesis::save_set_chain_id_tx(
&query_client,
contracts_config.diamond_proxy_addr,
ecosystem_contracts.state_transition_proxy_addr,
&database_secrets,
)
.await
.context("Failed to save SetChainId upgrade transaction")?;
}
}
Ok(())
})
node.build(opt.components.0)?.run()?;
Ok(())
}

fn load_env_config() -> anyhow::Result<TempConfigStore> {
