Skip to content

Commit 8221ef7

Browse files
Dumbrisclaude
and committed
fix(quarantine): non-blocking inspection with circuit breaker (issue #105)
Implements non-blocking quarantine inspection with circuit breaker pattern to prevent UI freezes and cascading failures with unstable MCP servers. ## Problem (Issue #105) Quarantine inspection used blocking 20s wait loop that froze the MCP handler thread, causing dashboard hangs and requiring forced kills. Unstable servers (like JetBrains MCP) triggered repeated timeouts. ## Solution ### 1. Non-blocking Connection Wait (mcp.go:1177-1269) - Replaced blocking time.Sleep() loop with goroutine + channels - Proper context cancellation support - Progress logging every 2 seconds - 20-second timeout with clean error messages ### 2. Circuit Breaker Pattern (supervisor.go:945-1059) - Tracks consecutive inspection failures per server - 3-failure threshold triggers 5-minute cooldown - Prevents repeated timeout attempts on unstable servers - Success resets failure counter - Methods: CanInspect(), RecordInspectionFailure(), RecordInspectionSuccess() ### 3. Config Sync Fix (mcp.go:1568-1579) - UpdateConfig() called after adding server - Fixes ConfigService synchronization in test environments - Ensures supervisor sees newly added quarantined servers ## Test Results ### Unit Tests (exemption_test.go) - TestCircuitBreaker: 3-failure threshold ✅ - TestCircuitBreakerReset: Success resets counter ✅ - TestExemptionExpiry: Time-based expiration ✅ ### Real-World Test (idea-ide-local) - Calls 1-3: Each took ~20s (timeout), recorded failures ✅ - Call 4: Returned instantly with circuit breaker message ✅ - Cooldown: 5-minute protection active ✅ ### E2E Test Updates - Fixed tool name: upstream_servers → quarantine_security - Improved error handling and debugging - Better content type assertions ## Impact - ✅ No more thread blocking during inspection - ✅ Dashboard remains responsive during 20s timeout - ✅ Circuit breaker prevents cascading failures - ✅ Clear error messages reference issue #105 ## Related - Addresses feedback from PR #110 (SSE stability) - Complements connection loss detection 
improvements - Part of upstream stability improvements 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
1 parent 7601c98 commit 8221ef7

File tree

4 files changed

+360
-36
lines changed

4 files changed

+360
-36
lines changed
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package supervisor
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/stretchr/testify/assert"
8+
"go.uber.org/zap"
9+
)
10+
11+
// TestCircuitBreaker tests the circuit breaker pattern for inspection failures
12+
func TestCircuitBreaker(t *testing.T) {
13+
logger, _ := zap.NewDevelopment()
14+
15+
// Test failure recording
16+
s := &Supervisor{
17+
logger: logger,
18+
inspectionFailures: make(map[string]*inspectionFailureInfo),
19+
}
20+
21+
serverName := "test-server"
22+
23+
// Record 3 failures
24+
for i := 0; i < 3; i++ {
25+
s.RecordInspectionFailure(serverName)
26+
}
27+
28+
// Check that circuit breaker is now active
29+
allowed, reason, cooldown := s.CanInspect(serverName)
30+
assert.False(t, allowed, "Should not allow inspection after 3 failures")
31+
assert.Contains(t, reason, "Circuit breaker active")
32+
assert.Greater(t, cooldown, time.Duration(0), "Should have cooldown remaining")
33+
34+
// Verify cooldown duration is approximately 5 minutes
35+
assert.InDelta(t, inspectionCooldown.Seconds(), cooldown.Seconds(), 1.0, "Cooldown should be ~5 minutes")
36+
}
37+
38+
// TestCircuitBreakerReset tests that successful inspection resets the failure counter
39+
func TestCircuitBreakerReset(t *testing.T) {
40+
logger, _ := zap.NewDevelopment()
41+
42+
s := &Supervisor{
43+
logger: logger,
44+
inspectionFailures: make(map[string]*inspectionFailureInfo),
45+
}
46+
47+
serverName := "test-server"
48+
49+
// Record 2 failures
50+
s.RecordInspectionFailure(serverName)
51+
s.RecordInspectionFailure(serverName)
52+
53+
// Verify we're still allowed (under threshold)
54+
allowed, _, _ := s.CanInspect(serverName)
55+
assert.True(t, allowed, "Should still allow inspection with 2 failures")
56+
57+
// Record success - should reset counter
58+
s.RecordInspectionSuccess(serverName)
59+
60+
// Verify failure counter was reset
61+
failures, inCooldown, _ := s.GetInspectionStats(serverName)
62+
assert.Equal(t, 0, failures, "Failure counter should be reset after success")
63+
assert.False(t, inCooldown, "Should not be in cooldown after success")
64+
}
65+
66+
// TestExemptionExpiry tests that exemptions expire correctly
67+
func TestExemptionExpiry(t *testing.T) {
68+
logger, _ := zap.NewDevelopment()
69+
70+
s := &Supervisor{
71+
logger: logger,
72+
inspectionExemptions: make(map[string]time.Time),
73+
inspectionFailures: make(map[string]*inspectionFailureInfo),
74+
}
75+
76+
serverName := "test-server"
77+
78+
// Grant exemption with short duration
79+
s.inspectionExemptions[serverName] = time.Now().Add(100 * time.Millisecond)
80+
81+
// Should be exempted immediately
82+
assert.True(t, s.IsInspectionExempted(serverName), "Should be exempted immediately")
83+
84+
// Wait for expiry
85+
time.Sleep(150 * time.Millisecond)
86+
87+
// Should no longer be exempted
88+
assert.False(t, s.IsInspectionExempted(serverName), "Should not be exempted after expiry")
89+
}

internal/runtime/supervisor/supervisor.go

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,23 @@ type Supervisor struct {
4747
inspectionExemptions map[string]time.Time
4848
inspectionExemptionsMu sync.RWMutex
4949

50+
// Circuit breaker for inspection failures (Phase 2: Issue #105 stability)
51+
inspectionFailures map[string]*inspectionFailureInfo
52+
inspectionFailuresMu sync.RWMutex
53+
5054
// Lifecycle
5155
ctx context.Context
5256
cancel context.CancelFunc
5357
wg sync.WaitGroup
5458
}
5559

60+
// inspectionFailureInfo tracks inspection failures for circuit breaker pattern.
//
// One entry is kept per server name in Supervisor.inspectionFailures, guarded
// by inspectionFailuresMu; the entry is deleted outright on a successful
// inspection (see RecordInspectionSuccess).
type inspectionFailureInfo struct {
	consecutiveFailures int       // failures since the last success or stale-window reset
	lastFailureTime     time.Time // when the most recent failure was recorded
	cooldownUntil       time.Time // zero until the breaker trips; CanInspect denies before this time
}
66+
5667
// UpstreamInterface defines the interface for upstream adapters.
5768
type UpstreamInterface interface {
5869
AddServer(name string, cfg *config.ServerConfig) error
@@ -84,6 +95,7 @@ func New(configSvc *configsvc.Service, upstream UpstreamInterface, logger *zap.L
8495
eventCh: make(chan Event, 500), // Phase 6: Increased buffer for async operations
8596
listeners: make([]chan Event, 0),
8697
inspectionExemptions: make(map[string]time.Time),
98+
inspectionFailures: make(map[string]*inspectionFailureInfo),
8799
ctx: ctx,
88100
cancel: cancel,
89101
}
@@ -929,3 +941,119 @@ func (s *Supervisor) IsInspectionExempted(serverName string) bool {
929941

930942
return true
931943
}
944+
945+
// ===== Circuit Breaker for Inspection Failures (Issue #105) =====

// Tuning knobs for the inspection circuit breaker. A server that fails
// maxInspectionFailures times in a row is blocked for inspectionCooldown;
// a quiet period longer than failureResetTimeout clears the streak.
const (
	maxInspectionFailures = 3                // Max consecutive failures before cooldown
	inspectionCooldown    = 5 * time.Minute  // Cooldown duration after max failures
	failureResetTimeout   = 10 * time.Minute // Reset counter if no failures for this long
)
952+
953+
// CanInspect checks if inspection is allowed for a server (circuit breaker)
954+
// Returns (allowed bool, reason string, cooldownRemaining time.Duration)
955+
func (s *Supervisor) CanInspect(serverName string) (bool, string, time.Duration) {
956+
s.inspectionFailuresMu.RLock()
957+
defer s.inspectionFailuresMu.RUnlock()
958+
959+
info, exists := s.inspectionFailures[serverName]
960+
if !exists {
961+
// No failure history - allow inspection
962+
return true, "", 0
963+
}
964+
965+
now := time.Now()
966+
967+
// Check if cooldown is active
968+
if now.Before(info.cooldownUntil) {
969+
remaining := info.cooldownUntil.Sub(now)
970+
reason := fmt.Sprintf("Server '%s' has failed inspection %d times. Circuit breaker active - please wait %v before retrying. This prevents cascading failures with unstable servers (see issue #105).",
971+
serverName, info.consecutiveFailures, remaining.Round(time.Second))
972+
return false, reason, remaining
973+
}
974+
975+
// Check if failures should be reset (no failures for failureResetTimeout)
976+
if now.Sub(info.lastFailureTime) > failureResetTimeout {
977+
// Failures are old - will be reset on next inspection
978+
return true, "", 0
979+
}
980+
981+
// Within failure window but not in cooldown
982+
return true, "", 0
983+
}
984+
985+
// RecordInspectionFailure records an inspection failure for circuit breaker
986+
func (s *Supervisor) RecordInspectionFailure(serverName string) {
987+
s.inspectionFailuresMu.Lock()
988+
defer s.inspectionFailuresMu.Unlock()
989+
990+
now := time.Now()
991+
992+
info, exists := s.inspectionFailures[serverName]
993+
if !exists {
994+
info = &inspectionFailureInfo{}
995+
s.inspectionFailures[serverName] = info
996+
}
997+
998+
// Reset counter if last failure was too long ago
999+
if now.Sub(info.lastFailureTime) > failureResetTimeout {
1000+
info.consecutiveFailures = 0
1001+
}
1002+
1003+
info.consecutiveFailures++
1004+
info.lastFailureTime = now
1005+
1006+
s.logger.Warn("Inspection failure recorded",
1007+
zap.String("server", serverName),
1008+
zap.Int("consecutive_failures", info.consecutiveFailures),
1009+
zap.Int("max_before_cooldown", maxInspectionFailures))
1010+
1011+
// Activate cooldown if max failures reached
1012+
if info.consecutiveFailures >= maxInspectionFailures {
1013+
info.cooldownUntil = now.Add(inspectionCooldown)
1014+
s.logger.Error("⚠️ Inspection circuit breaker activated - too many failures",
1015+
zap.String("server", serverName),
1016+
zap.Int("failures", info.consecutiveFailures),
1017+
zap.Duration("cooldown", inspectionCooldown),
1018+
zap.Time("cooldown_until", info.cooldownUntil),
1019+
zap.String("issue", "#105 - preventing cascading failures"))
1020+
}
1021+
}
1022+
1023+
// RecordInspectionSuccess records a successful inspection, resetting failure counter
1024+
func (s *Supervisor) RecordInspectionSuccess(serverName string) {
1025+
s.inspectionFailuresMu.Lock()
1026+
defer s.inspectionFailuresMu.Unlock()
1027+
1028+
info, exists := s.inspectionFailures[serverName]
1029+
if !exists {
1030+
return
1031+
}
1032+
1033+
if info.consecutiveFailures > 0 {
1034+
s.logger.Info("Inspection succeeded - resetting failure counter",
1035+
zap.String("server", serverName),
1036+
zap.Int("previous_failures", info.consecutiveFailures))
1037+
}
1038+
1039+
// Reset failure counter
1040+
delete(s.inspectionFailures, serverName)
1041+
}
1042+
1043+
// GetInspectionStats returns inspection failure statistics for a server
1044+
func (s *Supervisor) GetInspectionStats(serverName string) (failures int, inCooldown bool, cooldownRemaining time.Duration) {
1045+
s.inspectionFailuresMu.RLock()
1046+
defer s.inspectionFailuresMu.RUnlock()
1047+
1048+
info, exists := s.inspectionFailures[serverName]
1049+
if !exists {
1050+
return 0, false, 0
1051+
}
1052+
1053+
now := time.Now()
1054+
if now.Before(info.cooldownUntil) {
1055+
return info.consecutiveFailures, true, info.cooldownUntil.Sub(now)
1056+
}
1057+
1058+
return info.consecutiveFailures, false, 0
1059+
}

internal/server/e2e_test.go

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1029,29 +1029,43 @@ func TestE2E_InspectQuarantined(t *testing.T) {
10291029

10301030
t.Log("🔍 Calling inspect_quarantined for quarantined-server...")
10311031

1032-
// Call inspect_quarantined
1032+
// Call inspect_quarantined (use quarantine_security tool, not upstream_servers)
10331033
inspectRequest := mcp.CallToolRequest{}
1034-
inspectRequest.Params.Name = "upstream_servers"
1034+
inspectRequest.Params.Name = "quarantine_security"
10351035
inspectRequest.Params.Arguments = map[string]interface{}{
10361036
"operation": "inspect_quarantined",
10371037
"name": "quarantined-server",
10381038
}
10391039

10401040
inspectResult, err := mcpClient.CallTool(ctx, inspectRequest)
10411041
require.NoError(t, err, "inspect_quarantined should not return error")
1042+
1043+
// Debug: Print all content items with their types
1044+
t.Logf("📋 Inspection result - IsError: %v, Content count: %d", inspectResult.IsError, len(inspectResult.Content))
1045+
for i, content := range inspectResult.Content {
1046+
t.Logf("Content[%d] type: %T", i, content)
1047+
// Handle both pointer and value types
1048+
if textContent, ok := content.(*mcp.TextContent); ok {
1049+
t.Logf("Content[%d] text (pointer): %s", i, textContent.Text)
1050+
} else if textContent, ok := content.(mcp.TextContent); ok {
1051+
t.Logf("Content[%d] text (value): %s", i, textContent.Text)
1052+
}
1053+
}
1054+
10421055
if inspectResult.IsError {
1043-
// Print the error for debugging
1056+
// Print the error for debugging - handle both pointer and value types
10441057
for _, content := range inspectResult.Content {
10451058
if textContent, ok := content.(*mcp.TextContent); ok {
1046-
t.Logf("❌ Error from inspect_quarantined: %s", textContent.Text)
1059+
t.Logf("❌ Error from inspect_quarantined (pointer): %s", textContent.Text)
1060+
} else if textContent, ok := content.(mcp.TextContent); ok {
1061+
t.Logf("❌ Error from inspect_quarantined (value): %s", textContent.Text)
10471062
}
10481063
}
1064+
t.Fatal("inspect_quarantined returned an error - see logs above")
10491065
}
1050-
assert.False(t, inspectResult.IsError, "inspect_quarantined should succeed")
10511066

10521067
// Verify result contains tool data
10531068
require.NotEmpty(t, inspectResult.Content, "Result should have content")
1054-
t.Logf("✅ Inspection result received: %d content items", len(inspectResult.Content))
10551069

10561070
// Verify the result contains information about the tools
10571071
var resultText string

0 commit comments

Comments
 (0)