diff --git a/Cargo.lock b/Cargo.lock index 73572e3f5..7e6e2a7e9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1792,6 +1792,7 @@ dependencies = [ "serde", "sha2", "shellexpand", + "tempfile", "tokio", "tokio-stream", "tokio-util", diff --git a/nativelink-service/tests/ac_server_test.rs b/nativelink-service/tests/ac_server_test.rs index 6f8c458a4..3c5da9cbd 100644 --- a/nativelink-service/tests/ac_server_test.rs +++ b/nativelink-service/tests/ac_server_test.rs @@ -55,6 +55,7 @@ async fn make_store_manager() -> Result, Error> { &nativelink_config::stores::StoreConfig::memory(nativelink_config::stores::MemoryStore::default()), &store_manager, Some(&mut ::default()), + None, ) .await?, ); @@ -64,6 +65,7 @@ async fn make_store_manager() -> Result, Error> { &nativelink_config::stores::StoreConfig::memory(nativelink_config::stores::MemoryStore::default()), &store_manager, Some(&mut ::default()), + None, ) .await?, ); diff --git a/nativelink-service/tests/bytestream_server_test.rs b/nativelink-service/tests/bytestream_server_test.rs index 370a6d6a4..20f39fffd 100644 --- a/nativelink-service/tests/bytestream_server_test.rs +++ b/nativelink-service/tests/bytestream_server_test.rs @@ -41,6 +41,7 @@ async fn make_store_manager() -> Result, Error> { &nativelink_config::stores::StoreConfig::memory(nativelink_config::stores::MemoryStore::default()), &store_manager, Some(&mut ::default()), + None, ) .await?, ); diff --git a/nativelink-service/tests/cas_server_test.rs b/nativelink-service/tests/cas_server_test.rs index 6f8c60c4f..ffc3aaf6a 100644 --- a/nativelink-service/tests/cas_server_test.rs +++ b/nativelink-service/tests/cas_server_test.rs @@ -41,6 +41,7 @@ async fn make_store_manager() -> Result, Error> { &nativelink_config::stores::StoreConfig::memory(nativelink_config::stores::MemoryStore::default()), &store_manager, Some(&mut ::default()), + None, ) .await?, ); diff --git a/nativelink-store/BUILD.bazel b/nativelink-store/BUILD.bazel index 606a0d6a4..002a6e27c 100644 --- a/nativelink-store/BUILD.bazel +++ b/nativelink-store/BUILD.bazel @@ -58,6 +58,7 @@ rust_library( "@crates//:serde", "@crates//:sha2", "@crates//:shellexpand", + "@crates//:tempfile", "@crates//:tokio", "@crates//:tokio-stream", "@crates//:tokio-util", diff --git a/nativelink-store/Cargo.toml b/nativelink-store/Cargo.toml index a1c67bdb1..0ffd051c8 100644 --- a/nativelink-store/Cargo.toml +++ b/nativelink-store/Cargo.toml @@ -30,6 +30,7 @@ rand = "0.8.5" serde = "1.0.193" sha2 = "0.10.8" shellexpand = "3.1.0" +tempfile = "3.9.0" tokio = { version = "1.35.1" } tokio-stream = { version = "0.1.14", features = ["fs"] } tokio-util = { version = "0.7.10" } diff --git a/nativelink-store/src/completeness_checking_store.rs b/nativelink-store/src/completeness_checking_store.rs index 00a3d4ffc..c58051bc1 100644 --- a/nativelink-store/src/completeness_checking_store.rs +++ b/nativelink-store/src/completeness_checking_store.rs @@ -25,6 +25,8 @@ use nativelink_proto::build::bazel::remote::execution::v2::{ }; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; +use nativelink_util::default_health_status_indicator; +use nativelink_util::health_utils::HealthStatusIndicator; use nativelink_util::store_trait::{Store, UploadSizeInfo}; use parking_lot::Mutex; use tokio::sync::Notify; @@ -361,3 +363,5 @@ impl Store for CompletenessCheckingStore { Box::new(self) } } + +default_health_status_indicator!(CompletenessCheckingStore); diff --git a/nativelink-store/src/compression_store.rs b/nativelink-store/src/compression_store.rs index c2677669b..341c4052f 100644 --- a/nativelink-store/src/compression_store.rs +++ b/nativelink-store/src/compression_store.rs @@ -26,6 +26,8 @@ use lz4_flex::block::{compress_into, decompress_into, get_maximum_output_size}; use nativelink_error::{error_if, make_err, Code, Error, ResultExt}; use nativelink_util::buf_channel::{make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::{DigestInfo, JoinHandleDropGuard}; +use nativelink_util::default_health_status_indicator; +use nativelink_util::health_utils::HealthStatusIndicator; use nativelink_util::metrics_utils::Registry; use nativelink_util::store_trait::{Store, UploadSizeInfo}; use serde::{Deserialize, Serialize}; @@ -590,3 +592,5 @@ impl Store for CompressionStore { self.inner_store.clone().register_metrics(inner_store_registry); } } + +default_health_status_indicator!(CompressionStore); diff --git a/nativelink-store/src/dedup_store.rs b/nativelink-store/src/dedup_store.rs index a9bbd9e04..4b30c1130 100644 --- a/nativelink-store/src/dedup_store.rs +++ b/nativelink-store/src/dedup_store.rs @@ -23,7 +23,9 @@ use futures::stream::{self, FuturesOrdered, StreamExt, TryStreamExt}; use nativelink_error::{make_err, Code, Error, ResultExt}; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf, StreamReader}; use nativelink_util::common::DigestInfo; +use nativelink_util::default_health_status_indicator; use nativelink_util::fastcdc::FastCDC; +use nativelink_util::health_utils::HealthStatusIndicator; use nativelink_util::store_trait::{Store, UploadSizeInfo}; use serde::{Deserialize, Serialize}; use tokio_util::codec::FramedRead; @@ -349,3 +351,5 @@ impl Store for DedupStore { Box::new(self) } } + +default_health_status_indicator!(DedupStore); diff --git a/nativelink-store/src/default_store_factory.rs b/nativelink-store/src/default_store_factory.rs index 4e54251c3..595c15e35 100644 --- a/nativelink-store/src/default_store_factory.rs +++ b/nativelink-store/src/default_store_factory.rs @@ -19,6 +19,7 @@ use futures::stream::FuturesOrdered; use futures::{Future, TryStreamExt}; use nativelink_config::stores::StoreConfig; use nativelink_error::Error; +use nativelink_util::health_utils::HealthRegistryBuilder; use nativelink_util::metrics_utils::Registry; use nativelink_util::store_trait::Store; @@ -44,6 +45,7 @@ pub fn store_factory<'a>( backend: &'a StoreConfig, store_manager: &'a Arc, maybe_store_metrics: Option<&'a mut Registry>, + maybe_health_registry_builder: Option<&'a mut HealthRegistryBuilder>, ) -> Pin> { Box::pin(async move { let store: Arc = match backend { @@ -51,36 +53,36 @@ pub fn store_factory<'a>( StoreConfig::experimental_s3_store(config) => Arc::new(S3Store::new(config).await?), StoreConfig::verify(config) => Arc::new(VerifyStore::new( config, - store_factory(&config.backend, store_manager, None).await?, + store_factory(&config.backend, store_manager, None, None).await?, )), StoreConfig::compression(config) => Arc::new(CompressionStore::new( *config.clone(), - store_factory(&config.backend, store_manager, None).await?, + store_factory(&config.backend, store_manager, None, None).await?, )?), StoreConfig::dedup(config) => Arc::new(DedupStore::new( config, - store_factory(&config.index_store, store_manager, None).await?, - store_factory(&config.content_store, store_manager, None).await?, + store_factory(&config.index_store, store_manager, None, None).await?, + store_factory(&config.content_store, store_manager, None, None).await?, )), StoreConfig::existence_cache(config) => Arc::new(ExistenceCacheStore::new( config, - store_factory(&config.backend, store_manager, None).await?, + store_factory(&config.backend, store_manager, None, None).await?, )), StoreConfig::completeness_checking(config) => Arc::new(CompletenessCheckingStore::new( - store_factory(&config.backend, store_manager, None).await?, - store_factory(&config.cas_store, store_manager, None).await?, + store_factory(&config.backend, store_manager, None, None).await?, + store_factory(&config.cas_store, store_manager, None, None).await?, )), StoreConfig::fast_slow(config) => Arc::new(FastSlowStore::new( config, - store_factory(&config.fast, store_manager, None).await?, - store_factory(&config.slow, store_manager, None).await?, + store_factory(&config.fast, store_manager, None, None).await?, + store_factory(&config.slow, store_manager, None, None).await?, )), StoreConfig::filesystem(config) => Arc::new(::new(config).await?), StoreConfig::ref_store(config) => Arc::new(RefStore::new(config, Arc::downgrade(store_manager))), StoreConfig::size_partitioning(config) => Arc::new(SizePartitioningStore::new( config, - store_factory(&config.lower_store, store_manager, None).await?, - store_factory(&config.upper_store, store_manager, None).await?, + store_factory(&config.lower_store, store_manager, None, None).await?, + store_factory(&config.upper_store, store_manager, None, None).await?, )), StoreConfig::grpc(config) => Arc::new(GrpcStore::new(config).await?), StoreConfig::noop => Arc::new(NoopStore::new()), @@ -88,7 +90,7 @@ pub fn store_factory<'a>( let stores = config .stores .iter() - .map(|store_config| store_factory(&store_config.store, store_manager, None)) + .map(|store_config| store_factory(&store_config.store, store_manager, None, None)) .collect::>() .try_collect::>() .await?; @@ -98,6 +100,11 @@ pub fn store_factory<'a>( if let Some(store_metrics) = maybe_store_metrics { store.clone().register_metrics(store_metrics); } + + if let Some(health_registry_builder) = maybe_health_registry_builder { + store.clone().register_health(health_registry_builder); + } + Ok(store) }) } diff --git a/nativelink-store/src/existence_cache_store.rs b/nativelink-store/src/existence_cache_store.rs index 9b6c60595..9a3757814 100644 --- a/nativelink-store/src/existence_cache_store.rs +++ b/nativelink-store/src/existence_cache_store.rs @@ -22,7 +22,9 @@ use nativelink_config::stores::{EvictionPolicy, ExistenceCacheStore as Existence use nativelink_error::{error_if, Error, ResultExt}; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; +use nativelink_util::default_health_status_indicator; use nativelink_util::evicting_map::{EvictingMap, LenEntry}; +use nativelink_util::health_utils::HealthStatusIndicator; use nativelink_util::metrics_utils::{CollectorState, MetricsComponent, Registry}; use nativelink_util::store_trait::{Store, UploadSizeInfo}; @@ -198,3 +200,5 @@ impl MetricsComponent for ExistenceCacheStore { self.existence_cache.gather_metrics(c) } } + +default_health_status_indicator!(ExistenceCacheStore); diff --git a/nativelink-store/src/fast_slow_store.rs b/nativelink-store/src/fast_slow_store.rs index 4ba77d9c4..76ed30a98 100644 --- a/nativelink-store/src/fast_slow_store.rs +++ b/nativelink-store/src/fast_slow_store.rs @@ -22,6 +22,8 @@ use futures::{join, FutureExt}; use nativelink_error::{make_err, Code, Error, ResultExt}; use nativelink_util::buf_channel::{make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; +use nativelink_util::default_health_status_indicator; +use nativelink_util::health_utils::HealthStatusIndicator; use nativelink_util::metrics_utils::Registry; use nativelink_util::store_trait::{Store, UploadSizeInfo}; @@ -268,3 +270,5 @@ impl Store for FastSlowStore { self.slow_store.clone().register_metrics(slow_store_registry); } } + +default_health_status_indicator!(FastSlowStore); diff --git a/nativelink-store/src/filesystem_store.rs b/nativelink-store/src/filesystem_store.rs index 94a2c342f..facad1ff4 100644 --- a/nativelink-store/src/filesystem_store.rs +++ b/nativelink-store/src/filesystem_store.rs @@ -29,6 +29,7 @@ use nativelink_error::{make_err, make_input_err, Code, Error, ResultExt}; use nativelink_util::buf_channel::{make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::{fs, DigestInfo}; use nativelink_util::evicting_map::{EvictingMap, LenEntry}; +use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatusIndicator}; use nativelink_util::metrics_utils::{Collector, CollectorState, MetricsComponent, Registry}; use nativelink_util::store_trait::{Store, UploadSizeInfo}; use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom}; @@ -750,6 +751,10 @@ impl Store for FilesystemStore { fn register_metrics(self: Arc, registry: &mut Registry) { registry.register_collector(Box::new(Collector::new(&self))); } + + fn register_health(self: Arc, registry: &mut HealthRegistryBuilder) { + registry.register_indicator(self); + } } impl MetricsComponent for FilesystemStore { @@ -777,3 +782,10 @@ impl MetricsComponent for FilesystemStore { c.publish("evicting_map", self.evicting_map.as_ref(), ""); } } + +#[async_trait] +impl HealthStatusIndicator for FilesystemStore { + fn get_name(&self) -> &'static str { + "FilesystemStore" + } +} diff --git a/nativelink-store/src/grpc_store.rs b/nativelink-store/src/grpc_store.rs index de199b25a..ac5c5466d 100644 --- a/nativelink-store/src/grpc_store.rs +++ b/nativelink-store/src/grpc_store.rs @@ -37,11 +37,12 @@ use nativelink_proto::google::bytestream::{ use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; use nativelink_util::grpc_utils::ConnectionManager; +use nativelink_util::health_utils::HealthStatusIndicator; use nativelink_util::resource_info::ResourceInfo; use nativelink_util::retry::{Retrier, RetryResult}; use nativelink_util::store_trait::{Store, UploadSizeInfo}; -use nativelink_util::tls_utils; use nativelink_util::write_request_stream_wrapper::WriteRequestStreamWrapper; +use nativelink_util::{default_health_status_indicator, tls_utils}; use parking_lot::Mutex; use prost::Message; use rand::rngs::OsRng; @@ -844,3 +845,5 @@ impl Store for GrpcStore { Box::new(self) } } + +default_health_status_indicator!(GrpcStore); diff --git a/nativelink-store/src/memory_store.rs b/nativelink-store/src/memory_store.rs index ebf129987..e2d54fa4b 100644 --- a/nativelink-store/src/memory_store.rs +++ b/nativelink-store/src/memory_store.rs @@ -22,7 +22,9 @@ use bytes::{Bytes, BytesMut}; use nativelink_error::{Code, Error, ResultExt}; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; +use nativelink_util::default_health_status_indicator; use nativelink_util::evicting_map::{EvictingMap, LenEntry}; +use nativelink_util::health_utils::HealthStatusIndicator; use nativelink_util::metrics_utils::{Collector, CollectorState, MetricsComponent, Registry}; use nativelink_util::store_trait::{Store, UploadSizeInfo}; @@ -170,3 +172,5 @@ impl MetricsComponent for MemoryStore { c.publish("evicting_map", &self.evicting_map, ""); } } + +default_health_status_indicator!(MemoryStore); diff --git a/nativelink-store/src/noop_store.rs b/nativelink-store/src/noop_store.rs index a3a3c6b51..084f37dd5 100644 --- a/nativelink-store/src/noop_store.rs +++ b/nativelink-store/src/noop_store.rs @@ -19,6 +19,8 @@ use async_trait::async_trait; use nativelink_error::{make_err, Code, Error, ResultExt}; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; +use nativelink_util::default_health_status_indicator; +use nativelink_util::health_utils::HealthStatusIndicator; use nativelink_util::store_trait::{Store, UploadSizeInfo}; #[derive(Default)] @@ -71,3 +73,5 @@ impl Store for NoopStore { Box::new(self) } } + +default_health_status_indicator!(NoopStore); diff --git a/nativelink-store/src/ref_store.rs b/nativelink-store/src/ref_store.rs index ff2f11d31..3322f5ebb 100644 --- a/nativelink-store/src/ref_store.rs +++ b/nativelink-store/src/ref_store.rs @@ -20,6 +20,8 @@ use async_trait::async_trait; use nativelink_error::{make_err, make_input_err, Code, Error, ResultExt}; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; +use nativelink_util::default_health_status_indicator; +use nativelink_util::health_utils::HealthStatusIndicator; use nativelink_util::store_trait::{Store, UploadSizeInfo}; use tracing::error; @@ -137,3 +139,5 @@ impl Store for RefStore { Box::new(self) } } + +default_health_status_indicator!(RefStore); diff --git a/nativelink-store/src/s3_store.rs b/nativelink-store/src/s3_store.rs index 2179846a4..2c920a7a4 100644 --- a/nativelink-store/src/s3_store.rs +++ b/nativelink-store/src/s3_store.rs @@ -40,6 +40,8 @@ use hyper_rustls::{HttpsConnector, MaybeHttpsStream}; use nativelink_error::{error_if, make_err, make_input_err, Code, Error, ResultExt}; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; +use nativelink_util::default_health_status_indicator; +use nativelink_util::health_utils::HealthStatusIndicator; use nativelink_util::retry::{Retrier, RetryResult}; use nativelink_util::store_trait::{Store, UploadSizeInfo}; use rand::rngs::OsRng; @@ -534,3 +536,5 @@ impl Store for S3Store { Box::new(self) } } + +default_health_status_indicator!(S3Store); diff --git a/nativelink-store/src/shard_store.rs b/nativelink-store/src/shard_store.rs index 5a858e862..00aa20bc6 100644 --- a/nativelink-store/src/shard_store.rs +++ b/nativelink-store/src/shard_store.rs @@ -21,6 +21,8 @@ use futures::stream::{FuturesUnordered, TryStreamExt}; use nativelink_error::{error_if, Error, ResultExt}; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; +use nativelink_util::default_health_status_indicator; +use nativelink_util::health_utils::HealthStatusIndicator; use nativelink_util::metrics_utils::Registry; use nativelink_util::store_trait::{Store, UploadSizeInfo}; @@ -191,3 +193,5 @@ impl Store for ShardStore { } } } + +default_health_status_indicator!(ShardStore); diff --git a/nativelink-store/src/size_partitioning_store.rs b/nativelink-store/src/size_partitioning_store.rs index 3d92148c6..952669630 100644 --- a/nativelink-store/src/size_partitioning_store.rs +++ b/nativelink-store/src/size_partitioning_store.rs @@ -19,6 +19,8 @@ use async_trait::async_trait; use nativelink_error::{Error, ResultExt}; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; +use nativelink_util::default_health_status_indicator; +use nativelink_util::health_utils::HealthStatusIndicator; use nativelink_util::store_trait::{Store, UploadSizeInfo}; use tokio::join; @@ -128,3 +130,5 @@ impl Store for SizePartitioningStore { Box::new(self) } } + +default_health_status_indicator!(SizePartitioningStore); diff --git a/nativelink-store/src/verify_store.rs b/nativelink-store/src/verify_store.rs index 2cabf95a7..c92822e00 100644 --- a/nativelink-store/src/verify_store.rs +++ b/nativelink-store/src/verify_store.rs @@ -21,7 +21,9 @@ use nativelink_config::stores::ConfigDigestHashFunction; use nativelink_error::{make_input_err, Error, ResultExt}; use nativelink_util::buf_channel::{make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; +use nativelink_util::default_health_status_indicator; use nativelink_util::digest_hasher::{DigestHasher, DigestHasherFunc}; +use nativelink_util::health_utils::HealthStatusIndicator; use nativelink_util::metrics_utils::{Collector, CollectorState, CounterWithTime, MetricsComponent, Registry}; use nativelink_util::store_trait::{Store, UploadSizeInfo}; @@ -199,3 +201,5 @@ impl MetricsComponent for VerifyStore { ); } } + +default_health_status_indicator!(VerifyStore); diff --git a/nativelink-store/tests/fast_slow_store_test.rs b/nativelink-store/tests/fast_slow_store_test.rs index d21dced01..eb1545f30 100644 --- a/nativelink-store/tests/fast_slow_store_test.rs +++ b/nativelink-store/tests/fast_slow_store_test.rs @@ -76,6 +76,8 @@ mod fast_slow_store_tests { use bytes::Bytes; use nativelink_error::{make_err, Code, ResultExt}; use nativelink_util::buf_channel::make_buf_channel_pair; + use nativelink_util::default_health_status_indicator; + use nativelink_util::health_utils::HealthStatusIndicator; use pretty_assertions::assert_eq; use super::*; // Must be declared in every module. @@ -301,6 +303,8 @@ mod fast_slow_store_tests { } } + default_health_status_indicator!(DropCheckStore); + let digest = DigestInfo::try_new(VALID_HASH, 100).unwrap(); let (fast_store_read_tx, fast_store_read_rx) = tokio::sync::oneshot::channel(); let (fast_store_eof_tx, fast_store_eof_rx) = tokio::sync::oneshot::channel(); diff --git a/nativelink-util/BUILD.bazel b/nativelink-util/BUILD.bazel index 77d5302d0..81d7f0187 100644 --- a/nativelink-util/BUILD.bazel +++ b/nativelink-util/BUILD.bazel @@ -17,6 +17,7 @@ rust_library( "src/fastcdc.rs", "src/fs.rs", "src/grpc_utils.rs", + "src/health_utils.rs", "src/lib.rs", "src/metrics_utils.rs", "src/platform_properties.rs", @@ -64,6 +65,7 @@ rust_test_suite( "tests/evicting_map_test.rs", "tests/fastcdc_test.rs", "tests/fs_test.rs", + "tests/health_utils_test.rs", "tests/resource_info_test.rs", "tests/retry_test.rs", ], diff --git a/nativelink-util/src/health_utils.rs b/nativelink-util/src/health_utils.rs new file mode 100644 index 000000000..c2fe9372a --- /dev/null +++ b/nativelink-util/src/health_utils.rs @@ -0,0 +1,160 @@ +// Copyright 2024 The Native Link Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::borrow::Cow; +use std::collections::HashMap; +use std::fmt::Debug; +use std::marker::Send; +use std::pin::Pin; +use std::sync::Arc; + +use async_trait::async_trait; +use futures::{Stream, StreamExt}; +use parking_lot::Mutex; +use serde::Serialize; + +/// Struct name health indicator component. +type StructName = str; +/// Readable message status of the health indicator. +type Message = str; + +/// Status state of a health indicator. +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)] +pub enum HealthStatus { + Ok(&'static StructName, Cow<'static, Message>), + Initializing(&'static StructName, Cow<'static, Message>), + Warning(&'static StructName, Cow<'static, Message>), + Failed(&'static StructName, Cow<'static, Message>), +} + +impl HealthStatus { + /// Make a healthy status. + pub fn new_ok(component: &(impl HealthStatusIndicator + ?Sized), message: Cow<'static, str>) -> Self { + Self::Ok(component.struct_name(), message) + } + + /// Make an initializing status. + pub fn new_initializing( + component: &(impl HealthStatusIndicator + ?Sized), + message: Cow<'static, str>, + ) -> HealthStatus { + Self::Initializing(component.struct_name(), message) + } + + /// Make a warning status. + pub fn new_warning(component: &(impl HealthStatusIndicator + ?Sized), message: Cow<'static, str>) -> HealthStatus { + Self::Warning(component.struct_name(), message) + } + + /// Make a failed status. + pub fn new_failed(component: &(impl HealthStatusIndicator + ?Sized), message: Cow<'static, str>) -> HealthStatus { + Self::Failed(component.struct_name(), message) + } +} + +/// Description of the health status of a component. +#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize)] +pub struct HealthStatusDescription { + pub namespace: Cow<'static, str>, + pub status: HealthStatus, +} + +/// Health status indicator trait. This trait is used to define +/// a health status indicator by implementing the `check_health` function. +/// A default implementation is provided for the `check_health` function +/// that returns healthy component. +#[async_trait] +pub trait HealthStatusIndicator: Sync + Send + Unpin { + fn get_name(&self) -> &'static str; + + /// Returns the name of the struct implementing the trait. + fn struct_name(&self) -> &'static str { + std::any::type_name::() + } + + /// Check the health status of the component. This function should be + /// implemented by the component to check the health status of the component. + async fn check_health(&self, _namespace: Cow<'static, str>) -> HealthStatus { + HealthStatus::new_ok(self, "ok".into()) + } +} + +type HealthRegistryBuilderState = Arc, Arc>>>; +pub struct HealthRegistryBuilder { + namespace: Cow<'static, str>, + state: HealthRegistryBuilderState, +} + +impl HealthRegistryBuilder { + pub fn new(namespace: Cow<'static, str>) -> Self { + Self { + namespace: format!("/{}", namespace).into(), + state: Arc::new(Mutex::new(HashMap::new())), + } + } + + pub fn register_indicator(&mut self, indicator: Arc) { + let name = format!("{}/{}", self.namespace, indicator.get_name()); + self.state.lock().insert(name.into(), indicator); + } + + pub fn sub_builder(&mut self, namespace: Cow<'static, str>) -> HealthRegistryBuilder { + HealthRegistryBuilder { + namespace: format!("{}/{}", self.namespace, namespace).into(), + state: self.state.clone(), + } + } + + pub fn build(&mut self) -> HealthRegistry { + HealthRegistry { + indicators: self.state.lock().clone().into_iter().collect(), + } + } +} + +/// Health registry to register health status indicators and sub-registries. +/// Internal representation of the health registry is a tree structure. +#[derive(Default, Clone)] +pub struct HealthRegistry { + indicators: Vec<(Cow<'static, str>, Arc)>, +} + +pub trait HealthStatusReporter { + fn health_status_report(&self) -> Pin + '_>>; +} + +impl HealthStatusReporter for HealthRegistry { + fn health_status_report(&self) -> Pin + '_>> { + Box::pin( + futures::stream::iter(self.indicators.iter()).then(|(namespace, indicator)| async move { + HealthStatusDescription { + namespace: namespace.clone(), + status: indicator.check_health(namespace.clone()).await, + } + }), + ) + } +} + +#[macro_export] +macro_rules! default_health_status_indicator { + ($type:ty) => { + #[async_trait::async_trait] + impl HealthStatusIndicator for $type { + fn get_name(&self) -> &'static str { + stringify!($type) + } + } + }; +} diff --git a/nativelink-util/src/lib.rs b/nativelink-util/src/lib.rs index 62c615c0e..c493b9f70 100644 --- a/nativelink-util/src/lib.rs +++ b/nativelink-util/src/lib.rs @@ -20,6 +20,7 @@ pub mod evicting_map; pub mod fastcdc; pub mod fs; pub mod grpc_utils; +pub mod health_utils; pub mod metrics_utils; pub mod platform_properties; pub mod resource_info; diff --git a/nativelink-util/src/store_trait.rs b/nativelink-util/src/store_trait.rs index 2c6a4b721..6bee4621a 100644 --- a/nativelink-util/src/store_trait.rs +++ b/nativelink-util/src/store_trait.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::borrow::Cow; use std::marker::Send; use std::pin::Pin; use std::sync::Arc; @@ -24,6 +25,7 @@ use serde::{Deserialize, Serialize}; use crate::buf_channel::{make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf}; use crate::common::DigestInfo; +use crate::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator}; use crate::metrics_utils::Registry; #[derive(Debug, PartialEq, Copy, Clone, Serialize, Deserialize)] @@ -40,7 +42,7 @@ pub enum UploadSizeInfo { } #[async_trait] -pub trait Store: Sync + Send + Unpin { +pub trait Store: Sync + Send + Unpin + HealthStatusIndicator { /// Look up a digest in the store and return None if it does not exist in /// the store, or Some(size) if it does. /// Note: On an AC store the size will be incorrect and should not be used! @@ -168,6 +170,59 @@ pub trait Store: Sync + Send + Unpin { .merge(data_res.err_tip(|| "Failed to read stream to completion in get_part_unchunked")) } + // Default implementation of the health check. Some stores may want to override this + // in situations where the default implementation is not sufficient. + async fn check_health(self: Pin<&Self>, _namespace: Cow<'static, str>) -> HealthStatus { + let bytes = _namespace.as_bytes(); + let hash = blake3::hash(bytes).into(); + let len = bytes.len(); + let digest_info = DigestInfo::new(hash, len as i64); + let bytes_copy = bytes::Bytes::copy_from_slice(bytes); + + match self.update_oneshot(digest_info, bytes_copy.clone()).await { + Ok(_) => {} + Err(e) => { + return HealthStatus::new_failed( + self.get_ref(), + format!("Store.update_oneshot() failed: {}", e).into(), + ); + } + } + + match self.get_part_unchunked(digest_info, 0, Some(len), Some(len)).await { + Ok(b) => { + if b != bytes_copy { + return HealthStatus::new_failed(self.get_ref(), "Store.get_part_unchunked() data mismatch".into()); + } + } + Err(e) => { + return HealthStatus::new_failed( + self.get_ref(), + format!("Store.get_part_unchunked() failed: {}", e).into(), + ); + } + } + + match self.has(digest_info).await { + Ok(Some(s)) => { + if s != len { + return HealthStatus::new_failed( + self.get_ref(), + format!("Store.has() size mismatch {s} != {len}").into(), + ); + } + } + Ok(None) => { + return HealthStatus::new_failed(self.get_ref(), "Store.has() size not found".into()); + } + Err(e) => { + return HealthStatus::new_failed(self.get_ref(), format!("Store.has() failed: {}", e).into()); + } + } + + HealthStatus::new_ok(self.get_ref(), "Successfully store health check".into()) + } + /// Gets the underlying store for the given digest. This can be used to find out /// what any underlying store is for a given digest will be and hand it to the caller. /// A caller might want to use this to obtain a reference to the "real" underlying store @@ -180,4 +235,7 @@ pub trait Store: Sync + Send + Unpin { /// Register any metrics that this store wants to expose to the Prometheus. fn register_metrics(self: Arc, _registry: &mut Registry) {} + + // Register health checks used to monitor the store. + fn register_health(self: Arc, _registry: &mut HealthRegistryBuilder) {} } diff --git a/nativelink-util/tests/health_utils_test.rs b/nativelink-util/tests/health_utils_test.rs new file mode 100644 index 000000000..ce20c7eb1 --- /dev/null +++ b/nativelink-util/tests/health_utils_test.rs @@ -0,0 +1,192 @@ +// Copyright 2024 The Native Link Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::borrow::Cow; +use std::collections::HashSet; +use std::iter::FromIterator; +use std::sync::Arc; + +use futures::StreamExt; +use nativelink_error::Error; +use nativelink_util::health_utils::*; + +#[cfg(test)] +mod health_utils_tests { + + use pretty_assertions::assert_eq; + + use super::*; + + #[tokio::test] + async fn create_empty_indicator() -> Result<(), Error> { + let mut health_registry_builder = HealthRegistryBuilder::new("nativelink".into()); + let health_registry = health_registry_builder.build(); + let health_status: Vec = health_registry.health_status_report().collect().await; + assert_eq!(health_status.len(), 0); + Ok(()) + } + + #[tokio::test] + async fn create_register_indicator() -> Result<(), Error> { + generate_health_status_indicator!(MockComponentImpl, HealthStatus::Ok, "ok"); + + let mut health_registry_builder = HealthRegistryBuilder::new("nativelink".into()); + + health_registry_builder.register_indicator(Arc::new(MockComponentImpl {})); + + let health_registry = health_registry_builder.build(); + let health_status: Vec = health_registry.health_status_report().collect().await; + + assert_eq!(health_status.len(), 1); + assert_eq!( + health_status, + vec![HealthStatusDescription { + namespace: "/nativelink/MockComponentImpl".into(), + status: HealthStatus::Ok("MockComponentImpl", "ok".into()), + }] + ); + + Ok(()) + } + + #[tokio::test] + async fn create_sub_registry() -> Result<(), Error> { + generate_health_status_indicator!(MockComponentImpl, HealthStatus::Ok, "ok"); + + let mut health_registry_builder = HealthRegistryBuilder::new("nativelink".into()); + + health_registry_builder.register_indicator(Arc::new(MockComponentImpl {})); + + let mut namespace1_registry = health_registry_builder.sub_builder("namespace1".into()); + + namespace1_registry.register_indicator(Arc::new(MockComponentImpl {})); + + let health_registry = health_registry_builder.build(); + let health_status: Vec = health_registry.health_status_report().collect().await; + + assert_eq!(health_status.len(), 2); + let expected_health_status = vec_to_set(vec![ + HealthStatusDescription { + namespace: "/nativelink/MockComponentImpl".into(), + status: HealthStatus::Ok("MockComponentImpl", "ok".into()), + }, + HealthStatusDescription { + namespace: "/nativelink/namespace1/MockComponentImpl".into(), + status: HealthStatus::Ok("MockComponentImpl", "ok".into()), + }, + ]); + + assert_eq!(vec_to_set(health_status), expected_health_status); + + Ok(()) + } + + #[tokio::test] + async fn create_multiple_indicators_same_registry() -> Result<(), Error> { + generate_health_status_indicator!(MockComponentImpl1, HealthStatus::Ok, "ok"); + generate_health_status_indicator!(MockComponentImpl2, HealthStatus::Ok, "ok"); + generate_health_status_indicator!(MockComponentImpl3, HealthStatus::Ok, "ok"); + + let mut health_registry_builder = HealthRegistryBuilder::new("nativelink".into()); + + health_registry_builder.register_indicator(Arc::new(MockComponentImpl1 {})); + health_registry_builder.register_indicator(Arc::new(MockComponentImpl2 {})); + health_registry_builder.register_indicator(Arc::new(MockComponentImpl3 {})); + + let health_registry = health_registry_builder.build(); + let health_status: Vec = health_registry.health_status_report().collect().await; + + assert_eq!(health_status.len(), 3); + let expected_health_status = vec_to_set(vec![ + HealthStatusDescription { + namespace: "/nativelink/MockComponentImpl1".into(), + status: HealthStatus::Ok("MockComponentImpl1", "ok".into()), + }, + HealthStatusDescription { + namespace: "/nativelink/MockComponentImpl2".into(), + status: HealthStatus::Ok("MockComponentImpl2", "ok".into()), + }, + HealthStatusDescription { + namespace: "/nativelink/MockComponentImpl3".into(), + status: HealthStatus::Ok("MockComponentImpl3", "ok".into()), + }, + ]); + + assert_eq!(vec_to_set(health_status), expected_health_status); + + Ok(()) + } + + #[tokio::test] + async fn create_multiple_indicators_with_sub_registry() -> Result<(), Error> { + generate_health_status_indicator!(MockComponentImpl1, HealthStatus::Ok, "ok"); + generate_health_status_indicator!(MockComponentImpl2, HealthStatus::Ok, "ok"); + generate_health_status_indicator!(MockComponentImpl3, HealthStatus::Ok, "ok"); + + let mut health_registry_builder = HealthRegistryBuilder::new("nativelink".into()); + + let mut sub_builder = health_registry_builder.sub_builder("namespace1".into()); + sub_builder.register_indicator(Arc::new(MockComponentImpl1 {})); + let mut sub_builder = health_registry_builder.sub_builder("namespace2".into()); + sub_builder.register_indicator(Arc::new(MockComponentImpl2 {})); + + health_registry_builder + .sub_builder("namespace3".into()) + .register_indicator(Arc::new(MockComponentImpl3 {})); + + let health_registry = health_registry_builder.build(); + let health_status: Vec = health_registry.health_status_report().collect().await; + + assert_eq!(health_status.len(), 3); + let expected_health_status = vec_to_set(vec![ + HealthStatusDescription { + namespace: "/nativelink/namespace1/MockComponentImpl1".into(), + status: HealthStatus::Ok("MockComponentImpl1", "ok".into()), + }, + HealthStatusDescription { + namespace: "/nativelink/namespace2/MockComponentImpl2".into(), + status: HealthStatus::Ok("MockComponentImpl2", "ok".into()), + }, + HealthStatusDescription { + namespace: "/nativelink/namespace3/MockComponentImpl3".into(), + status: HealthStatus::Ok("MockComponentImpl3", "ok".into()), + }, + ]); + + assert_eq!(vec_to_set(health_status), expected_health_status); + + Ok(()) + } + + #[macro_export] + macro_rules! generate_health_status_indicator { + ($struct_name:ident, $health_status:expr, $status_msg:expr) => { + struct $struct_name; + + #[async_trait::async_trait] + impl HealthStatusIndicator for $struct_name { + fn get_name(&self) -> &'static str { + stringify!($struct_name).into() + } + async fn check_health(&self, _namespace: Cow<'static, str>) -> HealthStatus { + $health_status(stringify!($struct_name).into(), $status_msg.into()) + } + } + }; + } + + fn vec_to_set(vec: Vec) -> HashSet { + HashSet::from_iter(vec) + } +} diff --git a/src/bin/nativelink.rs b/src/bin/nativelink.rs index b8a222a7a..878cdb8f7 100644 --- a/src/bin/nativelink.rs +++ b/src/bin/nativelink.rs @@ -21,7 +21,7 @@ use async_lock::Mutex as AsyncMutex; use axum::Router; use clap::Parser; use futures::future::{select_all, BoxFuture, OptionFuture, TryFutureExt}; -use futures::FutureExt; +use futures::{FutureExt, StreamExt}; use hyper::server::conn::Http; use hyper::{Response, StatusCode}; use nativelink_config::cas_server::{ @@ -41,6 +41,7 @@ use nativelink_store::default_store_factory::store_factory; use nativelink_store::store_manager::StoreManager; use nativelink_util::common::fs::{set_idle_file_descriptor_timeout, set_open_file_limit}; use nativelink_util::digest_hasher::{set_default_digest_hasher_func, DigestHasherFunc}; +use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatusDescription, HealthStatusReporter}; use nativelink_util::metrics_utils::{ set_metrics_enabled_for_this_thread, Collector, CollectorState, Counter, MetricsComponent, Registry, }; @@ -71,6 +72,9 @@ const DEFAULT_ADMIN_API_PATH: &str = "/admin"; /// Name of environment variable to disable metrics. const METRICS_DISABLE_ENV: &str = "NATIVELINK_DISABLE_METRICS"; +/// Content type header value for JSON. +const JSON_CONTENT_TYPE: &str = "application/json; charset=utf-8"; + /// Backend for bazel remote execution / cache API. #[derive(Parser, Debug)] #[clap( @@ -87,17 +91,27 @@ struct Args { async fn inner_main(cfg: CasConfig, server_start_timestamp: u64) -> Result<(), Box> { let mut root_metrics_registry = ::with_prefix("nativelink"); + let health_registry_builder = Arc::new(AsyncMutex::new(HealthRegistryBuilder::new("nativelink".into()))); let store_manager = Arc::new(StoreManager::new()); { + let mut health_registry_lock = health_registry_builder.lock().await; let root_store_metrics = root_metrics_registry.sub_registry_with_prefix("stores"); + for (name, store_cfg) in cfg.stores { + let health_component_name = format!("stores/{name}"); + let mut health_register_store = health_registry_lock.sub_builder(health_component_name.into()); let store_metrics = root_store_metrics.sub_registry_with_prefix(&name); store_manager.add_store( &name, - store_factory(&store_cfg, &store_manager, Some(store_metrics)) - .await - .err_tip(|| format!("Failed to create store '{name}'"))?, + store_factory( + &store_cfg, + &store_manager, + Some(store_metrics), + Some(&mut health_register_store), + ) + .await + .err_tip(|| format!("Failed to create store '{name}'"))?, ); } } @@ -349,12 +363,49 @@ async fn inner_main(cfg: CasConfig, server_start_timestamp: u64) -> Result<(), B ); let root_metrics_registry = root_metrics_registry.clone(); + let health_registry_status = health_registry_builder.lock().await.build(); let mut svc = Router::new() // This is the default service that executes if no other endpoint matches. .fallback_service(tonic_services.into_service().map_err(|e| panic!("{e}"))) - // This is a generic endpoint used to check if the server is up. - .route_service("/status", axum::routing::get(move || async move { "Ok".to_string() })); + .route_service( + "/status", + axum::routing::get(move || async move { + fn error_to_response(e: E) -> Response { + let mut response = Response::new(format!("Error: {e:?}")); + *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; + response + } + + spawn_blocking(move || { + futures::executor::block_on(async { + let health_status_descriptions: Vec = + health_registry_status.health_status_report().collect().await; + + match serde_json5::to_string(&health_status_descriptions) { + Ok(health_status_descriptions) => Response::builder() + .status(StatusCode::OK) + .header( + hyper::header::CONTENT_TYPE, + hyper::header::HeaderValue::from_static(JSON_CONTENT_TYPE), + ) + .body(health_status_descriptions) + .unwrap(), + Err(e) => Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .header( + hyper::header::CONTENT_TYPE, + hyper::header::HeaderValue::from_static(JSON_CONTENT_TYPE), + ) + .body(format!("Internal Failure: {e:?}")) + .unwrap(), + } + }) + }) + .await + .unwrap_or_else(error_to_response) + }), + ); if let Some(prometheus_cfg) = services.experimental_prometheus { fn error_to_response(e: E) -> Response {