From 486ed2c6a6f87bf78ee412f34def9733b481c337 Mon Sep 17 00:00:00 2001 From: George Danezis Date: Fri, 5 Aug 2022 16:26:22 +0300 Subject: [PATCH] [narwhal adpater] Add backoff mechanism to consensus adapter (#3548) * Add back-off mechanism to consensus adopter (both certs and fragments) * Added more clear constants and function Co-authored-by: George Danezis --- crates/sui-core/src/authority_server.rs | 10 +- crates/sui-core/src/consensus_adapter.rs | 221 +++++++++++++++++- .../src/unit_tests/consensus_tests.rs | 2 + 3 files changed, 219 insertions(+), 14 deletions(-) diff --git a/crates/sui-core/src/authority_server.rs b/crates/sui-core/src/authority_server.rs index ff626c107d2fd..df05fcd660220 100644 --- a/crates/sui-core/src/authority_server.rs +++ b/crates/sui-core/src/authority_server.rs @@ -5,8 +5,8 @@ use crate::{ authority::AuthorityState, consensus_adapter::{ - CheckpointConsensusAdapter, CheckpointSender, ConsensusAdapter, ConsensusListener, - ConsensusListenerMessage, + CheckpointConsensusAdapter, CheckpointSender, ConsensusAdapter, ConsensusAdapterMetrics, + ConsensusListener, ConsensusListenerMessage, }, }; use anyhow::anyhow; @@ -86,11 +86,13 @@ impl AuthorityServer { consensus_address: Multiaddr, tx_consensus_listener: Sender, ) -> Self { + let metrics = ConsensusAdapterMetrics::new_test(); let consensus_adapter = ConsensusAdapter::new( consensus_address, state.clone_committee(), tx_consensus_listener, /* max_delay */ Duration::from_millis(20_000), + metrics, ); Self { @@ -206,12 +208,15 @@ impl ValidatorService { /* max_pending_transactions */ 1_000_000, ); + let metrics = ConsensusAdapterMetrics::new(prometheus_registry); + // The consensus adapter allows the authority to send user certificates through consensus. let consensus_adapter = ConsensusAdapter::new( consensus_config.address().to_owned(), state.clone_committee(), tx_sui_to_consensus.clone(), /* max_delay */ Duration::from_millis(5_000), + metrics.clone(), ); // Update the checkpoint store with a consensus client. @@ -229,6 +234,7 @@ impl ValidatorService { /* checkpoint_locals */ checkpoint_store, /* retry_delay */ Duration::from_millis(5_000), /* max_pending_transactions */ 10_000, + metrics, ) .spawn(); Some(handle) diff --git a/crates/sui-core/src/consensus_adapter.rs b/crates/sui-core/src/consensus_adapter.rs index 6151d2cb3fc5e..9ce277e87217d 100644 --- a/crates/sui-core/src/consensus_adapter.rs +++ b/crates/sui-core/src/consensus_adapter.rs @@ -11,7 +11,14 @@ use narwhal_executor::SubscriberResult; use narwhal_types::TransactionProto; use narwhal_types::TransactionsClient; use parking_lot::Mutex; +use prometheus::register_int_counter_with_registry; +use prometheus::register_int_gauge_with_registry; +use prometheus::IntCounter; +use prometheus::IntGauge; +use prometheus::Registry; use std::collections::VecDeque; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering; use std::sync::Arc; use std::{ collections::{hash_map::DefaultHasher, HashMap}, @@ -24,7 +31,10 @@ use sui_types::{ error::{SuiError, SuiResult}, messages::ConsensusTransaction, }; + use tap::prelude::*; +use tokio::time::Instant; + use tokio::{ sync::{ mpsc::{Receiver, Sender}, @@ -53,6 +63,86 @@ type SerializedTransactionInfoResponse = Vec; type TxSequencedNotifier = oneshot::Sender>; type TxSequencedNotifierClose = oneshot::Sender<()>; +pub struct ConsensusAdapterMetrics { + // Certificate sequencing metrics + pub sequencing_certificate_attempt: IntCounter, + pub sequencing_certificate_success: IntCounter, + pub sequencing_certificate_timeouts: IntCounter, + pub sequencing_certificate_control_delay: IntGauge, + + // Certificate sequencing metrics + pub sequencing_fragment_attempt: IntCounter, + pub sequencing_fragment_success: IntCounter, + pub sequencing_fragment_timeouts: IntCounter, + pub sequencing_fragment_control_delay: IntGauge, +} + +const MAX_DELAY_MULTIPLIER: u64 = 100; +fn weighted_average_half(old_average: u64, new_value: u64) -> u64 { + (500 * old_average + 500 * new_value) / 1000 +} + +pub type OptArcConsensusAdapterMetrics = Option>; + +impl ConsensusAdapterMetrics { + pub fn new(registry: &Registry) -> OptArcConsensusAdapterMetrics { + Some(Arc::new(ConsensusAdapterMetrics { + sequencing_certificate_attempt: register_int_counter_with_registry!( + "sequencing_certificate_attempt", + "TODO", + registry, + ) + .unwrap(), + sequencing_certificate_success: register_int_counter_with_registry!( + "sequencing_certificate_success", + "TODO", + registry, + ) + .unwrap(), + sequencing_certificate_timeouts: register_int_counter_with_registry!( + "sequencing_certificate_timeouts", + "TODO", + registry, + ) + .unwrap(), + sequencing_certificate_control_delay: register_int_gauge_with_registry!( + "sequencing_certificate_control_delay", + "Current number of concurrent follower connections", + registry, + ) + .unwrap(), + sequencing_fragment_attempt: register_int_counter_with_registry!( + "sequencing_fragment_attempt", + "TODO", + registry, + ) + .unwrap(), + sequencing_fragment_success: register_int_counter_with_registry!( + "sequencing_fragment_success", + "TODO", + registry, + ) + .unwrap(), + sequencing_fragment_timeouts: register_int_counter_with_registry!( + "sequencing_fragment_timeouts", + "TODO", + registry, + ) + .unwrap(), + sequencing_fragment_control_delay: register_int_gauge_with_registry!( + "sequencing_fragment_control_delay", + "Current number of concurrent follower connections", + registry, + ) + .unwrap(), + })) + } + + pub fn new_test() -> OptArcConsensusAdapterMetrics { + None + } +} + /// Message to notify the consensus listener that a new transaction has been sent to consensus /// or that the caller timed out on a specific transaction. #[derive(Debug)] @@ -119,6 +209,14 @@ pub struct ConsensusAdapter { /// this delay passed, the client will be notified that its transaction was probably not /// sequence and it should try to resubmit its transaction. max_delay: Duration, + + /// Estimation of the conensus delay, to use to dynamically adjust the delay + /// before we time out, so that we do not spam the consensus adapter with the + /// same transaction. + delay_ms: AtomicU64, + + /// A structure to register metrics + opt_metrics: OptArcConsensusAdapterMetrics, } impl ConsensusAdapter { @@ -128,6 +226,7 @@ impl ConsensusAdapter { committee: Committee, tx_consensus_listener: Sender, max_delay: Duration, + opt_metrics: OptArcConsensusAdapterMetrics, ) -> Self { let consensus_client = TransactionsClient::new( mysten_network::client::connect_lazy(&consensus_address) @@ -138,6 +237,8 @@ impl ConsensusAdapter { committee, tx_consensus_listener, max_delay, + delay_ms: AtomicU64::new(max_delay.as_millis() as u64), + opt_metrics, } } @@ -148,6 +249,8 @@ impl ConsensusAdapter { } /// Submit a transaction to consensus, wait for its processing, and notify the caller. + // Use .inspect when its stable. + #[allow(clippy::option_map_unit_fn)] pub async fn submit(&self, certificate: &ConsensusTransaction) -> SuiResult { // Check the Sui certificate (submitted by the user). certificate.verify(&self.committee)?; @@ -167,7 +270,9 @@ impl ConsensusAdapter { .expect("Failed to notify consensus listener"); // Check if this authority submits the transaction to consensus. - if Self::should_submit(certificate) { + let now = Instant::now(); + let should_submit = Self::should_submit(certificate); + if should_submit { self.consensus_client .clone() .submit_transaction(TransactionProto { transaction: bytes }) @@ -176,6 +281,11 @@ impl ConsensusAdapter { .tap_err(|r| { error!("Submit transaction failed with: {:?}", r); })?; + + // Increment the attempted certificate sequencing + self.opt_metrics.as_ref().map(|metrics| { + metrics.sequencing_certificate_attempt.inc(); + }); } // Wait for the consensus to sequence the certificate and assign locks to shared objects. @@ -183,14 +293,55 @@ impl ConsensusAdapter { // certificate will be sequenced. So the best we can do is to set a timer and notify the // client to retry if we timeout without hearing back from consensus (this module does not // handle retries). The best timeout value depends on the consensus protocol. - match timeout(self.max_delay, waiter.wait_for_result()).await { - Ok(_) => Ok(()), + let back_off_delay = + self.max_delay + Duration::from_millis(self.delay_ms.load(Ordering::Relaxed)); + let result = match timeout(back_off_delay, waiter.wait_for_result()).await { + Ok(_) => { + // Increment the attempted certificate sequencing success + self.opt_metrics.as_ref().map(|metrics| { + metrics.sequencing_certificate_success.inc(); + }); + + Ok(()) + } Err(e) => { + // Increment the attempted certificate sequencing failure + self.opt_metrics.as_ref().map(|metrics| { + metrics.sequencing_certificate_timeouts.inc(); + }); + // We drop the waiter which will signal to the conensus listener task to clean up // the channels. Err(SuiError::FailedToHearBackFromConsensus(e.to_string())) } + }; + + // Adapt the timeout for the next submission based on the delay we have observed so + // far using a weighted average, implementing proportinal control targetting the observed latency. + // Note that if we keep timing out the delay will keep increasing linearly as we constantly + // add max_delay to the observe delay to set the + // time-out. + if should_submit { + let past_ms = now.elapsed().as_millis() as u64; + let current_delay = self.delay_ms.load(Ordering::Relaxed); + let new_delay = weighted_average_half(past_ms, current_delay); + // clip to a max delay, 100x the self.max_delay. 100x is arbitrary + // but all we really need here is some max so that we do not wait for ever + // in case consensus if dead. + let new_delay = + new_delay.min((self.max_delay.as_millis() as u64) * MAX_DELAY_MULTIPLIER); + + // Store the latest latency + self.opt_metrics.as_ref().map(|metrics| { + metrics + .sequencing_certificate_control_delay + .set(new_delay as i64); + }); + + self.delay_ms.store(new_delay, Ordering::Relaxed); } + + result } } @@ -346,6 +497,9 @@ pub struct CheckpointConsensusAdapter { max_pending_transactions: usize, /// Keep all checkpoint fragment waiting to be sequenced. buffer: VecDeque<(SerializedConsensusTransaction, CheckpointSequenceNumber)>, + + /// A structure to register metrics + opt_metrics: OptArcConsensusAdapterMetrics, } impl CheckpointConsensusAdapter { @@ -357,6 +511,7 @@ impl CheckpointConsensusAdapter { checkpoint_db: Arc>, retry_delay: Duration, max_pending_transactions: usize, + opt_metrics: OptArcConsensusAdapterMetrics, ) -> Self { // Create a new network client. let connection = mysten_network::client::connect_lazy(&consensus_address) @@ -372,6 +527,7 @@ impl CheckpointConsensusAdapter { retry_delay, max_pending_transactions, buffer: VecDeque::with_capacity(max_pending_transactions), + opt_metrics, } } @@ -381,9 +537,17 @@ impl CheckpointConsensusAdapter { } /// Submit a transaction to consensus. + // Use .inspect when its stable. + #[allow(clippy::option_map_unit_fn)] async fn submit(&self, serialized: SerializedConsensusTransaction) -> SuiResult { let transaction = Bytes::from(serialized); let proto_transaction = TransactionProto { transaction }; + + // Increment the attempted fragment sequencing failure + self.opt_metrics.as_ref().map(|metrics| { + metrics.sequencing_fragment_attempt.inc(); + }); + self.consensus_client .clone() .submit_transaction(proto_transaction) @@ -397,18 +561,26 @@ impl CheckpointConsensusAdapter { receiver: ConsensusWaiter, retry_delay: Duration, deliver: T, - ) -> (SuiResult, T) { + ) -> (SuiResult, u64, T) { + let now = Instant::now(); let outcome = match timeout(retry_delay, receiver.wait_for_result()).await { Ok(reply) => reply, Err(e) => Err(SuiError::FailedToHearBackFromConsensus(e.to_string())), }; - (outcome, deliver) + let conensus_latency = now.elapsed().as_millis() as u64; + (outcome, conensus_latency, deliver) } /// Main loop receiving checkpoint fragments to reliably submit to consensus. + // Use .inspect when its stable. + #[allow(clippy::option_map_unit_fn)] async fn run(&mut self) { let mut waiting = FuturesUnordered::new(); + // Fragment sequencing latency estimation + let mut latency_estimate = self.retry_delay.as_millis() as u64; + let max_latency = latency_estimate * 100; + // Continuously listen to checkpoint fragments and re-attempt sequencing if needed. loop { // Try to submit all pending checkpoint fragments to consensus. @@ -421,16 +593,20 @@ impl CheckpointConsensusAdapter { let consensus_input = ConsensusListenerMessage::New(serialized.clone(), signals); - self.tx_consensus_listener - .send(consensus_input) - .await - .expect("Failed to notify consensus listener"); // Add the receiver to the waiter. So we can retransmit if the // connection fails. let deliver = (serialized, sequence_number); - let future = Self::waiter(waiter, self.retry_delay, deliver); + let timeout_delay = + Duration::from_millis(latency_estimate) + self.retry_delay; + let future = Self::waiter(waiter, timeout_delay, deliver); waiting.push(future); + + // Finally sent to consensus, after registering to avoid a race condition + self.tx_consensus_listener + .send(consensus_input) + .await + .expect("Failed to notify consensus listener"); } Err(e) => { error!("Checkpoint fragment submit failed: {:?}", e); @@ -463,11 +639,32 @@ impl CheckpointConsensusAdapter { }, // Listen to checkpoint fragments who failed to be sequenced and need retries. - Some((outcome, identifier)) = waiting.next() => { + Some((outcome, latency_ms, identifier)) = waiting.next() => { + + // Update the latency estimate using a weigted average + // But also cap it upwards by max_latency + latency_estimate = max_latency.min(weighted_average_half(latency_estimate, latency_ms)); + + // Record the latest consensus latency estimate for fragments + self.opt_metrics.as_ref().map(|metrics| { + metrics.sequencing_fragment_control_delay.set(latency_estimate as i64); + }); + if let Err(error) = outcome { - tracing::debug!("Failed to sequence checkpoint fragment: {error}"); + tracing::warn!("Failed to sequence checkpoint fragment, and re-submitting fragment: {error}"); let (serialized_transaction, checkpoint_sequence_number) = identifier; + + // Increment the attempted fragment sequencing failure + self.opt_metrics.as_ref().map(|metrics| { + metrics.sequencing_fragment_timeouts.inc(); + }); + self.buffer.push_back((serialized_transaction, checkpoint_sequence_number)); + } else { + // Increment the attempted fragment sequencing success + self.opt_metrics.as_ref().map(|metrics| { + metrics.sequencing_fragment_success.inc(); + }); } }, } diff --git a/crates/sui-core/src/unit_tests/consensus_tests.rs b/crates/sui-core/src/unit_tests/consensus_tests.rs index e2a4829f45440..70e4eaea1dd7f 100644 --- a/crates/sui-core/src/unit_tests/consensus_tests.rs +++ b/crates/sui-core/src/unit_tests/consensus_tests.rs @@ -154,6 +154,7 @@ async fn submit_transaction_to_consensus() { let committee = state.clone_committee(); let state_guard = Arc::new(state); + let metrics = ConsensusAdapterMetrics::new_test(); // Make a new consensus submitter instance. let submitter = ConsensusAdapter::new( @@ -161,6 +162,7 @@ async fn submit_transaction_to_consensus() { committee, tx_consensus_listener, /* max_delay */ Duration::from_millis(1_000), + metrics, ); // Spawn a network listener to receive the transaction (emulating the consensus node).