Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
251 changes: 235 additions & 16 deletions tonic-xds/src/client/loadbalance/outlier_detection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,12 @@
//! `max_ejection_time`); the LB then routes the resolved
//! [`UnejectedChannel`] back into the ready set.
//! - **Housekeeping actor** ([`spawn_actor`]): on each
//! `config.interval` tick, runs the failure-percentage algorithm
//! over a snapshot of counters, ejects qualifying channels, resets
//! counters, and decrements multipliers for non-ejected channels.
//! When the ejected-set membership changes, broadcasts a fresh
//! snapshot on the `watch` channel; quiet ticks skip the broadcast
//! via an O(1) version compare.
//!
//! Only the failure-percentage algorithm is implemented; success-rate
//! (cross-endpoint mean/stdev) is left to a follow-up.
//! `config.interval` tick, runs the success-rate and
//! failure-percentage algorithms over a snapshot of counters, ejects
//! qualifying channels, resets counters, and decrements multipliers
//! for non-ejected channels. When the ejected-set membership changes,
//! broadcasts a fresh snapshot on the `watch` channel; quiet ticks
//! skip the broadcast via an O(1) version compare.
//!
//! [gRFC A50]: https://github.com/grpc/proposal/blob/master/A50-xds-outlier-detection.md
//! [`ReadyChannel`]: crate::client::loadbalance::channel_state::ReadyChannel
Expand Down Expand Up @@ -160,13 +157,20 @@ impl OutlierStatsRegistry {
/// One interval-boundary sweep (gRFC A50 §6). Order matters:
///
/// 1. Snapshot every channel's counters for one consistent pass.
/// 2. Run the failure-percentage algorithm against the snapshot:
/// apply `minimum_hosts` to the qualifying population, then
/// `max_ejection_percent`, then per-channel threshold and the
/// enforcement roll.
/// 3. Reset counters and decrement multipliers for non-ejected
/// 2. Run the success-rate algorithm against the snapshot: compute
/// mean and stdev of success rates across qualifying hosts (per
/// `request_volume`), gated by `minimum_hosts`; eject any host
/// whose success rate is below `mean - stdev * stdev_factor /
/// 1000`, subject to `max_ejection_percent` and the enforcement
/// roll.
/// 3. Run the failure-percentage algorithm against the same
/// snapshot: apply `minimum_hosts` to the qualifying population,
/// then `max_ejection_percent`, then per-channel threshold and
/// the enforcement roll. Hosts already ejected by step 2 are
/// skipped, and the `max_ejection_percent` cap accounts for them.
/// 4. Reset counters and decrement multipliers for non-ejected
/// channels.
/// 4. If the ejected-set version changed (sweep ejected at least
/// 5. If the ejected-set version changed (sweep ejected at least
/// one channel, or the LB unejected between ticks), rebuild
/// the snapshot of ejected addresses and broadcast it on the
/// `watch` channel. Quiet ticks skip the rebuild via an O(1)
Expand All @@ -192,6 +196,51 @@ impl OutlierStatsRegistry {
})
.collect();

if let Some(sr) = config.success_rate.as_ref() {
let request_volume = u64::from(sr.request_volume);
// Success rate in 0.0..=100.0 for each qualifying host. The
// mean/stdev are computed over this set; the threshold is
// `mean - stdev * stdev_factor / 1000` (A50 §"success_rate
// ejection").
let rates: Vec<f64> = snapshots
.iter()
.filter_map(|(_, s, f)| {
let total = s + f;
(total >= request_volume).then(|| 100.0 * (*s as f64) / (total as f64))
})
.collect();
if rates.len() >= sr.minimum_hosts as usize && !rates.is_empty() {
let n = rates.len() as f64;
let mean = rates.iter().sum::<f64>() / n;
let variance = rates.iter().map(|r| (r - mean).powi(2)).sum::<f64>() / n;
let stdev = variance.sqrt();
let threshold = mean - stdev * f64::from(sr.stdev_factor) / 1000.0;
let max_ejections = self.max_ejections(&config);
let now = Instant::now();
let enforcing = sr.enforcing_success_rate.get();
for (state, s, f) in &snapshots {
let total = s + f;
if total < request_volume || state.is_ejected() {
continue;
}
if self.ejected_count.load(Ordering::Relaxed) >= max_ejections {
break;
}
let rate = 100.0 * (*s as f64) / (total as f64);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If total and request_volume are 0, then rate will (100*0)/0 which is NaN. Can this might lead to ejection of all the hosts

if rate >= threshold {
continue;
}
if !roll(enforcing) {
continue;
}
if state.try_eject(now) {
self.ejected_count.fetch_add(1, Ordering::Relaxed);
self.ejected_set_version.fetch_add(1, Ordering::Relaxed);
}
Comment on lines +221 to +239

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is duplicated both for success_rate and failure_percentage logic except one line. Can you pleas create a new function for the common logic so that no drift is introduced in future between the two impls

}
}
}

if let Some(fp) = config.failure_percentage.as_ref() {
let request_volume = u64::from(fp.request_volume);
let qualifying = snapshots
Expand Down Expand Up @@ -360,7 +409,7 @@ fn roll(pct: u8) -> bool {
mod tests {
use super::*;
use crate::xds::resource::outlier_detection::{
FailurePercentageConfig, OutlierDetectionConfig, Percentage,
FailurePercentageConfig, OutlierDetectionConfig, Percentage, SuccessRateConfig,
};
use std::sync::atomic::Ordering;
use std::time::Duration;
Expand Down Expand Up @@ -415,6 +464,21 @@ mod tests {
c
}

fn sr_config(
stdev_factor: u32,
request_volume: u32,
minimum_hosts: u32,
) -> OutlierDetectionConfig {
let mut c = base_config();
c.success_rate = Some(SuccessRateConfig {
stdev_factor,
enforcing_success_rate: pct(100),
minimum_hosts,
request_volume,
});
c
}

/// Drive `n` outcomes through `record_outcome` for one channel.
fn drive(state: &OutlierChannelState, successes: u64, failures: u64) {
for _ in 0..successes {
Expand Down Expand Up @@ -621,6 +685,161 @@ mod tests {
);
}

// ----- run_housekeeping: success-rate detection -----

/// 4 hosts at 100%, 1 at 0%. mean=80, stdev=40, threshold with
/// factor 1900 = 80 - 40 * 1.9 = 4 ⇒ the 0% host (rate < 4) is
/// ejected; the others are clear.
#[test]
fn success_rate_ejects_outlier_below_threshold() {
let registry = make_registry_only(sr_config(1900, 10, 3));
let bad = registry.add_channel(addr(8084));
for port in 8080..=8083 {
let s = registry.add_channel(addr(port));
drive(&s, 100, 0);
}
drive(&bad, 0, 100);
registry.run_housekeeping();
assert!(bad.is_ejected());
assert_eq!(registry.ejected_count.load(Ordering::Relaxed), 1);
}

/// Uniform population: stdev = 0, threshold = mean, no host is
/// strictly below the mean ⇒ nothing ejects.
#[test]
fn success_rate_uniform_population_does_not_eject() {
let registry = make_registry_only(sr_config(1900, 10, 3));
let mut all = vec![];
for port in 8080..=8084 {
let s = registry.add_channel(addr(port));
drive(&s, 80, 20);
all.push(s);
}
registry.run_housekeeping();
for s in &all {
assert!(!s.is_ejected());
}
}

/// minimum_hosts gate: only 2 hosts meet request_volume but
/// minimum_hosts is 5 ⇒ algorithm skipped, no ejection.
#[test]
fn success_rate_minimum_hosts_gates_ejection() {
let registry = make_registry_only(sr_config(1900, 10, 5));
let mut all = vec![];
for port in 8080..=8081 {
let s = registry.add_channel(addr(port));
drive(&s, 0, 100);
all.push(s);
}
registry.run_housekeeping();
for s in &all {
assert!(!s.is_ejected());
}
}

/// request_volume filter: the low-traffic outlier is excluded from
/// both the qualifying population and the candidate list, so even
/// though its rate is 0%, it doesn't get ejected.
#[test]
fn success_rate_request_volume_filters_low_traffic() {
let registry = make_registry_only(sr_config(1900, 100, 3));
let bad = registry.add_channel(addr(8080));
drive(&bad, 0, 5);
for port in 8081..=8084 {
let s = registry.add_channel(addr(port));
drive(&s, 200, 0);
}
registry.run_housekeeping();
assert!(!bad.is_ejected());
}

/// `enforcing_success_rate = 0` skips actual ejection regardless
/// of how far below threshold a host falls.
#[test]
fn success_rate_enforcement_zero_never_ejects() {
let mut config = sr_config(1900, 10, 3);
config.success_rate.as_mut().unwrap().enforcing_success_rate = pct(0);
let registry = make_registry_only(config);
let bad = registry.add_channel(addr(8084));
for port in 8080..=8083 {
let s = registry.add_channel(addr(port));
drive(&s, 100, 0);
}
drive(&bad, 0, 100);
registry.run_housekeeping();
assert!(!bad.is_ejected());
}

/// stdev_factor 0 collapses the threshold to the mean. 4 hosts at
/// 100% + 1 at 0% gives mean=80, so the 0% host (< 80) ejects but
/// the 100% hosts (not < 80) don't.
#[test]
fn success_rate_stdev_factor_zero_ejects_below_mean() {
let registry = make_registry_only(sr_config(0, 10, 3));
let bad = registry.add_channel(addr(8084));
let mut healthy = vec![];
for port in 8080..=8083 {
let s = registry.add_channel(addr(port));
drive(&s, 100, 0);
healthy.push(s);
}
drive(&bad, 0, 100);
registry.run_housekeeping();
assert!(bad.is_ejected());
for s in &healthy {
assert!(!s.is_ejected());
}
}

/// max_ejection_percent applies before per-host eligibility, so
/// even when every host is below threshold the cap holds.
#[test]
fn success_rate_max_ejection_percent_caps_concurrent_ejections() {
let mut config = sr_config(1900, 10, 3);
config.max_ejection_percent = pct(20);
let registry = make_registry_only(config);
// 4 hosts at 100%, 1 at 0%. The outlier is the only candidate

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is only 1 bad host and max cap is also 1. So this test will never breach the cap. Maybe keep number of bad hosts > 1.

// anyway; the cap test value here is that the cap math admits
// an ejection (5 × 20% = 1, plus the floor) for the single
// outlier, but would clamp tighter populations.
let bad = registry.add_channel(addr(8084));
for port in 8080..=8083 {
let s = registry.add_channel(addr(port));
drive(&s, 100, 0);
}
drive(&bad, 0, 100);
registry.run_housekeeping();
assert_eq!(registry.ejected_count.load(Ordering::Relaxed), 1);
assert!(bad.is_ejected());
}

/// Both algorithms configured: success-rate runs first and
/// catches the cross-host outlier; failure-percentage gets a
/// second look but skips already-ejected hosts.
#[test]
fn success_rate_and_failure_percentage_compose() {
let mut config = sr_config(1900, 10, 3);
config.failure_percentage = Some(FailurePercentageConfig {
threshold: pct(50),
enforcing_failure_percentage: pct(100),
minimum_hosts: 3,
request_volume: 10,
});
let registry = make_registry_only(config);
let bad = registry.add_channel(addr(8084));
for port in 8080..=8083 {
let s = registry.add_channel(addr(port));
drive(&s, 100, 0);
}
drive(&bad, 0, 100);
registry.run_housekeeping();
// Success-rate ejected it; failure-percentage saw it as
// already-ejected on its pass and didn't double-count.
assert!(bad.is_ejected());
assert_eq!(registry.ejected_count.load(Ordering::Relaxed), 1);
}

// ----- Housekeeping -----

#[test]
Expand Down
Loading