From 7c8e24ce7d6e6d47359d5ae4ab1db4ddbd3e9441 Mon Sep 17 00:00:00 2001
From: Alex Ostrovski
Date: Fri, 14 Jun 2024 15:39:04 +0300
Subject: [PATCH] perf(db): Improve storage switching for state keeper cache
 (#2234)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## What ❔

Improves the switching logic between Postgres and RocksDB for the state keeper (SK) cache. With these changes, RocksDB is guaranteed to be used if it is up to date at the point it is opened.

## Why ❔

Previously, Postgres could be used after node start (primarily if there was a pending L1 batch) even if RocksDB was up to date. This was caused by potential delays when opening RocksDB. The behavior was observed in the wild, e.g. for a mainnet full node with pruning enabled.
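## Usage sketch

Not part of the diff: a minimal sketch of how the two-phase API introduced here is meant to be driven, modeled on `AsyncRocksdbCache::access_storage` and the new unit tests below. The `warm_up_cache` wrapper and its parameters are illustrative, not code from this PR.

```rust
use tokio::sync::watch;
use zksync_dal::{ConnectionPool, Core};
use zksync_state::{AsyncCatchupTask, RocksdbStorageOptions};
use zksync_types::L1BatchNumber;

// Illustrative driver function (not part of this PR); `pool`, `db_path` and
// `l1_batch_number` are supplied by the caller.
async fn warm_up_cache(
    pool: ConnectionPool<Core>,
    db_path: String,
    l1_batch_number: L1BatchNumber,
) -> anyhow::Result<()> {
    let (task, rocksdb_cell) = AsyncCatchupTask::new(pool, db_path);
    let task = task.with_db_options(RocksdbStorageOptions::default());
    let (_stop_sender, stop_receiver) = watch::channel(false);
    let task_handle = tokio::spawn(task.run(stop_receiver));

    // Cheap: only waits until the catch-up task has opened RocksDB and reported
    // the L1 batch number it contained at that point.
    let initial_state = rocksdb_cell.ensure_initialized().await?;
    let _rocksdb = if initial_state.l1_batch_number >= Some(l1_batch_number) {
        // The cache was already up to date when opened, so it is worth blocking
        // until the handle becomes available instead of falling back to Postgres.
        Some(rocksdb_cell.wait().await?)
    } else {
        // The cache still needs to catch up (or recover); use it only if it
        // happens to be ready already.
        rocksdb_cell.get()
    };
    task_handle.await??;
    Ok(())
}
```

The key point is that `ensure_initialized()` is cheap and allows deciding "is the cache already up to date?" before committing to a storage backend, while the blocking `wait()` is only used when it is known to pay off.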
## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.
---
 core/lib/state/src/catchup.rs                 | 226 ++++++++++++++++--
 core/lib/state/src/lib.rs                     |   2 +-
 core/lib/state/src/storage_factory.rs         |   2 +-
 .../state_keeper/src/state_keeper_storage.rs  |  76 +++---
 core/node/vm_runner/src/storage.rs            |  25 +-
 5 files changed, 255 insertions(+), 76 deletions(-)

diff --git a/core/lib/state/src/catchup.rs b/core/lib/state/src/catchup.rs
index 4adf7547a30..139e10ea19f 100644
--- a/core/lib/state/src/catchup.rs
+++ b/core/lib/state/src/catchup.rs
@@ -1,7 +1,6 @@
-use std::{sync::Arc, time::Instant};
+use std::{error, fmt, time::Instant};
 
 use anyhow::Context;
-use once_cell::sync::OnceCell;
 use tokio::sync::watch;
 use zksync_dal::{ConnectionPool, Core};
 use zksync_shared_metrics::{SnapshotRecoveryStage, APP_METRICS};
@@ -10,6 +9,85 @@ use zksync_types::L1BatchNumber;
 
 use crate::{RocksdbStorage, RocksdbStorageOptions, StateKeeperColumnFamily};
 
+/// Initial RocksDB cache state returned by [`RocksdbCell::ensure_initialized()`].
+#[derive(Debug, Clone)]
+#[non_exhaustive]
+pub struct InitialRocksdbState {
+    /// Last processed L1 batch number in the RocksDB cache + 1 (i.e., the batch that the cache is ready to process).
+    /// `None` if the cache is empty (i.e., needs recovery).
+    pub l1_batch_number: Option<L1BatchNumber>,
+}
+
+/// Error returned from [`RocksdbCell`] methods if the corresponding [`AsyncCatchupTask`] has failed
+/// or was canceled.
+#[derive(Debug)]
+pub struct AsyncCatchupFailed(());
+
+impl fmt::Display for AsyncCatchupFailed {
+    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
+        formatter.write_str("Async RocksDB cache catchup failed or was canceled")
+    }
+}
+
+impl error::Error for AsyncCatchupFailed {}
+
+/// `OnceCell` equivalent that can be `.await`ed. Correspondingly, it has the following invariants:
+///
+/// - The cell is only set once.
+/// - The cell is always set to `Some(_)`.
+///
+/// `OnceCell` (either from `once_cell` or `tokio`) is not used because it lacks a way to wait for the cell
+/// to be initialized. `once_cell::sync::OnceCell` has a blocking `wait()` method, but since it's blocking,
+/// it risks spawning non-cancellable threads if misused.
+type AsyncOnceCell<T> = watch::Receiver<Option<T>>;
+
+/// A lazily initialized handle to a RocksDB cache returned from [`AsyncCatchupTask::new()`].
+#[derive(Debug)]
+pub struct RocksdbCell {
+    initial_state: AsyncOnceCell<InitialRocksdbState>,
+    db: AsyncOnceCell<RocksDB<StateKeeperColumnFamily>>,
+}
+
+impl RocksdbCell {
+    /// Waits until RocksDB is initialized and returns it.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the async catch-up task failed or was canceled before initialization.
+    #[allow(clippy::missing_panics_doc)] // false positive
+    pub async fn wait(&self) -> Result<RocksDB<StateKeeperColumnFamily>, AsyncCatchupFailed> {
+        self.db
+            .clone()
+            .wait_for(Option::is_some)
+            .await
+            // `unwrap` below is safe by construction
+            .map(|db| db.clone().unwrap())
+            .map_err(|_| AsyncCatchupFailed(()))
+    }
+
+    /// Gets a RocksDB instance if it has been initialized.
+    pub fn get(&self) -> Option<RocksDB<StateKeeperColumnFamily>> {
+        self.db.borrow().clone()
+    }
+
+    /// Ensures that the RocksDB has started catching up, and returns the **initial** RocksDB state
+    /// at the start of the catch-up.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the async catch-up task failed or was canceled.
+    #[allow(clippy::missing_panics_doc)] // false positive
+    pub async fn ensure_initialized(&self) -> Result<InitialRocksdbState, AsyncCatchupFailed> {
+        self.initial_state
+            .clone()
+            .wait_for(Option::is_some)
+            .await
+            // `unwrap` below is safe by construction
+            .map(|state| state.clone().unwrap())
+            .map_err(|_| AsyncCatchupFailed(()))
+    }
+}
+
 /// A runnable task that blocks until the provided RocksDB cache instance is caught up with
 /// Postgres.
 ///
@@ -19,27 +97,41 @@ pub struct AsyncCatchupTask {
     pool: ConnectionPool<Core>,
     state_keeper_db_path: String,
     state_keeper_db_options: RocksdbStorageOptions,
-    rocksdb_cell: Arc<OnceCell<RocksDB<StateKeeperColumnFamily>>>,
+    initial_state_sender: watch::Sender<Option<InitialRocksdbState>>,
+    db_sender: watch::Sender<Option<RocksDB<StateKeeperColumnFamily>>>,
     to_l1_batch_number: Option<L1BatchNumber>,
 }
 
 impl AsyncCatchupTask {
     /// Create a new catch-up task with the provided Postgres and RocksDB instances. Optionally
     /// accepts the last L1 batch number to catch up to (defaults to latest if not specified).
-    pub fn new(
-        pool: ConnectionPool<Core>,
-        state_keeper_db_path: String,
-        state_keeper_db_options: RocksdbStorageOptions,
-        rocksdb_cell: Arc<OnceCell<RocksDB<StateKeeperColumnFamily>>>,
-        to_l1_batch_number: Option<L1BatchNumber>,
-    ) -> Self {
-        Self {
+    pub fn new(pool: ConnectionPool<Core>, state_keeper_db_path: String) -> (Self, RocksdbCell) {
+        let (initial_state_sender, initial_state) = watch::channel(None);
+        let (db_sender, db) = watch::channel(None);
+        let this = Self {
             pool,
             state_keeper_db_path,
-            state_keeper_db_options,
-            rocksdb_cell,
-            to_l1_batch_number,
-        }
+            state_keeper_db_options: RocksdbStorageOptions::default(),
+            initial_state_sender,
+            db_sender,
+            to_l1_batch_number: None,
+        };
+        (this, RocksdbCell { initial_state, db })
+    }
+
+    /// Sets RocksDB options.
+    #[must_use]
+    pub fn with_db_options(mut self, options: RocksdbStorageOptions) -> Self {
+        self.state_keeper_db_options = options;
+        self
+    }
+
+    /// Sets the L1 batch number to catch up to. By default, the task will catch up to the latest L1 batch
+    /// (at the start of catch-up).
+    #[must_use]
+    pub fn with_target_l1_batch_number(mut self, number: L1BatchNumber) -> Self {
+        self.to_l1_batch_number = Some(number);
+        self
     }
 
     /// Block until RocksDB cache instance is caught up with Postgres.
@@ -47,9 +139,10 @@ impl AsyncCatchupTask {
     /// # Errors
     ///
     /// Propagates RocksDB and Postgres errors.
+    #[tracing::instrument(name = "catch_up", skip_all, fields(target_l1_batch = ?self.to_l1_batch_number))]
     pub async fn run(self, stop_receiver: watch::Receiver<bool>) -> anyhow::Result<()> {
         let started_at = Instant::now();
-        tracing::debug!("Catching up RocksDB asynchronously");
+        tracing::info!("Catching up RocksDB asynchronously");
 
         let mut rocksdb_builder = RocksdbStorage::builder_with_options(
             self.state_keeper_db_path.as_ref(),
@@ -58,6 +151,12 @@ impl AsyncCatchupTask {
         .await
         .context("Failed creating RocksDB storage builder")?;
 
+        let initial_state = InitialRocksdbState {
+            l1_batch_number: rocksdb_builder.l1_batch_number().await,
+        };
+        tracing::info!("Initialized RocksDB catchup from state: {initial_state:?}");
+        self.initial_state_sender.send_replace(Some(initial_state));
+
         let mut connection = self.pool.connection_tagged("state_keeper").await?;
         let was_recovered_from_snapshot = rocksdb_builder
             .ensure_ready(&mut connection, &stop_receiver)
@@ -76,12 +175,101 @@ impl AsyncCatchupTask {
         .context("Failed to catch up RocksDB to Postgres")?;
         drop(connection);
         if let Some(rocksdb) = rocksdb {
-            self.rocksdb_cell
-                .set(rocksdb.into_rocksdb())
-                .map_err(|_| anyhow::anyhow!("Async RocksDB cache was initialized twice"))?;
+            self.db_sender.send_replace(Some(rocksdb.into_rocksdb()));
         } else {
             tracing::info!("Synchronizing RocksDB interrupted");
         }
         Ok(())
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use tempfile::TempDir;
+    use test_casing::test_casing;
+    use zksync_types::L2BlockNumber;
+
+    use super::*;
+    use crate::{
+        test_utils::{create_l1_batch, create_l2_block, gen_storage_logs, prepare_postgres},
+        RocksdbStorageBuilder,
+    };
+
+    #[tokio::test]
+    async fn catching_up_basics() {
+        let pool = ConnectionPool::<Core>::test_pool().await;
+        let mut conn = pool.connection().await.unwrap();
+        prepare_postgres(&mut conn).await;
+        let storage_logs = gen_storage_logs(20..40);
+        create_l2_block(&mut conn, L2BlockNumber(1), storage_logs.clone()).await;
+        create_l1_batch(&mut conn, L1BatchNumber(1), &storage_logs).await;
+        drop(conn);
+
+        let temp_dir = TempDir::new().unwrap();
+        let (task, rocksdb_cell) =
+            AsyncCatchupTask::new(pool.clone(), temp_dir.path().to_str().unwrap().to_owned());
+        let (_stop_sender, stop_receiver) = watch::channel(false);
+        let task_handle = tokio::spawn(task.run(stop_receiver));
+
+        let initial_state = rocksdb_cell.ensure_initialized().await.unwrap();
+        assert_eq!(initial_state.l1_batch_number, None);
+
+        let db = rocksdb_cell.wait().await.unwrap();
+        assert_eq!(
+            RocksdbStorageBuilder::from_rocksdb(db)
+                .l1_batch_number()
+                .await,
+            Some(L1BatchNumber(2))
+        );
+        task_handle.await.unwrap().unwrap();
+        drop(rocksdb_cell); // should be enough to release RocksDB lock
+
+        let (task, rocksdb_cell) =
+            AsyncCatchupTask::new(pool, temp_dir.path().to_str().unwrap().to_owned());
+        let (_stop_sender, stop_receiver) = watch::channel(false);
+        let task_handle = tokio::spawn(task.run(stop_receiver));
+
+        let initial_state = rocksdb_cell.ensure_initialized().await.unwrap();
+        assert_eq!(initial_state.l1_batch_number, Some(L1BatchNumber(2)));
+
+        task_handle.await.unwrap().unwrap();
+        rocksdb_cell.get().unwrap(); // RocksDB must be caught up at this point
+    }
+
+    #[derive(Debug)]
+    enum CancellationScenario {
+        DropTask,
+        CancelTask,
+    }
+
+    impl CancellationScenario {
+        const ALL: [Self; 2] = [Self::DropTask, Self::CancelTask];
+    }
+
+    #[test_casing(2, CancellationScenario::ALL)]
+    #[tokio::test]
+    async fn catching_up_cancellation(scenario: CancellationScenario) {
+        let pool = ConnectionPool::<Core>::test_pool().await;
+        let mut conn = pool.connection().await.unwrap();
+        prepare_postgres(&mut conn).await;
+        let storage_logs = gen_storage_logs(20..40);
+        create_l2_block(&mut conn, L2BlockNumber(1), storage_logs.clone()).await;
+        create_l1_batch(&mut conn, L1BatchNumber(1), &storage_logs).await;
+        drop(conn);
+
+        let temp_dir = TempDir::new().unwrap();
+        let (task, rocksdb_cell) =
+            AsyncCatchupTask::new(pool.clone(), temp_dir.path().to_str().unwrap().to_owned());
+        let (stop_sender, stop_receiver) = watch::channel(false);
+        match scenario {
+            CancellationScenario::DropTask => drop(task),
+            CancellationScenario::CancelTask => {
+                stop_sender.send_replace(true);
+                task.run(stop_receiver).await.unwrap();
+            }
+        }
+
+        assert!(rocksdb_cell.get().is_none());
+        rocksdb_cell.wait().await.unwrap_err();
+    }
+}
diff --git a/core/lib/state/src/lib.rs b/core/lib/state/src/lib.rs
index cd16f65f41b..1359e62824f 100644
--- a/core/lib/state/src/lib.rs
+++ b/core/lib/state/src/lib.rs
@@ -30,7 +30,7 @@ mod test_utils;
 
 pub use self::{
     cache::sequential_cache::SequentialCache,
-    catchup::AsyncCatchupTask,
+    catchup::{AsyncCatchupTask, RocksdbCell},
     in_memory::InMemoryStorage,
     // Note, that `test_infra` of the bootloader tests relies on this value to be exposed
     in_memory::IN_MEMORY_STORAGE_DEFAULT_NETWORK_ID,
diff --git a/core/lib/state/src/storage_factory.rs b/core/lib/state/src/storage_factory.rs
index 625867b82c4..9f161cbeedf 100644
--- a/core/lib/state/src/storage_factory.rs
+++ b/core/lib/state/src/storage_factory.rs
@@ -186,7 +186,7 @@ impl ReadStorage for RocksdbWithMemory {
 }
 
 impl ReadStorage for PgOrRocksdbStorage<'_> {
-    fn read_value(&mut self, key: &StorageKey) -> zksync_types::StorageValue {
+    fn read_value(&mut self, key: &StorageKey) -> StorageValue {
         match self {
             Self::Postgres(postgres) => postgres.read_value(key),
             Self::Rocksdb(rocksdb) => rocksdb.read_value(key),
diff --git a/core/node/state_keeper/src/state_keeper_storage.rs b/core/node/state_keeper/src/state_keeper_storage.rs
index e0d7d20bf3f..13cedc3a58a 100644
--- a/core/node/state_keeper/src/state_keeper_storage.rs
+++ b/core/node/state_keeper/src/state_keeper_storage.rs
@@ -1,15 +1,12 @@
-use std::{fmt::Debug, sync::Arc};
+use std::fmt::Debug;
 
 use anyhow::Context;
 use async_trait::async_trait;
-use once_cell::sync::OnceCell;
 use tokio::sync::watch;
 use zksync_dal::{ConnectionPool, Core};
 use zksync_state::{
-    AsyncCatchupTask, PgOrRocksdbStorage, ReadStorageFactory, RocksdbStorageOptions,
-    StateKeeperColumnFamily,
+    AsyncCatchupTask, PgOrRocksdbStorage, ReadStorageFactory, RocksdbCell, RocksdbStorageOptions,
 };
-use zksync_storage::RocksDB;
 use zksync_types::L1BatchNumber;
 
 /// A [`ReadStorageFactory`] implementation that can produce short-lived [`ReadStorage`] handles
@@ -17,19 +14,50 @@
 /// variant and is then mutated into `Rocksdb` once RocksDB cache is caught up. After which it
 /// can never revert back to `Postgres` as we assume RocksDB cannot fall behind under normal state
 /// keeper operation.
-#[derive(Debug, Clone)]
+#[derive(Debug)]
 pub struct AsyncRocksdbCache {
     pool: ConnectionPool<Core>,
-    rocksdb_cell: Arc<OnceCell<RocksDB<StateKeeperColumnFamily>>>,
+    rocksdb_cell: RocksdbCell,
 }
 
 impl AsyncRocksdbCache {
-    async fn access_storage_inner(
+    pub fn new(
+        pool: ConnectionPool<Core>,
+        state_keeper_db_path: String,
+        state_keeper_db_options: RocksdbStorageOptions,
+    ) -> (Self, AsyncCatchupTask) {
+        let (task, rocksdb_cell) = AsyncCatchupTask::new(pool.clone(), state_keeper_db_path);
+        (
+            Self { pool, rocksdb_cell },
+            task.with_db_options(state_keeper_db_options),
+        )
+    }
+}
+
+#[async_trait]
+impl ReadStorageFactory for AsyncRocksdbCache {
+    #[tracing::instrument(skip(self, stop_receiver))]
+    async fn access_storage(
         &self,
         stop_receiver: &watch::Receiver<bool>,
         l1_batch_number: L1BatchNumber,
     ) -> anyhow::Result<Option<PgOrRocksdbStorage<'static>>> {
-        if let Some(rocksdb) = self.rocksdb_cell.get() {
+        let initial_state = self.rocksdb_cell.ensure_initialized().await?;
+        let rocksdb = if initial_state.l1_batch_number >= Some(l1_batch_number) {
+            tracing::info!(
+                "RocksDB cache (initial state: {initial_state:?}) doesn't need to catch up to L1 batch #{l1_batch_number}, \
+                 waiting for it to become available"
+            );
+            // Opening the cache RocksDB can take a couple of seconds, so if we don't wait here, we unnecessarily miss an opportunity
+            // to use the cache for an entire batch.
+            Some(self.rocksdb_cell.wait().await?)
+        } else {
+            // This clause includes several cases: if the cache needs catching up or recovery, or if `l1_batch_number`
+            // is not the first processed L1 batch.
+            self.rocksdb_cell.get()
+        };
+
+        if let Some(rocksdb) = rocksdb {
             let mut connection = self
                 .pool
                 .connection_tagged("state_keeper")
@@ -37,7 +65,7 @@ impl AsyncRocksdbCache {
                 .context("Failed getting a Postgres connection")?;
             PgOrRocksdbStorage::access_storage_rocksdb(
                 &mut connection,
-                rocksdb.clone(),
+                rocksdb,
                 stop_receiver,
                 l1_batch_number,
             )
@@ -51,32 +79,4 @@ impl AsyncRocksdbCache {
             ))
         }
     }
-
-    pub fn new(
-        pool: ConnectionPool<Core>,
-        state_keeper_db_path: String,
-        state_keeper_db_options: RocksdbStorageOptions,
-    ) -> (Self, AsyncCatchupTask) {
-        let rocksdb_cell = Arc::new(OnceCell::new());
-        let task = AsyncCatchupTask::new(
-            pool.clone(),
-            state_keeper_db_path,
-            state_keeper_db_options,
-            rocksdb_cell.clone(),
-            None,
-        );
-        (Self { pool, rocksdb_cell }, task)
-    }
-}
-
-#[async_trait]
-impl ReadStorageFactory for AsyncRocksdbCache {
-    async fn access_storage(
-        &self,
-        stop_receiver: &watch::Receiver<bool>,
-        l1_batch_number: L1BatchNumber,
-    ) -> anyhow::Result<Option<PgOrRocksdbStorage<'static>>> {
-        self.access_storage_inner(stop_receiver, l1_batch_number)
-            .await
-    }
 }
diff --git a/core/node/vm_runner/src/storage.rs b/core/node/vm_runner/src/storage.rs
index 7a53f6034a7..7f4de2725e4 100644
--- a/core/node/vm_runner/src/storage.rs
+++ b/core/node/vm_runner/src/storage.rs
@@ -8,15 +8,13 @@ use std::{
 use anyhow::Context as _;
 use async_trait::async_trait;
 use multivm::{interface::L1BatchEnv, vm_1_4_2::SystemEnv};
-use once_cell::sync::OnceCell;
 use tokio::sync::{watch, RwLock};
 use vm_utils::storage::L1BatchParamsProvider;
 use zksync_dal::{Connection, ConnectionPool, Core, CoreDal};
 use zksync_state::{
-    AsyncCatchupTask, BatchDiff, PgOrRocksdbStorage, ReadStorageFactory, RocksdbStorage,
-    RocksdbStorageBuilder, RocksdbStorageOptions, RocksdbWithMemory, StateKeeperColumnFamily,
+    AsyncCatchupTask, BatchDiff, PgOrRocksdbStorage, ReadStorageFactory, RocksdbCell,
+    RocksdbStorage, RocksdbStorageBuilder, RocksdbWithMemory,
 };
-use zksync_storage::RocksDB;
 use zksync_types::{block::L2BlockExecutionData, L1BatchNumber, L2ChainId};
 
 use crate::{metrics::METRICS, VmRunnerIo};
 
@@ -233,7 +231,7 @@ pub struct StorageSyncTask<Io: VmRunnerIo> {
     pool: ConnectionPool<Core>,
     l1_batch_params_provider: L1BatchParamsProvider,
     chain_id: L2ChainId,
-    rocksdb_cell: Arc<OnceCell<RocksDB<StateKeeperColumnFamily>>>,
+    rocksdb_cell: RocksdbCell,
     io: Io,
     state: Arc<RwLock<State>>,
     catchup_task: AsyncCatchupTask,
@@ -251,15 +249,10 @@ impl<Io: VmRunnerIo> StorageSyncTask<Io> {
         let l1_batch_params_provider = L1BatchParamsProvider::new(&mut conn)
            .await
            .context("Failed initializing L1 batch params provider")?;
-        let rocksdb_cell = Arc::new(OnceCell::new());
-        let catchup_task = AsyncCatchupTask::new(
-            pool.clone(),
-            rocksdb_path,
-            RocksdbStorageOptions::default(),
-            rocksdb_cell.clone(),
-            Some(io.latest_processed_batch(&mut conn).await?),
-        );
+        let target_l1_batch_number = io.latest_processed_batch(&mut conn).await?;
         drop(conn);
+
+        let (catchup_task, rocksdb_cell) = AsyncCatchupTask::new(pool.clone(), rocksdb_path);
         Ok(Self {
             pool,
             l1_batch_params_provider,
@@ -267,7 +260,7 @@
             rocksdb_cell,
             io,
             state,
-            catchup_task,
+            catchup_task: catchup_task.with_target_l1_batch_number(target_l1_batch_number),
         })
     }
 
@@ -286,9 +279,7 @@
         const SLEEP_INTERVAL: Duration = Duration::from_millis(50);
 
         self.catchup_task.run(stop_receiver.clone()).await?;
-        let rocksdb = self.rocksdb_cell.get().ok_or_else(|| {
-            anyhow::anyhow!("Expected RocksDB to be initialized by `AsyncCatchupTask`")
-        })?;
+        let rocksdb = self.rocksdb_cell.wait().await?;
         loop {
             if *stop_receiver.borrow() {
                 tracing::info!("`StorageSyncTask` was interrupted");