Skip to content

Commit

Permalink
fix(eth_watch): fix get_events_inner (#2882)
Browse files Browse the repository at this point in the history
## What ❔

- `get_events_inner` should recursively call itself
- `get_events_inner` should allow passing `None` as topics and/or
addresses

## Why ❔

bug fix

## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [ ] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [ ] Tests for the changes have been added / updated.
- [ ] Documentation comments have been added / updated.
- [ ] Code has been formatted via `zk fmt` and `zk lint`.
  • Loading branch information
perekopskiy authored Sep 16, 2024
1 parent b894039 commit c957dd8
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 16 deletions.
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ categories = ["cryptography"]
anyhow = "1"
assert_matches = "1.5"
async-trait = "0.1"
async-recursion = "1"
axum = "0.7.5"
backon = "0.4.4"
bigdecimal = "0.4.5"
Expand Down
1 change: 1 addition & 0 deletions core/node/eth_watch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ anyhow.workspace = true
thiserror.workspace = true
async-trait.workspace = true
tracing.workspace = true
async-recursion.workspace = true

[dev-dependencies]
zksync_concurrency.workspace = true
51 changes: 35 additions & 16 deletions core/node/eth_watch/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,21 +100,24 @@ impl EthHttpQueryClient {
.collect()
}

#[async_recursion::async_recursion]
async fn get_events_inner(
&self,
from: BlockNumber,
to: BlockNumber,
topics1: Vec<H256>,
topics2: Vec<H256>,
addresses: Vec<Address>,
topics1: Option<Vec<H256>>,
topics2: Option<Vec<H256>>,
addresses: Option<Vec<Address>>,
retries_left: usize,
) -> EnrichedClientResult<Vec<Log>> {
let filter = FilterBuilder::default()
let mut builder = FilterBuilder::default()
.from_block(from)
.to_block(to)
.topics(Some(topics1), Some(topics2), None, None)
.address(addresses)
.build();
.topics(topics1.clone(), topics2.clone(), None, None);
if let Some(addresses) = addresses.clone() {
builder = builder.address(addresses);
}
let filter = builder.build();
let mut result = self.client.logs(&filter).await;

// This code is compatible with both Infura and Alchemy API providers.
Expand Down Expand Up @@ -168,17 +171,33 @@ impl EthHttpQueryClient {

tracing::warn!("Splitting block range in half: {from:?} - {mid:?} - {to:?}");
let mut first_half = self
.get_events(from, BlockNumber::Number(mid), RETRY_LIMIT)
.get_events_inner(
from,
BlockNumber::Number(mid),
topics1.clone(),
topics2.clone(),
addresses.clone(),
RETRY_LIMIT,
)
.await?;
let mut second_half = self
.get_events(BlockNumber::Number(mid + 1u64), to, RETRY_LIMIT)
.get_events_inner(
BlockNumber::Number(mid + 1u64),
to,
topics1,
topics2,
addresses,
RETRY_LIMIT,
)
.await?;

first_half.append(&mut second_half);
result = Ok(first_half);
} else if should_retry(err_code, err_message) && retries_left > 0 {
tracing::warn!("Retrying. Retries left: {retries_left}");
result = self.get_events(from, to, retries_left - 1).await;
result = self
.get_events_inner(from, to, topics1, topics2, addresses, retries_left - 1)
.await;
}
}

Expand Down Expand Up @@ -216,9 +235,9 @@ impl EthClient for EthHttpQueryClient {
.get_events_inner(
from_block.into(),
to_block.into(),
vec![self.new_upgrade_cut_data_signature],
vec![packed_version],
vec![state_transition_manager_address],
Some(vec![self.new_upgrade_cut_data_signature]),
Some(vec![packed_version]),
Some(vec![state_transition_manager_address]),
RETRY_LIMIT,
)
.await?;
Expand All @@ -235,9 +254,9 @@ impl EthClient for EthHttpQueryClient {
self.get_events_inner(
from,
to,
self.topics.clone(),
Vec::new(),
self.get_default_address_list(),
Some(self.topics.clone()),
None,
Some(self.get_default_address_list()),
retries_left,
)
.await
Expand Down

0 comments on commit c957dd8

Please sign in to comment.