Skip to content

Commit f7caea5

Browse files
authored
Merge pull request #110 from smart-mcp-proxy/fix/upstream-stability
fix(upstream): improve SSE stability and OAuth timeout for JetBrains MCP
2 parents c9912fe + f43b290 commit f7caea5

File tree

16 files changed

+1391
-180
lines changed

16 files changed

+1391
-180
lines changed

.github/workflows/unit-tests.yml

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,41 @@ jobs:
171171
with:
172172
node-version: '20'
173173

174-
- name: Install frontend dependencies
174+
- name: Install frontend dependencies (Windows)
175+
if: matrix.os == 'windows-latest'
176+
shell: pwsh
177+
run: |
178+
cd frontend
179+
$maxRetries = 3
180+
$retryCount = 0
181+
182+
while ($retryCount -lt $maxRetries) {
183+
$retryCount++
184+
Write-Host "Attempting npm ci (attempt $retryCount/$maxRetries)..."
185+
186+
npm ci --prefer-offline --no-audit 2>&1 | Out-Host
187+
188+
if ($LASTEXITCODE -eq 0) {
189+
Write-Host "npm ci succeeded"
190+
break
191+
}
192+
193+
if ($retryCount -lt $maxRetries) {
194+
Write-Host "npm ci failed with exit code $LASTEXITCODE, waiting 5 seconds before retry..."
195+
Start-Sleep -Seconds 5
196+
197+
if (Test-Path node_modules) {
198+
Write-Host "Cleaning up node_modules..."
199+
Remove-Item -Recurse -Force node_modules -ErrorAction SilentlyContinue
200+
}
201+
} else {
202+
Write-Host "npm ci failed after $maxRetries attempts"
203+
exit 1
204+
}
205+
}
206+
207+
- name: Install frontend dependencies (Unix)
208+
if: matrix.os != 'windows-latest'
175209
run: cd frontend && npm ci
176210

177211
- name: Build frontend

cmd/mcpproxy-tray/main.go

Lines changed: 46 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ var (
4242
defaultCoreURL = "http://127.0.0.1:8080"
4343
errNoBundledCore = errors.New("no bundled core binary found")
4444
trayAPIKey = "" // API key generated for core communication
45+
shutdownComplete = make(chan struct{}) // Signal when shutdown is complete
4546
)
4647

4748
// getLogDir returns the standard log directory for the current OS.
@@ -152,21 +153,32 @@ func main() {
152153

153154
// Create tray application early so icon appears
154155
shutdownFunc := func() {
156+
defer close(shutdownComplete) // Signal when shutdown is done
157+
155158
logger.Info("Tray shutdown requested")
156-
// IMPORTANT: Send shutdown event to trigger state transition
159+
160+
// Send shutdown event to state machine
157161
stateMachine.SendEvent(state.EventShutdown)
158-
// IMPORTANT: Call handleShutdown() directly to terminate core process
159-
// We can't rely on the state transition goroutine because cancel() will kill it
162+
163+
// Shutdown launcher (stops SSE, health monitor, kills core)
160164
if launcher != nil {
161165
launcher.handleShutdown()
162166
}
163-
// IMPORTANT: Quit the tray UI BEFORE cancelling context
164-
// This prevents reconnection attempts from SSE goroutine
165-
logger.Info("Quitting system tray")
166-
trayApp.Quit()
167-
// Now shutdown state machine and cancel context
167+
168+
// Shutdown state machine
168169
stateMachine.Shutdown()
170+
171+
// CRITICAL: Cancel context only after launcher and state machine cleanup (the tray UI is quit afterwards)
172+
// This prevents the tray.Run() goroutine from quitting prematurely
173+
logger.Info("Cancelling context after cleanup complete")
169174
cancel()
175+
176+
// Give tray.Run() goroutine a moment to see cancellation
177+
time.Sleep(100 * time.Millisecond)
178+
179+
// Finally, quit the tray UI
180+
logger.Info("Quitting system tray")
181+
trayApp.Quit()
170182
}
171183

172184
trayApp = tray.NewWithAPIClient(api.NewServerAdapter(apiClient), apiClient, logger.Sugar(), version, shutdownFunc)
@@ -214,25 +226,25 @@ func main() {
214226
go func() {
215227
<-sigCh
216228
logger.Info("Received shutdown signal")
217-
stateMachine.SendEvent(state.EventShutdown)
218-
// IMPORTANT: Call handleShutdown() directly to terminate core process
219-
// Same as shutdownFunc - we can't rely on state transition goroutine
220-
if launcher != nil {
221-
launcher.handleShutdown()
222-
}
223-
// IMPORTANT: Quit the tray UI BEFORE cancelling context
224-
logger.Info("Quitting system tray from signal handler")
225-
trayApp.Quit()
226-
stateMachine.Shutdown()
227-
cancel()
229+
230+
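// Use the same shutdown flow as the Quit menu item.
// NOTE(review): shutdownFunc does `defer close(shutdownComplete)`; if both this signal handler and the Quit menu invoke it, the second close panics — consider guarding with sync.Once (TODO confirm the tray cannot call it again).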
231+
shutdownFunc()
228232
}()
229233

230234
logger.Info("Starting tray event loop")
231235
if err := trayApp.Run(ctx); err != nil && err != context.Canceled {
232236
logger.Error("Tray application error", zap.Error(err))
233237
}
234238

235-
// Wait for state machine to shut down gracefully
239+
// Wait for shutdown to complete (with timeout)
240+
select {
241+
case <-shutdownComplete:
242+
logger.Info("Shutdown completed successfully")
243+
case <-time.After(5 * time.Second):
244+
logger.Warn("Shutdown timeout - forcing exit")
245+
}
246+
247+
// Final cleanup
236248
stateMachine.Shutdown()
237249

238250
logger.Info("mcpproxy-tray shutdown complete")
@@ -331,33 +343,6 @@ func resolveCoreURL() string {
331343
return defaultCoreURL
332344
}
333345

334-
// isSocketAvailable checks if a socket/pipe endpoint is accessible
335-
func isSocketAvailable(endpoint string) bool {
336-
// Parse endpoint to extract scheme
337-
parsed, err := url.Parse(endpoint)
338-
if err != nil {
339-
return false
340-
}
341-
342-
switch parsed.Scheme {
343-
case "unix":
344-
// Check if Unix socket file exists
345-
socketPath := parsed.Path
346-
if socketPath == "" {
347-
socketPath = parsed.Opaque
348-
}
349-
_, err := os.Stat(socketPath)
350-
return err == nil
351-
case "npipe":
352-
// For Windows named pipes, we can't easily check existence
353-
// Return true and let the connection attempt fail if needed
354-
return true
355-
default:
356-
// Not a socket endpoint
357-
return false
358-
}
359-
}
360-
361346
func shouldSkipCoreLaunch() bool {
362347
value := strings.TrimSpace(os.Getenv("MCPPROXY_TRAY_SKIP_CORE"))
363348
return value == "1" || strings.EqualFold(value, "true")
@@ -1361,15 +1346,25 @@ func (cpl *CoreProcessLauncher) handleGeneralError() {
13611346
func (cpl *CoreProcessLauncher) handleShutdown() {
13621347
cpl.logger.Info("Core process launcher shutting down")
13631348

1364-
if cpl.processMonitor != nil {
1365-
cpl.processMonitor.Shutdown()
1366-
}
1349+
// CRITICAL: Stop SSE FIRST before killing core
1350+
// This prevents SSE from detecting disconnection and trying to reconnect
1351+
cpl.logger.Info("Stopping SSE connection")
1352+
cpl.apiClient.StopSSE()
13671353

1354+
// Give SSE goroutine a moment to see cancellation and exit cleanly
1355+
time.Sleep(100 * time.Millisecond)
1356+
1357+
// Stop health monitor before killing core
13681358
if cpl.healthMonitor != nil {
1359+
cpl.logger.Info("Stopping health monitor")
13691360
cpl.healthMonitor.Stop()
13701361
}
13711362

1372-
cpl.apiClient.StopSSE()
1363+
// Finally, kill the core process
1364+
if cpl.processMonitor != nil {
1365+
cpl.logger.Info("Shutting down core process")
1366+
cpl.processMonitor.Shutdown()
1367+
}
13731368
}
13741369

13751370
// buildCoreEnvironment builds the environment for the core process

cmd/mcpproxy/tools_cmd.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"time"
99

1010
"mcpproxy-go/internal/config"
11+
"mcpproxy-go/internal/transport"
1112
"mcpproxy-go/internal/upstream/cli"
1213

1314
"github.com/spf13/cobra"
@@ -29,16 +30,18 @@ This command is primarily used for debugging upstream server connections and too
2930
Examples:
3031
mcpproxy tools list --server=github-server --log-level=trace
3132
mcpproxy tools list --server=weather-api --log-level=debug
32-
mcpproxy tools list --server=local-script --log-level=info`,
33+
mcpproxy tools list --server=local-script --log-level=info
34+
mcpproxy tools list --server=jetbrains-sse --trace-transport # Enable HTTP/SSE frame tracing`,
3335
RunE: runToolsList,
3436
}
3537

3638
// Command flags
37-
serverName string
38-
toolsLogLevel string
39-
configPath string
40-
timeout time.Duration
41-
outputFormat string
39+
serverName string
40+
toolsLogLevel string
41+
configPath string
42+
timeout time.Duration
43+
outputFormat string
44+
traceTransport bool // Enable HTTP/SSE frame-by-frame tracing
4245
)
4346

4447
// GetToolsCommand returns the tools command for adding to the root command
@@ -56,6 +59,7 @@ func init() {
5659
toolsListCmd.Flags().StringVarP(&configPath, "config", "c", "", "Path to MCP configuration file (default: ~/.mcpproxy/mcp_config.json)")
5760
toolsListCmd.Flags().DurationVarP(&timeout, "timeout", "t", 30*time.Second, "Connection timeout")
5861
toolsListCmd.Flags().StringVarP(&outputFormat, "output", "o", "table", "Output format (table, json, yaml)")
62+
toolsListCmd.Flags().BoolVar(&traceTransport, "trace-transport", false, "Enable detailed HTTP/SSE frame-by-frame tracing (useful for debugging SSE connection issues)")
5963

6064
// Mark required flags
6165
err := toolsListCmd.MarkFlagRequired("server")
@@ -81,6 +85,14 @@ func runToolsList(_ *cobra.Command, _ []string) error {
8185
ctx, cancel := context.WithTimeout(context.Background(), timeout)
8286
defer cancel()
8387

88+
// Enable transport tracing if requested
89+
if traceTransport {
90+
transport.GlobalTraceEnabled = true
91+
fmt.Println("🔍 HTTP/SSE TRANSPORT TRACING ENABLED")
92+
fmt.Println(" All HTTP requests/responses and SSE frames will be logged")
93+
fmt.Println()
94+
}
95+
8496
// Load configuration
8597
globalConfig, err := loadToolsConfig()
8698
if err != nil {

internal/runtime/supervisor/actor_pool.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,23 @@ func (p *ActorPoolSimple) RemoveServer(name string) error {
122122
// ConnectServer tells the manager to connect a server.
123123
func (p *ActorPoolSimple) ConnectServer(ctx context.Context, name string) error {
124124
p.logger.Debug("Connecting server via manager", zap.String("name", name))
125-
// Manager handles connection automatically via managed clients
125+
126+
client, exists := p.manager.GetClient(name)
127+
if !exists {
128+
return fmt.Errorf("server %s not found", name)
129+
}
130+
131+
// Attempt to connect (managed client will handle state checks)
132+
// If client is already connecting/connected, Connect() will return an error
133+
// which we log but don't treat as fatal (supervisor will retry)
134+
if err := client.Connect(ctx); err != nil {
135+
p.logger.Debug("Connect returned error (may be already connecting/connected)",
136+
zap.String("server", name),
137+
zap.Error(err))
138+
// Not returning error - this is expected if client is already connecting
139+
// Supervisor's reconciliation logic will handle retries if needed
140+
}
141+
126142
return nil
127143
}
128144

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package supervisor
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/stretchr/testify/assert"
8+
"go.uber.org/zap"
9+
)
10+
11+
// TestCircuitBreaker tests the circuit breaker pattern for inspection failures
12+
func TestCircuitBreaker(t *testing.T) {
13+
logger, _ := zap.NewDevelopment()
14+
15+
// Test failure recording
16+
s := &Supervisor{
17+
logger: logger,
18+
inspectionFailures: make(map[string]*inspectionFailureInfo),
19+
}
20+
21+
serverName := "test-server"
22+
23+
// Record 3 failures
24+
for i := 0; i < 3; i++ {
25+
s.RecordInspectionFailure(serverName)
26+
}
27+
28+
// Check that circuit breaker is now active
29+
allowed, reason, cooldown := s.CanInspect(serverName)
30+
assert.False(t, allowed, "Should not allow inspection after 3 failures")
31+
assert.Contains(t, reason, "Circuit breaker active")
32+
assert.Greater(t, cooldown, time.Duration(0), "Should have cooldown remaining")
33+
34+
// Verify cooldown duration is approximately 5 minutes
35+
assert.InDelta(t, inspectionCooldown.Seconds(), cooldown.Seconds(), 1.0, "Cooldown should be ~5 minutes")
36+
}
37+
38+
// TestCircuitBreakerReset tests that successful inspection resets the failure counter
39+
func TestCircuitBreakerReset(t *testing.T) {
40+
logger, _ := zap.NewDevelopment()
41+
42+
s := &Supervisor{
43+
logger: logger,
44+
inspectionFailures: make(map[string]*inspectionFailureInfo),
45+
}
46+
47+
serverName := "test-server"
48+
49+
// Record 2 failures
50+
s.RecordInspectionFailure(serverName)
51+
s.RecordInspectionFailure(serverName)
52+
53+
// Verify we're still allowed (under threshold)
54+
allowed, _, _ := s.CanInspect(serverName)
55+
assert.True(t, allowed, "Should still allow inspection with 2 failures")
56+
57+
// Record success - should reset counter
58+
s.RecordInspectionSuccess(serverName)
59+
60+
// Verify failure counter was reset
61+
failures, inCooldown, _ := s.GetInspectionStats(serverName)
62+
assert.Equal(t, 0, failures, "Failure counter should be reset after success")
63+
assert.False(t, inCooldown, "Should not be in cooldown after success")
64+
}
65+
66+
// TestExemptionExpiry tests that exemptions expire correctly
67+
func TestExemptionExpiry(t *testing.T) {
68+
logger, _ := zap.NewDevelopment()
69+
70+
s := &Supervisor{
71+
logger: logger,
72+
inspectionExemptions: make(map[string]time.Time),
73+
inspectionFailures: make(map[string]*inspectionFailureInfo),
74+
}
75+
76+
serverName := "test-server"
77+
78+
// Grant exemption with short duration
79+
s.inspectionExemptions[serverName] = time.Now().Add(100 * time.Millisecond)
80+
81+
// Should be exempted immediately
82+
assert.True(t, s.IsInspectionExempted(serverName), "Should be exempted immediately")
83+
84+
// Wait for expiry
85+
time.Sleep(150 * time.Millisecond)
86+
87+
// Should no longer be exempted
88+
assert.False(t, s.IsInspectionExempted(serverName), "Should not be exempted after expiry")
89+
}

0 commit comments

Comments
 (0)