diff --git a/Cargo.lock b/Cargo.lock index 5a9c1723446cf..5b6a71e9033a8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5012,12 +5012,14 @@ checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" name = "mysten-metrics" version = "0.7.0" dependencies = [ + "dashmap", "once_cell", "prometheus", "scopeguard", "tap", "tokio", "tracing", + "uuid", "workspace-hack", ] @@ -8364,6 +8366,7 @@ dependencies = [ "move-stackless-bytecode", "move-unit-test", "multiaddr 0.17.0", + "mysten-metrics", "mysten-network 0.2.0", "narwhal-executor", "prometheus", @@ -8942,6 +8945,7 @@ dependencies = [ "parking_lot 0.12.1", "prometheus", "prometheus-closure-metric", + "reqwest", "sui-config", "sui-core", "sui-json-rpc", @@ -9161,6 +9165,7 @@ version = "0.0.0" dependencies = [ "anyhow", "futures", + "mysten-metrics", "mysten-network 0.2.0", "prometheus", "rand 0.8.5", @@ -9597,6 +9602,7 @@ dependencies = [ "jsonrpsee", "move-core-types", "multiaddr 0.17.0", + "mysten-metrics", "mysten-network 0.2.0", "once_cell", "prometheus", diff --git a/crates/mysten-metrics/Cargo.toml b/crates/mysten-metrics/Cargo.toml index 27fae53e21cae..56a9ebc02fa77 100644 --- a/crates/mysten-metrics/Cargo.toml +++ b/crates/mysten-metrics/Cargo.toml @@ -14,3 +14,5 @@ prometheus = "0.13" once_cell = "1" tokio = { workspace = true } tap = "1.0" +dashmap = "5.4.0" +uuid = { version = "1.1.2", features = ["v4", "fast-rng"]} diff --git a/crates/mysten-metrics/src/lib.rs b/crates/mysten-metrics/src/lib.rs index 87a6d3c4a0a39..f759a22808818 100644 --- a/crates/mysten-metrics/src/lib.rs +++ b/crates/mysten-metrics/src/lib.rs @@ -1,8 +1,10 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use dashmap::DashMap; use std::future::Future; use std::pin::Pin; +use std::sync::Arc; use std::task::{Context, Poll}; use std::time::Instant; @@ -12,6 +14,7 @@ use tap::TapFallible; use tracing::warn; pub use scopeguard; +use uuid::Uuid; #[derive(Debug)] pub struct Metrics { @@ -164,3 +167,162 @@ impl Future for MonitoredScopeFuture { self.f.as_mut().poll(cx) } } + +type RegistryID = Uuid; + +/// A service to manage the prometheus registries. This service allow us to create +/// a new Registry on demand and keep it accessible for processing/polling. +/// The service can be freely cloned/shared across threads. +#[derive(Clone)] +pub struct RegistryService { + // Holds a Registry that is supposed to be used + default_registry: Registry, + registries_by_id: Arc>, +} + +impl RegistryService { + // Creates a new registry service and also adds the main/default registry that is supposed to + // be preserved and never get removed + pub fn new(default_registry: Registry) -> Self { + Self { + default_registry, + registries_by_id: Arc::new(DashMap::new()), + } + } + + // Returns the default registry for the service that someone can use + // if they don't want to create a new one. + pub fn default_registry(&self) -> Registry { + self.default_registry.clone() + } + + // Adds a new registry to the service. The corresponding RegistryID is returned so can later be + // used for removing the Registry. Method panics if we try to insert a registry with the same id. + // As this can be quite serious for the operation of the node we don't want to accidentally + // swap an existing registry - we expected a removal to happen explicitly. + pub fn add(&self, registry: Registry) -> RegistryID { + let registry_id = Uuid::new_v4(); + if self + .registries_by_id + .insert(registry_id, registry) + .is_some() + { + panic!("Other Registry already detected for the same id {registry_id}"); + } + + registry_id + } + + // Removes the registry from the service. If Registry existed then this method returns true, + // otherwise false is returned instead. + pub fn remove(&self, registry_id: RegistryID) -> bool { + self.registries_by_id.remove(®istry_id).is_some() + } + + // Returns all the registries of the service + pub fn get_all(&self) -> Vec { + let mut registries: Vec = self + .registries_by_id + .iter() + .map(|r| r.value().clone()) + .collect(); + registries.push(self.default_registry.clone()); + + registries + } + + // Returns all the metric families from the registries that a service holds. + pub fn gather_all(&self) -> Vec { + self.get_all().iter().flat_map(|r| r.gather()).collect() + } +} + +#[cfg(test)] +mod tests { + use crate::RegistryService; + use prometheus::{IntCounter, Registry}; + + #[test] + fn registry_service() { + // GIVEN + let default_registry = Registry::new_custom(Some("default".to_string()), None).unwrap(); + + let registry_service = RegistryService::new(default_registry.clone()); + let default_counter = IntCounter::new("counter", "counter_desc").unwrap(); + default_counter.inc(); + default_registry + .register(Box::new(default_counter)) + .unwrap(); + + // AND add a metric to the default registry + + // AND a registry with one metric + let registry_1 = Registry::new_custom(Some("narwhal".to_string()), None).unwrap(); + registry_1 + .register(Box::new( + IntCounter::new("counter_1", "counter_1_desc").unwrap(), + )) + .unwrap(); + + // WHEN + let registry_1_id = registry_service.add(registry_1); + + // THEN + let mut metrics = registry_service.gather_all(); + metrics.sort_by(|m1, m2| Ord::cmp(m1.get_name(), m2.get_name())); + + assert_eq!(metrics.len(), 2); + + let metric_default = metrics.remove(0); + assert_eq!(metric_default.get_name(), "default_counter"); + assert_eq!(metric_default.get_help(), "counter_desc"); + + let metric_1 = metrics.remove(0); + assert_eq!(metric_1.get_name(), "narwhal_counter_1"); + assert_eq!(metric_1.get_help(), "counter_1_desc"); + + // AND add a second registry with a metric + let registry_2 = Registry::new_custom(Some("sui".to_string()), None).unwrap(); + registry_2 + .register(Box::new( + IntCounter::new("counter_2", "counter_2_desc").unwrap(), + )) + .unwrap(); + let _registry_2_id = registry_service.add(registry_2); + + // THEN all the metrics should be returned + let mut metrics = registry_service.gather_all(); + metrics.sort_by(|m1, m2| Ord::cmp(m1.get_name(), m2.get_name())); + + assert_eq!(metrics.len(), 3); + + let metric_default = metrics.remove(0); + assert_eq!(metric_default.get_name(), "default_counter"); + assert_eq!(metric_default.get_help(), "counter_desc"); + + let metric_1 = metrics.remove(0); + assert_eq!(metric_1.get_name(), "narwhal_counter_1"); + assert_eq!(metric_1.get_help(), "counter_1_desc"); + + let metric_2 = metrics.remove(0); + assert_eq!(metric_2.get_name(), "sui_counter_2"); + assert_eq!(metric_2.get_help(), "counter_2_desc"); + + // AND remove first registry + assert!(registry_service.remove(registry_1_id)); + + // THEN metrics should now not contain metric of registry_1 + let mut metrics = registry_service.gather_all(); + metrics.sort_by(|m1, m2| Ord::cmp(m1.get_name(), m2.get_name())); + + assert_eq!(metrics.len(), 2); + + let metric_default = metrics.remove(0); + assert_eq!(metric_default.get_name(), "default_counter"); + assert_eq!(metric_default.get_help(), "counter_desc"); + + let metric_1 = metrics.remove(0); + assert_eq!(metric_1.get_name(), "sui_counter_2"); + assert_eq!(metric_1.get_help(), "counter_2_desc"); + } +} diff --git a/crates/sui-benchmark/src/bin/stress.rs b/crates/sui-benchmark/src/bin/stress.rs index 7c1b125fcad42..42eb3eacc98f6 100644 --- a/crates/sui-benchmark/src/bin/stress.rs +++ b/crates/sui-benchmark/src/bin/stress.rs @@ -261,11 +261,13 @@ async fn main() -> Result<()> { } let _guard = config.with_env().init(); - let registry: Arc = Arc::new(metrics::start_prometheus_server( + let registry_service = metrics::start_prometheus_server( format!("{}:{}", opts.client_metric_host, opts.client_metric_port) .parse() .unwrap(), - )); + ); + let registry: Arc = Arc::new(registry_service.default_registry()); + let barrier = Arc::new(Barrier::new(2)); let cloned_barrier = barrier.clone(); let (primary_gas_id, owner, keypair, aggregator) = if opts.local { diff --git a/crates/sui-core/src/authority_server.rs b/crates/sui-core/src/authority_server.rs index e7af660332ec2..b48eede3bbe95 100644 --- a/crates/sui-core/src/authority_server.rs +++ b/crates/sui-core/src/authority_server.rs @@ -24,14 +24,13 @@ use sui_network::{ tonic, }; -use sui_types::{error::*, messages::*}; -use tap::TapFallible; -use tokio::{sync::mpsc::Receiver, task::JoinHandle, time::sleep}; - -use mysten_metrics::spawn_monitored_task; +use mysten_metrics::{spawn_monitored_task, RegistryService}; use narwhal_types::TransactionsClient; use sui_types::messages_checkpoint::CheckpointRequest; use sui_types::messages_checkpoint::CheckpointResponse; +use sui_types::{error::*, messages::*}; +use tap::TapFallible; +use tokio::{sync::mpsc::Receiver, task::JoinHandle, time::sleep}; use tracing::{debug, info, Instrument}; use crate::checkpoints::{ @@ -260,9 +259,10 @@ impl ValidatorService { state: Arc, checkpoint_store: Arc, state_sync_handle: sui_network::state_sync::Handle, - prometheus_registry: Registry, + registry_service: RegistryService, rx_reconfigure_consensus: Receiver, ) -> Result { + let prometheus_registry = registry_service.default_registry(); let consensus_config = config .consensus_config() .ok_or_else(|| anyhow!("Validator is missing consensus config"))?; @@ -306,8 +306,7 @@ impl ValidatorService { let consensus_parameters = consensus_config.narwhal_config().to_owned(); let network_keypair = config.network_key_pair.copy(); - let registry = prometheus_registry.clone(); - let tx_validator = SuiTxValidator::new(state.clone(), ®istry); + let tx_validator = SuiTxValidator::new(state.clone(), &prometheus_registry); spawn_monitored_task!(narwhal_node::restarter::NodeRestarter::watch( consensus_keypair, network_keypair, @@ -319,7 +318,7 @@ impl ValidatorService { consensus_parameters, tx_validator, rx_reconfigure_consensus, - ®istry, + registry_service )); Ok(Self { diff --git a/crates/sui-faucet/src/main.rs b/crates/sui-faucet/src/main.rs index 2f36a903d7dbd..f7ddc86c498b2 100644 --- a/crates/sui-faucet/src/main.rs +++ b/crates/sui-faucet/src/main.rs @@ -92,7 +92,8 @@ async fn main() -> Result<(), anyhow::Error> { let prom_binding = PROM_PORT_ADDR.parse().unwrap(); info!("Starting Prometheus HTTP endpoint at {}", prom_binding); - let prometheus_registry = sui_node::metrics::start_prometheus_server(prom_binding); + let registry_service = sui_node::metrics::start_prometheus_server(prom_binding); + let prometheus_registry = registry_service.default_registry(); let app_state = Arc::new(AppState { faucet: SimpleFaucet::new(context, &prometheus_registry) diff --git a/crates/sui-node/Cargo.toml b/crates/sui-node/Cargo.toml index 446d58d2eb73c..c5763e01753e1 100644 --- a/crates/sui-node/Cargo.toml +++ b/crates/sui-node/Cargo.toml @@ -38,5 +38,8 @@ telemetry-subscribers.workspace = true workspace-hack.workspace = true +[dev-dependencies] +reqwest = { version = "0.11.13", features = ["blocking", "json"] } + [target.'cfg(msim)'.dependencies] sui-simulator = { path = "../sui-simulator" } diff --git a/crates/sui-node/src/lib.rs b/crates/sui-node/src/lib.rs index 795055d612fd8..a0d46966a8f37 100644 --- a/crates/sui-node/src/lib.rs +++ b/crates/sui-node/src/lib.rs @@ -45,7 +45,7 @@ use tracing::info; use typed_store::DBMetrics; use crate::metrics::GrpcMetrics; -use mysten_metrics::spawn_monitored_task; +use mysten_metrics::{spawn_monitored_task, RegistryService}; use sui_core::epoch::committee_store::CommitteeStore; use sui_json_rpc::event_api::EventReadApiImpl; use sui_json_rpc::event_api::EventStreamingApiImpl; @@ -81,7 +81,7 @@ pub struct SuiNode { state: Arc, active: Arc>, transaction_orchestrator: Option>>, - _prometheus_registry: Registry, + registry_service: RegistryService, _p2p_network: anemo::Network, _discovery: discovery::Handle, @@ -98,10 +98,11 @@ pub struct SuiNode { } impl SuiNode { - pub async fn start(config: &NodeConfig, prometheus_registry: Registry) -> Result { + pub async fn start(config: &NodeConfig, registry_service: RegistryService) -> Result { // TODO: maybe have a config enum that takes care of this for us. let is_validator = config.consensus_config().is_some(); let is_full_node = !is_validator; + let prometheus_registry = registry_service.default_registry(); info!(node =? config.protocol_public_key(), "Initializing sui-node listening on {}", config.network_address @@ -248,7 +249,7 @@ impl SuiNode { state.clone(), checkpoint_store.clone(), &state_sync_handle, - &prometheus_registry, + registry_service.clone(), ) .await?; @@ -271,7 +272,7 @@ impl SuiNode { state, active: active_authority, transaction_orchestrator, - _prometheus_registry: prometheus_registry, + registry_service, _p2p_network: p2p_network, _discovery: discovery_handle, _state_sync: state_sync_handle, @@ -375,12 +376,12 @@ impl SuiNode { state: Arc, checkpoint_store: Arc, state_sync_handle: &state_sync::Handle, - prometheus_registry: &Registry, + registry_service: RegistryService, ) -> Result> { if state.is_fullnode() { return Ok(None); } - + let prometheus_registry = registry_service.default_registry(); let (tx_reconfigure_consensus, rx_reconfigure_consensus) = channel(100); let validator_service = ValidatorService::new( @@ -388,7 +389,7 @@ impl SuiNode { state.clone(), checkpoint_store, state_sync_handle.clone(), - prometheus_registry.clone(), + registry_service, rx_reconfigure_consensus, ) .await?; @@ -397,7 +398,7 @@ impl SuiNode { server_conf.global_concurrency_limit = config.grpc_concurrency_limit; server_conf.load_shed = config.grpc_load_shed; let mut server_builder = - ServerBuilder::from_config(&server_conf, GrpcMetrics::new(prometheus_registry)); + ServerBuilder::from_config(&server_conf, GrpcMetrics::new(&prometheus_registry)); server_builder = server_builder.add_service(ValidatorServer::new(validator_service)); @@ -487,7 +488,7 @@ impl SuiNode { self.state.clone(), self.checkpoint_store.clone(), &self._state_sync, - &self._prometheus_registry, + self.registry_service.clone(), ) .await .expect("Starting grpc server cannot fail"); diff --git a/crates/sui-node/src/main.rs b/crates/sui-node/src/main.rs index 27cda795b2dd2..c39c9abafdbe9 100644 --- a/crates/sui-node/src/main.rs +++ b/crates/sui-node/src/main.rs @@ -35,7 +35,8 @@ async fn main() -> Result<()> { let args = Args::parse(); let mut config = NodeConfig::load(&args.config_path)?; - let prometheus_registry = metrics::start_prometheus_server(config.metrics_address); + let registry_service = metrics::start_prometheus_server(config.metrics_address); + let prometheus_registry = registry_service.default_registry(); info!( "Started Prometheus HTTP endpoint at {}", config.metrics_address @@ -67,7 +68,7 @@ async fn main() -> Result<()> { sui_node::admin::start_admin_server(config.admin_interface_port, filter_handle); - let node = sui_node::SuiNode::start(&config, prometheus_registry).await?; + let node = sui_node::SuiNode::start(&config, registry_service).await?; node.monitor_reconfiguration().await?; Ok(()) diff --git a/crates/sui-node/src/metrics.rs b/crates/sui-node/src/metrics.rs index 0c3334de89b46..e94cd47304508 100644 --- a/crates/sui-node/src/metrics.rs +++ b/crates/sui-node/src/metrics.rs @@ -11,24 +11,30 @@ use std::net::SocketAddr; use std::time::Duration; use sui_network::tonic::Code; +use mysten_metrics::RegistryService; use tracing::warn; const METRICS_ROUTE: &str = "/metrics"; -pub fn start_prometheus_server(addr: SocketAddr) -> Registry { +// Creates a new http server that has as a sole purpose to expose +// and endpoint that prometheus agent can use to poll for the metrics. +// A RegistryService is returned that can be used to get access in prometheus Registries. +pub fn start_prometheus_server(addr: SocketAddr) -> RegistryService { let registry = Registry::new(); registry.register(uptime_metric()).unwrap(); + let registry_service = RegistryService::new(registry); + if cfg!(msim) { // prometheus uses difficult-to-support features such as TcpSocket::from_raw_fd(), so we // can't yet run it in the simulator. warn!("not starting prometheus server in simulator"); - return registry; + return registry_service; } let app = Router::new() .route(METRICS_ROUTE, get(metrics)) - .layer(Extension(registry.clone())); + .layer(Extension(registry_service.clone())); tokio::spawn(async move { axum::Server::bind(&addr) @@ -37,11 +43,11 @@ pub fn start_prometheus_server(addr: SocketAddr) -> Registry { .unwrap(); }); - registry + registry_service } -async fn metrics(Extension(registry): Extension) -> (StatusCode, String) { - let metrics_families = registry.gather(); +async fn metrics(Extension(registry_service): Extension) -> (StatusCode, String) { + let metrics_families = registry_service.gather_all(); match TextEncoder.encode_to_string(&metrics_families) { Ok(metrics) => (StatusCode::OK, metrics), Err(error) => ( @@ -102,7 +108,7 @@ impl MetricsCallbackProvider for GrpcMetrics { /// Create a metric that measures the uptime from when this metric was constructed. /// The metric is labeled with the node version: semver-gitrevision fn uptime_metric() -> Box { - const VERSION: &str = concat!(env!("CARGO_PKG_VERSION"), "-", env!("GIT_REVISION")); + let version: &str = version(); let opts = prometheus::opts!("uptime", "uptime of the node service in seconds") .variable_label("version"); @@ -113,9 +119,99 @@ fn uptime_metric() -> Box { opts, prometheus_closure_metric::ValueType::Counter, uptime, - &[VERSION], + &[version], ) .unwrap(); Box::new(metric) } + +fn version() -> &'static str { + concat!(env!("CARGO_PKG_VERSION"), "-", env!("GIT_REVISION")) +} + +#[cfg(test)] +mod tests { + use crate::metrics::{start_prometheus_server, version}; + use prometheus::{IntCounter, Registry}; + use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + + #[tokio::test] + pub async fn test_metrics_endpoint_with_multiple_registries_add_remove() { + let port: u16 = 8081; + let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port); + + let registry_service = start_prometheus_server(socket); + + tokio::task::yield_now().await; + + // now add a few registries to the service along side with metrics + let registry_1 = Registry::new_custom(Some("narwhal".to_string()), None).unwrap(); + let counter_1 = IntCounter::new("counter_1", "a sample counter 1").unwrap(); + registry_1.register(Box::new(counter_1)).unwrap(); + + let registry_2 = Registry::new_custom(Some("sui".to_string()), None).unwrap(); + let counter_2 = IntCounter::new("counter_2", "a sample counter 2").unwrap(); + registry_2.register(Box::new(counter_2.clone())).unwrap(); + + let registry_1_id = registry_service.add(registry_1); + let _registry_2_id = registry_service.add(registry_2); + + // request the endpoint + let result = get_metrics(port).await; + + assert!(result.contains( + "# HELP sui_counter_2 a sample counter 2 +# TYPE sui_counter_2 counter +sui_counter_2 0" + )); + + assert!(result.contains( + "# HELP narwhal_counter_1 a sample counter 1 +# TYPE narwhal_counter_1 counter +narwhal_counter_1 0" + )); + + let exp = format!( + "# HELP uptime uptime of the node service in seconds +# TYPE uptime counter +uptime{{version=\"{}\"}} 0", + version() + ); + assert!(result.contains(&exp)); + + // Now remove registry 1 + assert!(registry_service.remove(registry_1_id)); + + // AND increase metric 2 + counter_2.inc(); + + // Now pull again metrics + // request the endpoint + let result = get_metrics(port).await; + + // Registry 1 metrics should not be present anymore + assert!(!result.contains( + "# HELP narwhal_counter_1 a sample counter 1 +# TYPE narwhal_counter_1 counter +narwhal_counter_1 0" + )); + + // Registry 2 metric should have increased by 1 + assert!(result.contains( + "# HELP sui_counter_2 a sample counter 2 +# TYPE sui_counter_2 counter +sui_counter_2 1" + )); + } + + async fn get_metrics(port: u16) -> String { + let client = reqwest::Client::new(); + let response = client + .get(format!("http://127.0.0.1:{}/metrics", port)) + .send() + .await + .unwrap(); + response.text().await.unwrap() + } +} diff --git a/crates/sui-rosetta/src/main.rs b/crates/sui-rosetta/src/main.rs index 1b0384a5c3122..60ad4acc54dcf 100644 --- a/crates/sui-rosetta/src/main.rs +++ b/crates/sui-rosetta/src/main.rs @@ -127,9 +127,9 @@ impl RosettaServerCommand { }); let config = NodeConfig::load(&node_config)?; - let prometheus_registry = metrics::start_prometheus_server(config.metrics_address); + let registry_service = metrics::start_prometheus_server(config.metrics_address); // Staring a full node for the rosetta server. - let node = SuiNode::start(&config, prometheus_registry).await?; + let node = SuiNode::start(&config, registry_service).await?; let quorum_driver = node .transaction_orchestrator() .ok_or_else(|| anyhow!("Quorum driver is None"))? diff --git a/crates/sui-swarm/Cargo.toml b/crates/sui-swarm/Cargo.toml index 9f022fc91e6c6..708c8a0ac5482 100644 --- a/crates/sui-swarm/Cargo.toml +++ b/crates/sui-swarm/Cargo.toml @@ -20,6 +20,7 @@ prometheus = "0.13.3" sui-config = { path = "../sui-config" } sui-node = { path = "../sui-node" } sui-types = { path = "../sui-types" } +mysten-metrics = { path = "../mysten-metrics" } mysten-network.workspace = true telemetry-subscribers.workspace = true diff --git a/crates/sui-swarm/src/memory/container-sim.rs b/crates/sui-swarm/src/memory/container-sim.rs index a910c4160ed30..785f1160d7140 100644 --- a/crates/sui-swarm/src/memory/container-sim.rs +++ b/crates/sui-swarm/src/memory/container-sim.rs @@ -65,7 +65,8 @@ impl Container { .build(); let task_handle = node.spawn(async move { - let _server = SuiNode::start(&config, Registry::new()).await.unwrap(); + let registry_service = mysten_metrics::RegistryService::new(Registry::new()); + let _server = SuiNode::start(&config, registry_service).await.unwrap(); // Notify that we've successfully started the node trace!("node started, sending oneshot"); let _ = startup_sender.send(()); diff --git a/crates/sui-swarm/src/memory/container.rs b/crates/sui-swarm/src/memory/container.rs index de23e14ec44a9..2fa0605f4d721 100644 --- a/crates/sui-swarm/src/memory/container.rs +++ b/crates/sui-swarm/src/memory/container.rs @@ -78,12 +78,12 @@ impl Container { let runtime = builder.enable_all().build().unwrap(); runtime.block_on(async move { - let prometheus_registry = metrics::start_prometheus_server(config.metrics_address); + let registry_service = metrics::start_prometheus_server(config.metrics_address); info!( "Started Prometheus HTTP endpoint. To query metrics use\n\tcurl -s http://{}/metrics", config.metrics_address ); - let _server = SuiNode::start(&config, prometheus_registry).await.unwrap(); + let _server = SuiNode::start(&config, registry_service).await.unwrap(); // Notify that we've successfully started the node let _ = startup_sender.send(()); // run until canceled diff --git a/crates/sui/Cargo.toml b/crates/sui/Cargo.toml index 77c0ec1516aef..1d05b6174fa73 100644 --- a/crates/sui/Cargo.toml +++ b/crates/sui/Cargo.toml @@ -77,6 +77,7 @@ sui-core = { path = "../sui-core" } sui-node = { path = "../sui-node" } sui-macros = { path = "../sui-macros" } sui-simulator = { path = "../sui-simulator" } +mysten-metrics = { path = "../mysten-metrics" } assert_cmd = "2.0.6" diff --git a/crates/sui/tests/full_node_tests.rs b/crates/sui/tests/full_node_tests.rs index 0f2bc1eb46e75..4544eb4fe7f18 100644 --- a/crates/sui/tests/full_node_tests.rs +++ b/crates/sui/tests/full_node_tests.rs @@ -10,6 +10,7 @@ use jsonrpsee::core::client::{ClientT, Subscription, SubscriptionClientT}; use jsonrpsee::rpc_params; use move_core_types::parser::parse_struct_tag; use move_core_types::value::MoveStructLayout; +use mysten_metrics::RegistryService; use prometheus::Registry; use sui::client_commands::{SuiClientCommandResult, SuiClientCommands}; use sui_json_rpc_types::{ @@ -1068,7 +1069,8 @@ async fn test_full_node_transaction_orchestrator_basic() -> Result<(), anyhow::E async fn test_validator_node_has_no_transaction_orchestrator() { let configs = test_and_configure_authority_configs(1); let validator_config = &configs.validator_configs()[0]; - let node = SuiNode::start(validator_config, Registry::new()) + let registry_service = RegistryService::new(Registry::new()); + let node = SuiNode::start(validator_config, registry_service) .await .unwrap(); assert!(node.transaction_orchestrator().is_none()); diff --git a/crates/test-utils/Cargo.toml b/crates/test-utils/Cargo.toml index 3ed372bbef2f2..99931eaba1967 100644 --- a/crates/test-utils/Cargo.toml +++ b/crates/test-utils/Cargo.toml @@ -31,6 +31,7 @@ sui-swarm = { path = "../sui-swarm" } sui-types = { path = "../sui-types" } sui-keys = { path = "../sui-keys" } sui-sdk = { path = "../sui-sdk" } +mysten-metrics = { path = "../mysten-metrics"} once_cell = "1.16" multiaddr = "0.17.0" diff --git a/crates/test-utils/src/authority.rs b/crates/test-utils/src/authority.rs index fd930a2687d5b..1ac8ffacede5a 100644 --- a/crates/test-utils/src/authority.rs +++ b/crates/test-utils/src/authority.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::TEST_COMMITTEE_SIZE; +use mysten_metrics::RegistryService; use prometheus::Registry; use rand::{prelude::StdRng, SeedableRng}; use std::time::Duration; @@ -39,8 +40,11 @@ pub fn test_and_configure_authority_configs(committee_size: usize) -> NetworkCon } #[cfg(not(msim))] -pub async fn start_node(config: &NodeConfig, prom_registry: Registry) -> SuiNodeHandle { - SuiNode::start(config, prom_registry).await.unwrap().into() +pub async fn start_node(config: &NodeConfig, registry_service: RegistryService) -> SuiNodeHandle { + SuiNode::start(config, registry_service) + .await + .unwrap() + .into() } /// In the simulator, we call SuiNode::start from inside a newly spawned simulator node. @@ -52,7 +56,7 @@ pub async fn start_node(config: &NodeConfig, prom_registry: Registry) -> SuiNode /// Most of the time, tests do this just in order to verify some internal state, so this is fine /// most of the time. #[cfg(msim)] -pub async fn start_node(config: &NodeConfig, prom_registry: Registry) -> SuiNodeHandle { +pub async fn start_node(config: &NodeConfig, registry_service: RegistryService) -> SuiNodeHandle { use std::net::{IpAddr, SocketAddr}; let config = config.clone(); @@ -72,7 +76,7 @@ pub async fn start_node(config: &NodeConfig, prom_registry: Registry) -> SuiNode }) .build(); - node.spawn(async move { SuiNode::start(&config, prom_registry).await.unwrap() }) + node.spawn(async move { SuiNode::start(&config, registry_service).await.unwrap() }) .await .unwrap() .into() @@ -85,8 +89,8 @@ where { let mut handles = Vec::new(); for validator in config.validator_configs() { - let prom_registry = Registry::new(); - let node = start_node(validator, prom_registry).await; + let registry_service = RegistryService::new(Registry::new()); + let node = start_node(validator, registry_service).await; let objects = objects.clone(); node.with_async(|node| async move { diff --git a/crates/test-utils/src/network.rs b/crates/test-utils/src/network.rs index 0993d1498ce05..6014034c2c2d0 100644 --- a/crates/test-utils/src/network.rs +++ b/crates/test-utils/src/network.rs @@ -8,6 +8,7 @@ use jsonrpsee::ws_client::WsClient; use jsonrpsee::ws_client::WsClientBuilder; use prometheus::Registry; +use mysten_metrics::RegistryService; use sui::config::SuiEnv; use sui::{client_commands::WalletContext, config::SuiClientConfig}; use sui_config::genesis_config::GenesisConfig; @@ -220,7 +221,8 @@ impl Default for TestClusterBuilder { pub async fn start_fullnode_from_config( config: NodeConfig, ) -> Result { - let sui_node = SuiNode::start(&config, Registry::new()).await?; + let registry_service = RegistryService::new(Registry::new()); + let sui_node = SuiNode::start(&config, registry_service).await?; let rpc_url = format!("http://{}", config.json_rpc_address); let rpc_client = HttpClientBuilder::default().build(&rpc_url)?; diff --git a/narwhal/node/src/restarter.rs b/narwhal/node/src/restarter.rs index 6e979b9ec6988..8747e94cc54b0 100644 --- a/narwhal/node/src/restarter.rs +++ b/narwhal/node/src/restarter.rs @@ -7,6 +7,7 @@ use crypto::{KeyPair, NetworkKeyPair}; use executor::ExecutionState; use fastcrypto::traits::KeyPair as _; use futures::future::join_all; +use mysten_metrics::RegistryService; use prometheus::Registry; use std::{path::PathBuf, sync::Arc}; use tokio::sync::mpsc::Receiver; @@ -35,7 +36,7 @@ impl NodeRestarter { Vec<(WorkerId, NetworkKeyPair)>, WorkerCache, )>, - registry: &Registry, + registry_service: RegistryService, ) where State: ExecutionState + Send + Sync + 'static, { @@ -46,11 +47,17 @@ impl NodeRestarter { let mut committee = committee.clone(); let mut handles = Vec::new(); + let mut registry_id; // Listen for new committees. loop { tracing::info!("Starting epoch E{}", committee.epoch()); + // TODO: eventually replace this with a prefixed version of it + // for all metrics can start with narwhal_ + let registry = Registry::new(); + registry_id = registry_service.add(registry.clone()); + // Get a fresh store for the new epoch. let mut store_path = storage_base_path.clone(); store_path.push(format!("epoch{}", committee.epoch())); @@ -66,7 +73,7 @@ impl NodeRestarter { parameters.clone(), /* consensus */ true, execution_state.clone(), - registry, + ®istry, ) .await .unwrap(); @@ -79,7 +86,7 @@ impl NodeRestarter { &store, parameters.clone(), tx_validator.clone(), - registry, + ®istry, ); handles.extend(primary_handles); @@ -137,6 +144,9 @@ impl NodeRestarter { worker_ids_and_keypairs = new_worker_ids_and_keypairs; committee = new_committee; worker_cache.swap(Arc::new(new_worker_cache)); + + // remove the previous registry + registry_service.remove(registry_id); } } } diff --git a/narwhal/node/tests/reconfigure.rs b/narwhal/node/tests/reconfigure.rs index 449a2fe8e46b8..1cd30aa0e6ee3 100644 --- a/narwhal/node/tests/reconfigure.rs +++ b/narwhal/node/tests/reconfigure.rs @@ -10,6 +10,7 @@ use crypto::{KeyPair, NetworkKeyPair, PublicKey}; use executor::ExecutionState; use fastcrypto::traits::KeyPair as _; use futures::future::{join_all, try_join_all}; +use mysten_metrics::RegistryService; use narwhal_node as node; use node::{restarter::NodeRestarter, Node}; use prometheus::Registry; @@ -233,6 +234,8 @@ async fn restart() { ..Parameters::default() }; + let register_service = RegistryService::new(Registry::new()); + let keypair = a.keypair().copy(); let network_keypair = a.network_keypair().copy(); tokio::spawn(async move { @@ -247,7 +250,7 @@ async fn restart() { parameters, TrivialTransactionValidator::default(), rx_node_reconfigure, - &Registry::new(), + register_service, ) .await; });