
feat: Enhance CircuitBreakerMiddleware and RateLimitMiddleware with scoping and metrics #85

@guyernest

Description

Problem

The current CircuitBreakerMiddleware and RateLimitMiddleware implementations are global and lack production-grade features:

CircuitBreakerMiddleware limitations:

  • Global state (no per-endpoint/method/session scoping)
  • No persistence (resets on restart)
  • No aggregated metrics/observability
  • Fixed thresholds (no dynamic adjustment)

RateLimitMiddleware limitations:

  • Global limits (no per-endpoint/method/session scoping)
  • No persistence across restarts
  • No distributed coordination
  • No backpressure signals

Proposed Solutions

1. Enhanced CircuitBreakerMiddleware

pub struct CircuitBreakerMiddleware {
    config: CircuitBreakerConfig,
    state_store: Arc<dyn CircuitBreakerStateStore>,
    metrics: Arc<CircuitBreakerMetrics>,
}

pub struct CircuitBreakerConfig {
    /// Scoping strategy
    pub scope: CircuitBreakerScope,
    
    /// Failure threshold before opening
    pub failure_threshold: u32,
    
    /// Success threshold to close from half-open
    pub success_threshold: u32,
    
    /// Time to wait before attempting half-open
    pub timeout: Duration,
    
    /// Errors that trigger the circuit breaker
    pub trigger_errors: ErrorMatcher,
    
    /// State persistence
    pub persistence: StatePersistence,
    
    /// Metrics aggregation
    pub metrics_config: MetricsConfig,
}

pub enum CircuitBreakerScope {
    /// Global circuit breaker (all requests)
    Global,
    
    /// Per server URL
    PerServer,
    
    /// Per method (e.g., tools/call, resources/read)
    PerMethod,
    
    /// Per endpoint (method + URL)
    PerEndpoint,
    
    /// Per session ID
    PerSession,
    
    /// Custom scoping key
    Custom(Arc<dyn Fn(&JSONRPCRequest) -> String + Send + Sync>),
}

pub enum StatePersistence {
    /// In-memory only (lost on restart)
    Memory,
    
    /// Persist to file
    File(PathBuf),
    
    /// Persist to Redis (for distributed systems)
    #[cfg(feature = "redis")]
    Redis {
        client: redis::Client,
        key_prefix: String,
    },
    
    /// Custom persistence
    Custom(Arc<dyn CircuitBreakerStateStore>),
}

#[async_trait]
pub trait CircuitBreakerStateStore: Send + Sync {
    async fn get_state(&self, key: &str) -> Result<CircuitBreakerState>;
    async fn set_state(&self, key: &str, state: CircuitBreakerState) -> Result<()>;
    async fn get_metrics(&self) -> Result<HashMap<String, CircuitBreakerMetrics>>;
}
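A minimal in-memory implementation of this trait could look like the sketch below. It is simplified and synchronous for brevity (the real trait is async and returns `Result`); the type names beyond those in the proposal are illustrative.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

// Simplified, synchronous sketch of the async CircuitBreakerStateStore above.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum CircuitBreakerState {
    Closed,
    Open,
    HalfOpen,
}

pub struct MemoryStateStore {
    states: Mutex<HashMap<String, CircuitBreakerState>>,
}

impl MemoryStateStore {
    pub fn new() -> Self {
        Self { states: Mutex::new(HashMap::new()) }
    }

    // Unknown keys default to Closed: a breaker starts closed.
    pub fn get_state(&self, key: &str) -> CircuitBreakerState {
        *self
            .states
            .lock()
            .unwrap()
            .get(key)
            .unwrap_or(&CircuitBreakerState::Closed)
    }

    pub fn set_state(&self, key: &str, state: CircuitBreakerState) {
        self.states.lock().unwrap().insert(key.to_string(), state);
    }
}
```

The `Memory` variant of StatePersistence would be backed by something like this, with the file and Redis variants swapping in different storage behind the same trait.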

pub struct CircuitBreakerMetrics {
    pub total_requests: AtomicU64,
    pub successful_requests: AtomicU64,
    pub failed_requests: AtomicU64,
    pub rejected_requests: AtomicU64,
    pub state_transitions: AtomicU64,
    pub current_state: Arc<RwLock<HashMap<String, CircuitBreakerState>>>,
}

Usage examples:

// Per-server circuit breaker with persistence
let cb = CircuitBreakerMiddleware::builder()
    .scope(CircuitBreakerScope::PerServer)
    .failure_threshold(5)
    .success_threshold(3)
    .timeout(Duration::from_secs(30))
    .persistence(StatePersistence::File(PathBuf::from(".circuit_breaker_state")))
    .build();

// Per-method circuit breaker
let cb = CircuitBreakerMiddleware::builder()
    .scope(CircuitBreakerScope::PerMethod)
    .failure_threshold(10)
    .trigger_errors(ErrorMatcher::StatusCodes(vec![
        StatusCode::INTERNAL_SERVER_ERROR,
        StatusCode::BAD_GATEWAY,
        StatusCode::SERVICE_UNAVAILABLE,
    ]))
    .build();

// Custom scoping (e.g., per user)
let cb = CircuitBreakerMiddleware::builder()
    .scope(CircuitBreakerScope::Custom(Arc::new(|req| {
        req.params
            .as_ref()
            .and_then(|p| p.get("user_id"))
            .and_then(|v| v.as_str())
            .unwrap_or("default")
            .to_string()
    })))
    .build();

// Distributed circuit breaker (Redis-backed)
#[cfg(feature = "redis")]
let cb = CircuitBreakerMiddleware::builder()
    .scope(CircuitBreakerScope::PerEndpoint)
    .persistence(StatePersistence::Redis {
        client: redis::Client::open("redis://localhost")?,
        key_prefix: "mcp:circuit_breaker:".to_string(),
    })
    .build();

// Access aggregated metrics
let metrics = cb.metrics();
println!("Total requests: {}", metrics.total_requests.load(Ordering::Relaxed));
println!("Rejected (circuit open): {}", metrics.rejected_requests.load(Ordering::Relaxed));
for (key, state) in metrics.current_state.read().await.iter() {
    println!("{}: {:?}", key, state);
}
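The state transitions that failure_threshold, success_threshold, and timeout drive can be sketched as a small per-scope state machine (names are illustrative, not the middleware's actual internals):

```rust
use std::time::{Duration, Instant};

#[derive(Clone, Copy, Debug, PartialEq)]
enum State { Closed, Open, HalfOpen }

// Illustrative per-scope breaker state machine.
struct Breaker {
    state: State,
    failures: u32,
    successes: u32,
    failure_threshold: u32,
    success_threshold: u32,
    timeout: Duration,
    opened_at: Option<Instant>,
}

impl Breaker {
    fn new(failure_threshold: u32, success_threshold: u32, timeout: Duration) -> Self {
        Self { state: State::Closed, failures: 0, successes: 0,
               failure_threshold, success_threshold, timeout, opened_at: None }
    }

    // Whether a request may proceed; Open flips to HalfOpen once `timeout` elapses.
    fn allow(&mut self) -> bool {
        if self.state == State::Open
            && self.opened_at.map_or(false, |t| t.elapsed() >= self.timeout)
        {
            self.state = State::HalfOpen;
            self.successes = 0;
        }
        self.state != State::Open
    }

    fn on_success(&mut self) {
        match self.state {
            State::HalfOpen => {
                self.successes += 1;
                if self.successes >= self.success_threshold {
                    self.state = State::Closed;
                    self.failures = 0;
                }
            }
            _ => self.failures = 0,
        }
    }

    fn on_failure(&mut self) {
        match self.state {
            // A single failure while probing re-opens the circuit.
            State::HalfOpen => self.trip(),
            State::Closed => {
                self.failures += 1;
                if self.failures >= self.failure_threshold {
                    self.trip();
                }
            }
            State::Open => {}
        }
    }

    fn trip(&mut self) {
        self.state = State::Open;
        self.opened_at = Some(Instant::now());
    }
}
```

With scoping, the middleware would hold one such machine per scope key, which is exactly what makes the per-server and per-method variants above isolate failures instead of tripping globally.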

2. Enhanced RateLimitMiddleware

pub struct RateLimitMiddleware {
    config: RateLimitConfig,
    limiter: Arc<dyn RateLimiter>,
    metrics: Arc<RateLimitMetrics>,
}

pub struct RateLimitConfig {
    /// Scoping strategy
    pub scope: RateLimitScope,
    
    /// Rate limit strategy
    pub strategy: RateLimitStrategy,
    
    /// State persistence
    pub persistence: StatePersistence,
    
    /// Backpressure behavior
    pub backpressure: BackpressureConfig,
    
    /// Metrics aggregation
    pub metrics_config: MetricsConfig,
}

pub enum RateLimitScope {
    /// Global rate limit
    Global,
    
    /// Per server URL
    PerServer,
    
    /// Per method
    PerMethod,
    
    /// Per endpoint (method + URL)
    PerEndpoint,
    
    /// Per session ID
    PerSession,
    
    /// Custom scoping key
    Custom(Arc<dyn Fn(&JSONRPCRequest) -> String + Send + Sync>),
}

pub enum RateLimitStrategy {
    /// Fixed window (requests per time window)
    FixedWindow {
        max_requests: u32,
        window: Duration,
    },
    
    /// Sliding window (more accurate, higher overhead)
    SlidingWindow {
        max_requests: u32,
        window: Duration,
    },
    
    /// Token bucket (allows bursts)
    TokenBucket {
        capacity: u32,
        refill_rate: u32,  // tokens per second
    },
    
    /// Leaky bucket (smooth rate)
    LeakyBucket {
        capacity: u32,
        leak_rate: u32,  // requests per second
    },
}
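The token bucket variant, for instance, reduces to a few lines of refill arithmetic. The sketch below is illustrative, not the middleware's actual limiter:

```rust
use std::time::Instant;

// Illustrative token bucket: `capacity` bounds bursts, `refill_rate` the sustained rate.
struct TokenBucket {
    capacity: f64,
    refill_rate: f64, // tokens per second
    tokens: f64,
    last_refill: Instant,
}

impl TokenBucket {
    fn new(capacity: u32, refill_rate: u32) -> Self {
        Self {
            capacity: capacity as f64,
            refill_rate: refill_rate as f64,
            tokens: capacity as f64, // start full so bursts are allowed immediately
            last_refill: Instant::now(),
        }
    }

    // Refill proportionally to elapsed time, then spend one token if available.
    fn try_acquire(&mut self) -> bool {
        let now = Instant::now();
        let elapsed = now.duration_since(self.last_refill).as_secs_f64();
        self.last_refill = now;
        self.tokens = (self.tokens + elapsed * self.refill_rate).min(self.capacity);
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            true
        } else {
            false
        }
    }
}
```

LeakyBucket is the mirror image (requests drain at a fixed rate, smoothing bursts instead of allowing them), while the window strategies count requests per time interval rather than tracking tokens.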

pub struct BackpressureConfig {
    /// Wait for capacity instead of rejecting
    pub wait_for_capacity: bool,
    
    /// Maximum wait time
    pub max_wait: Duration,
    
    /// Send backpressure signals to client
    pub signal_client: bool,
    
    /// Backpressure header name
    pub backpressure_header: String,  // e.g., "X-RateLimit-Reset"
}
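With wait_for_capacity set, the middleware would retry the limiter until capacity frees up or max_wait elapses, rather than rejecting outright. A hedged sketch of that decision (function and parameter names are illustrative, and a real implementation would use an async sleep rather than blocking):

```rust
use std::time::{Duration, Instant};

// Illustrative wait-vs-reject decision; `try_acquire` stands in for any limiter check.
fn acquire_with_backpressure(
    mut try_acquire: impl FnMut() -> bool,
    wait_for_capacity: bool,
    max_wait: Duration,
) -> Result<(), &'static str> {
    if try_acquire() {
        return Ok(());
    }
    if !wait_for_capacity {
        return Err("rate limited"); // reject immediately
    }
    let deadline = Instant::now() + max_wait;
    while Instant::now() < deadline {
        std::thread::sleep(Duration::from_millis(10)); // back off before retrying
        if try_acquire() {
            return Ok(());
        }
    }
    Err("rate limited: max_wait exceeded")
}
```

On the reject path, `signal_client` would additionally attach the configured backpressure header (e.g. "X-RateLimit-Reset") so clients can schedule their retry instead of polling.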

pub struct RateLimitMetrics {
    pub total_requests: AtomicU64,
    pub allowed_requests: AtomicU64,
    pub rejected_requests: AtomicU64,
    pub waited_requests: AtomicU64,
    pub wait_time_ms: AtomicU64,
    pub current_usage: Arc<RwLock<HashMap<String, RateLimitUsage>>>,
}

pub struct RateLimitUsage {
    pub scope_key: String,
    pub used: u32,
    pub limit: u32,
    pub reset_at: SystemTime,
}

Usage examples:

// Per-server token bucket with persistence
let rl = RateLimitMiddleware::builder()
    .scope(RateLimitScope::PerServer)
    .strategy(RateLimitStrategy::TokenBucket {
        capacity: 100,
        refill_rate: 10,  // 10 requests/sec sustained, 100 burst
    })
    .persistence(StatePersistence::File(PathBuf::from(".rate_limits")))
    .backpressure(BackpressureConfig {
        wait_for_capacity: true,
        max_wait: Duration::from_secs(5),
        signal_client: true,
        backpressure_header: "X-RateLimit-Reset".to_string(),
    })
    .build();

// Per-method sliding window
let rl = RateLimitMiddleware::builder()
    .scope(RateLimitScope::PerMethod)
    .strategy(RateLimitStrategy::SlidingWindow {
        max_requests: 1000,
        window: Duration::from_secs(60),
    })
    .build();

// Per-session fixed window
let rl = RateLimitMiddleware::builder()
    .scope(RateLimitScope::PerSession)
    .strategy(RateLimitStrategy::FixedWindow {
        max_requests: 100,
        window: Duration::from_secs(60),
    })
    .build();

// Distributed rate limiting (Redis-backed)
#[cfg(feature = "redis")]
let rl = RateLimitMiddleware::builder()
    .scope(RateLimitScope::PerEndpoint)
    .strategy(RateLimitStrategy::TokenBucket { capacity: 1000, refill_rate: 100 })
    .persistence(StatePersistence::Redis {
        client: redis::Client::open("redis://localhost")?,
        key_prefix: "mcp:rate_limit:".to_string(),
    })
    .build();

// Access aggregated metrics
let metrics = rl.metrics();
println!("Allowed: {}", metrics.allowed_requests.load(Ordering::Relaxed));
println!("Rejected: {}", metrics.rejected_requests.load(Ordering::Relaxed));
for (key, usage) in metrics.current_usage.read().await.iter() {
    println!("{}: {}/{} (resets at {:?})", key, usage.used, usage.limit, usage.reset_at);
}

3. Unified Metrics Integration

Integrate both middleware with MetricsMiddleware:

// MetricsMiddleware automatically aggregates from CircuitBreaker + RateLimit
let metrics_middleware = MetricsMiddleware::builder()
    .export_interval(Duration::from_secs(60))
    .exporter(MetricsExporter::Prometheus {
        endpoint: "/metrics".to_string(),
    })
    .include_circuit_breaker_stats(true)
    .include_rate_limit_stats(true)
    .build();

// Prometheus metrics exposed:
// mcp_circuit_breaker_state{scope="server",key="https://api.example.com"} 0  # 0=closed, 1=open, 2=half_open
// mcp_circuit_breaker_failures_total{scope="server",key="https://api.example.com"} 42
// mcp_circuit_breaker_requests_rejected_total{scope="server",key="https://api.example.com"} 15
// mcp_rate_limit_requests_allowed_total{scope="method",key="tools/call"} 9523
// mcp_rate_limit_requests_rejected_total{scope="method",key="tools/call"} 47
// mcp_rate_limit_current_usage{scope="method",key="tools/call"} 0.85  # 85% of limit

Implementation Plan

Phase 1: CircuitBreakerMiddleware enhancements

  1. Add scoping (Global, PerServer, PerMethod, PerEndpoint, PerSession, Custom)
  2. Implement CircuitBreakerStateStore trait
  3. Add file-based persistence
  4. Add Redis-based persistence (behind feature flag)
  5. Implement CircuitBreakerMetrics with aggregation

Phase 2: RateLimitMiddleware enhancements

  1. Add scoping (same as circuit breaker)
  2. Implement rate limit strategies (FixedWindow, SlidingWindow, TokenBucket, LeakyBucket)
  3. Add RateLimiter trait
  4. Implement backpressure handling (wait vs reject)
  5. Add persistence (file + Redis)
  6. Implement RateLimitMetrics with aggregation

Phase 3: Metrics integration

  1. Integrate with existing MetricsMiddleware
  2. Add Prometheus exporter
  3. Add aggregated stats endpoints
  4. Implement real-time metrics updates

Phase 4: Testing and documentation

  1. Unit tests for scoping strategies
  2. Integration tests for persistence
  3. Load tests for rate limiting strategies
  4. Documentation: examples/34_circuit_breaker_scoping.rs, examples/35_rate_limiting.rs
  5. Update ch11-middleware.md

Benefits

CircuitBreakerMiddleware:

  • Granularity: Per-server/method/endpoint scoping prevents cascading failures
  • Resilience: Persistence survives restarts
  • Observability: Aggregated metrics show health across all scopes
  • Distributed: Redis backend enables cluster-wide circuit breakers

RateLimitMiddleware:

  • Flexibility: Multiple strategies for different use cases
  • Fair: Per-session scoping prevents abuse
  • User-friendly: Backpressure waits instead of rejecting
  • Observable: Real-time usage metrics

Unified:

  • Production-ready: All features needed for real deployments
  • TypeScript parity: Matches the TypeScript SDK and goes beyond it with scoping and persistence
  • Integration: Works seamlessly with existing MetricsMiddleware

Priority: Medium
Complexity: Medium-High (persistence + distributed coordination)
Dependencies: None (enhances existing middleware)
