Skip to content

Commit

Permalink
Add inflight metrics in authority aggregator (#11213)
Browse files Browse the repository at this point in the history
## Description 

These metrics help locate issues when backlogs form at clients.

## Test Plan 

CI. Deployed to private testnet.

---
If your changes are not user-facing and not a breaking change, you can
skip the following section. Otherwise, please indicate what changed, and
then add to the Release Notes section as highlighted during the release
process.

### Type of Change (Check all that apply)

- [ ] user-visible impact
- [ ] breaking change for a client SDKs
- [ ] breaking change for FNs (FN binary must upgrade)
- [ ] breaking change for validators or node operators (must upgrade
binaries)
- [ ] breaking change for on-chain data layout
- [ ] necessitate either a data wipe or data migration

### Release notes
  • Loading branch information
mwtian authored Apr 23, 2023
1 parent ac6befd commit 823d243
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 15 deletions.
36 changes: 32 additions & 4 deletions crates/sui-benchmark/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use async_trait::async_trait;
use embedded_reconfig_observer::EmbeddedReconfigObserver;
use fullnode_reconfig_observer::FullNodeReconfigObserver;
use futures::{stream::FuturesUnordered, StreamExt};
use mysten_metrics::GaugeGuard;
use prometheus::Registry;
use rand::Rng;
use roaring::RoaringBitmap;
Expand Down Expand Up @@ -395,13 +396,19 @@ impl ValidatorProxy for LocalValidatorAggregatorProxy {
async fn execute_bench_transaction(&self, tx: Transaction) -> anyhow::Result<ExecutionEffects> {
// Store the epoch number; we read it from the votes and use it later to create the certificate.
let mut epoch = 0;
let auth_agg = self.qd.authority_aggregator().load();

// Send the transaction to all validators.
let tx_guard = GaugeGuard::acquire(&auth_agg.metrics.inflight_transactions);
let mut futures = FuturesUnordered::new();
for (name, client) in self.clients.iter() {
let fut = client.handle_transaction(tx.clone()).map(|r| (r, *name));
futures.push(fut);
}
auth_agg
.metrics
.inflight_transaction_requests
.add(futures.len() as i64);

// TODO: This following aggregation will not work well at epoch boundary.

Expand All @@ -410,6 +417,7 @@ impl ValidatorProxy for LocalValidatorAggregatorProxy {
let mut votes = Vec::new();
let mut certificate = None;
while let Some((response, name)) = futures.next().await {
auth_agg.metrics.inflight_transaction_requests.dec();
match response {
Ok(response) => match response.status {
// If all goes well, the authority returns a vote.
Expand Down Expand Up @@ -490,8 +498,14 @@ impl ValidatorProxy for LocalValidatorAggregatorProxy {
Envelope::new_from_data_and_sig(tx.into_data(), quorum_signature)
}
};
auth_agg
.metrics
.inflight_transaction_requests
.sub(futures.len() as i64);
drop(tx_guard);

// Send the certificate to all validators.
let _cert_guard = GaugeGuard::acquire(&auth_agg.metrics.inflight_certificates);
let mut futures = FuturesUnordered::new();
total_stake = 0;
let mut transaction_effects = None;
Expand All @@ -507,9 +521,14 @@ impl ValidatorProxy for LocalValidatorAggregatorProxy {
.await
});
}
auth_agg
.metrics
.inflight_certificate_requests
.add(futures.len() as i64);

// Wait for the replies from a quorum of validators.
while let Some((response, name)) = futures.next().await {
auth_agg.metrics.inflight_certificate_requests.dec();
match response {
// If all goes well, the validators reply with signed effects.
Ok(HandleCertificateResponse {
Expand All @@ -525,9 +544,7 @@ impl ValidatorProxy for LocalValidatorAggregatorProxy {
// This typically happens when the validators are overloaded and the certificate is
// immediately rejected.
Err(e) => {
self.qd
.authority_aggregator()
.load()
auth_agg
.metrics
.process_cert_errors
.with_label_values(&[&name.concise().to_string(), e.as_ref()])
Expand All @@ -548,10 +565,21 @@ impl ValidatorProxy for LocalValidatorAggregatorProxy {
}

// Wait for 10 more seconds on remaining requests asynchronously.
const WAIT_TIMEOUT: Duration = Duration::from_secs(10);
{
let auth_agg = auth_agg.clone();
let mut requests = self.requests.lock().unwrap();
requests.spawn(async move {
let _ = timeout(Duration::from_secs(10), futures.collect::<Vec<_>>()).await;
let _ = timeout(WAIT_TIMEOUT, async {
while futures.next().await.is_some() {
auth_agg.metrics.inflight_certificate_requests.dec();
}
})
.await;
auth_agg
.metrics
.inflight_certificate_requests
.sub(futures.len() as i64);
});
}

Expand Down
44 changes: 40 additions & 4 deletions crates/sui-core/src/authority_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::authority_client::{
use crate::safe_client::{SafeClient, SafeClientMetrics, SafeClientMetricsBase};
use fastcrypto::encoding::Encoding;
use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt};
use mysten_metrics::monitored_future;
use mysten_metrics::{monitored_future, GaugeGuard};
use mysten_network::config::Config;
use std::convert::AsRef;
use sui_config::genesis::Genesis;
Expand All @@ -36,8 +36,8 @@ use thiserror::Error;
use tracing::{debug, error, info, trace, warn, Instrument};

use prometheus::{
register_int_counter_vec_with_registry, register_int_counter_with_registry, IntCounter,
IntCounterVec, Registry,
register_int_counter_vec_with_registry, register_int_counter_with_registry,
register_int_gauge_with_registry, IntCounter, IntCounterVec, IntGauge, Registry,
};
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::string::ToString;
Expand Down Expand Up @@ -103,6 +103,10 @@ pub struct AuthAggMetrics {
pub total_client_double_spend_attempts_detected: IntCounter,
pub total_aggregated_err: IntCounterVec,
pub total_rpc_err: IntCounterVec,
pub inflight_transactions: IntGauge,
pub inflight_certificates: IntGauge,
pub inflight_transaction_requests: IntGauge,
pub inflight_certificate_requests: IntGauge,
}

impl AuthAggMetrics {
Expand Down Expand Up @@ -148,6 +152,30 @@ impl AuthAggMetrics {
registry,
)
.unwrap(),
inflight_transactions: register_int_gauge_with_registry!(
"auth_agg_inflight_transactions",
"Inflight transaction gathering signatures",
registry,
)
.unwrap(),
inflight_certificates: register_int_gauge_with_registry!(
"auth_agg_inflight_certificates",
"Inflight certificates gathering effects",
registry,
)
.unwrap(),
inflight_transaction_requests: register_int_gauge_with_registry!(
"auth_agg_inflight_transaction_requests",
"Inflight handle_transaction requests",
registry,
)
.unwrap(),
inflight_certificate_requests: register_int_gauge_with_registry!(
"auth_agg_inflight_certificate_requests",
"Inflight handle_certificate requests",
registry,
)
.unwrap(),
}
}

Expand Down Expand Up @@ -1000,7 +1028,10 @@ where
state,
|_name, client| {
Box::pin(
async move { client.handle_transaction(transaction_ref.clone()).await },
async move {
let _guard = GaugeGuard::acquire(&self.metrics.inflight_transaction_requests);
client.handle_transaction(transaction_ref.clone()).await
},
)
},
|mut state, name, weight, response| {
Expand Down Expand Up @@ -1424,6 +1455,7 @@ where
state,
|name, client| {
Box::pin(async move {
let _guard = GaugeGuard::acquire(&self.metrics.inflight_certificate_requests);
client
.handle_certificate(cert_ref.clone())
.instrument(
Expand Down Expand Up @@ -1579,6 +1611,7 @@ where
&self,
transaction: &VerifiedTransaction,
) -> Result<VerifiedCertifiedTransactionEffects, anyhow::Error> {
let tx_guard = GaugeGuard::acquire(&self.metrics.inflight_transactions);
let result = self
.process_transaction(transaction.clone())
.instrument(tracing::debug_span!("process_tx"))
Expand All @@ -1590,6 +1623,9 @@ where
}
};
self.metrics.total_tx_certificates_created.inc();
drop(tx_guard);

let _cert_guard = GaugeGuard::acquire(&self.metrics.inflight_certificates);
let response = self
.process_certificate(cert.clone().into())
.instrument(tracing::debug_span!("process_cert"))
Expand Down
14 changes: 7 additions & 7 deletions crates/sui-core/src/quorum_driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::authority_aggregator::{
};
use crate::authority_client::AuthorityAPI;
use mysten_common::sync::notify_read::{NotifyRead, Registration};
use mysten_metrics::spawn_monitored_task;
use mysten_metrics::{spawn_monitored_task, GaugeGuard};
use std::fmt::Write;
use sui_types::error::{SuiError, SuiResult};
use sui_types::messages::{
Expand Down Expand Up @@ -246,10 +246,10 @@ where
&self,
transaction: VerifiedTransaction,
) -> Result<ProcessTransactionResult, Option<QuorumDriverError>> {
let auth_agg = self.validators.load();
let _tx_guard = GaugeGuard::acquire(&auth_agg.metrics.inflight_transactions);
let tx_digest = *transaction.digest();
let result = self
.validators
.load()
let result = auth_agg
.process_transaction(transaction)
.instrument(tracing::debug_span!("aggregator_process_tx", ?tx_digest))
.await;
Expand Down Expand Up @@ -402,10 +402,10 @@ where
&self,
certificate: VerifiedCertificate,
) -> Result<QuorumDriverResponse, Option<QuorumDriverError>> {
let auth_agg = self.validators.load();
let _cert_guard = GaugeGuard::acquire(&auth_agg.metrics.inflight_certificates);
let tx_digest = *certificate.digest();
let (effects, events) = self
.validators
.load()
let (effects, events) = auth_agg
.process_certificate(certificate.clone().into_inner())
.instrument(tracing::debug_span!("aggregator_process_cert", ?tx_digest))
.await
Expand Down

0 comments on commit 823d243

Please sign in to comment.