Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
mwtian committed Jun 5, 2024
1 parent 5712130 commit 837f4a5
Show file tree
Hide file tree
Showing 8 changed files with 102 additions and 74 deletions.
8 changes: 4 additions & 4 deletions consensus/core/src/authority_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,8 +378,8 @@ impl<T: 'static + Clone + Send> BroadcastStream<T> {
context
.metrics
.node_metrics
.subscriber_connections
.with_label_values(&[peer_hostname, "inbound"])
.subscribed_peers
.with_label_values(&[peer_hostname])
.set(1);
Self {
context,
Expand Down Expand Up @@ -426,8 +426,8 @@ impl<T> Drop for BroadcastStream<T> {
self.context
.metrics
.node_metrics
.subscriber_connections
.with_label_values(&[peer_hostname, "inbound"])
.subscribed_peers
.with_label_values(&[peer_hostname])
.set(0);
}
}
Expand Down
25 changes: 15 additions & 10 deletions consensus/core/src/block_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub(crate) struct BlockManager {
missing_blocks: BTreeSet<BlockRef>,
/// A vector that holds a tuple of (lowest_round, highest_round) of received blocks per authority.
/// This is used for metrics reporting purposes and resets during restarts.
received_block_rounds: Vec<(Round, Round)>,
received_block_rounds: Vec<Option<(Round, Round)>>,
}

impl BlockManager {
Expand All @@ -69,14 +69,15 @@ impl BlockManager {
dag_state: Arc<RwLock<DagState>>,
block_verifier: Arc<dyn BlockVerifier>,
) -> Self {
let committee_size = context.committee.size();
Self {
context: context.clone(),
context,
dag_state,
block_verifier,
suspended_blocks: BTreeMap::new(),
missing_ancestors: BTreeMap::new(),
missing_blocks: BTreeSet::new(),
received_block_rounds: vec![(0, 0); context.committee.size()],
received_block_rounds: vec![None; committee_size],
}
}

Expand All @@ -99,7 +100,7 @@ impl BlockManager {
let mut missing_blocks = BTreeSet::new();

for block in blocks {
self.update_block_receive_metrics(&block);
self.update_block_received_metrics(&block);

// Try to accept the input block.
let block_ref = block.reference();
Expand Down Expand Up @@ -349,22 +350,26 @@ impl BlockManager {
self.missing_blocks.clone()
}

fn update_block_receive_metrics(&mut self, block: &VerifiedBlock) {
let (min_round, max_round) = self.received_block_rounds[block.author()];
self.received_block_rounds[block.author()] =
(min_round.min(block.round()), max_round.max(block.round()));
fn update_block_received_metrics(&mut self, block: &VerifiedBlock) {
let (min_round, max_round) =
if let Some((curr_min, curr_max)) = self.received_block_rounds[block.author()] {
(curr_min.min(block.round()), curr_max.max(block.round()))
} else {
(block.round(), block.round())
};
self.received_block_rounds[block.author()] = Some((min_round, max_round));

let hostname = &self.context.committee.authority(block.author()).hostname;
self.context
.metrics
.node_metrics
.block_lowest_received_round
.lowest_verified_authority_round
.with_label_values(&[hostname])
.set(min_round.into());
self.context
.metrics
.node_metrics
.block_highest_received_round
.highest_verified_authority_round
.with_label_values(&[hostname])
.set(max_round.into());
}
Expand Down
14 changes: 0 additions & 14 deletions consensus/core/src/commit_observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,20 +171,6 @@ impl CommitObserver {
.node_metrics
.last_committed_leader_round
.set(block.round() as i64);

let hostname = &self.context.committee.authority(block.author()).hostname;
self.context
.metrics
.node_metrics
.last_committed_authority_round
.with_label_values(&[hostname])
.set(block.round().into());
self.context
.metrics
.node_metrics
.committed_blocks
.with_label_values(&[hostname])
.inc();
}

self.context
Expand Down
2 changes: 1 addition & 1 deletion consensus/core/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ impl Core {
self.context
.metrics
.node_metrics
.core_add_blocks
.core_add_blocks_batch_size
.observe(blocks.len() as f64);

// Try to accept them via the block manager
Expand Down
20 changes: 18 additions & 2 deletions consensus/core/src/dag_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,16 +232,21 @@ impl DagState {
self.recent_blocks.insert(block_ref, block.clone());
self.recent_refs[block_ref.author].insert(block_ref);
self.highest_accepted_round = max(self.highest_accepted_round, block.round());
self.context
.metrics
.node_metrics
.highest_accepted_round
.set(self.highest_accepted_round as i64);

let highest_accepted_round_for_author = self.recent_refs[block_ref.author]
.last()
.map(|block_ref| block_ref.round)
.expect("There should be by now at least one block ref");
let hostname = &self.context.committee.authority(block.author()).hostname;
let hostname = &self.context.committee.authority(block_ref.author).hostname;
self.context
.metrics
.node_metrics
.highest_accepted_round
.highest_accepted_authority_round
.with_label_values(&[hostname])
.set(highest_accepted_round_for_author as i64);
}
Expand Down Expand Up @@ -613,6 +618,17 @@ impl DagState {
);
}

for (i, round) in self.last_committed_rounds.iter().enumerate() {
let index = self.context.committee.to_authority_index(i).unwrap();
let hostname = &self.context.committee.authority(index).hostname;
self.context
.metrics
.node_metrics
.last_committed_authority_round
.with_label_values(&[hostname])
.set((*round).into());
}

self.pending_commit_votes.push_back(commit.reference());
self.commits_to_write.push(commit);
}
Expand Down
75 changes: 45 additions & 30 deletions consensus/core/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ const FINE_GRAINED_LATENCY_SEC_BUCKETS: &[f64] = &[
4.0, 4.5, 5.0, 5.5, 6.0, 6.5, 7.0, 7.5, 8.0, 8.5, 9.0, 9.5, 10.,
];

const COMMITTED_BLOCKS_BUCKETS: &[f64] = &[
const NUM_BLOCKS_BUCKETS: &[f64] = &[
1.0, 2.0, 4.0, 8.0, 10.0, 20.0, 40.0, 80.0, 100.0, 150.0, 200.0, 400.0, 800.0, 1000.0, 2000.0,
3000.0,
3000.0, 5000.0, 10000.0, 20000.0, 30000.0, 50000.0, 100_000.0, 200_000.0, 300_000.0, 500_000.0,
1_000_000.0,
];

const LATENCY_SEC_BUCKETS: &[f64] = &[
Expand Down Expand Up @@ -80,25 +81,26 @@ pub(crate) struct NodeMetrics {
pub(crate) proposed_blocks: IntCounterVec,
pub(crate) block_size: Histogram,
pub(crate) block_ancestors: Histogram,
pub(crate) block_highest_received_round: IntGaugeVec,
pub(crate) block_lowest_received_round: IntGaugeVec,
pub(crate) highest_verified_authority_round: IntGaugeVec,
pub(crate) lowest_verified_authority_round: IntGaugeVec,
pub(crate) block_proposal_leader_wait_ms: IntCounterVec,
pub(crate) block_proposal_leader_wait_count: IntCounterVec,
pub(crate) block_timestamp_drift_wait_ms: IntCounterVec,
pub(crate) blocks_per_commit_count: Histogram,
pub(crate) broadcaster_rtt_estimate_ms: IntGaugeVec,
pub(crate) committed_blocks: IntGaugeVec,
pub(crate) core_add_blocks: Histogram,
pub(crate) core_add_blocks_batch_size: Histogram,
pub(crate) core_lock_dequeued: IntCounter,
pub(crate) core_lock_enqueued: IntCounter,
pub(crate) highest_accepted_round: IntGaugeVec,
pub(crate) highest_accepted_authority_round: IntGaugeVec,
pub(crate) highest_accepted_round: IntGauge,
pub(crate) accepted_blocks: IntCounterVec,
pub(crate) dag_state_recent_blocks: IntGauge,
pub(crate) dag_state_recent_refs: IntGauge,
pub(crate) dag_state_store_read_count: IntCounterVec,
pub(crate) dag_state_store_write_count: IntCounter,
pub(crate) fetch_blocks_scheduler_inflight: IntGauge,
pub(crate) fetched_blocks: IntCounterVec,
pub(crate) synchronizer_fetched_blocks_by_peer: IntCounterVec,
pub(crate) synchronizer_fetched_blocks_by_authority: IntCounterVec,
pub(crate) invalid_blocks: IntCounterVec,
pub(crate) rejected_blocks: IntCounterVec,
pub(crate) rejected_future_blocks: IntCounterVec,
Expand All @@ -125,6 +127,7 @@ pub(crate) struct NodeMetrics {
pub(crate) threshold_clock_round: IntGauge,
pub(crate) subscriber_connection_attempts: IntCounterVec,
pub(crate) subscriber_connections: IntGaugeVec,
pub(crate) subscribed_peers: IntGaugeVec,
pub(crate) commit_sync_inflight_fetches: IntGauge,
pub(crate) commit_sync_pending_fetches: IntGauge,
pub(crate) commit_sync_fetched_commits: IntCounter,
Expand Down Expand Up @@ -165,15 +168,15 @@ impl NodeMetrics {
exponential_buckets(1.0, 1.4, 20).unwrap(),
registry,
).unwrap(),
block_highest_received_round: register_int_gauge_vec_with_registry!(
"block_highest_received_round",
"The highest received block round for the corresponding authority",
highest_verified_authority_round: register_int_gauge_vec_with_registry!(
"highest_verified_authority_round",
"The highest round of verified block for the corresponding authority",
&["authority"],
registry,
).unwrap(),
block_lowest_received_round: register_int_gauge_vec_with_registry!(
"block_lowest_received_round",
"The lowest received block round for the corresponding authority",
lowest_verified_authority_round: register_int_gauge_vec_with_registry!(
"lowest_verified_authority_round",
"The lowest round of verified block for the corresponding authority",
&["authority"],
registry,
).unwrap(),
Expand All @@ -198,7 +201,7 @@ impl NodeMetrics {
blocks_per_commit_count: register_histogram_with_registry!(
"blocks_per_commit_count",
"The number of blocks per commit.",
COMMITTED_BLOCKS_BUCKETS.to_vec(),
NUM_BLOCKS_BUCKETS.to_vec(),
registry,
).unwrap(),
broadcaster_rtt_estimate_ms: register_int_gauge_vec_with_registry!(
Expand All @@ -207,15 +210,10 @@ impl NodeMetrics {
&["peer"],
registry,
).unwrap(),
committed_blocks: register_int_gauge_vec_with_registry!(
"committed_blocks",
"Number of committed blocks per authority",
&["authority"],
registry,
).unwrap(),
core_add_blocks: register_histogram_with_registry!(
"core_add_blocks",
core_add_blocks_batch_size: register_histogram_with_registry!(
"core_add_blocks_batch_size",
"The number of blocks received from Core for processing on a single batch",
NUM_BLOCKS_BUCKETS.to_vec(),
registry,
).unwrap(),
core_lock_dequeued: register_int_counter_with_registry!(
Expand All @@ -228,12 +226,17 @@ impl NodeMetrics {
"Number of enqueued core requests",
registry,
).unwrap(),
highest_accepted_round: register_int_gauge_vec_with_registry!(
"highest_accepted_round",
highest_accepted_authority_round: register_int_gauge_vec_with_registry!(
"highest_accepted_authority_round",
"The highest round where a block has been accepted by author. Resets on restart.",
&["author"],
registry,
).unwrap(),
highest_accepted_round: register_int_gauge_with_registry!(
"highest_accepted_round",
"The highest round where a block has been accepted. Resets on restart.",
registry,
).unwrap(),
accepted_blocks: register_int_counter_vec_with_registry!(
"accepted_blocks",
"Number of accepted blocks by source (own, others)",
Expand Down Expand Up @@ -266,10 +269,16 @@ impl NodeMetrics {
"Designates whether the synchronizer scheduler task to fetch blocks is currently running",
registry,
).unwrap(),
fetched_blocks: register_int_counter_vec_with_registry!(
"fetched_blocks",
synchronizer_fetched_blocks_by_peer: register_int_counter_vec_with_registry!(
"synchronizer_fetched_blocks_by_peer",
"Number of fetched blocks per peer authority via the synchronizer and also by block authority",
&["peer", "type", "authority"],
&["peer", "type"],
registry,
).unwrap(),
synchronizer_fetched_blocks_by_authority: register_int_counter_vec_with_registry!(
"synchronizer_fetched_blocks_by_authority",
"Number of fetched blocks per block author via the synchronizer",
&["authority", "type"],
registry,
).unwrap(),
// TODO: add a short status label.
Expand Down Expand Up @@ -415,8 +424,14 @@ impl NodeMetrics {
).unwrap(),
subscriber_connections: register_int_gauge_vec_with_registry!(
"subscriber_connections",
"The number of block stream connections broken down by peer. The direction can be outbound (our node is subscribed to a peer) or inbound (a peer is subscribed to our node)",
&["authority", "direction"],
"Peers that this authority subscribed to for block streams.",
&["authority"],
registry,
).unwrap(),
subscribed_peers: register_int_gauge_vec_with_registry!(
"subscribed_peers",
"Peers subscribing for block streams from this authority.",
&["authority"],
registry,
).unwrap(),
commit_sync_inflight_fetches: register_int_gauge_with_registry!(
Expand Down
15 changes: 9 additions & 6 deletions consensus/core/src/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,17 @@ impl<C: NetworkClient, S: NetworkService> Subscriber<C, S> {

fn unsubscribe_locked(&self, peer: AuthorityIndex, subscription: &mut Option<JoinHandle<()>>) {
let peer_hostname = &self.context.committee.authority(peer).hostname;
if let Some(subscription) = subscription.take() {
subscription.abort();
}
// There is a race between shutting down the subscription task and clearing the metric here.
// TODO: fix the race when unsubscribe_locked() gets called outside of stop().
self.context
.metrics
.node_metrics
.subscriber_connections
.with_label_values(&[peer_hostname, "outbound"])
.with_label_values(&[peer_hostname])
.set(0);
if let Some(subscription) = subscription.take() {
subscription.abort();
}
}

async fn subscription_loop(
Expand All @@ -116,7 +118,7 @@ impl<C: NetworkClient, S: NetworkService> Subscriber<C, S> {
.metrics
.node_metrics
.subscriber_connections
.with_label_values(&[peer_hostname, "outbound"])
.with_label_values(&[peer_hostname])
.set(0);

if retries > IMMEDIATE_RETRIES {
Expand All @@ -139,6 +141,7 @@ impl<C: NetworkClient, S: NetworkService> Subscriber<C, S> {
delay = INITIAL_RETRY_INTERVAL;
}
retries += 1;

let mut blocks = match network_client
.subscribe_blocks(peer, last_received, MAX_RETRY_INTERVAL)
.await
Expand Down Expand Up @@ -171,7 +174,7 @@ impl<C: NetworkClient, S: NetworkService> Subscriber<C, S> {
.metrics
.node_metrics
.subscriber_connections
.with_label_values(&[peer_hostname, "outbound"])
.with_label_values(&[peer_hostname])
.set(1);

'stream: loop {
Expand Down
17 changes: 10 additions & 7 deletions consensus/core/src/synchronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -477,15 +477,18 @@ impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C
}
}

let metrics = &context.metrics.node_metrics;
let peer_hostname = &context.committee.authority(peer_index).hostname;
metrics
.synchronizer_fetched_blocks_by_peer
.with_label_values(&[peer_hostname, &sync_method])
.inc_by(blocks.len() as u64);
for block in &blocks {
let peer_hostname = &context.committee.authority(peer_index).hostname;
let block_hostname = &context.committee.authority(block.author()).hostname;
context
.metrics
.node_metrics
.fetched_blocks
.with_label_values(&[peer_hostname, &sync_method, block_hostname])
.inc_by(1u64);
metrics
.synchronizer_fetched_blocks_by_authority
.with_label_values(&[block_hostname, &sync_method])
.inc();
}

debug!(
Expand Down

0 comments on commit 837f4a5

Please sign in to comment.