Skip to content

Commit

Permalink
Support observe backend health status #225
Browse files Browse the repository at this point in the history
---
test: add test for upstream health observe
---
renamed the function and added doc to make it intelligible
---
fix clippy error
---
Merge branch 'main' into main
---
test: fix test for backend do update

Co-authored-by: Tree Xie <tree.xie@outlook.com>
Includes-commit: 1421c26
Includes-commit: 695d549
Includes-commit: 6a09b52
Includes-commit: 72d6ee0
Includes-commit: e6c2af0
Includes-commit: fb62869
Replicated-from: #325
  • Loading branch information
vicanso authored and andrewhavck committed Aug 9, 2024
1 parent 24d7229 commit 8a0c73f
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 2 deletions.
2 changes: 1 addition & 1 deletion .bleep
Original file line number Diff line number Diff line change
@@ -1 +1 @@
80970257fefa5cff505f63552dd2ae74372d501a
36269b8823b23381398508138bbab33c03ba7681
111 changes: 110 additions & 1 deletion pingora-load-balancing/src/health_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,25 @@ use pingora_http::{RequestHeader, ResponseHeader};
use std::sync::Arc;
use std::time::Duration;

#[async_trait]
pub trait HealthObserve {
/// Observes the health of a [Backend], can be used for monitoring purposes.
async fn observe(&self, target: &Backend, healthy: bool);
}
/// Provided to a [HealthCheck] to observe changes to [Backend] health.
pub type HealthObserveCallback = Box<dyn HealthObserve + Send + Sync>;

/// [HealthCheck] is the interface to implement health check for backends
#[async_trait]
pub trait HealthCheck {
/// Check the given backend.
///
/// `Ok(())`` if the check passes, otherwise the check fails.
async fn check(&self, target: &Backend) -> Result<()>;

/// Called when the health changes for a [Backend].
async fn health_status_change(&self, _target: &Backend, _healthy: bool) {}

/// This function defines how many *consecutive* checks should flip the health of a backend.
///
/// For example: with `success``: `true`: this function should return the
Expand All @@ -56,6 +68,8 @@ pub struct TcpHealthCheck {
/// set, it will also try to establish a TLS connection on top of the TCP connection.
pub peer_template: BasicPeer,
connector: TransportConnector,
/// A callback that is invoked when the `healthy` status changes for a [Backend].
pub health_changed_callback: Option<HealthObserveCallback>,
}

impl Default for TcpHealthCheck {
Expand All @@ -67,6 +81,7 @@ impl Default for TcpHealthCheck {
consecutive_failure: 1,
peer_template,
connector: TransportConnector::new(None),
health_changed_callback: None,
}
}
}
Expand Down Expand Up @@ -110,6 +125,12 @@ impl HealthCheck for TcpHealthCheck {
peer._address = target.addr.clone();
self.connector.get_stream(&peer).await.map(|_| {})
}

async fn health_status_change(&self, target: &Backend, healthy: bool) {
if let Some(callback) = &self.health_changed_callback {
callback.observe(target, healthy).await;
}
}
}

type Validator = Box<dyn Fn(&ResponseHeader) -> Result<()> + Send + Sync>;
Expand Down Expand Up @@ -147,6 +168,8 @@ pub struct HttpHealthCheck {
/// Sometimes the health check endpoint lives one a different port than the actual backend.
/// Setting this option allows the health check to perform on the given port of the backend IP.
pub port_override: Option<u16>,
/// A callback that is invoked when the `healthy` status changes for a [Backend].
pub health_changed_callback: Option<HealthObserveCallback>,
}

impl HttpHealthCheck {
Expand Down Expand Up @@ -174,6 +197,7 @@ impl HttpHealthCheck {
req,
validator: None,
port_override: None,
health_changed_callback: None,
}
}

Expand Down Expand Up @@ -235,6 +259,11 @@ impl HealthCheck for HttpHealthCheck {

Ok(())
}
async fn health_status_change(&self, target: &Backend, healthy: bool) {
if let Some(callback) = &self.health_changed_callback {
callback.observe(target, healthy).await;
}
}
}

#[derive(Clone)]
Expand Down Expand Up @@ -313,8 +342,14 @@ impl Health {

#[cfg(test)]
mod test {
use std::{
collections::{BTreeSet, HashMap},
sync::atomic::{AtomicU16, Ordering},
};

use super::*;
use crate::SocketAddr;
use crate::{discovery, Backends, SocketAddr};
use async_trait::async_trait;
use http::Extensions;

#[tokio::test]
Expand Down Expand Up @@ -387,4 +422,78 @@ mod test {

assert!(http_check.check(&backend).await.is_ok());
}

#[tokio::test]
async fn test_health_observe() {
struct Observe {
unhealthy_count: Arc<AtomicU16>,
}
#[async_trait]
impl HealthObserve for Observe {
async fn observe(&self, _target: &Backend, healthy: bool) {
if !healthy {
self.unhealthy_count.fetch_add(1, Ordering::Relaxed);
}
}
}

let good_backend = Backend::new("127.0.0.1:79").unwrap();
let new_good_backends = || -> (BTreeSet<Backend>, HashMap<u64, bool>) {
let mut healthy = HashMap::new();
healthy.insert(good_backend.hash_key(), true);
let mut backends = BTreeSet::new();
backends.extend(vec![good_backend.clone()]);
(backends, healthy)
};
// tcp health check
{
let unhealthy_count = Arc::new(AtomicU16::new(0));
let ob = Observe {
unhealthy_count: unhealthy_count.clone(),
};
let bob = Box::new(ob);
let tcp_check = TcpHealthCheck {
health_changed_callback: Some(bob),
..Default::default()
};

let discovery = discovery::Static::default();
let mut backends = Backends::new(Box::new(discovery));
backends.set_health_check(Box::new(tcp_check));
let result = new_good_backends();
backends.do_update(result.0, result.1, |_backend: Arc<BTreeSet<Backend>>| {});
// the backend is ready
assert!(backends.ready(&good_backend));

// run health check
backends.run_health_check(false).await;
assert!(1 == unhealthy_count.load(Ordering::Relaxed));
// backend is unhealthy
assert!(!backends.ready(&good_backend));
}

// http health check
{
let unhealthy_count = Arc::new(AtomicU16::new(0));
let ob = Observe {
unhealthy_count: unhealthy_count.clone(),
};
let bob = Box::new(ob);

let mut https_check = HttpHealthCheck::new("one.one.one.one", true);
https_check.health_changed_callback = Some(bob);

let discovery = discovery::Static::default();
let mut backends = Backends::new(Box::new(discovery));
backends.set_health_check(Box::new(https_check));
let result = new_good_backends();
backends.do_update(result.0, result.1, |_backend: Arc<BTreeSet<Backend>>| {});
// the backend is ready
assert!(backends.ready(&good_backend));
// run health check
backends.run_health_check(false).await;
assert!(1 == unhealthy_count.load(Ordering::Relaxed));
assert!(!backends.ready(&good_backend));
}
}
}
1 change: 1 addition & 0 deletions pingora-load-balancing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ impl Backends {
let flipped =
h.observe_health(errored.is_none(), check.health_threshold(errored.is_none()));
if flipped {
check.health_status_change(backend, errored.is_none()).await;
if let Some(e) = errored {
warn!("{backend:?} becomes unhealthy, {e}");
} else {
Expand Down

0 comments on commit 8a0c73f

Please sign in to comment.