From 7be56e576fbb4d404319a6c099dcf1d5cb0b476a Mon Sep 17 00:00:00 2001 From: Fedor Sakharov Date: Mon, 26 Feb 2024 16:59:02 +0100 Subject: [PATCH] feat(aggregator): support two operator addresses in sender and aggregator (#1201) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What ❔ Adds optional ability to send commit transactions from a separate operator address on L1. ## Why ❔ ## Checklist - [ ] PR title corresponds to the body of PR (we generate changelog entries from PRs). - [ ] Tests for the changes have been added / updated. - [ ] Documentation comments have been added / updated. - [ ] Code has been formatted via `zk fmt` and `zk lint`. - [ ] Spellcheck has been run via `zk spellcheck`. - [ ] Linkcheck has been run via `zk linkcheck`. --------- Co-authored-by: koloz --- core/lib/config/src/configs/eth_sender.rs | 7 + ...16065fcad42c6b621efb3a135a16b477dcfd9.json | 8 +- ...40e7eb9f598999c28015a504605f88bf84b33.json | 8 +- ...3ce80f9b2b27758651ccfc09df61a4ae8a363.json | 8 +- ...bd499dc63ba6a4d5bcde2031315593431809.json} | 15 +- ...b8ba0b771cb45573aca81db254f6bcfc17c77.json | 20 -- ...61f3a71f5c6737e08745a37b41b22f4dfd030.json | 202 ------------------ ...21154502_add-from-addr-to-eth-txs.down.sql | 1 + ...0221154502_add-from-addr-to-eth-txs.up.sql | 1 + core/lib/dal/src/blocks_dal.rs | 36 +++- core/lib/dal/src/eth_sender_dal.rs | 43 +++- core/lib/dal/src/models/storage_eth_tx.rs | 4 + .../eth_client/src/clients/http/signing.rs | 35 ++- core/lib/types/src/eth_sender.rs | 3 + .../zksync_core/src/eth_sender/aggregator.rs | 31 ++- .../src/eth_sender/eth_tx_aggregator.rs | 48 ++++- .../src/eth_sender/eth_tx_manager.rs | 89 +++++++- core/lib/zksync_core/src/eth_sender/tests.rs | 37 +++- core/lib/zksync_core/src/lib.rs | 11 +- 19 files changed, 337 insertions(+), 270 deletions(-) rename core/lib/dal/.sqlx/{query-66e012ce974c38d9fe84cfc7eb28927f9e976319a305e0928ff366d535a97104.json => query-8e89c34f8693d890a38654b0daf4bd499dc63ba6a4d5bcde2031315593431809.json} (79%) delete mode 100644 core/lib/dal/.sqlx/query-90f7657bae05c4bad6902c6bfb1b8ba0b771cb45573aca81db254f6bcfc17c77.json delete mode 100644 core/lib/dal/.sqlx/query-bab1857df66bbef57705ae7796161f3a71f5c6737e08745a37b41b22f4dfd030.json create mode 100644 core/lib/dal/migrations/20240221154502_add-from-addr-to-eth-txs.down.sql create mode 100644 core/lib/dal/migrations/20240221154502_add-from-addr-to-eth-txs.up.sql diff --git a/core/lib/config/src/configs/eth_sender.rs b/core/lib/config/src/configs/eth_sender.rs index cd44daed17f..a0ce9786279 100644 --- a/core/lib/config/src/configs/eth_sender.rs +++ b/core/lib/config/src/configs/eth_sender.rs @@ -115,6 +115,13 @@ impl SenderConfig { .ok() .map(|pk| pk.parse().unwrap()) } + + // Don't load blobs private key, if it's not required + pub fn private_key_blobs(&self) -> Option { + std::env::var("ETH_SENDER_SENDER_OPERATOR_BLOBS_PRIVATE_KEY") + .ok() + .map(|pk| pk.parse().unwrap()) + } } #[derive(Debug, Deserialize, Copy, Clone, PartialEq)] diff --git a/core/lib/dal/.sqlx/query-23be43bf705d679ca751c89353716065fcad42c6b621efb3a135a16b477dcfd9.json b/core/lib/dal/.sqlx/query-23be43bf705d679ca751c89353716065fcad42c6b621efb3a135a16b477dcfd9.json index 8c63a924c0a..95f55cec3d3 100644 --- a/core/lib/dal/.sqlx/query-23be43bf705d679ca751c89353716065fcad42c6b621efb3a135a16b477dcfd9.json +++ b/core/lib/dal/.sqlx/query-23be43bf705d679ca751c89353716065fcad42c6b621efb3a135a16b477dcfd9.json @@ -62,6 +62,11 @@ "ordinal": 11, "name": "predicted_gas_cost", "type_info": "Int8" + }, + { + "ordinal": 12, + "name": "from_addr", + "type_info": "Bytea" } ], "parameters": { @@ -79,7 +84,8 @@ false, true, true, - false + false, + true ] }, "hash": "23be43bf705d679ca751c89353716065fcad42c6b621efb3a135a16b477dcfd9" diff --git a/core/lib/dal/.sqlx/query-5659480e5d79dab3399e35539b240e7eb9f598999c28015a504605f88bf84b33.json b/core/lib/dal/.sqlx/query-5659480e5d79dab3399e35539b240e7eb9f598999c28015a504605f88bf84b33.json index 399b0d02845..0c56db64329 100644 --- a/core/lib/dal/.sqlx/query-5659480e5d79dab3399e35539b240e7eb9f598999c28015a504605f88bf84b33.json +++ b/core/lib/dal/.sqlx/query-5659480e5d79dab3399e35539b240e7eb9f598999c28015a504605f88bf84b33.json @@ -62,6 +62,11 @@ "ordinal": 11, "name": "predicted_gas_cost", "type_info": "Int8" + }, + { + "ordinal": 12, + "name": "from_addr", + "type_info": "Bytea" } ], "parameters": { @@ -81,7 +86,8 @@ false, true, true, - false + false, + true ] }, "hash": "5659480e5d79dab3399e35539b240e7eb9f598999c28015a504605f88bf84b33" diff --git a/core/lib/dal/.sqlx/query-6692ff6c0fbb2fc94f5cd2837a43ce80f9b2b27758651ccfc09df61a4ae8a363.json b/core/lib/dal/.sqlx/query-6692ff6c0fbb2fc94f5cd2837a43ce80f9b2b27758651ccfc09df61a4ae8a363.json index 586cace7617..6730bc909c2 100644 --- a/core/lib/dal/.sqlx/query-6692ff6c0fbb2fc94f5cd2837a43ce80f9b2b27758651ccfc09df61a4ae8a363.json +++ b/core/lib/dal/.sqlx/query-6692ff6c0fbb2fc94f5cd2837a43ce80f9b2b27758651ccfc09df61a4ae8a363.json @@ -62,6 +62,11 @@ "ordinal": 11, "name": "predicted_gas_cost", "type_info": "Int8" + }, + { + "ordinal": 12, + "name": "from_addr", + "type_info": "Bytea" } ], "parameters": { @@ -81,7 +86,8 @@ false, true, true, - false + false, + true ] }, "hash": "6692ff6c0fbb2fc94f5cd2837a43ce80f9b2b27758651ccfc09df61a4ae8a363" diff --git a/core/lib/dal/.sqlx/query-66e012ce974c38d9fe84cfc7eb28927f9e976319a305e0928ff366d535a97104.json b/core/lib/dal/.sqlx/query-8e89c34f8693d890a38654b0daf4bd499dc63ba6a4d5bcde2031315593431809.json similarity index 79% rename from core/lib/dal/.sqlx/query-66e012ce974c38d9fe84cfc7eb28927f9e976319a305e0928ff366d535a97104.json rename to core/lib/dal/.sqlx/query-8e89c34f8693d890a38654b0daf4bd499dc63ba6a4d5bcde2031315593431809.json index e07fbfbd70b..ddee50fb086 100644 --- a/core/lib/dal/.sqlx/query-66e012ce974c38d9fe84cfc7eb28927f9e976319a305e0928ff366d535a97104.json +++ b/core/lib/dal/.sqlx/query-8e89c34f8693d890a38654b0daf4bd499dc63ba6a4d5bcde2031315593431809.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n INSERT INTO\n eth_txs (\n raw_tx,\n nonce,\n tx_type,\n contract_address,\n predicted_gas_cost,\n created_at,\n updated_at\n )\n VALUES\n ($1, $2, $3, $4, $5, NOW(), NOW())\n RETURNING\n *\n ", + "query": "\n INSERT INTO\n eth_txs (\n raw_tx,\n nonce,\n tx_type,\n contract_address,\n predicted_gas_cost,\n created_at,\n updated_at,\n from_addr\n )\n VALUES\n ($1, $2, $3, $4, $5, NOW(), NOW(), $6)\n RETURNING\n *\n ", "describe": { "columns": [ { @@ -62,6 +62,11 @@ "ordinal": 11, "name": "predicted_gas_cost", "type_info": "Int8" + }, + { + "ordinal": 12, + "name": "from_addr", + "type_info": "Bytea" } ], "parameters": { @@ -70,7 +75,8 @@ "Int8", "Text", "Text", - "Int8" + "Int8", + "Bytea" ] }, "nullable": [ @@ -85,8 +91,9 @@ false, true, true, - false + false, + true ] }, - "hash": "66e012ce974c38d9fe84cfc7eb28927f9e976319a305e0928ff366d535a97104" + "hash": "8e89c34f8693d890a38654b0daf4bd499dc63ba6a4d5bcde2031315593431809" } diff --git a/core/lib/dal/.sqlx/query-90f7657bae05c4bad6902c6bfb1b8ba0b771cb45573aca81db254f6bcfc17c77.json b/core/lib/dal/.sqlx/query-90f7657bae05c4bad6902c6bfb1b8ba0b771cb45573aca81db254f6bcfc17c77.json deleted file mode 100644 index dfd7cd9c555..00000000000 --- a/core/lib/dal/.sqlx/query-90f7657bae05c4bad6902c6bfb1b8ba0b771cb45573aca81db254f6bcfc17c77.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n SELECT\n nonce\n FROM\n eth_txs\n ORDER BY\n id DESC\n LIMIT\n 1\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "nonce", - "type_info": "Int8" - } - ], - "parameters": { - "Left": [] - }, - "nullable": [ - false - ] - }, - "hash": "90f7657bae05c4bad6902c6bfb1b8ba0b771cb45573aca81db254f6bcfc17c77" -} diff --git a/core/lib/dal/.sqlx/query-bab1857df66bbef57705ae7796161f3a71f5c6737e08745a37b41b22f4dfd030.json b/core/lib/dal/.sqlx/query-bab1857df66bbef57705ae7796161f3a71f5c6737e08745a37b41b22f4dfd030.json deleted file mode 100644 index 4510b6084d2..00000000000 --- a/core/lib/dal/.sqlx/query-bab1857df66bbef57705ae7796161f3a71f5c6737e08745a37b41b22f4dfd030.json +++ /dev/null @@ -1,202 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n SELECT\n number,\n timestamp,\n l1_tx_count,\n l2_tx_count,\n bloom,\n priority_ops_onchain_data,\n hash,\n commitment,\n eth_prove_tx_id,\n eth_commit_tx_id,\n eth_execute_tx_id,\n merkle_root_hash,\n l2_to_l1_logs,\n l2_to_l1_messages,\n used_contract_hashes,\n compressed_initial_writes,\n compressed_repeated_writes,\n l2_l1_merkle_root,\n rollup_last_leaf_index,\n zkporter_is_available,\n bootloader_code_hash,\n default_aa_code_hash,\n aux_data_hash,\n pass_through_data_hash,\n meta_parameters_hash,\n protocol_version,\n compressed_state_diffs,\n system_logs,\n events_queue_commitment,\n bootloader_initial_content_commitment,\n pubdata_input\n FROM\n l1_batches\n LEFT JOIN commitments ON commitments.l1_batch_number = l1_batches.number\n WHERE\n eth_commit_tx_id IS NOT NULL\n AND eth_prove_tx_id IS NULL\n ORDER BY\n number\n LIMIT\n $1\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "number", - "type_info": "Int8" - }, - { - "ordinal": 1, - "name": "timestamp", - "type_info": "Int8" - }, - { - "ordinal": 2, - "name": "l1_tx_count", - "type_info": "Int4" - }, - { - "ordinal": 3, - "name": "l2_tx_count", - "type_info": "Int4" - }, - { - "ordinal": 4, - "name": "bloom", - "type_info": "Bytea" - }, - { - "ordinal": 5, - "name": "priority_ops_onchain_data", - "type_info": "ByteaArray" - }, - { - "ordinal": 6, - "name": "hash", - "type_info": "Bytea" - }, - { - "ordinal": 7, - "name": "commitment", - "type_info": "Bytea" - }, - { - "ordinal": 8, - "name": "eth_prove_tx_id", - "type_info": "Int4" - }, - { - "ordinal": 9, - "name": "eth_commit_tx_id", - "type_info": "Int4" - }, - { - "ordinal": 10, - "name": "eth_execute_tx_id", - "type_info": "Int4" - }, - { - "ordinal": 11, - "name": "merkle_root_hash", - "type_info": "Bytea" - }, - { - "ordinal": 12, - "name": "l2_to_l1_logs", - "type_info": "ByteaArray" - }, - { - "ordinal": 13, - "name": "l2_to_l1_messages", - "type_info": "ByteaArray" - }, - { - "ordinal": 14, - "name": "used_contract_hashes", - "type_info": "Jsonb" - }, - { - "ordinal": 15, - "name": "compressed_initial_writes", - "type_info": "Bytea" - }, - { - "ordinal": 16, - "name": "compressed_repeated_writes", - "type_info": "Bytea" - }, - { - "ordinal": 17, - "name": "l2_l1_merkle_root", - "type_info": "Bytea" - }, - { - "ordinal": 18, - "name": "rollup_last_leaf_index", - "type_info": "Int8" - }, - { - "ordinal": 19, - "name": "zkporter_is_available", - "type_info": "Bool" - }, - { - "ordinal": 20, - "name": "bootloader_code_hash", - "type_info": "Bytea" - }, - { - "ordinal": 21, - "name": "default_aa_code_hash", - "type_info": "Bytea" - }, - { - "ordinal": 22, - "name": "aux_data_hash", - "type_info": "Bytea" - }, - { - "ordinal": 23, - "name": "pass_through_data_hash", - "type_info": "Bytea" - }, - { - "ordinal": 24, - "name": "meta_parameters_hash", - "type_info": "Bytea" - }, - { - "ordinal": 25, - "name": "protocol_version", - "type_info": "Int4" - }, - { - "ordinal": 26, - "name": "compressed_state_diffs", - "type_info": "Bytea" - }, - { - "ordinal": 27, - "name": "system_logs", - "type_info": "ByteaArray" - }, - { - "ordinal": 28, - "name": "events_queue_commitment", - "type_info": "Bytea" - }, - { - "ordinal": 29, - "name": "bootloader_initial_content_commitment", - "type_info": "Bytea" - }, - { - "ordinal": 30, - "name": "pubdata_input", - "type_info": "Bytea" - } - ], - "parameters": { - "Left": [ - "Int8" - ] - }, - "nullable": [ - false, - false, - false, - false, - false, - false, - true, - true, - true, - true, - true, - true, - false, - false, - false, - true, - true, - true, - true, - true, - true, - true, - true, - true, - true, - true, - true, - false, - true, - true, - true - ] - }, - "hash": "bab1857df66bbef57705ae7796161f3a71f5c6737e08745a37b41b22f4dfd030" -} diff --git a/core/lib/dal/migrations/20240221154502_add-from-addr-to-eth-txs.down.sql b/core/lib/dal/migrations/20240221154502_add-from-addr-to-eth-txs.down.sql new file mode 100644 index 00000000000..7d4548298d2 --- /dev/null +++ b/core/lib/dal/migrations/20240221154502_add-from-addr-to-eth-txs.down.sql @@ -0,0 +1 @@ +ALTER TABLE eth_txs DROP COLUMN from_addr; diff --git a/core/lib/dal/migrations/20240221154502_add-from-addr-to-eth-txs.up.sql b/core/lib/dal/migrations/20240221154502_add-from-addr-to-eth-txs.up.sql new file mode 100644 index 00000000000..fa64812107b --- /dev/null +++ b/core/lib/dal/migrations/20240221154502_add-from-addr-to-eth-txs.up.sql @@ -0,0 +1 @@ +ALTER TABLE eth_txs ADD COLUMN from_addr BYTEA; diff --git a/core/lib/dal/src/blocks_dal.rs b/core/lib/dal/src/blocks_dal.rs index 4d1029e6492..79dc66c26c8 100644 --- a/core/lib/dal/src/blocks_dal.rs +++ b/core/lib/dal/src/blocks_dal.rs @@ -1090,12 +1090,25 @@ impl BlocksDal<'_, '_> { } /// This method returns batches that are confirmed on L1. That is, it doesn't wait for the proofs to be generated. + /// + /// # Params: + /// * `commited_tx_confirmed`: whether to look for ready proofs only for txs for which + /// respective commit transactions have been confirmed by the network. pub async fn get_ready_for_dummy_proof_l1_batches( &mut self, + only_commited_batches: bool, limit: usize, ) -> anyhow::Result> { - let raw_batches = sqlx::query_as!( - StorageL1Batch, + let (confirmed_at_not_null, join_on_eth_tx_history) = if only_commited_batches { + ( + "AND confirmed_at IS NOT NULL", + "JOIN eth_txs_history ON eth_commit_tx_id = eth_tx_id", + ) + } else { + ("", "") + }; + + let query = format!( r#" SELECT number, @@ -1132,20 +1145,27 @@ impl BlocksDal<'_, '_> { FROM l1_batches LEFT JOIN commitments ON commitments.l1_batch_number = l1_batches.number + {join_on_eth_tx_history} WHERE eth_commit_tx_id IS NOT NULL AND eth_prove_tx_id IS NULL + {confirmed_at_not_null} ORDER BY number LIMIT $1 "#, - limit as i32 - ) - .instrument("get_ready_for_dummy_proof_l1_batches") - .with_arg("limit", &limit) - .fetch_all(self.storage) - .await?; + ); + + let mut query = sqlx::query_as(&query); + + query = query.bind(limit as i32); + + let raw_batches: Vec = query + .instrument("get_ready_for_dummy_proof_l1_batches") + .with_arg("limit", &limit) + .fetch_all(self.storage) + .await?; self.map_l1_batches(raw_batches) .await diff --git a/core/lib/dal/src/eth_sender_dal.rs b/core/lib/dal/src/eth_sender_dal.rs index 5e490e6590a..8f696a6b8b6 100644 --- a/core/lib/dal/src/eth_sender_dal.rs +++ b/core/lib/dal/src/eth_sender_dal.rs @@ -174,6 +174,7 @@ impl EthSenderDal<'_, '_> { tx_type: AggregatedActionType, contract_address: Address, predicted_gas_cost: u32, + from_address: Option
, ) -> sqlx::Result { let address = format!("{:#x}", contract_address); let eth_tx = sqlx::query_as!( @@ -187,10 +188,11 @@ impl EthSenderDal<'_, '_> { contract_address, predicted_gas_cost, created_at, - updated_at + updated_at, + from_addr ) VALUES - ($1, $2, $3, $4, $5, NOW(), NOW()) + ($1, $2, $3, $4, $5, NOW(), NOW(), $6) RETURNING * "#, @@ -198,7 +200,8 @@ impl EthSenderDal<'_, '_> { nonce as i64, tx_type.to_string(), address, - predicted_gas_cost as i64 + predicted_gas_cost as i64, + from_address.map(|a| a.0.to_vec()), ) .fetch_one(self.storage.conn()) .await?; @@ -510,22 +513,42 @@ impl EthSenderDal<'_, '_> { Ok(history_item.map(|tx| tx.into())) } - pub async fn get_next_nonce(&mut self) -> sqlx::Result> { - let row = sqlx::query!( + /// Returns the next nonce for the operator account + /// + /// # Params + /// * `from_address`: an optional value indicating that nonce must be returned for a custom + /// operator address which is not the "main" one. For example, a separate custom operator + /// sends the blob transactions. For such a case this should be `Some`. For requesting the + /// none of the main operator this parameter should be set to `None`. + pub async fn get_next_nonce( + &mut self, + from_address: Option
, + ) -> sqlx::Result> { + let optional_where_clause = from_address + .map(|a| format!("WHERE from_addr = decode('{}', 'hex')", hex::encode(a.0))) + .unwrap_or("WHERE from_addr IS NULL".to_owned()); + + let query = format!( r#" SELECT nonce FROM eth_txs + {optional_where_clause} ORDER BY id DESC LIMIT 1 - "# - ) - .fetch_optional(self.storage.conn()) - .await?; - Ok(row.map(|row| row.nonce as u64 + 1)) + "#, + ); + let query = sqlx::query(&query); + + let nonce: Option = query + .fetch_optional(self.storage.conn()) + .await? + .map(|row| row.get("nonce")); + + Ok(nonce.map(|n| n as u64 + 1)) } pub async fn mark_failed_transaction(&mut self, eth_tx_id: u32) -> sqlx::Result<()> { diff --git a/core/lib/dal/src/models/storage_eth_tx.rs b/core/lib/dal/src/models/storage_eth_tx.rs index 9026be8326d..bba0af070b0 100644 --- a/core/lib/dal/src/models/storage_eth_tx.rs +++ b/core/lib/dal/src/models/storage_eth_tx.rs @@ -22,6 +22,9 @@ pub struct StorageEthTx { pub updated_at: NaiveDateTime, // TODO (SMA-1614): remove the field pub sent_at_block: Option, + // If this field is `Some` this means that this transaction was sent by a custom operator + // such as blob sender operator. + pub from_addr: Option>, } #[derive(Debug, Default)] @@ -67,6 +70,7 @@ impl From for EthTx { tx_type: AggregatedActionType::from_str(&tx.tx_type).expect("Wrong agg type"), created_at_timestamp: tx.created_at.timestamp() as u64, predicted_gas_cost: tx.predicted_gas_cost as u64, + from_addr: tx.from_addr.map(|f| Address::from_slice(&f)), } } } diff --git a/core/lib/eth_client/src/clients/http/signing.rs b/core/lib/eth_client/src/clients/http/signing.rs index 6e3dd3d223d..a303a7fee42 100644 --- a/core/lib/eth_client/src/clients/http/signing.rs +++ b/core/lib/eth_client/src/clients/http/signing.rs @@ -35,11 +35,44 @@ impl PKSigningClient { ) -> Self { // Gather required data from the config. // It's done explicitly to simplify getting rid of this function later. - let main_node_url = ð_client.web3_url; let operator_private_key = eth_sender .sender .private_key() .expect("Operator private key is required for signing client"); + + Self::from_config_inner( + eth_sender, + contracts_config, + eth_client, + operator_private_key, + ) + } + + /// Create an signing client for the blobs account + pub fn from_config_blobs( + eth_sender: ÐSenderConfig, + contracts_config: &ContractsConfig, + eth_client: ÐClientConfig, + ) -> Option { + // Gather required data from the config. + // It's done explicitly to simplify getting rid of this function later. + let operator_private_key = eth_sender.sender.private_key_blobs()?; + + Some(Self::from_config_inner( + eth_sender, + contracts_config, + eth_client, + operator_private_key, + )) + } + + fn from_config_inner( + eth_sender: ÐSenderConfig, + contracts_config: &ContractsConfig, + eth_client: ÐClientConfig, + operator_private_key: H256, + ) -> Self { + let main_node_url = ð_client.web3_url; let diamond_proxy_addr = contracts_config.diamond_proxy_addr; let default_priority_fee_per_gas = eth_sender.gas_adjuster.default_priority_fee_per_gas; let l1_chain_id = eth_client.chain_id; diff --git a/core/lib/types/src/eth_sender.rs b/core/lib/types/src/eth_sender.rs index 7778d825208..b717c0c538a 100644 --- a/core/lib/types/src/eth_sender.rs +++ b/core/lib/types/src/eth_sender.rs @@ -9,6 +9,9 @@ pub struct EthTx { pub tx_type: AggregatedActionType, pub created_at_timestamp: u64, pub predicted_gas_cost: u64, + /// If this field is `Some` then it contains address of a custom operator that has sent + /// this transaction. If it is set to `None` this transaction was sent by the main operator. + pub from_addr: Option
, } impl std::fmt::Debug for EthTx { diff --git a/core/lib/zksync_core/src/eth_sender/aggregator.rs b/core/lib/zksync_core/src/eth_sender/aggregator.rs index fe887db6469..7db2037f5bc 100644 --- a/core/lib/zksync_core/src/eth_sender/aggregator.rs +++ b/core/lib/zksync_core/src/eth_sender/aggregator.rs @@ -29,10 +29,20 @@ pub struct Aggregator { execute_criteria: Vec>, config: SenderConfig, blob_store: Arc, + /// If we are operating in 4844 mode we need to wait for commit transaction + /// to get included before sending the respective prove and execute transactions. + /// In non-4844 mode of operation we operate with the single address and this + /// means no wait is needed: nonces will still provide the correct ordering of + /// transactions. + operate_4844_mode: bool, } impl Aggregator { - pub fn new(config: SenderConfig, blob_store: Arc) -> Self { + pub fn new( + config: SenderConfig, + blob_store: Arc, + operate_4844_mode: bool, + ) -> Self { Self { commit_criteria: vec![ Box::from(NumberCriterion { @@ -88,6 +98,7 @@ impl Aggregator { ], config, blob_store, + operate_4844_mode, } } @@ -231,6 +242,7 @@ impl Aggregator { l1_verifier_config: L1VerifierConfig, proof_loading_mode: &ProofLoadingMode, blob_store: &dyn ObjectStore, + is_4844_mode: bool, ) -> Option { let previous_proven_batch_number = storage .blocks_dal() @@ -240,12 +252,23 @@ impl Aggregator { let batch_to_prove = previous_proven_batch_number + 1; // Return `None` if batch is not committed yet. - storage + let commit_tx_id = storage .blocks_dal() .get_eth_commit_tx_id(batch_to_prove) .await .unwrap()?; + if is_4844_mode + && storage + .eth_sender_dal() + .get_confirmed_tx_hash_by_eth_tx_id(commit_tx_id as u32) + .await + .unwrap() + .is_none() + { + return None; + } + if let Some(version_id) = storage .blocks_dal() .get_batch_protocol_version_id(batch_to_prove) @@ -350,6 +373,7 @@ impl Aggregator { l1_verifier_config, &self.config.proof_loading_mode, &*self.blob_store, + self.operate_4844_mode, ) .await } @@ -357,7 +381,7 @@ impl Aggregator { ProofSendingMode::SkipEveryProof => { let ready_for_proof_l1_batches = storage .blocks_dal() - .get_ready_for_dummy_proof_l1_batches(limit) + .get_ready_for_dummy_proof_l1_batches(self.operate_4844_mode, limit) .await .unwrap(); self.prepare_dummy_proof_operation( @@ -375,6 +399,7 @@ impl Aggregator { l1_verifier_config, &self.config.proof_loading_mode, &*self.blob_store, + self.operate_4844_mode, ) .await { diff --git a/core/lib/zksync_core/src/eth_sender/eth_tx_aggregator.rs b/core/lib/zksync_core/src/eth_sender/eth_tx_aggregator.rs index 13045119875..8d45d28d704 100644 --- a/core/lib/zksync_core/src/eth_sender/eth_tx_aggregator.rs +++ b/core/lib/zksync_core/src/eth_sender/eth_tx_aggregator.rs @@ -10,12 +10,13 @@ use zksync_l1_contract_interface::{ Detokenize, Tokenizable, Tokenize, }; use zksync_types::{ + aggregated_operations::AggregatedActionType, commitment::SerializeCommitment, eth_sender::EthTx, ethabi::Token, l2_to_l1_log::UserL2ToL1Log, protocol_version::{L1VerifierConfig, VerifierParams}, - web3::contract::Error as Web3ContractError, + web3::{contract::Error as Web3ContractError, types::BlockNumber}, Address, L2ChainId, ProtocolVersionId, H256, U256, }; @@ -52,10 +53,17 @@ pub struct EthTxAggregator { pub(super) main_zksync_contract_address: Address, functions: ZkSyncFunctions, base_nonce: u64, + base_nonce_custom_commit_sender: Option, rollup_chain_id: L2ChainId, + /// If set to `Some` node is operating in the 4844 mode with two operator + /// addresses at play: the main one and the custom address for sending commit + /// transactions. The `Some` then contains the address of this custom operator + /// address. + custom_commit_sender_addr: Option
, } impl EthTxAggregator { + #[allow(clippy::too_many_arguments)] pub async fn new( config: SenderConfig, aggregator: Aggregator, @@ -64,6 +72,7 @@ impl EthTxAggregator { l1_multicall3_address: Address, main_zksync_contract_address: Address, rollup_chain_id: L2ChainId, + custom_commit_sender_addr: Option
, ) -> Self { let functions = ZkSyncFunctions::default(); let base_nonce = eth_client @@ -71,6 +80,17 @@ impl EthTxAggregator { .await .unwrap() .as_u64(); + + let base_nonce_custom_commit_sender = match custom_commit_sender_addr { + Some(addr) => Some( + eth_client + .nonce_at_for_account(addr, BlockNumber::Pending, "eth_sender") + .await + .unwrap() + .as_u64(), + ), + None => None, + }; Self { config, aggregator, @@ -80,7 +100,9 @@ impl EthTxAggregator { main_zksync_contract_address, functions, base_nonce, + base_nonce_custom_commit_sender, rollup_chain_id, + custom_commit_sender_addr, } } @@ -460,10 +482,17 @@ impl EthTxAggregator { contracts_are_pre_shared_bridge: bool, ) -> Result { let mut transaction = storage.start_transaction().await.unwrap(); - let nonce = self.get_next_nonce(&mut transaction).await?; + let op_type = aggregated_op.get_action_type(); + // We may be using a custom sender for commit transactions, so use this + // var whatever it actually is: a `None` for single-addr operator or `Some` + // for multi-addr operator in 4844 mode. + let sender_addr = match op_type { + AggregatedActionType::Commit => self.custom_commit_sender_addr, + _ => None, + }; + let nonce = self.get_next_nonce(&mut transaction, sender_addr).await?; let calldata = self.encode_aggregated_op(aggregated_op, contracts_are_pre_shared_bridge); let l1_batch_number_range = aggregated_op.l1_batch_range(); - let op_type = aggregated_op.get_action_type(); let predicted_gas_for_batches = transaction .blocks_dal() @@ -480,6 +509,7 @@ impl EthTxAggregator { op_type, self.timelock_contract_address, eth_tx_predicted_gas, + sender_addr, ) .await .unwrap(); @@ -496,15 +526,23 @@ impl EthTxAggregator { async fn get_next_nonce( &self, storage: &mut StorageProcessor<'_>, + from_addr: Option
, ) -> Result { let db_nonce = storage .eth_sender_dal() - .get_next_nonce() + .get_next_nonce(from_addr) .await .unwrap() .unwrap_or(0); // Between server starts we can execute some txs using operator account or remove some txs from the database // At the start we have to consider this fact and get the max nonce. - Ok(db_nonce.max(self.base_nonce)) + Ok(if from_addr.is_none() { + db_nonce.max(self.base_nonce) + } else { + db_nonce.max( + self.base_nonce_custom_commit_sender + .expect("custom base nonce is expected to be initialized; qed"), + ) + }) } } diff --git a/core/lib/zksync_core/src/eth_sender/eth_tx_manager.rs b/core/lib/zksync_core/src/eth_sender/eth_tx_manager.rs index e8a2b0b5af9..d9d48fd69c1 100644 --- a/core/lib/zksync_core/src/eth_sender/eth_tx_manager.rs +++ b/core/lib/zksync_core/src/eth_sender/eth_tx_manager.rs @@ -8,13 +8,14 @@ use zksync_eth_client::{ BoundEthInterface, Error, EthInterface, ExecutedTxStatus, RawTransactionBytes, SignedCallResult, }; use zksync_types::{ + aggregated_operations::AggregatedActionType, eth_sender::EthTx, web3::{ contract::Options, error::Error as Web3Error, types::{BlockId, BlockNumber}, }, - L1BlockNumber, Nonce, H256, U256, + Address, L1BlockNumber, Nonce, H256, U256, }; use zksync_utils::time::seconds_since_epoch; @@ -49,7 +50,11 @@ pub(super) struct L1BlockNumbers { /// with higher gas price #[derive(Debug)] pub struct EthTxManager { + /// A gateway through which the operator normally sends all its transactions. ethereum_gateway: Arc, + /// If the operator is in 4844 mode this is sent to `Some` and used to send + /// commit transactions. + ethereum_gateway_blobs: Option>, config: SenderConfig, gas_adjuster: Arc, } @@ -59,9 +64,11 @@ impl EthTxManager { config: SenderConfig, gas_adjuster: Arc, ethereum_gateway: Arc, + ethereum_gateway_blobs: Option>, ) -> Self { Self { ethereum_gateway, + ethereum_gateway_blobs, config, gas_adjuster, } @@ -276,6 +283,29 @@ impl EthTxManager { Ok(OperatorNonce { finalized, latest }) } + async fn get_blobs_operator_nonce( + &self, + block_numbers: L1BlockNumbers, + ) -> Result, ETHSenderError> { + match &self.ethereum_gateway_blobs { + None => Ok(None), + Some(gateway) => { + let finalized = gateway + .nonce_at(block_numbers.finalized.0.into(), "eth_tx_manager") + .await? + .as_u32() + .into(); + + let latest = gateway + .nonce_at(block_numbers.latest.0.into(), "eth_tx_manager") + .await? + .as_u32() + .into(); + Ok(Some(OperatorNonce { finalized, latest })) + } + } + } + async fn get_l1_block_numbers(&self) -> Result { let (finalized, safe) = if let Some(confirmations) = self.config.wait_confirmations { let latest_block_number = self @@ -332,6 +362,44 @@ impl EthTxManager { ) -> Result, ETHSenderError> { METRICS.track_block_numbers(&l1_block_numbers); let operator_nonce = self.get_operator_nonce(l1_block_numbers).await?; + let blobs_operator_nonce = self.get_blobs_operator_nonce(l1_block_numbers).await?; + let blobs_operator_address = self + .ethereum_gateway_blobs + .as_ref() + .map(|s| s.sender_account()); + + if let Some(res) = self + .monitor_inflight_transactions_inner(storage, l1_block_numbers, operator_nonce, None) + .await? + { + return Ok(Some(res)); + }; + + if let Some(blobs_operator_nonce) = blobs_operator_nonce { + // need to check if both nonce and address are `Some` + if blobs_operator_address.is_none() { + panic!("blobs_operator_address has to be set its nonce is known; qed"); + } + Ok(self + .monitor_inflight_transactions_inner( + storage, + l1_block_numbers, + blobs_operator_nonce, + blobs_operator_address, + ) + .await?) + } else { + Ok(None) + } + } + + async fn monitor_inflight_transactions_inner( + &mut self, + storage: &mut StorageProcessor<'_>, + l1_block_numbers: L1BlockNumbers, + operator_nonce: OperatorNonce, + operator_address: Option
, + ) -> Result, ETHSenderError> { let inflight_txs = storage.eth_sender_dal().get_inflight_txs().await.unwrap(); METRICS.number_of_inflight_txs.set(inflight_txs.len()); @@ -348,6 +416,9 @@ impl EthTxManager { // Not confirmed transactions, ordered by nonce for tx in inflight_txs { tracing::trace!("Checking tx id: {}", tx.id,); + if tx.from_addr != operator_address { + continue; + } // If the `operator_nonce.latest` <= `tx.nonce`, this means // that `tx` is not mined and we should resend it. @@ -404,7 +475,21 @@ impl EthTxManager { base_fee_per_gas: u64, priority_fee_per_gas: u64, ) -> SignedCallResult { - self.ethereum_gateway + // Chose the signing gateway. Use a custom one in case + // the operator is in 4844 mode and the operation at hand is Commit. + // then the optional gateway is used to send this transaction from a + // custom sender account. + let signing_gateway = if let Some(blobs_gateway) = self.ethereum_gateway_blobs.as_ref() { + if tx.tx_type == AggregatedActionType::Commit { + blobs_gateway + } else { + &self.ethereum_gateway + } + } else { + &self.ethereum_gateway + }; + + signing_gateway .sign_prepared_tx_for_addr( tx.raw_tx.clone(), tx.contract_address, diff --git a/core/lib/zksync_core/src/eth_sender/tests.rs b/core/lib/zksync_core/src/eth_sender/tests.rs index 858f6529440..f0ca69b2855 100644 --- a/core/lib/zksync_core/src/eth_sender/tests.rs +++ b/core/lib/zksync_core/src/eth_sender/tests.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use assert_matches::assert_matches; use once_cell::sync::Lazy; +use test_casing::test_casing; use zksync_config::{ configs::eth_sender::{ProofSendingMode, SenderConfig}, ContractsConfig, ETHSenderConfig, GasAdjusterConfig, @@ -60,6 +61,7 @@ impl EthSenderTester { connection_pool: ConnectionPool, history: Vec, non_ordering_confirmations: bool, + aggregator_operate_4844_mode: bool, ) -> Self { let eth_sender_config = ETHSenderConfig::for_tests(); let contracts_config = ContractsConfig::for_tests(); @@ -105,6 +107,7 @@ impl EthSenderTester { Aggregator::new( aggregator_config.clone(), store_factory.create_store().await, + aggregator_operate_4844_mode, ), gateway.clone(), // zkSync contract address @@ -112,6 +115,7 @@ impl EthSenderTester { contracts_config.l1_multicall3_addr, Address::random(), Default::default(), + None, ) .await; @@ -119,6 +123,7 @@ impl EthSenderTester { eth_sender_config.sender, gas_adjuster.clone(), gateway.clone(), + None, ); Self { gateway, @@ -145,10 +150,17 @@ impl EthSenderTester { } // Tests that we send multiple transactions and confirm them all in one iteration. +#[test_casing(2, [false, true])] #[tokio::test] -async fn confirm_many() -> anyhow::Result<()> { +async fn confirm_many(aggregator_operate_4844_mode: bool) -> anyhow::Result<()> { let connection_pool = ConnectionPool::test_pool().await; - let mut tester = EthSenderTester::new(connection_pool, vec![10; 100], false).await; + let mut tester = EthSenderTester::new( + connection_pool, + vec![10; 100], + false, + aggregator_operate_4844_mode, + ) + .await; let mut hashes = vec![]; @@ -224,7 +236,8 @@ async fn confirm_many() -> anyhow::Result<()> { #[tokio::test] async fn resend_each_block() -> anyhow::Result<()> { let connection_pool = ConnectionPool::test_pool().await; - let mut tester = EthSenderTester::new(connection_pool, vec![7, 6, 5, 5, 5, 2, 1], false).await; + let mut tester = + EthSenderTester::new(connection_pool, vec![7, 6, 5, 5, 5, 2, 1], false, false).await; // after this, median should be 6 tester.gateway.advance_block_number(3); @@ -335,7 +348,7 @@ async fn resend_each_block() -> anyhow::Result<()> { #[tokio::test] async fn dont_resend_already_mined() -> anyhow::Result<()> { let connection_pool = ConnectionPool::test_pool().await; - let mut tester = EthSenderTester::new(connection_pool, vec![100; 100], false).await; + let mut tester = EthSenderTester::new(connection_pool, vec![100; 100], false, false).await; let tx = tester .aggregator .save_eth_tx( @@ -406,7 +419,8 @@ async fn dont_resend_already_mined() -> anyhow::Result<()> { #[tokio::test] async fn three_scenarios() -> anyhow::Result<()> { let connection_pool = ConnectionPool::test_pool().await; - let mut tester = EthSenderTester::new(connection_pool.clone(), vec![100; 100], false).await; + let mut tester = + EthSenderTester::new(connection_pool.clone(), vec![100; 100], false, false).await; let mut hashes = vec![]; for _ in 0..3 { @@ -478,7 +492,8 @@ async fn three_scenarios() -> anyhow::Result<()> { #[tokio::test] async fn failed_eth_tx() { let connection_pool = ConnectionPool::test_pool().await; - let mut tester = EthSenderTester::new(connection_pool.clone(), vec![100; 100], false).await; + let mut tester = + EthSenderTester::new(connection_pool.clone(), vec![100; 100], false, false).await; let tx = tester .aggregator @@ -549,7 +564,7 @@ fn l1_batch_with_metadata(header: L1BatchHeader) -> L1BatchWithMetadata { #[tokio::test] async fn correct_order_for_confirmations() -> anyhow::Result<()> { let connection_pool = ConnectionPool::test_pool().await; - let mut tester = EthSenderTester::new(connection_pool, vec![100; 100], true).await; + let mut tester = EthSenderTester::new(connection_pool, vec![100; 100], true, false).await; insert_genesis_protocol_version(&tester).await; let genesis_l1_batch = insert_l1_batch(&tester, L1BatchNumber(0)).await; let first_l1_batch = insert_l1_batch(&tester, L1BatchNumber(1)).await; @@ -610,7 +625,7 @@ async fn correct_order_for_confirmations() -> anyhow::Result<()> { #[tokio::test] async fn skipped_l1_batch_at_the_start() -> anyhow::Result<()> { let connection_pool = ConnectionPool::test_pool().await; - let mut tester = EthSenderTester::new(connection_pool, vec![100; 100], true).await; + let mut tester = EthSenderTester::new(connection_pool, vec![100; 100], true, false).await; insert_genesis_protocol_version(&tester).await; let genesis_l1_batch = insert_l1_batch(&tester, L1BatchNumber(0)).await; let first_l1_batch = insert_l1_batch(&tester, L1BatchNumber(1)).await; @@ -703,7 +718,7 @@ async fn skipped_l1_batch_at_the_start() -> anyhow::Result<()> { #[tokio::test] async fn skipped_l1_batch_in_the_middle() -> anyhow::Result<()> { let connection_pool = ConnectionPool::test_pool().await; - let mut tester = EthSenderTester::new(connection_pool, vec![100; 100], true).await; + let mut tester = EthSenderTester::new(connection_pool, vec![100; 100], true, false).await; insert_genesis_protocol_version(&tester).await; let genesis_l1_batch = insert_l1_batch(&tester, L1BatchNumber(0)).await; let first_l1_batch = insert_l1_batch(&tester, L1BatchNumber(1)).await; @@ -790,7 +805,7 @@ async fn skipped_l1_batch_in_the_middle() -> anyhow::Result<()> { #[tokio::test] async fn test_parse_multicall_data() { let connection_pool = ConnectionPool::test_pool().await; - let tester = EthSenderTester::new(connection_pool, vec![100; 100], false).await; + let tester = EthSenderTester::new(connection_pool, vec![100; 100], false, false).await; let original_correct_form_data = Token::Array(vec![ Token::Tuple(vec![Token::Bool(true), Token::Bytes(vec![1u8; 32])]), @@ -870,7 +885,7 @@ async fn test_parse_multicall_data() { #[tokio::test] async fn get_multicall_data() { let connection_pool = ConnectionPool::test_pool().await; - let mut tester = EthSenderTester::new(connection_pool, vec![100; 100], false).await; + let mut tester = EthSenderTester::new(connection_pool, vec![100; 100], false, false).await; let multicall_data = tester.aggregator.get_multicall_data().await; assert!(multicall_data.is_ok()); } diff --git a/core/lib/zksync_core/src/lib.rs b/core/lib/zksync_core/src/lib.rs index 4f3c9d7faf8..d008df6a968 100644 --- a/core/lib/zksync_core/src/lib.rs +++ b/core/lib/zksync_core/src/lib.rs @@ -30,7 +30,7 @@ use zksync_contracts::{governance_contract, BaseSystemContracts}; use zksync_dal::{healthcheck::ConnectionPoolHealthCheck, ConnectionPool}; use zksync_eth_client::{ clients::{PKSigningClient, QueryClient}, - CallFunctionArgs, EthInterface, + BoundEthInterface, CallFunctionArgs, EthInterface, }; use zksync_health_check::{AppHealthCheck, HealthStatus, ReactiveHealthCheck}; use zksync_object_store::{ObjectStore, ObjectStoreFactory}; @@ -637,11 +637,16 @@ pub async fn initialize_components( .context("eth_sender_config")?; let eth_client = PKSigningClient::from_config(ð_sender, &contracts_config, ð_client_config); + let eth_client_blobs_addr = + PKSigningClient::from_config_blobs(ð_sender, &contracts_config, ð_client_config) + .map(|k| k.sender_account()); + let eth_tx_aggregator_actor = EthTxAggregator::new( eth_sender.sender.clone(), Aggregator::new( eth_sender.sender.clone(), store_factory.create_store().await, + eth_client_blobs_addr.is_some(), ), Arc::new(eth_client), contracts_config.validator_timelock_addr, @@ -652,6 +657,7 @@ pub async fn initialize_components( .as_ref() .context("network_config")? .zksync_network_id, + eth_client_blobs_addr, ) .await; task_futures.push(tokio::spawn( @@ -675,6 +681,8 @@ pub async fn initialize_components( .context("eth_sender_config")?; let eth_client = PKSigningClient::from_config(ð_sender, &contracts_config, ð_client_config); + let eth_client_blobs = + PKSigningClient::from_config_blobs(ð_sender, &contracts_config, ð_client_config); let eth_tx_manager_actor = EthTxManager::new( eth_sender.sender, gas_adjuster @@ -682,6 +690,7 @@ pub async fn initialize_components( .await .context("gas_adjuster.get_or_init()")?, Arc::new(eth_client), + eth_client_blobs.map(|c| Arc::new(c) as Arc), ); task_futures.extend([tokio::spawn( eth_tx_manager_actor.run(eth_manager_pool, stop_receiver.clone()),