[Consensus 2.0] Additional metrics & logs (MystenLabs#18075)
## Description 

More metrics and logs related to block receive/acceptance/commit, block stream subscriptions, etc.
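
The call sites in the diffs below reference new gauges and histograms on `NodeMetrics` (e.g. `subscribed_peers`, `lowest_verified_authority_round`, `core_add_blocks_batch_size`); the metric definitions themselves live in files that are not rendered in this view. As a rough orientation, here is a minimal sketch of how such metrics are typically declared and registered with the `prometheus` crate — only the metric names come from the diff, the struct layout and registration code are assumptions.

```rust
// A minimal sketch, assuming the `prometheus` crate. Only the metric names are
// taken from the diff; the struct layout, help strings, and registration code
// are illustrative, not the actual consensus metrics module.
use prometheus::{Histogram, HistogramOpts, IntGaugeVec, Opts, Registry};

pub struct NodeMetrics {
    /// 1 while a peer is subscribed to our block stream, 0 once the stream is dropped.
    pub subscribed_peers: IntGaugeVec,
    /// Number of blocks handed to Core::add_blocks in a single batch.
    pub core_add_blocks_batch_size: Histogram,
    // Per-authority gauges such as lowest/highest_verified_authority_round,
    // highest_accepted_authority_round and last_committed_authority_round
    // would follow the same IntGaugeVec pattern.
}

impl NodeMetrics {
    pub fn new(registry: &Registry) -> Self {
        let subscribed_peers = IntGaugeVec::new(
            Opts::new("subscribed_peers", "Peers subscribed to the block stream"),
            &["authority"],
        )
        .unwrap();
        let core_add_blocks_batch_size = Histogram::with_opts(HistogramOpts::new(
            "core_add_blocks_batch_size",
            "Number of blocks per Core::add_blocks call",
        ))
        .unwrap();
        registry.register(Box::new(subscribed_peers.clone())).unwrap();
        registry
            .register(Box::new(core_add_blocks_batch_size.clone()))
            .unwrap();
        Self {
            subscribed_peers,
            core_add_blocks_batch_size,
        }
    }
}
```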

## Test plan 

CI

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:

---------

Co-authored-by: MW Tian <mingwei@mystenlabs.com>
2 people authored and tx-tomcat committed Jul 29, 2024
1 parent 53ddc93 commit a74b23e
Showing 8 changed files with 251 additions and 53 deletions.
32 changes: 29 additions & 3 deletions consensus/core/src/authority_service.rs
@@ -107,6 +107,8 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
}
let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block);

debug!("Received block {verified_block} via send block.");

// Reject block with timestamp too far in the future.
let now = self.context.clock.timestamp_utc_ms();
let forward_time_drift =
@@ -236,8 +238,11 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
.into_iter()
.map(|block| block.serialized().clone()),
);
let broadcasted_blocks =
BroadcastedBlockStream::new(peer, self.rx_block_broadcaster.resubscribe());
let broadcasted_blocks = BroadcastedBlockStream::new(
self.context.clone(),
peer,
self.rx_block_broadcaster.resubscribe(),
);

// Return a stream of blocks that first yields missed blocks as requested, then new blocks.
Ok(Box::pin(missed_blocks.chain(
@@ -358,6 +363,7 @@ pub(crate) type BroadcastedBlockStream = BroadcastStream<VerifiedBlock>;
/// Adapted from `tokio_stream::wrappers::BroadcastStream`. The main difference is that
/// this tolerates lags with only logging, without yielding errors.
pub(crate) struct BroadcastStream<T> {
context: Arc<Context>,
peer: AuthorityIndex,
// Stores the receiver across poll_next() calls.
inner: ReusableBoxFuture<
@@ -370,8 +376,16 @@ pub(crate) struct BroadcastStream<T> {
}

impl<T: 'static + Clone + Send> BroadcastStream<T> {
pub fn new(peer: AuthorityIndex, rx: broadcast::Receiver<T>) -> Self {
pub fn new(context: Arc<Context>, peer: AuthorityIndex, rx: broadcast::Receiver<T>) -> Self {
let peer_hostname = &context.committee.authority(peer).hostname;
context
.metrics
.node_metrics
.subscribed_peers
.with_label_values(&[peer_hostname])
.set(1);
Self {
context,
peer,
inner: ReusableBoxFuture::new(make_recv_future(rx)),
}
@@ -409,6 +423,18 @@ impl<T: 'static + Clone + Send> Stream for BroadcastStream<T> {
}
}

impl<T> Drop for BroadcastStream<T> {
fn drop(&mut self) {
let peer_hostname = &self.context.committee.authority(self.peer).hostname;
self.context
.metrics
.node_metrics
.subscribed_peers
.with_label_values(&[peer_hostname])
.set(0);
}
}

async fn make_recv_future<T: Clone>(
mut rx: broadcast::Receiver<T>,
) -> (
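
In the authority_service.rs changes above, the `subscribed_peers` gauge is tied to the lifetime of `BroadcastStream`: `new()` sets the per-peer gauge to 1 and the added `Drop` impl resets it to 0, so the gauge tracks live subscriptions even when a stream ends by being dropped rather than polled to completion. A standalone sketch of the same RAII-style pattern — the `SubscriptionGuard` type is hypothetical, not part of the change:

```rust
use prometheus::IntGaugeVec;

/// Hypothetical guard illustrating the pattern used by BroadcastStream above:
/// flag a subscription while the value is alive, clear it on drop.
struct SubscriptionGuard {
    gauge: IntGaugeVec,
    peer_hostname: String,
}

impl SubscriptionGuard {
    fn new(gauge: IntGaugeVec, peer_hostname: String) -> Self {
        gauge.with_label_values(&[peer_hostname.as_str()]).set(1);
        Self { gauge, peer_hostname }
    }
}

impl Drop for SubscriptionGuard {
    fn drop(&mut self) {
        // Runs on every exit path, including cancellation, so the gauge
        // cannot be left stuck at 1 for a disconnected peer.
        self.gauge
            .with_label_values(&[self.peer_hostname.as_str()])
            .set(0);
    }
}
```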
37 changes: 37 additions & 0 deletions consensus/core/src/block_manager.rs
@@ -18,6 +18,7 @@ use crate::{
block_verifier::BlockVerifier,
context::Context,
dag_state::DagState,
Round,
};

struct SuspendedBlock {
@@ -57,6 +58,9 @@ pub(crate) struct BlockManager {
/// Keeps all the blocks that we actually miss and haven't fetched them yet. That set will basically contain all the
/// keys from the `missing_ancestors` minus any keys that exist in `suspended_blocks`.
missing_blocks: BTreeSet<BlockRef>,
/// A vector that holds a tuple of (lowest_round, highest_round) of received blocks per authority.
/// This is used for metrics reporting purposes and resets during restarts.
received_block_rounds: Vec<Option<(Round, Round)>>,
}

impl BlockManager {
@@ -65,13 +69,15 @@ impl BlockManager {
dag_state: Arc<RwLock<DagState>>,
block_verifier: Arc<dyn BlockVerifier>,
) -> Self {
let committee_size = context.committee.size();
Self {
context,
dag_state,
block_verifier,
suspended_blocks: BTreeMap::new(),
missing_ancestors: BTreeMap::new(),
missing_blocks: BTreeSet::new(),
received_block_rounds: vec![None; committee_size],
}
}

@@ -94,10 +100,17 @@ impl BlockManager {
let mut missing_blocks = BTreeSet::new();

for block in blocks {
self.update_block_received_metrics(&block);

// Try to accept the input block.
let block_ref = block.reference();
let block = match self.try_accept_one_block(block) {
TryAcceptResult::Accepted(block) => block,
TryAcceptResult::Suspended(ancestors_to_fetch) => {
debug!(
"Missing ancestors for block {block_ref}: {}",
ancestors_to_fetch.iter().map(|b| b.to_string()).join(",")
);
missing_blocks.extend(ancestors_to_fetch);
continue;
}
@@ -337,6 +350,30 @@ impl BlockManager {
self.missing_blocks.clone()
}

fn update_block_received_metrics(&mut self, block: &VerifiedBlock) {
let (min_round, max_round) =
if let Some((curr_min, curr_max)) = self.received_block_rounds[block.author()] {
(curr_min.min(block.round()), curr_max.max(block.round()))
} else {
(block.round(), block.round())
};
self.received_block_rounds[block.author()] = Some((min_round, max_round));

let hostname = &self.context.committee.authority(block.author()).hostname;
self.context
.metrics
.node_metrics
.lowest_verified_authority_round
.with_label_values(&[hostname])
.set(min_round.into());
self.context
.metrics
.node_metrics
.highest_verified_authority_round
.with_label_values(&[hostname])
.set(max_round.into());
}

/// Returns all the suspended blocks whose causal history we miss hence we can't accept them yet.
#[cfg(test)]
fn suspended_blocks(&self) -> Vec<BlockRef> {
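
`update_block_received_metrics` in block_manager.rs keeps a per-authority `(lowest, highest)` round pair, widens it on each received block, and exports both ends as gauges; since the vector starts empty, the bounds reset on restart. The bookkeeping reduces to a small pure function — a sketch with a hypothetical `widen` helper, not code from the diff:

```rust
type Round = u32;

/// Widen an optional (lowest, highest) round pair with a newly received round.
fn widen(bounds: Option<(Round, Round)>, round: Round) -> (Round, Round) {
    match bounds {
        Some((lo, hi)) => (lo.min(round), hi.max(round)),
        None => (round, round),
    }
}

fn main() {
    let mut bounds = None;
    for round in [5, 3, 9] {
        bounds = Some(widen(bounds, round));
    }
    // The first block sets both ends; later blocks only extend them outward.
    assert_eq!(bounds, Some((3, 9)));
}
```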
50 changes: 28 additions & 22 deletions consensus/core/src/commit_observer.rs
@@ -5,6 +5,7 @@ use std::{sync::Arc, time::Duration};

use mysten_metrics::monitored_mpsc::UnboundedSender;
use parking_lot::RwLock;
use tracing::info;

use crate::{
block::{BlockAPI, VerifiedBlock},
@@ -152,32 +153,37 @@ impl CommitObserver {
}

fn report_metrics(&self, committed: &[CommittedSubDag]) {
let metrics = &self.context.metrics.node_metrics;
let utc_now = self.context.clock.timestamp_utc_ms();
let mut total = 0;
for block in committed.iter().flat_map(|dag| &dag.blocks) {
let latency_ms = utc_now
.checked_sub(block.timestamp_ms())
.unwrap_or_default();

total += 1;

self.context
.metrics
.node_metrics
.block_commit_latency
.observe(Duration::from_millis(latency_ms).as_secs_f64());
self.context
.metrics
.node_metrics

for commit in committed {
info!(
"Consensus commit {} with leader {} has {} blocks",
commit.commit_ref,
commit.leader,
commit.blocks.len()
);

metrics
.last_committed_leader_round
.set(block.round() as i64);
.set(commit.leader.round as i64);
metrics
.last_commit_index
.set(commit.commit_ref.index as i64);
metrics
.blocks_per_commit_count
.observe(commit.blocks.len() as f64);

for block in &commit.blocks {
let latency_ms = utc_now
.checked_sub(block.timestamp_ms())
.unwrap_or_default();
metrics
.block_commit_latency
.observe(Duration::from_millis(latency_ms).as_secs_f64());
}
}

self.context
.metrics
.node_metrics
.blocks_per_commit_count
.observe(total as f64);
self.context
.metrics
.node_metrics
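
One detail in the reworked `report_metrics` in commit_observer.rs: per-block commit latency is the local timestamp minus the block timestamp, computed with `checked_sub(..).unwrap_or_default()`, so a block whose timestamp is ahead of the local clock is recorded as 0 ms rather than underflowing. A tiny illustration of that clamping (hypothetical helper, values made up):

```rust
fn commit_latency_ms(now_ms: u64, block_timestamp_ms: u64) -> u64 {
    // checked_sub returns None on underflow; unwrap_or_default turns that into 0,
    // so clock skew between authorities cannot produce a huge wrapped value.
    now_ms.checked_sub(block_timestamp_ms).unwrap_or_default()
}

fn main() {
    assert_eq!(commit_latency_ms(1_000, 900), 100); // normal case
    assert_eq!(commit_latency_ms(900, 1_000), 0);   // block timestamp ahead of local clock
}
```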
5 changes: 5 additions & 0 deletions consensus/core/src/core.rs
@@ -193,6 +193,11 @@ impl Core {
.scope_processing_time
.with_label_values(&["Core::add_blocks"])
.start_timer();
self.context
.metrics
.node_metrics
.core_add_blocks_batch_size
.observe(blocks.len() as f64);

// Try to accept them via the block manager
let (accepted_blocks, missing_blocks) = self.block_manager.try_accept_blocks(blocks);
23 changes: 23 additions & 0 deletions consensus/core/src/dag_state.rs
@@ -237,6 +237,18 @@ impl DagState {
.node_metrics
.highest_accepted_round
.set(self.highest_accepted_round as i64);

let highest_accepted_round_for_author = self.recent_refs[block_ref.author]
.last()
.map(|block_ref| block_ref.round)
.expect("There should be by now at least one block ref");
let hostname = &self.context.committee.authority(block_ref.author).hostname;
self.context
.metrics
.node_metrics
.highest_accepted_authority_round
.with_label_values(&[hostname])
.set(highest_accepted_round_for_author as i64);
}

/// Accepts a blocks into DagState and keeps it in memory.
@@ -606,6 +618,17 @@ impl DagState {
);
}

for (i, round) in self.last_committed_rounds.iter().enumerate() {
let index = self.context.committee.to_authority_index(i).unwrap();
let hostname = &self.context.committee.authority(index).hostname;
self.context
.metrics
.node_metrics
.last_committed_authority_round
.with_label_values(&[hostname])
.set((*round).into());
}

self.pending_commit_votes.push_back(commit.reference());
self.commits_to_write.push(commit);
}