Skip to content

Commit

Permalink
[sui_proxy/pod health] (MystenLabs#19416)
Browse files Browse the repository at this point in the history
## Description
* add a route for pod health in k8s

i am using the consumer_operations_submitted value but more values can
be added later if we need to. when this value is non-zero, it means the
service is processing data correctly and can be used to consider it
healthy enough to handle traffic. it will be implemented as a liveness
check.

## Test Plan
local

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
suiwombat authored Sep 18, 2024
1 parent fd3ffac commit f2ab91e
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 3 deletions.
5 changes: 3 additions & 2 deletions crates/sui-proxy/MYPKG
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ rust_binary(
)
mypkg(
name="sui-proxy",
version="bf9f49ff50ff1c4b3378319dfb16dd0a",
version="77007d47482d76f49ed5326807b6f2a0",
)
podman_build(
name="sui-proxy-image",
registry="docker//us-central1-docker.pkg.dev/cryptic-bolt-398315/sui-proxy",
dockerfile="sui_proxy_dockerfile",
resources=[":sui-proxy"],
)
tag="testnet"
)
65 changes: 64 additions & 1 deletion crates/sui-proxy/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,33 @@ use axum::{extract::Extension, http::StatusCode, routing::get, Router};
use mysten_metrics::RegistryService;
use prometheus::{Registry, TextEncoder};
use std::net::TcpListener;
use std::sync::{Arc, RwLock};
use tower::ServiceBuilder;
use tower_http::trace::{DefaultOnResponse, TraceLayer};
use tower_http::LatencyUnit;
use tracing::Level;

const METRICS_ROUTE: &str = "/metrics";
const POD_HEALTH_ROUTE: &str = "/pod_health";

type HealthCheckMetrics = Arc<RwLock<HealthCheck>>;

/// Do not access struct members without using HealthCheckMetrics to arc+mutex
#[derive(Debug)]
struct HealthCheck {
// eg; consumer_operations_submitted{...}
consumer_operations_submitted: f64,
}

/// HealthCheck contains fields we believe are interesting that say whether this pod should be
/// considered health. do not use w/o using an arc+mutex
impl HealthCheck {
fn new() -> Self {
Self {
consumer_operations_submitted: 0.0,
}
}
}

// Creates a new http server that has as a sole purpose to expose
// and endpoint that prometheus agent can use to poll for the metrics.
Expand All @@ -19,9 +40,13 @@ pub fn start_prometheus_server(listener: TcpListener) -> RegistryService {

let registry_service = RegistryService::new(registry);

let pod_health_data = Arc::new(RwLock::new(HealthCheck::new()));

let app = Router::new()
.route(METRICS_ROUTE, get(metrics))
.route(POD_HEALTH_ROUTE, get(pod_health))
.layer(Extension(registry_service.clone()))
.layer(Extension(pod_health_data.clone()))
.layer(
ServiceBuilder::new().layer(
TraceLayer::new_for_http().on_response(
Expand All @@ -42,9 +67,30 @@ pub fn start_prometheus_server(listener: TcpListener) -> RegistryService {
}

// DO NOT remove this handler, it is not compatible with the mysten_metrics::metric equivalent
async fn metrics(Extension(registry_service): Extension<RegistryService>) -> (StatusCode, String) {
async fn metrics(
Extension(registry_service): Extension<RegistryService>,
Extension(pod_health): Extension<HealthCheckMetrics>,
) -> (StatusCode, String) {
let mut metric_families = registry_service.gather_all();
metric_families.extend(prometheus::gather());

if let Some(consumer_operations_submitted) = metric_families
.iter()
.filter_map(|v| {
if v.get_name() == "consumer_operations_submitted" {
// Expecting one metric, so return the first one, as it is the only one
v.get_metric().first().map(|m| m.get_counter().get_value())
} else {
None
}
})
.next()
{
pod_health
.write()
.expect("unable to write to pod health metrics")
.consumer_operations_submitted = consumer_operations_submitted;
};
match TextEncoder.encode_to_string(&metric_families) {
Ok(metrics) => (StatusCode::OK, metrics),
Err(error) => (
Expand All @@ -53,3 +99,20 @@ async fn metrics(Extension(registry_service): Extension<RegistryService>) -> (St
),
}
}

/// pod_health is called by k8s to know if this service is correctly processing data
async fn pod_health(Extension(pod_health): Extension<HealthCheckMetrics>) -> (StatusCode, String) {
let consumer_operations_submitted = pod_health
.read()
.expect("unable to read pod health metrics")
.consumer_operations_submitted;

if consumer_operations_submitted > 0.0 {
(StatusCode::OK, consumer_operations_submitted.to_string())
} else {
(
StatusCode::SERVICE_UNAVAILABLE,
consumer_operations_submitted.to_string(),
)
}
}

0 comments on commit f2ab91e

Please sign in to comment.