Skip to content

Commit

Permalink
[sui/narwhal] introduce RegistryService (MystenLabs#6672)
Browse files Browse the repository at this point in the history
Create the RegistryService to allow us to manage and create multiple Prometheus registries. That will allow us to reconfigure Narwhal and do even more fine-grained metrics manipulation.
  • Loading branch information
akichidis authored Dec 14, 2022
1 parent 41233a9 commit ae776c7
Show file tree
Hide file tree
Showing 21 changed files with 347 additions and 49 deletions.
6 changes: 6 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/mysten-metrics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,5 @@ prometheus = "0.13"
once_cell = "1"
tokio = { workspace = true }
tap = "1.0"
dashmap = "5.4.0"
uuid = { version = "1.1.2", features = ["v4", "fast-rng"]}
162 changes: 162 additions & 0 deletions crates/mysten-metrics/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use dashmap::DashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Instant;

Expand All @@ -12,6 +14,7 @@ use tap::TapFallible;
use tracing::warn;

pub use scopeguard;
use uuid::Uuid;

#[derive(Debug)]
pub struct Metrics {
Expand Down Expand Up @@ -164,3 +167,162 @@ impl<F: Future> Future for MonitoredScopeFuture<F> {
self.f.as_mut().poll(cx)
}
}

/// Identifier returned by [`RegistryService::add`] and accepted by
/// [`RegistryService::remove`]. Exported so that callers can store and pass
/// the id without having to depend on the `uuid` crate directly.
pub type RegistryID = Uuid;

/// A service to manage the Prometheus registries. This service allows us to create
/// a new Registry on demand and keep it accessible for processing/polling.
/// The service can be freely cloned/shared across threads.
#[derive(Clone)]
pub struct RegistryService {
    /// The main registry handed in at construction time. The service exposes
    /// no method that removes it.
    default_registry: Registry,
    /// Registries added on demand, keyed by the id generated in `add`.
    /// Held behind an `Arc` so every clone of the service sees the same map.
    registries_by_id: Arc<DashMap<RegistryID, Registry>>,
}

impl RegistryService {
// Creates a new registry service and also adds the main/default registry that is supposed to
// be preserved and never get removed
pub fn new(default_registry: Registry) -> Self {
Self {
default_registry,
registries_by_id: Arc::new(DashMap::new()),
}
}

// Returns the default registry for the service that someone can use
// if they don't want to create a new one.
pub fn default_registry(&self) -> Registry {
self.default_registry.clone()
}

// Adds a new registry to the service. The corresponding RegistryID is returned so can later be
// used for removing the Registry. Method panics if we try to insert a registry with the same id.
// As this can be quite serious for the operation of the node we don't want to accidentally
// swap an existing registry - we expected a removal to happen explicitly.
pub fn add(&self, registry: Registry) -> RegistryID {
let registry_id = Uuid::new_v4();
if self
.registries_by_id
.insert(registry_id, registry)
.is_some()
{
panic!("Other Registry already detected for the same id {registry_id}");
}

registry_id
}

// Removes the registry from the service. If Registry existed then this method returns true,
// otherwise false is returned instead.
pub fn remove(&self, registry_id: RegistryID) -> bool {
self.registries_by_id.remove(&registry_id).is_some()
}

// Returns all the registries of the service
pub fn get_all(&self) -> Vec<Registry> {
let mut registries: Vec<Registry> = self
.registries_by_id
.iter()
.map(|r| r.value().clone())
.collect();
registries.push(self.default_registry.clone());

registries
}

// Returns all the metric families from the registries that a service holds.
pub fn gather_all(&self) -> Vec<prometheus::proto::MetricFamily> {
self.get_all().iter().flat_map(|r| r.gather()).collect()
}
}

#[cfg(test)]
mod tests {
    use crate::RegistryService;
    use prometheus::{IntCounter, Registry};

    /// Gathers every metric family exposed by `service`, sorts them by name
    /// (the on-demand registries come back in no guaranteed order) and checks
    /// that the `(name, help)` pairs match `expected` exactly.
    fn assert_metrics(service: &RegistryService, expected: &[(&str, &str)]) {
        let mut gathered = service.gather_all();
        gathered.sort_by(|m1, m2| Ord::cmp(m1.get_name(), m2.get_name()));

        let actual: Vec<(&str, &str)> = gathered
            .iter()
            .map(|m| (m.get_name(), m.get_help()))
            .collect();

        assert_eq!(actual.as_slice(), expected);
    }

    #[test]
    fn registry_service() {
        // GIVEN a service built around a default registry
        let default_registry = Registry::new_custom(Some("default".to_string()), None).unwrap();
        let registry_service = RegistryService::new(default_registry.clone());

        // AND add a metric to the default registry
        let default_counter = IntCounter::new("counter", "counter_desc").unwrap();
        default_counter.inc();
        default_registry
            .register(Box::new(default_counter))
            .unwrap();

        // AND a registry with one metric
        let registry_1 = Registry::new_custom(Some("narwhal".to_string()), None).unwrap();
        registry_1
            .register(Box::new(
                IntCounter::new("counter_1", "counter_1_desc").unwrap(),
            ))
            .unwrap();

        // WHEN
        let registry_1_id = registry_service.add(registry_1);

        // THEN both the default and the added registry are reported
        assert_eq!(registry_service.get_all().len(), 2);
        assert_metrics(
            &registry_service,
            &[
                ("default_counter", "counter_desc"),
                ("narwhal_counter_1", "counter_1_desc"),
            ],
        );

        // AND add a second registry with a metric
        let registry_2 = Registry::new_custom(Some("sui".to_string()), None).unwrap();
        registry_2
            .register(Box::new(
                IntCounter::new("counter_2", "counter_2_desc").unwrap(),
            ))
            .unwrap();
        let _registry_2_id = registry_service.add(registry_2);

        // THEN all the metrics should be returned
        assert_metrics(
            &registry_service,
            &[
                ("default_counter", "counter_desc"),
                ("narwhal_counter_1", "counter_1_desc"),
                ("sui_counter_2", "counter_2_desc"),
            ],
        );

        // AND remove first registry
        assert!(registry_service.remove(registry_1_id));

        // THEN metrics should now not contain metric of registry_1
        assert_metrics(
            &registry_service,
            &[
                ("default_counter", "counter_desc"),
                ("sui_counter_2", "counter_2_desc"),
            ],
        );

        // AND removing the same id a second time reports that nothing was removed
        assert!(!registry_service.remove(registry_1_id));
    }
}
6 changes: 4 additions & 2 deletions crates/sui-benchmark/src/bin/stress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,11 +261,13 @@ async fn main() -> Result<()> {
}
let _guard = config.with_env().init();

let registry: Arc<Registry> = Arc::new(metrics::start_prometheus_server(
let registry_service = metrics::start_prometheus_server(
format!("{}:{}", opts.client_metric_host, opts.client_metric_port)
.parse()
.unwrap(),
));
);
let registry: Arc<Registry> = Arc::new(registry_service.default_registry());

let barrier = Arc::new(Barrier::new(2));
let cloned_barrier = barrier.clone();
let (primary_gas_id, owner, keypair, aggregator) = if opts.local {
Expand Down
17 changes: 8 additions & 9 deletions crates/sui-core/src/authority_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,13 @@ use sui_network::{
tonic,
};

use sui_types::{error::*, messages::*};
use tap::TapFallible;
use tokio::{sync::mpsc::Receiver, task::JoinHandle, time::sleep};

use mysten_metrics::spawn_monitored_task;
use mysten_metrics::{spawn_monitored_task, RegistryService};
use narwhal_types::TransactionsClient;
use sui_types::messages_checkpoint::CheckpointRequest;
use sui_types::messages_checkpoint::CheckpointResponse;
use sui_types::{error::*, messages::*};
use tap::TapFallible;
use tokio::{sync::mpsc::Receiver, task::JoinHandle, time::sleep};
use tracing::{debug, info, Instrument};

use crate::checkpoints::{
Expand Down Expand Up @@ -260,9 +259,10 @@ impl ValidatorService {
state: Arc<AuthorityState>,
checkpoint_store: Arc<CheckpointStore>,
state_sync_handle: sui_network::state_sync::Handle,
prometheus_registry: Registry,
registry_service: RegistryService,
rx_reconfigure_consensus: Receiver<ReconfigConsensusMessage>,
) -> Result<Self> {
let prometheus_registry = registry_service.default_registry();
let consensus_config = config
.consensus_config()
.ok_or_else(|| anyhow!("Validator is missing consensus config"))?;
Expand Down Expand Up @@ -306,8 +306,7 @@ impl ValidatorService {
let consensus_parameters = consensus_config.narwhal_config().to_owned();
let network_keypair = config.network_key_pair.copy();

let registry = prometheus_registry.clone();
let tx_validator = SuiTxValidator::new(state.clone(), &registry);
let tx_validator = SuiTxValidator::new(state.clone(), &prometheus_registry);
spawn_monitored_task!(narwhal_node::restarter::NodeRestarter::watch(
consensus_keypair,
network_keypair,
Expand All @@ -319,7 +318,7 @@ impl ValidatorService {
consensus_parameters,
tx_validator,
rx_reconfigure_consensus,
&registry,
registry_service
));

Ok(Self {
Expand Down
3 changes: 2 additions & 1 deletion crates/sui-faucet/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ async fn main() -> Result<(), anyhow::Error> {

let prom_binding = PROM_PORT_ADDR.parse().unwrap();
info!("Starting Prometheus HTTP endpoint at {}", prom_binding);
let prometheus_registry = sui_node::metrics::start_prometheus_server(prom_binding);
let registry_service = sui_node::metrics::start_prometheus_server(prom_binding);
let prometheus_registry = registry_service.default_registry();

let app_state = Arc::new(AppState {
faucet: SimpleFaucet::new(context, &prometheus_registry)
Expand Down
3 changes: 3 additions & 0 deletions crates/sui-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,8 @@ telemetry-subscribers.workspace = true

workspace-hack.workspace = true

[dev-dependencies]
reqwest = { version = "0.11.13", features = ["blocking", "json"] }

[target.'cfg(msim)'.dependencies]
sui-simulator = { path = "../sui-simulator" }
Loading

0 comments on commit ae776c7

Please sign in to comment.