diff --git a/crates/mysten-metrics/src/lib.rs b/crates/mysten-metrics/src/lib.rs index a25cdcbb7218c..75c7fe2732151 100644 --- a/crates/mysten-metrics/src/lib.rs +++ b/crates/mysten-metrics/src/lib.rs @@ -13,7 +13,7 @@ use std::time::Instant; use once_cell::sync::OnceCell; use prometheus::{register_int_gauge_vec_with_registry, IntGaugeVec, Registry, TextEncoder}; use tap::TapFallible; -use tracing::warn; +use tracing::{warn, Span}; pub use scopeguard; use uuid::Uuid; @@ -271,6 +271,70 @@ impl Future for MonitoredScopeFuture { } } +pub struct CancelMonitor { + finished: bool, + inner: Pin>, +} + +impl CancelMonitor +where + F: Future, +{ + pub fn new(inner: F) -> Self { + Self { + finished: false, + inner: Box::pin(inner), + } + } + + pub fn is_finished(&self) -> bool { + self.finished + } +} + +impl Future for CancelMonitor +where + F: Future, +{ + type Output = F::Output; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.inner.as_mut().poll(cx) { + Poll::Ready(output) => { + self.finished = true; + Poll::Ready(output) + } + Poll::Pending => Poll::Pending, + } + } +} + +impl Drop for CancelMonitor { + fn drop(&mut self) { + if !self.finished { + Span::current().record("cancelled", true); + } + } +} + +/// MonitorCancellation records a cancelled = true span attribute if the future it +/// is decorating is dropped before completion. The cancelled attribute must be added +/// at span creation, as you cannot add new attributes after the span is created. +pub trait MonitorCancellation { + fn monitor_cancellation(self) -> CancelMonitor + where + Self: Sized + Future; +} + +impl MonitorCancellation for T +where + T: Future, +{ + fn monitor_cancellation(self) -> CancelMonitor { + CancelMonitor::new(self) + } +} + pub type RegistryID = Uuid; /// A service to manage the prometheus registries. This service allow us to create diff --git a/crates/sui-authority-aggregation/src/lib.rs b/crates/sui-authority-aggregation/src/lib.rs index a267c7cd7929a..d1b05f5163e0a 100644 --- a/crates/sui-authority-aggregation/src/lib.rs +++ b/crates/sui-authority-aggregation/src/lib.rs @@ -4,7 +4,6 @@ use futures::Future; use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt}; use mysten_metrics::monitored_future; -use tracing::instrument::Instrument; use std::collections::{BTreeMap, BTreeSet}; use std::sync::Arc; @@ -63,17 +62,7 @@ where .map(|name| { let client = authority_clients[&name].clone(); let execute = map_each_authority.clone(); - let concise_name = name.concise_owned(); - monitored_future!(async move { - ( - name.clone(), - execute(name, client) - .instrument( - tracing::trace_span!("quorum_map_auth", authority =? concise_name), - ) - .await, - ) - }) + monitored_future!(async move { (name.clone(), execute(name, client).await,) }) }) .collect(); diff --git a/crates/sui-core/src/authority_aggregator.rs b/crates/sui-core/src/authority_aggregator.rs index c4f7da27474f9..6bc31210d6c51 100644 --- a/crates/sui-core/src/authority_aggregator.rs +++ b/crates/sui-core/src/authority_aggregator.rs @@ -9,7 +9,7 @@ use crate::authority_client::{ use crate::safe_client::{SafeClient, SafeClientMetrics, SafeClientMetricsBase}; use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt}; use mysten_metrics::histogram::Histogram; -use mysten_metrics::{monitored_future, spawn_monitored_task, GaugeGuard}; +use mysten_metrics::{monitored_future, spawn_monitored_task, GaugeGuard, MonitorCancellation}; use mysten_network::config::Config; use std::convert::AsRef; use std::net::SocketAddr; @@ -35,7 +35,7 @@ use sui_types::{ transaction::*, }; use thiserror::Error; -use tracing::{debug, error, info, trace, warn, Instrument}; +use tracing::{debug, error, info, instrument, trace, trace_span, warn, Instrument}; use crate::epoch::committee_store::CommitteeStore; use crate::stake_aggregator::{InsertResult, MultiStakeAggregator, StakeAggregator}; @@ -1012,6 +1012,7 @@ where } /// Submits the transaction to a quorum of validators to make a certificate. + #[instrument(level = "trace", skip_all)] pub async fn process_transaction( &self, transaction: Transaction, @@ -1051,11 +1052,15 @@ where committee.clone(), self.authority_clients.clone(), state, - |_name, client| { + |name, client| { Box::pin( async move { let _guard = GaugeGuard::acquire(&self.metrics.inflight_transaction_requests); - client.handle_transaction(transaction_ref.clone(), client_addr).await + let concise_name = name.concise_owned(); + client.handle_transaction(transaction_ref.clone(), client_addr) + .monitor_cancellation() + .instrument(trace_span!("handle_transaction", cancelled = false, authority =? concise_name)) + .await }, ) }, @@ -1475,6 +1480,7 @@ where - state.tx_signatures.total_votes() } + #[instrument(level = "trace", skip_all)] pub async fn process_certificate( &self, request: HandleCertificateRequestV3, @@ -1532,6 +1538,7 @@ where move |name, client| { Box::pin(async move { let _guard = GaugeGuard::acquire(&metrics_clone.inflight_certificate_requests); + let concise_name = name.concise_owned(); if request_ref.include_input_objects || request_ref.include_output_objects { // adjust the request to validators we aren't planning on sampling @@ -1549,16 +1556,12 @@ where client .handle_certificate_v3(req, client_addr) - .instrument( - tracing::trace_span!("handle_certificate", authority =? name.concise()), - ) + .instrument(trace_span!("handle_certificate_v3", authority =? concise_name)) .await } else { client .handle_certificate_v2(request_ref.certificate, client_addr) - .instrument( - tracing::trace_span!("handle_certificate", authority =? name.concise()), - ) + .instrument(trace_span!("handle_certificate_v2", authority =? concise_name)) .await .map(|response| HandleCertificateResponseV3 { effects: response.signed_effects, @@ -1771,6 +1774,7 @@ where } } + #[instrument(level = "trace", skip_all, fields(tx_digest = ?transaction.digest()))] pub async fn execute_transaction_block( &self, transaction: &Transaction, @@ -1779,7 +1783,6 @@ where let tx_guard = GaugeGuard::acquire(&self.metrics.inflight_transactions); let result = self .process_transaction(transaction.clone(), client_addr) - .instrument(tracing::debug_span!("process_tx")) .await?; let cert = match result { ProcessTransactionResult::Certified { certificate, .. } => certificate, @@ -1802,7 +1805,6 @@ where }, client_addr, ) - .instrument(tracing::debug_span!("process_cert")) .await?; Ok(response.effects_cert) @@ -1810,6 +1812,7 @@ where /// This function tries to get SignedTransaction OR CertifiedTransaction from /// an given list of validators who are supposed to know about it. + #[instrument(level = "trace", skip_all, fields(?tx_digest))] pub async fn handle_transaction_info_request_from_some_validators( &self, tx_digest: &TransactionDigest, diff --git a/crates/sui-core/src/quorum_driver/mod.rs b/crates/sui-core/src/quorum_driver/mod.rs index b10ac8762d781..a01afe41206b5 100644 --- a/crates/sui-core/src/quorum_driver/mod.rs +++ b/crates/sui-core/src/quorum_driver/mod.rs @@ -25,8 +25,7 @@ use tokio::time::{sleep_until, Instant}; use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::task::JoinHandle; -use tracing::Instrument; -use tracing::{debug, error, info, warn}; +use tracing::{debug, error, info, instrument, trace_span, warn}; use crate::authority_aggregator::{ AggregatorProcessCertificateError, AggregatorProcessTransactionError, AuthorityAggregator, @@ -59,6 +58,7 @@ pub struct QuorumDriverTask { pub retry_times: u32, pub next_retry_after: Instant, pub client_addr: Option, + pub trace_span: Option, } impl Debug for QuorumDriverTask { @@ -193,6 +193,7 @@ impl QuorumDriver { retry_times: old_retry_times + 1, next_retry_after, client_addr, + trace_span: Some(tracing::Span::current()), }) .await } @@ -237,6 +238,7 @@ impl QuorumDriver where A: AuthorityAPI + Send + Sync + 'static + Clone, { + #[instrument(level = "trace", skip_all)] pub async fn submit_transaction( &self, request: ExecuteTransactionRequestV3, @@ -252,6 +254,7 @@ where retry_times: 0, next_retry_after: Instant::now(), client_addr: None, + trace_span: Some(tracing::Span::current()), }) .await?; Ok(ticket) @@ -259,6 +262,7 @@ where // Used when the it is called in a component holding the notifier, and a ticket is // already obtained prior to calling this function, for instance, TransactionOrchestrator + #[instrument(level = "trace", skip_all)] pub async fn submit_transaction_no_ticket( &self, request: ExecuteTransactionRequestV3, @@ -277,10 +281,12 @@ where retry_times: 0, next_retry_after: Instant::now(), client_addr, + trace_span: Some(tracing::Span::current()), }) .await } + #[instrument(level = "trace", skip_all)] pub(crate) async fn process_transaction( &self, transaction: Transaction, @@ -289,15 +295,13 @@ where let auth_agg = self.validators.load(); let _tx_guard = GaugeGuard::acquire(&auth_agg.metrics.inflight_transactions); let tx_digest = *transaction.digest(); - let result = auth_agg - .process_transaction(transaction, client_addr) - .instrument(tracing::debug_span!("aggregator_process_tx", ?tx_digest)) - .await; + let result = auth_agg.process_transaction(transaction, client_addr).await; self.process_transaction_result(result, tx_digest, client_addr) .await } + #[instrument(level = "trace", skip_all)] async fn process_transaction_result( &self, result: Result, @@ -410,6 +414,7 @@ where } } + #[instrument(level = "trace", skip_all)] async fn process_conflicting_tx( &self, tx_digest: TransactionDigest, @@ -470,6 +475,7 @@ where } } + #[instrument(level = "trace", skip_all, fields(tx_digest = ?request.certificate.digest()))] pub(crate) async fn process_certificate( &self, request: HandleCertificateRequestV3, @@ -480,7 +486,6 @@ where let tx_digest = *request.certificate.digest(); let response = auth_agg .process_certificate(request.clone(), client_addr) - .instrument(tracing::debug_span!("aggregator_process_cert", ?tx_digest)) .await .map_err(|agg_err| match agg_err { AggregatorProcessCertificateError::FatalExecuteCertificate { @@ -517,6 +522,7 @@ where /// Returns Some(true) if the conflicting transaction is executed successfully /// (or already executed), or Some(false) if it did not. + #[instrument(level = "trace", skip_all)] async fn attempt_conflicting_transaction( &self, tx_digest: &TransactionDigest, @@ -751,6 +757,7 @@ where /// Process a QuorumDriverTask. /// The function has no return value - the corresponding actions of task result /// are performed in this call. + #[instrument(level = "trace", parent = task.trace_span.as_ref().and_then(|s| s.id()), skip_all)] async fn process_task(quorum_driver: Arc>, task: QuorumDriverTask) { debug!(?task, "Quorum Driver processing task"); let QuorumDriverTask { @@ -917,6 +924,10 @@ where ) { let limit = Arc::new(Semaphore::new(TASK_QUEUE_SIZE)); while let Some(task) = task_receiver.recv().await { + let task_queue_span = + trace_span!(parent: task.trace_span.as_ref().and_then(|s| s.id()), "task_queue"); + let task_span_guard = task_queue_span.enter(); + // hold semaphore permit until task completes. unwrap ok because we never close // the semaphore in this context. let limit = limit.clone(); @@ -935,6 +946,7 @@ where } metrics.current_requests_in_flight.dec(); let qd = quorum_driver.clone(); + drop(task_span_guard); spawn_monitored_task!(async move { let _guard = permit; QuorumDriverHandler::process_task(qd, task).await diff --git a/crates/sui-core/src/safe_client.rs b/crates/sui-core/src/safe_client.rs index 0deae65548a69..74c80cd42afd4 100644 --- a/crates/sui-core/src/safe_client.rs +++ b/crates/sui-core/src/safe_client.rs @@ -28,7 +28,7 @@ use sui_types::{ transaction::*, }; use tap::TapFallible; -use tracing::{debug, error}; +use tracing::{debug, error, instrument}; macro_rules! check_error { ($address:expr, $cond:expr, $msg:expr) => { @@ -496,6 +496,7 @@ where } /// Handle Transaction information requests for a given digest. + #[instrument(level = "trace", skip_all, fields(authority = ?self.address.concise()))] pub async fn handle_transaction_info_request( &self, request: TransactionInfoRequest, @@ -582,6 +583,7 @@ where } } + #[instrument(level = "trace", skip_all, fields(authority = ?self.address.concise()))] pub async fn handle_checkpoint( &self, request: CheckpointRequest, @@ -597,6 +599,7 @@ where Ok(resp) } + #[instrument(level = "trace", skip_all, fields(authority = ?self.address.concise()))] pub async fn handle_system_state_object(&self) -> Result { self.authority_client .handle_system_state_object(SystemStateRequest { _unused: false }) diff --git a/crates/sui-core/src/transaction_orchestrator.rs b/crates/sui-core/src/transaction_orchestrator.rs index 1ef8bb7c86f8f..6b655d0358d56 100644 --- a/crates/sui-core/src/transaction_orchestrator.rs +++ b/crates/sui-core/src/transaction_orchestrator.rs @@ -228,6 +228,7 @@ where good_response_metrics.inc(); let QuorumDriverResponse { effects_cert, .. } = response; if !wait_for_local_execution { + debug!(?tx_digest, ?wait_for_local_execution, "success"); return Ok(ExecuteTransactionResponse::EffectsCert(Box::new(( FinalizedEffects::new_from_effects_cert(effects_cert.into()), response.events.unwrap_or_default(), @@ -249,11 +250,15 @@ where ) .await { - Ok(_) => Ok(ExecuteTransactionResponse::EffectsCert(Box::new(( - FinalizedEffects::new_from_effects_cert(effects_cert.into()), - response.events.unwrap_or_default(), - true, - )))), + Ok(_) => { + debug!(?tx_digest, ?wait_for_local_execution, "success"); + + Ok(ExecuteTransactionResponse::EffectsCert(Box::new(( + FinalizedEffects::new_from_effects_cert(effects_cert.into()), + response.events.unwrap_or_default(), + true, + )))) + } Err(_) => Ok(ExecuteTransactionResponse::EffectsCert(Box::new(( FinalizedEffects::new_from_effects_cert(effects_cert.into()), response.events.unwrap_or_default(), @@ -265,6 +270,8 @@ where } // Utilize the handle_certificate_v3 validator api to request input/output objects + #[instrument(name = "tx_orchestrator_execute_transaction_v3", level = "trace", skip_all, + fields(tx_digest = ?request.transaction.digest()))] pub async fn execute_transaction_v3( &self, request: ExecuteTransactionRequestV3, @@ -351,6 +358,7 @@ where /// Submits the transaction to Quorum Driver for execution. /// Returns an awaitable Future. + #[instrument(name = "tx_orchestrator_submit", level = "trace", skip_all)] async fn submit( &self, transaction: VerifiedTransaction,