diff --git a/api/goldens/aptos_api__tests__transactions_test__test_get_transaction_by_hash_with_delayed_internal_indexer.json b/api/goldens/aptos_api__tests__transactions_test__test_get_transaction_by_hash_with_delayed_internal_indexer.json new file mode 100644 index 00000000000000..76753f5be8fe00 --- /dev/null +++ b/api/goldens/aptos_api__tests__transactions_test__test_get_transaction_by_hash_with_delayed_internal_indexer.json @@ -0,0 +1,22 @@ +{ + "hash": "", + "sender": "0xa550c18", + "sequence_number": "0", + "max_gas_amount": "100000000", + "gas_unit_price": "0", + "expiration_timestamp_secs": "18446744073709551615", + "payload": { + "function": "0x1::aptos_account::create_account", + "type_arguments": [], + "arguments": [ + "0x34bf7e2d17674feb234371a7ea58efd715f0e56ba20ebf13789480d9d643afaf" + ], + "type": "entry_function_payload" + }, + "signature": { + "public_key": "0x14418f867a0bd6d42abb2daa50cd68a5a869ce208282481f57504f630510d0d3", + "signature": "0x95915d42cd822b6195581e9be3c164b70afeb9228ebb68c2e3f14240e3f43a164caabae8096163c6a341fc3830b36618b4619b7d5f2edcd603690e91a62fdb05", + "type": "ed25519_signature" + }, + "type": "pending_transaction" +} diff --git a/api/src/context.rs b/api/src/context.rs index 4cea588b9906a8..ade7f9920aeec8 100644 --- a/api/src/context.rs +++ b/api/src/context.rs @@ -267,6 +267,20 @@ impl Context { self.get_latest_storage_ledger_info() } + pub fn get_latest_internal_and_storage_ledger_info( + &self, + ) -> Result<(Option, LedgerInfo), E> { + if let Some(indexer_reader) = self.indexer_reader.as_ref() { + if indexer_reader.is_internal_indexer_enabled() { + return Ok(( + Some(self.get_latest_internal_indexer_ledger_info()?), + self.get_latest_storage_ledger_info()?, + )); + } + } + Ok((None, self.get_latest_storage_ledger_info()?)) + } + pub fn get_latest_ledger_info_and_verify_lookup_version( &self, requested_ledger_version: Option, @@ -942,6 +956,10 @@ impl Context { } } + pub fn get_indexer_reader(&self) -> Option<&Arc> { + self.indexer_reader.as_ref() + } + fn next_bucket(&self, gas_unit_price: u64) -> u64 { match self .node_config diff --git a/api/src/tests/accounts_test.rs b/api/src/tests/accounts_test.rs index a5199cf61c452b..a6fed2893e7fa5 100644 --- a/api/src/tests/accounts_test.rs +++ b/api/src/tests/accounts_test.rs @@ -33,7 +33,7 @@ async fn test_get_account_resources_by_address_0x0() { context.check_golden_output(resp); } -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +#[tokio::test(flavor = "multi_thread", worker_threads = 3)] async fn test_get_account_resources_by_valid_account_address() { let context = new_test_context(current_function_name!()); let addresses = vec!["0x1", "0x00000000000000000000000000000001"]; @@ -144,6 +144,10 @@ async fn test_account_resources_by_ledger_version_with_context(mut context: Test async fn test_get_account_resources_by_ledger_version() { let context = new_test_context(current_function_name!()); test_account_resources_by_ledger_version_with_context(context).await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_get_account_resources_by_ledger_version_with_shard_context() { let shard_context = new_test_context_with_db_sharding_and_internal_indexer(current_function_name!()); test_account_resources_by_ledger_version_with_context(shard_context).await; diff --git a/api/src/tests/mod.rs b/api/src/tests/mod.rs index e7978f66a126e8..5e7573d8920291 100644 --- a/api/src/tests/mod.rs +++ b/api/src/tests/mod.rs @@ -21,7 +21,7 @@ mod transactions_test; mod view_function; mod webauthn_secp256r1_ecdsa; -use aptos_api_test_context::{new_test_context as super_new_test_context, TestContext}; +use aptos_api_test_context::{new_test_context_inner as super_new_test_context, TestContext}; use aptos_config::config::{internal_indexer_db_config::InternalIndexerDBConfig, NodeConfig}; fn new_test_context(test_name: String) -> TestContext { @@ -29,12 +29,22 @@ fn new_test_context(test_name: String) -> TestContext { } fn new_test_context_with_config(test_name: String, node_config: NodeConfig) -> TestContext { - super_new_test_context(test_name, node_config, false) + super_new_test_context(test_name, node_config, false, None) } fn new_test_context_with_db_sharding_and_internal_indexer(test_name: String) -> TestContext { let mut node_config = NodeConfig::default(); node_config.storage.rocksdb_configs.enable_storage_sharding = true; - node_config.indexer_db_config = InternalIndexerDBConfig::new(true, true, true, 10_000); - super_new_test_context(test_name, node_config, true) + node_config.indexer_db_config = InternalIndexerDBConfig::new(true, true, true, 10); + super_new_test_context(test_name, node_config, true, None) +} + +fn new_test_context_with_sharding_and_delayed_internal_indexer( + test_name: String, + end_version: Option, +) -> TestContext { + let mut node_config = NodeConfig::default(); + node_config.storage.rocksdb_configs.enable_storage_sharding = true; + node_config.indexer_db_config = InternalIndexerDBConfig::new(true, true, true, 1); + super_new_test_context(test_name, node_config, true, end_version) } diff --git a/api/src/tests/transactions_test.rs b/api/src/tests/transactions_test.rs index 292c5318f8e6d7..2b3050f7307188 100644 --- a/api/src/tests/transactions_test.rs +++ b/api/src/tests/transactions_test.rs @@ -5,6 +5,7 @@ use super::new_test_context; use crate::tests::{ new_test_context_with_config, new_test_context_with_db_sharding_and_internal_indexer, + new_test_context_with_sharding_and_delayed_internal_indexer, }; use aptos_api_test_context::{assert_json, current_function_name, pretty, TestContext}; use aptos_config::config::{GasEstimationStaticOverride, NodeConfig}; @@ -491,6 +492,26 @@ async fn test_get_transaction_by_hash() { assert_json(resp, txns[0].clone()); } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_get_transaction_by_hash_with_delayed_internal_indexer() { + let mut context = new_test_context_with_sharding_and_delayed_internal_indexer( + current_function_name!(), + Some(1), + ); + let account = context.gen_account(); + let txn = context.create_user_account(&account).await; + let committed_hash = txn.committed_hash().to_hex_literal(); + context.commit_block(&vec![txn.clone()]).await; + let _ = context + .get_indexer_reader() + .unwrap() + .wait_for_internal_indexer(1); + let resp = context + .get(&format!("/transactions/by_hash/{}", committed_hash)) + .await; + context.check_golden_output(resp); +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_get_transaction_by_hash_not_found() { let mut context = new_test_context(current_function_name!()); diff --git a/api/src/transactions.rs b/api/src/transactions.rs index 1e1214361961b0..0127718d123a63 100644 --- a/api/src/transactions.rs +++ b/api/src/transactions.rs @@ -793,32 +793,44 @@ impl TransactionsApi { let context = self.context.clone(); let accept_type = accept_type.clone(); - let ledger_info = api_spawn_blocking(move || context.get_latest_ledger_info()).await?; + let (internal_ledger_info_opt, storage_ledger_info) = + api_spawn_blocking(move || context.get_latest_internal_and_storage_ledger_info()) + .await?; let txn_data = self - .get_by_hash(hash.into(), &ledger_info) + .get_by_hash( + hash.into(), + &storage_ledger_info, + internal_ledger_info_opt.as_ref(), + ) .await .context(format!("Failed to get transaction by hash {}", hash)) .map_err(|err| { BasicErrorWith404::internal_with_code( err, AptosErrorCode::InternalError, - &ledger_info, + &storage_ledger_info, ) })? .context(format!("Failed to find transaction with hash: {}", hash)) - .map_err(|_| transaction_not_found_by_hash(hash, &ledger_info))?; - - if let TransactionData::Pending(_) = txn_data { - if (start_time.elapsed().as_millis() as u64) < wait_by_hash_timeout_ms { - tokio::time::sleep(Duration::from_millis(wait_by_hash_poll_interval_ms)).await; - continue; - } + .map_err(|_| transaction_not_found_by_hash(hash, &storage_ledger_info))?; + + if matches!( + txn_data, + TransactionData::Pending(_) | TransactionData::PendingQueryable(_) + ) && (start_time.elapsed().as_millis() as u64) < wait_by_hash_timeout_ms + { + tokio::time::sleep(Duration::from_millis(wait_by_hash_poll_interval_ms)).await; + continue; } let api = self.clone(); return api_spawn_blocking(move || { - api.get_transaction_inner(&accept_type, txn_data, &ledger_info) + api.get_transaction_inner( + &accept_type, + txn_data, + &internal_ledger_info_opt.unwrap_or(storage_ledger_info), + ) }) .await; } @@ -832,25 +844,37 @@ impl TransactionsApi { let context = self.context.clone(); let accept_type = accept_type.clone(); - let ledger_info = api_spawn_blocking(move || context.get_latest_ledger_info()).await?; + let (internal_ledger_info_opt, storage_ledger_info) = + api_spawn_blocking(move || context.get_latest_internal_and_storage_ledger_info()) + .await?; let txn_data = self - .get_by_hash(hash.into(), &ledger_info) + .get_by_hash( + hash.into(), + &storage_ledger_info, + internal_ledger_info_opt.as_ref(), + ) .await .context(format!("Failed to get transaction by hash {}", hash)) .map_err(|err| { BasicErrorWith404::internal_with_code( err, AptosErrorCode::InternalError, - &ledger_info, + &storage_ledger_info, ) })? .context(format!("Failed to find transaction with hash: {}", hash)) - .map_err(|_| transaction_not_found_by_hash(hash, &ledger_info))?; + .map_err(|_| transaction_not_found_by_hash(hash, &storage_ledger_info))?; let api = self.clone(); - api_spawn_blocking(move || api.get_transaction_inner(&accept_type, txn_data, &ledger_info)) - .await + api_spawn_blocking(move || { + api.get_transaction_inner( + &accept_type, + txn_data, + &internal_ledger_info_opt.unwrap_or(storage_ledger_info), + ) + }) + .await } fn get_transaction_by_version_inner( @@ -921,6 +945,19 @@ impl TransactionsApi { ledger_info, ) })?, + TransactionData::PendingQueryable(txn) => state_view + .as_converter(self.context.db.clone(), self.context.indexer_reader.clone()) + .try_into_pending_transaction_from_pending_queryable_transaction(txn) + .context( + "Failed to convert on pending queryable transaction to Transaction", + ) + .map_err(|err| { + BasicErrorWith404::internal_with_code( + err, + AptosErrorCode::InternalError, + ledger_info, + ) + })?, }; BasicResponse::try_from_json((transaction, ledger_info, BasicResponseStatus::Ok)) @@ -946,9 +983,11 @@ impl TransactionsApi { return Ok(GetByVersionResponse::VersionTooOld); } Ok(GetByVersionResponse::Found( - self.context - .get_transaction_by_version(version, ledger_info.version())? - .into(), + TransactionData::from_transaction_onchain_data( + self.context + .get_transaction_by_version(version, ledger_info.version())?, + ledger_info.version(), + ), )) } @@ -959,22 +998,46 @@ impl TransactionsApi { async fn get_by_hash( &self, hash: aptos_crypto::HashValue, - ledger_info: &LedgerInfo, + storage_ledger_info: &LedgerInfo, + internal_ledger_info: Option<&LedgerInfo>, ) -> anyhow::Result> { let context = self.context.clone(); - let version = ledger_info.version(); + let version = storage_ledger_info.version(); let from_db = tokio::task::spawn_blocking(move || context.get_transaction_by_hash(hash, version)) .await .context("Failed to join task to read transaction by hash")? .context("Failed to read transaction by hash from DB")?; Ok(match from_db { - None => self - .context - .get_pending_transaction_by_hash(hash) - .await? - .map(|t| t.into()), - _ => from_db.map(|t| t.into()), + None => match self.context.get_pending_transaction_by_hash(hash).await? { + None => { + // try to get the transaction from db again to make sure it is not missed earlier + let context_clone = self.context.clone(); + tokio::task::spawn_blocking(move || { + context_clone.get_transaction_by_hash(hash, version) + }) + .await + .context("Failed to join task to read transaction by hash")? + .context("Failed to read transaction by hash from DB")? + .map(|t| { + TransactionData::from_transaction_onchain_data( + t, + internal_ledger_info + .unwrap_or(storage_ledger_info) + .ledger_version + .into(), + ) + }) + }, + Some(t) => Some(t.into()), + }, + Some(t) => Some(TransactionData::from_transaction_onchain_data( + t, + internal_ledger_info + .unwrap_or(storage_ledger_info) + .ledger_version + .into(), + )), }) } diff --git a/api/test-context/src/test_context.rs b/api/test-context/src/test_context.rs index b9f938de52b125..1a39eb12c51505 100644 --- a/api/test-context/src/test_context.rs +++ b/api/test-context/src/test_context.rs @@ -40,6 +40,7 @@ use aptos_types::{ block_info::BlockInfo, block_metadata::BlockMetadata, chain_id::ChainId, + indexer::indexer_db_reader::IndexerReader, ledger_info::{LedgerInfo, LedgerInfoWithSignatures}, transaction::{ signature_verified_transaction::into_signature_verified_block, Transaction, @@ -94,9 +95,18 @@ impl ApiSpecificConfig { } pub fn new_test_context( + test_name: String, + node_config: NodeConfig, + use_db_with_indexer: bool, +) -> TestContext { + new_test_context_inner(test_name, node_config, use_db_with_indexer, None) +} + +pub fn new_test_context_inner( test_name: String, mut node_config: NodeConfig, use_db_with_indexer: bool, + end_version: Option, ) -> TestContext { // Speculative logging uses a global variable and when many instances use it together, they // panic, so we disable this to run tests. @@ -155,7 +165,7 @@ pub fn new_test_context( .storage .set_data_dir(tmp_dir.path().to_path_buf()); let mock_indexer_service = - MockInternalIndexerDBService::new_for_test(db_rw.reader.clone(), &node_config); + MockInternalIndexerDBService::new_for_test(db_rw.reader.clone(), &node_config, end_version); let context = Context::new( ChainId::test(), @@ -425,6 +435,10 @@ impl TestContext { .await; } + pub fn get_indexer_reader(&self) -> Option<&Arc> { + self.context.get_indexer_reader() + } + pub async fn create_multisig_account( &mut self, account: &mut LocalAccount, @@ -562,6 +576,16 @@ impl TestContext { self.context.get_latest_ledger_info::().unwrap() } + pub fn get_latest_storage_ledger_info(&self) -> aptos_api_types::LedgerInfo { + self.context + .get_latest_storage_ledger_info::() + .unwrap() + } + + pub fn get_indexer_readers(&self) -> Option<&Arc> { + self.context.get_indexer_reader() + } + pub fn get_transactions(&self, start: u64, limit: u16) -> Vec { self.context .get_transactions(start, limit, self.get_latest_ledger_info().version()) diff --git a/api/types/src/convert.rs b/api/types/src/convert.rs index 85fecd9ec32ec6..74a976d4ae5190 100644 --- a/api/types/src/convert.rs +++ b/api/types/src/convert.rs @@ -167,6 +167,23 @@ impl<'a, S: StateView> MoveConverter<'a, S> { Ok((txn, payload).into()) } + pub fn try_into_pending_transaction_from_pending_queryable_transaction( + &self, + txn: TransactionOnChainData, + ) -> Result { + use aptos_types::transaction::Transaction::UserTransaction; + match txn.transaction { + UserTransaction(t) => { + let payload = self.try_into_transaction_payload(t.payload().clone())?; + Ok((t, payload).into()) + }, + _ => Err(format_err!( + "Expected UserTransaction, but got {:?}", + txn.transaction + )), + } + } + pub fn try_into_pending_transaction_poem( &self, txn: SignedTransaction, diff --git a/api/types/src/transaction.rs b/api/types/src/transaction.rs index 4af7dae0b843ee..f11c4a55667834 100755 --- a/api/types/src/transaction.rs +++ b/api/types/src/transaction.rs @@ -68,11 +68,20 @@ pub enum TransactionData { OnChain(TransactionOnChainData), /// A transaction currently sitting in mempool Pending(Box), + /// A transaction committed to storage but not yet to internal indexer yet. + PendingQueryable(TransactionOnChainData), } -impl From for TransactionData { - fn from(txn: TransactionOnChainData) -> Self { - Self::OnChain(txn) +impl TransactionData { + pub fn from_transaction_onchain_data( + txn: TransactionOnChainData, + latest_ledger_version: u64, + ) -> Self { + if txn.version > latest_ledger_version { + Self::PendingQueryable(txn) + } else { + Self::OnChain(txn) + } } } @@ -383,7 +392,7 @@ pub struct TransactionInfo { pub epoch: Option, } -/// A transaction waiting in mempool +/// A transaction waiting in mempool or not committed to internal indexer yet #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Object)] pub struct PendingTransaction { pub hash: HashValue, diff --git a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/internal_indexer_db_service.rs b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/internal_indexer_db_service.rs index ca26fd0c6e49e7..e69278203d5c3a 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/internal_indexer_db_service.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/internal_indexer_db_service.rs @@ -158,6 +158,25 @@ impl InternalIndexerDBService { start_version = next_version; } } + + // For internal testing + pub async fn run_with_end_version( + &mut self, + node_config: &NodeConfig, + end_version: Option, + ) -> Result<()> { + let mut start_version = self.get_start_version(node_config).await?; + while start_version <= end_version.unwrap_or(std::u64::MAX) { + let next_version = self.db_indexer.process_a_batch(start_version)?; + if next_version == start_version { + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + continue; + } + start_version = next_version; + } + + Ok(()) + } } pub struct MockInternalIndexerDBService { @@ -166,7 +185,11 @@ pub struct MockInternalIndexerDBService { } impl MockInternalIndexerDBService { - pub fn new_for_test(db_reader: Arc, node_config: &NodeConfig) -> Self { + pub fn new_for_test( + db_reader: Arc, + node_config: &NodeConfig, + end_version: Option, + ) -> Self { if !node_config .indexer_db_config .is_internal_indexer_db_enabled() @@ -184,7 +207,7 @@ impl MockInternalIndexerDBService { let config_clone = node_config.to_owned(); handle.spawn(async move { internal_indexer_db_service - .run(&config_clone) + .run_with_end_version(&config_clone, end_version) .await .unwrap(); }); diff --git a/storage/indexer/src/db_indexer.rs b/storage/indexer/src/db_indexer.rs index f21aade0905a5a..279bd7eeabb3e3 100644 --- a/storage/indexer/src/db_indexer.rs +++ b/storage/indexer/src/db_indexer.rs @@ -407,6 +407,7 @@ impl DBIndexer { version += 1; Ok::<(), AptosDbError>(()) })?; + assert!(version > 0, "batch number should be greater than 0"); assert_eq!(num_transactions, version - start_version); if self.indexer_db.transaction_enabled() { batch.put::( diff --git a/testsuite/generate-format/tests/staged/api.yaml b/testsuite/generate-format/tests/staged/api.yaml index 72e38f150ea12a..bba4efafc8c59e 100644 --- a/testsuite/generate-format/tests/staged/api.yaml +++ b/testsuite/generate-format/tests/staged/api.yaml @@ -679,6 +679,10 @@ TransactionData: Pending: NEWTYPE: TYPENAME: SignedTransaction + 2: + PendingQueryable: + NEWTYPE: + TYPENAME: TransactionOnChainData TransactionInfo: ENUM: 0: