
feat(merkle-tree): Rework tree rollback (#2207)
## What ❔

Reworks tree rollback so that it's supported on the distributed external
node, including the case when a node runs multiple trees.

- Desync with Postgres is now detected on metadata calculator
initialization, and the tree is truncated accordingly (see the sketch below).
- The old approach is left intact as a safety guard.
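
Roughly, the new consistency check boils down to walking back from the newest L1 batch until the root hash stored in the local tree matches the tree data persisted in Postgres, then truncating the tree to that batch (the actual logic lives in `AsyncTree::ensure_consistency`; see the `metadata_calculator` changes below). A minimal, self-contained sketch of the idea — the helper name and the in-memory stand-ins for Postgres and the tree are hypothetical, not part of this PR:

```rust
// Illustrative sketch only: `find_last_common_l1_batch` and the in-memory
// "Postgres" / "tree" stand-ins are hypothetical; the real check is
// `AsyncTree::ensure_consistency` in `core/node/metadata_calculator`.

/// Walks back from the newest batch and returns the last L1 batch whose root
/// hash in the local tree matches the tree data persisted in Postgres.
fn find_last_common_l1_batch(
    postgres_tree_data: &[(u32, [u8; 32])], // (batch number, root hash) from Postgres
    tree_root_hash: impl Fn(u32) -> Option<[u8; 32]>, // root hash stored in the local tree
) -> Option<u32> {
    postgres_tree_data
        .iter()
        .rev()
        .find(|(number, expected)| tree_root_hash(*number) == Some(*expected))
        .map(|(number, _)| *number)
}

fn main() {
    // Postgres knows batches 0..=4; the local tree diverges starting from batch 3,
    // e.g. because the main node reverted batches 3..=4 and re-sealed them.
    let postgres_tree_data = [
        (0, [0x00; 32]),
        (1, [0x01; 32]),
        (2, [0x02; 32]),
        (3, [0xaa; 32]),
        (4, [0xbb; 32]),
    ];
    let tree_root_hash = |number: u32| -> Option<[u8; 32]> {
        match number {
            0 => Some([0x00; 32]),
            1 => Some([0x01; 32]),
            2 => Some([0x02; 32]),
            3 => Some([0x03; 32]), // diverges here
            4 => Some([0x04; 32]),
            _ => None,
        }
    };

    let last_common = find_last_common_l1_batch(&postgres_tree_data, tree_root_hash);
    assert_eq!(last_common, Some(2));
    // The metadata calculator would then truncate the tree to batch 2
    // (cf. `AsyncTree::roll_back_logs` in this diff) and re-process batches 3+.
    println!("last common L1 batch: {last_common:?}");
}
```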

## Why ❔

Right now, reorg logic on the EN relies on a node running a single tree and
on the block reverter being a singleton. Both of these assumptions break down
for a distributed EN.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.
slowli authored Jun 12, 2024
1 parent 20da566 commit c3b9c38
Showing 6 changed files with 400 additions and 72 deletions.
6 changes: 6 additions & 0 deletions core/lib/merkle_tree/src/domain.rs
@@ -166,6 +166,12 @@ impl ZkSyncTree {
self.tree.latest_root_hash()
}

/// Returns the root hash and leaf count at the specified L1 batch.
pub fn root_info(&self, l1_batch_number: L1BatchNumber) -> Option<(ValueHash, u64)> {
let root = self.tree.root(l1_batch_number.0.into())?;
Some((root.hash(&Blake2Hasher), root.leaf_count()))
}

/// Checks whether this tree is empty.
pub fn is_empty(&self) -> bool {
let Some(version) = self.tree.latest_version() else {
4 changes: 2 additions & 2 deletions core/lib/snapshots_applier/src/tests/utils.rs
@@ -332,12 +332,12 @@ impl ObjectStore for HangingObjectStore {
let mut should_proceed = true;
self.count_sender.send_modify(|count| {
*count += 1;
if dbg!(*count) > self.stop_after_count {
if *count > self.stop_after_count {
should_proceed = false;
}
});

if dbg!(should_proceed) {
if should_proceed {
self.inner.get_raw(bucket, key).await
} else {
future::pending().await // Hang up the snapshot applier task
18 changes: 16 additions & 2 deletions core/node/metadata_calculator/src/helpers.rs
@@ -27,7 +27,9 @@ use zksync_merkle_tree::{
};
use zksync_storage::{RocksDB, RocksDBOptions, StalledWritesRetries, WeakRocksDB};
use zksync_types::{
block::L1BatchHeader, writes::TreeWrite, AccountTreeId, L1BatchNumber, StorageKey, H256,
block::{L1BatchHeader, L1BatchTreeData},
writes::TreeWrite,
AccountTreeId, L1BatchNumber, StorageKey, H256,
};

use super::{
@@ -233,11 +235,23 @@ impl AsyncTree {
self.as_ref().next_l1_batch_number()
}

pub fn min_l1_batch_number(&self) -> Option<L1BatchNumber> {
self.as_ref().reader().min_l1_batch_number()
}

#[cfg(test)]
pub fn root_hash(&self) -> H256 {
self.as_ref().root_hash()
}

pub fn data_for_l1_batch(&self, l1_batch_number: L1BatchNumber) -> Option<L1BatchTreeData> {
let (hash, leaf_count) = self.as_ref().root_info(l1_batch_number)?;
Some(L1BatchTreeData {
hash,
rollup_last_leaf_index: leaf_count + 1,
})
}

/// Returned errors are unrecoverable; the tree must not be used after an error is returned.
pub async fn process_l1_batch(
&mut self,
@@ -279,7 +293,7 @@ impl AsyncTree {
Ok(())
}

pub fn revert_logs(&mut self, last_l1_batch_to_keep: L1BatchNumber) -> anyhow::Result<()> {
pub fn roll_back_logs(&mut self, last_l1_batch_to_keep: L1BatchNumber) -> anyhow::Result<()> {
self.as_mut().roll_back_logs(last_l1_batch_to_keep)
}
}
14 changes: 10 additions & 4 deletions core/node/metadata_calculator/src/lib.rs
@@ -217,7 +217,7 @@ impl MetadataCalculator {
GenericAsyncTree::new(db, &self.config).await
}

pub async fn run(self, stop_receiver: watch::Receiver<bool>) -> anyhow::Result<()> {
pub async fn run(self, mut stop_receiver: watch::Receiver<bool>) -> anyhow::Result<()> {
let tree = self.create_tree().await?;
let tree = tree
.ensure_ready(
@@ -231,13 +231,19 @@
let Some(mut tree) = tree else {
return Ok(()); // recovery was aborted because a stop signal was received
};

// Set a tree reader before the tree is fully initialized to not wait for the first L1 batch to appear in Postgres.
let tree_reader = tree.reader();
let tree_info = tree_reader.clone().info().await;
self.tree_reader.send_replace(Some(tree_reader));

tree.ensure_consistency(&self.delayer, &self.pool, &mut stop_receiver)
.await?;
if !self.pruning_handles_sender.is_closed() {
// Unlike tree reader, we shouldn't initialize pruning (as a task modifying the tree) before the tree is guaranteed
// to be consistent with Postgres.
self.pruning_handles_sender.send(tree.pruner()).ok();
}
self.tree_reader.send_replace(Some(tree_reader));

let tree_info = tree.reader().info().await;
tracing::info!("Merkle tree is initialized and ready to process L1 batches: {tree_info:?}");
self.health_updater
.update(MerkleTreeHealth::MainLoop(tree_info).into());
200 changes: 195 additions & 5 deletions core/node/metadata_calculator/src/tests.rs
@@ -5,6 +5,7 @@ use std::{future::Future, ops, panic, path::Path, sync::Arc, time::Duration};
use assert_matches::assert_matches;
use itertools::Itertools;
use tempfile::TempDir;
use test_casing::{test_casing, Product};
use tokio::sync::{mpsc, watch};
use zksync_config::configs::{
chain::OperationsManagerConfig,
@@ -19,16 +20,18 @@ use zksync_object_store::{MockObjectStore, ObjectStore};
use zksync_prover_interface::inputs::PrepareBasicCircuitsJob;
use zksync_storage::RocksDB;
use zksync_types::{
block::L1BatchHeader, AccountTreeId, Address, L1BatchNumber, L2BlockNumber, StorageKey,
StorageLog, H256,
block::{L1BatchHeader, L1BatchTreeData},
AccountTreeId, Address, L1BatchNumber, L2BlockNumber, StorageKey, StorageLog, H256,
};
use zksync_utils::u32_to_h256;

use super::{
helpers::L1BatchWithLogs, GenericAsyncTree, MetadataCalculator, MetadataCalculatorConfig,
MetadataCalculatorRecoveryConfig,
};
use crate::helpers::{AsyncTree, Delayer};

const POLL_INTERVAL: Duration = Duration::from_millis(50);
const RUN_TIMEOUT: Duration = Duration::from_secs(30);

async fn run_with_timeout<T, F>(timeout: Duration, action: F) -> T
@@ -47,7 +50,7 @@ pub(super) fn mock_config(db_path: &Path) -> MetadataCalculatorConfig {
db_path: db_path.to_str().unwrap().to_owned(),
max_open_files: None,
mode: MerkleTreeMode::Full,
delay_interval: Duration::from_millis(100),
delay_interval: POLL_INTERVAL,
max_l1_batches_per_iter: 10,
multi_get_chunk_size: 500,
block_cache_capacity: 0,
@@ -74,6 +77,150 @@ async fn genesis_creation() {
assert_eq!(tree.next_l1_batch_number(), L1BatchNumber(1));
}

#[tokio::test]
async fn low_level_genesis_creation() {
let pool = ConnectionPool::<Core>::test_pool().await;
let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB");
insert_genesis_batch(
&mut pool.connection().await.unwrap(),
&GenesisParams::mock(),
)
.await
.unwrap();
reset_db_state(&pool, 1).await;

let db = RocksDB::new(temp_dir.path()).unwrap();
let mut tree = AsyncTree::new(db.into(), MerkleTreeMode::Lightweight).unwrap();
let (_stop_sender, mut stop_receiver) = watch::channel(false);
tree.ensure_consistency(&Delayer::new(POLL_INTERVAL), &pool, &mut stop_receiver)
.await
.unwrap();

assert!(!tree.is_empty());
assert_eq!(tree.next_l1_batch_number(), L1BatchNumber(1));
}

#[test_casing(8, Product(([1, 4, 7, 9], [false, true])))]
#[tokio::test]
async fn tree_truncation_on_l1_batch_divergence(
last_common_l1_batch: u32,
overwrite_tree_data: bool,
) {
const INITIAL_BATCH_COUNT: usize = 10;

assert!((last_common_l1_batch as usize) < INITIAL_BATCH_COUNT);
let last_common_l1_batch = L1BatchNumber(last_common_l1_batch);

let pool = ConnectionPool::<Core>::test_pool().await;
let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB");
let calculator = setup_lightweight_calculator(temp_dir.path(), pool.clone()).await;
reset_db_state(&pool, INITIAL_BATCH_COUNT).await;
run_calculator(calculator).await;

let mut storage = pool.connection().await.unwrap();
remove_l1_batches(&mut storage, last_common_l1_batch).await;
// Extend the state with new L1 batches.
let logs = gen_storage_logs(100..200, 5);
extend_db_state(&mut storage, logs).await;

if overwrite_tree_data {
for number in (last_common_l1_batch.0 + 1)..(last_common_l1_batch.0 + 6) {
let new_tree_data = L1BatchTreeData {
hash: H256::from_low_u64_be(number.into()),
rollup_last_leaf_index: 200, // doesn't matter
};
storage
.blocks_dal()
.save_l1_batch_tree_data(L1BatchNumber(number), &new_tree_data)
.await
.unwrap();
}
}

let calculator = setup_lightweight_calculator(temp_dir.path(), pool.clone()).await;
let tree = calculator.create_tree().await.unwrap();
let GenericAsyncTree::Ready(mut tree) = tree else {
panic!("Unexpected tree state: {tree:?}");
};
assert_eq!(
tree.next_l1_batch_number(),
L1BatchNumber(INITIAL_BATCH_COUNT as u32 + 1)
);

let (_stop_sender, mut stop_receiver) = watch::channel(false);
tree.ensure_consistency(&Delayer::new(POLL_INTERVAL), &pool, &mut stop_receiver)
.await
.unwrap();
assert_eq!(tree.next_l1_batch_number(), last_common_l1_batch + 1);
}

#[test_casing(4, [1, 4, 6, 7])]
#[tokio::test]
async fn tree_truncation_on_l1_batch_divergence_in_pruned_tree(retained_l1_batch: u32) {
const INITIAL_BATCH_COUNT: usize = 10;
const LAST_COMMON_L1_BATCH: L1BatchNumber = L1BatchNumber(6);

let retained_l1_batch = L1BatchNumber(retained_l1_batch);

let pool = ConnectionPool::<Core>::test_pool().await;
let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB");
let calculator = setup_lightweight_calculator(temp_dir.path(), pool.clone()).await;
reset_db_state(&pool, INITIAL_BATCH_COUNT).await;
run_calculator(calculator).await;

let mut storage = pool.connection().await.unwrap();
remove_l1_batches(&mut storage, LAST_COMMON_L1_BATCH).await;
// Extend the state with new L1 batches.
let logs = gen_storage_logs(100..200, 5);
extend_db_state(&mut storage, logs).await;

for number in (LAST_COMMON_L1_BATCH.0 + 1)..(LAST_COMMON_L1_BATCH.0 + 6) {
let new_tree_data = L1BatchTreeData {
hash: H256::from_low_u64_be(number.into()),
rollup_last_leaf_index: 200, // doesn't matter
};
storage
.blocks_dal()
.save_l1_batch_tree_data(L1BatchNumber(number), &new_tree_data)
.await
.unwrap();
}

let calculator = setup_lightweight_calculator(temp_dir.path(), pool.clone()).await;
let tree = calculator.create_tree().await.unwrap();
let GenericAsyncTree::Ready(mut tree) = tree else {
panic!("Unexpected tree state: {tree:?}");
};

let reader = tree.reader();
let (mut pruner, pruner_handle) = tree.pruner();
pruner.set_poll_interval(POLL_INTERVAL);
tokio::task::spawn_blocking(|| pruner.run());
pruner_handle
.set_target_retained_version(retained_l1_batch.0.into())
.unwrap();
// Wait until the tree is pruned
while reader.clone().info().await.min_l1_batch_number < Some(retained_l1_batch) {
tokio::time::sleep(POLL_INTERVAL).await;
}

let (_stop_sender, mut stop_receiver) = watch::channel(false);
let consistency_result = tree
.ensure_consistency(&Delayer::new(POLL_INTERVAL), &pool, &mut stop_receiver)
.await;

if retained_l1_batch <= LAST_COMMON_L1_BATCH {
consistency_result.unwrap();
assert_eq!(tree.next_l1_batch_number(), LAST_COMMON_L1_BATCH + 1);
} else {
let err = consistency_result.unwrap_err();
assert!(
format!("{err:#}").contains("diverging min L1 batch"),
"{err:#}"
);
}
}

#[tokio::test]
async fn basic_workflow() {
let pool = ConnectionPool::<Core>::test_pool().await;
@@ -279,7 +426,7 @@ async fn shutting_down_calculator() {

let (stop_sx, stop_rx) = watch::channel(false);
let calculator_task = tokio::spawn(calculator.run(stop_rx));
tokio::time::sleep(Duration::from_millis(100)).await;
tokio::time::sleep(POLL_INTERVAL).await;
stop_sx.send_replace(true);
run_with_timeout(RUN_TIMEOUT, calculator_task)
.await
@@ -342,7 +489,7 @@ async fn test_postgres_backup_recovery(
insert_initial_writes_for_batch(&mut txn, batch_header.number).await;
txn.commit().await.unwrap();
if sleep_between_batches {
tokio::time::sleep(Duration::from_millis(100)).await;
tokio::time::sleep(POLL_INTERVAL).await;
}
}
drop(storage);
@@ -640,6 +787,23 @@ async fn remove_l1_batches(
batch_headers.push(header.unwrap());
}

let (_, last_l2_block_to_keep) = storage
.blocks_dal()
.get_l2_block_range_of_l1_batch(last_l1_batch_to_keep)
.await
.unwrap()
.expect("L1 batch has no blocks");

storage
.storage_logs_dal()
.roll_back_storage_logs(last_l2_block_to_keep)
.await
.unwrap();
storage
.blocks_dal()
.delete_l2_blocks(last_l2_block_to_keep)
.await
.unwrap();
storage
.blocks_dal()
.delete_l1_batches(last_l1_batch_to_keep)
@@ -740,3 +904,29 @@ async fn deduplication_works_as_expected() {
assert_eq!(initial_writes[key].0, L1BatchNumber(4));
}
}

#[test_casing(3, [3, 5, 8])]
#[tokio::test]
async fn l1_batch_divergence_entire_workflow(last_common_l1_batch: u32) {
const INITIAL_BATCH_COUNT: usize = 10;

assert!((last_common_l1_batch as usize) < INITIAL_BATCH_COUNT);
let last_common_l1_batch = L1BatchNumber(last_common_l1_batch);

let pool = ConnectionPool::<Core>::test_pool().await;
let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB");
let calculator = setup_lightweight_calculator(temp_dir.path(), pool.clone()).await;
reset_db_state(&pool, INITIAL_BATCH_COUNT).await;
run_calculator(calculator).await;

let mut storage = pool.connection().await.unwrap();
remove_l1_batches(&mut storage, last_common_l1_batch).await;
// Extend the state with new L1 batches.
let logs = gen_storage_logs(100..200, 5);
extend_db_state(&mut storage, logs).await;
let expected_root_hash = expected_tree_hash(&pool).await;

let calculator = setup_lightweight_calculator(temp_dir.path(), pool.clone()).await;
let final_root_hash = run_calculator(calculator).await;
assert_eq!(final_root_hash, expected_root_hash);
}
