fix(en): Fix reorg detection in presence of tree data fetcher #2197

Merged
2 changes: 2 additions & 0 deletions core/node/node_sync/src/tree_data_fetcher/metrics.rs
@@ -40,6 +40,7 @@ pub(super) enum StepOutcomeLabel {
UpdatedBatch,
NoProgress,
RemoteHashMissing,
PossibleReorg,
TransientError,
}

@@ -91,6 +92,7 @@ impl TreeDataFetcherMetrics {
}
Ok(StepOutcome::NoProgress) => StepOutcomeLabel::NoProgress,
Ok(StepOutcome::RemoteHashMissing) => StepOutcomeLabel::RemoteHashMissing,
Ok(StepOutcome::PossibleReorg) => StepOutcomeLabel::PossibleReorg,
Err(err) if err.is_transient() => StepOutcomeLabel::TransientError,
Err(_) => return, // fatal error; the node will exit soon anyway
};
60 changes: 50 additions & 10 deletions core/node/node_sync/src/tree_data_fetcher/mod.rs
@@ -7,9 +7,12 @@ use serde::Serialize;
#[cfg(test)]
use tokio::sync::mpsc;
use tokio::sync::watch;
use zksync_dal::{ConnectionPool, Core, CoreDal, DalError};
use zksync_dal::{Connection, ConnectionPool, Core, CoreDal, DalError};
use zksync_health_check::{Health, HealthStatus, HealthUpdater, ReactiveHealthCheck};
use zksync_types::{block::L1BatchTreeData, Address, L1BatchNumber};
use zksync_types::{
block::{L1BatchTreeData, L2BlockHeader},
Address, L1BatchNumber,
};
use zksync_web3_decl::{
client::{DynClient, L1, L2},
error::EnrichedClientError,
@@ -77,6 +80,7 @@ enum StepOutcome {
UpdatedBatch(L1BatchNumber),
NoProgress,
RemoteHashMissing,
PossibleReorg,
}

/// Component fetching tree data (i.e., state root hashes for L1 batches) from external sources, such as
@@ -133,7 +137,6 @@ impl TreeDataFetcher {
);

let l1_provider = L1DataProvider::new(
self.pool.clone(),
eth_client.for_component("tree_data_fetcher"),
diamond_proxy_address,
)?;
@@ -147,7 +150,7 @@
self.health_updater.subscribe()
}

async fn get_batch_to_fetch(&self) -> anyhow::Result<Option<L1BatchNumber>> {
async fn get_batch_to_fetch(&self) -> anyhow::Result<Option<(L1BatchNumber, L2BlockHeader)>> {
let mut storage = self.pool.connection_tagged("tree_data_fetcher").await?;
// Fetch data in a readonly transaction to have a consistent view of the storage
let mut storage = storage.start_transaction().await?;
@@ -172,20 +175,41 @@
earliest_l1_batch
};
Ok(if l1_batch_to_fetch <= last_l1_batch {
Some(l1_batch_to_fetch)
let last_l2_block = Self::get_last_l2_block(&mut storage, l1_batch_to_fetch).await?;
Some((l1_batch_to_fetch, last_l2_block))
} else {
None
})
}

async fn get_last_l2_block(
storage: &mut Connection<'_, Core>,
number: L1BatchNumber,
) -> anyhow::Result<L2BlockHeader> {
let (_, last_l2_block_number) = storage
.blocks_dal()
.get_l2_block_range_of_l1_batch(number)
.await?
.with_context(|| format!("L1 batch #{number} disappeared from Postgres"))?;
storage
.blocks_dal()
.get_l2_block_header(last_l2_block_number)
.await?
.with_context(|| format!("L2 block #{last_l2_block_number} (last for L1 batch #{number}) disappeared from Postgres"))
}

async fn step(&mut self) -> Result<StepOutcome, TreeDataFetcherError> {
let Some(l1_batch_to_fetch) = self.get_batch_to_fetch().await? else {
let Some((l1_batch_to_fetch, last_l2_block_header)) = self.get_batch_to_fetch().await?
else {
return Ok(StepOutcome::NoProgress);
};

tracing::debug!("Fetching tree data for L1 batch #{l1_batch_to_fetch} from main node");
tracing::debug!("Fetching tree data for L1 batch #{l1_batch_to_fetch}");
let stage_latency = self.metrics.stage_latency[&ProcessingStage::Fetch].start();
let root_hash_result = self.data_provider.batch_details(l1_batch_to_fetch).await?;
let root_hash_result = self
.data_provider
.batch_details(l1_batch_to_fetch, &last_l2_block_header)
.await?;
stage_latency.observe();
let root_hash = match root_hash_result {
Ok(output) => {
@@ -199,17 +223,23 @@
}
Err(MissingData::Batch) => {
let err = anyhow::anyhow!(
"L1 batch #{l1_batch_to_fetch} is sealed locally, but is not present on the main node, \
"L1 batch #{l1_batch_to_fetch} is sealed locally, but is not present externally, \
which is assumed to store batch info indefinitely"
);
return Err(err.into());
}
Err(MissingData::RootHash) => {
tracing::debug!(
"L1 batch #{l1_batch_to_fetch} does not have root hash computed on the main node"
"L1 batch #{l1_batch_to_fetch} does not have root hash computed externally"
);
return Ok(StepOutcome::RemoteHashMissing);
}
Err(MissingData::PossibleReorg) => {
tracing::debug!(
"L1 batch #{l1_batch_to_fetch} potentially diverges from the external source"
);
return Ok(StepOutcome::PossibleReorg);
}
};

let stage_latency = self.metrics.stage_latency[&ProcessingStage::Persistence].start();
@@ -266,6 +296,16 @@ impl TreeDataFetcher {
self.update_health(last_updated_l1_batch);
true
}
Ok(StepOutcome::PossibleReorg) => {
tracing::info!("Potential chain reorg detected by tree data fetcher; not updating tree data");
// Since we don't trust the reorg logic in the tree data fetcher, we let it continue working
// so that a false positive doesn't crash the whole node (or put it into a crash loop in the worst-case scenario).
let health = TreeDataFetcherHealth::Affected {
error: "Potential chain reorg".to_string(),
};
self.health_updater.update(health.into());
true
}
Err(err) if err.is_transient() => {
tracing::warn!(
"Transient error in tree data fetcher, will retry after a delay: {err:?}"
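Note: the following is a minimal, hypothetical sketch (not part of this diff) of the divergence check behind the new `StepOutcome::PossibleReorg`; the helper name is invented for illustration, and the real comparison lives in the provider implementations in provider/mod.rs below.

use zksync_types::{block::L2BlockHeader, H256};

// Hypothetical helper: the fetcher passes the locally stored header of the last L2 block
// in the batch to the provider, which compares its hash against the hash reported by the
// external source for the same block number.
fn is_possible_reorg(local_last_l2_block: &L2BlockHeader, remote_hash: Option<H256>) -> bool {
    // A missing remote block and a differing hash both count as potential divergence.
    remote_hash != Some(local_last_l2_block.hash)
}

// On `true`, the fetcher reports `StepOutcome::PossibleReorg`, marks its health as `Affected`,
// and keeps polling instead of aborting, so that a false positive does not crash the node.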
79 changes: 50 additions & 29 deletions core/node/node_sync/src/tree_data_fetcher/provider/mod.rs
@@ -3,9 +3,8 @@ use std::fmt;
use anyhow::Context;
use async_trait::async_trait;
use vise::{EncodeLabelSet, EncodeLabelValue};
use zksync_dal::{ConnectionPool, Core, CoreDal};
use zksync_eth_client::EthInterface;
use zksync_types::{web3, Address, L1BatchNumber, H256, U256, U64};
use zksync_types::{block::L2BlockHeader, web3, Address, L1BatchNumber, H256, U256, U64};
use zksync_web3_decl::{
client::{DynClient, L1, L2},
error::{ClientRpcContext, EnrichedClientError, EnrichedClientResult},
@@ -26,6 +25,8 @@ pub(super) enum MissingData {
/// The provider lacks a root hash for a requested L1 batch; the batch itself is present on the provider.
#[error("no root hash for L1 batch")]
RootHash,
#[error("possible chain reorg detected")]
PossibleReorg,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)]
@@ -48,14 +49,23 @@ pub(super) type TreeDataProviderResult =
#[async_trait]
pub(super) trait TreeDataProvider: fmt::Debug + Send + Sync + 'static {
/// Fetches a state root hash for the L1 batch with the specified number.
/// The method receives a header of the last L2 block in the batch, which can be used to check L1 batch consistency etc.
///
/// It is guaranteed that this method will be called with monotonically increasing `number`s (although not necessarily sequential ones).
async fn batch_details(&mut self, number: L1BatchNumber) -> TreeDataProviderResult;
async fn batch_details(
&mut self,
number: L1BatchNumber,
last_l2_block: &L2BlockHeader,
) -> TreeDataProviderResult;
}

#[async_trait]
impl TreeDataProvider for Box<DynClient<L2>> {
async fn batch_details(&mut self, number: L1BatchNumber) -> TreeDataProviderResult {
async fn batch_details(
&mut self,
number: L1BatchNumber,
last_l2_block: &L2BlockHeader,
) -> TreeDataProviderResult {
let Some(batch_details) = self
.get_l1_batch_details(number)
.rpc_context("get_l1_batch_details")
@@ -64,6 +74,24 @@ impl TreeDataProvider for Box<DynClient<L2>> {
else {
return Ok(Err(MissingData::Batch));
};

// Check the local data correspondence.
let remote_l2_block_hash = self
.get_block_details(last_l2_block.number)
.rpc_context("get_block_details")
.with_arg("number", &last_l2_block.number)
.await?
.and_then(|block| block.base.root_hash);
if remote_l2_block_hash != Some(last_l2_block.hash) {
let last_l2_block_number = last_l2_block.number;
let last_l2_block_hash = last_l2_block.hash;
tracing::info!(
"Fetched hash of the last L2 block #{last_l2_block_number} in L1 batch #{number} ({remote_l2_block_hash:?}) \
does not match the local one ({last_l2_block_hash:?}); this can be caused by a chain reorg"
);
return Ok(Err(MissingData::PossibleReorg));
}

Ok(batch_details
.base
.root_hash
@@ -94,7 +122,6 @@ struct PastL1BatchInfo {
/// (provided it's not too far behind the seal timestamp of the batch).
#[derive(Debug)]
pub(super) struct L1DataProvider {
pool: ConnectionPool<Core>,
eth_client: Box<DynClient<L1>>,
diamond_proxy_address: Address,
block_commit_signature: H256,
@@ -109,7 +136,6 @@ impl L1DataProvider {
const L1_BLOCK_RANGE: U64 = U64([20_000]);

pub fn new(
pool: ConnectionPool<Core>,
eth_client: Box<DynClient<L1>>,
diamond_proxy_address: Address,
) -> anyhow::Result<Self> {
@@ -118,29 +144,13 @@
.context("missing `BlockCommit` event")?
.signature();
Ok(Self {
pool,
eth_client,
diamond_proxy_address,
block_commit_signature,
past_l1_batch: None,
})
}

async fn l1_batch_seal_timestamp(&self, number: L1BatchNumber) -> anyhow::Result<u64> {
let mut storage = self.pool.connection_tagged("tree_data_fetcher").await?;
let (_, last_l2_block_number) = storage
.blocks_dal()
.get_l2_block_range_of_l1_batch(number)
.await?
.with_context(|| format!("L1 batch #{number} does not have L2 blocks"))?;
let block_header = storage
.blocks_dal()
.get_l2_block_header(last_l2_block_number)
.await?
.with_context(|| format!("L2 block #{last_l2_block_number} (last block in L1 batch #{number}) disappeared"))?;
Ok(block_header.timestamp)
}

/// Guesses the number of an L1 block with a `BlockCommit` event for the specified L1 batch.
/// The guess is based on the L1 batch seal timestamp.
async fn guess_l1_commit_block_number(
@@ -206,8 +216,12 @@ impl L1DataProvider {

#[async_trait]
impl TreeDataProvider for L1DataProvider {
async fn batch_details(&mut self, number: L1BatchNumber) -> TreeDataProviderResult {
let l1_batch_seal_timestamp = self.l1_batch_seal_timestamp(number).await?;
async fn batch_details(
&mut self,
number: L1BatchNumber,
last_l2_block: &L2BlockHeader,
) -> TreeDataProviderResult {
let l1_batch_seal_timestamp = last_l2_block.timestamp;
let from_block = self.past_l1_batch.and_then(|info| {
assert!(
info.number < number,
@@ -297,8 +311,11 @@ impl TreeDataProvider for L1DataProvider {
}))
}
_ => {
tracing::warn!("Non-unique `BlockCommit` event for L1 batch #{number} queried using {filter:?}: {logs:?}");
Ok(Err(MissingData::RootHash))
tracing::warn!(
"Non-unique `BlockCommit` event for L1 batch #{number} queried using {filter:?}, potentially as a result \
of a chain reorg: {logs:?}"
);
Ok(Err(MissingData::PossibleReorg))
}
}
}
Expand All @@ -313,9 +330,13 @@ pub(super) struct CombinedDataProvider {

#[async_trait]
impl TreeDataProvider for CombinedDataProvider {
async fn batch_details(&mut self, number: L1BatchNumber) -> TreeDataProviderResult {
async fn batch_details(
&mut self,
number: L1BatchNumber,
last_l2_block: &L2BlockHeader,
) -> TreeDataProviderResult {
if let Some(l1) = &mut self.l1 {
match l1.batch_details(number).await {
match l1.batch_details(number, last_l2_block).await {
Err(err) => {
if err.is_transient() {
tracing::info!(
@@ -342,6 +363,6 @@ impl TreeDataProvider for CombinedDataProvider {
}
}
}
self.fallback.batch_details(number).await
self.fallback.batch_details(number, last_l2_block).await
}
}
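As a usage illustration of the updated trait, here is a hedged sketch of a test-only provider; the struct, its fixture field, and its behavior are assumptions for illustration rather than code from this PR, and the success path is omitted because its exact payload is produced by the real providers above.

use std::collections::HashMap;

// Hypothetical test fixture: hashes the "remote" side reports for L2 blocks, keyed by block number.
#[derive(Debug)]
struct ReorgOnlyProvider {
    remote_l2_block_hashes: HashMap<u32, H256>,
}

#[async_trait]
impl TreeDataProvider for ReorgOnlyProvider {
    async fn batch_details(
        &mut self,
        number: L1BatchNumber,
        last_l2_block: &L2BlockHeader,
    ) -> TreeDataProviderResult {
        let remote_hash = self.remote_l2_block_hashes.get(&last_l2_block.number.0).copied();
        if remote_hash != Some(last_l2_block.hash) {
            // Mirror the real providers: a hash mismatch (or a missing remote block) is treated
            // as a possible reorg rather than as missing tree data.
            return Ok(Err(MissingData::PossibleReorg));
        }
        // This mock never has tree data of its own, so it always reports the batch as absent.
        tracing::debug!("mock provider has no tree data for L1 batch #{number}");
        Ok(Err(MissingData::Batch))
    }
}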