From c75f6e166bed09e79f546e99a43426cf6d3931a0 Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Mon, 5 Aug 2024 10:13:07 +0300 Subject: [PATCH] =?UTF-8?q?refactor(vm-runner):=20Allow=20switching=20betw?= =?UTF-8?q?een=20VMs=20for=20latest=20protocol=20version=20=E2=80=93=20fol?= =?UTF-8?q?low=20ups=20(#2567)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What ❔ Various follow-ups after https://github.com/matter-labs/zksync-era/pull/2508: - Adds VM playground config to `etc/env`. - Adds a health check for the VM playground. - Runs VM playground in server integration tests and checks it on teardown. ## Why ❔ Improves maintainability and test coverage. --- .github/workflows/ci-core-reusable.yml | 2 +- Cargo.lock | 2 + .../layers/vm_runner/playground.rs | 23 ++++-- core/node/vm_runner/Cargo.toml | 2 + core/node/vm_runner/src/impls/playground.rs | 51 ++++++++++++-- core/node/vm_runner/src/tests/playground.rs | 17 +++-- .../tests/ts-integration/src/context-owner.ts | 70 ++++++++++++++++++- core/tests/ts-integration/src/reporter.ts | 4 +- etc/env/base/vm_runner.toml | 6 ++ etc/env/file_based/general.yaml | 10 ++- 10 files changed, 161 insertions(+), 26 deletions(-) diff --git a/.github/workflows/ci-core-reusable.yml b/.github/workflows/ci-core-reusable.yml index c0c816aa8e2..6eae8501dcd 100644 --- a/.github/workflows/ci-core-reusable.yml +++ b/.github/workflows/ci-core-reusable.yml @@ -147,7 +147,7 @@ jobs: base_token: ["Eth", "Custom"] deployment_mode: ["Rollup", "Validium"] env: - SERVER_COMPONENTS: "api,tree,eth,state_keeper,housekeeper,commitment_generator,vm_runner_protective_reads,vm_runner_bwip,da_dispatcher${{ matrix.consensus && ',consensus' || '' }}${{ matrix.base_token == 'Custom' && ',base_token_ratio_persister' || '' }}" + SERVER_COMPONENTS: "api,tree,eth,state_keeper,housekeeper,commitment_generator,vm_runner_protective_reads,vm_runner_bwip,vm_playground,da_dispatcher${{ matrix.consensus && ',consensus' || '' 
}}${{ matrix.base_token == 'Custom' && ',base_token_ratio_persister' || '' }}" runs-on: [matterlabs-ci-runner] steps: diff --git a/Cargo.lock b/Cargo.lock index f6c07b39dc6..520ae286e64 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9745,6 +9745,7 @@ dependencies = [ "futures 0.3.28", "once_cell", "rand 0.8.5", + "serde", "tempfile", "test-casing", "tokio", @@ -9752,6 +9753,7 @@ dependencies = [ "vise", "zksync_contracts", "zksync_dal", + "zksync_health_check", "zksync_multivm", "zksync_node_genesis", "zksync_node_test_utils", diff --git a/core/node/node_framework/src/implementations/layers/vm_runner/playground.rs b/core/node/node_framework/src/implementations/layers/vm_runner/playground.rs index 549a3ed13e4..810d538ba97 100644 --- a/core/node/node_framework/src/implementations/layers/vm_runner/playground.rs +++ b/core/node/node_framework/src/implementations/layers/vm_runner/playground.rs @@ -1,7 +1,6 @@ use async_trait::async_trait; use zksync_config::configs::ExperimentalVmPlaygroundConfig; use zksync_node_framework_derive::{FromContext, IntoContext}; -use zksync_state_keeper::MainBatchExecutor; use zksync_types::L2ChainId; use zksync_vm_runner::{ impls::{VmPlayground, VmPlaygroundIo, VmPlaygroundLoaderTask}, @@ -9,7 +8,10 @@ use zksync_vm_runner::{ }; use crate::{ - implementations::resources::pools::{MasterPool, PoolResource}, + implementations::resources::{ + healthcheck::AppHealthCheckResource, + pools::{MasterPool, PoolResource}, + }, StopReceiver, Task, TaskId, WiringError, WiringLayer, }; @@ -32,6 +34,8 @@ impl VmPlaygroundLayer { #[context(crate = crate)] pub struct Input { pub master_pool: PoolResource, + #[context(default)] + pub app_health: AppHealthCheckResource, } #[derive(Debug, IntoContext)] @@ -55,7 +59,10 @@ impl WiringLayer for VmPlaygroundLayer { } async fn wire(self, input: Self::Input) -> Result { - let Input { master_pool } = input; + let Input { + master_pool, + app_health, + } = input; // - 1 connection for `StorageSyncTask` which can hold a 
long-term connection in case it needs to // catch up cache. @@ -64,12 +71,9 @@ impl WiringLayer for VmPlaygroundLayer { // - 1 connection for the only running VM instance. let connection_pool = master_pool.get_custom(3).await?; - let mut batch_executor = Box::new(MainBatchExecutor::new(false, false)); - batch_executor.set_fast_vm_mode(self.config.fast_vm_mode); - let (playground, tasks) = VmPlayground::new( connection_pool, - batch_executor, + self.config.fast_vm_mode, self.config.db_path, self.zksync_network_id, self.config.first_processed_batch, @@ -77,6 +81,11 @@ impl WiringLayer for VmPlaygroundLayer { ) .await?; + app_health + .0 + .insert_component(playground.health_check()) + .map_err(WiringError::internal)?; + Ok(Output { output_handler_factory_task: tasks.output_handler_factory_task, loader_task: tasks.loader_task, diff --git a/core/node/vm_runner/Cargo.toml b/core/node/vm_runner/Cargo.toml index 52a8e467643..cc6313fa572 100644 --- a/core/node/vm_runner/Cargo.toml +++ b/core/node/vm_runner/Cargo.toml @@ -22,7 +22,9 @@ zksync_utils.workspace = true zksync_prover_interface.workspace = true zksync_object_store.workspace = true zksync_vm_utils.workspace = true +zksync_health_check.workspace = true +serde.workspace = true tokio = { workspace = true, features = ["time"] } anyhow.workspace = true async-trait.workspace = true diff --git a/core/node/vm_runner/src/impls/playground.rs b/core/node/vm_runner/src/impls/playground.rs index 11792ebdefd..4fb140431df 100644 --- a/core/node/vm_runner/src/impls/playground.rs +++ b/core/node/vm_runner/src/impls/playground.rs @@ -6,26 +6,40 @@ use std::{ use anyhow::Context as _; use async_trait::async_trait; +use serde::Serialize; use tokio::{ fs, sync::{oneshot, watch}, }; use zksync_dal::{Connection, ConnectionPool, Core, CoreDal}; +use zksync_health_check::{Health, HealthStatus, HealthUpdater, ReactiveHealthCheck}; use zksync_state::RocksdbStorage; -use zksync_state_keeper::{BatchExecutor, StateKeeperOutputHandler, 
UpdatesManager}; -use zksync_types::{L1BatchNumber, L2ChainId}; +use zksync_state_keeper::{MainBatchExecutor, StateKeeperOutputHandler, UpdatesManager}; +use zksync_types::{vm::FastVmMode, L1BatchNumber, L2ChainId}; use crate::{ ConcurrentOutputHandlerFactory, ConcurrentOutputHandlerFactoryTask, OutputHandlerFactory, StorageSyncTask, VmRunner, VmRunnerIo, VmRunnerStorage, }; +#[derive(Debug, Serialize)] +struct VmPlaygroundHealth { + vm_mode: FastVmMode, + last_processed_batch: L1BatchNumber, +} + +impl From for Health { + fn from(health: VmPlaygroundHealth) -> Self { + Health::from(HealthStatus::Ready).with_details(health) + } +} + /// Virtual machine playground. Does not persist anything in Postgres; instead, keeps an L1 batch cursor as a plain text file in the RocksDB directory /// (so that the playground doesn't repeatedly process same batches after a restart). #[derive(Debug)] pub struct VmPlayground { pool: ConnectionPool, - batch_executor: Box, + batch_executor: MainBatchExecutor, rocksdb_path: String, chain_id: L2ChainId, io: VmPlaygroundIo, @@ -39,14 +53,14 @@ impl VmPlayground { /// Creates a new playground. 
pub async fn new( pool: ConnectionPool, - batch_executor: Box, + vm_mode: FastVmMode, rocksdb_path: String, chain_id: L2ChainId, first_processed_batch: L1BatchNumber, reset_state: bool, ) -> anyhow::Result<(Self, VmPlaygroundTasks)> { tracing::info!( - "Starting VM playground with executor {batch_executor:?}, first processed batch is #{first_processed_batch} \ + "Starting VM playground with mode {vm_mode:?}, first processed batch is #{first_processed_batch} \ (reset processing: {reset_state:?})" ); @@ -59,9 +73,14 @@ impl VmPlayground { latest_processed_batch.unwrap_or(first_processed_batch) }; + let mut batch_executor = MainBatchExecutor::new(false, false); + batch_executor.set_fast_vm_mode(vm_mode); + let io = VmPlaygroundIo { cursor_file_path, + vm_mode, latest_processed_batch: Arc::new(watch::channel(latest_processed_batch).0), + health_updater: Arc::new(ReactiveHealthCheck::new("vm_playground").1), }; let (output_handler_factory, output_handler_factory_task) = ConcurrentOutputHandlerFactory::new( @@ -92,6 +111,11 @@ impl VmPlayground { )) } + /// Returns a health check for this component. 
+ pub fn health_check(&self) -> ReactiveHealthCheck { + self.io.health_updater.subscribe() + } + #[cfg(test)] pub(crate) fn io(&self) -> &VmPlaygroundIo { &self.io @@ -123,6 +147,8 @@ impl VmPlayground { .with_context(|| format!("cannot create dir `{}`", self.rocksdb_path))?; if let Some(reset_to_batch) = self.reset_to_batch { + self.io.health_updater.update(HealthStatus::Affected.into()); + self.reset_rocksdb_cache(reset_to_batch).await?; self.io .write_cursor(reset_to_batch) @@ -131,6 +157,8 @@ impl VmPlayground { tracing::info!("Finished resetting playground state"); } + self.io.update_health(); + let (loader, loader_task) = VmRunnerStorage::new( self.pool.clone(), self.rocksdb_path, @@ -144,7 +172,7 @@ impl VmPlayground { Box::new(self.io), Arc::new(loader), Box::new(self.output_handler_factory), - self.batch_executor, + Box::new(self.batch_executor), ); vm_runner.run(stop_receiver).await } @@ -184,9 +212,11 @@ pub struct VmPlaygroundTasks { #[derive(Debug, Clone)] pub struct VmPlaygroundIo { cursor_file_path: PathBuf, + vm_mode: FastVmMode, // We don't read this value from the cursor file in the `VmRunnerIo` implementation because reads / writes // aren't guaranteed to be atomic. latest_processed_batch: Arc>, + health_updater: Arc, } impl VmPlaygroundIo { @@ -218,6 +248,14 @@ impl VmPlaygroundIo { }) } + fn update_health(&self) { + let health = VmPlaygroundHealth { + vm_mode: self.vm_mode, + last_processed_batch: *self.latest_processed_batch.borrow(), + }; + self.health_updater.update(health.into()); + } + #[cfg(test)] pub(crate) fn subscribe_to_completed_batches(&self) -> watch::Receiver { self.latest_processed_batch.subscribe() @@ -268,6 +306,7 @@ impl VmRunnerIo for VmPlaygroundIo { self.write_cursor(l1_batch_number).await?; // We should only update the in-memory value after the write to the cursor file succeeded. 
self.latest_processed_batch.send_replace(l1_batch_number); + self.update_health(); Ok(()) } } diff --git a/core/node/vm_runner/src/tests/playground.rs b/core/node/vm_runner/src/tests/playground.rs index 772917f00d6..c4111f73741 100644 --- a/core/node/vm_runner/src/tests/playground.rs +++ b/core/node/vm_runner/src/tests/playground.rs @@ -1,8 +1,8 @@ use test_casing::test_casing; use tokio::sync::watch; +use zksync_health_check::HealthStatus; use zksync_node_genesis::{insert_genesis_batch, GenesisParams}; use zksync_state::RocksdbStorage; -use zksync_state_keeper::MainBatchExecutor; use zksync_types::vm::FastVmMode; use super::*; @@ -33,11 +33,9 @@ async fn run_playground( .unwrap(); } - let mut batch_executor = MainBatchExecutor::new(false, false); - batch_executor.set_fast_vm_mode(FastVmMode::Shadow); let (playground, playground_tasks) = VmPlayground::new( pool.clone(), - Box::new(batch_executor), + FastVmMode::Shadow, rocksdb_dir.path().to_str().unwrap().to_owned(), genesis_params.config().l2_chain_id, L1BatchNumber(0), @@ -62,6 +60,7 @@ async fn run_playground( .unwrap(), L1BatchNumber(1) ); + let mut health_check = playground.health_check(); let mut completed_batches = playground_io.subscribe_to_completed_batches(); let task_handles = [ @@ -78,6 +77,16 @@ async fn run_playground( .wait_for(|&number| number == L1BatchNumber(1)) .await .unwrap(); + health_check + .wait_for(|health| { + if !matches!(health.status(), HealthStatus::Ready) { + return false; + } + let health_details = health.details().unwrap(); + assert_eq!(health_details["vm_mode"], "shadow"); + health_details["last_processed_batch"] == 1_u64 + }) + .await; // Check that playground I/O works correctly. 
assert_eq!( diff --git a/core/tests/ts-integration/src/context-owner.ts b/core/tests/ts-integration/src/context-owner.ts index 634e8c950a6..6cc2bed0a8d 100644 --- a/core/tests/ts-integration/src/context-owner.ts +++ b/core/tests/ts-integration/src/context-owner.ts @@ -2,7 +2,7 @@ import * as zksync from 'zksync-ethers'; import * as ethers from 'ethers'; import { BigNumberish } from 'ethers'; -import { TestContext, TestEnvironment, TestWallets } from './types'; +import { NodeMode, TestContext, TestEnvironment, TestWallets } from './types'; import { lookupPrerequisites } from './prerequisites'; import { Reporter } from './reporter'; import { scaledGasPrice } from './helpers'; @@ -541,6 +541,64 @@ export class TestContextOwner { this.reporter.finishAction(); } + /** + * Waits until the VM playground processes all L1 batches. If the playground runs the new VM in the shadow mode, this means + * that there is no divergence between old and new VM execution. Outputs a warning if the VM playground isn't run, or isn't run in the shadow mode. + */ + private async waitForVmPlayground() { + while (true) { + const lastProcessedBatch = await this.lastPlaygroundBatch(); + if (lastProcessedBatch === undefined) { + this.reporter.warn('The node does not run VM playground; run to check old / new VM divergence'); + break; + } + const lastNodeBatch = await this.l2Provider.getL1BatchNumber(); + + this.reporter.debug(`VM playground progress: L1 batch #${lastProcessedBatch} / ${lastNodeBatch}`); + if (lastProcessedBatch >= lastNodeBatch) { + break; + } + await zksync.utils.sleep(500); + } + } + + /** + * Returns the number of the last L1 batch processed by the VM playground, taking it from the node health endpoint. + * Returns `undefined` if the VM playground isn't run or doesn't have the shadow mode.
+ */ + private async lastPlaygroundBatch() { + interface VmPlaygroundHealth { + readonly status: string; + readonly details?: { + vm_mode?: string; + last_processed_batch?: number; + }; + } + + interface NodeHealth { + readonly components: { + vm_playground?: VmPlaygroundHealth; + }; + } + + const healthcheckPort = process.env.API_HEALTHCHECK_PORT ?? '3071'; + const nodeHealth = (await (await fetch(`http://127.0.0.1:${healthcheckPort}/health`)).json()) as NodeHealth; + const playgroundHealth = nodeHealth.components.vm_playground; + if (playgroundHealth === undefined) { + return undefined; + } + if (playgroundHealth.status !== 'ready') { + throw new Error(`Unexpected VM playground health status: ${playgroundHealth.status}`); + } + if (playgroundHealth.details?.vm_mode !== 'shadow') { + this.reporter.warn( + `VM playground mode is '${playgroundHealth.details?.vm_mode}'; should be set to 'shadow' to check VM divergence` + ); + return undefined; + } + return playgroundHealth.details?.last_processed_batch ?? 0; + } + /** * Performs context deinitialization. */ @@ -548,10 +606,16 @@ export class TestContextOwner { // Reset the reporter context. this.reporter = new Reporter(); try { + if (this.env.nodeMode == NodeMode.Main && this.env.network === 'localhost') { + // Check that the VM execution hasn't diverged using the VM playground. The component and thus the main node + // will crash on divergence, so we just need to make sure that the test doesn't exit before the VM playground + // processes all batches on the node. + this.reporter.startAction('Waiting for VM playground to catch up'); + await this.waitForVmPlayground(); + this.reporter.finishAction(); + } this.reporter.startAction(`Tearing down the context`); - await this.collectFunds(); - this.reporter.finishAction(); } catch (error: any) { // Report the issue to the console and mark the last action as failed. 
diff --git a/core/tests/ts-integration/src/reporter.ts b/core/tests/ts-integration/src/reporter.ts index 114ff2a7f5c..903ff3101ef 100644 --- a/core/tests/ts-integration/src/reporter.ts +++ b/core/tests/ts-integration/src/reporter.ts @@ -88,8 +88,8 @@ export class Reporter { /** * Prints an error message to the console. */ - error(message: string) { - console.log(this.indent(`${errorPrefix('Error:')}: ${fail(message)}`)); + error(message: string, ...args: any[]) { + console.log(this.indent(`${errorPrefix('Error:')}: ${fail(message)}`), ...args); } /** diff --git a/etc/env/base/vm_runner.toml b/etc/env/base/vm_runner.toml index dd8e9915280..4835706b74d 100644 --- a/etc/env/base/vm_runner.toml +++ b/etc/env/base/vm_runner.toml @@ -17,3 +17,9 @@ db_path = "./db/main/basic_witness_input_producer" window_size = 3 # All batches before this one (inclusive) are always considered to be processed. first_processed_batch = 0 + +[vm_runner.playground] +# Path to the directory that contains RocksDB with protective reads writer cache. +db_path = "./db/main/vm_playground" +# Mode in which to run the new fast VM +fast_vm_mode = "shadow" diff --git a/etc/env/file_based/general.yaml b/etc/env/file_based/general.yaml index 7914ece95c7..120e4687ca6 100644 --- a/etc/env/file_based/general.yaml +++ b/etc/env/file_based/general.yaml @@ -342,9 +342,13 @@ protective_reads_writer: first_processed_batch: 0 basic_witness_input_producer: - db_path: "./db/main/basic_witness_input_producer" - window_size: 3 - first_processed_batch: 0 + db_path: "./db/main/basic_witness_input_producer" + window_size: 3 + first_processed_batch: 0 + +vm_playground: + db_path: "./db/main/vm_playground" + fast_vm_mode: SHADOW snapshot_recovery: enabled: false