Skip to content

Commit

Permalink
Add checkpoint metrics (MystenLabs#3204)
Browse files Browse the repository at this point in the history
  • Loading branch information
lxfind authored Jul 14, 2022
1 parent c1d3e46 commit d9c20ae
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 23 deletions.
12 changes: 6 additions & 6 deletions crates/sui-core/src/authority_active.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub mod gossip;
use gossip::{gossip_process, node_sync_process};

pub mod checkpoint_driver;
use crate::authority_active::checkpoint_driver::CheckpointMetrics;
use checkpoint_driver::checkpoint_process;

pub mod execution_driver;
Expand Down Expand Up @@ -198,21 +199,20 @@ impl<A> ActiveAuthority<A>
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
pub async fn spawn_checkpoint_process(self: Arc<Self>) {
self.spawn_checkpoint_process_with_config(Some(CheckpointProcessControl::default()))
pub async fn spawn_checkpoint_process(self: Arc<Self>, metrics: CheckpointMetrics) {
self.spawn_checkpoint_process_with_config(CheckpointProcessControl::default(), metrics)
.await
}

/// Spawn all active tasks.
pub async fn spawn_checkpoint_process_with_config(
self: Arc<Self>,
checkpoint_process_control: Option<CheckpointProcessControl>,
checkpoint_process_control: CheckpointProcessControl,
metrics: CheckpointMetrics,
) {
// Spawn task to take care of checkpointing
let _checkpoint_join = tokio::task::spawn(async move {
if let Some(checkpoint) = checkpoint_process_control {
checkpoint_process(&self, &checkpoint).await;
}
checkpoint_process(&self, &checkpoint_process_control, metrics).await;
});

if let Err(err) = _checkpoint_join.await {
Expand Down
48 changes: 43 additions & 5 deletions crates/sui-core/src/authority_active/checkpoint_driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::{
};

use parking_lot::Mutex;
use prometheus::{register_int_counter_with_registry, IntCounter, Registry};
use sui_types::{
base_types::{AuthorityName, ExecutionDigests},
error::SuiError,
Expand Down Expand Up @@ -80,9 +81,40 @@ impl Default for CheckpointProcessControl {
}
}

#[derive(Clone)]
pub struct CheckpointMetrics {
checkpoint_certificates_stored: IntCounter,
checkpoints_signed: IntCounter,
}

impl CheckpointMetrics {
pub fn new(registry: &Registry) -> Self {
Self {
checkpoint_certificates_stored: register_int_counter_with_registry!(
"checkpoint_certificates_stored",
"Total number of unique checkpoint certificates stored in this validator",
registry,
)
.unwrap(),
checkpoints_signed: register_int_counter_with_registry!(
"checkpoints_signed",
"Total number of checkpoints signed by this validator",
registry,
)
.unwrap(),
}
}

pub fn new_for_tests() -> Self {
let registry = Registry::new();
Self::new(&registry)
}
}

pub async fn checkpoint_process<A>(
active_authority: &ActiveAuthority<A>,
timing: &CheckpointProcessControl,
metrics: CheckpointMetrics,
) where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
Expand Down Expand Up @@ -179,6 +211,7 @@ pub async fn checkpoint_process<A>(
let _name = state_checkpoints.lock().name;
let _next_checkpoint = state_checkpoints.lock().next_checkpoint();
info!("{_name:?} at checkpoint {_next_checkpoint:?}");
metrics.checkpoint_certificates_stored.inc();
tokio::time::sleep(timing.long_pause_between_checkpoints).await;
continue;
}
Expand Down Expand Up @@ -211,7 +244,7 @@ pub async fn checkpoint_process<A>(
// (5) Now we try to create fragments and construct checkpoint.
match proposal {
Ok(my_proposal) => {
create_fragments_and_make_checkpoint(
if create_fragments_and_make_checkpoint(
active_authority,
state_checkpoints.clone(),
&my_proposal,
Expand All @@ -221,7 +254,10 @@ pub async fn checkpoint_process<A>(
committee,
timing.consensus_delay_estimate,
)
.await;
.await
{
metrics.checkpoints_signed.inc();
}
}
Err(err) => {
warn!(
Expand Down Expand Up @@ -596,15 +632,16 @@ where
/// Picks other authorities at random and constructs checkpoint fragments
/// that are submitted to consensus. The process terminates when a future
/// checkpoint is created, or we run out of validators.
/// Returns whether we have successfully created a new checkpoint.
/// Returns whether we have successfully created and signed a new checkpoint.
pub async fn create_fragments_and_make_checkpoint<A>(
active_authority: &ActiveAuthority<A>,
checkpoint_db: Arc<Mutex<CheckpointStore>>,
my_proposal: &CheckpointProposal,
mut available_authorities: BTreeSet<AuthorityName>,
committee: &Committee,
consensus_delay_estimate: Duration,
) where
) -> bool
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
// Pick another authority, get their proposal, and submit it to consensus
Expand Down Expand Up @@ -703,12 +740,13 @@ pub async fn create_fragments_and_make_checkpoint<A>(
}
Ok(()) => {
// A new checkpoint has been made.
break;
return true;
}
}
}
}
}
false
}

/// Given a fragment with this authority as the proposer and another authority as the counterpart,
Expand Down
17 changes: 9 additions & 8 deletions crates/sui-core/src/authority_active/checkpoint_driver/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@
// SPDX-License-Identifier: Apache-2.0

use crate::{
authority_active::{checkpoint_driver::CheckpointProcessControl, ActiveAuthority},
authority_client::LocalAuthorityClient,
checkpoints::checkpoint_tests::TestSetup,
safe_client::SafeClient,
authority_active::ActiveAuthority, authority_client::LocalAuthorityClient,
checkpoints::checkpoint_tests::TestSetup, safe_client::SafeClient,
};

use crate::authority_active::checkpoint_driver::CheckpointMetrics;
use std::{collections::BTreeSet, sync::Arc, time::Duration};
use sui_types::messages::ExecutionStatus;

Expand Down Expand Up @@ -38,7 +37,9 @@ async fn checkpoint_active_flow_happy_path() {
)
.unwrap(),
);
active_state.spawn_checkpoint_process().await
active_state
.spawn_checkpoint_process(CheckpointMetrics::new_for_tests())
.await
});
}

Expand Down Expand Up @@ -121,7 +122,7 @@ async fn checkpoint_active_flow_crash_client_with_gossip() {

// Spin the checkpoint service.
active_state
.spawn_checkpoint_process_with_config(Some(CheckpointProcessControl::default()))
.spawn_checkpoint_process(CheckpointMetrics::new_for_tests())
.await;
});
}
Expand Down Expand Up @@ -214,7 +215,7 @@ async fn checkpoint_active_flow_crash_client_no_gossip() {

// Spin the gossip service.
active_state
.spawn_checkpoint_process_with_config(Some(CheckpointProcessControl::default()))
.spawn_checkpoint_process(CheckpointMetrics::new_for_tests())
.await;
});
}
Expand Down Expand Up @@ -306,7 +307,7 @@ async fn test_empty_checkpoint() {

// Spin the gossip service.
active_state
.spawn_checkpoint_process_with_config(Some(CheckpointProcessControl::default()))
.spawn_checkpoint_process(CheckpointMetrics::new_for_tests())
.await;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use crate::{authority_active::ActiveAuthority, checkpoints::checkpoint_tests::TestSetup};

use crate::authority_active::checkpoint_driver::CheckpointMetrics;
use std::sync::Arc;
use std::time::Duration;
use sui_types::messages::ExecutionStatus;
Expand Down Expand Up @@ -36,7 +37,9 @@ async fn pending_exec_storage_notify() {
)
.unwrap(),
);
active_state.spawn_checkpoint_process().await
active_state
.spawn_checkpoint_process(CheckpointMetrics::new_for_tests())
.await
});
}

Expand Down Expand Up @@ -121,7 +124,9 @@ async fn pending_exec_full() {
);

active_state.clone().spawn_execute_process().await;
active_state.spawn_checkpoint_process().await;
active_state
.spawn_checkpoint_process(CheckpointMetrics::new_for_tests())
.await;
});
}

Expand Down
11 changes: 9 additions & 2 deletions crates/sui/tests/checkpoints_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use rand::{rngs::StdRng, SeedableRng};
use std::collections::HashSet;
use std::sync::Arc;
use sui_core::authority_active::checkpoint_driver::CheckpointMetrics;
use sui_core::{
authority::AuthorityState,
authority_active::{checkpoint_driver::CheckpointProcessControl, ActiveAuthority},
Expand Down Expand Up @@ -149,7 +150,10 @@ async fn end_to_end() {
..CheckpointProcessControl::default()
};
active_state
.spawn_checkpoint_process_with_config(Some(checkpoint_process_control))
.spawn_checkpoint_process_with_config(
checkpoint_process_control,
CheckpointMetrics::new_for_tests(),
)
.await
});
}
Expand Down Expand Up @@ -240,7 +244,10 @@ async fn checkpoint_with_shared_objects() {
active_state.clone().spawn_execute_process().await;

active_state
.spawn_checkpoint_process_with_config(Some(checkpoint_process_control))
.spawn_checkpoint_process_with_config(
checkpoint_process_control,
CheckpointMetrics::new_for_tests(),
)
.await
});
}
Expand Down

0 comments on commit d9c20ae

Please sign in to comment.