use storage ledger info when getting transaction by hash
save progress
areshand committed Sep 24, 2024
1 parent 8689b9a commit e37a496
Showing 12 changed files with 256 additions and 40 deletions.
@@ -0,0 +1,22 @@
{
"hash": "",
"sender": "0xa550c18",
"sequence_number": "0",
"max_gas_amount": "100000000",
"gas_unit_price": "0",
"expiration_timestamp_secs": "18446744073709551615",
"payload": {
"function": "0x1::aptos_account::create_account",
"type_arguments": [],
"arguments": [
"0x34bf7e2d17674feb234371a7ea58efd715f0e56ba20ebf13789480d9d643afaf"
],
"type": "entry_function_payload"
},
"signature": {
"public_key": "0x14418f867a0bd6d42abb2daa50cd68a5a869ce208282481f57504f630510d0d3",
"signature": "0x95915d42cd822b6195581e9be3c164b70afeb9228ebb68c2e3f14240e3f43a164caabae8096163c6a341fc3830b36618b4619b7d5f2edcd603690e91a62fdb05",
"type": "ed25519_signature"
},
"type": "pending_transaction"
}
18 changes: 18 additions & 0 deletions api/src/context.rs
@@ -267,6 +267,20 @@ impl Context {
self.get_latest_storage_ledger_info()
}

pub fn get_latest_internal_and_storage_ledger_info<E: ServiceUnavailableError>(
&self,
) -> Result<(Option<LedgerInfo>, LedgerInfo), E> {
if let Some(indexer_reader) = self.indexer_reader.as_ref() {
if indexer_reader.is_internal_indexer_enabled() {
return Ok((
Some(self.get_latest_internal_indexer_ledger_info()?),
self.get_latest_storage_ledger_info()?,
));
}
}
Ok((None, self.get_latest_storage_ledger_info()?))
}

pub fn get_latest_ledger_info_and_verify_lookup_version<E: StdApiError>(
&self,
requested_ledger_version: Option<Version>,
@@ -942,6 +956,10 @@ impl Context {
}
}

pub fn get_indexer_reader(&self) -> Option<&Arc<dyn IndexerReader>> {
self.indexer_reader.as_ref()
}

fn next_bucket(&self, gas_unit_price: u64) -> u64 {
match self
.node_config
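
For orientation, a self-contained sketch (not part of the commit; the real type is aptos_api_types::LedgerInfo) of the selection rule the new pair enables: transactions are looked up against the freshest storage version, but the internal indexer's ledger info is reported when the internal indexer is enabled, with a fallback to storage otherwise.

// Toy stand-in for aptos_api_types::LedgerInfo, for illustration only.
struct LedgerInfo {
    version: u64,
}

// Mirrors internal_ledger_info_opt.unwrap_or(storage_ledger_info) in the
// transactions.rs hunks below: prefer the internal indexer's view, fall back
// to the storage ledger info when the internal indexer is disabled.
fn response_ledger_info(internal: Option<LedgerInfo>, storage: LedgerInfo) -> LedgerInfo {
    internal.unwrap_or(storage)
}

fn main() {
    let storage = LedgerInfo { version: 120 };
    let internal = Some(LedgerInfo { version: 100 }); // internal indexer lagging behind storage
    assert_eq!(response_ledger_info(internal, storage).version, 100);
}
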
6 changes: 5 additions & 1 deletion api/src/tests/accounts_test.rs
@@ -33,7 +33,7 @@ async fn test_get_account_resources_by_address_0x0() {
context.check_golden_output(resp);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[tokio::test(flavor = "multi_thread", worker_threads = 3)]
async fn test_get_account_resources_by_valid_account_address() {
let context = new_test_context(current_function_name!());
let addresses = vec!["0x1", "0x00000000000000000000000000000001"];
@@ -144,6 +144,10 @@ async fn test_account_resources_by_ledger_version_with_context(mut context: Test
async fn test_get_account_resources_by_ledger_version() {
let context = new_test_context(current_function_name!());
test_account_resources_by_ledger_version_with_context(context).await;
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_get_account_resources_by_ledger_version_with_shard_context() {
let shard_context =
new_test_context_with_db_sharding_and_internal_indexer(current_function_name!());
test_account_resources_by_ledger_version_with_context(shard_context).await;
18 changes: 14 additions & 4 deletions api/src/tests/mod.rs
@@ -21,20 +21,30 @@ mod transactions_test;
mod view_function;
mod webauthn_secp256r1_ecdsa;

use aptos_api_test_context::{new_test_context as super_new_test_context, TestContext};
use aptos_api_test_context::{new_test_context_inner as super_new_test_context, TestContext};
use aptos_config::config::{internal_indexer_db_config::InternalIndexerDBConfig, NodeConfig};

fn new_test_context(test_name: String) -> TestContext {
new_test_context_with_config(test_name, NodeConfig::default())
}

fn new_test_context_with_config(test_name: String, node_config: NodeConfig) -> TestContext {
super_new_test_context(test_name, node_config, false)
super_new_test_context(test_name, node_config, false, None)
}

fn new_test_context_with_db_sharding_and_internal_indexer(test_name: String) -> TestContext {
let mut node_config = NodeConfig::default();
node_config.storage.rocksdb_configs.enable_storage_sharding = true;
node_config.indexer_db_config = InternalIndexerDBConfig::new(true, true, true, 10_000);
super_new_test_context(test_name, node_config, true)
node_config.indexer_db_config = InternalIndexerDBConfig::new(true, true, true, 10);
super_new_test_context(test_name, node_config, true, None)
}

fn new_test_context_with_sharding_and_delayed_internal_indexer(
test_name: String,
end_version: Option<u64>,
) -> TestContext {
let mut node_config = NodeConfig::default();
node_config.storage.rocksdb_configs.enable_storage_sharding = true;
node_config.indexer_db_config = InternalIndexerDBConfig::new(true, true, true, 1);
super_new_test_context(test_name, node_config, true, end_version)
}
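
The delayed variant is exercised by the new test in transactions_test.rs below; a minimal call, mirroring that test, looks like this (the end_version argument appears to cap how far the internal indexer advances, so later transactions are committed in storage but not yet indexed):

let mut context = new_test_context_with_sharding_and_delayed_internal_indexer(
    current_function_name!(),
    Some(1), // assumed: the internal indexer halts once it reaches version 1
);
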
21 changes: 21 additions & 0 deletions api/src/tests/transactions_test.rs
@@ -5,6 +5,7 @@
use super::new_test_context;
use crate::tests::{
new_test_context_with_config, new_test_context_with_db_sharding_and_internal_indexer,
new_test_context_with_sharding_and_delayed_internal_indexer,
};
use aptos_api_test_context::{assert_json, current_function_name, pretty, TestContext};
use aptos_config::config::{GasEstimationStaticOverride, NodeConfig};
@@ -491,6 +492,26 @@ async fn test_get_transaction_by_hash() {
assert_json(resp, txns[0].clone());
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_get_transaction_by_hash_with_delayed_internal_indexer() {
let mut context = new_test_context_with_sharding_and_delayed_internal_indexer(
current_function_name!(),
Some(1),
);
let account = context.gen_account();
let txn = context.create_user_account(&account).await;
let committed_hash = txn.committed_hash().to_hex_literal();
context.commit_block(&vec![txn.clone()]).await;
let _ = context
.get_indexer_reader()
.unwrap()
.wait_for_internal_indexer(1);
let resp = context
.get(&format!("/transactions/by_hash/{}", committed_hash))
.await;
context.check_golden_output(resp);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_get_transaction_by_hash_not_found() {
let mut context = new_test_context(current_function_name!());
119 changes: 91 additions & 28 deletions api/src/transactions.rs
@@ -793,32 +793,44 @@ impl TransactionsApi {
let context = self.context.clone();
let accept_type = accept_type.clone();

let ledger_info = api_spawn_blocking(move || context.get_latest_ledger_info()).await?;
let (internal_ledger_info_opt, storage_ledger_info) =
api_spawn_blocking(move || context.get_latest_internal_and_storage_ledger_info())
.await?;

let txn_data = self
.get_by_hash(hash.into(), &ledger_info)
.get_by_hash(
hash.into(),
&storage_ledger_info,
internal_ledger_info_opt.as_ref(),
)
.await
.context(format!("Failed to get transaction by hash {}", hash))
.map_err(|err| {
BasicErrorWith404::internal_with_code(
err,
AptosErrorCode::InternalError,
&ledger_info,
&storage_ledger_info,
)
})?
.context(format!("Failed to find transaction with hash: {}", hash))
.map_err(|_| transaction_not_found_by_hash(hash, &ledger_info))?;

if let TransactionData::Pending(_) = txn_data {
if (start_time.elapsed().as_millis() as u64) < wait_by_hash_timeout_ms {
tokio::time::sleep(Duration::from_millis(wait_by_hash_poll_interval_ms)).await;
continue;
}
.map_err(|_| transaction_not_found_by_hash(hash, &storage_ledger_info))?;

if matches!(
txn_data,
TransactionData::Pending(_) | TransactionData::PendingQueryable(_)
) && (start_time.elapsed().as_millis() as u64) < wait_by_hash_timeout_ms
{
tokio::time::sleep(Duration::from_millis(wait_by_hash_poll_interval_ms)).await;
continue;
}

let api = self.clone();
return api_spawn_blocking(move || {
api.get_transaction_inner(&accept_type, txn_data, &ledger_info)
api.get_transaction_inner(
&accept_type,
txn_data,
&internal_ledger_info_opt.unwrap_or(storage_ledger_info),
)
})
.await;
}
@@ -832,25 +844,37 @@ impl TransactionsApi {
let context = self.context.clone();
let accept_type = accept_type.clone();

let ledger_info = api_spawn_blocking(move || context.get_latest_ledger_info()).await?;
let (internal_ledger_info_opt, storage_ledger_info) =
api_spawn_blocking(move || context.get_latest_internal_and_storage_ledger_info())
.await?;

let txn_data = self
.get_by_hash(hash.into(), &ledger_info)
.get_by_hash(
hash.into(),
&storage_ledger_info,
internal_ledger_info_opt.as_ref(),
)
.await
.context(format!("Failed to get transaction by hash {}", hash))
.map_err(|err| {
BasicErrorWith404::internal_with_code(
err,
AptosErrorCode::InternalError,
&ledger_info,
&storage_ledger_info,
)
})?
.context(format!("Failed to find transaction with hash: {}", hash))
.map_err(|_| transaction_not_found_by_hash(hash, &ledger_info))?;
.map_err(|_| transaction_not_found_by_hash(hash, &storage_ledger_info))?;

let api = self.clone();
api_spawn_blocking(move || api.get_transaction_inner(&accept_type, txn_data, &ledger_info))
.await
api_spawn_blocking(move || {
api.get_transaction_inner(
&accept_type,
txn_data,
&internal_ledger_info_opt.unwrap_or(storage_ledger_info),
)
})
.await
}

fn get_transaction_by_version_inner(
@@ -921,6 +945,19 @@ impl TransactionsApi {
ledger_info,
)
})?,
TransactionData::PendingQueryable(txn) => state_view
.as_converter(self.context.db.clone(), self.context.indexer_reader.clone())
.try_into_pending_transaction_from_pending_queryable_transaction(txn)
.context(
"Failed to convert on pending queryable transaction to Transaction",
)
.map_err(|err| {
BasicErrorWith404::internal_with_code(
err,
AptosErrorCode::InternalError,
ledger_info,
)
})?,
};

BasicResponse::try_from_json((transaction, ledger_info, BasicResponseStatus::Ok))
@@ -946,9 +983,11 @@ impl TransactionsApi {
return Ok(GetByVersionResponse::VersionTooOld);
}
Ok(GetByVersionResponse::Found(
self.context
.get_transaction_by_version(version, ledger_info.version())?
.into(),
TransactionData::from_transaction_onchain_data(
self.context
.get_transaction_by_version(version, ledger_info.version())?,
ledger_info.version(),
),
))
}

@@ -959,22 +998,46 @@
async fn get_by_hash(
&self,
hash: aptos_crypto::HashValue,
ledger_info: &LedgerInfo,
storage_ledger_info: &LedgerInfo,
internal_ledger_info: Option<&LedgerInfo>,
) -> anyhow::Result<Option<TransactionData>> {
let context = self.context.clone();
let version = ledger_info.version();
let version = storage_ledger_info.version();
let from_db =
tokio::task::spawn_blocking(move || context.get_transaction_by_hash(hash, version))
.await
.context("Failed to join task to read transaction by hash")?
.context("Failed to read transaction by hash from DB")?;
Ok(match from_db {
None => self
.context
.get_pending_transaction_by_hash(hash)
.await?
.map(|t| t.into()),
_ => from_db.map(|t| t.into()),
None => match self.context.get_pending_transaction_by_hash(hash).await? {
None => {
// Try reading from the DB again to make sure the transaction was not missed by the earlier read
let context_clone = self.context.clone();
tokio::task::spawn_blocking(move || {
context_clone.get_transaction_by_hash(hash, version)
})
.await
.context("Failed to join task to read transaction by hash")?
.context("Failed to read transaction by hash from DB")?
.map(|t| {
TransactionData::from_transaction_onchain_data(
t,
internal_ledger_info
.unwrap_or(storage_ledger_info)
.ledger_version
.into(),
)
})
},
Some(t) => Some(t.into()),
},
Some(t) => Some(TransactionData::from_transaction_onchain_data(
t,
internal_ledger_info
.unwrap_or(storage_ledger_info)
.ledger_version
.into(),
)),
})
}

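
A self-contained sketch (not part of the commit) of the lookup order get_by_hash now follows: storage first, then mempool, then storage once more, which closes the race where a transaction commits, and is evicted from mempool, between the first two reads.

// Generic illustration of the db -> mempool -> db retry pattern.
fn find_txn<T>(
    read_db: impl Fn() -> Option<T>,
    read_mempool: impl Fn() -> Option<T>,
) -> Option<T> {
    read_db()
        .or_else(|| read_mempool())
        .or_else(|| read_db())
}

fn main() {
    // Simulate a transaction that commits between the two reads: the DB misses
    // on the first read and hits on the second, while mempool never has it.
    let db_reads = std::cell::Cell::new(0);
    let read_db = || {
        db_reads.set(db_reads.get() + 1);
        if db_reads.get() >= 2 { Some("txn") } else { None }
    };
    let read_mempool = || None;
    assert_eq!(find_txn(read_db, read_mempool), Some("txn"));
}
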
26 changes: 25 additions & 1 deletion api/test-context/src/test_context.rs
@@ -40,6 +40,7 @@ use aptos_types::{
block_info::BlockInfo,
block_metadata::BlockMetadata,
chain_id::ChainId,
indexer::indexer_db_reader::IndexerReader,
ledger_info::{LedgerInfo, LedgerInfoWithSignatures},
transaction::{
signature_verified_transaction::into_signature_verified_block, Transaction,
@@ -94,9 +95,18 @@ impl ApiSpecificConfig {
}

pub fn new_test_context(
test_name: String,
node_config: NodeConfig,
use_db_with_indexer: bool,
) -> TestContext {
new_test_context_inner(test_name, node_config, use_db_with_indexer, None)
}

pub fn new_test_context_inner(
test_name: String,
mut node_config: NodeConfig,
use_db_with_indexer: bool,
end_version: Option<u64>,
) -> TestContext {
// Speculative logging uses a global variable and when many instances use it together, they
// panic, so we disable this to run tests.
@@ -155,7 +165,7 @@ pub fn new_test_context(
.storage
.set_data_dir(tmp_dir.path().to_path_buf());
let mock_indexer_service =
MockInternalIndexerDBService::new_for_test(db_rw.reader.clone(), &node_config);
MockInternalIndexerDBService::new_for_test(db_rw.reader.clone(), &node_config, end_version);

let context = Context::new(
ChainId::test(),
@@ -425,6 +435,10 @@ impl TestContext {
.await;
}

pub fn get_indexer_reader(&self) -> Option<&Arc<dyn IndexerReader>> {
self.context.get_indexer_reader()
}

pub async fn create_multisig_account(
&mut self,
account: &mut LocalAccount,
@@ -562,6 +576,16 @@ impl TestContext {
self.context.get_latest_ledger_info::<BasicError>().unwrap()
}

pub fn get_latest_storage_ledger_info(&self) -> aptos_api_types::LedgerInfo {
self.context
.get_latest_storage_ledger_info::<BasicError>()
.unwrap()
}

pub fn get_indexer_readers(&self) -> Option<&Arc<dyn IndexerReader>> {
self.context.get_indexer_reader()
}

pub fn get_transactions(&self, start: u64, limit: u16) -> Vec<TransactionOnChainData> {
self.context
.get_transactions(start, limit, self.get_latest_ledger_info().version())