Skip to content

Commit a63f10e

Browse files
authored
Fix SSE transport not properly handling HTTP/2 NO_ERROR disconnections (#509)
* Add OnConnectionLost method to Client and SSE transport to handle HTTP2 idle timeout disconnections gracefully. This allows applications to distinguish between actual errors and expected connection drops. * test: Add comprehensive NO_ERROR handling tests for SSE transport * fix: Make NO_ERROR handling backward compatible and optimize performance * fix: Make NO_ERROR handling backward compatible and add documentation
1 parent fda6b38 commit a63f10e

File tree

4 files changed

+284
-5
lines changed

4 files changed

+284
-5
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -537,7 +537,7 @@ For examples, see the [`examples/`](examples/) directory.
537537

538538
### Transports
539539

540-
MCP-Go supports stdio, SSE and streamable-HTTP transport layers.
540+
MCP-Go supports stdio, SSE and streamable-HTTP transport layers. For SSE transport, you can use `SetConnectionLostHandler()` to detect and handle HTTP/2 idle timeout disconnections (NO_ERROR) for implementing reconnection logic.
541541

542542
### Session Management
543543

client/client.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,17 @@ func (c *Client) OnNotification(
113113
c.notifications = append(c.notifications, handler)
114114
}
115115

116+
// OnConnectionLost registers a handler function to be called when the connection is lost.
117+
// This is useful for handling HTTP2 idle timeout disconnections that should not be treated as errors.
118+
func (c *Client) OnConnectionLost(handler func(error)) {
119+
type connectionLostSetter interface {
120+
SetConnectionLostHandler(func(error))
121+
}
122+
if setter, ok := c.transport.(connectionLostSetter); ok {
123+
setter.SetConnectionLostHandler(handler)
124+
}
125+
}
126+
116127
// sendRequest sends a JSON-RPC request to the server and waits for a response.
117128
// Returns the raw JSON response message or an error if the request fails.
118129
func (c *Client) sendRequest(

client/transport/sse.go

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,12 @@ type SSE struct {
3434
headers map[string]string
3535
headerFunc HTTPHeaderFunc
3636

37-
started atomic.Bool
38-
closed atomic.Bool
39-
cancelSSEStream context.CancelFunc
40-
protocolVersion atomic.Value // string
37+
started atomic.Bool
38+
closed atomic.Bool
39+
cancelSSEStream context.CancelFunc
40+
protocolVersion atomic.Value // string
41+
onConnectionLost func(error)
42+
connectionLostMu sync.RWMutex
4143

4244
// OAuth support
4345
oauthHandler *OAuthHandler
@@ -204,6 +206,19 @@ func (c *SSE) readSSE(reader io.ReadCloser) {
204206
}
205207
break
206208
}
209+
// Checking whether the connection was terminated due to NO_ERROR in HTTP2 based on RFC9113
210+
// Only handle NO_ERROR specially if onConnectionLost handler is set to maintain backward compatibility
211+
if strings.Contains(err.Error(), "NO_ERROR") {
212+
c.connectionLostMu.RLock()
213+
handler := c.onConnectionLost
214+
c.connectionLostMu.RUnlock()
215+
216+
if handler != nil {
217+
// This is not actually an error - HTTP2 idle timeout disconnection
218+
handler(err)
219+
return
220+
}
221+
}
207222
if !c.closed.Load() {
208223
fmt.Printf("SSE stream error: %v\n", err)
209224
}
@@ -294,6 +309,12 @@ func (c *SSE) SetNotificationHandler(handler func(notification mcp.JSONRPCNotifi
294309
c.onNotification = handler
295310
}
296311

312+
func (c *SSE) SetConnectionLostHandler(handler func(error)) {
313+
c.connectionLostMu.Lock()
314+
defer c.connectionLostMu.Unlock()
315+
c.onConnectionLost = handler
316+
}
317+
297318
// SendRequest sends a JSON-RPC request to the server and waits for a response.
298319
// Returns the raw JSON response message or an error if the request fails.
299320
func (c *SSE) SendRequest(

client/transport/sse_test.go

Lines changed: 247 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"context"
55
"encoding/json"
66
"errors"
7+
"io"
8+
"strings"
79
"sync"
810
"testing"
911
"time"
@@ -15,6 +17,39 @@ import (
1517
"github.com/mark3labs/mcp-go/mcp"
1618
)
1719

20+
// mockReaderWithError is a mock io.ReadCloser that simulates reading some data
21+
// and then returning a specific error
22+
type mockReaderWithError struct {
23+
data []byte
24+
err error
25+
position int
26+
closed bool
27+
}
28+
29+
func (m *mockReaderWithError) Read(p []byte) (n int, err error) {
30+
if m.closed {
31+
return 0, io.EOF
32+
}
33+
34+
if m.position >= len(m.data) {
35+
return 0, m.err
36+
}
37+
38+
n = copy(p, m.data[m.position:])
39+
m.position += n
40+
41+
if m.position >= len(m.data) {
42+
return n, m.err
43+
}
44+
45+
return n, nil
46+
}
47+
48+
func (m *mockReaderWithError) Close() error {
49+
m.closed = true
50+
return nil
51+
}
52+
1853
// startMockSSEEchoServer starts a test HTTP server that implements
1954
// a minimal SSE-based echo server for testing purposes.
2055
// It returns the server URL and a function to close the server.
@@ -508,6 +543,218 @@ func TestSSE(t *testing.T) {
508543
}
509544
})
510545

546+
t.Run("NO_ERROR_WithoutConnectionLostHandler", func(t *testing.T) {
547+
// Test that NO_ERROR without connection lost handler maintains backward compatibility
548+
// When no connection lost handler is set, NO_ERROR should be treated as a regular error
549+
550+
// Create a mock Reader that simulates NO_ERROR
551+
mockReader := &mockReaderWithError{
552+
data: []byte("event: endpoint\ndata: /message\n\n"),
553+
err: errors.New("connection closed: NO_ERROR"),
554+
}
555+
556+
// Create SSE transport
557+
url, closeF := startMockSSEEchoServer()
558+
defer closeF()
559+
560+
trans, err := NewSSE(url)
561+
if err != nil {
562+
t.Fatal(err)
563+
}
564+
565+
// DO NOT set connection lost handler to test backward compatibility
566+
567+
// Capture stderr to verify the error is printed (backward compatible behavior)
568+
// Since we can't easily capture fmt.Printf output in tests, we'll just verify
569+
// that the readSSE method returns without calling any handler
570+
571+
// Directly test the readSSE method with our mock reader
572+
go trans.readSSE(mockReader)
573+
574+
// Wait for readSSE to complete
575+
time.Sleep(100 * time.Millisecond)
576+
577+
// The test passes if readSSE completes without panicking or hanging
578+
// In backward compatibility mode, NO_ERROR should be treated as a regular error
579+
t.Log("Backward compatibility test passed: NO_ERROR handled as regular error when no handler is set")
580+
})
581+
582+
t.Run("NO_ERROR_ConnectionLost", func(t *testing.T) {
583+
// Test that NO_ERROR in HTTP/2 connection loss is properly handled
584+
// This test verifies that when a connection is lost in a way that produces
585+
// an error message containing "NO_ERROR", the connection lost handler is called
586+
587+
var connectionLostCalled bool
588+
var connectionLostError error
589+
var mu sync.Mutex
590+
591+
// Create a mock Reader that simulates connection loss with NO_ERROR
592+
mockReader := &mockReaderWithError{
593+
data: []byte("event: endpoint\ndata: /message\n\n"),
594+
err: errors.New("http2: stream closed with error code NO_ERROR"),
595+
}
596+
597+
// Create SSE transport
598+
url, closeF := startMockSSEEchoServer()
599+
defer closeF()
600+
601+
trans, err := NewSSE(url)
602+
if err != nil {
603+
t.Fatal(err)
604+
}
605+
606+
// Set connection lost handler
607+
trans.SetConnectionLostHandler(func(err error) {
608+
mu.Lock()
609+
defer mu.Unlock()
610+
connectionLostCalled = true
611+
connectionLostError = err
612+
})
613+
614+
// Directly test the readSSE method with our mock reader that simulates NO_ERROR
615+
go trans.readSSE(mockReader)
616+
617+
// Wait for connection lost handler to be called
618+
timeout := time.After(1 * time.Second)
619+
ticker := time.NewTicker(10 * time.Millisecond)
620+
defer ticker.Stop()
621+
622+
for {
623+
select {
624+
case <-timeout:
625+
t.Fatal("Connection lost handler was not called within timeout for NO_ERROR connection loss")
626+
case <-ticker.C:
627+
mu.Lock()
628+
called := connectionLostCalled
629+
err := connectionLostError
630+
mu.Unlock()
631+
632+
if called {
633+
if err == nil {
634+
t.Fatal("Expected connection lost error, got nil")
635+
}
636+
637+
// Verify that the error contains "NO_ERROR" string
638+
if !strings.Contains(err.Error(), "NO_ERROR") {
639+
t.Errorf("Expected error to contain 'NO_ERROR', got: %v", err)
640+
}
641+
642+
t.Logf("Connection lost handler called with NO_ERROR: %v", err)
643+
return
644+
}
645+
}
646+
}
647+
})
648+
649+
t.Run("NO_ERROR_Handling", func(t *testing.T) {
650+
// Test specific NO_ERROR string handling in readSSE method
651+
// This tests the code path at line 209 where NO_ERROR is checked
652+
653+
// Create a mock Reader that simulates an error containing "NO_ERROR"
654+
mockReader := &mockReaderWithError{
655+
data: []byte("event: endpoint\ndata: /message\n\n"),
656+
err: errors.New("connection closed: NO_ERROR"),
657+
}
658+
659+
// Create SSE transport
660+
url, closeF := startMockSSEEchoServer()
661+
defer closeF()
662+
663+
trans, err := NewSSE(url)
664+
if err != nil {
665+
t.Fatal(err)
666+
}
667+
668+
var connectionLostCalled bool
669+
var connectionLostError error
670+
var mu sync.Mutex
671+
672+
// Set connection lost handler to verify it's called for NO_ERROR
673+
trans.SetConnectionLostHandler(func(err error) {
674+
mu.Lock()
675+
defer mu.Unlock()
676+
connectionLostCalled = true
677+
connectionLostError = err
678+
})
679+
680+
// Directly test the readSSE method with our mock reader
681+
go trans.readSSE(mockReader)
682+
683+
// Wait for connection lost handler to be called
684+
timeout := time.After(1 * time.Second)
685+
ticker := time.NewTicker(10 * time.Millisecond)
686+
defer ticker.Stop()
687+
688+
for {
689+
select {
690+
case <-timeout:
691+
t.Fatal("Connection lost handler was not called within timeout for NO_ERROR")
692+
case <-ticker.C:
693+
mu.Lock()
694+
called := connectionLostCalled
695+
err := connectionLostError
696+
mu.Unlock()
697+
698+
if called {
699+
if err == nil {
700+
t.Fatal("Expected connection lost error with NO_ERROR, got nil")
701+
}
702+
703+
// Verify that the error contains "NO_ERROR" string
704+
if !strings.Contains(err.Error(), "NO_ERROR") {
705+
t.Errorf("Expected error to contain 'NO_ERROR', got: %v", err)
706+
}
707+
708+
t.Logf("Successfully handled NO_ERROR: %v", err)
709+
return
710+
}
711+
}
712+
}
713+
})
714+
715+
t.Run("RegularError_DoesNotTriggerConnectionLost", func(t *testing.T) {
716+
// Test that regular errors (not containing NO_ERROR) do not trigger connection lost handler
717+
718+
// Create a mock Reader that simulates a regular error
719+
mockReader := &mockReaderWithError{
720+
data: []byte("event: endpoint\ndata: /message\n\n"),
721+
err: errors.New("regular connection error"),
722+
}
723+
724+
// Create SSE transport
725+
url, closeF := startMockSSEEchoServer()
726+
defer closeF()
727+
728+
trans, err := NewSSE(url)
729+
if err != nil {
730+
t.Fatal(err)
731+
}
732+
733+
var connectionLostCalled bool
734+
var mu sync.Mutex
735+
736+
// Set connection lost handler - this should NOT be called for regular errors
737+
trans.SetConnectionLostHandler(func(err error) {
738+
mu.Lock()
739+
defer mu.Unlock()
740+
connectionLostCalled = true
741+
})
742+
743+
// Directly test the readSSE method with our mock reader
744+
go trans.readSSE(mockReader)
745+
746+
// Wait and verify connection lost handler is NOT called
747+
time.Sleep(200 * time.Millisecond)
748+
749+
mu.Lock()
750+
called := connectionLostCalled
751+
mu.Unlock()
752+
753+
if called {
754+
t.Error("Connection lost handler should not be called for regular errors")
755+
}
756+
})
757+
511758
}
512759

513760
func TestSSEErrors(t *testing.T) {

0 commit comments

Comments
 (0)