refactor(vm-runner): Allow switching between VMs for latest protocol version – follow ups (#2567)

## What ❔

Various follow-ups after #2508:

- Adds VM playground config to `etc/env`.
- Adds a health check for the VM playground (see the sketch after this list).
- Runs VM playground in server integration tests and checks it on teardown.
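
For context, the health check is wired using the `zksync_health_check` primitives that appear in the diff below. The following is a minimal sketch of that pattern, not the PR's exact code: the `(check, updater)` return shape of `ReactiveHealthCheck::new` and the `Health::with_details` call are inferred from the diff, and `PlaygroundHealthDetails` is a simplified stand-in for the real `VmPlaygroundHealth` struct.

```rust
use serde::Serialize;
use zksync_health_check::{Health, HealthStatus, HealthUpdater, ReactiveHealthCheck};

/// Simplified stand-in for the `VmPlaygroundHealth` struct added in `playground.rs`.
#[derive(Debug, Serialize)]
struct PlaygroundHealthDetails {
    vm_mode: &'static str,
    last_processed_batch: u64,
}

/// Creates the component's health check and publishes an initial status.
/// Assumes `ReactiveHealthCheck::new` returns a `(check, updater)` pair, as the
/// `.1` access in the diff suggests.
fn playground_health_check_sketch() -> (ReactiveHealthCheck, HealthUpdater) {
    let (health_check, health_updater) = ReactiveHealthCheck::new("vm_playground");
    // Mark the component ready and attach serializable details; these end up under
    // `components.vm_playground.details` on the node's `/health` endpoint.
    let details = PlaygroundHealthDetails {
        vm_mode: "shadow",
        last_processed_batch: 0,
    };
    health_updater.update(Health::from(HealthStatus::Ready).with_details(details));
    (health_check, health_updater)
}
```

In the PR itself, the wiring layer registers the returned check with `AppHealthCheck`, and the playground refreshes the details after each processed batch and while resetting its RocksDB state.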

## Why ❔

Improves maintainability and test coverage.
slowli authored Aug 5, 2024
1 parent 77b6d81 commit c75f6e1
Showing 10 changed files with 161 additions and 26 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci-core-reusable.yml
@@ -147,7 +147,7 @@ jobs:
base_token: ["Eth", "Custom"]
deployment_mode: ["Rollup", "Validium"]
env:
SERVER_COMPONENTS: "api,tree,eth,state_keeper,housekeeper,commitment_generator,vm_runner_protective_reads,vm_runner_bwip,da_dispatcher${{ matrix.consensus && ',consensus' || '' }}${{ matrix.base_token == 'Custom' && ',base_token_ratio_persister' || '' }}"
SERVER_COMPONENTS: "api,tree,eth,state_keeper,housekeeper,commitment_generator,vm_runner_protective_reads,vm_runner_bwip,vm_playground,da_dispatcher${{ matrix.consensus && ',consensus' || '' }}${{ matrix.base_token == 'Custom' && ',base_token_ratio_persister' || '' }}"

runs-on: [matterlabs-ci-runner]
steps:
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

@@ -1,15 +1,17 @@
use async_trait::async_trait;
use zksync_config::configs::ExperimentalVmPlaygroundConfig;
use zksync_node_framework_derive::{FromContext, IntoContext};
use zksync_state_keeper::MainBatchExecutor;
use zksync_types::L2ChainId;
use zksync_vm_runner::{
impls::{VmPlayground, VmPlaygroundIo, VmPlaygroundLoaderTask},
ConcurrentOutputHandlerFactoryTask,
};

use crate::{
implementations::resources::pools::{MasterPool, PoolResource},
implementations::resources::{
healthcheck::AppHealthCheckResource,
pools::{MasterPool, PoolResource},
},
StopReceiver, Task, TaskId, WiringError, WiringLayer,
};

@@ -32,6 +34,8 @@ impl VmPlaygroundLayer {
#[context(crate = crate)]
pub struct Input {
pub master_pool: PoolResource<MasterPool>,
#[context(default)]
pub app_health: AppHealthCheckResource,
}

#[derive(Debug, IntoContext)]
@@ -55,7 +59,10 @@ impl WiringLayer for VmPlaygroundLayer {
}

async fn wire(self, input: Self::Input) -> Result<Self::Output, WiringError> {
let Input { master_pool } = input;
let Input {
master_pool,
app_health,
} = input;

// - 1 connection for `StorageSyncTask` which can hold a long-term connection in case it needs to
// catch up cache.
@@ -64,19 +71,21 @@ impl WiringLayer for VmPlaygroundLayer {
// - 1 connection for the only running VM instance.
let connection_pool = master_pool.get_custom(3).await?;

let mut batch_executor = Box::new(MainBatchExecutor::new(false, false));
batch_executor.set_fast_vm_mode(self.config.fast_vm_mode);

let (playground, tasks) = VmPlayground::new(
connection_pool,
batch_executor,
self.config.fast_vm_mode,
self.config.db_path,
self.zksync_network_id,
self.config.first_processed_batch,
self.config.reset,
)
.await?;

app_health
.0
.insert_component(playground.health_check())
.map_err(WiringError::internal)?;

Ok(Output {
output_handler_factory_task: tasks.output_handler_factory_task,
loader_task: tasks.loader_task,
2 changes: 2 additions & 0 deletions core/node/vm_runner/Cargo.toml
@@ -22,7 +22,9 @@ zksync_utils.workspace = true
zksync_prover_interface.workspace = true
zksync_object_store.workspace = true
zksync_vm_utils.workspace = true
zksync_health_check.workspace = true

serde.workspace = true
tokio = { workspace = true, features = ["time"] }
anyhow.workspace = true
async-trait.workspace = true
51 changes: 45 additions & 6 deletions core/node/vm_runner/src/impls/playground.rs
@@ -6,26 +6,40 @@ use std::{

use anyhow::Context as _;
use async_trait::async_trait;
use serde::Serialize;
use tokio::{
fs,
sync::{oneshot, watch},
};
use zksync_dal::{Connection, ConnectionPool, Core, CoreDal};
use zksync_health_check::{Health, HealthStatus, HealthUpdater, ReactiveHealthCheck};
use zksync_state::RocksdbStorage;
use zksync_state_keeper::{BatchExecutor, StateKeeperOutputHandler, UpdatesManager};
use zksync_types::{L1BatchNumber, L2ChainId};
use zksync_state_keeper::{MainBatchExecutor, StateKeeperOutputHandler, UpdatesManager};
use zksync_types::{vm::FastVmMode, L1BatchNumber, L2ChainId};

use crate::{
ConcurrentOutputHandlerFactory, ConcurrentOutputHandlerFactoryTask, OutputHandlerFactory,
StorageSyncTask, VmRunner, VmRunnerIo, VmRunnerStorage,
};

#[derive(Debug, Serialize)]
struct VmPlaygroundHealth {
vm_mode: FastVmMode,
last_processed_batch: L1BatchNumber,
}

impl From<VmPlaygroundHealth> for Health {
fn from(health: VmPlaygroundHealth) -> Self {
Health::from(HealthStatus::Ready).with_details(health)
}
}

/// Virtual machine playground. Does not persist anything in Postgres; instead, keeps an L1 batch cursor as a plain text file in the RocksDB directory
/// (so that the playground doesn't repeatedly process the same batches after a restart).
#[derive(Debug)]
pub struct VmPlayground {
pool: ConnectionPool<Core>,
batch_executor: Box<dyn BatchExecutor>,
batch_executor: MainBatchExecutor,
rocksdb_path: String,
chain_id: L2ChainId,
io: VmPlaygroundIo,
@@ -39,14 +53,14 @@ impl VmPlayground {
/// Creates a new playground.
pub async fn new(
pool: ConnectionPool<Core>,
batch_executor: Box<dyn BatchExecutor>,
vm_mode: FastVmMode,
rocksdb_path: String,
chain_id: L2ChainId,
first_processed_batch: L1BatchNumber,
reset_state: bool,
) -> anyhow::Result<(Self, VmPlaygroundTasks)> {
tracing::info!(
"Starting VM playground with executor {batch_executor:?}, first processed batch is #{first_processed_batch} \
"Starting VM playground with mode {vm_mode:?}, first processed batch is #{first_processed_batch} \
(reset processing: {reset_state:?})"
);

@@ -59,9 +73,14 @@ impl VmPlayground {
latest_processed_batch.unwrap_or(first_processed_batch)
};

let mut batch_executor = MainBatchExecutor::new(false, false);
batch_executor.set_fast_vm_mode(vm_mode);

let io = VmPlaygroundIo {
cursor_file_path,
vm_mode,
latest_processed_batch: Arc::new(watch::channel(latest_processed_batch).0),
health_updater: Arc::new(ReactiveHealthCheck::new("vm_playground").1),
};
let (output_handler_factory, output_handler_factory_task) =
ConcurrentOutputHandlerFactory::new(
@@ -92,6 +111,11 @@ impl VmPlayground {
))
}

/// Returns a health check for this component.
pub fn health_check(&self) -> ReactiveHealthCheck {
self.io.health_updater.subscribe()
}

#[cfg(test)]
pub(crate) fn io(&self) -> &VmPlaygroundIo {
&self.io
@@ -123,6 +147,8 @@ impl VmPlayground {
.with_context(|| format!("cannot create dir `{}`", self.rocksdb_path))?;

if let Some(reset_to_batch) = self.reset_to_batch {
self.io.health_updater.update(HealthStatus::Affected.into());

self.reset_rocksdb_cache(reset_to_batch).await?;
self.io
.write_cursor(reset_to_batch)
Expand All @@ -131,6 +157,8 @@ impl VmPlayground {
tracing::info!("Finished resetting playground state");
}

self.io.update_health();

let (loader, loader_task) = VmRunnerStorage::new(
self.pool.clone(),
self.rocksdb_path,
@@ -144,7 +172,7 @@ impl VmPlayground {
Box::new(self.io),
Arc::new(loader),
Box::new(self.output_handler_factory),
self.batch_executor,
Box::new(self.batch_executor),
);
vm_runner.run(stop_receiver).await
}
@@ -184,9 +212,11 @@ pub struct VmPlaygroundTasks {
#[derive(Debug, Clone)]
pub struct VmPlaygroundIo {
cursor_file_path: PathBuf,
vm_mode: FastVmMode,
// We don't read this value from the cursor file in the `VmRunnerIo` implementation because reads / writes
// aren't guaranteed to be atomic.
latest_processed_batch: Arc<watch::Sender<L1BatchNumber>>,
health_updater: Arc<HealthUpdater>,
}

impl VmPlaygroundIo {
@@ -218,6 +248,14 @@ impl VmPlaygroundIo {
})
}

fn update_health(&self) {
let health = VmPlaygroundHealth {
vm_mode: self.vm_mode,
last_processed_batch: *self.latest_processed_batch.borrow(),
};
self.health_updater.update(health.into());
}

#[cfg(test)]
pub(crate) fn subscribe_to_completed_batches(&self) -> watch::Receiver<L1BatchNumber> {
self.latest_processed_batch.subscribe()
@@ -268,6 +306,7 @@ impl VmRunnerIo for VmPlaygroundIo {
self.write_cursor(l1_batch_number).await?;
// We should only update the in-memory value after the write to the cursor file succeeded.
self.latest_processed_batch.send_replace(l1_batch_number);
self.update_health();
Ok(())
}
}
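
The `Serialize` derive on `VmPlaygroundHealth` is what produces the `details` object exposed under `components.vm_playground` on the node's health endpoint, which both the Rust test and the integration test below poll. The following standalone sketch illustrates the expected payload shape; it uses `serde_json` purely for illustration and plain `&str`/`u64` stand-ins for `FastVmMode` and `L1BatchNumber` (the lowercase `"shadow"` spelling and the numeric batch value are what the tests assert on).

```rust
use serde::Serialize;
use serde_json::json;

// Simplified stand-in for `VmPlaygroundHealth`: the real struct holds `FastVmMode`
// and `L1BatchNumber`, which, per the tests below, serialize to a lowercase string
// and a plain number.
#[derive(Serialize)]
struct VmPlaygroundHealthSketch {
    vm_mode: &'static str,
    last_processed_batch: u64,
}

fn main() {
    let health = VmPlaygroundHealthSketch {
        vm_mode: "shadow",
        last_processed_batch: 1,
    };
    // This is the shape that `waitForVmPlayground` in `context-owner.ts` reads from
    // `components.vm_playground.details` and that the Rust test checks via `health.details()`.
    assert_eq!(
        serde_json::to_value(&health).unwrap(),
        json!({ "vm_mode": "shadow", "last_processed_batch": 1 })
    );
}
```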
17 changes: 13 additions & 4 deletions core/node/vm_runner/src/tests/playground.rs
@@ -1,8 +1,8 @@
use test_casing::test_casing;
use tokio::sync::watch;
use zksync_health_check::HealthStatus;
use zksync_node_genesis::{insert_genesis_batch, GenesisParams};
use zksync_state::RocksdbStorage;
use zksync_state_keeper::MainBatchExecutor;
use zksync_types::vm::FastVmMode;

use super::*;
@@ -33,11 +33,9 @@ async fn run_playground(
.unwrap();
}

let mut batch_executor = MainBatchExecutor::new(false, false);
batch_executor.set_fast_vm_mode(FastVmMode::Shadow);
let (playground, playground_tasks) = VmPlayground::new(
pool.clone(),
Box::new(batch_executor),
FastVmMode::Shadow,
rocksdb_dir.path().to_str().unwrap().to_owned(),
genesis_params.config().l2_chain_id,
L1BatchNumber(0),
@@ -62,6 +60,7 @@ async fn run_playground(
.unwrap(),
L1BatchNumber(1)
);
let mut health_check = playground.health_check();

let mut completed_batches = playground_io.subscribe_to_completed_batches();
let task_handles = [
@@ -78,6 +77,16 @@ async fn run_playground(
.wait_for(|&number| number == L1BatchNumber(1))
.await
.unwrap();
health_check
.wait_for(|health| {
if !matches!(health.status(), HealthStatus::Ready) {
return false;
}
let health_details = health.details().unwrap();
assert_eq!(health_details["vm_mode"], "shadow");
health_details["last_processed_batch"] == 1_u64
})
.await;

// Check that playground I/O works correctly.
assert_eq!(
70 changes: 67 additions & 3 deletions core/tests/ts-integration/src/context-owner.ts
@@ -2,7 +2,7 @@ import * as zksync from 'zksync-ethers';
import * as ethers from 'ethers';
import { BigNumberish } from 'ethers';

import { TestContext, TestEnvironment, TestWallets } from './types';
import { NodeMode, TestContext, TestEnvironment, TestWallets } from './types';
import { lookupPrerequisites } from './prerequisites';
import { Reporter } from './reporter';
import { scaledGasPrice } from './helpers';
@@ -541,17 +541,81 @@ export class TestContextOwner {
this.reporter.finishAction();
}

/**
* Waits until the VM playground processes all L1 batches. If the playground runs the new VM in shadow mode, this means
* that there is no divergence between the old and new VM execution. Outputs a warning if the VM playground isn't running or doesn't run in shadow mode.
*/
private async waitForVmPlayground() {
while (true) {
const lastProcessedBatch = await this.lastPlaygroundBatch();
if (lastProcessedBatch === undefined) {
this.reporter.warn('The node does not run VM playground; run to check old / new VM divergence');
break;
}
const lastNodeBatch = await this.l2Provider.getL1BatchNumber();

this.reporter.debug(`VM playground progress: L1 batch #${lastProcessedBatch} / ${lastNodeBatch}`);
if (lastProcessedBatch >= lastNodeBatch) {
break;
}
await zksync.utils.sleep(500);
}
}

/**
* Returns the number of the last L1 batch processed by the VM playground, taking it from the node's health endpoint.
* Returns `undefined` if the VM playground isn't running or doesn't run in shadow mode.
*/
private async lastPlaygroundBatch() {
interface VmPlaygroundHealth {
readonly status: string;
readonly details?: {
vm_mode?: string;
last_processed_batch?: number;
};
}

interface NodeHealth {
readonly components: {
vm_playground?: VmPlaygroundHealth;
};
}

const healthcheckPort = process.env.API_HEALTHCHECK_PORT ?? '3071';
const nodeHealth = (await (await fetch(`http://127.0.0.1:${healthcheckPort}/health`)).json()) as NodeHealth;
const playgroundHealth = nodeHealth.components.vm_playground;
if (playgroundHealth === undefined) {
return undefined;
}
if (playgroundHealth.status !== 'ready') {
throw new Error(`Unexpected VM playground health status: ${playgroundHealth.status}`);
}
if (playgroundHealth.details?.vm_mode !== 'shadow') {
this.reporter.warn(
`VM playground mode is '${playgroundHealth.details?.vm_mode}'; should be set to 'shadow' to check VM divergence`
);
return undefined;
}
return playgroundHealth.details?.last_processed_batch ?? 0;
}

/**
* Performs context deinitialization.
*/
async teardownContext() {
// Reset the reporter context.
this.reporter = new Reporter();
try {
if (this.env.nodeMode == NodeMode.Main && this.env.network === 'localhost') {
// Check that the VM execution hasn't diverged using the VM playground. The component and thus the main node
// will crash on divergence, so we just need to make sure that the test doesn't exit before the VM playground
// processes all batches on the node.
this.reporter.startAction('Waiting for VM playground to catch up');
await this.waitForVmPlayground();
this.reporter.finishAction();
}
this.reporter.startAction(`Tearing down the context`);

await this.collectFunds();

this.reporter.finishAction();
} catch (error: any) {
// Report the issue to the console and mark the last action as failed.