From a74b23e5e2c8040b5355a67c6d41a2a8301efbbd Mon Sep 17 00:00:00 2001
From: Anastasios Kichidis
Date: Thu, 6 Jun 2024 18:45:32 +0100
Subject: [PATCH] [Consensus 2.0] Additional metrics & logs (#18075)

## Description

Adds more metrics and logs around block receipt, acceptance, and commit, as well as peer subscriptions and synchronizer block fetches.
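For illustration, the new `subscribed_peers` gauge follows a set-on-subscribe / clear-on-drop pattern: a peer's label reads 1 while its block stream is alive and falls back to 0 when the stream is dropped. A minimal sketch of that pattern, assuming the `prometheus` crate; `PeerStream` is a hypothetical stand-in used only for this example, not the actual `BroadcastStream` type:

```rust
use prometheus::{register_int_gauge_vec, IntGaugeVec};

/// Illustrative stand-in for a per-peer block stream wrapper.
struct PeerStream {
    peer_hostname: String,
    subscribed_peers: IntGaugeVec,
}

impl PeerStream {
    fn new(peer_hostname: String, subscribed_peers: IntGaugeVec) -> Self {
        // Mark the peer as subscribed for as long as the stream is alive.
        subscribed_peers
            .with_label_values(&[peer_hostname.as_str()])
            .set(1);
        Self {
            peer_hostname,
            subscribed_peers,
        }
    }
}

impl Drop for PeerStream {
    fn drop(&mut self) {
        // Clear the gauge when the stream is dropped, so operators can see
        // which peers are no longer subscribed.
        self.subscribed_peers
            .with_label_values(&[self.peer_hostname.as_str()])
            .set(0);
    }
}

fn main() {
    // Hypothetical gauge mirroring the metric added in this patch.
    let gauge = register_int_gauge_vec!(
        "subscribed_peers",
        "Peers subscribing for block streams from this authority.",
        &["authority"]
    )
    .unwrap();

    let stream = PeerStream::new("validator-0".to_string(), gauge.clone());
    // While `stream` is alive the gauge reads 1 for "validator-0"...
    drop(stream); // ...and drops back to 0 here.
}
```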
## Test plan

CI

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates.

- [ ] Protocol:
- [ ] Nodes (Validators and Full nodes):
- [ ] Indexer:
- [ ] JSON-RPC:
- [ ] GraphQL:
- [ ] CLI:
- [ ] Rust SDK:

---------

Co-authored-by: MW Tian
---
 consensus/core/src/authority_service.rs | 32 ++++++++-
 consensus/core/src/block_manager.rs     | 37 ++++++++++
 consensus/core/src/commit_observer.rs   | 50 +++++++------
 consensus/core/src/core.rs              |  5 ++
 consensus/core/src/dag_state.rs         | 23 ++++++
 consensus/core/src/metrics.rs           | 95 ++++++++++++++++++++++---
 consensus/core/src/subscriber.rs        | 33 ++++++---
 consensus/core/src/synchronizer.rs      | 29 +++++---
 8 files changed, 251 insertions(+), 53 deletions(-)

diff --git a/consensus/core/src/authority_service.rs b/consensus/core/src/authority_service.rs
index e68878513445e1..11423ba61386b0 100644
--- a/consensus/core/src/authority_service.rs
+++ b/consensus/core/src/authority_service.rs
@@ -107,6 +107,8 @@ impl NetworkService for AuthorityService {
         }
         let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block);

+        debug!("Received block {verified_block} via send block.");
+
         // Reject block with timestamp too far in the future.
         let now = self.context.clock.timestamp_utc_ms();
         let forward_time_drift =
@@ -236,8 +238,11 @@ impl NetworkService for AuthorityService {
                 .into_iter()
                 .map(|block| block.serialized().clone()),
         );
-        let broadcasted_blocks =
-            BroadcastedBlockStream::new(peer, self.rx_block_broadcaster.resubscribe());
+        let broadcasted_blocks = BroadcastedBlockStream::new(
+            self.context.clone(),
+            peer,
+            self.rx_block_broadcaster.resubscribe(),
+        );

         // Return a stream of blocks that first yields missed blocks as requested, then new blocks.
         Ok(Box::pin(missed_blocks.chain(
@@ -358,6 +363,7 @@ pub(crate) type BroadcastedBlockStream = BroadcastStream;
 /// Adapted from `tokio_stream::wrappers::BroadcastStream`. The main difference is that
 /// this tolerates lags with only logging, without yielding errors.
 pub(crate) struct BroadcastStream {
+    context: Arc<Context>,
     peer: AuthorityIndex,
     // Stores the receiver across poll_next() calls.
     inner: ReusableBoxFuture<
@@ -370,8 +376,16 @@ pub(crate) struct BroadcastStream {
 }

 impl BroadcastStream {
-    pub fn new(peer: AuthorityIndex, rx: broadcast::Receiver) -> Self {
+    pub fn new(context: Arc<Context>, peer: AuthorityIndex, rx: broadcast::Receiver) -> Self {
+        let peer_hostname = &context.committee.authority(peer).hostname;
+        context
+            .metrics
+            .node_metrics
+            .subscribed_peers
+            .with_label_values(&[peer_hostname])
+            .set(1);
         Self {
+            context,
             peer,
             inner: ReusableBoxFuture::new(make_recv_future(rx)),
         }
@@ -409,6 +423,18 @@ impl Stream for BroadcastStream {
     }
 }

+impl Drop for BroadcastStream {
+    fn drop(&mut self) {
+        let peer_hostname = &self.context.committee.authority(self.peer).hostname;
+        self.context
+            .metrics
+            .node_metrics
+            .subscribed_peers
+            .with_label_values(&[peer_hostname])
+            .set(0);
+    }
+}
+
 async fn make_recv_future(
     mut rx: broadcast::Receiver,
 ) -> (
diff --git a/consensus/core/src/block_manager.rs b/consensus/core/src/block_manager.rs
index 90bf852ae24149..68071fe3d5d265 100644
--- a/consensus/core/src/block_manager.rs
+++ b/consensus/core/src/block_manager.rs
@@ -18,6 +18,7 @@ use crate::{
     block_verifier::BlockVerifier,
     context::Context,
     dag_state::DagState,
+    Round,
 };

 struct SuspendedBlock {
@@ -57,6 +58,9 @@ pub(crate) struct BlockManager {
     /// Keeps all the blocks that we actually miss and haven't fetched them yet. That set will basically contain all the
     /// keys from the `missing_ancestors` minus any keys that exist in `suspended_blocks`.
     missing_blocks: BTreeSet,
+    /// A vector that holds a tuple of (lowest_round, highest_round) of received blocks per authority.
+    /// This is used for metrics reporting purposes and resets during restarts.
+    received_block_rounds: Vec<Option<(Round, Round)>>,
 }

 impl BlockManager {
@@ -65,6 +69,7 @@ impl BlockManager {
         dag_state: Arc>,
         block_verifier: Arc,
     ) -> Self {
+        let committee_size = context.committee.size();
         Self {
             context,
             dag_state,
@@ -72,6 +77,7 @@ impl BlockManager {
             suspended_blocks: BTreeMap::new(),
             missing_ancestors: BTreeMap::new(),
             missing_blocks: BTreeSet::new(),
+            received_block_rounds: vec![None; committee_size],
         }
     }

@@ -94,10 +100,17 @@ impl BlockManager {
         let mut missing_blocks = BTreeSet::new();

         for block in blocks {
+            self.update_block_received_metrics(&block);
+
             // Try to accept the input block.
+            let block_ref = block.reference();
             let block = match self.try_accept_one_block(block) {
                 TryAcceptResult::Accepted(block) => block,
                 TryAcceptResult::Suspended(ancestors_to_fetch) => {
+                    debug!(
+                        "Missing ancestors for block {block_ref}: {}",
+                        ancestors_to_fetch.iter().map(|b| b.to_string()).join(",")
+                    );
                     missing_blocks.extend(ancestors_to_fetch);
                     continue;
                 }
@@ -337,6 +350,30 @@ impl BlockManager {
         self.missing_blocks.clone()
     }

+    fn update_block_received_metrics(&mut self, block: &VerifiedBlock) {
+        let (min_round, max_round) =
+            if let Some((curr_min, curr_max)) = self.received_block_rounds[block.author()] {
+                (curr_min.min(block.round()), curr_max.max(block.round()))
+            } else {
+                (block.round(), block.round())
+            };
+        self.received_block_rounds[block.author()] = Some((min_round, max_round));
+
+        let hostname = &self.context.committee.authority(block.author()).hostname;
+        self.context
+            .metrics
+            .node_metrics
+            .lowest_verified_authority_round
+            .with_label_values(&[hostname])
+            .set(min_round.into());
+        self.context
+            .metrics
+            .node_metrics
+            .highest_verified_authority_round
+            .with_label_values(&[hostname])
+            .set(max_round.into());
+    }
+
     /// Returns all the suspended blocks whose causal history we miss hence we can't accept them yet.
     #[cfg(test)]
     fn suspended_blocks(&self) -> Vec {
diff --git a/consensus/core/src/commit_observer.rs b/consensus/core/src/commit_observer.rs
index a3bb24a624761f..7a27416fa5f123 100644
--- a/consensus/core/src/commit_observer.rs
+++ b/consensus/core/src/commit_observer.rs
@@ -5,6 +5,7 @@ use std::{sync::Arc, time::Duration};

 use mysten_metrics::monitored_mpsc::UnboundedSender;
 use parking_lot::RwLock;
+use tracing::info;

 use crate::{
     block::{BlockAPI, VerifiedBlock},
@@ -152,32 +153,37 @@ impl CommitObserver {
     }

     fn report_metrics(&self, committed: &[CommittedSubDag]) {
+        let metrics = &self.context.metrics.node_metrics;
         let utc_now = self.context.clock.timestamp_utc_ms();
-        let mut total = 0;
-        for block in committed.iter().flat_map(|dag| &dag.blocks) {
-            let latency_ms = utc_now
-                .checked_sub(block.timestamp_ms())
-                .unwrap_or_default();
-
-            total += 1;
-
-            self.context
-                .metrics
-                .node_metrics
-                .block_commit_latency
-                .observe(Duration::from_millis(latency_ms).as_secs_f64());
-            self.context
-                .metrics
-                .node_metrics
+
+        for commit in committed {
+            info!(
+                "Consensus commit {} with leader {} has {} blocks",
+                commit.commit_ref,
+                commit.leader,
+                commit.blocks.len()
+            );
+
+            metrics
                 .last_committed_leader_round
-                .set(block.round() as i64);
+                .set(commit.leader.round as i64);
+            metrics
+                .last_commit_index
+                .set(commit.commit_ref.index as i64);
+            metrics
+                .blocks_per_commit_count
+                .observe(commit.blocks.len() as f64);
+
+            for block in &commit.blocks {
+                let latency_ms = utc_now
+                    .checked_sub(block.timestamp_ms())
+                    .unwrap_or_default();
+                metrics
+                    .block_commit_latency
+                    .observe(Duration::from_millis(latency_ms).as_secs_f64());
+            }
         }
-        self.context
-            .metrics
-            .node_metrics
-            .blocks_per_commit_count
-            .observe(total as f64);

         self.context
             .metrics
             .node_metrics
diff --git a/consensus/core/src/core.rs b/consensus/core/src/core.rs
index feef2bfa2319d7..8085718f640f90 100644
--- a/consensus/core/src/core.rs
+++ b/consensus/core/src/core.rs
@@ -193,6 +193,11 @@ impl Core {
             .scope_processing_time
             .with_label_values(&["Core::add_blocks"])
             .start_timer();
+        self.context
+            .metrics
+            .node_metrics
+            .core_add_blocks_batch_size
+            .observe(blocks.len() as f64);
         // Try to accept them via the block manager
         let (accepted_blocks, missing_blocks) =
             self.block_manager.try_accept_blocks(blocks);
diff --git a/consensus/core/src/dag_state.rs b/consensus/core/src/dag_state.rs
index 12c022a3a0408e..5ebac04a585a1f 100644
--- a/consensus/core/src/dag_state.rs
+++ b/consensus/core/src/dag_state.rs
@@ -237,6 +237,18 @@ impl DagState {
             .node_metrics
             .highest_accepted_round
             .set(self.highest_accepted_round as i64);
+
+        let highest_accepted_round_for_author = self.recent_refs[block_ref.author]
+            .last()
+            .map(|block_ref| block_ref.round)
+            .expect("There should be by now at least one block ref");
+        let hostname = &self.context.committee.authority(block_ref.author).hostname;
+        self.context
+            .metrics
+            .node_metrics
+            .highest_accepted_authority_round
+            .with_label_values(&[hostname])
+            .set(highest_accepted_round_for_author as i64);
     }

     /// Accepts a blocks into DagState and keeps it in memory.
@@ -606,6 +618,17 @@ impl DagState {
             );
         }

+        for (i, round) in self.last_committed_rounds.iter().enumerate() {
+            let index = self.context.committee.to_authority_index(i).unwrap();
+            let hostname = &self.context.committee.authority(index).hostname;
+            self.context
+                .metrics
+                .node_metrics
+                .last_committed_authority_round
+                .with_label_values(&[hostname])
+                .set((*round).into());
+        }
+
         self.pending_commit_votes.push_back(commit.reference());
         self.commits_to_write.push(commit);
     }
diff --git a/consensus/core/src/metrics.rs b/consensus/core/src/metrics.rs
index 404558a4fd042b..da5e4908695524 100644
--- a/consensus/core/src/metrics.rs
+++ b/consensus/core/src/metrics.rs
@@ -19,9 +19,33 @@ const FINE_GRAINED_LATENCY_SEC_BUCKETS: &[f64] = &[
     4.0, 4.5, 5.0, 5.5, 6.0, 6.5, 7.0, 7.5, 8.0, 8.5, 9.0, 9.5, 10.,
 ];

-const COMMITTED_BLOCKS_BUCKETS: &[f64] = &[
-    1.0, 2.0, 4.0, 8.0, 10.0, 20.0, 40.0, 80.0, 100.0, 150.0, 200.0, 400.0, 800.0, 1000.0, 2000.0,
+const NUM_BLOCKS_BUCKETS: &[f64] = &[
+    1.0,
+    2.0,
+    4.0,
+    8.0,
+    10.0,
+    20.0,
+    40.0,
+    80.0,
+    100.0,
+    150.0,
+    200.0,
+    400.0,
+    800.0,
+    1000.0,
+    2000.0,
     3000.0,
+    5000.0,
+    10000.0,
+    20000.0,
+    30000.0,
+    50000.0,
+    100_000.0,
+    200_000.0,
+    300_000.0,
+    500_000.0,
+    1_000_000.0,
 ];

 const LATENCY_SEC_BUCKETS: &[f64] = &[
@@ -80,13 +104,17 @@ pub(crate) struct NodeMetrics {
     pub(crate) proposed_blocks: IntCounterVec,
     pub(crate) block_size: Histogram,
     pub(crate) block_ancestors: Histogram,
+    pub(crate) highest_verified_authority_round: IntGaugeVec,
+    pub(crate) lowest_verified_authority_round: IntGaugeVec,
     pub(crate) block_proposal_leader_wait_ms: IntCounterVec,
     pub(crate) block_proposal_leader_wait_count: IntCounterVec,
     pub(crate) block_timestamp_drift_wait_ms: IntCounterVec,
     pub(crate) blocks_per_commit_count: Histogram,
     pub(crate) broadcaster_rtt_estimate_ms: IntGaugeVec,
+    pub(crate) core_add_blocks_batch_size: Histogram,
     pub(crate) core_lock_dequeued: IntCounter,
     pub(crate) core_lock_enqueued: IntCounter,
+    pub(crate) highest_accepted_authority_round: IntGaugeVec,
     pub(crate) highest_accepted_round: IntGauge,
     pub(crate) accepted_blocks: IntCounterVec,
     pub(crate) dag_state_recent_blocks: IntGauge,
@@ -94,13 +122,16 @@ pub(crate) struct NodeMetrics {
     pub(crate) dag_state_store_read_count: IntCounterVec,
     pub(crate) dag_state_store_write_count: IntCounter,
     pub(crate) fetch_blocks_scheduler_inflight: IntGauge,
-    pub(crate) fetched_blocks: IntCounterVec,
+    pub(crate) synchronizer_fetched_blocks_by_peer: IntCounterVec,
+    pub(crate) synchronizer_fetched_blocks_by_authority: IntCounterVec,
     pub(crate) invalid_blocks: IntCounterVec,
     pub(crate) rejected_blocks: IntCounterVec,
     pub(crate) rejected_future_blocks: IntCounterVec,
     pub(crate) verified_blocks: IntCounterVec,
     pub(crate) committed_leaders_total: IntCounterVec,
+    pub(crate) last_committed_authority_round: IntGaugeVec,
     pub(crate) last_committed_leader_round: IntGauge,
+    pub(crate) last_commit_index: IntGauge,
     pub(crate) commit_round_advancement_interval: Histogram,
     pub(crate) last_decided_leader_round: IntGauge,
     pub(crate) leader_timeout_total: IntCounterVec,
@@ -120,6 +151,7 @@ pub(crate) struct NodeMetrics {
     pub(crate) threshold_clock_round: IntGauge,
     pub(crate) subscriber_connection_attempts: IntCounterVec,
     pub(crate) subscriber_connections: IntGaugeVec,
+    pub(crate) subscribed_peers: IntGaugeVec,
     pub(crate) commit_sync_inflight_fetches: IntGauge,
     pub(crate) commit_sync_pending_fetches: IntGauge,
     pub(crate) commit_sync_fetched_commits: IntCounter,
@@ -160,6 +192,18 @@ impl NodeMetrics {
                 exponential_buckets(1.0, 1.4, 20).unwrap(),
                 registry,
             ).unwrap(),
+            highest_verified_authority_round: register_int_gauge_vec_with_registry!(
+                "highest_verified_authority_round",
+                "The highest round of verified block for the corresponding authority",
+                &["authority"],
+                registry,
+            ).unwrap(),
+            lowest_verified_authority_round: register_int_gauge_vec_with_registry!(
+                "lowest_verified_authority_round",
+                "The lowest round of verified block for the corresponding authority",
+                &["authority"],
+                registry,
+            ).unwrap(),
             block_proposal_leader_wait_ms: register_int_counter_vec_with_registry!(
                 "block_proposal_leader_wait_ms",
                 "Total time in ms spent waiting for a leader when proposing blocks.",
@@ -181,7 +225,7 @@ impl NodeMetrics {
             blocks_per_commit_count: register_histogram_with_registry!(
                 "blocks_per_commit_count",
                 "The number of blocks per commit.",
-                COMMITTED_BLOCKS_BUCKETS.to_vec(),
+                NUM_BLOCKS_BUCKETS.to_vec(),
                 registry,
             ).unwrap(),
             broadcaster_rtt_estimate_ms: register_int_gauge_vec_with_registry!(
                 "broadcaster_rtt_estimate_ms",
                 &["peer"],
                 registry,
             ).unwrap(),
+            core_add_blocks_batch_size: register_histogram_with_registry!(
+                "core_add_blocks_batch_size",
+                "The number of blocks received from Core for processing on a single batch",
+                NUM_BLOCKS_BUCKETS.to_vec(),
+                registry,
+            ).unwrap(),
             core_lock_dequeued: register_int_counter_with_registry!(
                 "core_lock_dequeued",
                 "Number of dequeued core requests",
                 registry,
             ).unwrap(),
             core_lock_enqueued: register_int_counter_with_registry!(
                 "core_lock_enqueued",
                 "Number of enqueued core requests",
                 registry,
             ).unwrap(),
+            highest_accepted_authority_round: register_int_gauge_vec_with_registry!(
+                "highest_accepted_authority_round",
+                "The highest round where a block has been accepted by author. Resets on restart.",
+                &["author"],
+                registry,
+            ).unwrap(),
             highest_accepted_round: register_int_gauge_with_registry!(
                 "highest_accepted_round",
                 "The highest round where a block has been accepted. Resets on restart.",
                 registry,
             ).unwrap(),
@@ -237,9 +293,15 @@ impl NodeMetrics {
                 "Designates whether the synchronizer scheduler task to fetch blocks is currently running",
                 registry,
             ).unwrap(),
-            fetched_blocks: register_int_counter_vec_with_registry!(
-                "fetched_blocks",
-                "Number of fetched blocks per peer authority via the synchronizer.",
+            synchronizer_fetched_blocks_by_peer: register_int_counter_vec_with_registry!(
+                "synchronizer_fetched_blocks_by_peer",
+                "Number of fetched blocks per peer authority via the synchronizer and also by block authority",
+                &["peer", "type"],
+                registry,
+            ).unwrap(),
+            synchronizer_fetched_blocks_by_authority: register_int_counter_vec_with_registry!(
+                "synchronizer_fetched_blocks_by_authority",
+                "Number of fetched blocks per block author via the synchronizer",
                 &["authority", "type"],
                 registry,
             ).unwrap(),
@@ -274,11 +336,22 @@ impl NodeMetrics {
                 &["authority", "commit_type"],
                 registry,
             ).unwrap(),
+            last_committed_authority_round: register_int_gauge_vec_with_registry!(
+                "last_committed_authority_round",
+                "The last round committed by authority.",
+                &["authority"],
+                registry,
+            ).unwrap(),
             last_committed_leader_round: register_int_gauge_with_registry!(
                 "last_committed_leader_round",
                 "The last round where a leader was committed to store and sent to commit consumer.",
                 registry,
             ).unwrap(),
+            last_commit_index: register_int_gauge_with_registry!(
+                "last_commit_index",
+                "Index of the last commit.",
+                registry,
+            ).unwrap(),
             commit_round_advancement_interval: register_histogram_with_registry!(
                 "commit_round_advancement_interval",
                 "Intervals (in secs) between commit round advancements.",
@@ -380,7 +453,13 @@ impl NodeMetrics {
             ).unwrap(),
             subscriber_connections: register_int_gauge_vec_with_registry!(
                 "subscriber_connections",
-                "The number of block stream connections broken down by peer",
+                "Peers that this authority subscribed to for block streams.",
+                &["authority"],
+                registry,
+            ).unwrap(),
+            subscribed_peers: register_int_gauge_vec_with_registry!(
+                "subscribed_peers",
+                "Peers subscribing for block streams from this authority.",
                 &["authority"],
                 registry,
             ).unwrap(),
diff --git a/consensus/core/src/subscriber.rs b/consensus/core/src/subscriber.rs
index 7cc957bd9b191e..85e534b83ffbf5 100644
--- a/consensus/core/src/subscriber.rs
+++ b/consensus/core/src/subscriber.rs
@@ -74,13 +74,6 @@ impl Subscriber {
             peer,
             last_received,
         )));
-        let peer_hostname = &self.context.committee.authority(peer).hostname;
-        self.context
-            .metrics
-            .node_metrics
-            .subscriber_connections
-            .with_label_values(&[peer_hostname])
-            .set(1);
     }

     pub(crate) fn stop(&self) {
@@ -92,15 +85,17 @@ impl Subscriber {

     fn unsubscribe_locked(&self, peer: AuthorityIndex, subscription: &mut Option>) {
         let peer_hostname = &self.context.committee.authority(peer).hostname;
+        if let Some(subscription) = subscription.take() {
+            subscription.abort();
+        }
+        // There is a race between shutting down the subscription task and clearing the metric here.
+        // TODO: fix the race when unsubscribe_locked() gets called outside of stop().
         self.context
             .metrics
             .node_metrics
             .subscriber_connections
             .with_label_values(&[peer_hostname])
             .set(0);
-        if let Some(subscription) = subscription.take() {
-            subscription.abort();
-        }
     }

     async fn subscription_loop(
@@ -119,6 +114,13 @@ impl Subscriber {
         let mut retries: i64 = 0;
         let mut delay = INITIAL_RETRY_INTERVAL;
         'subscription: loop {
+            context
+                .metrics
+                .node_metrics
+                .subscriber_connections
+                .with_label_values(&[peer_hostname])
+                .set(0);
+
             if retries > IMMEDIATE_RETRIES {
                 debug!(
                     "Delaying retry {} of peer {} subscription, in {} seconds",
@@ -139,6 +141,7 @@ impl Subscriber {
                 delay = INITIAL_RETRY_INTERVAL;
             }
             retries += 1;
+
             let mut blocks = match network_client
                 .subscribe_blocks(peer, last_received, MAX_RETRY_INTERVAL)
                 .await
             {
@@ -164,6 +167,16 @@ impl Subscriber {
                     continue 'subscription;
                 }
             };
+
+            // Now can consider the subscription successful
+            let peer_hostname = &context.committee.authority(peer).hostname;
+            context
+                .metrics
+                .node_metrics
+                .subscriber_connections
+                .with_label_values(&[peer_hostname])
+                .set(1);
+
             'stream: loop {
                 match blocks.next().await {
                     Some(block) => {
diff --git a/consensus/core/src/synchronizer.rs b/consensus/core/src/synchronizer.rs
index f6f5911acd49dd..7f58314827ee04 100644
--- a/consensus/core/src/synchronizer.rs
+++ b/consensus/core/src/synchronizer.rs
@@ -400,19 +400,15 @@ impl Synchronizer
                     match response {
                         Ok(blocks) => {
-                            let peer_hostname = &context.committee.authority(peer_index).hostname;
-                            context
-                                .metrics
-                                .node_metrics
-                                .fetched_blocks.with_label_values(&[peer_hostname, "live"]).inc_by(blocks.len() as u64);
-
                             if let Err(err) = Self::process_fetched_blocks(blocks,
                                 peer_index,
                                 blocks_guard,
                                 core_dispatcher.clone(),
                                 block_verifier.clone(),
                                 context.clone(),
-                                commands_sender.clone()).await {
+                                commands_sender.clone(),
+                                "live"
+                            ).await {
                                 warn!("Error while processing fetched blocks from peer {peer_index}: {err}");
                             }
                         },
@@ -445,6 +441,7 @@ impl Synchronizer
         context: Arc,
         commands_sender: Sender,
+        sync_method: &str,
     ) -> ConsensusResult<()> {
         // The maximum number of blocks that can be additionally fetched from the one requested - those
         // are potentially missing ancestors.
@@ -485,6 +482,20 @@ impl Synchronizer