From 837f4a54b459f9f272ee663e760e9b2cd619451c Mon Sep 17 00:00:00 2001 From: MW Tian Date: Wed, 5 Jun 2024 12:26:45 -0700 Subject: [PATCH] update --- consensus/core/src/authority_service.rs | 8 +-- consensus/core/src/block_manager.rs | 25 +++++---- consensus/core/src/commit_observer.rs | 14 ----- consensus/core/src/core.rs | 2 +- consensus/core/src/dag_state.rs | 20 ++++++- consensus/core/src/metrics.rs | 75 +++++++++++++++---------- consensus/core/src/subscriber.rs | 15 +++-- consensus/core/src/synchronizer.rs | 17 +++--- 8 files changed, 102 insertions(+), 74 deletions(-) diff --git a/consensus/core/src/authority_service.rs b/consensus/core/src/authority_service.rs index 630d3456b319da..00d40522ea2f40 100644 --- a/consensus/core/src/authority_service.rs +++ b/consensus/core/src/authority_service.rs @@ -378,8 +378,8 @@ impl BroadcastStream { context .metrics .node_metrics - .subscriber_connections - .with_label_values(&[peer_hostname, "inbound"]) + .subscribed_peers + .with_label_values(&[peer_hostname]) .set(1); Self { context, @@ -426,8 +426,8 @@ impl Drop for BroadcastStream { self.context .metrics .node_metrics - .subscriber_connections - .with_label_values(&[peer_hostname, "inbound"]) + .subscribed_peers + .with_label_values(&[peer_hostname]) .set(0); } } diff --git a/consensus/core/src/block_manager.rs b/consensus/core/src/block_manager.rs index 41b5f127ca1632..68071fe3d5d265 100644 --- a/consensus/core/src/block_manager.rs +++ b/consensus/core/src/block_manager.rs @@ -60,7 +60,7 @@ pub(crate) struct BlockManager { missing_blocks: BTreeSet, /// A vector that holds a tuple of (lowest_round, highest_round) of received blocks per authority. /// This is used for metrics reporting purposes and resets during restarts. - received_block_rounds: Vec<(Round, Round)>, + received_block_rounds: Vec>, } impl BlockManager { @@ -69,14 +69,15 @@ impl BlockManager { dag_state: Arc>, block_verifier: Arc, ) -> Self { + let committee_size = context.committee.size(); Self { - context: context.clone(), + context, dag_state, block_verifier, suspended_blocks: BTreeMap::new(), missing_ancestors: BTreeMap::new(), missing_blocks: BTreeSet::new(), - received_block_rounds: vec![(0, 0); context.committee.size()], + received_block_rounds: vec![None; committee_size], } } @@ -99,7 +100,7 @@ impl BlockManager { let mut missing_blocks = BTreeSet::new(); for block in blocks { - self.update_block_receive_metrics(&block); + self.update_block_received_metrics(&block); // Try to accept the input block. let block_ref = block.reference(); @@ -349,22 +350,26 @@ impl BlockManager { self.missing_blocks.clone() } - fn update_block_receive_metrics(&mut self, block: &VerifiedBlock) { - let (min_round, max_round) = self.received_block_rounds[block.author()]; - self.received_block_rounds[block.author()] = - (min_round.min(block.round()), max_round.max(block.round())); + fn update_block_received_metrics(&mut self, block: &VerifiedBlock) { + let (min_round, max_round) = + if let Some((curr_min, curr_max)) = self.received_block_rounds[block.author()] { + (curr_min.min(block.round()), curr_max.max(block.round())) + } else { + (block.round(), block.round()) + }; + self.received_block_rounds[block.author()] = Some((min_round, max_round)); let hostname = &self.context.committee.authority(block.author()).hostname; self.context .metrics .node_metrics - .block_lowest_received_round + .lowest_verified_authority_round .with_label_values(&[hostname]) .set(min_round.into()); self.context .metrics .node_metrics - .block_highest_received_round + .highest_verified_authority_round .with_label_values(&[hostname]) .set(max_round.into()); } diff --git a/consensus/core/src/commit_observer.rs b/consensus/core/src/commit_observer.rs index 40d59fbc9608bc..c22677aac33d71 100644 --- a/consensus/core/src/commit_observer.rs +++ b/consensus/core/src/commit_observer.rs @@ -171,20 +171,6 @@ impl CommitObserver { .node_metrics .last_committed_leader_round .set(block.round() as i64); - - let hostname = &self.context.committee.authority(block.author()).hostname; - self.context - .metrics - .node_metrics - .last_committed_authority_round - .with_label_values(&[hostname]) - .set(block.round().into()); - self.context - .metrics - .node_metrics - .committed_blocks - .with_label_values(&[hostname]) - .inc(); } self.context diff --git a/consensus/core/src/core.rs b/consensus/core/src/core.rs index db0ecf7a3c560b..c2a00441b19cca 100644 --- a/consensus/core/src/core.rs +++ b/consensus/core/src/core.rs @@ -196,7 +196,7 @@ impl Core { self.context .metrics .node_metrics - .core_add_blocks + .core_add_blocks_batch_size .observe(blocks.len() as f64); // Try to accept them via the block manager diff --git a/consensus/core/src/dag_state.rs b/consensus/core/src/dag_state.rs index 6fcb952570dd49..1c0538af37d12a 100644 --- a/consensus/core/src/dag_state.rs +++ b/consensus/core/src/dag_state.rs @@ -232,16 +232,21 @@ impl DagState { self.recent_blocks.insert(block_ref, block.clone()); self.recent_refs[block_ref.author].insert(block_ref); self.highest_accepted_round = max(self.highest_accepted_round, block.round()); + self.context + .metrics + .node_metrics + .highest_accepted_round + .set(self.highest_accepted_round as i64); let highest_accepted_round_for_author = self.recent_refs[block_ref.author] .last() .map(|block_ref| block_ref.round) .expect("There should be by now at least one block ref"); - let hostname = &self.context.committee.authority(block.author()).hostname; + let hostname = &self.context.committee.authority(block_ref.author).hostname; self.context .metrics .node_metrics - .highest_accepted_round + .highest_accepted_authority_round .with_label_values(&[hostname]) .set(highest_accepted_round_for_author as i64); } @@ -613,6 +618,17 @@ impl DagState { ); } + for (i, round) in self.last_committed_rounds.iter().enumerate() { + let index = self.context.committee.to_authority_index(i).unwrap(); + let hostname = &self.context.committee.authority(index).hostname; + self.context + .metrics + .node_metrics + .last_committed_authority_round + .with_label_values(&[hostname]) + .set((*round).into()); + } + self.pending_commit_votes.push_back(commit.reference()); self.commits_to_write.push(commit); } diff --git a/consensus/core/src/metrics.rs b/consensus/core/src/metrics.rs index c2195ce7f864ba..da8754050ff8c0 100644 --- a/consensus/core/src/metrics.rs +++ b/consensus/core/src/metrics.rs @@ -19,9 +19,10 @@ const FINE_GRAINED_LATENCY_SEC_BUCKETS: &[f64] = &[ 4.0, 4.5, 5.0, 5.5, 6.0, 6.5, 7.0, 7.5, 8.0, 8.5, 9.0, 9.5, 10., ]; -const COMMITTED_BLOCKS_BUCKETS: &[f64] = &[ +const NUM_BLOCKS_BUCKETS: &[f64] = &[ 1.0, 2.0, 4.0, 8.0, 10.0, 20.0, 40.0, 80.0, 100.0, 150.0, 200.0, 400.0, 800.0, 1000.0, 2000.0, - 3000.0, + 3000.0, 5000.0, 10000.0, 20000.0, 30000.0, 50000.0, 100_000.0, 200_000.0, 300_000.0, 500_000.0, + 1_000_000.0, ]; const LATENCY_SEC_BUCKETS: &[f64] = &[ @@ -80,25 +81,26 @@ pub(crate) struct NodeMetrics { pub(crate) proposed_blocks: IntCounterVec, pub(crate) block_size: Histogram, pub(crate) block_ancestors: Histogram, - pub(crate) block_highest_received_round: IntGaugeVec, - pub(crate) block_lowest_received_round: IntGaugeVec, + pub(crate) highest_verified_authority_round: IntGaugeVec, + pub(crate) lowest_verified_authority_round: IntGaugeVec, pub(crate) block_proposal_leader_wait_ms: IntCounterVec, pub(crate) block_proposal_leader_wait_count: IntCounterVec, pub(crate) block_timestamp_drift_wait_ms: IntCounterVec, pub(crate) blocks_per_commit_count: Histogram, pub(crate) broadcaster_rtt_estimate_ms: IntGaugeVec, - pub(crate) committed_blocks: IntGaugeVec, - pub(crate) core_add_blocks: Histogram, + pub(crate) core_add_blocks_batch_size: Histogram, pub(crate) core_lock_dequeued: IntCounter, pub(crate) core_lock_enqueued: IntCounter, - pub(crate) highest_accepted_round: IntGaugeVec, + pub(crate) highest_accepted_authority_round: IntGaugeVec, + pub(crate) highest_accepted_round: IntGauge, pub(crate) accepted_blocks: IntCounterVec, pub(crate) dag_state_recent_blocks: IntGauge, pub(crate) dag_state_recent_refs: IntGauge, pub(crate) dag_state_store_read_count: IntCounterVec, pub(crate) dag_state_store_write_count: IntCounter, pub(crate) fetch_blocks_scheduler_inflight: IntGauge, - pub(crate) fetched_blocks: IntCounterVec, + pub(crate) synchronizer_fetched_blocks_by_peer: IntCounterVec, + pub(crate) synchronizer_fetched_blocks_by_authority: IntCounterVec, pub(crate) invalid_blocks: IntCounterVec, pub(crate) rejected_blocks: IntCounterVec, pub(crate) rejected_future_blocks: IntCounterVec, @@ -125,6 +127,7 @@ pub(crate) struct NodeMetrics { pub(crate) threshold_clock_round: IntGauge, pub(crate) subscriber_connection_attempts: IntCounterVec, pub(crate) subscriber_connections: IntGaugeVec, + pub(crate) subscribed_peers: IntGaugeVec, pub(crate) commit_sync_inflight_fetches: IntGauge, pub(crate) commit_sync_pending_fetches: IntGauge, pub(crate) commit_sync_fetched_commits: IntCounter, @@ -165,15 +168,15 @@ impl NodeMetrics { exponential_buckets(1.0, 1.4, 20).unwrap(), registry, ).unwrap(), - block_highest_received_round: register_int_gauge_vec_with_registry!( - "block_highest_received_round", - "The highest received block round for the corresponding authority", + highest_verified_authority_round: register_int_gauge_vec_with_registry!( + "highest_verified_authority_round", + "The highest round of verified block for the corresponding authority", &["authority"], registry, ).unwrap(), - block_lowest_received_round: register_int_gauge_vec_with_registry!( - "block_lowest_received_round", - "The lowest received block round for the corresponding authority", + lowest_verified_authority_round: register_int_gauge_vec_with_registry!( + "lowest_verified_authority_round", + "The lowest round of verified block for the corresponding authority", &["authority"], registry, ).unwrap(), @@ -198,7 +201,7 @@ impl NodeMetrics { blocks_per_commit_count: register_histogram_with_registry!( "blocks_per_commit_count", "The number of blocks per commit.", - COMMITTED_BLOCKS_BUCKETS.to_vec(), + NUM_BLOCKS_BUCKETS.to_vec(), registry, ).unwrap(), broadcaster_rtt_estimate_ms: register_int_gauge_vec_with_registry!( @@ -207,15 +210,10 @@ impl NodeMetrics { &["peer"], registry, ).unwrap(), - committed_blocks: register_int_gauge_vec_with_registry!( - "committed_blocks", - "Number of committed blocks per authority", - &["authority"], - registry, - ).unwrap(), - core_add_blocks: register_histogram_with_registry!( - "core_add_blocks", + core_add_blocks_batch_size: register_histogram_with_registry!( + "core_add_blocks_batch_size", "The number of blocks received from Core for processing on a single batch", + NUM_BLOCKS_BUCKETS.to_vec(), registry, ).unwrap(), core_lock_dequeued: register_int_counter_with_registry!( @@ -228,12 +226,17 @@ impl NodeMetrics { "Number of enqueued core requests", registry, ).unwrap(), - highest_accepted_round: register_int_gauge_vec_with_registry!( - "highest_accepted_round", + highest_accepted_authority_round: register_int_gauge_vec_with_registry!( + "highest_accepted_authority_round", "The highest round where a block has been accepted by author. Resets on restart.", &["author"], registry, ).unwrap(), + highest_accepted_round: register_int_gauge_with_registry!( + "highest_accepted_round", + "The highest round where a block has been accepted. Resets on restart.", + registry, + ).unwrap(), accepted_blocks: register_int_counter_vec_with_registry!( "accepted_blocks", "Number of accepted blocks by source (own, others)", @@ -266,10 +269,16 @@ impl NodeMetrics { "Designates whether the synchronizer scheduler task to fetch blocks is currently running", registry, ).unwrap(), - fetched_blocks: register_int_counter_vec_with_registry!( - "fetched_blocks", + synchronizer_fetched_blocks_by_peer: register_int_counter_vec_with_registry!( + "synchronizer_fetched_blocks_by_peer", "Number of fetched blocks per peer authority via the synchronizer and also by block authority", - &["peer", "type", "authority"], + &["peer", "type"], + registry, + ).unwrap(), + synchronizer_fetched_blocks_by_authority: register_int_counter_vec_with_registry!( + "synchronizer_fetched_blocks_by_authority", + "Number of fetched blocks per block author via the synchronizer", + &["authority", "type"], registry, ).unwrap(), // TODO: add a short status label. @@ -415,8 +424,14 @@ impl NodeMetrics { ).unwrap(), subscriber_connections: register_int_gauge_vec_with_registry!( "subscriber_connections", - "The number of block stream connections broken down by peer. The direction can be outbound (our node is subscribed to a peer) or inbound (a peer is subscribed to our node)", - &["authority", "direction"], + "Peers that this authority subscribed to for block streams.", + &["authority"], + registry, + ).unwrap(), + subscribed_peers: register_int_gauge_vec_with_registry!( + "subscribed_peers", + "Peers subscribing for block streams from this authority.", + &["authority"], registry, ).unwrap(), commit_sync_inflight_fetches: register_int_gauge_with_registry!( diff --git a/consensus/core/src/subscriber.rs b/consensus/core/src/subscriber.rs index 8c7eca05b40d3a..7c194f003502a9 100644 --- a/consensus/core/src/subscriber.rs +++ b/consensus/core/src/subscriber.rs @@ -85,15 +85,17 @@ impl Subscriber { fn unsubscribe_locked(&self, peer: AuthorityIndex, subscription: &mut Option>) { let peer_hostname = &self.context.committee.authority(peer).hostname; + if let Some(subscription) = subscription.take() { + subscription.abort(); + } + // There is a race between shutting down the subscription task and clearing the metric here. + // TODO: fix the race when unsubscribe_locked() gets called outside of stop(). self.context .metrics .node_metrics .subscriber_connections - .with_label_values(&[peer_hostname, "outbound"]) + .with_label_values(&[peer_hostname]) .set(0); - if let Some(subscription) = subscription.take() { - subscription.abort(); - } } async fn subscription_loop( @@ -116,7 +118,7 @@ impl Subscriber { .metrics .node_metrics .subscriber_connections - .with_label_values(&[peer_hostname, "outbound"]) + .with_label_values(&[peer_hostname]) .set(0); if retries > IMMEDIATE_RETRIES { @@ -139,6 +141,7 @@ impl Subscriber { delay = INITIAL_RETRY_INTERVAL; } retries += 1; + let mut blocks = match network_client .subscribe_blocks(peer, last_received, MAX_RETRY_INTERVAL) .await @@ -171,7 +174,7 @@ impl Subscriber { .metrics .node_metrics .subscriber_connections - .with_label_values(&[peer_hostname, "outbound"]) + .with_label_values(&[peer_hostname]) .set(1); 'stream: loop { diff --git a/consensus/core/src/synchronizer.rs b/consensus/core/src/synchronizer.rs index ad378d8cba2140..e2122d3f4a2c05 100644 --- a/consensus/core/src/synchronizer.rs +++ b/consensus/core/src/synchronizer.rs @@ -477,15 +477,18 @@ impl Synchronizer