Skip to content

Commit

Permalink
feat(eth-watch): redesign to support multiple chains (#2867)
Browse files Browse the repository at this point in the history
This PR contains code from sync-layer-stable + I removed old upgrades
processor and updated unit-tests to use the new one

---------

Signed-off-by: tomg10 <lemures64@gmail.com>
  • Loading branch information
tomg10 authored Sep 26, 2024
1 parent e214dd0 commit aa72d84
Show file tree
Hide file tree
Showing 19 changed files with 665 additions and 348 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions core/lib/contracts/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,13 @@ const DIAMOND_INIT_CONTRACT_FILE: (&str, &str) = (
const GOVERNANCE_CONTRACT_FILE: (&str, &str) = ("governance", "IGovernance.sol/IGovernance.json");
const CHAIN_ADMIN_CONTRACT_FILE: (&str, &str) = ("governance", "IChainAdmin.sol/IChainAdmin.json");
const GETTERS_FACET_CONTRACT_FILE: (&str, &str) = (
"state-transition/chain-deps/facets",
"Getters.sol/GettersFacet.json",
"state-transition/chain-interfaces",
"IGetters.sol/IGetters.json",
);

const MULTICALL3_CONTRACT_FILE: (&str, &str) = ("dev-contracts", "Multicall3.sol/Multicall3.json");
const VERIFIER_CONTRACT_FILE: (&str, &str) = ("state-transition", "Verifier.sol/Verifier.json");

const _IERC20_CONTRACT_FILE: &str =
"contracts/l1-contracts/artifacts/contracts/common/interfaces/IERC20.sol/IERC20.json";
const _FAIL_ON_RECEIVE_CONTRACT_FILE: &str =
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
DROP TABLE IF EXISTS processed_events;

DROP TYPE IF EXISTS event_type;

Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
CREATE TYPE event_type AS ENUM ('ProtocolUpgrades', 'PriorityTransactions');

CREATE TABLE processed_events
(
type event_type NOT NULL,
chain_id BIGINT NOT NULL,
next_block_to_process BIGINT NOT NULL,
PRIMARY KEY (chain_id, type)
)
154 changes: 154 additions & 0 deletions core/lib/dal/src/eth_watcher_dal.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
use zksync_db_connection::{connection::Connection, error::DalResult, instrument::InstrumentExt};
use zksync_types::SLChainId;

use crate::Core;

pub struct EthWatcherDal<'a, 'c> {
pub(crate) storage: &'a mut Connection<'c, Core>,
}

#[derive(Debug, Copy, Clone, sqlx::Type)]
#[sqlx(type_name = "event_type")]
pub enum EventType {
ProtocolUpgrades,
PriorityTransactions,
}

impl EthWatcherDal<'_, '_> {
// Returns last set value of next_block_to_process for given event_type and chain_id.
// If the value was missing, initializes it with provided next_block_to_process value
pub async fn get_or_set_next_block_to_process(
&mut self,
event_type: EventType,
chain_id: SLChainId,
next_block_to_process: u64,
) -> DalResult<u64> {
let result = sqlx::query!(
r#"
SELECT
next_block_to_process
FROM
processed_events
WHERE
TYPE = $1
AND chain_id = $2
"#,
event_type as EventType,
chain_id.0 as i64
)
.instrument("get_or_set_next_block_to_process")
.with_arg("event_type", &event_type)
.with_arg("chain_id", &chain_id)
.fetch_optional(self.storage)
.await?;

if let Some(row) = result {
Ok(row.next_block_to_process as u64)
} else {
sqlx::query!(
r#"
INSERT INTO
processed_events (
TYPE,
chain_id,
next_block_to_process
)
VALUES
($1, $2, $3)
"#,
event_type as EventType,
chain_id.0 as i64,
next_block_to_process as i64
)
.instrument("get_or_set_next_block_to_process - insert")
.with_arg("event_type", &event_type)
.with_arg("chain_id", &chain_id)
.execute(self.storage)
.await?;

Ok(next_block_to_process)
}
}

pub async fn update_next_block_to_process(
&mut self,
event_type: EventType,
chain_id: SLChainId,
next_block_to_process: u64,
) -> DalResult<()> {
sqlx::query!(
r#"
UPDATE processed_events
SET
next_block_to_process = $3
WHERE
TYPE = $1
AND chain_id = $2
"#,
event_type as EventType,
chain_id.0 as i64,
next_block_to_process as i64
)
.instrument("update_next_block_to_process")
.with_arg("event_type", &event_type)
.with_arg("chain_id", &chain_id)
.execute(self.storage)
.await?;
Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::{ConnectionPool, Core, CoreDal};

#[tokio::test]
async fn test_get_or_set_next_block_to_process_with_different_event_types() {
let pool = ConnectionPool::<Core>::test_pool().await;
let mut conn = pool.connection().await.unwrap();
let mut dal = conn.processed_events_dal();

// Test with ProtocolUpgrades
let next_block = dal
.get_or_set_next_block_to_process(EventType::ProtocolUpgrades, SLChainId(1), 100)
.await
.expect("Failed to get or set next block to process");
assert_eq!(next_block, 100);

// Test with PriorityTransactions
let next_block = dal
.get_or_set_next_block_to_process(EventType::PriorityTransactions, SLChainId(1), 200)
.await
.expect("Failed to get or set next block to process");
assert_eq!(next_block, 200);

// Test with PriorityTransactions
let next_block = dal
.get_or_set_next_block_to_process(EventType::PriorityTransactions, SLChainId(2), 300)
.await
.expect("Failed to get or set next block to process");
assert_eq!(next_block, 300);

// Verify that the initial block is not updated for ProtocolUpgrades
let next_block = dal
.get_or_set_next_block_to_process(EventType::ProtocolUpgrades, SLChainId(1), 150)
.await
.expect("Failed to get or set next block to process");
assert_eq!(next_block, 100);

// Verify that the initial block is not updated for PriorityTransactions
let next_block = dal
.get_or_set_next_block_to_process(EventType::PriorityTransactions, SLChainId(1), 250)
.await
.expect("Failed to get or set next block to process");
assert_eq!(next_block, 200);

// Verify that the initial block is not updated for PriorityTransactions
let next_block = dal
.get_or_set_next_block_to_process(EventType::PriorityTransactions, SLChainId(2), 350)
.await
.expect("Failed to get or set next block to process");
assert_eq!(next_block, 300);
}
}
12 changes: 10 additions & 2 deletions core/lib/dal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ use crate::{
base_token_dal::BaseTokenDal, blocks_dal::BlocksDal, blocks_web3_dal::BlocksWeb3Dal,
consensus_dal::ConsensusDal, contract_verification_dal::ContractVerificationDal,
data_availability_dal::DataAvailabilityDal, eth_sender_dal::EthSenderDal,
events_dal::EventsDal, events_web3_dal::EventsWeb3Dal, factory_deps_dal::FactoryDepsDal,
proof_generation_dal::ProofGenerationDal, protocol_versions_dal::ProtocolVersionsDal,
eth_watcher_dal::EthWatcherDal, events_dal::EventsDal, events_web3_dal::EventsWeb3Dal,
factory_deps_dal::FactoryDepsDal, proof_generation_dal::ProofGenerationDal,
protocol_versions_dal::ProtocolVersionsDal,
protocol_versions_web3_dal::ProtocolVersionsWeb3Dal, pruning_dal::PruningDal,
snapshot_recovery_dal::SnapshotRecoveryDal, snapshots_creator_dal::SnapshotsCreatorDal,
snapshots_dal::SnapshotsDal, storage_logs_dal::StorageLogsDal,
Expand All @@ -35,6 +36,7 @@ pub mod consensus_dal;
pub mod contract_verification_dal;
mod data_availability_dal;
pub mod eth_sender_dal;
pub mod eth_watcher_dal;
pub mod events_dal;
pub mod events_web3_dal;
pub mod factory_deps_dal;
Expand Down Expand Up @@ -132,6 +134,8 @@ where
fn vm_runner_dal(&mut self) -> VmRunnerDal<'_, 'a>;

fn base_token_dal(&mut self) -> BaseTokenDal<'_, 'a>;

fn processed_events_dal(&mut self) -> EthWatcherDal<'_, 'a>;
}

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -258,4 +262,8 @@ impl<'a> CoreDal<'a> for Connection<'a, Core> {
fn base_token_dal(&mut self) -> BaseTokenDal<'_, 'a> {
BaseTokenDal { storage: self }
}

fn processed_events_dal(&mut self) -> EthWatcherDal<'_, 'a> {
EthWatcherDal { storage: self }
}
}
1 change: 1 addition & 0 deletions core/node/eth_watch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,4 @@ async-recursion.workspace = true

[dev-dependencies]
zksync_concurrency.workspace = true
test-log.workspace = true
Loading

0 comments on commit aa72d84

Please sign in to comment.