Problem
Current CircuitBreakerMiddleware and RateLimitMiddleware implementations are global and lack production-grade features:
CircuitBreakerMiddleware limitations:
- Global state (no per-endpoint/method/session scoping)
- No persistence (resets on restart)
- No aggregated metrics/observability
- Fixed thresholds (no dynamic adjustment)
RateLimitMiddleware limitations:
- Global limits (no per-endpoint/method/session scoping)
- No persistence across restarts
- No distributed coordination
- No backpressure signals
Proposed Solutions
1. Enhanced CircuitBreakerMiddleware
pub struct CircuitBreakerMiddleware {
    config: CircuitBreakerConfig,
    state_store: Arc<dyn CircuitBreakerStateStore>,
    metrics: Arc<CircuitBreakerMetrics>,
}

pub struct CircuitBreakerConfig {
    /// Scoping strategy
    pub scope: CircuitBreakerScope,
    /// Failure threshold before opening
    pub failure_threshold: u32,
    /// Success threshold to close from half-open
    pub success_threshold: u32,
    /// Time to wait before attempting half-open
    pub timeout: Duration,
    /// Errors that trigger the circuit breaker
    pub trigger_errors: ErrorMatcher,
    /// State persistence
    pub persistence: StatePersistence,
    /// Metrics aggregation
    pub metrics_config: MetricsConfig,
}
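The thresholds and timeout in `CircuitBreakerConfig` suggest the usual closed/open/half-open state machine. What follows is a minimal single-key sketch of that machine, not the crate's implementation: `Breaker`, `State`, `allow`, `record_success`, and `record_failure` are hypothetical names, and the current time is passed in explicitly so the transitions can be checked deterministically.

```rust
use std::time::{Duration, Instant};

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum State {
    Closed,
    Open,
    HalfOpen,
}

/// Hypothetical single-key breaker driven by the three config values.
pub struct Breaker {
    failure_threshold: u32,
    success_threshold: u32,
    timeout: Duration,
    state: State,
    failures: u32,
    successes: u32,
    opened_at: Option<Instant>,
}

impl Breaker {
    pub fn new(failure_threshold: u32, success_threshold: u32, timeout: Duration) -> Self {
        Self {
            failure_threshold,
            success_threshold,
            timeout,
            state: State::Closed,
            failures: 0,
            successes: 0,
            opened_at: None,
        }
    }

    pub fn state(&self) -> State {
        self.state
    }

    /// Returns true if a request may proceed at time `now`.
    /// An open breaker moves to half-open once `timeout` has elapsed.
    pub fn allow(&mut self, now: Instant) -> bool {
        if self.state == State::Open {
            let waited = self
                .opened_at
                .map_or(false, |t| now.duration_since(t) >= self.timeout);
            if waited {
                self.state = State::HalfOpen;
                self.successes = 0;
            }
        }
        self.state != State::Open
    }

    pub fn record_success(&mut self) {
        match self.state {
            State::Closed => self.failures = 0,
            State::HalfOpen => {
                self.successes += 1;
                if self.successes >= self.success_threshold {
                    self.state = State::Closed;
                    self.failures = 0;
                }
            }
            State::Open => {}
        }
    }

    pub fn record_failure(&mut self, now: Instant) {
        match self.state {
            State::Closed => {
                self.failures += 1;
                if self.failures >= self.failure_threshold {
                    self.trip(now);
                }
            }
            // Any failure while probing reopens the breaker.
            State::HalfOpen => self.trip(now),
            State::Open => {}
        }
    }

    fn trip(&mut self, now: Instant) {
        self.state = State::Open;
        self.opened_at = Some(now);
    }
}
```

In this sketch a success in the closed state resets the failure count, so only consecutive failures open the breaker. Whether the real middleware counts consecutive failures or failures within a window is not stated in this issue.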
pub enum CircuitBreakerScope {
    /// Global circuit breaker (all requests)
    Global,
    /// Per server URL
    PerServer,
    /// Per method (e.g., tools/call, resources/read)
    PerMethod,
    /// Per endpoint (method + URL)
    PerEndpoint,
    /// Per session ID
    PerSession,
    /// Custom scoping key
    Custom(Arc<dyn Fn(&JSONRPCRequest) -> String + Send + Sync>),
}

pub enum StatePersistence {
    /// In-memory only (lost on restart)
    Memory,
    /// Persist to file
    File(PathBuf),
    /// Persist to Redis (for distributed systems)
    #[cfg(feature = "redis")]
    Redis {
        client: redis::Client,
        key_prefix: String,
    },
    /// Custom persistence
    Custom(Arc<dyn CircuitBreakerStateStore>),
}

#[async_trait]
pub trait CircuitBreakerStateStore: Send + Sync {
    async fn get_state(&self, key: &str) -> Result<CircuitBreakerState>;
    async fn set_state(&self, key: &str, state: CircuitBreakerState) -> Result<()>;
    async fn get_metrics(&self) -> Result<HashMap<String, CircuitBreakerMetrics>>;
}

pub struct CircuitBreakerMetrics {
    pub total_requests: AtomicU64,
    pub successful_requests: AtomicU64,
    pub failed_requests: AtomicU64,
    pub rejected_requests: AtomicU64,
    pub state_transitions: AtomicU64,
    pub current_state: Arc<RwLock<HashMap<String, CircuitBreakerState>>>,
}

Usage examples:
// Per-server circuit breaker with persistence
let cb = CircuitBreakerMiddleware::builder()
    .scope(CircuitBreakerScope::PerServer)
    .failure_threshold(5)
    .success_threshold(3)
    .timeout(Duration::from_secs(30))
    .persistence(StatePersistence::File(PathBuf::from(".circuit_breaker_state")))
    .build();

// Per-method circuit breaker
let cb = CircuitBreakerMiddleware::builder()
    .scope(CircuitBreakerScope::PerMethod)
    .failure_threshold(10)
    .trigger_errors(ErrorMatcher::StatusCodes(vec![
        StatusCode::INTERNAL_SERVER_ERROR,
        StatusCode::BAD_GATEWAY,
        StatusCode::SERVICE_UNAVAILABLE,
    ]))
    .build();

// Custom scoping (e.g., per user)
let cb = CircuitBreakerMiddleware::builder()
    .scope(CircuitBreakerScope::Custom(Arc::new(|req| {
        req.params
            .as_ref()
            .and_then(|p| p.get("user_id"))
            .and_then(|v| v.as_str())
            .unwrap_or("default")
            .to_string()
    })))
    .build();

// Distributed circuit breaker (Redis-backed)
#[cfg(feature = "redis")]
let cb = CircuitBreakerMiddleware::builder()
    .scope(CircuitBreakerScope::PerEndpoint)
    .persistence(StatePersistence::Redis {
        client: redis::Client::open("redis://localhost")?,
        key_prefix: "mcp:circuit_breaker:".to_string(),
    })
    .build();

// Access aggregated metrics
let metrics = cb.metrics();
println!("Total requests: {}", metrics.total_requests.load(Ordering::Relaxed));
println!("Rejected (circuit open): {}", metrics.rejected_requests.load(Ordering::Relaxed));
for (key, state) in metrics.current_state.read().await.iter() {
    println!("{}: {:?}", key, state);
}

2. Enhanced RateLimitMiddleware
pub struct RateLimitMiddleware {
    config: RateLimitConfig,
    limiter: Arc<dyn RateLimiter>,
    metrics: Arc<RateLimitMetrics>,
}

pub struct RateLimitConfig {
    /// Scoping strategy
    pub scope: RateLimitScope,
    /// Rate limit strategy
    pub strategy: RateLimitStrategy,
    /// State persistence
    pub persistence: StatePersistence,
    /// Backpressure behavior
    pub backpressure: BackpressureConfig,
    /// Metrics aggregation
    pub metrics_config: MetricsConfig,
}

pub enum RateLimitScope {
    /// Global rate limit
    Global,
    /// Per server URL
    PerServer,
    /// Per method
    PerMethod,
    /// Per endpoint (method + URL)
    PerEndpoint,
    /// Per session ID
    PerSession,
    /// Custom scoping key
    Custom(Arc<dyn Fn(&JSONRPCRequest) -> String + Send + Sync>),
}

pub enum RateLimitStrategy {
    /// Fixed window (requests per time window)
    FixedWindow {
        max_requests: u32,
        window: Duration,
    },
    /// Sliding window (more accurate, higher overhead)
    SlidingWindow {
        max_requests: u32,
        window: Duration,
    },
    /// Token bucket (allows bursts)
    TokenBucket {
        capacity: u32,
        refill_rate: u32, // tokens per second
    },
    /// Leaky bucket (smooth rate)
    LeakyBucket {
        capacity: u32,
        leak_rate: u32, // requests per second
    },
}
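To make the `TokenBucket` variant concrete, here is a minimal sketch of the refill-and-take logic for a single scope key. It is an illustration under stated assumptions rather than the proposed `RateLimiter` implementation: the struct and method names are hypothetical, `refill_rate` is assumed to be greater than zero, and time is injected so the behaviour is deterministic.

```rust
use std::time::{Duration, Instant};

/// Hypothetical single-key token bucket.
pub struct TokenBucket {
    capacity: f64,
    refill_rate: f64, // tokens per second, assumed > 0
    tokens: f64,
    last_refill: Instant,
}

impl TokenBucket {
    /// Starts full, so an idle client may burst up to `capacity`.
    pub fn new(capacity: u32, refill_rate: u32, now: Instant) -> Self {
        Self {
            capacity: f64::from(capacity),
            refill_rate: f64::from(refill_rate),
            tokens: f64::from(capacity),
            last_refill: now,
        }
    }

    /// Add the tokens earned since the last refill, capped at capacity.
    fn refill(&mut self, now: Instant) {
        if now <= self.last_refill {
            return;
        }
        let elapsed = now.duration_since(self.last_refill).as_secs_f64();
        self.tokens = (self.tokens + elapsed * self.refill_rate).min(self.capacity);
        self.last_refill = now;
    }

    /// Take one token if available.
    pub fn try_acquire(&mut self, now: Instant) -> bool {
        self.refill(now);
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            true
        } else {
            false
        }
    }

    /// Time until one token is available (zero if one is available now).
    pub fn time_until_available(&mut self, now: Instant) -> Duration {
        self.refill(now);
        if self.tokens >= 1.0 {
            Duration::ZERO
        } else {
            Duration::from_secs_f64((1.0 - self.tokens) / self.refill_rate)
        }
    }
}
```

A value like `time_until_available` is what a backpressure layer would need in order to decide between waiting and rejecting.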
pub struct BackpressureConfig {
    /// Wait for capacity instead of rejecting
    pub wait_for_capacity: bool,
    /// Maximum wait time
    pub max_wait: Duration,
    /// Send backpressure signals to client
    pub signal_client: bool,
    /// Backpressure header name
    pub backpressure_header: String, // e.g., "X-RateLimit-Reset"
}
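One possible reading of how these four fields interact is sketched below. The `Decision` enum and `decide` function are hypothetical, and the header value (whole seconds until capacity returns, rounded up) is an assumption, since the issue names only the header and not its format.

```rust
use std::time::Duration;

/// Same fields as the proposed `BackpressureConfig`.
pub struct BackpressureConfig {
    pub wait_for_capacity: bool,
    pub max_wait: Duration,
    pub signal_client: bool,
    pub backpressure_header: String,
}

/// Hypothetical outcome of a rate-limit check.
#[derive(Debug, PartialEq)]
pub enum Decision {
    Proceed,
    WaitThenProceed(Duration),
    Reject { header: Option<(String, String)> },
}

/// `until_capacity` is how long the limiter says the caller must wait.
pub fn decide(cfg: &BackpressureConfig, until_capacity: Duration) -> Decision {
    if until_capacity.is_zero() {
        return Decision::Proceed;
    }
    // Wait only when configured to, and only within the max_wait budget.
    if cfg.wait_for_capacity && until_capacity <= cfg.max_wait {
        return Decision::WaitThenProceed(until_capacity);
    }
    let header = if cfg.signal_client {
        // Assumption: value is seconds until capacity returns, rounded up.
        let secs = until_capacity.as_secs() + u64::from(until_capacity.subsec_nanos() > 0);
        Some((cfg.backpressure_header.clone(), secs.to_string()))
    } else {
        None
    };
    Decision::Reject { header }
}
```

Keeping the decision separate from the actual sleep makes the policy easy to unit test without an async runtime.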
pub struct RateLimitMetrics {
    pub total_requests: AtomicU64,
    pub allowed_requests: AtomicU64,
    pub rejected_requests: AtomicU64,
    pub waited_requests: AtomicU64,
    pub wait_time_ms: AtomicU64,
    pub current_usage: Arc<RwLock<HashMap<String, RateLimitUsage>>>,
}

pub struct RateLimitUsage {
    pub scope_key: String,
    pub used: u32,
    pub limit: u32,
    pub reset_at: SystemTime,
}

Usage examples:
// Per-server token bucket with persistence
let rl = RateLimitMiddleware::builder()
    .scope(RateLimitScope::PerServer)
    .strategy(RateLimitStrategy::TokenBucket {
        capacity: 100,
        refill_rate: 10, // 10 requests/sec sustained, 100 burst
    })
    .persistence(StatePersistence::File(PathBuf::from(".rate_limits")))
    .backpressure(BackpressureConfig {
        wait_for_capacity: true,
        max_wait: Duration::from_secs(5),
        signal_client: true,
        backpressure_header: "X-RateLimit-Reset".to_string(),
    })
    .build();

// Per-method sliding window
let rl = RateLimitMiddleware::builder()
    .scope(RateLimitScope::PerMethod)
    .strategy(RateLimitStrategy::SlidingWindow {
        max_requests: 1000,
        window: Duration::from_secs(60),
    })
    .build();

// Per-session fixed window
let rl = RateLimitMiddleware::builder()
    .scope(RateLimitScope::PerSession)
    .strategy(RateLimitStrategy::FixedWindow {
        max_requests: 100,
        window: Duration::from_secs(60),
    })
    .build();

// Distributed rate limiting (Redis-backed)
#[cfg(feature = "redis")]
let rl = RateLimitMiddleware::builder()
    .scope(RateLimitScope::PerEndpoint)
    .strategy(RateLimitStrategy::TokenBucket { capacity: 1000, refill_rate: 100 })
    .persistence(StatePersistence::Redis {
        client: redis::Client::open("redis://localhost")?,
        key_prefix: "mcp:rate_limit:".to_string(),
    })
    .build();

// Access aggregated metrics
let metrics = rl.metrics();
println!("Allowed: {}", metrics.allowed_requests.load(Ordering::Relaxed));
println!("Rejected: {}", metrics.rejected_requests.load(Ordering::Relaxed));
for (key, usage) in metrics.current_usage.read().await.iter() {
    println!("{}: {}/{} (resets at {:?})", key, usage.used, usage.limit, usage.reset_at);
}

3. Unified Metrics Integration
Integrate both middleware with MetricsMiddleware:
// MetricsMiddleware automatically aggregates from CircuitBreaker + RateLimit
let metrics_middleware = MetricsMiddleware::builder()
    .export_interval(Duration::from_secs(60))
    .exporter(MetricsExporter::Prometheus {
        endpoint: "/metrics".to_string(),
    })
    .include_circuit_breaker_stats(true)
    .include_rate_limit_stats(true)
    .build();

// Prometheus metrics exposed:
// mcp_circuit_breaker_state{scope="server",key="https://api.example.com"} 0 # 0=closed, 1=open, 2=half_open
// mcp_circuit_breaker_failures_total{scope="server",key="https://api.example.com"} 42
// mcp_circuit_breaker_requests_rejected_total{scope="server",key="https://api.example.com"} 15
// mcp_rate_limit_requests_allowed_total{scope="method",key="tools/call"} 9523
// mcp_rate_limit_requests_rejected_total{scope="method",key="tools/call"} 47
// mcp_rate_limit_current_usage{scope="method",key="tools/call"} 0.85 # 85% of limit

Implementation Plan
Phase 1: CircuitBreakerMiddleware enhancements
- Add scoping (Global, PerServer, PerMethod, PerEndpoint, PerSession, Custom)
- Implement CircuitBreakerStateStore trait
- Add file-based persistence
- Add Redis-based persistence (behind feature flag)
- Implement CircuitBreakerMetrics with aggregation
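The scoping item in Phase 1 comes down to mapping each request to a state key. A minimal sketch of that mapping follows. `RequestInfo` is a hypothetical stand-in for the fields a scope would need, because the real `JSONRPCRequest` type is not shown in this issue. The `Scope` enum mirrors the proposed `CircuitBreakerScope`, and the key prefixes are illustrative choices.

```rust
use std::sync::Arc;

/// Hypothetical, simplified view of a request.
pub struct RequestInfo {
    pub server_url: String,
    pub method: String,
    pub session_id: Option<String>,
}

/// Mirrors the proposed scope enum, with `Custom` taking `RequestInfo`.
pub enum Scope {
    Global,
    PerServer,
    PerMethod,
    PerEndpoint,
    PerSession,
    Custom(Arc<dyn Fn(&RequestInfo) -> String + Send + Sync>),
}

/// Derive the key under which breaker or limiter state is stored.
pub fn scope_key(scope: &Scope, req: &RequestInfo) -> String {
    match scope {
        Scope::Global => "global".to_string(),
        Scope::PerServer => format!("server:{}", req.server_url),
        Scope::PerMethod => format!("method:{}", req.method),
        Scope::PerEndpoint => format!("endpoint:{}:{}", req.method, req.server_url),
        // Assumption: requests without a session share one fallback key.
        Scope::PerSession => {
            format!("session:{}", req.session_id.as_deref().unwrap_or("none"))
        }
        Scope::Custom(f) => format!("custom:{}", f(req)),
    }
}
```

Prefixing each key with its scope kind keeps keys from different scopes from colliding when they share one store, for example under a common Redis `key_prefix`.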
Phase 2: RateLimitMiddleware enhancements
- Add scoping (same as circuit breaker)
- Implement rate limit strategies (FixedWindow, SlidingWindow, TokenBucket, LeakyBucket)
- Add RateLimiter trait
- Implement backpressure handling (wait vs reject)
- Add persistence (file + Redis)
- Implement RateLimitMetrics with aggregation
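Of the strategies listed in Phase 2, the sliding window is described as more accurate at a higher overhead. A timestamp-log sketch shows where that overhead comes from: it stores one `Instant` per admitted request. The type and method names are hypothetical, and this is only one of several ways to implement a sliding window.

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// Hypothetical single-key sliding-window limiter (timestamp log).
pub struct SlidingWindow {
    max_requests: usize,
    window: Duration,
    hits: VecDeque<Instant>,
}

impl SlidingWindow {
    pub fn new(max_requests: usize, window: Duration) -> Self {
        Self { max_requests, window, hits: VecDeque::new() }
    }

    /// Admit the request if fewer than `max_requests` were admitted
    /// during the `window` ending at `now`.
    pub fn try_acquire(&mut self, now: Instant) -> bool {
        // Drop timestamps that have aged out of the window.
        while let Some(&oldest) = self.hits.front() {
            if now.saturating_duration_since(oldest) >= self.window {
                self.hits.pop_front();
            } else {
                break;
            }
        }
        if self.hits.len() < self.max_requests {
            self.hits.push_back(now);
            true
        } else {
            false
        }
    }
}
```

A fixed window needs only a counter and a window start time per key, which is cheaper but lets up to twice the limit through across a window boundary.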
Phase 3: Metrics integration
- Integrate with existing MetricsMiddleware
- Add Prometheus exporter
- Add aggregated stats endpoints
- Implement real-time metrics updates
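For the Prometheus exporter item, the metric lines listed under Unified Metrics Integration can be produced with plain string formatting. The sketch below renders samples in the Prometheus text exposition format with `scope` and `key` labels. `Sample` and `render` are hypothetical names, and a real exporter would likely also emit `# HELP` and `# TYPE` lines.

```rust
use std::fmt::Write;

/// Hypothetical flattened metric sample.
pub struct Sample<'a> {
    pub name: &'a str,
    pub scope: &'a str,
    pub key: &'a str,
    pub value: f64,
}

/// Escape a label value: backslash, double quote, and newline.
fn escape_label(v: &str) -> String {
    v.replace('\\', "\\\\").replace('"', "\\\"").replace('\n', "\\n")
}

/// Render one line per sample: name{scope="...",key="..."} value
pub fn render(samples: &[Sample]) -> String {
    let mut out = String::new();
    for s in samples {
        // Writing to a String cannot fail, so the result is ignored.
        let _ = writeln!(
            out,
            "{}{{scope=\"{}\",key=\"{}\"}} {}",
            s.name,
            escape_label(s.scope),
            escape_label(s.key),
            s.value
        );
    }
    out
}
```

Escaping matters here because scope keys such as URLs or custom user-supplied strings end up as label values.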
Phase 4: Testing and documentation
- Unit tests for scoping strategies
- Integration tests for persistence
- Load tests for rate limiting strategies
- Documentation: examples/34_circuit_breaker_scoping.rs, examples/35_rate_limiting.rs
- Update ch11-middleware.md
Benefits
CircuitBreakerMiddleware:
- Granularity: Per-server/method/endpoint scoping isolates failures, so one failing scope does not open the circuit for unrelated requests
- Resilience: Persistence survives restarts
- Observability: Aggregated metrics show health across all scopes
- Distributed: Redis backend enables cluster-wide circuit breakers
RateLimitMiddleware:
- Flexibility: Multiple strategies for different use cases
- Fairness: Per-session scoping keeps a single session from exhausting a shared limit
- User-friendly: Backpressure can wait for capacity (up to max_wait) instead of rejecting immediately
- Observable: Real-time usage metrics
Unified:
- Production-ready: All features needed for real deployments
- TypeScript parity: Matches TS SDK + goes beyond with scoping/persistence
- Integration: Works seamlessly with existing MetricsMiddleware
References
- Circuit breaker pattern: https://martinfowler.com/bliki/CircuitBreaker.html
- Rate limiting strategies: https://stripe.com/blog/rate-limiters
- Existing middleware: src/shared/middleware.rs
Related Issues
- feat: Add first-class middleware integration to Client/Protocol/Transport #80 - First-class middleware integration API
- feat: Add HttpMiddleware trait for transport-level HTTP concerns #82 - HttpMiddleware trait
- feat: Enhance LoggingMiddleware and RetryMiddleware with production-grade options #84 - Logging and retry enhancements
Priority: Medium
Complexity: Medium-High (persistence + distributed coordination)
Dependencies: None (enhances existing middleware)