Skip to content

Commit

Permalink
Client side change for handling ValidatorOverloadedRetryAfter error (M…
Browse files Browse the repository at this point in the history
…ystenLabs#16219)

## Description 

This PR implements the client-side change for handling the
ValidatorOverloadedRetryAfter error. Upon receiving more than 2f+1 such
errors, the client performs exponential backoff and retries. Unlike
other retryable errors, the retry in this case ignores the
max_retry_times constraint to make sure that locked objects can
eventually be unlocked.

Caveat in this PR:
- The ValidatorOverloadedRetryAfter error contains a suggestion about the
minimum duration to wait until the next retry. This is currently ignored
on the client side to keep the PR manageable.

## Test Plan 

Unit test + integration tests added.

---
If your changes are not user-facing and do not break anything, you can
skip the following section. Otherwise, please briefly describe what has
changed under the Release Notes section.

### Type of Change (Check all that apply)

- [ ] protocol change
- [ ] user-visible impact
- [ ] breaking change for a client SDKs
- [ ] breaking change for FNs (FN binary must upgrade)
- [ ] breaking change for validators or node operators (must upgrade
binaries)
- [ ] breaking change for on-chain data layout
- [ ] necessitate either a data wipe or data migration

### Release notes
  • Loading branch information
halfprice authored Feb 16, 2024
1 parent 2bf6b29 commit c0bea5a
Show file tree
Hide file tree
Showing 9 changed files with 276 additions and 14 deletions.
45 changes: 44 additions & 1 deletion crates/sui-core/src/authority_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,18 @@ pub enum AggregatorProcessTransactionError {

#[error("Transaction is already finalized but with different user signatures")]
TxAlreadyFinalizedWithDifferentUserSignatures,

#[error(
"{} of the validators by stake are overloaded and requested the client to retry after {} seconds. Validator errors: {:?}",
overload_stake,
retry_after_secs,
errors
)]
SystemOverloadRetryAfter {
overload_stake: StakeUnit,
errors: GroupedErrors,
retry_after_secs: u64,
},
}

#[derive(Error, Debug)]
Expand Down Expand Up @@ -294,6 +306,8 @@ struct ProcessTransactionState {
object_or_package_not_found_stake: StakeUnit,
// Validators that are overloaded with txns pending execution.
overloaded_stake: StakeUnit,
// Validators that are overloaded and request client to retry.
retryable_overloaded_stake: StakeUnit,
// If there are conflicting transactions, we note them down and may attempt to retry
conflicting_tx_digests:
BTreeMap<TransactionDigest, (Vec<(AuthorityName, ObjectRef)>, StakeUnit)>,
Expand Down Expand Up @@ -1014,6 +1028,7 @@ where
object_or_package_not_found_stake: 0,
non_retryable_stake: 0,
overloaded_stake: 0,
retryable_overloaded_stake: 0,
retryable: true,
conflicting_tx_digests: Default::default(),
conflicting_tx_total_stake: 0,
Expand Down Expand Up @@ -1073,8 +1088,21 @@ where
// Special case for validator overload too. Once we have >= 2f + 1
// overloaded validators we consider the system overloaded so we exit
// and notify the user.
// Note that currently, this overload accounts for
// - per object queue overload
// - consensus overload
state.overloaded_stake += weight;
}
else if err.is_retryable_overload() {
// Unlike the overload error above, retryable overload targets authority overload (the entire
// authority server is overloaded). In this case, the retry behavior differs from the one
// above: we may retry continuously, because objects may already have been locked
// in the validator.
//
// TODO: currently retryable overload and above overload error look redundant. We want to have a unified
// code path to handle both overload scenarios.
state.retryable_overloaded_stake += weight;
}
else if !retryable && !state.record_conflicting_transaction_if_any(name, weight, &err) {
// We don't count conflicting transactions as non-retryable errors here
// because its handling is a bit different.
Expand Down Expand Up @@ -1102,8 +1130,9 @@ where
return ReduceOutput::Failed(state);
}

// TODO: add more comments to explain each condition.
if state.non_retryable_stake >= validity_threshold
|| state.object_or_package_not_found_stake >= quorum_threshold
|| state.object_or_package_not_found_stake >= quorum_threshold // In normal case, object/package not found should be more than f+1
|| state.overloaded_stake >= quorum_threshold {
// We have hit an exit condition, f+1 non-retryable err or 2f+1 object not found or overload,
// so we no longer consider the transaction state as retryable.
Expand Down Expand Up @@ -1213,6 +1242,20 @@ where
};
}

// When state is in a retryable state and process transaction was not successful, it indicates that
// we have heard from *all* validators. Check if any SystemOverloadRetryAfter error caused the txn
// to fail. If so, return explicit SystemOverloadRetryAfter error for continuous retry (since objects
// are locked in validators). If not, retry regular RetryableTransaction error.
if state.tx_signatures.total_votes() + state.retryable_overloaded_stake >= quorum_threshold
{
// TODO: make use of retry_after_secs, which is currently not used.
return AggregatorProcessTransactionError::SystemOverloadRetryAfter {
overload_stake: state.retryable_overloaded_stake,
errors: group_errors(state.errors),
retry_after_secs: 0,
};
}

// No conflicting transaction, the system is not overloaded and transaction state is still retryable.
AggregatorProcessTransactionError::RetryableTransaction {
errors: group_errors(state.errors),
Expand Down
13 changes: 13 additions & 0 deletions crates/sui-core/src/quorum_driver/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ pub struct QuorumDriverMetrics {
pub(crate) total_attempts_retrying_conflicting_transaction: IntCounter,
pub(crate) total_successful_attempts_retrying_conflicting_transaction: IntCounter,
pub(crate) total_times_conflicting_transaction_already_finalized_when_retrying: IntCounter,
pub(crate) total_retryable_overload_errors: IntCounter,
pub(crate) transaction_retry_count: Histogram,

pub(crate) settlement_finality_latency: HistogramVec,
}
Expand Down Expand Up @@ -100,6 +102,17 @@ impl QuorumDriverMetrics {
registry,
)
.unwrap(),
total_retryable_overload_errors: register_int_counter_with_registry!(
"quorum_driver_total_retryable_overload_errors",
"Total number of transactions experiencing retryable overload error",
registry,
)
.unwrap(),
transaction_retry_count: Histogram::new_in_registry(
"quorum_driver_transaction_retry_count",
"Histogram of transaction retry count",
registry,
),
settlement_finality_latency: register_histogram_vec_with_registry!(
"quorum_driver_settlement_finality_latency",
"Settlement finality latency observed from quorum driver",
Expand Down
69 changes: 57 additions & 12 deletions crates/sui-core/src/quorum_driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ impl<A: Clone> QuorumDriver<A> {

/// Enqueue the task again if it hasn't maxed out the total retry attempts.
/// If it has, notify failure.
/// Enqueuing happens only after the `next_retry_after`, if not, wait until that instant
async fn enqueue_again_maybe(
&self,
transaction: Transaction,
Expand All @@ -145,6 +144,17 @@ impl<A: Clone> QuorumDriver<A> {
);
return Ok(());
}
self.backoff_and_enqueue(transaction, tx_cert, old_retry_times)
.await
}

/// Performs exponential backoff and then enqueues the `transaction` to the execution queue.
async fn backoff_and_enqueue(
&self,
transaction: Transaction,
tx_cert: Option<CertifiedTransaction>,
old_retry_times: u8,
) -> SuiResult<()> {
let next_retry_after =
Instant::now() + Duration::from_millis(200 * u64::pow(2, old_retry_times.into()));
sleep_until(next_retry_after).await;
Expand Down Expand Up @@ -329,6 +339,20 @@ where
}))
}

Err(AggregatorProcessTransactionError::SystemOverloadRetryAfter {
overload_stake,
errors,
retry_after_secs,
}) => {
self.metrics.total_retryable_overload_errors.inc();
debug!(?tx_digest, ?errors, "System overload and retry");
Err(Some(QuorumDriverError::SystemOverloadRetryAfter {
overload_stake,
errors,
retry_after_secs,
}))
}

Err(AggregatorProcessTransactionError::RetryableTransaction { errors }) => {
debug!(?tx_digest, ?errors, "Retryable transaction error");
Err(None)
Expand Down Expand Up @@ -687,6 +711,11 @@ where
let tx_digest = *transaction.digest();
let is_single_writer_tx = !transaction.contains_shared_object();

quorum_driver
.metrics
.transaction_retry_count
.report(old_retry_times as u64 + 1);

let timer = Instant::now();
let tx_cert = match tx_cert {
None => match quorum_driver.process_transaction(transaction.clone()).await {
Expand Down Expand Up @@ -762,17 +791,33 @@ where
action: &'static str,
) {
let tx_digest = *transaction.digest();
if let Some(qd_error) = err {
debug!(?tx_digest, "Failed to {action}: {}", qd_error);
// non-retryable failure, this task reaches terminal state for now, notify waiter.
quorum_driver.notify(&transaction, &Err(qd_error), old_retry_times + 1);
} else {
debug!(?tx_digest, "Failed to {action} - Retrying");
spawn_monitored_task!(quorum_driver.enqueue_again_maybe(
transaction.clone(),
tx_cert,
old_retry_times
));
match err {
None => {
debug!(?tx_digest, "Failed to {action} - Retrying");
spawn_monitored_task!(quorum_driver.enqueue_again_maybe(
transaction.clone(),
tx_cert,
old_retry_times
));
}
Some(QuorumDriverError::SystemOverloadRetryAfter { .. }) => {
// Special case for SystemOverloadRetryAfter error. In this case, because objects are already
// locked inside validators, we need to perform continuous retry and ignore `max_retry_times`.
// TODO: the txn can potentially be retried an unlimited number of times; therefore, we need to bound
// the number of ongoing transactions in a quorum driver. When the limit is reached, the quorum driver
// should reject any new transaction requests.
debug!(?tx_digest, "Failed to {action} - Retrying");
spawn_monitored_task!(quorum_driver.backoff_and_enqueue(
transaction.clone(),
tx_cert,
old_retry_times
));
}
Some(qd_error) => {
debug!(?tx_digest, "Failed to {action}: {}", qd_error);
// non-retryable failure, this task reaches terminal state for now, notify waiter.
quorum_driver.notify(&transaction, &Err(qd_error), old_retry_times + 1);
}
}
}

Expand Down
58 changes: 58 additions & 0 deletions crates/sui-core/src/quorum_driver/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use crate::quorum_driver::reconfig_observer::DummyReconfigObserver;
use crate::quorum_driver::{AuthorityAggregator, QuorumDriverHandlerBuilder};
use crate::test_authority_clients::LocalAuthorityClient;
use crate::test_authority_clients::LocalAuthorityClientFaultConfig;
use crate::test_utils::make_transfer_sui_transaction;
use crate::{quorum_driver::QuorumDriverMetrics, test_utils::init_local_authorities};
use mysten_common::sync::notify_read::{NotifyRead, Registration};
Expand All @@ -16,6 +17,7 @@ use sui_types::effects::TransactionEffectsAPI;
use sui_types::object::{generate_test_gas_objects, Object};
use sui_types::quorum_driver_types::{QuorumDriverError, QuorumDriverResponse, QuorumDriverResult};
use sui_types::transaction::Transaction;
use tokio::time::timeout;

async fn setup() -> (AuthorityAggregator<LocalAuthorityClient>, Transaction) {
let (sender, keypair): (_, AccountKeyPair) = get_key_pair();
Expand Down Expand Up @@ -445,3 +447,59 @@ async fn test_quorum_driver_object_locked() -> Result<(), anyhow::Error> {

Ok(())
}

// Verifies that the quorum driver keeps retrying a transaction for which every
// authority client reports the retry-after overload error: with `max_retry_times`
// set to 0, the submission ticket must still be unresolved after 300 seconds of
// (paused, auto-advanced) test time.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn test_quorum_driver_handling_overload_and_retry() {
    telemetry_subscribers::init_for_testing();

    // Four local authorities sharing a single gas object owned by the sender.
    let (sender, signer_keypair): (_, AccountKeyPair) = get_key_pair();
    let owned_gas = Object::with_owner_for_testing(sender);
    let (mut aggregator, authorities, genesis, _) =
        init_local_authorities(4, vec![owned_gas.clone()]).await;

    // Switch on the overload-retry-after fault for every authority client.
    let fault_config = LocalAuthorityClientFaultConfig {
        overload_retry_after_handle_transaction: true,
        ..Default::default()
    };
    let mut faulty_clients = aggregator.clone_inner_clients_test_only();
    for client in faulty_clients.values_mut() {
        client.authority_client_mut().fault_config = fault_config;
    }
    // Re-wrap the patched clients and install them on the aggregator.
    aggregator.authority_clients = Arc::new(
        faulty_clients
            .into_iter()
            .map(|(name, client)| (name, Arc::new(client)))
            .collect(),
    );

    // Build the transaction from the genesis copy of the gas object.
    let reference_gas_price = authorities
        .first()
        .unwrap()
        .reference_gas_price_for_testing()
        .unwrap();
    let genesis_gas = genesis
        .objects()
        .iter()
        .find(|candidate| candidate.id() == owned_gas.id())
        .unwrap();
    let transaction = make_tx(genesis_gas, sender, &signer_keypair, reference_gas_price);

    // Quorum driver configured with max_retry_times = 0.
    let shared_aggregator = Arc::new(aggregator.clone());
    let driver_metrics = Arc::new(QuorumDriverMetrics::new_for_tests());
    let quorum_driver_handler = Arc::new(
        QuorumDriverHandlerBuilder::new(shared_aggregator.clone(), driver_metrics)
            .with_reconfig_observer(Arc::new(DummyReconfigObserver {}))
            .with_max_retry_times(0)
            .start(),
    );

    // Submit the transaction; the ticket is expected to stay pending until the timeout fires.
    let pending = quorum_driver_handler
        .submit_transaction(transaction)
        .await
        .unwrap();
    let outcome = timeout(Duration::from_secs(300), pending).await;
    if let Ok(result) = outcome {
        panic!("Process transaction should timeout! {:?}", result);
    }
    eprintln!("Waiting for txn timed out! This is desired behavior.");
}
6 changes: 6 additions & 0 deletions crates/sui-core/src/test_authority_clients.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub struct LocalAuthorityClientFaultConfig {
pub fail_after_handle_transaction: bool,
pub fail_before_handle_confirmation: bool,
pub fail_after_handle_confirmation: bool,
pub overload_retry_after_handle_transaction: bool,
}

impl LocalAuthorityClientFaultConfig {
Expand Down Expand Up @@ -70,6 +71,11 @@ impl AuthorityAPI for LocalAuthorityClient {
error: "Mock error after handle_transaction".to_owned(),
});
}
if self.fault_config.overload_retry_after_handle_transaction {
return Err(SuiError::ValidatorOverloadedRetryAfter {
retry_after_secs: 0,
});
}
result
}

Expand Down
Loading

0 comments on commit c0bea5a

Please sign in to comment.