Skip to content

Commit

Permalink
Fix flaky gateway tests (MystenLabs#3517)
Browse files Browse the repository at this point in the history
  • Loading branch information
lxfind authored Jul 26, 2022
1 parent d8982a8 commit 12e06e3
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 69 deletions.
2 changes: 1 addition & 1 deletion crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1247,7 +1247,7 @@ impl AuthorityState {
&self,
digest: TransactionDigest,
) -> Result<(CertifiedTransaction, TransactionEffects), anyhow::Error> {
QueryHelpers::get_transaction(&self.database, digest)
QueryHelpers::get_transaction(&self.database, &digest)
}

fn get_indexes(&self) -> SuiResult<Arc<IndexStore>> {
Expand Down
126 changes: 74 additions & 52 deletions crates/sui-core/src/gateway_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,11 @@ where
pending_transaction,
} => {
debug!(tx_digest=?pending_transaction, "Objects locked by a previous transaction, re-executing the previous transaction");
if self.retry_pending_tx(pending_transaction).await.is_err() {
if let Err(err) = self.retry_pending_tx(pending_transaction).await {
debug!(
"Retrying pending tx failed: {:?}. Resetting the transaction lock",
err
);
self.store.reset_transaction_lock(&owned_objects).await?;
}
self.set_transaction_lock(&owned_objects, transaction.clone())
Expand Down Expand Up @@ -670,20 +674,30 @@ where
}

async fn retry_pending_tx(&self, digest: TransactionDigest) -> Result<(), anyhow::Error> {
let tx = self
.store
.get_transaction(&digest)?
.ok_or(SuiError::TransactionNotFound { digest })?;
self.execute_transaction(tx).await?;
Ok(())
let tx = self.store.get_transaction(&digest)?;
match tx {
Some(tx) => {
self.execute_transaction(tx).await?;
Ok(())
}
None => {
// It's possible that the tx has been executed already.
if self.store.get_certified_transaction(&digest)?.is_some() {
Ok(())
} else {
Err(SuiError::TransactionNotFound { digest }.into())
}
}
}
}

async fn download_object_from_authorities(&self, object_id: ObjectID) -> SuiResult<ObjectRead> {
let result = self.authorities.get_object_info_execute(object_id).await?;
if let ObjectRead::Exists(obj_ref, object, _) = &result {
let local_object = self.store.get_object(&object_id)?;
if local_object.is_none()
|| &local_object.unwrap().compute_object_reference() != obj_ref
// We only update local object if the validator version is newer.
|| local_object.unwrap().version() < obj_ref.1
{
self.store.insert_object_direct(*obj_ref, object).await?;
}
Expand Down Expand Up @@ -1075,53 +1089,61 @@ where

debug!(tx_digest = ?tx_digest, "Received execute_transaction request");

let span = tracing::debug_span!(
"gateway_execute_transaction",
?tx_digest,
tx_kind = tx.data.kind_as_str()
);

// Use start_coarse_time() if the below turns out to have a perf impact
let timer = self.metrics.transaction_latency.start_timer();
let mut res = self
.execute_transaction_impl(tx.clone(), false)
.instrument(span.clone())
.await;
// NOTE: below only records latency if this completes.
timer.stop_and_record();

let mut remaining_retries = MAX_NUM_TX_RETRIES;
while res.is_err() {
if remaining_retries == 0 {
error!(
num_retries = MAX_NUM_TX_RETRIES,
// Ensure idempotency.
let (certificate, effects) = match QueryHelpers::get_transaction(&self.store, tx_digest) {
Ok((cert, effects)) => (cert, effects),
_ => {
let span = tracing::debug_span!(
"gateway_execute_transaction",
?tx_digest,
"All transaction retries failed"
tx_kind = tx.data.kind_as_str()
);
// Okay to unwrap since we checked that this is an error
return Err(res.unwrap_err());
}
remaining_retries -= 1;
self.metrics.total_tx_retries.inc();

debug!(
remaining_retries,
?tx_digest,
?res,
"Retrying failed transaction"
);

res = self
.execute_transaction_impl(tx.clone(), remaining_retries == 0)
.instrument(span.clone())
.await;
}

// Okay to unwrap() since we checked that this is Ok
let (certificate, effects) = res.unwrap();
let effects = effects.effects;
// Use start_coarse_time() if the below turns out to have a perf impact
let timer = self.metrics.transaction_latency.start_timer();
let mut res = self
.execute_transaction_impl(tx.clone(), false)
.instrument(span.clone())
.await;
// NOTE: below only records latency if this completes.
timer.stop_and_record();

let mut remaining_retries = MAX_NUM_TX_RETRIES;
while res.is_err() {
if remaining_retries == 0 {
error!(
num_retries = MAX_NUM_TX_RETRIES,
?tx_digest,
"All transaction retries failed"
);
// Okay to unwrap since we checked that this is an error
return Err(res.unwrap_err());
}
remaining_retries -= 1;
self.metrics.total_tx_retries.inc();

debug!(
remaining_retries,
?tx_digest,
?res,
"Retrying failed transaction"
);

res = self
.execute_transaction_impl(tx.clone(), remaining_retries == 0)
.instrument(span.clone())
.await;
}

// Okay to unwrap() since we checked that this is Ok
let (certificate, effects) = res.unwrap();
let effects = effects.effects;

debug!(?tx_digest, "Transaction succeeded");
(certificate, effects)
}
};

debug!(tx_digest = ?tx_digest, "Transaction succeeded");
// Create a custom response based on the request type
if let TransactionKind::Single(tx_kind) = tx_kind {
match tx_kind {
Expand Down Expand Up @@ -1443,7 +1465,7 @@ where
&self,
digest: TransactionDigest,
) -> Result<TransactionEffectsResponse, anyhow::Error> {
let (cert, effect) = QueryHelpers::get_transaction(&self.store, digest)?;
let (cert, effect) = QueryHelpers::get_transaction(&self.store, &digest)?;

Ok(TransactionEffectsResponse {
certificate: cert.try_into()?,
Expand Down
8 changes: 4 additions & 4 deletions crates/sui-core/src/query_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,12 @@ impl<S: Eq + Serialize + for<'de> Deserialize<'de>> QueryHelpers<S> {

pub fn get_transaction(
database: &SuiDataStore<S>,
digest: TransactionDigest,
digest: &TransactionDigest,
) -> Result<(CertifiedTransaction, TransactionEffects), anyhow::Error> {
let opt = database.get_certified_transaction(&digest)?;
let opt = database.get_certified_transaction(digest)?;
match opt {
Some(certificate) => Ok((certificate, database.get_effects(&digest)?)),
None => Err(anyhow!(SuiError::TransactionNotFound { digest })),
Some(certificate) => Ok((certificate, database.get_effects(digest)?)),
None => Err(anyhow!(SuiError::TransactionNotFound { digest: *digest })),
}
}
}
22 changes: 10 additions & 12 deletions crates/sui-core/src/unit_tests/gateway_state_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,27 +353,24 @@ async fn test_recent_transactions() -> Result<(), anyhow::Error> {

#[tokio::test]
async fn test_equivocation_resilient() {
telemetry_subscribers::init_for_testing();
let (addr1, key1) = get_key_pair();
let coin_object = Object::with_owner_for_testing(addr1);
let gas_object = Object::with_owner_for_testing(addr1);
let genesis_objects = vec![coin_object.clone(), gas_object.clone()];
let genesis_objects = vec![coin_object.clone()];
let gateway = Arc::new(Box::new(create_gateway_state(genesis_objects).await));

let mut handles = vec![];
// We send 20 requests to the gateway that all try to touch the same object.
// Make sure that one of them succeeds and that there are no pending txs at the end.
for _ in 0..20 {
let (recipient, _) = get_key_pair();
let data = gateway
.public_transfer_object(
addr1,
coin_object.id(),
Some(gas_object.id()),
GAS_VALUE_FOR_TESTING,
recipient,
)
.await
.unwrap();
let data = TransactionData::new_transfer_sui(
recipient,
addr1,
None,
coin_object.compute_object_reference(),
1000,
);
let signature = key1.sign(&data.to_bytes());
let handle = tokio::task::spawn({
let gateway_copy = gateway.clone();
Expand All @@ -393,6 +390,7 @@ async fn test_equivocation_resilient() {
.count(),
1
);
println!("{:?}", gateway.store().pending_transactions().iter().next());
assert_eq!(gateway.store().pending_transactions().iter().count(), 0);
}

Expand Down

0 comments on commit 12e06e3

Please sign in to comment.