Skip to content

Commit

Permalink
Improve tracing for TransactionOrchestrator, QD, and AuthAgg (#18689)
Browse files Browse the repository at this point in the history
tracing for an execute transaction attempt now looks like: 
<img width="1209" alt="image"
src="https://github.com/user-attachments/assets/31ab9cb0-4ae2-4c11-8801-d24468d5a84a">
  • Loading branch information
mystenmark authored Jul 17, 2024
1 parent ddf42d3 commit 6e79fda
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 38 deletions.
66 changes: 65 additions & 1 deletion crates/mysten-metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::time::Instant;
use once_cell::sync::OnceCell;
use prometheus::{register_int_gauge_vec_with_registry, IntGaugeVec, Registry, TextEncoder};
use tap::TapFallible;
use tracing::warn;
use tracing::{warn, Span};

pub use scopeguard;
use uuid::Uuid;
Expand Down Expand Up @@ -271,6 +271,70 @@ impl<F: Future> Future for MonitoredScopeFuture<F> {
}
}

pub struct CancelMonitor<F: Sized> {
finished: bool,
inner: Pin<Box<F>>,
}

impl<F> CancelMonitor<F>
where
F: Future,
{
pub fn new(inner: F) -> Self {
Self {
finished: false,
inner: Box::pin(inner),
}
}

pub fn is_finished(&self) -> bool {
self.finished
}
}

impl<F> Future for CancelMonitor<F>
where
F: Future,
{
type Output = F::Output;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.inner.as_mut().poll(cx) {
Poll::Ready(output) => {
self.finished = true;
Poll::Ready(output)
}
Poll::Pending => Poll::Pending,
}
}
}

impl<F: Sized> Drop for CancelMonitor<F> {
fn drop(&mut self) {
if !self.finished {
Span::current().record("cancelled", true);
}
}
}

/// MonitorCancellation records a cancelled = true span attribute if the future it
/// is decorating is dropped before completion. The cancelled attribute must be added
/// at span creation, as you cannot add new attributes after the span is created.
pub trait MonitorCancellation {
fn monitor_cancellation(self) -> CancelMonitor<Self>
where
Self: Sized + Future;
}

impl<T> MonitorCancellation for T
where
T: Future,
{
fn monitor_cancellation(self) -> CancelMonitor<Self> {
CancelMonitor::new(self)
}
}

pub type RegistryID = Uuid;

/// A service to manage the prometheus registries. This service allow us to create
Expand Down
13 changes: 1 addition & 12 deletions crates/sui-authority-aggregation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
use futures::Future;
use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt};
use mysten_metrics::monitored_future;
use tracing::instrument::Instrument;

use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;
Expand Down Expand Up @@ -63,17 +62,7 @@ where
.map(|name| {
let client = authority_clients[&name].clone();
let execute = map_each_authority.clone();
let concise_name = name.concise_owned();
monitored_future!(async move {
(
name.clone(),
execute(name, client)
.instrument(
tracing::trace_span!("quorum_map_auth", authority =? concise_name),
)
.await,
)
})
monitored_future!(async move { (name.clone(), execute(name, client).await,) })
})
.collect();

Expand Down
27 changes: 15 additions & 12 deletions crates/sui-core/src/authority_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::authority_client::{
use crate::safe_client::{SafeClient, SafeClientMetrics, SafeClientMetricsBase};
use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt};
use mysten_metrics::histogram::Histogram;
use mysten_metrics::{monitored_future, spawn_monitored_task, GaugeGuard};
use mysten_metrics::{monitored_future, spawn_monitored_task, GaugeGuard, MonitorCancellation};
use mysten_network::config::Config;
use std::convert::AsRef;
use std::net::SocketAddr;
Expand All @@ -35,7 +35,7 @@ use sui_types::{
transaction::*,
};
use thiserror::Error;
use tracing::{debug, error, info, trace, warn, Instrument};
use tracing::{debug, error, info, instrument, trace, trace_span, warn, Instrument};

use crate::epoch::committee_store::CommitteeStore;
use crate::stake_aggregator::{InsertResult, MultiStakeAggregator, StakeAggregator};
Expand Down Expand Up @@ -1012,6 +1012,7 @@ where
}

/// Submits the transaction to a quorum of validators to make a certificate.
#[instrument(level = "trace", skip_all)]
pub async fn process_transaction(
&self,
transaction: Transaction,
Expand Down Expand Up @@ -1051,11 +1052,15 @@ where
committee.clone(),
self.authority_clients.clone(),
state,
|_name, client| {
|name, client| {
Box::pin(
async move {
let _guard = GaugeGuard::acquire(&self.metrics.inflight_transaction_requests);
client.handle_transaction(transaction_ref.clone(), client_addr).await
let concise_name = name.concise_owned();
client.handle_transaction(transaction_ref.clone(), client_addr)
.monitor_cancellation()
.instrument(trace_span!("handle_transaction", cancelled = false, authority =? concise_name))
.await
},
)
},
Expand Down Expand Up @@ -1475,6 +1480,7 @@ where
- state.tx_signatures.total_votes()
}

#[instrument(level = "trace", skip_all)]
pub async fn process_certificate(
&self,
request: HandleCertificateRequestV3,
Expand Down Expand Up @@ -1532,6 +1538,7 @@ where
move |name, client| {
Box::pin(async move {
let _guard = GaugeGuard::acquire(&metrics_clone.inflight_certificate_requests);
let concise_name = name.concise_owned();
if request_ref.include_input_objects || request_ref.include_output_objects {

// adjust the request to validators we aren't planning on sampling
Expand All @@ -1549,16 +1556,12 @@ where

client
.handle_certificate_v3(req, client_addr)
.instrument(
tracing::trace_span!("handle_certificate", authority =? name.concise()),
)
.instrument(trace_span!("handle_certificate_v3", authority =? concise_name))
.await
} else {
client
.handle_certificate_v2(request_ref.certificate, client_addr)
.instrument(
tracing::trace_span!("handle_certificate", authority =? name.concise()),
)
.instrument(trace_span!("handle_certificate_v2", authority =? concise_name))
.await
.map(|response| HandleCertificateResponseV3 {
effects: response.signed_effects,
Expand Down Expand Up @@ -1771,6 +1774,7 @@ where
}
}

#[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))]
pub async fn execute_transaction_block(
&self,
transaction: &Transaction,
Expand All @@ -1779,7 +1783,6 @@ where
let tx_guard = GaugeGuard::acquire(&self.metrics.inflight_transactions);
let result = self
.process_transaction(transaction.clone(), client_addr)
.instrument(tracing::debug_span!("process_tx"))
.await?;
let cert = match result {
ProcessTransactionResult::Certified { certificate, .. } => certificate,
Expand All @@ -1802,14 +1805,14 @@ where
},
client_addr,
)
.instrument(tracing::debug_span!("process_cert"))
.await?;

Ok(response.effects_cert)
}

/// This function tries to get SignedTransaction OR CertifiedTransaction from
/// an given list of validators who are supposed to know about it.
#[instrument(level = "trace", skip_all, fields(?tx_digest))]
pub async fn handle_transaction_info_request_from_some_validators(
&self,
tx_digest: &TransactionDigest,
Expand Down
26 changes: 19 additions & 7 deletions crates/sui-core/src/quorum_driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ use tokio::time::{sleep_until, Instant};

use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::task::JoinHandle;
use tracing::Instrument;
use tracing::{debug, error, info, warn};
use tracing::{debug, error, info, instrument, trace_span, warn};

use crate::authority_aggregator::{
AggregatorProcessCertificateError, AggregatorProcessTransactionError, AuthorityAggregator,
Expand Down Expand Up @@ -59,6 +58,7 @@ pub struct QuorumDriverTask {
pub retry_times: u32,
pub next_retry_after: Instant,
pub client_addr: Option<SocketAddr>,
pub trace_span: Option<tracing::Span>,
}

impl Debug for QuorumDriverTask {
Expand Down Expand Up @@ -193,6 +193,7 @@ impl<A: Clone> QuorumDriver<A> {
retry_times: old_retry_times + 1,
next_retry_after,
client_addr,
trace_span: Some(tracing::Span::current()),
})
.await
}
Expand Down Expand Up @@ -237,6 +238,7 @@ impl<A> QuorumDriver<A>
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
#[instrument(level = "trace", skip_all)]
pub async fn submit_transaction(
&self,
request: ExecuteTransactionRequestV3,
Expand All @@ -252,13 +254,15 @@ where
retry_times: 0,
next_retry_after: Instant::now(),
client_addr: None,
trace_span: Some(tracing::Span::current()),
})
.await?;
Ok(ticket)
}

// Used when the it is called in a component holding the notifier, and a ticket is
// already obtained prior to calling this function, for instance, TransactionOrchestrator
#[instrument(level = "trace", skip_all)]
pub async fn submit_transaction_no_ticket(
&self,
request: ExecuteTransactionRequestV3,
Expand All @@ -277,10 +281,12 @@ where
retry_times: 0,
next_retry_after: Instant::now(),
client_addr,
trace_span: Some(tracing::Span::current()),
})
.await
}

#[instrument(level = "trace", skip_all)]
pub(crate) async fn process_transaction(
&self,
transaction: Transaction,
Expand All @@ -289,15 +295,13 @@ where
let auth_agg = self.validators.load();
let _tx_guard = GaugeGuard::acquire(&auth_agg.metrics.inflight_transactions);
let tx_digest = *transaction.digest();
let result = auth_agg
.process_transaction(transaction, client_addr)
.instrument(tracing::debug_span!("aggregator_process_tx", ?tx_digest))
.await;
let result = auth_agg.process_transaction(transaction, client_addr).await;

self.process_transaction_result(result, tx_digest, client_addr)
.await
}

#[instrument(level = "trace", skip_all)]
async fn process_transaction_result(
&self,
result: Result<ProcessTransactionResult, AggregatorProcessTransactionError>,
Expand Down Expand Up @@ -410,6 +414,7 @@ where
}
}

#[instrument(level = "trace", skip_all)]
async fn process_conflicting_tx(
&self,
tx_digest: TransactionDigest,
Expand Down Expand Up @@ -470,6 +475,7 @@ where
}
}

#[instrument(level = "trace", skip_all, fields(tx_digest = ?request.certificate.digest()))]
pub(crate) async fn process_certificate(
&self,
request: HandleCertificateRequestV3,
Expand All @@ -480,7 +486,6 @@ where
let tx_digest = *request.certificate.digest();
let response = auth_agg
.process_certificate(request.clone(), client_addr)
.instrument(tracing::debug_span!("aggregator_process_cert", ?tx_digest))
.await
.map_err(|agg_err| match agg_err {
AggregatorProcessCertificateError::FatalExecuteCertificate {
Expand Down Expand Up @@ -517,6 +522,7 @@ where

/// Returns Some(true) if the conflicting transaction is executed successfully
/// (or already executed), or Some(false) if it did not.
#[instrument(level = "trace", skip_all)]
async fn attempt_conflicting_transaction(
&self,
tx_digest: &TransactionDigest,
Expand Down Expand Up @@ -751,6 +757,7 @@ where
/// Process a QuorumDriverTask.
/// The function has no return value - the corresponding actions of task result
/// are performed in this call.
#[instrument(level = "trace", parent = task.trace_span.as_ref().and_then(|s| s.id()), skip_all)]
async fn process_task(quorum_driver: Arc<QuorumDriver<A>>, task: QuorumDriverTask) {
debug!(?task, "Quorum Driver processing task");
let QuorumDriverTask {
Expand Down Expand Up @@ -917,6 +924,10 @@ where
) {
let limit = Arc::new(Semaphore::new(TASK_QUEUE_SIZE));
while let Some(task) = task_receiver.recv().await {
let task_queue_span =
trace_span!(parent: task.trace_span.as_ref().and_then(|s| s.id()), "task_queue");
let task_span_guard = task_queue_span.enter();

// hold semaphore permit until task completes. unwrap ok because we never close
// the semaphore in this context.
let limit = limit.clone();
Expand All @@ -935,6 +946,7 @@ where
}
metrics.current_requests_in_flight.dec();
let qd = quorum_driver.clone();
drop(task_span_guard);
spawn_monitored_task!(async move {
let _guard = permit;
QuorumDriverHandler::process_task(qd, task).await
Expand Down
5 changes: 4 additions & 1 deletion crates/sui-core/src/safe_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use sui_types::{
transaction::*,
};
use tap::TapFallible;
use tracing::{debug, error};
use tracing::{debug, error, instrument};

macro_rules! check_error {
($address:expr, $cond:expr, $msg:expr) => {
Expand Down Expand Up @@ -496,6 +496,7 @@ where
}

/// Handle Transaction information requests for a given digest.
#[instrument(level = "trace", skip_all, fields(authority = ?self.address.concise()))]
pub async fn handle_transaction_info_request(
&self,
request: TransactionInfoRequest,
Expand Down Expand Up @@ -582,6 +583,7 @@ where
}
}

#[instrument(level = "trace", skip_all, fields(authority = ?self.address.concise()))]
pub async fn handle_checkpoint(
&self,
request: CheckpointRequest,
Expand All @@ -597,6 +599,7 @@ where
Ok(resp)
}

#[instrument(level = "trace", skip_all, fields(authority = ?self.address.concise()))]
pub async fn handle_system_state_object(&self) -> Result<SuiSystemState, SuiError> {
self.authority_client
.handle_system_state_object(SystemStateRequest { _unused: false })
Expand Down
Loading

0 comments on commit 6e79fda

Please sign in to comment.