diff --git a/consensus/core/src/authority_service.rs b/consensus/core/src/authority_service.rs
index e68878513445e1..11423ba61386b0 100644
--- a/consensus/core/src/authority_service.rs
+++ b/consensus/core/src/authority_service.rs
@@ -107,6 +107,8 @@ impl NetworkService for AuthorityService {
         }
         let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block);

+        debug!("Received block {verified_block} via send block.");
+
         // Reject block with timestamp too far in the future.
         let now = self.context.clock.timestamp_utc_ms();
         let forward_time_drift =
@@ -236,8 +238,11 @@ impl NetworkService for AuthorityService {
                 .into_iter()
                 .map(|block| block.serialized().clone()),
         );
-        let broadcasted_blocks =
-            BroadcastedBlockStream::new(peer, self.rx_block_broadcaster.resubscribe());
+        let broadcasted_blocks = BroadcastedBlockStream::new(
+            self.context.clone(),
+            peer,
+            self.rx_block_broadcaster.resubscribe(),
+        );

         // Return a stream of blocks that first yields missed blocks as requested, then new blocks.
         Ok(Box::pin(missed_blocks.chain(
@@ -358,6 +363,7 @@ pub(crate) type BroadcastedBlockStream = BroadcastStream<VerifiedBlock>;
 /// Adapted from `tokio_stream::wrappers::BroadcastStream`. The main difference is that
 /// this tolerates lags with only logging, without yielding errors.
 pub(crate) struct BroadcastStream<T> {
+    context: Arc<Context>,
     peer: AuthorityIndex,
     // Stores the receiver across poll_next() calls.
     inner: ReusableBoxFuture<
@@ -370,8 +376,16 @@ pub(crate) struct BroadcastStream<T> {
 }

 impl BroadcastStream {
-    pub fn new(peer: AuthorityIndex, rx: broadcast::Receiver<T>) -> Self {
+    pub fn new(context: Arc<Context>, peer: AuthorityIndex, rx: broadcast::Receiver<T>) -> Self {
+        let peer_hostname = &context.committee.authority(peer).hostname;
+        context
+            .metrics
+            .node_metrics
+            .subscribed_peers
+            .with_label_values(&[peer_hostname])
+            .set(1);
         Self {
+            context,
             peer,
             inner: ReusableBoxFuture::new(make_recv_future(rx)),
         }
@@ -409,6 +423,18 @@ impl Stream for BroadcastStream {
     }
 }

+impl<T> Drop for BroadcastStream<T> {
+    fn drop(&mut self) {
+        let peer_hostname = &self.context.committee.authority(self.peer).hostname;
+        self.context
+            .metrics
+            .node_metrics
+            .subscribed_peers
+            .with_label_values(&[peer_hostname])
+            .set(0);
+    }
+}
+
 async fn make_recv_future(
     mut rx: broadcast::Receiver<T>,
 ) -> (
diff --git a/consensus/core/src/block_manager.rs b/consensus/core/src/block_manager.rs
index 90bf852ae24149..68071fe3d5d265 100644
--- a/consensus/core/src/block_manager.rs
+++ b/consensus/core/src/block_manager.rs
@@ -18,6 +18,7 @@ use crate::{
     block_verifier::BlockVerifier,
     context::Context,
     dag_state::DagState,
+    Round,
 };

 struct SuspendedBlock {
@@ -57,6 +58,9 @@ pub(crate) struct BlockManager {
     /// Keeps all the blocks that we actually miss and haven't fetched them yet. That set will basically contain all the
     /// keys from the `missing_ancestors` minus any keys that exist in `suspended_blocks`.
     missing_blocks: BTreeSet<BlockRef>,
+    /// A vector that holds a tuple of (lowest_round, highest_round) of received blocks per authority.
+    /// This is used for metrics reporting purposes and is reset on restart.
+    received_block_rounds: Vec<Option<(Round, Round)>>,
 }

 impl BlockManager {
@@ -65,6 +69,7 @@ impl BlockManager {
         dag_state: Arc<RwLock<DagState>>,
         block_verifier: Arc<dyn BlockVerifier>,
     ) -> Self {
+        let committee_size = context.committee.size();
         Self {
             context,
             dag_state,
@@ -72,6 +77,7 @@ impl BlockManager {
             suspended_blocks: BTreeMap::new(),
             missing_ancestors: BTreeMap::new(),
             missing_blocks: BTreeSet::new(),
+            received_block_rounds: vec![None; committee_size],
         }
     }

@@ -94,10 +100,17 @@ impl BlockManager {
         let mut missing_blocks = BTreeSet::new();

         for block in blocks {
+            self.update_block_received_metrics(&block);
+
             // Try to accept the input block.
+            let block_ref = block.reference();
             let block = match self.try_accept_one_block(block) {
                 TryAcceptResult::Accepted(block) => block,
                 TryAcceptResult::Suspended(ancestors_to_fetch) => {
+                    debug!(
+                        "Missing ancestors for block {block_ref}: {}",
+                        ancestors_to_fetch.iter().map(|b| b.to_string()).join(",")
+                    );
                     missing_blocks.extend(ancestors_to_fetch);
                     continue;
                 }
@@ -337,6 +350,30 @@ impl BlockManager {
         self.missing_blocks.clone()
     }

+    fn update_block_received_metrics(&mut self, block: &VerifiedBlock) {
+        let (min_round, max_round) =
+            if let Some((curr_min, curr_max)) = self.received_block_rounds[block.author()] {
+                (curr_min.min(block.round()), curr_max.max(block.round()))
+            } else {
+                (block.round(), block.round())
+            };
+        self.received_block_rounds[block.author()] = Some((min_round, max_round));
+
+        let hostname = &self.context.committee.authority(block.author()).hostname;
+        self.context
+            .metrics
+            .node_metrics
+            .lowest_verified_authority_round
+            .with_label_values(&[hostname])
+            .set(min_round.into());
+        self.context
+            .metrics
+            .node_metrics
+            .highest_verified_authority_round
+            .with_label_values(&[hostname])
+            .set(max_round.into());
+    }
+
     /// Returns all the suspended blocks whose causal history we miss hence we can't accept them yet.
 #[cfg(test)]
 fn suspended_blocks(&self) -> Vec<BlockRef> {
diff --git a/consensus/core/src/commit_observer.rs b/consensus/core/src/commit_observer.rs
index a3bb24a624761f..7a27416fa5f123 100644
--- a/consensus/core/src/commit_observer.rs
+++ b/consensus/core/src/commit_observer.rs
@@ -5,6 +5,7 @@ use std::{sync::Arc, time::Duration};

 use mysten_metrics::monitored_mpsc::UnboundedSender;
 use parking_lot::RwLock;
+use tracing::info;

 use crate::{
     block::{BlockAPI, VerifiedBlock},
@@ -152,32 +153,37 @@ impl CommitObserver {
     }

     fn report_metrics(&self, committed: &[CommittedSubDag]) {
+        let metrics = &self.context.metrics.node_metrics;
         let utc_now = self.context.clock.timestamp_utc_ms();
-        let mut total = 0;
-        for block in committed.iter().flat_map(|dag| &dag.blocks) {
-            let latency_ms = utc_now
-                .checked_sub(block.timestamp_ms())
-                .unwrap_or_default();
-
-            total += 1;
-
-            self.context
-                .metrics
-                .node_metrics
-                .block_commit_latency
-                .observe(Duration::from_millis(latency_ms).as_secs_f64());
-            self.context
-                .metrics
-                .node_metrics
+
+        for commit in committed {
+            info!(
+                "Consensus commit {} with leader {} has {} blocks",
+                commit.commit_ref,
+                commit.leader,
+                commit.blocks.len()
+            );
+
+            metrics
                 .last_committed_leader_round
-                .set(block.round() as i64);
+                .set(commit.leader.round as i64);
+            metrics
+                .last_commit_index
+                .set(commit.commit_ref.index as i64);
+            metrics
+                .blocks_per_commit_count
+                .observe(commit.blocks.len() as f64);
+
+            for block in &commit.blocks {
+                let latency_ms = utc_now
+                    .checked_sub(block.timestamp_ms())
+                    .unwrap_or_default();
+                metrics
+                    .block_commit_latency
+                    .observe(Duration::from_millis(latency_ms).as_secs_f64());
+            }
         }
-        self.context
-            .metrics
-            .node_metrics
-            .blocks_per_commit_count
-            .observe(total as f64);

         self.context
             .metrics
             .node_metrics
diff --git a/consensus/core/src/core.rs b/consensus/core/src/core.rs
index feef2bfa2319d7..8085718f640f90 100644
--- a/consensus/core/src/core.rs
+++ b/consensus/core/src/core.rs
@@ -193,6 +193,11 @@ impl Core {
             .scope_processing_time
             .with_label_values(&["Core::add_blocks"])
             .start_timer();
+        self.context
+            .metrics
+            .node_metrics
+            .core_add_blocks_batch_size
+            .observe(blocks.len() as f64);

         // Try to accept them via the block manager
         let (accepted_blocks, missing_blocks) = self.block_manager.try_accept_blocks(blocks);
diff --git a/consensus/core/src/dag_state.rs b/consensus/core/src/dag_state.rs
index 12c022a3a0408e..5ebac04a585a1f 100644
--- a/consensus/core/src/dag_state.rs
+++ b/consensus/core/src/dag_state.rs
@@ -237,6 +237,18 @@ impl DagState {
             .node_metrics
             .highest_accepted_round
             .set(self.highest_accepted_round as i64);
+
+        let highest_accepted_round_for_author = self.recent_refs[block_ref.author]
+            .last()
+            .map(|block_ref| block_ref.round)
+            .expect("There should be by now at least one block ref");
+        let hostname = &self.context.committee.authority(block_ref.author).hostname;
+        self.context
+            .metrics
+            .node_metrics
+            .highest_accepted_authority_round
+            .with_label_values(&[hostname])
+            .set(highest_accepted_round_for_author as i64);
     }

     /// Accepts a blocks into DagState and keeps it in memory.
@@ -606,6 +618,17 @@ impl DagState {
             );
         }

+        for (i, round) in self.last_committed_rounds.iter().enumerate() {
+            let index = self.context.committee.to_authority_index(i).unwrap();
+            let hostname = &self.context.committee.authority(index).hostname;
+            self.context
+                .metrics
+                .node_metrics
+                .last_committed_authority_round
+                .with_label_values(&[hostname])
+                .set((*round).into());
+        }
+
         self.pending_commit_votes.push_back(commit.reference());
         self.commits_to_write.push(commit);
     }
diff --git a/consensus/core/src/metrics.rs b/consensus/core/src/metrics.rs
index 404558a4fd042b..da5e4908695524 100644
--- a/consensus/core/src/metrics.rs
+++ b/consensus/core/src/metrics.rs
@@ -19,9 +19,33 @@ const FINE_GRAINED_LATENCY_SEC_BUCKETS: &[f64] = &[
     4.0, 4.5, 5.0, 5.5, 6.0, 6.5, 7.0, 7.5, 8.0, 8.5, 9.0, 9.5, 10.,
 ];

-const COMMITTED_BLOCKS_BUCKETS: &[f64] = &[
-    1.0, 2.0, 4.0, 8.0, 10.0, 20.0, 40.0, 80.0, 100.0, 150.0, 200.0, 400.0, 800.0, 1000.0, 2000.0,
+const NUM_BLOCKS_BUCKETS: &[f64] = &[
+    1.0,
+    2.0,
+    4.0,
+    8.0,
+    10.0,
+    20.0,
+    40.0,
+    80.0,
+    100.0,
+    150.0,
+    200.0,
+    400.0,
+    800.0,
+    1000.0,
+    2000.0,
     3000.0,
+    5000.0,
+    10000.0,
+    20000.0,
+    30000.0,
+    50000.0,
+    100_000.0,
+    200_000.0,
+    300_000.0,
+    500_000.0,
+    1_000_000.0,
 ];

 const LATENCY_SEC_BUCKETS: &[f64] = &[
@@ -80,13 +104,17 @@ pub(crate) struct NodeMetrics {
     pub(crate) proposed_blocks: IntCounterVec,
     pub(crate) block_size: Histogram,
     pub(crate) block_ancestors: Histogram,
+    pub(crate) highest_verified_authority_round: IntGaugeVec,
+    pub(crate) lowest_verified_authority_round: IntGaugeVec,
     pub(crate) block_proposal_leader_wait_ms: IntCounterVec,
     pub(crate) block_proposal_leader_wait_count: IntCounterVec,
     pub(crate) block_timestamp_drift_wait_ms: IntCounterVec,
     pub(crate) blocks_per_commit_count: Histogram,
     pub(crate) broadcaster_rtt_estimate_ms: IntGaugeVec,
+    pub(crate) core_add_blocks_batch_size: Histogram,
     pub(crate) core_lock_dequeued: IntCounter,
     pub(crate) core_lock_enqueued: IntCounter,
+    pub(crate) highest_accepted_authority_round: IntGaugeVec,
     pub(crate) highest_accepted_round: IntGauge,
     pub(crate) accepted_blocks: IntCounterVec,
     pub(crate) dag_state_recent_blocks: IntGauge,
@@ -94,13 +122,16 @@ pub(crate) struct NodeMetrics {
     pub(crate) dag_state_store_read_count: IntCounterVec,
     pub(crate) dag_state_store_write_count: IntCounter,
     pub(crate) fetch_blocks_scheduler_inflight: IntGauge,
-    pub(crate) fetched_blocks: IntCounterVec,
+    pub(crate) synchronizer_fetched_blocks_by_peer: IntCounterVec,
+    pub(crate) synchronizer_fetched_blocks_by_authority: IntCounterVec,
     pub(crate) invalid_blocks: IntCounterVec,
     pub(crate) rejected_blocks: IntCounterVec,
     pub(crate) rejected_future_blocks: IntCounterVec,
     pub(crate) verified_blocks: IntCounterVec,
     pub(crate) committed_leaders_total: IntCounterVec,
+    pub(crate) last_committed_authority_round: IntGaugeVec,
     pub(crate) last_committed_leader_round: IntGauge,
+    pub(crate) last_commit_index: IntGauge,
     pub(crate) commit_round_advancement_interval: Histogram,
     pub(crate) last_decided_leader_round: IntGauge,
     pub(crate) leader_timeout_total: IntCounterVec,
@@ -120,6 +151,7 @@ pub(crate) struct NodeMetrics {
     pub(crate) threshold_clock_round: IntGauge,
     pub(crate) subscriber_connection_attempts: IntCounterVec,
     pub(crate) subscriber_connections: IntGaugeVec,
+    pub(crate) subscribed_peers: IntGaugeVec,
     pub(crate) commit_sync_inflight_fetches: IntGauge,
     pub(crate) commit_sync_pending_fetches: IntGauge,
     pub(crate) commit_sync_fetched_commits: IntCounter,
@@ -160,6 +192,18 @@ impl NodeMetrics {
                 exponential_buckets(1.0, 1.4, 20).unwrap(),
                 registry,
             ).unwrap(),
+            highest_verified_authority_round: register_int_gauge_vec_with_registry!(
+                "highest_verified_authority_round",
+                "The highest round of a verified block for the corresponding authority",
+                &["authority"],
+                registry,
+            ).unwrap(),
+            lowest_verified_authority_round: register_int_gauge_vec_with_registry!(
+                "lowest_verified_authority_round",
+                "The lowest round of a verified block for the corresponding authority",
+                &["authority"],
+                registry,
+            ).unwrap(),
             block_proposal_leader_wait_ms: register_int_counter_vec_with_registry!(
                 "block_proposal_leader_wait_ms",
                 "Total time in ms spent waiting for a leader when proposing blocks.",
@@ -181,7 +225,7 @@ impl NodeMetrics {
             blocks_per_commit_count: register_histogram_with_registry!(
                 "blocks_per_commit_count",
                 "The number of blocks per commit.",
-                COMMITTED_BLOCKS_BUCKETS.to_vec(),
+                NUM_BLOCKS_BUCKETS.to_vec(),
                 registry,
             ).unwrap(),
             broadcaster_rtt_estimate_ms: register_int_gauge_vec_with_registry!(
@@ -190,6 +234,12 @@ impl NodeMetrics {
                 &["peer"],
                 registry,
             ).unwrap(),
+            core_add_blocks_batch_size: register_histogram_with_registry!(
+                "core_add_blocks_batch_size",
+                "The number of blocks received by Core for processing in a single batch",
+                NUM_BLOCKS_BUCKETS.to_vec(),
+                registry,
+            ).unwrap(),
             core_lock_dequeued: register_int_counter_with_registry!(
                 "core_lock_dequeued",
                 "Number of dequeued core requests",
@@ -200,6 +250,12 @@ impl NodeMetrics {
                 "Number of enqueued core requests",
                 registry,
             ).unwrap(),
+            highest_accepted_authority_round: register_int_gauge_vec_with_registry!(
+                "highest_accepted_authority_round",
+                "The highest round where a block has been accepted, per author. Resets on restart.",
+                &["author"],
+                registry,
+            ).unwrap(),
             highest_accepted_round: register_int_gauge_with_registry!(
                 "highest_accepted_round",
                 "The highest round where a block has been accepted. Resets on restart.",
                 registry,
             ).unwrap(),
@@ -237,9 +293,15 @@ impl NodeMetrics {
                 "Designates whether the synchronizer scheduler task to fetch blocks is currently running",
                 registry,
             ).unwrap(),
-            fetched_blocks: register_int_counter_vec_with_registry!(
-                "fetched_blocks",
-                "Number of fetched blocks per peer authority via the synchronizer.",
+            synchronizer_fetched_blocks_by_peer: register_int_counter_vec_with_registry!(
+                "synchronizer_fetched_blocks_by_peer",
+                "Number of fetched blocks per peer authority via the synchronizer.",
+                &["peer", "type"],
+                registry,
+            ).unwrap(),
+            synchronizer_fetched_blocks_by_authority: register_int_counter_vec_with_registry!(
+                "synchronizer_fetched_blocks_by_authority",
+                "Number of fetched blocks per block author via the synchronizer",
                 &["authority", "type"],
                 registry,
             ).unwrap(),
@@ -274,11 +336,22 @@ impl NodeMetrics {
                 &["authority", "commit_type"],
                 registry,
             ).unwrap(),
+            last_committed_authority_round: register_int_gauge_vec_with_registry!(
+                "last_committed_authority_round",
+                "The last committed round per authority.",
+                &["authority"],
+                registry,
+            ).unwrap(),
             last_committed_leader_round: register_int_gauge_with_registry!(
                 "last_committed_leader_round",
                 "The last round where a leader was committed to store and sent to commit consumer.",
                 registry,
             ).unwrap(),
+            last_commit_index: register_int_gauge_with_registry!(
+                "last_commit_index",
+                "Index of the last commit.",
+                registry,
+            ).unwrap(),
             commit_round_advancement_interval: register_histogram_with_registry!(
                 "commit_round_advancement_interval",
                 "Intervals (in secs) between commit round advancements.",
@@ -380,7 +453,13 @@ impl NodeMetrics {
             ).unwrap(),
             subscriber_connections: register_int_gauge_vec_with_registry!(
                 "subscriber_connections",
-                "The number of block stream connections broken down by peer",
+                "Peers that this authority is subscribed to for block streams.",
+                &["authority"],
+                registry,
+            ).unwrap(),
+            subscribed_peers: register_int_gauge_vec_with_registry!(
+                "subscribed_peers",
+                "Peers subscribing to block streams from this authority.",
                 &["authority"],
                 registry,
             ).unwrap(),
diff --git a/consensus/core/src/subscriber.rs b/consensus/core/src/subscriber.rs
index 7cc957bd9b191e..85e534b83ffbf5 100644
--- a/consensus/core/src/subscriber.rs
+++ b/consensus/core/src/subscriber.rs
@@ -74,13 +74,6 @@ impl Subscriber {
             peer,
             last_received,
         )));
-        let peer_hostname = &self.context.committee.authority(peer).hostname;
-        self.context
-            .metrics
-            .node_metrics
-            .subscriber_connections
-            .with_label_values(&[peer_hostname])
-            .set(1);
     }

     pub(crate) fn stop(&self) {
@@ -92,15 +85,17 @@ impl Subscriber {
     fn unsubscribe_locked(&self, peer: AuthorityIndex, subscription: &mut Option<JoinHandle<()>>) {
         let peer_hostname = &self.context.committee.authority(peer).hostname;
+        if let Some(subscription) = subscription.take() {
+            subscription.abort();
+        }
+        // There is a race between shutting down the subscription task and clearing the metric here.
+        // TODO: fix the race when unsubscribe_locked() gets called outside of stop().
         self.context
             .metrics
             .node_metrics
             .subscriber_connections
             .with_label_values(&[peer_hostname])
             .set(0);
-        if let Some(subscription) = subscription.take() {
-            subscription.abort();
-        }
     }

     async fn subscription_loop(
@@ -119,6 +114,13 @@ impl Subscriber {
         let mut retries: i64 = 0;
         let mut delay = INITIAL_RETRY_INTERVAL;
         'subscription: loop {
+            context
+                .metrics
+                .node_metrics
+                .subscriber_connections
+                .with_label_values(&[peer_hostname])
+                .set(0);
+
             if retries > IMMEDIATE_RETRIES {
                 debug!(
                     "Delaying retry {} of peer {} subscription, in {} seconds",
@@ -139,6 +141,7 @@ impl Subscriber {
                 delay = INITIAL_RETRY_INTERVAL;
             }
             retries += 1;
+
             let mut blocks = match network_client
                 .subscribe_blocks(peer, last_received, MAX_RETRY_INTERVAL)
                 .await
@@ -164,6 +167,16 @@ impl Subscriber {
                     continue 'subscription;
                 }
             };
+
+            // The subscription is now considered successful.
+            let peer_hostname = &context.committee.authority(peer).hostname;
+            context
+                .metrics
+                .node_metrics
+                .subscriber_connections
+                .with_label_values(&[peer_hostname])
+                .set(1);
+
             'stream: loop {
                 match blocks.next().await {
                     Some(block) => {
diff --git a/consensus/core/src/synchronizer.rs b/consensus/core/src/synchronizer.rs
index f6f5911acd49dd..7f58314827ee04 100644
--- a/consensus/core/src/synchronizer.rs
+++ b/consensus/core/src/synchronizer.rs
@@ -400,19 +400,15 @@ impl Synchronizer {
                     match response {
                         Ok(blocks) => {
-                            let peer_hostname = &context.committee.authority(peer_index).hostname;
-                            context
-                                .metrics
-                                .node_metrics
-                                .fetched_blocks.with_label_values(&[peer_hostname, "live"]).inc_by(blocks.len() as u64);
-
                             if let Err(err) = Self::process_fetched_blocks(blocks,
                                 peer_index,
                                 blocks_guard,
                                 core_dispatcher.clone(),
                                 block_verifier.clone(),
                                 context.clone(),
-                                commands_sender.clone()).await {
+                                commands_sender.clone(),
+                                "live"
+                            ).await {
                                 warn!("Error while processing fetched blocks from peer {peer_index}: {err}");
                             }
                         },
@@ -445,6 +441,7 @@ impl Synchronizer
         context: Arc<Context>,
         commands_sender: Sender<Command>,
+        sync_method: &str,
     ) -> ConsensusResult<()> {
         // The maximum number of blocks that can be additionally fetched from the one requested - those
         // are potentially missing ancestors.
@@ -485,6 +482,20 @@ impl Synchronizer
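
Reviewer note: the `subscribed_peers` gauge introduced above is set to 1 when a `BroadcastStream` is constructed and back to 0 in its `Drop` impl, so the metric stays accurate even when a stream is dropped without an explicit unsubscribe. Below is a minimal standalone sketch of that pattern using the `prometheus` crate directly; the `SubscriptionGauge` type and the "validator-1" hostname are illustrative assumptions, not part of this PR.

```rust
use prometheus::{register_int_gauge_vec_with_registry, IntGaugeVec, Registry};

/// Illustrative guard: sets a labeled gauge to 1 on creation and back to 0 on drop,
/// mirroring how BroadcastStream::new() and its Drop impl manage `subscribed_peers`.
struct SubscriptionGauge {
    gauge: IntGaugeVec,
    peer_hostname: String,
}

impl SubscriptionGauge {
    fn new(gauge: IntGaugeVec, peer_hostname: &str) -> Self {
        gauge.with_label_values(&[peer_hostname]).set(1);
        Self {
            gauge,
            peer_hostname: peer_hostname.to_string(),
        }
    }
}

impl Drop for SubscriptionGauge {
    fn drop(&mut self) {
        // Clear the gauge when the subscription goes away for any reason.
        self.gauge
            .with_label_values(&[self.peer_hostname.as_str()])
            .set(0);
    }
}

fn main() {
    let registry = Registry::new();
    let subscribed_peers = register_int_gauge_vec_with_registry!(
        "subscribed_peers",
        "Peers subscribing to block streams from this authority.",
        &["authority"],
        registry
    )
    .unwrap();

    {
        let _guard = SubscriptionGauge::new(subscribed_peers.clone(), "validator-1");
        assert_eq!(subscribed_peers.with_label_values(&["validator-1"]).get(), 1);
    } // guard dropped here, e.g. when the peer disconnects
    assert_eq!(subscribed_peers.with_label_values(&["validator-1"]).get(), 0);
}
```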
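Reviewer note: `BlockManager::update_block_received_metrics` keeps a per-authority `(lowest, highest)` round pair in memory and republishes both values as labeled gauges on every received block. A small self-contained sketch of that min/max tracking follows; the `RoundTracker` type, the `usize` author index, and the "validator-0" hostname are illustrative assumptions rather than the PR's actual types.

```rust
use prometheus::{register_int_gauge_vec_with_registry, IntGaugeVec, Registry};

/// Illustrative per-authority round tracker: remembers the lowest/highest round
/// seen per authority and exports both as labeled gauges.
struct RoundTracker {
    received_block_rounds: Vec<Option<(u32, u32)>>, // (lowest, highest) per authority
    lowest: IntGaugeVec,
    highest: IntGaugeVec,
}

impl RoundTracker {
    fn new(committee_size: usize, registry: &Registry) -> Self {
        Self {
            received_block_rounds: vec![None; committee_size],
            lowest: register_int_gauge_vec_with_registry!(
                "lowest_verified_authority_round",
                "The lowest round of a verified block for the corresponding authority",
                &["authority"],
                registry
            )
            .unwrap(),
            highest: register_int_gauge_vec_with_registry!(
                "highest_verified_authority_round",
                "The highest round of a verified block for the corresponding authority",
                &["authority"],
                registry
            )
            .unwrap(),
        }
    }

    fn observe(&mut self, author: usize, hostname: &str, round: u32) {
        // Fold the new round into the running (min, max) pair for this authority.
        let (min_round, max_round) = match self.received_block_rounds[author] {
            Some((curr_min, curr_max)) => (curr_min.min(round), curr_max.max(round)),
            None => (round, round),
        };
        self.received_block_rounds[author] = Some((min_round, max_round));

        self.lowest.with_label_values(&[hostname]).set(min_round.into());
        self.highest.with_label_values(&[hostname]).set(max_round.into());
    }
}

fn main() {
    let registry = Registry::new();
    let mut tracker = RoundTracker::new(4, &registry);
    for round in [7, 3, 12] {
        tracker.observe(0, "validator-0", round);
    }
    assert_eq!(tracker.lowest.with_label_values(&["validator-0"]).get(), 3);
    assert_eq!(tracker.highest.with_label_values(&["validator-0"]).get(), 12);
}
```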