From 24a98c216ecd3d9d0f4dbd357c9b8873f45fec76 Mon Sep 17 00:00:00 2001 From: Adam Singer Date: Tue, 6 Feb 2024 16:50:10 -0800 Subject: [PATCH] Introduce component based health Introduce a component based health check system. Each type of component should be able to opt into registering handlers implement some mechanical checks of health. Health in this context is functionality expected to work but runtime wise are semi no-op in terms of influencing the underlying storage / rpc systems. Opting in to the system requires for component to define a `HealthStatusIndicator` and implement the `check_health()` function. Registration should automatically be done by the existence of implementation and calling the `Store.register_health()` function in component implementation. At the moment only `Store` based components can register and a single health check is defined for `FilesystemStore`. The `/status` endpoint has been updated to return the resulting string serialized instances of `HealthStatus`, smart serialization of such objects is not implemented at this time. --- Cargo.lock | 1 + nativelink-service/tests/ac_server_test.rs | 2 + .../tests/bytestream_server_test.rs | 1 + nativelink-service/tests/cas_server_test.rs | 1 + nativelink-store/BUILD.bazel | 1 + nativelink-store/Cargo.toml | 1 + .../src/completeness_checking_store.rs | 4 + nativelink-store/src/compression_store.rs | 4 + nativelink-store/src/dedup_store.rs | 4 + nativelink-store/src/default_store_factory.rs | 31 ++- nativelink-store/src/existence_cache_store.rs | 4 + nativelink-store/src/fast_slow_store.rs | 4 + nativelink-store/src/filesystem_store.rs | 12 + nativelink-store/src/grpc_store.rs | 5 +- nativelink-store/src/memory_store.rs | 4 + nativelink-store/src/noop_store.rs | 4 + nativelink-store/src/ref_store.rs | 4 + nativelink-store/src/s3_store.rs | 4 + nativelink-store/src/shard_store.rs | 4 + .../src/size_partitioning_store.rs | 4 + nativelink-store/src/verify_store.rs | 4 + .../tests/fast_slow_store_test.rs | 4 + nativelink-util/BUILD.bazel | 2 + nativelink-util/src/health_utils.rs | 206 ++++++++++++++++ nativelink-util/src/lib.rs | 1 + nativelink-util/src/store_trait.rs | 60 ++++- nativelink-util/tests/health_utils_test.rs | 222 ++++++++++++++++++ src/bin/nativelink.rs | 63 ++++- 28 files changed, 641 insertions(+), 20 deletions(-) create mode 100644 nativelink-util/src/health_utils.rs create mode 100644 nativelink-util/tests/health_utils_test.rs diff --git a/Cargo.lock b/Cargo.lock index 73572e3f5..7e6e2a7e9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1792,6 +1792,7 @@ dependencies = [ "serde", "sha2", "shellexpand", + "tempfile", "tokio", "tokio-stream", "tokio-util", diff --git a/nativelink-service/tests/ac_server_test.rs b/nativelink-service/tests/ac_server_test.rs index 6f8c458a4..3c5da9cbd 100644 --- a/nativelink-service/tests/ac_server_test.rs +++ b/nativelink-service/tests/ac_server_test.rs @@ -55,6 +55,7 @@ async fn make_store_manager() -> Result, Error> { &nativelink_config::stores::StoreConfig::memory(nativelink_config::stores::MemoryStore::default()), &store_manager, Some(&mut ::default()), + None, ) .await?, ); @@ -64,6 +65,7 @@ async fn make_store_manager() -> Result, Error> { &nativelink_config::stores::StoreConfig::memory(nativelink_config::stores::MemoryStore::default()), &store_manager, Some(&mut ::default()), + None, ) .await?, ); diff --git a/nativelink-service/tests/bytestream_server_test.rs b/nativelink-service/tests/bytestream_server_test.rs index 370a6d6a4..20f39fffd 100644 --- a/nativelink-service/tests/bytestream_server_test.rs +++ b/nativelink-service/tests/bytestream_server_test.rs @@ -41,6 +41,7 @@ async fn make_store_manager() -> Result, Error> { &nativelink_config::stores::StoreConfig::memory(nativelink_config::stores::MemoryStore::default()), &store_manager, Some(&mut ::default()), + None, ) .await?, ); diff --git a/nativelink-service/tests/cas_server_test.rs b/nativelink-service/tests/cas_server_test.rs index 6f8c60c4f..ffc3aaf6a 100644 --- a/nativelink-service/tests/cas_server_test.rs +++ b/nativelink-service/tests/cas_server_test.rs @@ -41,6 +41,7 @@ async fn make_store_manager() -> Result, Error> { &nativelink_config::stores::StoreConfig::memory(nativelink_config::stores::MemoryStore::default()), &store_manager, Some(&mut ::default()), + None, ) .await?, ); diff --git a/nativelink-store/BUILD.bazel b/nativelink-store/BUILD.bazel index 606a0d6a4..002a6e27c 100644 --- a/nativelink-store/BUILD.bazel +++ b/nativelink-store/BUILD.bazel @@ -58,6 +58,7 @@ rust_library( "@crates//:serde", "@crates//:sha2", "@crates//:shellexpand", + "@crates//:tempfile", "@crates//:tokio", "@crates//:tokio-stream", "@crates//:tokio-util", diff --git a/nativelink-store/Cargo.toml b/nativelink-store/Cargo.toml index a1c67bdb1..0ffd051c8 100644 --- a/nativelink-store/Cargo.toml +++ b/nativelink-store/Cargo.toml @@ -30,6 +30,7 @@ rand = "0.8.5" serde = "1.0.193" sha2 = "0.10.8" shellexpand = "3.1.0" +tempfile = "3.9.0" tokio = { version = "1.35.1" } tokio-stream = { version = "0.1.14", features = ["fs"] } tokio-util = { version = "0.7.10" } diff --git a/nativelink-store/src/completeness_checking_store.rs b/nativelink-store/src/completeness_checking_store.rs index 00a3d4ffc..c58051bc1 100644 --- a/nativelink-store/src/completeness_checking_store.rs +++ b/nativelink-store/src/completeness_checking_store.rs @@ -25,6 +25,8 @@ use nativelink_proto::build::bazel::remote::execution::v2::{ }; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; +use nativelink_util::default_health_status_indicator; +use nativelink_util::health_utils::HealthStatusIndicator; use nativelink_util::store_trait::{Store, UploadSizeInfo}; use parking_lot::Mutex; use tokio::sync::Notify; @@ -361,3 +363,5 @@ impl Store for CompletenessCheckingStore { Box::new(self) } } + +default_health_status_indicator!(CompletenessCheckingStore); diff --git a/nativelink-store/src/compression_store.rs b/nativelink-store/src/compression_store.rs index c2677669b..341c4052f 100644 --- a/nativelink-store/src/compression_store.rs +++ b/nativelink-store/src/compression_store.rs @@ -26,6 +26,8 @@ use lz4_flex::block::{compress_into, decompress_into, get_maximum_output_size}; use nativelink_error::{error_if, make_err, Code, Error, ResultExt}; use nativelink_util::buf_channel::{make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::{DigestInfo, JoinHandleDropGuard}; +use nativelink_util::default_health_status_indicator; +use nativelink_util::health_utils::HealthStatusIndicator; use nativelink_util::metrics_utils::Registry; use nativelink_util::store_trait::{Store, UploadSizeInfo}; use serde::{Deserialize, Serialize}; @@ -590,3 +592,5 @@ impl Store for CompressionStore { self.inner_store.clone().register_metrics(inner_store_registry); } } + +default_health_status_indicator!(CompressionStore); diff --git a/nativelink-store/src/dedup_store.rs b/nativelink-store/src/dedup_store.rs index a9bbd9e04..4b30c1130 100644 --- a/nativelink-store/src/dedup_store.rs +++ b/nativelink-store/src/dedup_store.rs @@ -23,7 +23,9 @@ use futures::stream::{self, FuturesOrdered, StreamExt, TryStreamExt}; use nativelink_error::{make_err, Code, Error, ResultExt}; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf, StreamReader}; use nativelink_util::common::DigestInfo; +use nativelink_util::default_health_status_indicator; use nativelink_util::fastcdc::FastCDC; +use nativelink_util::health_utils::HealthStatusIndicator; use nativelink_util::store_trait::{Store, UploadSizeInfo}; use serde::{Deserialize, Serialize}; use tokio_util::codec::FramedRead; @@ -349,3 +351,5 @@ impl Store for DedupStore { Box::new(self) } } + +default_health_status_indicator!(DedupStore); diff --git a/nativelink-store/src/default_store_factory.rs b/nativelink-store/src/default_store_factory.rs index 4e54251c3..595c15e35 100644 --- a/nativelink-store/src/default_store_factory.rs +++ b/nativelink-store/src/default_store_factory.rs @@ -19,6 +19,7 @@ use futures::stream::FuturesOrdered; use futures::{Future, TryStreamExt}; use nativelink_config::stores::StoreConfig; use nativelink_error::Error; +use nativelink_util::health_utils::HealthRegistryBuilder; use nativelink_util::metrics_utils::Registry; use nativelink_util::store_trait::Store; @@ -44,6 +45,7 @@ pub fn store_factory<'a>( backend: &'a StoreConfig, store_manager: &'a Arc, maybe_store_metrics: Option<&'a mut Registry>, + maybe_health_registry_builder: Option<&'a mut HealthRegistryBuilder>, ) -> Pin> { Box::pin(async move { let store: Arc = match backend { @@ -51,36 +53,36 @@ pub fn store_factory<'a>( StoreConfig::experimental_s3_store(config) => Arc::new(S3Store::new(config).await?), StoreConfig::verify(config) => Arc::new(VerifyStore::new( config, - store_factory(&config.backend, store_manager, None).await?, + store_factory(&config.backend, store_manager, None, None).await?, )), StoreConfig::compression(config) => Arc::new(CompressionStore::new( *config.clone(), - store_factory(&config.backend, store_manager, None).await?, + store_factory(&config.backend, store_manager, None, None).await?, )?), StoreConfig::dedup(config) => Arc::new(DedupStore::new( config, - store_factory(&config.index_store, store_manager, None).await?, - store_factory(&config.content_store, store_manager, None).await?, + store_factory(&config.index_store, store_manager, None, None).await?, + store_factory(&config.content_store, store_manager, None, None).await?, )), StoreConfig::existence_cache(config) => Arc::new(ExistenceCacheStore::new( config, - store_factory(&config.backend, store_manager, None).await?, + store_factory(&config.backend, store_manager, None, None).await?, )), StoreConfig::completeness_checking(config) => Arc::new(CompletenessCheckingStore::new( - store_factory(&config.backend, store_manager, None).await?, - store_factory(&config.cas_store, store_manager, None).await?, + store_factory(&config.backend, store_manager, None, None).await?, + store_factory(&config.cas_store, store_manager, None, None).await?, )), StoreConfig::fast_slow(config) => Arc::new(FastSlowStore::new( config, - store_factory(&config.fast, store_manager, None).await?, - store_factory(&config.slow, store_manager, None).await?, + store_factory(&config.fast, store_manager, None, None).await?, + store_factory(&config.slow, store_manager, None, None).await?, )), StoreConfig::filesystem(config) => Arc::new(::new(config).await?), StoreConfig::ref_store(config) => Arc::new(RefStore::new(config, Arc::downgrade(store_manager))), StoreConfig::size_partitioning(config) => Arc::new(SizePartitioningStore::new( config, - store_factory(&config.lower_store, store_manager, None).await?, - store_factory(&config.upper_store, store_manager, None).await?, + store_factory(&config.lower_store, store_manager, None, None).await?, + store_factory(&config.upper_store, store_manager, None, None).await?, )), StoreConfig::grpc(config) => Arc::new(GrpcStore::new(config).await?), StoreConfig::noop => Arc::new(NoopStore::new()), @@ -88,7 +90,7 @@ pub fn store_factory<'a>( let stores = config .stores .iter() - .map(|store_config| store_factory(&store_config.store, store_manager, None)) + .map(|store_config| store_factory(&store_config.store, store_manager, None, None)) .collect::>() .try_collect::>() .await?; @@ -98,6 +100,11 @@ pub fn store_factory<'a>( if let Some(store_metrics) = maybe_store_metrics { store.clone().register_metrics(store_metrics); } + + if let Some(health_registry_builder) = maybe_health_registry_builder { + store.clone().register_health(health_registry_builder); + } + Ok(store) }) } diff --git a/nativelink-store/src/existence_cache_store.rs b/nativelink-store/src/existence_cache_store.rs index 9b6c60595..9a3757814 100644 --- a/nativelink-store/src/existence_cache_store.rs +++ b/nativelink-store/src/existence_cache_store.rs @@ -22,7 +22,9 @@ use nativelink_config::stores::{EvictionPolicy, ExistenceCacheStore as Existence use nativelink_error::{error_if, Error, ResultExt}; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; +use nativelink_util::default_health_status_indicator; use nativelink_util::evicting_map::{EvictingMap, LenEntry}; +use nativelink_util::health_utils::HealthStatusIndicator; use nativelink_util::metrics_utils::{CollectorState, MetricsComponent, Registry}; use nativelink_util::store_trait::{Store, UploadSizeInfo}; @@ -198,3 +200,5 @@ impl MetricsComponent for ExistenceCacheStore { self.existence_cache.gather_metrics(c) } } + +default_health_status_indicator!(ExistenceCacheStore); diff --git a/nativelink-store/src/fast_slow_store.rs b/nativelink-store/src/fast_slow_store.rs index 4ba77d9c4..76ed30a98 100644 --- a/nativelink-store/src/fast_slow_store.rs +++ b/nativelink-store/src/fast_slow_store.rs @@ -22,6 +22,8 @@ use futures::{join, FutureExt}; use nativelink_error::{make_err, Code, Error, ResultExt}; use nativelink_util::buf_channel::{make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; +use nativelink_util::default_health_status_indicator; +use nativelink_util::health_utils::HealthStatusIndicator; use nativelink_util::metrics_utils::Registry; use nativelink_util::store_trait::{Store, UploadSizeInfo}; @@ -268,3 +270,5 @@ impl Store for FastSlowStore { self.slow_store.clone().register_metrics(slow_store_registry); } } + +default_health_status_indicator!(FastSlowStore); diff --git a/nativelink-store/src/filesystem_store.rs b/nativelink-store/src/filesystem_store.rs index 94a2c342f..facad1ff4 100644 --- a/nativelink-store/src/filesystem_store.rs +++ b/nativelink-store/src/filesystem_store.rs @@ -29,6 +29,7 @@ use nativelink_error::{make_err, make_input_err, Code, Error, ResultExt}; use nativelink_util::buf_channel::{make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::{fs, DigestInfo}; use nativelink_util::evicting_map::{EvictingMap, LenEntry}; +use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatusIndicator}; use nativelink_util::metrics_utils::{Collector, CollectorState, MetricsComponent, Registry}; use nativelink_util::store_trait::{Store, UploadSizeInfo}; use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom}; @@ -750,6 +751,10 @@ impl Store for FilesystemStore { fn register_metrics(self: Arc, registry: &mut Registry) { registry.register_collector(Box::new(Collector::new(&self))); } + + fn register_health(self: Arc, registry: &mut HealthRegistryBuilder) { + registry.register_indicator(self); + } } impl MetricsComponent for FilesystemStore { @@ -777,3 +782,10 @@ impl MetricsComponent for FilesystemStore { c.publish("evicting_map", self.evicting_map.as_ref(), ""); } } + +#[async_trait] +impl HealthStatusIndicator for FilesystemStore { + fn get_name(&self) -> &'static str { + "FilesystemStore" + } +} diff --git a/nativelink-store/src/grpc_store.rs b/nativelink-store/src/grpc_store.rs index de199b25a..ac5c5466d 100644 --- a/nativelink-store/src/grpc_store.rs +++ b/nativelink-store/src/grpc_store.rs @@ -37,11 +37,12 @@ use nativelink_proto::google::bytestream::{ use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; use nativelink_util::grpc_utils::ConnectionManager; +use nativelink_util::health_utils::HealthStatusIndicator; use nativelink_util::resource_info::ResourceInfo; use nativelink_util::retry::{Retrier, RetryResult}; use nativelink_util::store_trait::{Store, UploadSizeInfo}; -use nativelink_util::tls_utils; use nativelink_util::write_request_stream_wrapper::WriteRequestStreamWrapper; +use nativelink_util::{default_health_status_indicator, tls_utils}; use parking_lot::Mutex; use prost::Message; use rand::rngs::OsRng; @@ -844,3 +845,5 @@ impl Store for GrpcStore { Box::new(self) } } + +default_health_status_indicator!(GrpcStore); diff --git a/nativelink-store/src/memory_store.rs b/nativelink-store/src/memory_store.rs index ebf129987..e2d54fa4b 100644 --- a/nativelink-store/src/memory_store.rs +++ b/nativelink-store/src/memory_store.rs @@ -22,7 +22,9 @@ use bytes::{Bytes, BytesMut}; use nativelink_error::{Code, Error, ResultExt}; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; +use nativelink_util::default_health_status_indicator; use nativelink_util::evicting_map::{EvictingMap, LenEntry}; +use nativelink_util::health_utils::HealthStatusIndicator; use nativelink_util::metrics_utils::{Collector, CollectorState, MetricsComponent, Registry}; use nativelink_util::store_trait::{Store, UploadSizeInfo}; @@ -170,3 +172,5 @@ impl MetricsComponent for MemoryStore { c.publish("evicting_map", &self.evicting_map, ""); } } + +default_health_status_indicator!(MemoryStore); diff --git a/nativelink-store/src/noop_store.rs b/nativelink-store/src/noop_store.rs index a3a3c6b51..084f37dd5 100644 --- a/nativelink-store/src/noop_store.rs +++ b/nativelink-store/src/noop_store.rs @@ -19,6 +19,8 @@ use async_trait::async_trait; use nativelink_error::{make_err, Code, Error, ResultExt}; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; +use nativelink_util::default_health_status_indicator; +use nativelink_util::health_utils::HealthStatusIndicator; use nativelink_util::store_trait::{Store, UploadSizeInfo}; #[derive(Default)] @@ -71,3 +73,5 @@ impl Store for NoopStore { Box::new(self) } } + +default_health_status_indicator!(NoopStore); diff --git a/nativelink-store/src/ref_store.rs b/nativelink-store/src/ref_store.rs index ff2f11d31..3322f5ebb 100644 --- a/nativelink-store/src/ref_store.rs +++ b/nativelink-store/src/ref_store.rs @@ -20,6 +20,8 @@ use async_trait::async_trait; use nativelink_error::{make_err, make_input_err, Code, Error, ResultExt}; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; +use nativelink_util::default_health_status_indicator; +use nativelink_util::health_utils::HealthStatusIndicator; use nativelink_util::store_trait::{Store, UploadSizeInfo}; use tracing::error; @@ -137,3 +139,5 @@ impl Store for RefStore { Box::new(self) } } + +default_health_status_indicator!(RefStore); diff --git a/nativelink-store/src/s3_store.rs b/nativelink-store/src/s3_store.rs index 2179846a4..2c920a7a4 100644 --- a/nativelink-store/src/s3_store.rs +++ b/nativelink-store/src/s3_store.rs @@ -40,6 +40,8 @@ use hyper_rustls::{HttpsConnector, MaybeHttpsStream}; use nativelink_error::{error_if, make_err, make_input_err, Code, Error, ResultExt}; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; +use nativelink_util::default_health_status_indicator; +use nativelink_util::health_utils::HealthStatusIndicator; use nativelink_util::retry::{Retrier, RetryResult}; use nativelink_util::store_trait::{Store, UploadSizeInfo}; use rand::rngs::OsRng; @@ -534,3 +536,5 @@ impl Store for S3Store { Box::new(self) } } + +default_health_status_indicator!(S3Store); diff --git a/nativelink-store/src/shard_store.rs b/nativelink-store/src/shard_store.rs index 5a858e862..00aa20bc6 100644 --- a/nativelink-store/src/shard_store.rs +++ b/nativelink-store/src/shard_store.rs @@ -21,6 +21,8 @@ use futures::stream::{FuturesUnordered, TryStreamExt}; use nativelink_error::{error_if, Error, ResultExt}; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; +use nativelink_util::default_health_status_indicator; +use nativelink_util::health_utils::HealthStatusIndicator; use nativelink_util::metrics_utils::Registry; use nativelink_util::store_trait::{Store, UploadSizeInfo}; @@ -191,3 +193,5 @@ impl Store for ShardStore { } } } + +default_health_status_indicator!(ShardStore); diff --git a/nativelink-store/src/size_partitioning_store.rs b/nativelink-store/src/size_partitioning_store.rs index 3d92148c6..952669630 100644 --- a/nativelink-store/src/size_partitioning_store.rs +++ b/nativelink-store/src/size_partitioning_store.rs @@ -19,6 +19,8 @@ use async_trait::async_trait; use nativelink_error::{Error, ResultExt}; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; +use nativelink_util::default_health_status_indicator; +use nativelink_util::health_utils::HealthStatusIndicator; use nativelink_util::store_trait::{Store, UploadSizeInfo}; use tokio::join; @@ -128,3 +130,5 @@ impl Store for SizePartitioningStore { Box::new(self) } } + +default_health_status_indicator!(SizePartitioningStore); diff --git a/nativelink-store/src/verify_store.rs b/nativelink-store/src/verify_store.rs index 2cabf95a7..c92822e00 100644 --- a/nativelink-store/src/verify_store.rs +++ b/nativelink-store/src/verify_store.rs @@ -21,7 +21,9 @@ use nativelink_config::stores::ConfigDigestHashFunction; use nativelink_error::{make_input_err, Error, ResultExt}; use nativelink_util::buf_channel::{make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; +use nativelink_util::default_health_status_indicator; use nativelink_util::digest_hasher::{DigestHasher, DigestHasherFunc}; +use nativelink_util::health_utils::HealthStatusIndicator; use nativelink_util::metrics_utils::{Collector, CollectorState, CounterWithTime, MetricsComponent, Registry}; use nativelink_util::store_trait::{Store, UploadSizeInfo}; @@ -199,3 +201,5 @@ impl MetricsComponent for VerifyStore { ); } } + +default_health_status_indicator!(VerifyStore); diff --git a/nativelink-store/tests/fast_slow_store_test.rs b/nativelink-store/tests/fast_slow_store_test.rs index d21dced01..eb1545f30 100644 --- a/nativelink-store/tests/fast_slow_store_test.rs +++ b/nativelink-store/tests/fast_slow_store_test.rs @@ -76,6 +76,8 @@ mod fast_slow_store_tests { use bytes::Bytes; use nativelink_error::{make_err, Code, ResultExt}; use nativelink_util::buf_channel::make_buf_channel_pair; + use nativelink_util::default_health_status_indicator; + use nativelink_util::health_utils::HealthStatusIndicator; use pretty_assertions::assert_eq; use super::*; // Must be declared in every module. @@ -301,6 +303,8 @@ mod fast_slow_store_tests { } } + default_health_status_indicator!(DropCheckStore); + let digest = DigestInfo::try_new(VALID_HASH, 100).unwrap(); let (fast_store_read_tx, fast_store_read_rx) = tokio::sync::oneshot::channel(); let (fast_store_eof_tx, fast_store_eof_rx) = tokio::sync::oneshot::channel(); diff --git a/nativelink-util/BUILD.bazel b/nativelink-util/BUILD.bazel index 77d5302d0..81d7f0187 100644 --- a/nativelink-util/BUILD.bazel +++ b/nativelink-util/BUILD.bazel @@ -17,6 +17,7 @@ rust_library( "src/fastcdc.rs", "src/fs.rs", "src/grpc_utils.rs", + "src/health_utils.rs", "src/lib.rs", "src/metrics_utils.rs", "src/platform_properties.rs", @@ -64,6 +65,7 @@ rust_test_suite( "tests/evicting_map_test.rs", "tests/fastcdc_test.rs", "tests/fs_test.rs", + "tests/health_utils_test.rs", "tests/resource_info_test.rs", "tests/retry_test.rs", ], diff --git a/nativelink-util/src/health_utils.rs b/nativelink-util/src/health_utils.rs new file mode 100644 index 000000000..cda87a37e --- /dev/null +++ b/nativelink-util/src/health_utils.rs @@ -0,0 +1,206 @@ +// Copyright 2024 The Native Link Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::borrow::Cow; +use std::collections::HashMap; +use std::fmt::Debug; +use std::marker::Send; +use std::pin::Pin; +use std::sync::Arc; + +use async_trait::async_trait; +use futures::{Stream, StreamExt}; +use parking_lot::Mutex; +use serde::Serialize; + +/// Struct name health indicator component. +type StructName = str; +/// Readable message status of the health indicator. +type Message = str; + +/// Status state of a health indicator. +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)] +pub enum HealthStatus { + /// Healthy status. + Ok { + struct_name: &'static StructName, + message: Cow<'static, Message>, + }, + /// Initializing status, used to signal or indicate component is initializing. + Initializing { + struct_name: &'static StructName, + message: Cow<'static, Message>, + }, + /// Warning status, used to signal or indicate component is in a warning state. + /// This status is used to indicate a non-fatal issue with the component. + Warning { + struct_name: &'static StructName, + message: Cow<'static, Message>, + }, + /// Failed status, used to signal or indicate component is in a failed state. + /// This status is used to indicate a fatal issue with the component, + /// might not be able to recover. + Failed { + struct_name: &'static StructName, + message: Cow<'static, Message>, + }, +} + +/// Health status indicator trait. This trait is used to define +/// a health status indicator by implementing the `check_health` function. +impl HealthStatus { + /// Make a healthy status. + pub fn new_ok(component: &(impl HealthStatusIndicator + ?Sized), message: Cow<'static, str>) -> Self { + Self::Ok { + struct_name: component.struct_name(), + message, + } + } + + /// Make an initializing status. + pub fn new_initializing( + component: &(impl HealthStatusIndicator + ?Sized), + message: Cow<'static, str>, + ) -> HealthStatus { + Self::Initializing { + struct_name: component.struct_name(), + message, + } + } + + /// Make a warning status. + pub fn new_warning(component: &(impl HealthStatusIndicator + ?Sized), message: Cow<'static, str>) -> HealthStatus { + Self::Warning { + struct_name: component.struct_name(), + message, + } + } + + /// Make a failed status. + pub fn new_failed(component: &(impl HealthStatusIndicator + ?Sized), message: Cow<'static, str>) -> HealthStatus { + Self::Failed { + struct_name: component.struct_name(), + message, + } + } +} + +/// Description of the health status of a component. +#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize)] +pub struct HealthStatusDescription { + pub namespace: Cow<'static, str>, + pub status: HealthStatus, +} + +/// Health status indicator trait. This trait is used to define +/// a health status indicator by implementing the `check_health` function. +/// A default implementation is provided for the `check_health` function +/// that returns healthy component. +#[async_trait] +pub trait HealthStatusIndicator: Sync + Send + Unpin { + fn get_name(&self) -> &'static str; + + /// Returns the name of the struct implementing the trait. + fn struct_name(&self) -> &'static str { + std::any::type_name::() + } + + /// Check the health status of the component. This function should be + /// implemented by the component to check the health status of the component. + async fn check_health(&self, _namespace: Cow<'static, str>) -> HealthStatus { + HealthStatus::new_ok(self, "ok".into()) + } +} + +type HealthRegistryBuilderState = Arc, Arc>>>; +pub struct HealthRegistryBuilder { + namespace: Cow<'static, str>, + state: HealthRegistryBuilderState, +} + +/// Health registry builder that is used to build a health registry. +/// The builder provides creation, registering of health status indicators, +/// sub building scoped health registries and building the health registry. +/// `build()` should be called once for finalizing the production of a health registry. +impl HealthRegistryBuilder { + /// Create new root builder. + pub fn new(namespace: Cow<'static, str>) -> Self { + Self { + namespace: format!("/{}", namespace).into(), + state: Arc::new(Mutex::new(HashMap::new())), + } + } + + /// Register a health status indicator at current namespace. + pub fn register_indicator(&mut self, indicator: Arc) { + let name = format!("{}/{}", self.namespace, indicator.get_name()); + self.state.lock().insert(name.into(), indicator); + } + + /// Create a sub builder for a namespace. + pub fn sub_builder(&mut self, namespace: Cow<'static, str>) -> HealthRegistryBuilder { + HealthRegistryBuilder { + namespace: format!("{}/{}", self.namespace, namespace).into(), + state: self.state.clone(), + } + } + + /// Finalize the production of the health registry. + pub fn build(&mut self) -> HealthRegistry { + HealthRegistry { + indicators: self.state.lock().clone().into_iter().collect(), + } + } +} + +/// Health registry that holds the health status indicators of a component. +#[derive(Default, Clone)] +pub struct HealthRegistry { + indicators: Vec<(Cow<'static, str>, Arc)>, +} + +/// Reporter of the health status health registry. +pub trait HealthStatusReporter { + fn health_status_report(&self) -> Pin + '_>>; +} + +/// Health status reporter implementation for the health registry that provides a stream +/// of health status descriptions. +impl HealthStatusReporter for HealthRegistry { + fn health_status_report(&self) -> Pin + '_>> { + Box::pin( + futures::stream::iter(self.indicators.iter()).then(|(namespace, indicator)| async move { + HealthStatusDescription { + namespace: namespace.clone(), + status: indicator.check_health(namespace.clone()).await, + } + }), + ) + } +} + +/// Default health status indicator implementation for a component. +/// Generally used for components that don't need custom implementations +/// of the `check_health` function. +#[macro_export] +macro_rules! default_health_status_indicator { + ($type:ty) => { + #[async_trait::async_trait] + impl HealthStatusIndicator for $type { + fn get_name(&self) -> &'static str { + stringify!($type) + } + } + }; +} diff --git a/nativelink-util/src/lib.rs b/nativelink-util/src/lib.rs index 62c615c0e..c493b9f70 100644 --- a/nativelink-util/src/lib.rs +++ b/nativelink-util/src/lib.rs @@ -20,6 +20,7 @@ pub mod evicting_map; pub mod fastcdc; pub mod fs; pub mod grpc_utils; +pub mod health_utils; pub mod metrics_utils; pub mod platform_properties; pub mod resource_info; diff --git a/nativelink-util/src/store_trait.rs b/nativelink-util/src/store_trait.rs index 2c6a4b721..6bee4621a 100644 --- a/nativelink-util/src/store_trait.rs +++ b/nativelink-util/src/store_trait.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::borrow::Cow; use std::marker::Send; use std::pin::Pin; use std::sync::Arc; @@ -24,6 +25,7 @@ use serde::{Deserialize, Serialize}; use crate::buf_channel::{make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf}; use crate::common::DigestInfo; +use crate::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator}; use crate::metrics_utils::Registry; #[derive(Debug, PartialEq, Copy, Clone, Serialize, Deserialize)] @@ -40,7 +42,7 @@ pub enum UploadSizeInfo { } #[async_trait] -pub trait Store: Sync + Send + Unpin { +pub trait Store: Sync + Send + Unpin + HealthStatusIndicator { /// Look up a digest in the store and return None if it does not exist in /// the store, or Some(size) if it does. /// Note: On an AC store the size will be incorrect and should not be used! @@ -168,6 +170,59 @@ pub trait Store: Sync + Send + Unpin { .merge(data_res.err_tip(|| "Failed to read stream to completion in get_part_unchunked")) } + // Default implementation of the health check. Some stores may want to override this + // in situations where the default implementation is not sufficient. + async fn check_health(self: Pin<&Self>, _namespace: Cow<'static, str>) -> HealthStatus { + let bytes = _namespace.as_bytes(); + let hash = blake3::hash(bytes).into(); + let len = bytes.len(); + let digest_info = DigestInfo::new(hash, len as i64); + let bytes_copy = bytes::Bytes::copy_from_slice(bytes); + + match self.update_oneshot(digest_info, bytes_copy.clone()).await { + Ok(_) => {} + Err(e) => { + return HealthStatus::new_failed( + self.get_ref(), + format!("Store.update_oneshot() failed: {}", e).into(), + ); + } + } + + match self.get_part_unchunked(digest_info, 0, Some(len), Some(len)).await { + Ok(b) => { + if b != bytes_copy { + return HealthStatus::new_failed(self.get_ref(), "Store.get_part_unchunked() data mismatch".into()); + } + } + Err(e) => { + return HealthStatus::new_failed( + self.get_ref(), + format!("Store.get_part_unchunked() failed: {}", e).into(), + ); + } + } + + match self.has(digest_info).await { + Ok(Some(s)) => { + if s != len { + return HealthStatus::new_failed( + self.get_ref(), + format!("Store.has() size mismatch {s} != {len}").into(), + ); + } + } + Ok(None) => { + return HealthStatus::new_failed(self.get_ref(), "Store.has() size not found".into()); + } + Err(e) => { + return HealthStatus::new_failed(self.get_ref(), format!("Store.has() failed: {}", e).into()); + } + } + + HealthStatus::new_ok(self.get_ref(), "Successfully store health check".into()) + } + /// Gets the underlying store for the given digest. This can be used to find out /// what any underlying store is for a given digest will be and hand it to the caller. /// A caller might want to use this to obtain a reference to the "real" underlying store @@ -180,4 +235,7 @@ pub trait Store: Sync + Send + Unpin { /// Register any metrics that this store wants to expose to the Prometheus. fn register_metrics(self: Arc, _registry: &mut Registry) {} + + // Register health checks used to monitor the store. + fn register_health(self: Arc, _registry: &mut HealthRegistryBuilder) {} } diff --git a/nativelink-util/tests/health_utils_test.rs b/nativelink-util/tests/health_utils_test.rs new file mode 100644 index 000000000..7abc12376 --- /dev/null +++ b/nativelink-util/tests/health_utils_test.rs @@ -0,0 +1,222 @@ +// Copyright 2024 The Native Link Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::borrow::Cow; +use std::collections::HashSet; +use std::iter::FromIterator; +use std::sync::Arc; + +use futures::StreamExt; +use nativelink_error::Error; +use nativelink_util::health_utils::*; + +#[cfg(test)] +mod health_utils_tests { + + use pretty_assertions::assert_eq; + + use super::*; + + #[tokio::test] + async fn create_empty_indicator() -> Result<(), Error> { + let mut health_registry_builder = HealthRegistryBuilder::new("nativelink".into()); + let health_registry = health_registry_builder.build(); + let health_status: Vec = health_registry.health_status_report().collect().await; + assert_eq!(health_status.len(), 0); + Ok(()) + } + + #[tokio::test] + async fn create_register_indicator() -> Result<(), Error> { + generate_health_status_indicator!(MockComponentImpl, Ok, "ok"); + + let mut health_registry_builder = HealthRegistryBuilder::new("nativelink".into()); + + health_registry_builder.register_indicator(Arc::new(MockComponentImpl {})); + + let health_registry = health_registry_builder.build(); + let health_status: Vec = health_registry.health_status_report().collect().await; + + assert_eq!(health_status.len(), 1); + assert_eq!( + health_status, + vec![HealthStatusDescription { + namespace: "/nativelink/MockComponentImpl".into(), + status: HealthStatus::Ok { + struct_name: "MockComponentImpl", + message: "ok".into() + }, + }] + ); + + Ok(()) + } + + #[tokio::test] + async fn create_sub_registry() -> Result<(), Error> { + generate_health_status_indicator!(MockComponentImpl, Ok, "ok"); + + let mut health_registry_builder = HealthRegistryBuilder::new("nativelink".into()); + + health_registry_builder.register_indicator(Arc::new(MockComponentImpl {})); + + let mut namespace1_registry = health_registry_builder.sub_builder("namespace1".into()); + + namespace1_registry.register_indicator(Arc::new(MockComponentImpl {})); + + let health_registry = health_registry_builder.build(); + let health_status: Vec = health_registry.health_status_report().collect().await; + + assert_eq!(health_status.len(), 2); + let expected_health_status = vec_to_set(vec![ + HealthStatusDescription { + namespace: "/nativelink/MockComponentImpl".into(), + status: HealthStatus::Ok { + struct_name: "MockComponentImpl", + message: "ok".into(), + }, + }, + HealthStatusDescription { + namespace: "/nativelink/namespace1/MockComponentImpl".into(), + status: HealthStatus::Ok { + struct_name: "MockComponentImpl", + message: "ok".into(), + }, + }, + ]); + + assert_eq!(vec_to_set(health_status), expected_health_status); + + Ok(()) + } + + #[tokio::test] + async fn create_multiple_indicators_same_registry() -> Result<(), Error> { + generate_health_status_indicator!(MockComponentImpl1, Ok, "ok"); + generate_health_status_indicator!(MockComponentImpl2, Ok, "ok"); + generate_health_status_indicator!(MockComponentImpl3, Ok, "ok"); + + let mut health_registry_builder = HealthRegistryBuilder::new("nativelink".into()); + + health_registry_builder.register_indicator(Arc::new(MockComponentImpl1 {})); + health_registry_builder.register_indicator(Arc::new(MockComponentImpl2 {})); + health_registry_builder.register_indicator(Arc::new(MockComponentImpl3 {})); + + let health_registry = health_registry_builder.build(); + let health_status: Vec = health_registry.health_status_report().collect().await; + + assert_eq!(health_status.len(), 3); + let expected_health_status = vec_to_set(vec![ + HealthStatusDescription { + namespace: "/nativelink/MockComponentImpl1".into(), + status: HealthStatus::Ok { + struct_name: "MockComponentImpl1", + message: "ok".into(), + }, + }, + HealthStatusDescription { + namespace: "/nativelink/MockComponentImpl2".into(), + status: HealthStatus::Ok { + struct_name: "MockComponentImpl2", + message: "ok".into(), + }, + }, + HealthStatusDescription { + namespace: "/nativelink/MockComponentImpl3".into(), + status: HealthStatus::Ok { + struct_name: "MockComponentImpl3", + message: "ok".into(), + }, + }, + ]); + + assert_eq!(vec_to_set(health_status), expected_health_status); + + Ok(()) + } + + #[tokio::test] + async fn create_multiple_indicators_with_sub_registry() -> Result<(), Error> { + generate_health_status_indicator!(MockComponentImpl1, Ok, "ok"); + generate_health_status_indicator!(MockComponentImpl2, Ok, "ok"); + generate_health_status_indicator!(MockComponentImpl3, Ok, "ok"); + + let mut health_registry_builder = HealthRegistryBuilder::new("nativelink".into()); + + let mut sub_builder = health_registry_builder.sub_builder("namespace1".into()); + sub_builder.register_indicator(Arc::new(MockComponentImpl1 {})); + let mut sub_builder = health_registry_builder.sub_builder("namespace2".into()); + sub_builder.register_indicator(Arc::new(MockComponentImpl2 {})); + + health_registry_builder + .sub_builder("namespace3".into()) + .register_indicator(Arc::new(MockComponentImpl3 {})); + + let health_registry = health_registry_builder.build(); + let health_status: Vec = health_registry.health_status_report().collect().await; + + assert_eq!(health_status.len(), 3); + let expected_health_status = vec_to_set(vec![ + HealthStatusDescription { + namespace: "/nativelink/namespace1/MockComponentImpl1".into(), + status: HealthStatus::Ok { + struct_name: "MockComponentImpl1", + message: "ok".into(), + }, + }, + HealthStatusDescription { + namespace: "/nativelink/namespace2/MockComponentImpl2".into(), + status: HealthStatus::Ok { + struct_name: "MockComponentImpl2", + message: "ok".into(), + }, + }, + HealthStatusDescription { + namespace: "/nativelink/namespace3/MockComponentImpl3".into(), + status: HealthStatus::Ok { + struct_name: "MockComponentImpl3", + message: "ok".into(), + }, + }, + ]); + + assert_eq!(vec_to_set(health_status), expected_health_status); + + Ok(()) + } + + #[macro_export] + macro_rules! generate_health_status_indicator { + ($struct_name:ident, $health_status:ident, $status_msg:expr) => { + struct $struct_name; + + #[async_trait::async_trait] + impl HealthStatusIndicator for $struct_name { + fn get_name(&self) -> &'static str { + stringify!($struct_name).into() + } + async fn check_health(&self, _namespace: Cow<'static, str>) -> HealthStatus { + HealthStatus::$health_status { + struct_name: stringify!($struct_name).into(), + message: $status_msg.into(), + } + } + } + }; + } + + fn vec_to_set(vec: Vec) -> HashSet { + HashSet::from_iter(vec) + } +} diff --git a/src/bin/nativelink.rs b/src/bin/nativelink.rs index b8a222a7a..878cdb8f7 100644 --- a/src/bin/nativelink.rs +++ b/src/bin/nativelink.rs @@ -21,7 +21,7 @@ use async_lock::Mutex as AsyncMutex; use axum::Router; use clap::Parser; use futures::future::{select_all, BoxFuture, OptionFuture, TryFutureExt}; -use futures::FutureExt; +use futures::{FutureExt, StreamExt}; use hyper::server::conn::Http; use hyper::{Response, StatusCode}; use nativelink_config::cas_server::{ @@ -41,6 +41,7 @@ use nativelink_store::default_store_factory::store_factory; use nativelink_store::store_manager::StoreManager; use nativelink_util::common::fs::{set_idle_file_descriptor_timeout, set_open_file_limit}; use nativelink_util::digest_hasher::{set_default_digest_hasher_func, DigestHasherFunc}; +use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatusDescription, HealthStatusReporter}; use nativelink_util::metrics_utils::{ set_metrics_enabled_for_this_thread, Collector, CollectorState, Counter, MetricsComponent, Registry, }; @@ -71,6 +72,9 @@ const DEFAULT_ADMIN_API_PATH: &str = "/admin"; /// Name of environment variable to disable metrics. const METRICS_DISABLE_ENV: &str = "NATIVELINK_DISABLE_METRICS"; +/// Content type header value for JSON. +const JSON_CONTENT_TYPE: &str = "application/json; charset=utf-8"; + /// Backend for bazel remote execution / cache API. #[derive(Parser, Debug)] #[clap( @@ -87,17 +91,27 @@ struct Args { async fn inner_main(cfg: CasConfig, server_start_timestamp: u64) -> Result<(), Box> { let mut root_metrics_registry = ::with_prefix("nativelink"); + let health_registry_builder = Arc::new(AsyncMutex::new(HealthRegistryBuilder::new("nativelink".into()))); let store_manager = Arc::new(StoreManager::new()); { + let mut health_registry_lock = health_registry_builder.lock().await; let root_store_metrics = root_metrics_registry.sub_registry_with_prefix("stores"); + for (name, store_cfg) in cfg.stores { + let health_component_name = format!("stores/{name}"); + let mut health_register_store = health_registry_lock.sub_builder(health_component_name.into()); let store_metrics = root_store_metrics.sub_registry_with_prefix(&name); store_manager.add_store( &name, - store_factory(&store_cfg, &store_manager, Some(store_metrics)) - .await - .err_tip(|| format!("Failed to create store '{name}'"))?, + store_factory( + &store_cfg, + &store_manager, + Some(store_metrics), + Some(&mut health_register_store), + ) + .await + .err_tip(|| format!("Failed to create store '{name}'"))?, ); } } @@ -349,12 +363,49 @@ async fn inner_main(cfg: CasConfig, server_start_timestamp: u64) -> Result<(), B ); let root_metrics_registry = root_metrics_registry.clone(); + let health_registry_status = health_registry_builder.lock().await.build(); let mut svc = Router::new() // This is the default service that executes if no other endpoint matches. .fallback_service(tonic_services.into_service().map_err(|e| panic!("{e}"))) - // This is a generic endpoint used to check if the server is up. - .route_service("/status", axum::routing::get(move || async move { "Ok".to_string() })); + .route_service( + "/status", + axum::routing::get(move || async move { + fn error_to_response(e: E) -> Response { + let mut response = Response::new(format!("Error: {e:?}")); + *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; + response + } + + spawn_blocking(move || { + futures::executor::block_on(async { + let health_status_descriptions: Vec = + health_registry_status.health_status_report().collect().await; + + match serde_json5::to_string(&health_status_descriptions) { + Ok(health_status_descriptions) => Response::builder() + .status(StatusCode::OK) + .header( + hyper::header::CONTENT_TYPE, + hyper::header::HeaderValue::from_static(JSON_CONTENT_TYPE), + ) + .body(health_status_descriptions) + .unwrap(), + Err(e) => Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .header( + hyper::header::CONTENT_TYPE, + hyper::header::HeaderValue::from_static(JSON_CONTENT_TYPE), + ) + .body(format!("Internal Failure: {e:?}")) + .unwrap(), + } + }) + }) + .await + .unwrap_or_else(error_to_response) + }), + ); if let Some(prometheus_cfg) = services.experimental_prometheus { fn error_to_response(e: E) -> Response {