# refactor(en): Fetch old l1 batch hashes from L1 – metrics (#2131)
## What ❔

Adds a couple of metrics / logs for the tree data fetcher related to
fetching data from L1. Follow-up after #2000.

## Why ❔

These metrics / logs make it possible to track the tree data fetcher's health more thoroughly.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.

slowli authored Jun 4, 2024
1 parent 3e7cbe4 commit af39ca3
Showing 5 changed files with 118 additions and 48 deletions.

### core/node/node_sync/src/tree_data_fetcher/metrics.rs (18 additions, 1 deletion)

```diff
@@ -7,18 +7,22 @@ use vise::{
     Info, Metrics, Unit,
 };
 
-use super::{StepOutcome, TreeDataFetcher, TreeDataFetcherError};
+use super::{provider::TreeDataProviderSource, StepOutcome, TreeDataFetcher, TreeDataFetcherError};
 
 #[derive(Debug, EncodeLabelSet)]
 struct TreeDataFetcherInfo {
     #[metrics(unit = Unit::Seconds)]
     poll_interval: DurationAsSecs,
+    diamond_proxy_address: Option<String>,
 }
 
 impl From<&TreeDataFetcher> for TreeDataFetcherInfo {
     fn from(fetcher: &TreeDataFetcher) -> Self {
         Self {
             poll_interval: fetcher.poll_interval.into(),
+            diamond_proxy_address: fetcher
+                .diamond_proxy_address
+                .map(|addr| format!("{addr:?}")),
         }
     }
 }
@@ -39,6 +43,10 @@ pub(super) enum StepOutcomeLabel {
     TransientError,
 }
 
+const BLOCK_DIFF_BUCKETS: Buckets = Buckets::values(&[
+    10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1_000.0, 2_000.0, 5_000.0, 10_000.0, 20_000.0, 50_000.0,
+]);
+
 #[derive(Debug, Metrics)]
 #[metrics(prefix = "external_node_tree_data_fetcher")]
 pub(super) struct TreeDataFetcherMetrics {
@@ -51,6 +59,15 @@ pub(super) struct TreeDataFetcherMetrics {
     /// Latency of a particular stage of processing a single L1 batch.
     #[metrics(buckets = Buckets::LATENCIES, unit = Unit::Seconds)]
     pub stage_latency: Family<ProcessingStage, Histogram<Duration>>,
+    /// Number of steps during binary search of the L1 commit block number.
+    #[metrics(buckets = Buckets::linear(0.0..=32.0, 2.0))]
+    pub l1_commit_block_number_binary_search_steps: Histogram<usize>,
+    /// Difference between the "from" block specified in the event filter and the L1 block number of the fetched event.
+    /// Large values here can signal that fetching data from L1 can break because the filter won't get necessary events.
+    #[metrics(buckets = BLOCK_DIFF_BUCKETS)]
+    pub l1_commit_block_number_from_diff: Histogram<u64>,
+    /// Number of root hashes fetched from a particular source.
+    pub root_hash_sources: Family<TreeDataProviderSource, Counter>,
 }
 
 impl TreeDataFetcherMetrics {
```
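
For context on how these definitions are consumed: `vise` metrics in this codebase are registered once as a global and then observed directly. Below is a minimal sketch of that pattern, assuming the `vise::Global` + `#[vise::register]` registration used elsewhere in the repo; `ExampleMetrics` and `Source` are trimmed-down stand-ins for `TreeDataFetcherMetrics` and `TreeDataProviderSource`, not the real types.

```rust
use vise::{
    Buckets, Counter, EncodeLabelSet, EncodeLabelValue, Family, Global, Histogram, Metrics,
};

// Stand-in for `TreeDataProviderSource`: a label rendered as `source="..."`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)]
#[metrics(label = "source", rename_all = "snake_case")]
enum Source {
    L1CommitEvent,
    BatchDetailsRpc,
}

// Stand-in for `TreeDataFetcherMetrics`.
#[derive(Debug, Metrics)]
#[metrics(prefix = "example_tree_data_fetcher")]
struct ExampleMetrics {
    /// Steps taken by the binary search: 2-unit-wide buckets from 0 to 32.
    #[metrics(buckets = Buckets::linear(0.0..=32.0, 2.0))]
    binary_search_steps: Histogram<usize>,
    /// Per-source counters, instantiated lazily on first access.
    root_hash_sources: Family<Source, Counter>,
}

#[vise::register]
static METRICS: Global<ExampleMetrics> = Global::new();

fn record(steps: usize, source: Source) {
    METRICS.binary_search_steps.observe(steps);
    // A `Family` is indexed by label value, exactly as in the fetcher code.
    METRICS.root_hash_sources[&source].inc();
}
```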

### core/node/node_sync/src/tree_data_fetcher/mod.rs (18 additions, 1 deletion)

```diff
@@ -92,6 +92,8 @@ enum StepOutcome {
 #[derive(Debug)]
 pub struct TreeDataFetcher {
     data_provider: Box<dyn TreeDataProvider>,
+    // Used in the Info metric
+    diamond_proxy_address: Option<Address>,
     pool: ConnectionPool<Core>,
     metrics: &'static TreeDataFetcherMetrics,
     health_updater: HealthUpdater,
@@ -107,6 +109,7 @@ impl TreeDataFetcher {
     pub fn new(client: Box<DynClient<L2>>, pool: ConnectionPool<Core>) -> Self {
         Self {
             data_provider: Box::new(client.for_component("tree_data_fetcher")),
+            diamond_proxy_address: None,
             pool,
             metrics: &METRICS,
             health_updater: ReactiveHealthCheck::new("tree_data_fetcher").1,
@@ -124,12 +127,18 @@
         eth_client: Box<DynClient<L1>>,
         diamond_proxy_address: Address,
     ) -> anyhow::Result<Self> {
+        anyhow::ensure!(
+            self.diamond_proxy_address.is_none(),
+            "L1 tree data provider is already set up"
+        );
+
         let l1_provider = L1DataProvider::new(
             self.pool.clone(),
             eth_client.for_component("tree_data_fetcher"),
             diamond_proxy_address,
         )?;
         self.data_provider = Box::new(l1_provider.with_fallback(self.data_provider));
+        self.diamond_proxy_address = Some(diamond_proxy_address);
         Ok(self)
     }
 
@@ -179,7 +188,15 @@ impl TreeDataFetcher {
         let root_hash_result = self.data_provider.batch_details(l1_batch_to_fetch).await?;
         stage_latency.observe();
         let root_hash = match root_hash_result {
-            Ok(hash) => hash,
+            Ok(output) => {
+                tracing::debug!(
+                    "Received root hash for L1 batch #{l1_batch_to_fetch} from {source:?}: {root_hash:?}",
+                    source = output.source,
+                    root_hash = output.root_hash
+                );
+                self.metrics.root_hash_sources[&output.source].inc();
+                output.root_hash
+            }
             Err(MissingData::Batch) => {
                 let err = anyhow::anyhow!(
                     "L1 batch #{l1_batch_to_fetch} is sealed locally, but is not present on the main node, \
```
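
Usage-wise, the new guard changes `with_l1_data` from silently stacking providers to failing fast on repeated setup. A sketch of the intended call site (`l2_client`, `eth_client`, `pool`, and `diamond_proxy_address` are assumed to come from node configuration and are not constructed here):

```rust
// Sketch only: the inputs are assumed to be built elsewhere from node config.
let fetcher = TreeDataFetcher::new(l2_client, pool)
    .with_l1_data(eth_client, diamond_proxy_address)?;
// Calling `with_l1_data` a second time would now trip the `anyhow::ensure!`
// guard with the "L1 tree data provider is already set up" error.
```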

### core/node/node_sync/src/tree_data_fetcher/provider/mod.rs (54 additions, 28 deletions)

```diff
@@ -2,6 +2,7 @@ use std::fmt;
 
 use anyhow::Context;
 use async_trait::async_trait;
+use vise::{EncodeLabelSet, EncodeLabelValue};
 use zksync_dal::{ConnectionPool, Core, CoreDal};
 use zksync_eth_client::EthInterface;
 use zksync_types::{web3, Address, L1BatchNumber, H256, U256, U64};
@@ -12,13 +13,13 @@ use zksync_web3_decl::{
     namespaces::ZksNamespaceClient,
 };
 
-use super::TreeDataFetcherResult;
+use super::{metrics::METRICS, TreeDataFetcherResult};
 
 #[cfg(test)]
 mod tests;
 
 #[derive(Debug, thiserror::Error)]
-pub(crate) enum MissingData {
+pub(super) enum MissingData {
     /// The provider lacks a requested L1 batch.
     #[error("no requested L1 batch")]
     Batch,
@@ -27,24 +28,34 @@ pub(crate) enum MissingData {
     RootHash,
 }
 
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)]
+#[metrics(label = "source", rename_all = "snake_case")]
+pub(super) enum TreeDataProviderSource {
+    L1CommitEvent,
+    BatchDetailsRpc,
+}
+
+#[derive(Debug)]
+pub(super) struct TreeDataProviderOutput {
+    pub root_hash: H256,
+    pub source: TreeDataProviderSource,
+}
+
+pub(super) type TreeDataProviderResult =
+    TreeDataFetcherResult<Result<TreeDataProviderOutput, MissingData>>;
+
 /// External provider of tree data, such as main node (via JSON-RPC).
 #[async_trait]
-pub(crate) trait TreeDataProvider: fmt::Debug + Send + Sync + 'static {
+pub(super) trait TreeDataProvider: fmt::Debug + Send + Sync + 'static {
     /// Fetches a state root hash for the L1 batch with the specified number.
     ///
     /// It is guaranteed that this method will be called with monotonically increasing `number`s (although not necessarily sequential ones).
-    async fn batch_details(
-        &mut self,
-        number: L1BatchNumber,
-    ) -> TreeDataFetcherResult<Result<H256, MissingData>>;
+    async fn batch_details(&mut self, number: L1BatchNumber) -> TreeDataProviderResult;
 }
 
 #[async_trait]
 impl TreeDataProvider for Box<DynClient<L2>> {
-    async fn batch_details(
-        &mut self,
-        number: L1BatchNumber,
-    ) -> TreeDataFetcherResult<Result<H256, MissingData>> {
+    async fn batch_details(&mut self, number: L1BatchNumber) -> TreeDataProviderResult {
         let Some(batch_details) = self
             .get_l1_batch_details(number)
             .rpc_context("get_l1_batch_details")
@@ -53,7 +64,14 @@ impl TreeDataProvider for Box<DynClient<L2>> {
         else {
             return Ok(Err(MissingData::Batch));
         };
-        Ok(batch_details.base.root_hash.ok_or(MissingData::RootHash))
+        Ok(batch_details
+            .base
+            .root_hash
+            .ok_or(MissingData::RootHash)
+            .map(|root_hash| TreeDataProviderOutput {
+                root_hash,
+                source: TreeDataProviderSource::BatchDetailsRpc,
+            }))
     }
 }
 
```
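
A note on the nested result type: the outer layer of `TreeDataProviderResult` carries fetcher/transport errors, while the inner `Result<_, MissingData>` describes data that is legitimately absent from the provider. A hypothetical caller, with `provider` and `number` assumed to be in scope of an async function that can propagate the outer error:

```rust
// `?` unwraps the outer (transport) layer; the inner result is matched on.
match provider.batch_details(number).await? {
    Ok(output) => {
        // Happy path: the batch and its root hash are both available.
        tracing::info!("root hash {:?} from {:?}", output.root_hash, output.source);
    }
    Err(MissingData::Batch) => {
        // The provider does not know about this batch yet; retry later.
    }
    Err(MissingData::RootHash) => {
        // The batch exists, but its root hash is not available yet.
    }
}
```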

```diff
@@ -128,21 +146,22 @@
     async fn guess_l1_commit_block_number(
         eth_client: &DynClient<L1>,
         l1_batch_seal_timestamp: u64,
-    ) -> EnrichedClientResult<U64> {
+    ) -> EnrichedClientResult<(U64, usize)> {
         let l1_batch_seal_timestamp = U256::from(l1_batch_seal_timestamp);
         let (latest_number, latest_timestamp) =
             Self::get_block(eth_client, web3::BlockNumber::Latest).await?;
         if latest_timestamp < l1_batch_seal_timestamp {
-            return Ok(latest_number); // No better estimate at this point
+            return Ok((latest_number, 0)); // No better estimate at this point
         }
         let (earliest_number, earliest_timestamp) =
             Self::get_block(eth_client, web3::BlockNumber::Earliest).await?;
         if earliest_timestamp > l1_batch_seal_timestamp {
-            return Ok(earliest_number); // No better estimate at this point
+            return Ok((earliest_number, 0)); // No better estimate at this point
         }
 
         // At this point, we have `earliest_timestamp <= l1_batch_seal_timestamp <= latest_timestamp`.
         // Binary-search the range until we're sort of accurate.
+        let mut steps = 0;
         let mut left = earliest_number;
         let mut right = latest_number;
         while left + Self::L1_BLOCK_ACCURACY < right {
@@ -154,8 +173,9 @@
             } else {
                 right = middle;
             }
+            steps += 1;
         }
-        Ok(left)
+        Ok((left, steps))
     }
 
     /// Gets a block that should be present on L1.
```
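
The step count feeding the new histogram comes from the loop above. Here is a self-contained sketch of the same bisection over block timestamps, with plain `u64` block numbers and a closure standing in for the `get_block` RPC; `accuracy` plays the role of `L1_BLOCK_ACCURACY`:

```rust
/// Returns a block number whose timestamp is close to `target_ts` from below,
/// plus the number of bisection steps taken (what the histogram records).
fn guess_block(
    mut left: u64,                  // earliest candidate block
    mut right: u64,                 // latest candidate block
    accuracy: u64,                  // stop once the range is this narrow
    timestamp: impl Fn(u64) -> u64, // stand-in for the `get_block` RPC
    target_ts: u64,
) -> (u64, usize) {
    let mut steps = 0;
    while left + accuracy < right {
        let middle = (left + right) / 2;
        if timestamp(middle) <= target_ts {
            left = middle;
        } else {
            right = middle;
        }
        steps += 1;
    }
    (left, steps)
}

fn main() {
    // Toy chain: block N has timestamp 12 * N (one block per ~12 s).
    let (block, steps) = guess_block(0, 1_000_000, 100, |n| 12 * n, 600_000);
    assert!(block.abs_diff(50_000) <= 100);
    println!("guessed block {block} in {steps} steps"); // ~14 steps here
}
```

The range halves on every iteration, so the step count is logarithmic in the searched block range, which is why linear buckets up to 32 comfortably cover it.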

```diff
@@ -186,10 +206,7 @@
 
 #[async_trait]
 impl TreeDataProvider for L1DataProvider {
-    async fn batch_details(
-        &mut self,
-        number: L1BatchNumber,
-    ) -> TreeDataFetcherResult<Result<H256, MissingData>> {
+    async fn batch_details(&mut self, number: L1BatchNumber) -> TreeDataProviderResult {
         let l1_batch_seal_timestamp = self.l1_batch_seal_timestamp(number).await?;
         let from_block = self.past_l1_batch.and_then(|info| {
             assert!(
@@ -213,15 +230,18 @@
         let from_block = match from_block {
             Some(number) => number,
             None => {
-                let approximate_block = Self::guess_l1_commit_block_number(
+                let (approximate_block, steps) = Self::guess_l1_commit_block_number(
                     self.eth_client.as_ref(),
                     l1_batch_seal_timestamp,
                 )
                 .await?;
                 tracing::debug!(
                     number = number.0,
-                    "Guessed L1 block number for L1 batch #{number} commit: {approximate_block}"
+                    "Guessed L1 block number for L1 batch #{number} commit in {steps} binary search steps: {approximate_block}"
                 );
+                METRICS
+                    .l1_commit_block_number_binary_search_steps
+                    .observe(steps);
                 // Subtract to account for imprecise L1 and L2 timestamps etc.
                 approximate_block.saturating_sub(Self::L1_BLOCK_ACCURACY)
             }
@@ -245,14 +265,20 @@
         match logs.as_slice() {
             [] => Ok(Err(MissingData::Batch)),
             [log] => {
-                let root_hash_topic = log.topics.get(2).copied().ok_or_else(|| {
+                let root_hash = log.topics.get(2).copied().ok_or_else(|| {
                     let err = "Bogus `BlockCommit` event, does not have the root hash topic";
                     EnrichedClientError::new(ClientError::Custom(err.into()), "batch_details")
                         .with_arg("filter", &filter)
                         .with_arg("log", &log)
                 })?;
                 // `unwrap()` is safe due to the filtering above
                 let l1_commit_block_number = log.block_number.unwrap();
+                let diff = l1_commit_block_number.saturating_sub(from_block).as_u64();
+                METRICS.l1_commit_block_number_from_diff.observe(diff);
+                tracing::debug!(
+                    "`BlockCommit` event for L1 batch #{number} is at block #{l1_commit_block_number}, \
+                     {diff} block(s) after the `from` block from the filter"
+                );
 
                 let l1_commit_block = self.eth_client.block(l1_commit_block_number.into()).await?;
                 let l1_commit_block = l1_commit_block.ok_or_else(|| {
@@ -265,7 +291,10 @@
                     l1_commit_block_number,
                     l1_commit_block_timestamp: l1_commit_block.timestamp,
                 });
-                Ok(Ok(root_hash_topic))
+                Ok(Ok(TreeDataProviderOutput {
+                    root_hash,
+                    source: TreeDataProviderSource::L1CommitEvent,
+                }))
             }
             _ => {
                 tracing::warn!("Non-unique `BlockCommit` event for L1 batch #{number} queried using {filter:?}: {logs:?}");
```
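
On interpreting `l1_commit_block_number_from_diff`: the log filter starts at the guessed `from_block`, and the fetcher bounds the queried range (the range constant is not visible in this diff). If the observed diff creeps toward that bound, the `BlockCommit` event risks falling outside the filtered window. A hedged sketch of a headroom check one could build on this metric, with `FILTER_RANGE` as an assumed placeholder rather than the fetcher's real constant:

```rust
// Hypothetical helper; `FILTER_RANGE` is an assumed window width.
const FILTER_RANGE: u64 = 20_000;

fn filter_headroom(from_block: u64, event_block: u64) -> Option<u64> {
    let diff = event_block.saturating_sub(from_block); // the metric's value
    FILTER_RANGE.checked_sub(diff) // `None` = event outside the assumed window
}
```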

```diff
@@ -284,10 +313,7 @@ pub(super) struct CombinedDataProvider {
 
 #[async_trait]
 impl TreeDataProvider for CombinedDataProvider {
-    async fn batch_details(
-        &mut self,
-        number: L1BatchNumber,
-    ) -> TreeDataFetcherResult<Result<H256, MissingData>> {
+    async fn batch_details(&mut self, number: L1BatchNumber) -> TreeDataProviderResult {
         if let Some(l1) = &mut self.l1 {
             match l1.batch_details(number).await {
                 Err(err) => {
```
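
The body of `CombinedDataProvider::batch_details` is mostly elided above, but the tests below pin down its contract: try the L1 provider first; if it errors, disable it and serve the result from the L2 fallback. A behavioral sketch under that reading (the `fallback` field name and the exact error handling, e.g. for transient errors, are assumptions; the real implementation may differ):

```rust
// Sketch of the fallback behavior implied by the tests; not the literal code.
async fn batch_details_sketch(
    this: &mut CombinedDataProvider,
    number: L1BatchNumber,
) -> TreeDataProviderResult {
    if let Some(l1) = &mut this.l1 {
        match l1.batch_details(number).await {
            Ok(Ok(output)) => return Ok(Ok(output)),
            Ok(Err(_missing)) => {
                // Data not on L1 (yet); fall through to the L2 provider.
            }
            Err(err) => {
                tracing::warn!("L1 provider failed: {err}; falling back to L2");
                this.l1 = None; // matches `provider.l1.is_none()` in the tests
            }
        }
    }
    this.fallback.batch_details(number).await
}
```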

### core/node/node_sync/src/tree_data_fetcher/provider/tests.rs (12 additions, 7 deletions)

```diff
@@ -136,7 +136,7 @@ async fn guessing_l1_commit_block_number() {
     let eth_client = eth_params.client();
 
     for timestamp in [0, 100, 1_000, 5_000, 10_000, 100_000] {
-        let guessed_block_number =
+        let (guessed_block_number, step_count) =
             L1DataProvider::guess_l1_commit_block_number(&eth_client, timestamp)
                 .await
                 .unwrap();
@@ -145,6 +145,8 @@
             guessed_block_number.abs_diff(timestamp.into()) <= L1DataProvider::L1_BLOCK_ACCURACY,
             "timestamp={timestamp}, guessed={guessed_block_number}"
         );
+        assert!(step_count > 0);
+        assert!(step_count < 100);
     }
 }
 
@@ -167,12 +169,13 @@
         L1DataProvider::new(pool, Box::new(eth_params.client()), DIAMOND_PROXY_ADDRESS).unwrap();
     for i in 0..l1_batch_timestamps.len() {
         let number = L1BatchNumber(i as u32 + 1);
-        let root_hash = provider
+        let output = provider
             .batch_details(number)
             .await
             .unwrap()
             .expect("no root hash");
-        assert_eq!(root_hash, H256::repeat_byte(number.0 as u8));
+        assert_eq!(output.root_hash, H256::repeat_byte(number.0 as u8));
+        assert_matches!(output.source, TreeDataProviderSource::L1CommitEvent);
 
         let past_l1_batch = provider.past_l1_batch.unwrap();
         assert_eq!(past_l1_batch.number, number);
@@ -217,21 +220,23 @@ async fn combined_data_provider_errors() {
         .with_fallback(Box::new(main_node_client));
 
     // L1 batch #1 should be obtained from L1
-    let root_hash = provider
+    let output = provider
         .batch_details(L1BatchNumber(1))
         .await
        .unwrap()
         .expect("no root hash");
-    assert_eq!(root_hash, H256::repeat_byte(1));
+    assert_eq!(output.root_hash, H256::repeat_byte(1));
+    assert_matches!(output.source, TreeDataProviderSource::L1CommitEvent);
     assert!(provider.l1.is_some());
 
     // L1 batch #2 should be obtained from L2
-    let root_hash = provider
+    let output = provider
         .batch_details(L1BatchNumber(2))
         .await
         .unwrap()
         .expect("no root hash");
-    assert_eq!(root_hash, H256::repeat_byte(2));
+    assert_eq!(output.root_hash, H256::repeat_byte(2));
+    assert_matches!(output.source, TreeDataProviderSource::BatchDetailsRpc);
     assert!(provider.l1.is_none());
 
     // L1 batch #3 is not present anywhere.
```