From c3b9c38ca07f01e6f7b2d7e631b2b811cacecf3a Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Wed, 12 Jun 2024 11:41:10 +0300 Subject: [PATCH] feat(merkle-tree): Rework tree rollback (#2207) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What ❔ Reworks tree rollback so that it's supported on the distributed external node, including the case when a node runs multiple trees. - Desync with Postgres is now detected on metadata calculator initialization, and the tree is truncated correspondingly. - The old approach is left intact as a safety guard. ## Why ❔ Right now, reorg logic on EN relies on a node running a single tree, and block reverter being a singleton. Both these assumptions are bogus in case of a distributed EN. ## Checklist - [x] PR title corresponds to the body of PR (we generate changelog entries from PRs). - [x] Tests for the changes have been added / updated. - [x] Documentation comments have been added / updated. - [x] Code has been formatted via `zk fmt` and `zk lint`. - [x] Spellcheck has been run via `zk spellcheck`. --- core/lib/merkle_tree/src/domain.rs | 6 + core/lib/snapshots_applier/src/tests/utils.rs | 4 +- core/node/metadata_calculator/src/helpers.rs | 18 +- core/node/metadata_calculator/src/lib.rs | 14 +- core/node/metadata_calculator/src/tests.rs | 200 ++++++++++++++- core/node/metadata_calculator/src/updater.rs | 230 +++++++++++++----- 6 files changed, 400 insertions(+), 72 deletions(-) diff --git a/core/lib/merkle_tree/src/domain.rs b/core/lib/merkle_tree/src/domain.rs index 5e3bc77ab93..ffc4b0b8410 100644 --- a/core/lib/merkle_tree/src/domain.rs +++ b/core/lib/merkle_tree/src/domain.rs @@ -166,6 +166,12 @@ impl ZkSyncTree { self.tree.latest_root_hash() } + /// Returns the root hash and leaf count at the specified L1 batch. 
+ pub fn root_info(&self, l1_batch_number: L1BatchNumber) -> Option<(ValueHash, u64)> { + let root = self.tree.root(l1_batch_number.0.into())?; + Some((root.hash(&Blake2Hasher), root.leaf_count())) + } + /// Checks whether this tree is empty. pub fn is_empty(&self) -> bool { let Some(version) = self.tree.latest_version() else { diff --git a/core/lib/snapshots_applier/src/tests/utils.rs b/core/lib/snapshots_applier/src/tests/utils.rs index d3d1c3ae6e0..b48277a88e5 100644 --- a/core/lib/snapshots_applier/src/tests/utils.rs +++ b/core/lib/snapshots_applier/src/tests/utils.rs @@ -332,12 +332,12 @@ impl ObjectStore for HangingObjectStore { let mut should_proceed = true; self.count_sender.send_modify(|count| { *count += 1; - if dbg!(*count) > self.stop_after_count { + if *count > self.stop_after_count { should_proceed = false; } }); - if dbg!(should_proceed) { + if should_proceed { self.inner.get_raw(bucket, key).await } else { future::pending().await // Hang up the snapshot applier task diff --git a/core/node/metadata_calculator/src/helpers.rs b/core/node/metadata_calculator/src/helpers.rs index 20fd0babaac..5ac9e329c62 100644 --- a/core/node/metadata_calculator/src/helpers.rs +++ b/core/node/metadata_calculator/src/helpers.rs @@ -27,7 +27,9 @@ use zksync_merkle_tree::{ }; use zksync_storage::{RocksDB, RocksDBOptions, StalledWritesRetries, WeakRocksDB}; use zksync_types::{ - block::L1BatchHeader, writes::TreeWrite, AccountTreeId, L1BatchNumber, StorageKey, H256, + block::{L1BatchHeader, L1BatchTreeData}, + writes::TreeWrite, + AccountTreeId, L1BatchNumber, StorageKey, H256, }; use super::{ @@ -233,11 +235,23 @@ impl AsyncTree { self.as_ref().next_l1_batch_number() } + pub fn min_l1_batch_number(&self) -> Option { + self.as_ref().reader().min_l1_batch_number() + } + #[cfg(test)] pub fn root_hash(&self) -> H256 { self.as_ref().root_hash() } + pub fn data_for_l1_batch(&self, l1_batch_number: L1BatchNumber) -> Option { + let (hash, leaf_count) = 
self.as_ref().root_info(l1_batch_number)?; + Some(L1BatchTreeData { + hash, + rollup_last_leaf_index: leaf_count + 1, + }) + } + /// Returned errors are unrecoverable; the tree must not be used after an error is returned. pub async fn process_l1_batch( &mut self, @@ -279,7 +293,7 @@ impl AsyncTree { Ok(()) } - pub fn revert_logs(&mut self, last_l1_batch_to_keep: L1BatchNumber) -> anyhow::Result<()> { + pub fn roll_back_logs(&mut self, last_l1_batch_to_keep: L1BatchNumber) -> anyhow::Result<()> { self.as_mut().roll_back_logs(last_l1_batch_to_keep) } } diff --git a/core/node/metadata_calculator/src/lib.rs b/core/node/metadata_calculator/src/lib.rs index 4a422f243f4..b57f0dfacb7 100644 --- a/core/node/metadata_calculator/src/lib.rs +++ b/core/node/metadata_calculator/src/lib.rs @@ -217,7 +217,7 @@ impl MetadataCalculator { GenericAsyncTree::new(db, &self.config).await } - pub async fn run(self, stop_receiver: watch::Receiver) -> anyhow::Result<()> { + pub async fn run(self, mut stop_receiver: watch::Receiver) -> anyhow::Result<()> { let tree = self.create_tree().await?; let tree = tree .ensure_ready( @@ -231,13 +231,19 @@ impl MetadataCalculator { let Some(mut tree) = tree else { return Ok(()); // recovery was aborted because a stop signal was received }; - + // Set a tree reader before the tree is fully initialized to not wait for the first L1 batch to appear in Postgres. let tree_reader = tree.reader(); - let tree_info = tree_reader.clone().info().await; + self.tree_reader.send_replace(Some(tree_reader)); + + tree.ensure_consistency(&self.delayer, &self.pool, &mut stop_receiver) + .await?; if !self.pruning_handles_sender.is_closed() { + // Unlike tree reader, we shouldn't initialize pruning (as a task modifying the tree) before the tree is guaranteed + // to be consistent with Postgres. 
self.pruning_handles_sender.send(tree.pruner()).ok(); } - self.tree_reader.send_replace(Some(tree_reader)); + + let tree_info = tree.reader().info().await; tracing::info!("Merkle tree is initialized and ready to process L1 batches: {tree_info:?}"); self.health_updater .update(MerkleTreeHealth::MainLoop(tree_info).into()); diff --git a/core/node/metadata_calculator/src/tests.rs b/core/node/metadata_calculator/src/tests.rs index 0406544614d..20a814630fa 100644 --- a/core/node/metadata_calculator/src/tests.rs +++ b/core/node/metadata_calculator/src/tests.rs @@ -5,6 +5,7 @@ use std::{future::Future, ops, panic, path::Path, sync::Arc, time::Duration}; use assert_matches::assert_matches; use itertools::Itertools; use tempfile::TempDir; +use test_casing::{test_casing, Product}; use tokio::sync::{mpsc, watch}; use zksync_config::configs::{ chain::OperationsManagerConfig, @@ -19,8 +20,8 @@ use zksync_object_store::{MockObjectStore, ObjectStore}; use zksync_prover_interface::inputs::PrepareBasicCircuitsJob; use zksync_storage::RocksDB; use zksync_types::{ - block::L1BatchHeader, AccountTreeId, Address, L1BatchNumber, L2BlockNumber, StorageKey, - StorageLog, H256, + block::{L1BatchHeader, L1BatchTreeData}, + AccountTreeId, Address, L1BatchNumber, L2BlockNumber, StorageKey, StorageLog, H256, }; use zksync_utils::u32_to_h256; @@ -28,7 +29,9 @@ use super::{ helpers::L1BatchWithLogs, GenericAsyncTree, MetadataCalculator, MetadataCalculatorConfig, MetadataCalculatorRecoveryConfig, }; +use crate::helpers::{AsyncTree, Delayer}; +const POLL_INTERVAL: Duration = Duration::from_millis(50); const RUN_TIMEOUT: Duration = Duration::from_secs(30); async fn run_with_timeout(timeout: Duration, action: F) -> T @@ -47,7 +50,7 @@ pub(super) fn mock_config(db_path: &Path) -> MetadataCalculatorConfig { db_path: db_path.to_str().unwrap().to_owned(), max_open_files: None, mode: MerkleTreeMode::Full, - delay_interval: Duration::from_millis(100), + delay_interval: POLL_INTERVAL, 
max_l1_batches_per_iter: 10, multi_get_chunk_size: 500, block_cache_capacity: 0, @@ -74,6 +77,150 @@ async fn genesis_creation() { assert_eq!(tree.next_l1_batch_number(), L1BatchNumber(1)); } +#[tokio::test] +async fn low_level_genesis_creation() { + let pool = ConnectionPool::::test_pool().await; + let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); + insert_genesis_batch( + &mut pool.connection().await.unwrap(), + &GenesisParams::mock(), + ) + .await + .unwrap(); + reset_db_state(&pool, 1).await; + + let db = RocksDB::new(temp_dir.path()).unwrap(); + let mut tree = AsyncTree::new(db.into(), MerkleTreeMode::Lightweight).unwrap(); + let (_stop_sender, mut stop_receiver) = watch::channel(false); + tree.ensure_consistency(&Delayer::new(POLL_INTERVAL), &pool, &mut stop_receiver) + .await + .unwrap(); + + assert!(!tree.is_empty()); + assert_eq!(tree.next_l1_batch_number(), L1BatchNumber(1)); +} + +#[test_casing(8, Product(([1, 4, 7, 9], [false, true])))] +#[tokio::test] +async fn tree_truncation_on_l1_batch_divergence( + last_common_l1_batch: u32, + overwrite_tree_data: bool, +) { + const INITIAL_BATCH_COUNT: usize = 10; + + assert!((last_common_l1_batch as usize) < INITIAL_BATCH_COUNT); + let last_common_l1_batch = L1BatchNumber(last_common_l1_batch); + + let pool = ConnectionPool::::test_pool().await; + let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); + let calculator = setup_lightweight_calculator(temp_dir.path(), pool.clone()).await; + reset_db_state(&pool, INITIAL_BATCH_COUNT).await; + run_calculator(calculator).await; + + let mut storage = pool.connection().await.unwrap(); + remove_l1_batches(&mut storage, last_common_l1_batch).await; + // Extend the state with new L1 batches. 
+ let logs = gen_storage_logs(100..200, 5); + extend_db_state(&mut storage, logs).await; + + if overwrite_tree_data { + for number in (last_common_l1_batch.0 + 1)..(last_common_l1_batch.0 + 6) { + let new_tree_data = L1BatchTreeData { + hash: H256::from_low_u64_be(number.into()), + rollup_last_leaf_index: 200, // doesn't matter + }; + storage + .blocks_dal() + .save_l1_batch_tree_data(L1BatchNumber(number), &new_tree_data) + .await + .unwrap(); + } + } + + let calculator = setup_lightweight_calculator(temp_dir.path(), pool.clone()).await; + let tree = calculator.create_tree().await.unwrap(); + let GenericAsyncTree::Ready(mut tree) = tree else { + panic!("Unexpected tree state: {tree:?}"); + }; + assert_eq!( + tree.next_l1_batch_number(), + L1BatchNumber(INITIAL_BATCH_COUNT as u32 + 1) + ); + + let (_stop_sender, mut stop_receiver) = watch::channel(false); + tree.ensure_consistency(&Delayer::new(POLL_INTERVAL), &pool, &mut stop_receiver) + .await + .unwrap(); + assert_eq!(tree.next_l1_batch_number(), last_common_l1_batch + 1); +} + +#[test_casing(4, [1, 4, 6, 7])] +#[tokio::test] +async fn tree_truncation_on_l1_batch_divergence_in_pruned_tree(retained_l1_batch: u32) { + const INITIAL_BATCH_COUNT: usize = 10; + const LAST_COMMON_L1_BATCH: L1BatchNumber = L1BatchNumber(6); + + let retained_l1_batch = L1BatchNumber(retained_l1_batch); + + let pool = ConnectionPool::::test_pool().await; + let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); + let calculator = setup_lightweight_calculator(temp_dir.path(), pool.clone()).await; + reset_db_state(&pool, INITIAL_BATCH_COUNT).await; + run_calculator(calculator).await; + + let mut storage = pool.connection().await.unwrap(); + remove_l1_batches(&mut storage, LAST_COMMON_L1_BATCH).await; + // Extend the state with new L1 batches. 
+ let logs = gen_storage_logs(100..200, 5); + extend_db_state(&mut storage, logs).await; + + for number in (LAST_COMMON_L1_BATCH.0 + 1)..(LAST_COMMON_L1_BATCH.0 + 6) { + let new_tree_data = L1BatchTreeData { + hash: H256::from_low_u64_be(number.into()), + rollup_last_leaf_index: 200, // doesn't matter + }; + storage + .blocks_dal() + .save_l1_batch_tree_data(L1BatchNumber(number), &new_tree_data) + .await + .unwrap(); + } + + let calculator = setup_lightweight_calculator(temp_dir.path(), pool.clone()).await; + let tree = calculator.create_tree().await.unwrap(); + let GenericAsyncTree::Ready(mut tree) = tree else { + panic!("Unexpected tree state: {tree:?}"); + }; + + let reader = tree.reader(); + let (mut pruner, pruner_handle) = tree.pruner(); + pruner.set_poll_interval(POLL_INTERVAL); + tokio::task::spawn_blocking(|| pruner.run()); + pruner_handle + .set_target_retained_version(retained_l1_batch.0.into()) + .unwrap(); + // Wait until the tree is pruned + while reader.clone().info().await.min_l1_batch_number < Some(retained_l1_batch) { + tokio::time::sleep(POLL_INTERVAL).await; + } + + let (_stop_sender, mut stop_receiver) = watch::channel(false); + let consistency_result = tree + .ensure_consistency(&Delayer::new(POLL_INTERVAL), &pool, &mut stop_receiver) + .await; + + if retained_l1_batch <= LAST_COMMON_L1_BATCH { + consistency_result.unwrap(); + assert_eq!(tree.next_l1_batch_number(), LAST_COMMON_L1_BATCH + 1); + } else { + let err = consistency_result.unwrap_err(); + assert!( + format!("{err:#}").contains("diverging min L1 batch"), + "{err:#}" + ); + } +} + #[tokio::test] async fn basic_workflow() { let pool = ConnectionPool::::test_pool().await; @@ -279,7 +426,7 @@ async fn shutting_down_calculator() { let (stop_sx, stop_rx) = watch::channel(false); let calculator_task = tokio::spawn(calculator.run(stop_rx)); - tokio::time::sleep(Duration::from_millis(100)).await; + tokio::time::sleep(POLL_INTERVAL).await; stop_sx.send_replace(true); 
run_with_timeout(RUN_TIMEOUT, calculator_task) .await @@ -342,7 +489,7 @@ async fn test_postgres_backup_recovery( insert_initial_writes_for_batch(&mut txn, batch_header.number).await; txn.commit().await.unwrap(); if sleep_between_batches { - tokio::time::sleep(Duration::from_millis(100)).await; + tokio::time::sleep(POLL_INTERVAL).await; } } drop(storage); @@ -640,6 +787,23 @@ async fn remove_l1_batches( batch_headers.push(header.unwrap()); } + let (_, last_l2_block_to_keep) = storage + .blocks_dal() + .get_l2_block_range_of_l1_batch(last_l1_batch_to_keep) + .await + .unwrap() + .expect("L1 batch has no blocks"); + + storage + .storage_logs_dal() + .roll_back_storage_logs(last_l2_block_to_keep) + .await + .unwrap(); + storage + .blocks_dal() + .delete_l2_blocks(last_l2_block_to_keep) + .await + .unwrap(); storage .blocks_dal() .delete_l1_batches(last_l1_batch_to_keep) @@ -740,3 +904,29 @@ async fn deduplication_works_as_expected() { assert_eq!(initial_writes[key].0, L1BatchNumber(4)); } } + +#[test_casing(3, [3, 5, 8])] +#[tokio::test] +async fn l1_batch_divergence_entire_workflow(last_common_l1_batch: u32) { + const INITIAL_BATCH_COUNT: usize = 10; + + assert!((last_common_l1_batch as usize) < INITIAL_BATCH_COUNT); + let last_common_l1_batch = L1BatchNumber(last_common_l1_batch); + + let pool = ConnectionPool::::test_pool().await; + let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); + let calculator = setup_lightweight_calculator(temp_dir.path(), pool.clone()).await; + reset_db_state(&pool, INITIAL_BATCH_COUNT).await; + run_calculator(calculator).await; + + let mut storage = pool.connection().await.unwrap(); + remove_l1_batches(&mut storage, last_common_l1_batch).await; + // Extend the state with new L1 batches. 
+ let logs = gen_storage_logs(100..200, 5); + extend_db_state(&mut storage, logs).await; + let expected_root_hash = expected_tree_hash(&pool).await; + + let calculator = setup_lightweight_calculator(temp_dir.path(), pool.clone()).await; + let final_root_hash = run_calculator(calculator).await; + assert_eq!(final_root_hash, expected_root_hash); +} diff --git a/core/node/metadata_calculator/src/updater.rs b/core/node/metadata_calculator/src/updater.rs index cca6fce6d4c..94aa176e87d 100644 --- a/core/node/metadata_calculator/src/updater.rs +++ b/core/node/metadata_calculator/src/updater.rs @@ -205,72 +205,14 @@ impl TreeUpdater { pool: &ConnectionPool, mut stop_receiver: watch::Receiver, ) -> anyhow::Result<()> { - let Some(earliest_l1_batch) = - wait_for_l1_batch(pool, delayer.delay_interval(), &mut stop_receiver).await? - else { - return Ok(()); // Stop signal received - }; - let mut storage = pool.connection_tagged("metadata_calculator").await?; - - // Ensure genesis creation let tree = &mut self.tree; - if tree.is_empty() { - anyhow::ensure!( - earliest_l1_batch == L1BatchNumber(0), - "Non-zero earliest L1 batch #{earliest_l1_batch} is not supported without previous tree recovery" - ); - let batch = L1BatchWithLogs::new(&mut storage, earliest_l1_batch, tree.mode()) - .await - .with_context(|| { - format!("failed fetching tree input for L1 batch #{earliest_l1_batch}") - })? - .context("Missing storage logs for the genesis L1 batch")?; - tree.process_l1_batch(batch).await?; - tree.save().await?; - } let mut next_l1_batch_to_seal = tree.next_l1_batch_number(); - - let current_db_batch = storage.blocks_dal().get_sealed_l1_batch_number().await?; - let last_l1_batch_with_tree_data = storage - .blocks_dal() - .get_last_l1_batch_number_with_tree_data() - .await?; - drop(storage); - tracing::info!( "Initialized metadata calculator with {max_batches_per_iter} max L1 batches per iteration. 
\ - Next L1 batch for Merkle tree: {next_l1_batch_to_seal}, current Postgres L1 batch: {current_db_batch:?}, \ - last L1 batch with metadata: {last_l1_batch_with_tree_data:?}", + Next L1 batch for Merkle tree: {next_l1_batch_to_seal}", max_batches_per_iter = self.max_l1_batches_per_iter ); - // It may be the case that we don't have any L1 batches with metadata in Postgres, e.g. after - // recovering from a snapshot. We cannot wait for such a batch to appear (*this* is the component - // responsible for their appearance!), but fortunately most of the updater doesn't depend on it. - if let Some(last_l1_batch_with_tree_data) = last_l1_batch_with_tree_data { - let backup_lag = - (last_l1_batch_with_tree_data.0 + 1).saturating_sub(next_l1_batch_to_seal.0); - METRICS.backup_lag.set(backup_lag.into()); - - if next_l1_batch_to_seal > last_l1_batch_with_tree_data + 1 { - // Check stop signal before proceeding with a potentially time-consuming operation. - if *stop_receiver.borrow_and_update() { - tracing::info!("Stop signal received, metadata_calculator is shutting down"); - return Ok(()); - } - - tracing::warn!( - "Next L1 batch of the tree ({next_l1_batch_to_seal}) is greater than last L1 batch with metadata in Postgres \ - ({last_l1_batch_with_tree_data}); this may be a result of restoring Postgres from a snapshot. \ - Truncating Merkle tree versions so that this mismatch is fixed..." 
- ); - tree.revert_logs(last_l1_batch_with_tree_data)?; - tree.save().await?; - next_l1_batch_to_seal = tree.next_l1_batch_number(); - tracing::info!("Truncated Merkle tree to L1 batch #{next_l1_batch_to_seal}"); - } - } - loop { if *stop_receiver.borrow_and_update() { tracing::info!("Stop signal received, metadata_calculator is shutting down"); @@ -306,3 +248,173 @@ impl TreeUpdater { Ok(()) } } + +impl AsyncTree { + async fn ensure_genesis( + &mut self, + storage: &mut Connection<'_, Core>, + earliest_l1_batch: L1BatchNumber, + ) -> anyhow::Result<()> { + if !self.is_empty() { + return Ok(()); + } + + anyhow::ensure!( + earliest_l1_batch == L1BatchNumber(0), + "Non-zero earliest L1 batch #{earliest_l1_batch} is not supported without previous tree recovery" + ); + let batch = L1BatchWithLogs::new(storage, earliest_l1_batch, self.mode()) + .await + .with_context(|| { + format!("failed fetching tree input for L1 batch #{earliest_l1_batch}") + })? + .context("Missing storage logs for the genesis L1 batch")?; + self.process_l1_batch(batch).await?; + self.save().await?; + Ok(()) + } + + /// Invariant: the tree is not ahead of Postgres. + async fn ensure_no_l1_batch_divergence( + &mut self, + pool: &ConnectionPool, + ) -> anyhow::Result<()> { + let Some(last_tree_l1_batch) = self.next_l1_batch_number().checked_sub(1) else { + // No L1 batches in the tree means no divergence. + return Ok(()); + }; + let last_tree_l1_batch = L1BatchNumber(last_tree_l1_batch); + + let mut storage = pool.connection_tagged("metadata_calculator").await?; + if self + .l1_batch_matches(&mut storage, last_tree_l1_batch) + .await? 
+ { + tracing::debug!( + "Last l1 batch in tree #{last_tree_l1_batch} has same data in tree and Postgres" + ); + return Ok(()); + } + + tracing::debug!("Last l1 batch in tree #{last_tree_l1_batch} has diverging data in tree and Postgres; searching for the last common L1 batch"); + let min_tree_l1_batch = self + .min_l1_batch_number() + .context("tree shouldn't be empty at this point")?; + anyhow::ensure!( + min_tree_l1_batch <= last_tree_l1_batch, + "potential Merkle tree corruption: minimum L1 batch number ({min_tree_l1_batch}) exceeds the last L1 batch ({last_tree_l1_batch})" + ); + + anyhow::ensure!( + self.l1_batch_matches(&mut storage, min_tree_l1_batch).await?, + "diverging min L1 batch in the tree #{min_tree_l1_batch}; the tree cannot recover from this" + ); + + let mut left = min_tree_l1_batch.0; + let mut right = last_tree_l1_batch.0; + while left + 1 < right { + let middle = (left + right) / 2; + let batch_matches = self + .l1_batch_matches(&mut storage, L1BatchNumber(middle)) + .await?; + if batch_matches { + left = middle; + } else { + right = middle; + } + } + let last_common_l1_batch_number = L1BatchNumber(left); + tracing::info!("Found last common L1 batch between tree and Postgres: #{last_common_l1_batch_number}; will revert tree to it"); + + self.roll_back_logs(last_common_l1_batch_number)?; + self.save().await?; + Ok(()) + } + + async fn l1_batch_matches( + &self, + storage: &mut Connection<'_, Core>, + l1_batch: L1BatchNumber, + ) -> anyhow::Result { + if l1_batch == L1BatchNumber(0) { + // Corner case: root hash for L1 batch #0 persisted in Postgres is fictive (set to `H256::zero()`). + return Ok(true); + } + + let Some(tree_data) = self.data_for_l1_batch(l1_batch) else { + // Corner case: the L1 batch was pruned in the tree. + return Ok(true); + }; + let Some(tree_data_from_postgres) = storage + .blocks_dal() + .get_l1_batch_tree_data(l1_batch) + .await? 
+ else { + // Corner case: the L1 batch was pruned in Postgres (including initial snapshot recovery). + return Ok(true); + }; + + let data_matches = tree_data == tree_data_from_postgres; + if !data_matches { + tracing::warn!( + "Detected diverging tree data for L1 batch #{l1_batch}; data in tree is: {tree_data:?}, \ + data in Postgres is: {tree_data_from_postgres:?}" + ); + } + Ok(data_matches) + } + + /// Ensures that the tree is consistent with Postgres, truncating the tree if necessary. + /// This will wait for at least one L1 batch to appear in Postgres if necessary. + pub(crate) async fn ensure_consistency( + &mut self, + delayer: &Delayer, + pool: &ConnectionPool, + stop_receiver: &mut watch::Receiver, + ) -> anyhow::Result<()> { + let Some(earliest_l1_batch) = + wait_for_l1_batch(pool, delayer.delay_interval(), stop_receiver).await? + else { + return Ok(()); // Stop signal received + }; + let mut storage = pool.connection_tagged("metadata_calculator").await?; + + self.ensure_genesis(&mut storage, earliest_l1_batch).await?; + let next_l1_batch_to_seal = self.next_l1_batch_number(); + + let current_db_batch = storage.blocks_dal().get_sealed_l1_batch_number().await?; + let last_l1_batch_with_tree_data = storage + .blocks_dal() + .get_last_l1_batch_number_with_tree_data() + .await?; + drop(storage); + + tracing::info!( + "Next L1 batch for Merkle tree: {next_l1_batch_to_seal}, current Postgres L1 batch: {current_db_batch:?}, \ + last L1 batch with metadata: {last_l1_batch_with_tree_data:?}" + ); + + // It may be the case that we don't have any L1 batches with metadata in Postgres, e.g. after + // recovering from a snapshot. We cannot wait for such a batch to appear (*this* is the component + // responsible for their appearance!), but fortunately most of the updater doesn't depend on it. 
+        if let Some(last_l1_batch_with_tree_data) = last_l1_batch_with_tree_data {
+            let backup_lag =
+                (last_l1_batch_with_tree_data.0 + 1).saturating_sub(next_l1_batch_to_seal.0);
+            METRICS.backup_lag.set(backup_lag.into());
+
+            if next_l1_batch_to_seal > last_l1_batch_with_tree_data + 1 {
+                tracing::warn!(
+                    "Next L1 batch of the tree ({next_l1_batch_to_seal}) is greater than last L1 batch with metadata in Postgres \
+                     ({last_l1_batch_with_tree_data}); this may be a result of restoring Postgres from a snapshot. \
+                     Truncating Merkle tree versions so that this mismatch is fixed..."
+                );
+                self.roll_back_logs(last_l1_batch_with_tree_data)?;
+                self.save().await?;
+                tracing::info!("Truncated Merkle tree to L1 batch #{last_l1_batch_with_tree_data}");
+            }
+
+            self.ensure_no_l1_batch_divergence(pool).await?;
+        }
+        Ok(())
+    }
+}