From 12e06e3989d88dfc2a06e1a32d2209885d98207e Mon Sep 17 00:00:00 2001 From: Xun Li Date: Tue, 26 Jul 2022 15:03:52 -0700 Subject: [PATCH] Fix flaky gateway tests (#3517) --- crates/sui-core/src/authority.rs | 2 +- crates/sui-core/src/gateway_state.rs | 126 ++++++++++-------- crates/sui-core/src/query_helpers.rs | 8 +- .../src/unit_tests/gateway_state_tests.rs | 22 ++- 4 files changed, 89 insertions(+), 69 deletions(-) diff --git a/crates/sui-core/src/authority.rs b/crates/sui-core/src/authority.rs index 10fca60f254a7..fb662850bfc5e 100644 --- a/crates/sui-core/src/authority.rs +++ b/crates/sui-core/src/authority.rs @@ -1247,7 +1247,7 @@ impl AuthorityState { &self, digest: TransactionDigest, ) -> Result<(CertifiedTransaction, TransactionEffects), anyhow::Error> { - QueryHelpers::get_transaction(&self.database, digest) + QueryHelpers::get_transaction(&self.database, &digest) } fn get_indexes(&self) -> SuiResult> { diff --git a/crates/sui-core/src/gateway_state.rs b/crates/sui-core/src/gateway_state.rs index f57ca4db7d25b..351eb04c05c94 100644 --- a/crates/sui-core/src/gateway_state.rs +++ b/crates/sui-core/src/gateway_state.rs @@ -638,7 +638,11 @@ where pending_transaction, } => { debug!(tx_digest=?pending_transaction, "Objects locked by a previous transaction, re-executing the previous transaction"); - if self.retry_pending_tx(pending_transaction).await.is_err() { + if let Err(err) = self.retry_pending_tx(pending_transaction).await { + debug!( + "Retrying pending tx failed: {:?}. Resetting the transaction lock", + err + ); self.store.reset_transaction_lock(&owned_objects).await?; } self.set_transaction_lock(&owned_objects, transaction.clone()) @@ -670,12 +674,21 @@ where } async fn retry_pending_tx(&self, digest: TransactionDigest) -> Result<(), anyhow::Error> { - let tx = self - .store - .get_transaction(&digest)? - .ok_or(SuiError::TransactionNotFound { digest })?; - self.execute_transaction(tx).await?; - Ok(()) + let tx = self.store.get_transaction(&digest)?; + match tx { + Some(tx) => { + self.execute_transaction(tx).await?; + Ok(()) + } + None => { + // It's possible that the tx has been executed already. + if self.store.get_certified_transaction(&digest)?.is_some() { + Ok(()) + } else { + Err(SuiError::TransactionNotFound { digest }.into()) + } + } + } } async fn download_object_from_authorities(&self, object_id: ObjectID) -> SuiResult { @@ -683,7 +696,8 @@ where if let ObjectRead::Exists(obj_ref, object, _) = &result { let local_object = self.store.get_object(&object_id)?; if local_object.is_none() - || &local_object.unwrap().compute_object_reference() != obj_ref + // We only update local object if the validator version is newer. + || local_object.unwrap().version() < obj_ref.1 { self.store.insert_object_direct(*obj_ref, object).await?; } @@ -1075,53 +1089,61 @@ where debug!(tx_digest = ?tx_digest, "Received execute_transaction request"); - let span = tracing::debug_span!( - "gateway_execute_transaction", - ?tx_digest, - tx_kind = tx.data.kind_as_str() - ); - - // Use start_coarse_time() if the below turns out to have a perf impact - let timer = self.metrics.transaction_latency.start_timer(); - let mut res = self - .execute_transaction_impl(tx.clone(), false) - .instrument(span.clone()) - .await; - // NOTE: below only records latency if this completes. - timer.stop_and_record(); - - let mut remaining_retries = MAX_NUM_TX_RETRIES; - while res.is_err() { - if remaining_retries == 0 { - error!( - num_retries = MAX_NUM_TX_RETRIES, + // Ensure idempotency. + let (certificate, effects) = match QueryHelpers::get_transaction(&self.store, tx_digest) { + Ok((cert, effects)) => (cert, effects), + _ => { + let span = tracing::debug_span!( + "gateway_execute_transaction", ?tx_digest, - "All transaction retries failed" + tx_kind = tx.data.kind_as_str() ); - // Okay to unwrap since we checked that this is an error - return Err(res.unwrap_err()); - } - remaining_retries -= 1; - self.metrics.total_tx_retries.inc(); - - debug!( - remaining_retries, - ?tx_digest, - ?res, - "Retrying failed transaction" - ); - - res = self - .execute_transaction_impl(tx.clone(), remaining_retries == 0) - .instrument(span.clone()) - .await; - } - // Okay to unwrap() since we checked that this is Ok - let (certificate, effects) = res.unwrap(); - let effects = effects.effects; + // Use start_coarse_time() if the below turns out to have a perf impact + let timer = self.metrics.transaction_latency.start_timer(); + let mut res = self + .execute_transaction_impl(tx.clone(), false) + .instrument(span.clone()) + .await; + // NOTE: below only records latency if this completes. + timer.stop_and_record(); + + let mut remaining_retries = MAX_NUM_TX_RETRIES; + while res.is_err() { + if remaining_retries == 0 { + error!( + num_retries = MAX_NUM_TX_RETRIES, + ?tx_digest, + "All transaction retries failed" + ); + // Okay to unwrap since we checked that this is an error + return Err(res.unwrap_err()); + } + remaining_retries -= 1; + self.metrics.total_tx_retries.inc(); + + debug!( + remaining_retries, + ?tx_digest, + ?res, + "Retrying failed transaction" + ); + + res = self + .execute_transaction_impl(tx.clone(), remaining_retries == 0) + .instrument(span.clone()) + .await; + } + + // Okay to unwrap() since we checked that this is Ok + let (certificate, effects) = res.unwrap(); + let effects = effects.effects; + + debug!(?tx_digest, "Transaction succeeded"); + (certificate, effects) + } + }; - debug!(tx_digest = ?tx_digest, "Transaction succeeded"); // Create custom response base on the request type if let TransactionKind::Single(tx_kind) = tx_kind { match tx_kind { @@ -1443,7 +1465,7 @@ where &self, digest: TransactionDigest, ) -> Result { - let (cert, effect) = QueryHelpers::get_transaction(&self.store, digest)?; + let (cert, effect) = QueryHelpers::get_transaction(&self.store, &digest)?; Ok(TransactionEffectsResponse { certificate: cert.try_into()?, diff --git a/crates/sui-core/src/query_helpers.rs b/crates/sui-core/src/query_helpers.rs index 96de295343ade..4f8429ba60a2b 100644 --- a/crates/sui-core/src/query_helpers.rs +++ b/crates/sui-core/src/query_helpers.rs @@ -75,12 +75,12 @@ impl Deserialize<'de>> QueryHelpers { pub fn get_transaction( database: &SuiDataStore, - digest: TransactionDigest, + digest: &TransactionDigest, ) -> Result<(CertifiedTransaction, TransactionEffects), anyhow::Error> { - let opt = database.get_certified_transaction(&digest)?; + let opt = database.get_certified_transaction(digest)?; match opt { - Some(certificate) => Ok((certificate, database.get_effects(&digest)?)), - None => Err(anyhow!(SuiError::TransactionNotFound { digest })), + Some(certificate) => Ok((certificate, database.get_effects(digest)?)), + None => Err(anyhow!(SuiError::TransactionNotFound { digest: *digest })), } } } diff --git a/crates/sui-core/src/unit_tests/gateway_state_tests.rs b/crates/sui-core/src/unit_tests/gateway_state_tests.rs index 8b37eb273a4fe..5db8fedb5aa9b 100644 --- a/crates/sui-core/src/unit_tests/gateway_state_tests.rs +++ b/crates/sui-core/src/unit_tests/gateway_state_tests.rs @@ -353,10 +353,10 @@ async fn test_recent_transactions() -> Result<(), anyhow::Error> { #[tokio::test] async fn test_equivocation_resilient() { + telemetry_subscribers::init_for_testing(); let (addr1, key1) = get_key_pair(); let coin_object = Object::with_owner_for_testing(addr1); - let gas_object = Object::with_owner_for_testing(addr1); - let genesis_objects = vec![coin_object.clone(), gas_object.clone()]; + let genesis_objects = vec![coin_object.clone()]; let gateway = Arc::new(Box::new(create_gateway_state(genesis_objects).await)); let mut handles = vec![]; @@ -364,16 +364,13 @@ async fn test_equivocation_resilient() { // Make sure that one of them succeeds and there are no pending tx in the end. for _ in 0..20 { let (recipient, _) = get_key_pair(); - let data = gateway - .public_transfer_object( - addr1, - coin_object.id(), - Some(gas_object.id()), - GAS_VALUE_FOR_TESTING, - recipient, - ) - .await - .unwrap(); + let data = TransactionData::new_transfer_sui( + recipient, + addr1, + None, + coin_object.compute_object_reference(), + 1000, + ); let signature = key1.sign(&data.to_bytes()); let handle = tokio::task::spawn({ let gateway_copy = gateway.clone(); @@ -393,6 +390,7 @@ async fn test_equivocation_resilient() { .count(), 1 ); + println!("{:?}", gateway.store().pending_transactions().iter().next()); assert_eq!(gateway.store().pending_transactions().iter().count(), 0); }