diff --git a/core/lib/dal/src/models/storage_transaction.rs b/core/lib/dal/src/models/storage_transaction.rs index 8d575bb8ab6..1dfd5f4b6a0 100644 --- a/core/lib/dal/src/models/storage_transaction.rs +++ b/core/lib/dal/src/models/storage_transaction.rs @@ -397,7 +397,7 @@ impl From for TransactionReceipt { } #[derive(Debug, Clone, sqlx::FromRow)] -pub struct StorageTransactionDetails { +pub(crate) struct StorageTransactionDetails { pub is_priority: bool, pub initiator_address: Vec, pub gas_limit: Option, diff --git a/core/lib/db_connection/src/connection.rs b/core/lib/db_connection/src/connection.rs index e019739e16f..99cab4fee17 100644 --- a/core/lib/db_connection/src/connection.rs +++ b/core/lib/db_connection/src/connection.rs @@ -16,6 +16,7 @@ use sqlx::{ use crate::{ connection_pool::ConnectionPool, error::{DalConnectionError, DalResult}, + instrument::InstrumentExt, metrics::CONNECTION_METRICS, utils::InternalMarker, }; @@ -183,6 +184,7 @@ impl<'a, DB: DbMarker> Connection<'a, DB> { } } + /// Starts a transaction or a new checkpoint within the current transaction. pub async fn start_transaction(&mut self) -> DalResult> { let (conn, tags) = self.conn_and_tags(); let inner = ConnectionInner::Transaction { @@ -198,6 +200,24 @@ impl<'a, DB: DbMarker> Connection<'a, DB> { }) } + /// Starts building a new transaction with custom settings. Unlike [`Self::start_transaction()`], this method + /// will error if called from a transaction; it is a logical error to change transaction settings in the middle of it. + pub fn transaction_builder(&mut self) -> DalResult> { + if let ConnectionInner::Transaction { tags, .. } = &self.inner { + let err = io::Error::new( + io::ErrorKind::Other, + "`Connection::transaction_builder()` can only be invoked outside of a transaction", + ); + return Err( + DalConnectionError::start_transaction(sqlx::Error::Io(err), tags.cloned()).into(), + ); + } + Ok(TransactionBuilder { + connection: self, + is_readonly: false, + }) + } + /// Checks if the `Connection` is currently within database transaction. pub fn in_transaction(&self) -> bool { matches!(self.inner, ConnectionInner::Transaction { .. }) @@ -260,9 +280,36 @@ impl<'a, DB: DbMarker> Connection<'a, DB> { } } +/// Builder of transactions allowing to configure transaction characteristics (for now, just its readonly status). +#[derive(Debug)] +pub struct TransactionBuilder<'a, 'c, DB: DbMarker> { + connection: &'a mut Connection<'c, DB>, + is_readonly: bool, +} + +impl<'a, DB: DbMarker> TransactionBuilder<'a, '_, DB> { + /// Sets the readonly status of the created transaction. + pub fn set_readonly(mut self) -> Self { + self.is_readonly = true; + self + } + + /// Builds the transaction with the provided characteristics. + pub async fn build(self) -> DalResult> { + let mut transaction = self.connection.start_transaction().await?; + if self.is_readonly { + sqlx::query("SET TRANSACTION READ ONLY") + .instrument("set_transaction_characteristics") + .execute(&mut transaction) + .await?; + } + Ok(transaction) + } +} + #[cfg(test)] mod tests { - use crate::{connection_pool::ConnectionPool, utils::InternalMarker}; + use super::*; #[tokio::test] async fn processor_tags_propagate_to_transactions() { @@ -296,4 +343,31 @@ mod tests { assert!(traced.is_empty()); } } + + #[tokio::test] + async fn creating_readonly_transaction() { + let pool = ConnectionPool::::constrained_test_pool(1).await; + let mut connection = pool.connection().await.unwrap(); + let mut readonly_transaction = connection + .transaction_builder() + .unwrap() + .set_readonly() + .build() + .await + .unwrap(); + assert!(readonly_transaction.in_transaction()); + + sqlx::query("SELECT COUNT(*) AS \"count?\" FROM miniblocks") + .instrument("test") + .fetch_optional(&mut readonly_transaction) + .await + .unwrap() + .expect("no row returned"); + // Check that it's impossible to execute write statements in the transaction. + sqlx::query("DELETE FROM miniblocks") + .instrument("test") + .execute(&mut readonly_transaction) + .await + .unwrap_err(); + } } diff --git a/core/node/api_server/src/tx_sender/proxy.rs b/core/node/api_server/src/tx_sender/proxy.rs index 41f56c81326..a1fa77d2f1b 100644 --- a/core/node/api_server/src/tx_sender/proxy.rs +++ b/core/node/api_server/src/tx_sender/proxy.rs @@ -1,31 +1,39 @@ use std::{ - collections::{BTreeSet, HashMap}, + collections::{BTreeSet, HashMap, HashSet}, future::Future, sync::Arc, time::Duration, }; use anyhow::Context; +use chrono::{TimeZone, Utc}; use tokio::sync::{watch, RwLock}; use zksync_dal::{ - helpers::wait_for_l1_batch, transactions_dal::L2TxSubmissionResult, ConnectionPool, Core, - CoreDal, + helpers::wait_for_l1_batch, transactions_dal::L2TxSubmissionResult, Connection, ConnectionPool, + Core, CoreDal, DalError, }; use zksync_shared_metrics::{TxStage, APP_METRICS}; -use zksync_types::{ - api::{BlockId, Transaction, TransactionDetails, TransactionId}, - fee::TransactionExecutionMetrics, - l2::L2Tx, - Address, Nonce, H256, -}; +use zksync_types::{api, fee::TransactionExecutionMetrics, l2::L2Tx, Address, Nonce, H256, U256}; use zksync_web3_decl::{ client::{DynClient, L2}, error::{ClientRpcContext, EnrichedClientResult, Web3Error}, - namespaces::{EthNamespaceClient, ZksNamespaceClient}, + namespaces::EthNamespaceClient, }; use super::{tx_sink::TxSink, SubmitTxError}; +/// In-memory transaction cache for a full node. Works like an ad-hoc mempool replacement, with the important limitation that +/// it's not synchronized across the network. +/// +/// # Managing cache growth +/// +/// To keep cache at reasonable size, the following garbage collection procedures are implemented: +/// +/// - [`Self::run_updates()`] periodically gets nonces for all distinct accounts for the transactions in cache and removes +/// all transactions with stale nonces. This includes both transactions included into L2 blocks and replaced transactions. +/// - The same nonce filtering logic is applied for the transaction initiator address each time a transaction is fetched from cache. +/// We don't want to return such transactions if they are already included in an L2 block or replaced locally, but `Self::run_updates()` +/// hasn't run yet. #[derive(Debug, Clone, Default)] pub(crate) struct TxCache { inner: Arc>, @@ -33,10 +41,60 @@ pub(crate) struct TxCache { #[derive(Debug, Default)] struct TxCacheInner { - tx_cache: HashMap, + transactions_by_hash: HashMap, + tx_hashes_by_initiator: HashMap<(Address, Nonce), HashSet>, nonces_by_account: HashMap>, } +impl TxCacheInner { + /// Removes transactions from the cache based on nonces for accounts loaded from Postgres. + fn collect_garbage(&mut self, nonces_for_accounts: &HashMap) { + self.nonces_by_account.retain(|address, account_nonces| { + let stored_nonce = nonces_for_accounts + .get(address) + .copied() + .unwrap_or(Nonce(0)); + // Retain only nonces starting from the stored one, and remove transactions with all past nonces; + // this includes both successfully executed and replaced transactions. + let retained_nonces = account_nonces.split_off(&stored_nonce); + for &nonce in &*account_nonces { + if let Some(tx_hashes) = self.tx_hashes_by_initiator.remove(&(*address, nonce)) { + for tx_hash in tx_hashes { + self.transactions_by_hash.remove(&tx_hash); + } + } + } + *account_nonces = retained_nonces; + // If we've removed all nonces, drop the account entry so we don't request stored nonces for it later. + !account_nonces.is_empty() + }); + } + + /// Same as `collect_garbage()`, but optimized for a single `(account, nonce)` entry. + fn collect_garbage_for_account(&mut self, initiator_address: Address, stored_nonce: Nonce) { + let Some(account_nonces) = self.nonces_by_account.get_mut(&initiator_address) else { + return; + }; + + let retained_nonces = account_nonces.split_off(&stored_nonce); + for &nonce in &*account_nonces { + if let Some(tx_hashes) = self + .tx_hashes_by_initiator + .remove(&(initiator_address, nonce)) + { + for tx_hash in tx_hashes { + self.transactions_by_hash.remove(&tx_hash); + } + } + } + *account_nonces = retained_nonces; + + if account_nonces.is_empty() { + self.nonces_by_account.remove(&initiator_address); + } + } +} + impl TxCache { async fn push(&self, tx: L2Tx) { let mut inner = self.inner.write().await; @@ -45,11 +103,44 @@ impl TxCache { .entry(tx.initiator_account()) .or_default() .insert(tx.nonce()); - inner.tx_cache.insert(tx.hash(), tx); + inner + .tx_hashes_by_initiator + .entry((tx.initiator_account(), tx.nonce())) + .or_default() + .insert(tx.hash()); + inner.transactions_by_hash.insert(tx.hash(), tx); } - async fn get_tx(&self, tx_hash: H256) -> Option { - self.inner.read().await.tx_cache.get(&tx_hash).cloned() + async fn get(&self, tx_hash: H256) -> Option { + self.inner + .read() + .await + .transactions_by_hash + .get(&tx_hash) + .cloned() + } + + async fn remove(&self, tx_hash: H256) { + let mut inner = self.inner.write().await; + let Some(tx) = inner.transactions_by_hash.remove(&tx_hash) else { + // The transaction is already removed; this is fine. + return; + }; + + let initiator_and_nonce = (tx.initiator_account(), tx.nonce()); + if let Some(txs) = inner.tx_hashes_by_initiator.get_mut(&initiator_and_nonce) { + txs.remove(&tx_hash); + if txs.is_empty() { + inner.tx_hashes_by_initiator.remove(&initiator_and_nonce); + // No transactions with `initiator_and_nonce` remain in the cache; remove the nonce record as well + if let Some(nonces) = inner.nonces_by_account.get_mut(&tx.initiator_account()) { + nonces.remove(&tx.nonce()); + if nonces.is_empty() { + inner.nonces_by_account.remove(&tx.initiator_account()); + } + } + } + } } async fn get_nonces_for_account(&self, account_address: Address) -> BTreeSet { @@ -61,9 +152,24 @@ impl TxCache { } } - async fn remove_tx(&self, tx_hash: H256) { - self.inner.write().await.tx_cache.remove(&tx_hash); - // We intentionally don't change `nonces_by_account`; they should only be changed in response to new L2 blocks + async fn step(&self, pool: &ConnectionPool) -> anyhow::Result<()> { + let addresses: Vec<_> = { + // Split into 2 statements for readability. + let inner = self.inner.read().await; + inner.nonces_by_account.keys().copied().collect() + }; + let mut storage = pool.connection_tagged("api").await?; + let nonces_for_accounts = storage + .storage_web3_dal() + .get_nonces_for_addresses(&addresses) + .await?; + drop(storage); // Don't hold both `storage` and lock on `inner` at the same time. + + self.inner + .write() + .await + .collect_garbage(&nonces_for_accounts); + Ok(()) } async fn run_updates( @@ -91,38 +197,11 @@ impl TxCache { return Ok(()); } - loop { - if *stop_receiver.borrow() { - return Ok(()); - } - - let addresses: Vec<_> = { - // Split into 2 statements for readability. - let inner = self.inner.read().await; - inner.nonces_by_account.keys().copied().collect() - }; - let mut storage = pool.connection_tagged("api").await?; - let nonces_for_accounts = storage - .storage_web3_dal() - .get_nonces_for_addresses(&addresses) - .await?; - drop(storage); // Don't hold both `storage` and lock on `inner` at the same time. - - let mut inner = self.inner.write().await; - inner.nonces_by_account.retain(|address, account_nonces| { - let stored_nonce = nonces_for_accounts - .get(address) - .copied() - .unwrap_or(Nonce(0)); - // Retain only nonces starting from the stored one. - *account_nonces = account_nonces.split_off(&stored_nonce); - // If we've removed all nonces, drop the account entry so we don't request stored nonces for it later. - !account_nonces.is_empty() - }); - drop(inner); - + while !*stop_receiver.borrow() { + self.step(&pool).await?; tokio::time::sleep(UPDATE_INTERVAL).await; } + Ok(()) } } @@ -137,8 +216,8 @@ pub struct TxProxy { impl TxProxy { pub fn new(client: Box>) -> Self { Self { - client: client.for_component("tx_proxy"), tx_cache: TxCache::default(), + client: client.for_component("tx_proxy"), } } @@ -154,16 +233,34 @@ impl TxProxy { .await } - async fn save_tx(&self, tx: L2Tx) { - self.tx_cache.push(tx).await; - } - - async fn find_tx(&self, tx_hash: H256) -> Option { - self.tx_cache.get_tx(tx_hash).await - } + async fn find_tx( + &self, + storage: &mut Connection<'_, Core>, + tx_hash: H256, + ) -> Result, Web3Error> { + let Some(tx) = self.tx_cache.get(tx_hash).await else { + return Ok(None); + }; - async fn forget_tx(&self, tx_hash: H256) { - self.tx_cache.remove_tx(tx_hash).await; + let initiator_address = tx.initiator_account(); + let nonce_map = storage + .storage_web3_dal() + .get_nonces_for_addresses(&[initiator_address]) + .await + .map_err(DalError::generalize)?; + if let Some(&stored_nonce) = nonce_map.get(&initiator_address) { + // `stored_nonce` is the *next* nonce of the `initiator_address` account, thus, strict inequality check + if tx.nonce() < stored_nonce { + // Transaction is included in a block or replaced; either way, it should be removed from the cache. + self.tx_cache + .inner + .write() + .await + .collect_garbage_for_account(initiator_address, stored_nonce); + return Ok(None); + } + } + Ok(Some(tx)) } async fn next_nonce_by_initiator_account( @@ -185,45 +282,6 @@ impl TxProxy { pending_nonce } - async fn request_tx(&self, id: TransactionId) -> EnrichedClientResult> { - match id { - TransactionId::Block(BlockId::Hash(block), index) => { - self.client - .get_transaction_by_block_hash_and_index(block, index) - .rpc_context("get_transaction_by_block_hash_and_index") - .with_arg("block", &block) - .with_arg("index", &index) - .await - } - TransactionId::Block(BlockId::Number(block), index) => { - self.client - .get_transaction_by_block_number_and_index(block, index) - .rpc_context("get_transaction_by_block_number_and_index") - .with_arg("block", &block) - .with_arg("index", &index) - .await - } - TransactionId::Hash(hash) => { - self.client - .get_transaction_by_hash(hash) - .rpc_context("get_transaction_by_hash") - .with_arg("hash", &hash) - .await - } - } - } - - async fn request_tx_details( - &self, - hash: H256, - ) -> EnrichedClientResult> { - self.client - .get_transaction_details(hash) - .rpc_context("get_transaction_details") - .with_arg("hash", &hash) - .await - } - pub fn run_account_nonce_sweeper( &self, pool: ConnectionPool, @@ -244,12 +302,12 @@ impl TxSink for TxProxy { // We're running an external node: we have to proxy the transaction to the main node. // But before we do that, save the tx to cache in case someone will request it // Before it reaches the main node. - self.save_tx(tx.clone()).await; - self.submit_tx_impl(tx).await?; - // Now, after we are sure that the tx is on the main node, remove it from cache - // since we don't want to store txs that might have been replaced or otherwise removed - // from the mempool. - self.forget_tx(tx.hash()).await; + self.tx_cache.push(tx.clone()).await; + if let Err(err) = self.submit_tx_impl(tx).await { + // Remove the transaction from the cache on failure so that it doesn't occupy space in the cache indefinitely. + self.tx_cache.remove(tx.hash()).await; + return Err(err.into()); + } APP_METRICS.processed_txs[&TxStage::Proxied].inc(); Ok(L2TxSubmissionResult::Proxied) } @@ -269,18 +327,416 @@ impl TxSink for TxProxy { )) } - async fn lookup_tx(&self, id: TransactionId) -> Result, Web3Error> { - if let TransactionId::Hash(hash) = id { + async fn lookup_tx( + &self, + storage: &mut Connection<'_, Core>, + id: api::TransactionId, + ) -> Result, Web3Error> { + if let api::TransactionId::Hash(hash) = id { // If the transaction is not in the db, check the cache - if let Some(tx) = self.find_tx(hash).await { + if let Some(tx) = self.find_tx(storage, hash).await? { + // check nonce for initiator return Ok(Some(tx.into())); } } - // If the transaction is not in the cache, query main node - Ok(self.request_tx(id).await?) + Ok(None) + } + + async fn lookup_tx_details( + &self, + storage: &mut Connection<'_, Core>, + hash: H256, + ) -> Result, Web3Error> { + if let Some(tx) = self.find_tx(storage, hash).await? { + let received_at_ms = + i64::try_from(tx.received_timestamp_ms).context("received timestamp overflow")?; + let received_at = Utc + .timestamp_millis_opt(received_at_ms) + .single() + .context("received timestamp overflow")?; + return Ok(Some(api::TransactionDetails { + is_l1_originated: false, + status: api::TransactionStatus::Pending, + fee: U256::zero(), // always zero for pending transactions + gas_per_pubdata: tx.common_data.fee.gas_per_pubdata_limit, + initiator_address: tx.initiator_account(), + received_at, + eth_commit_tx_hash: None, + eth_prove_tx_hash: None, + eth_execute_tx_hash: None, + })); + } + Ok(None) + } +} + +#[cfg(test)] +mod tests { + use std::sync::atomic::{AtomicBool, Ordering}; + + use test_casing::test_casing; + use zksync_node_genesis::{insert_genesis_batch, mock_genesis_config, GenesisParams}; + use zksync_node_test_utils::{create_l2_block, create_l2_transaction}; + use zksync_types::{get_nonce_key, web3::Bytes, L2BlockNumber, StorageLog}; + use zksync_web3_decl::{client::MockClient, jsonrpsee::core::ClientError}; + + use super::*; + + #[tokio::test] + async fn tx_cache_basics() { + let pool = ConnectionPool::::test_pool().await; + let mut storage = pool.connection().await.unwrap(); + let params = GenesisParams::load_genesis_params(mock_genesis_config()).unwrap(); + insert_genesis_batch(&mut storage, ¶ms).await.unwrap(); + + let tx = create_l2_transaction(10, 100); + let send_tx_called = Arc::new(AtomicBool::new(false)); + let main_node_client = MockClient::builder(L2::default()) + .method("eth_sendRawTransaction", { + let send_tx_called = send_tx_called.clone(); + let tx = tx.clone(); + move |bytes: Bytes| { + assert_eq!(bytes.0, tx.common_data.input_data().unwrap()); + send_tx_called.store(true, Ordering::Relaxed); + Ok(tx.hash()) + } + }) + .build(); + + let proxy = TxProxy::new(Box::new(main_node_client)); + proxy + .submit_tx(&tx, TransactionExecutionMetrics::default()) + .await + .unwrap(); + assert!(send_tx_called.load(Ordering::Relaxed)); + + // Check that the transaction is present in the cache + assert_eq!(proxy.tx_cache.get(tx.hash()).await.unwrap(), tx); + let found_tx = proxy + .lookup_tx(&mut storage, api::TransactionId::Hash(tx.hash())) + .await + .unwrap() + .expect("no transaction"); + assert_eq!(found_tx.hash, tx.hash()); + + let pending_nonce = proxy + .lookup_pending_nonce(tx.initiator_account(), 0) + .await + .unwrap() + .expect("no nonce"); + assert_eq!(pending_nonce, tx.nonce()); + + let tx_details = proxy + .lookup_tx_details(&mut storage, tx.hash()) + .await + .unwrap() + .expect("no transaction"); + assert_eq!(tx_details.initiator_address, tx.initiator_account()); + } + + #[tokio::test] + async fn low_level_transaction_cache_operations() { + let tx_cache = TxCache::default(); + let tx = create_l2_transaction(10, 100); + let tx_hash = tx.hash(); + + tx_cache.push(tx.clone()).await; + assert_eq!(tx_cache.get(tx_hash).await.unwrap(), tx); + assert_eq!( + tx_cache + .get_nonces_for_account(tx.initiator_account()) + .await, + BTreeSet::from([Nonce(0)]) + ); + + tx_cache.remove(tx_hash).await; + assert_eq!(tx_cache.get(tx_hash).await, None); + assert_eq!( + tx_cache + .get_nonces_for_account(tx.initiator_account()) + .await, + BTreeSet::new() + ); + + { + let inner = tx_cache.inner.read().await; + assert!(inner.transactions_by_hash.is_empty(), "{inner:?}"); + assert!(inner.nonces_by_account.is_empty(), "{inner:?}"); + assert!(inner.tx_hashes_by_initiator.is_empty(), "{inner:?}"); + } + } + + #[tokio::test] + async fn low_level_transaction_cache_operations_with_replacing_transaction() { + let tx_cache = TxCache::default(); + let tx = create_l2_transaction(10, 100); + let tx_hash = tx.hash(); + let mut replacing_tx = create_l2_transaction(10, 100); + replacing_tx.common_data.initiator_address = tx.initiator_account(); + let replacing_tx_hash = replacing_tx.hash(); + assert_ne!(replacing_tx_hash, tx_hash); + + tx_cache.push(tx.clone()).await; + tx_cache.push(replacing_tx).await; + tx_cache.get(tx_hash).await.unwrap(); + tx_cache.get(replacing_tx_hash).await.unwrap(); + // Both transactions have the same nonce + assert_eq!( + tx_cache + .get_nonces_for_account(tx.initiator_account()) + .await, + BTreeSet::from([Nonce(0)]) + ); + + tx_cache.remove(tx_hash).await; + assert_eq!(tx_cache.get(tx_hash).await, None); + assert_eq!( + tx_cache + .get_nonces_for_account(tx.initiator_account()) + .await, + BTreeSet::from([Nonce(0)]) + ); + } + + #[tokio::test] + async fn transaction_is_not_stored_in_cache_on_main_node_failure() { + let pool = ConnectionPool::::test_pool().await; + let mut storage = pool.connection().await.unwrap(); + let params = GenesisParams::load_genesis_params(mock_genesis_config()).unwrap(); + insert_genesis_batch(&mut storage, ¶ms).await.unwrap(); + + let tx = create_l2_transaction(10, 100); + let main_node_client = MockClient::builder(L2::default()) + .method("eth_sendRawTransaction", |_bytes: Bytes| { + Err::(ClientError::RequestTimeout) + }) + .build(); + + let proxy = TxProxy::new(Box::new(main_node_client)); + proxy + .submit_tx(&tx, TransactionExecutionMetrics::default()) + .await + .unwrap_err(); + + let found_tx = proxy.find_tx(&mut storage, tx.hash()).await.unwrap(); + assert!(found_tx.is_none(), "{found_tx:?}"); + } + + #[derive(Debug, Clone, Copy)] + enum CacheUpdateMethod { + BackgroundTask, + Query, + QueryDetails, + } + + impl CacheUpdateMethod { + const ALL: [Self; 3] = [Self::BackgroundTask, Self::Query, Self::QueryDetails]; + + async fn apply(self, pool: &ConnectionPool, proxy: &TxProxy, tx_hash: H256) { + match self { + CacheUpdateMethod::BackgroundTask => { + proxy.tx_cache.step(pool).await.unwrap(); + } + CacheUpdateMethod::Query => { + let looked_up_tx = proxy + .lookup_tx( + &mut pool.connection().await.unwrap(), + api::TransactionId::Hash(tx_hash), + ) + .await + .unwrap(); + assert!(looked_up_tx.is_none()); + } + CacheUpdateMethod::QueryDetails => { + let looked_up_tx = proxy + .lookup_tx_details(&mut pool.connection().await.unwrap(), tx_hash) + .await + .unwrap(); + assert!(looked_up_tx.is_none()); + } + } + } + } + + #[test_casing(3, CacheUpdateMethod::ALL)] + #[tokio::test] + async fn removing_sealed_transaction_from_cache(cache_update_method: CacheUpdateMethod) { + let pool = ConnectionPool::::test_pool().await; + let mut storage = pool.connection().await.unwrap(); + let params = GenesisParams::load_genesis_params(mock_genesis_config()).unwrap(); + insert_genesis_batch(&mut storage, ¶ms).await.unwrap(); + + let tx = create_l2_transaction(10, 100); + let main_node_client = MockClient::builder(L2::default()) + .method("eth_sendRawTransaction", |_bytes: Bytes| Ok(H256::zero())) + .build(); + + // Add transaction to the cache + let proxy = TxProxy::new(Box::new(main_node_client)); + proxy + .submit_tx(&tx, TransactionExecutionMetrics::default()) + .await + .unwrap(); + assert_eq!(proxy.tx_cache.get(tx.hash()).await.unwrap(), tx); + { + let cache_inner = proxy.tx_cache.inner.read().await; + assert!(cache_inner.transactions_by_hash.contains_key(&tx.hash())); + assert!(cache_inner + .nonces_by_account + .contains_key(&tx.initiator_account())); + assert!(cache_inner + .tx_hashes_by_initiator + .contains_key(&(tx.initiator_account(), Nonce(0)))); + } + + // Emulate the transaction getting sealed. + storage + .blocks_dal() + .insert_l2_block(&create_l2_block(1)) + .await + .unwrap(); + let nonce_key = get_nonce_key(&tx.initiator_account()); + let nonce_log = StorageLog::new_write_log(nonce_key, H256::from_low_u64_be(1)); + storage + .storage_logs_dal() + .insert_storage_logs(L2BlockNumber(1), &[(H256::zero(), vec![nonce_log])]) + .await + .unwrap(); + + cache_update_method.apply(&pool, &proxy, tx.hash()).await; + + // Transaction should be removed from the cache + assert!(proxy.tx_cache.get(tx.hash()).await.is_none()); + { + let cache_inner = proxy.tx_cache.inner.read().await; + assert!(!cache_inner.transactions_by_hash.contains_key(&tx.hash())); + assert!(!cache_inner + .nonces_by_account + .contains_key(&tx.initiator_account())); + assert!(!cache_inner + .tx_hashes_by_initiator + .contains_key(&(tx.initiator_account(), Nonce(0)))); + } + + let looked_up_tx = proxy + .lookup_tx(&mut storage, api::TransactionId::Hash(tx.hash())) + .await + .unwrap(); + assert!(looked_up_tx.is_none()); + let looked_up_tx = proxy + .lookup_tx_details(&mut storage, tx.hash()) + .await + .unwrap(); + assert!(looked_up_tx.is_none()); } - async fn lookup_tx_details(&self, hash: H256) -> Result, Web3Error> { - Ok(self.request_tx_details(hash).await?) + #[test_casing(3, CacheUpdateMethod::ALL)] + #[tokio::test] + async fn removing_replaced_transaction_from_cache(cache_update_method: CacheUpdateMethod) { + let pool = ConnectionPool::::test_pool().await; + let mut storage = pool.connection().await.unwrap(); + let params = GenesisParams::load_genesis_params(mock_genesis_config()).unwrap(); + insert_genesis_batch(&mut storage, ¶ms).await.unwrap(); + + let tx = create_l2_transaction(10, 100); + let mut replacing_tx = create_l2_transaction(10, 100); + assert_eq!(tx.nonce(), replacing_tx.nonce()); + replacing_tx.common_data.initiator_address = tx.initiator_account(); + let mut future_tx = create_l2_transaction(10, 100); + future_tx.common_data.initiator_address = tx.initiator_account(); + future_tx.common_data.nonce = Nonce(1); + + let main_node_client = MockClient::builder(L2::default()) + .method("eth_sendRawTransaction", |_bytes: Bytes| Ok(H256::zero())) + .build(); + let proxy = TxProxy::new(Box::new(main_node_client)); + proxy + .submit_tx(&tx, TransactionExecutionMetrics::default()) + .await + .unwrap(); + proxy + .submit_tx(&replacing_tx, TransactionExecutionMetrics::default()) + .await + .unwrap(); + proxy + .submit_tx(&future_tx, TransactionExecutionMetrics::default()) + .await + .unwrap(); + { + let cache_inner = proxy.tx_cache.inner.read().await; + assert_eq!(cache_inner.nonces_by_account.len(), 1); + let account_nonces = &cache_inner.nonces_by_account[&tx.initiator_account()]; + assert_eq!(*account_nonces, BTreeSet::from([Nonce(0), Nonce(1)])); + assert_eq!(cache_inner.tx_hashes_by_initiator.len(), 2); + assert_eq!( + cache_inner.tx_hashes_by_initiator[&(tx.initiator_account(), Nonce(0))], + HashSet::from([tx.hash(), replacing_tx.hash()]) + ); + assert_eq!( + cache_inner.tx_hashes_by_initiator[&(tx.initiator_account(), Nonce(1))], + HashSet::from([future_tx.hash()]) + ); + } + + // Emulate the replacing transaction getting sealed. + storage + .blocks_dal() + .insert_l2_block(&create_l2_block(1)) + .await + .unwrap(); + let nonce_key = get_nonce_key(&tx.initiator_account()); + let nonce_log = StorageLog::new_write_log(nonce_key, H256::from_low_u64_be(1)); + storage + .storage_logs_dal() + .insert_storage_logs(L2BlockNumber(1), &[(H256::zero(), vec![nonce_log])]) + .await + .unwrap(); + + cache_update_method + .apply(&pool, &proxy, replacing_tx.hash()) + .await; + + // Original and replacing transactions should be removed from the cache, and the future transaction should be retained. + { + let cache_inner = proxy.tx_cache.inner.read().await; + assert!(!cache_inner.transactions_by_hash.contains_key(&tx.hash())); + assert!(!cache_inner + .transactions_by_hash + .contains_key(&replacing_tx.hash())); + assert_eq!( + cache_inner.nonces_by_account[&tx.initiator_account()], + BTreeSet::from([Nonce(1)]) + ); + assert!(!cache_inner + .tx_hashes_by_initiator + .contains_key(&(tx.initiator_account(), Nonce(0)))); + assert_eq!( + cache_inner.tx_hashes_by_initiator[&(tx.initiator_account(), Nonce(1))], + HashSet::from([future_tx.hash()]) + ); + } + + for missing_hash in [tx.hash(), replacing_tx.hash()] { + let looked_up_tx = proxy + .lookup_tx(&mut storage, api::TransactionId::Hash(missing_hash)) + .await + .unwrap(); + assert!(looked_up_tx.is_none()); + let looked_up_tx = proxy + .lookup_tx_details(&mut storage, missing_hash) + .await + .unwrap(); + assert!(looked_up_tx.is_none()); + } + proxy + .lookup_tx(&mut storage, api::TransactionId::Hash(future_tx.hash())) + .await + .unwrap() + .expect("no transaction"); + proxy + .lookup_tx_details(&mut storage, future_tx.hash()) + .await + .unwrap() + .expect("no transaction"); } } diff --git a/core/node/api_server/src/tx_sender/tx_sink.rs b/core/node/api_server/src/tx_sender/tx_sink.rs index 89a69345965..5edf21b0701 100644 --- a/core/node/api_server/src/tx_sender/tx_sink.rs +++ b/core/node/api_server/src/tx_sender/tx_sink.rs @@ -1,4 +1,4 @@ -use zksync_dal::transactions_dal::L2TxSubmissionResult; +use zksync_dal::{transactions_dal::L2TxSubmissionResult, Connection, Core}; use zksync_types::{ api::{Transaction, TransactionDetails, TransactionId}, fee::TransactionExecutionMetrics, @@ -42,7 +42,11 @@ pub trait TxSink: std::fmt::Debug + Send + Sync + 'static { /// Attempts to look up the transaction by its API ID in the sink-specific storage. /// By default, returns `Ok(None)`. - async fn lookup_tx(&self, _id: TransactionId) -> Result, Web3Error> { + async fn lookup_tx( + &self, + _storage: &mut Connection<'_, Core>, + _id: TransactionId, + ) -> Result, Web3Error> { Ok(None) } @@ -50,6 +54,7 @@ pub trait TxSink: std::fmt::Debug + Send + Sync + 'static { /// By default, returns `Ok(None)`. async fn lookup_tx_details( &self, + _storage: &mut Connection<'_, Core>, _hash: H256, ) -> Result, Web3Error> { Ok(None) diff --git a/core/node/api_server/src/utils.rs b/core/node/api_server/src/utils.rs index e95ed019f8c..6769e773dc7 100644 --- a/core/node/api_server/src/utils.rs +++ b/core/node/api_server/src/utils.rs @@ -6,6 +6,21 @@ use std::{ time::{Duration, Instant}, }; +use zksync_dal::{Connection, Core, DalError}; +use zksync_web3_decl::error::Web3Error; + +/// Opens a readonly transaction over the specified connection. +pub(crate) async fn open_readonly_transaction<'r>( + conn: &'r mut Connection<'_, Core>, +) -> Result, Web3Error> { + let builder = conn.transaction_builder().map_err(DalError::generalize)?; + Ok(builder + .set_readonly() + .build() + .await + .map_err(DalError::generalize)?) +} + /// Allows filtering events (e.g., for logging) so that they are reported no more frequently than with a configurable interval. /// /// Current implementation uses thread-local vars in order to not rely on mutexes or other cross-thread primitives. diff --git a/core/node/api_server/src/web3/namespaces/eth.rs b/core/node/api_server/src/web3/namespaces/eth.rs index e2224ce92cd..d1801fde6e4 100644 --- a/core/node/api_server/src/web3/namespaces/eth.rs +++ b/core/node/api_server/src/web3/namespaces/eth.rs @@ -18,8 +18,9 @@ use zksync_web3_decl::{ types::{Address, Block, Filter, FilterChanges, Log, U64}, }; -use crate::web3::{ - backend_jsonrpsee::MethodTracer, metrics::API_METRICS, state::RpcState, TypedFilter, +use crate::{ + utils::open_readonly_transaction, + web3::{backend_jsonrpsee::MethodTracer, metrics::API_METRICS, state::RpcState, TypedFilter}, }; pub const EVENT_TOPIC_NUMBER_LIMIT: usize = 4; @@ -463,6 +464,9 @@ impl EthNamespace { id: TransactionId, ) -> Result, Web3Error> { let mut storage = self.state.acquire_connection().await?; + // Open a readonly transaction to have a consistent view of Postgres + let mut storage = open_readonly_transaction(&mut storage).await?; + let chain_id = self.state.api_config.l2_chain_id; let mut transaction = match id { TransactionId::Hash(hash) => storage @@ -497,7 +501,7 @@ impl EthNamespace { }; if transaction.is_none() { - transaction = self.state.tx_sink().lookup_tx(id).await?; + transaction = self.state.tx_sink().lookup_tx(&mut storage, id).await?; } Ok(transaction) } diff --git a/core/node/api_server/src/web3/namespaces/zks.rs b/core/node/api_server/src/web3/namespaces/zks.rs index f65dcb2525c..6b872bcf637 100644 --- a/core/node/api_server/src/web3/namespaces/zks.rs +++ b/core/node/api_server/src/web3/namespaces/zks.rs @@ -29,7 +29,10 @@ use zksync_web3_decl::{ types::{Address, Token, H256}, }; -use crate::web3::{backend_jsonrpsee::MethodTracer, metrics::API_METRICS, RpcState}; +use crate::{ + utils::open_readonly_transaction, + web3::{backend_jsonrpsee::MethodTracer, metrics::API_METRICS, RpcState}, +}; #[derive(Debug)] pub(crate) struct ZksNamespace { @@ -399,15 +402,20 @@ impl ZksNamespace { hash: H256, ) -> Result, Web3Error> { let mut storage = self.state.acquire_connection().await?; + // Open a readonly transaction to have a consistent view of Postgres + let mut storage = open_readonly_transaction(&mut storage).await?; let mut tx_details = storage .transactions_web3_dal() .get_transaction_details(hash) .await .map_err(DalError::generalize)?; - drop(storage); if tx_details.is_none() { - tx_details = self.state.tx_sink().lookup_tx_details(hash).await?; + tx_details = self + .state + .tx_sink() + .lookup_tx_details(&mut storage, hash) + .await?; } Ok(tx_details) }