-
Notifications
You must be signed in to change notification settings - Fork 490
Unified Client Transport Layer for Streamable HTTP Support #114
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Abstract Transport Layer
WalkthroughThe pull request refactors the MCP client implementation by replacing the earlier interface-based design with a concrete Changes
Tip ⚡💬 Agentic Chat (Pro Plan, General Availability)
✨ Finishing Touches
🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
🧹 Nitpick comments (13)
client/stdio.go (2)
21-29
: Pass a user context rather thancontext.Background()
.
Currently, the call tostdioTransport.Start(context.Background())
ignores any potential caller-provided context for cancellation or timeouts. This may limit flexibility for graceful startup cancellation.Consider modifying the signature to accept a caller-provided context:
-func NewStdioMCPClient(command string, env []string, args ...string) (*Client, error) { - stdioTransport := transport.NewStdio(command, env, args...) - err := stdioTransport.Start(context.Background()) +func NewStdioMCPClient(ctx context.Context, command string, env []string, args ...string) (*Client, error) { + stdioTransport := transport.NewStdio(command, env, args...) + err := stdioTransport.Start(ctx)This would allow the caller to manage cancellation logic if needed.
32-39
: Avoid potential panic when cast is invalid.
The doc comments mention thatGetStderr
works only with stdio transport, but a direct castt.(*transport.Stdio)
can panic if the transport is not actuallyStdio
. Consider checking the transport type or returning an error instead of panicking.func GetStderr(c *Client) io.Reader { t := c.GetTransport() - stdio := t.(*transport.Stdio) - return stdio.Stderr() + stdio, ok := t.(*transport.Stdio) + if !ok { + return nil // or consider returning an explicit error, e.g., an error or a no-op reader + } + return stdio.Stderr() }client/sse.go (2)
11-12
: This pass-through function is acceptable but somewhat redundant.Consider whether you need a separate client-level utility if it does not add logic beyond calling
transport.WithHeaders
.
15-16
: Another pass-through that might be consolidated.Similar to
WithHeaders
, there's no additional logic here beyond calling an identical function intransport
.client/transport/stdio_test.go (1)
17-29
: Compilation approach is fine, but consider caching.Compiling the mock server for every test run may slow down test cycles. Caching the compiled artifact (if feasible) or conditionally rebuilding might improve performance.
client/transport/stdio.go (4)
36-55
: Straightforward constructor.This function initializes the client properly. If you ever need to validate or sanitize arguments, this would be the place to do it.
100-111
: Graceful shutdown logic.The code closes stdin and stderr, then waits on the subprocess. Consider returning a combined error if multiple closings fail.
func (c *Stdio) Close() error { close(c.done) - if err := c.stdin.Close(); err != nil { - return fmt.Errorf("failed to close stdin: %w", err) - } - if err := c.stderr.Close(); err != nil { - return fmt.Errorf("failed to close stderr: %w", err) - } - return c.cmd.Wait() + var errs []error + if err := c.stdin.Close(); err != nil { + errs = append(errs, fmt.Errorf("stdin close error: %w", err)) + } + if err := c.stderr.Close(); err != nil { + errs = append(errs, fmt.Errorf("stderr close error: %w", err)) + } + if err := c.cmd.Wait(); err != nil { + errs = append(errs, fmt.Errorf("cmd wait error: %w", err)) + } + if len(errs) > 0 { + return fmt.Errorf("Close() encountered errors: %v", errs) + } + return nil }
113-121
: Flexible notification handler approach.Registering a single handler pointer at a time is adequate. If multiple handlers are needed, consider appending them to a slice.
123-171
: Error logging is direct.Reading responses in a loop is good. The console print for read errors should be replaced with more robust logging if production-level usage is intended.
client/transport/sse.go (3)
155-156
: Avoid using fmt.Printf for error logging in library code.Printing SSE stream errors directly to stdout can clutter logs and limit error handling for consumers. Prefer a configurable logger or return these errors to the caller or a notification handler. This ensures consistent logging and better control over error visibility.
- fmt.Printf("SSE stream error: %v\n", err) + // Example: replace with structured logger or pass error upstream + log.Errorf("SSE stream error: %v", err)
185-194
: Validate full URL rather than only the hostname.When receiving the
endpoint
event, only theHost
is compared against the originalbaseURL
. If the server’s SSE endpoint uses a different scheme or port, the check will fail. Consider adding checks for scheme/port or clarifying in documentation that the remote must have the same exact origin to avoid false negatives or positives.
232-310
: Consider parsing JSON-RPC errors even for non-2xx responses.Currently, if the server returns a non-OK status, you extract a string body but do not parse it as JSON. If the server returns a valid JSON-RPC error object in a 4xx or 5xx status, you might lose structured error details. Consider optionally doing a JSON parse and exposing that error object to callers.
client/client.go (1)
82-105
: Return more structured JSON-RPC error details.When a JSON-RPC error occurs, the code only returns the error message (
response.Error.Message
). Consider exposingresponse.Error.Code
andresponse.Error.Data
as well, or returning a specialized error type to help callers implement more precise error handling.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (13)
client/client.go
(1 hunks)client/interface.go
(1 hunks)client/sse.go
(1 hunks)client/sse_test.go
(2 hunks)client/stdio.go
(1 hunks)client/stdio_test.go
(1 hunks)client/transport/interface.go
(1 hunks)client/transport/sse.go
(1 hunks)client/transport/sse_test.go
(1 hunks)client/transport/stdio.go
(1 hunks)client/transport/stdio_test.go
(1 hunks)client/types.go
(0 hunks)testdata/mockstdio_server.go
(2 hunks)
💤 Files with no reviewable changes (1)
- client/types.go
🧰 Additional context used
🧬 Code Definitions (9)
client/stdio_test.go (1)
client/stdio.go (1)
GetStderr
(36-40)
client/sse_test.go (1)
client/transport/sse.go (1)
SSE
(24-39)
client/stdio.go (1)
client/client.go (2)
Client
(16-24)NewClient
(33-37)
client/sse.go (2)
client/transport/sse.go (5)
WithHeaders
(43-47)ClientOption
(41-41)WithSSEReadTimeout
(49-53)NewSSE
(57-77)SSE
(24-39)client/client.go (2)
Client
(16-24)NewClient
(33-37)
testdata/mockstdio_server.go (1)
mcp/types.go (2)
Params
(104-104)Result
(173-177)
client/transport/stdio.go (1)
testdata/mockstdio_server.go (2)
JSONRPCResponse
(18-26)JSONRPCRequest
(11-16)
client/transport/stdio_test.go (2)
client/transport/stdio.go (1)
NewStdio
(39-55)testdata/mockstdio_server.go (2)
JSONRPCRequest
(11-16)JSONRPCResponse
(18-26)
client/transport/sse.go (1)
client/sse.go (3)
WithHeaders
(11-13)WithSSEReadTimeout
(15-17)GetEndpoint
(34-38)
client/client.go (1)
mcp/types.go (10)
JSONRPCNotification
(192-195)ServerCapabilities
(319-342)Params
(104-104)Result
(173-177)InitializeRequest
(266-275)InitializeResult
(279-293)Implementation
(345-348)ClientCapabilities
(304-314)Notification
(106-109)ReadResourceResult
(439-442)
🔇 Additional comments (38)
client/stdio_test.go (1)
50-50
: Encapsulation improvement for transport layer accessThe change from direct stderr access to using
GetStderr(client)
aligns with the refactoring strategy of separating transport concerns from the client interface. This approach improves abstraction by delegating transport-specific operations to the appropriate layer.client/sse_test.go (2)
4-5
: Appropriate import for transport layerAdding the transport package import is necessary for the refactored code to properly access transport-specific functionality.
50-51
: Improved transport layer abstractionGood refactoring to access the base URL through the transport layer rather than directly from the client. This change aligns with the architectural shift toward a clear separation between client logic and transport mechanisms.
client/interface.go (1)
10-84
: Well-structured MCPClient interface definitionThe MCPClient interface is comprehensive, well-documented, and follows good design patterns:
- Clear method documentation
- Consistent use of context for cancellation
- Comprehensive coverage of all necessary client operations
- Clean method signatures with appropriate parameter and return types
This interface provides a solid foundation for the client implementation and enables better testability through interface mocking.
client/transport/interface.go (3)
10-27
: Well-designed transport interface with clear responsibilitiesThe transport Interface cleanly separates concerns by abstracting connection management and communication protocols. The methods have clear responsibilities and are well-documented. This design enables pluggable transport implementations while keeping the client code focused on application-level logic.
29-34
: Appropriate JSON-RPC request structureThe JSONRPCRequest struct properly follows the JSON-RPC specification with appropriate fields and JSON tags. The optional params field is correctly marked with omitempty.
36-45
: Comprehensive JSON-RPC response structureThe JSONRPCResponse struct correctly handles both successful responses (with Result) and error responses (with Error). The nested error structure properly captures error code, message, and additional data, following the JSON-RPC specification.
client/stdio.go (2)
8-8
: No concerns about the new import.
This additional import for thetransport
package aligns well with the new pluggable transport design.
14-16
: Good clarification in the function-level comments.
Stating thatNewStdioMCPClient
auto-starts the transport and should not be started manually helps avoid confusion.testdata/mockstdio_server.go (4)
13-13
: Optional ID field for JSON-RPC requests.
ChangingID
to*int64
withomitempty
aligns with JSON-RPC convention where notifications have no ID. This is correct.
20-20
: Optional ID field for JSON-RPC responses.
Similarly, makingID
optional in responses allows for flexibility in handling notifications. Looks good.
142-155
: Debug echo methods enhance testing coverage.
Introducing"debug/echo"
and"debug/echo_notification"
is useful for validating transport correctness. Logging to stdout appears intentional for debugging.
156-165
:debug/echo_error_string
reuses a standard JSON-RPC error code.
It’s fine to reuse code-32601
(method not found) for debugging, but be aware it might confuse some JSON-RPC clients that strictly interpret this code. This is acceptable if intended for testing.client/transport/sse_test.go (4)
1-16
: Thorough test setup and imports.
The broad coverage for context handling, JSON encoding/decoding, concurrency, and SSE behavior is well-organized.
18-119
: Robust SSE mock server helpful for integration testing.
Starting a minimal SSE server with a dedicated message endpoint is an excellent way to validate real-world SSE flows. The approach for partial concurrency and ensuring correct SSE headers is solid.
121-377
: Comprehensive SSE transport tests.
Verifying request/response flow, notifications, concurrency, and timeouts covers the crucial aspects of SSE-based communication. The usage of context cancellations and direct SSE events is well-structured.
380-469
: Error scenarios thoroughly tested.
Checks for invalid URLs, early requests (before start), and requests after close ensure the SSE transport handles edge cases reliably.client/sse.go (2)
8-8
: No concerns regarding the new import.This import statement is necessary for referencing the transport package.
21-28
: Constructor logic looks appropriate.This function correctly initializes an SSE transport, checks for errors, and returns a unified
Client
. The error handling is straightforwardly managed, and the refactor aligns with the new architecture.client/transport/stdio_test.go (9)
31-50
: Test setup logic is solid.This section compiles the server, creates a
Stdio
instance, and starts it with a timeout. The error handling and cleanup steps look good.
52-104
: Comprehensive request test coverage.The “SendRequest” sub-test thoroughly checks JSON-RPC correctness and result data matching. No major issues found.
106-125
: Correct handling of canceled context.The logic properly verifies that a request fails under a canceled context.
127-174
: Notification handling test is thorough.Utilizing a channel to validate that notifications arrive is a clean approach. No concerns.
176-251
: Good concurrency test with multiple requests.Concurrent requests and the checks for matching IDs are well-implemented. The approach to concurrency via a WaitGroup is effective.
253-289
: Verifies error responses correctly.Testing the “debug/echo_error_string” method ensures that error messages are unmarshaled and handled properly.
290-303
: InvalidCommand scenario is well-tested.Verifying failure when starting with a non-existent command ensures robust error handling.
304-330
: Requests beforeStart()
are handled correctly.The test properly checks for an error message if requests run on an uninitialized transport.
331-367
: Post-close request scenario is covered.Ensuring an error is returned after transport closure is good practice, preventing accidental usage.
client/transport/stdio.go (6)
16-19
: Documentation is clear and helpful.These comments succinctly describe the transport approach and its usage of JSON-RPC over stdio.
20-34
: Struct design is logical.The fields adequately track command details, streams, response channels, and notification handlers.
57-99
: Robust startup logic.The environment merging, pipe creation, and goroutine-based reading of responses are well-structured. Watch out for partial failures in pipe creation, but the current approach is acceptable.
173-210
: Solid request-handling code.The concurrency logic using
responses[request.ID]
is straightforward, but watch out for potential race conditions if the same ID is reused.
212-228
: Notification sending logic is consistent.This straightforward method for writing notification bytes to stdin satisfies JSON-RPC requirements.
230-234
: Convenient stderr access.Exposing the stderr stream can be helpful for debugging. No issues.
client/transport/sse.go (1)
24-39
:✅ Verification successful
Consider clarifying handling of SSE concurrency.
Within the SSE struct, multiple fields (e.g.,
responses
) depend on locks to synchronize concurrent writes. It's important to confirm that every write/read site is correctly guarded, including the subscription and unsubscription for responses in other methods. This looks consistent so far, but please ensure thorough usage of the mutex or RWMutex throughout all interactions with shared fields.Could you verify whether all concurrent reads/writes to
responses
are wrapped in locks by scanning the codebase with a simple search for its references?
🏁 Script executed:
#!/bin/bash rg -A 5 'responses'Length of output: 10569
Verified Concurrency Handling in SSE
The grep search confirmed that all accesses to the
responses
map inclient/transport/sse.go
are guarded by the RWMutexmu
. Both reading (using RLock/RUnlock) and modifying (entries
added or deleted) are consistently protected, including the use of locks in theClose()
method. The SSE implementation mirrors the safe concurrency patterns used in the stdio transport code. No additional changes are required.client/client.go (3)
39-58
: Good approach for hooking notification handler in Start().By delegating the notification callback to an internal loop and collecting them in
notifications
, you ensure each registered handler can act on incoming events. This structure is clear and easy to extend with additional logic if needed in the future.
107-155
: Re-check post-initialization behavior.After sending the
initialize
request, you issue an "initialized" notification. Ensure the server is designed to handle incoming notifications gracefully in that sequence. This can avoid race conditions if the server expects the client to wait for further instructions before sending more messages.
65-73
: Nice concurrency handling of notification handlers.Using a single
RWMutex
and appending new handlers is straightforward, ensuring thread-safe modifications without blocking reads. This improves extensibility if you plan to add priority or filtering in the future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (4)
client/stdio.go (1)
32-37
: This helper will panic for non-stdio transports.This approach matches the SSE pattern of panicking if used with the wrong transport. If you want to avoid runtime panics, consider adding a type check or returning an error instead.
client/sse.go (1)
31-33
: Similar panic risk if used with a non-SSE transport.Matches the stdio approach of panicking when called with the wrong transport.
Also applies to: 35-37
client/transport/sse.go (1)
79-132
: Initialization with a 30-second wait for 'endpoint' event is consistent with the original logic.Acknowledging the earlier discussion on maintaining the same timeout handling from the previous SSE client.
client/client.go (1)
15-24
: Potential concurrency race oninitialized
field.
This mirrors a prior comment regarding thread safety. Theinitialized
field is accessed outside a lock or atomic operation, unlikerequestID
. If multiple goroutines invokesendRequest
or callInitialize
concurrently, it may lead to data races.
🧹 Nitpick comments (3)
client/stdio.go (2)
14-16
: Good documentation on backward compatibility.It might be helpful to clarify whether callers can ever safely invoke
Start
for advanced customization or if doing so will lead to undefined behavior.
23-24
: Consider using a caller-provided context.Currently,
context.Background()
is used. In some scenarios, allowing users to supply their own context could make cancellation or timeout handling more flexible.Also applies to: 26-26, 29-29
client/transport/sse.go (1)
186-235
: Consider replacing direct stdout prints with a logging mechanism.Library logs via
fmt.Printf
may not integrate well in user environments. For example:- fmt.Printf("Endpoint origin does not match connection origin\n") + // Example of switching to a logger or returning an error + log.Printf("[SSE] Endpoint origin mismatch: ignoring %s\n", data)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
client/client.go
(1 hunks)client/sse.go
(1 hunks)client/stdio.go
(1 hunks)client/transport/sse.go
(1 hunks)
🧰 Additional context used
🧠 Learnings (1)
client/transport/sse.go (1)
Learnt from: leavez
PR: mark3labs/mcp-go#114
File: client/transport/sse.go:137-179
Timestamp: 2025-04-06T10:07:06.685Z
Learning: The SSE client implementation in the MCP-Go project uses a 30-second timeout for reading SSE events to match the behavior of the original implementation before the transport layer refactoring.
🧬 Code Definitions (2)
client/transport/sse.go (3)
client/client.go (1)
Client
(16-24)mcp/types.go (1)
JSONRPCNotification
(192-195)client/sse.go (3)
WithHeaders
(11-13)WithSSEReadTimeout
(15-17)GetEndpoint
(34-38)
client/client.go (5)
client/transport/interface.go (1)
Interface
(11-27)mcp/types.go (18)
JSONRPCNotification
(192-195)ServerCapabilities
(319-342)JSONRPCRequest
(184-189)Params
(104-104)Result
(173-177)InitializeRequest
(266-275)InitializeResult
(279-293)Implementation
(345-348)ClientCapabilities
(304-314)Notification
(106-109)ListResourcesRequest
(400-402)ListResourcesResult
(406-409)ListResourceTemplatesRequest
(413-415)ListResourceTemplatesResult
(419-422)ReadResourceRequest
(426-435)ReadResourceResult
(439-442)SubscribeRequest
(454-461)UnsubscribeRequest
(466-472)mcp/utils.go (3)
ParseReadResourceResult
(554-596)ParseGetPromptResult
(414-474)ParseCallToolResult
(476-525)mcp/prompts.go (4)
ListPromptsRequest
(7-9)ListPromptsResult
(13-16)GetPromptRequest
(20-28)GetPromptResult
(32-37)mcp/tools.go (4)
ListToolsRequest
(13-15)ListToolsResult
(19-22)CallToolRequest
(44-59)CallToolResult
(34-41)
🔇 Additional comments (39)
client/stdio.go (2)
8-8
: No issues with the updated import.
21-21
:✅ Verification successful
Confirm that references are updated to reflect the new return type.
Ensure all code calling
NewStdioMCPClient
expects a*Client
instead of the old*StdioMCPClient
.
🏁 Script executed:
#!/bin/bash # Searching usage of NewStdioMCPClient to verify calls expect new return type rg -A 3 "NewStdioMCPClient"Length of output: 1165
Update Verification: Return Type Consistency Confirmed
- The function now returns
(*Client, error)
as intended.- Usage in both
examples/filesystem_stdio_client/main.go
andclient/stdio_test.go
correctly assign the return value without any indication of expecting the previous*StdioMCPClient
type.- There are no remaining references to the old type, ensuring code consistency across the project.
client/sse.go (6)
8-8
: No additional concerns on the import line.
11-12
: Simple pass-through helper for headers.
15-16
: Simple pass-through helper for read timeouts.
23-25
: Confirm consistent auto-start behavior with stdio transport.Unlike stdio, this does not automatically call
Start
. Verify that users relying on auto-start for SSE are handled appropriately.
28-29
: No issues returning the new unified Client.
21-21
:✅ Verification successful
Ensure references to the old SSEMCPClient usage are properly migrated.
🏁 Script executed:
#!/bin/bash # Verify that all calls to old SSEMCPClient constructor are updated rg -A 3 "SSEMCPClient"Length of output: 1872
Update: SSEMCPClient Migration Verified
The grep results confirm that everywhere (including tests) the constructor now explicitly uses
NewSSEMCPClient
. All active calls have been successfully updated, and no old references remain. There are a few commented-out instances in the tests; consider removing them if they are no longer needed.client/transport/sse.go (10)
20-39
: Struct design appears concurrency-safe.The use of atomic booleans for
started
andclosed
and RWMutexes forresponses
andonNotification
seems appropriate.
41-53
: Client options logic looks clean.These wrappers allow flexible configuration of headers and timeouts.
55-77
: Constructor is clear and maintains backward-compatible defaults.The default 30-second SSE read timeout is consistent with prior behavior per your notes.
134-184
: SSE reading loop follows standard line-based parsing.The approach of parsing events and data, then dispatching them, looks typical for SSE.
237-241
: Notification handler setter is straightforward.
243-311
: JSON-RPC request flow is well-structured.The channel-based response tracking and cleanup on context cancellation seem correct.
313-335
: Clean close procedure handles concurrency gracefully.Cancels the SSE stream and cleans up pending responses, preventing goroutine leaks.
337-381
: Notification sending logic is consistent with JSON-RPC.No return channel needed, and the HTTP response status is checked properly.
382-385
: Endpoint getter is straightforward.
387-390
: Base URL getter is straightforward.client/client.go (21)
5-9
: Imports look appropriate for JSON handling, error wrapping, and concurrency.
The usage ofencoding/json
,errors
,fmt
,sync
, andsync/atomic
is standard for these concerns.
11-11
: Transport import is well-organized.
No concerns.
26-38
: Constructor logic is concise and clear.
The usage example is helpful; no issues found.
40-59
:Start
method is well-structured.
Null transport checks and callback setup for notifications are properly handled.
61-64
:Close
method cleanly defers to transport.
Straightforward pass-through with no notable issues.
66-74
: Notification handler registration is correct.
Use of an RWMutex to safeguard the slice of handlers appears appropriate.
76-106
:sendRequest
logic is solid; requestID increment is handled atomically.
Error propagation, JSON-RPC request building, and response handling are well-implemented. Just be mindful of the concurrency check oninitialized
.
108-156
:Initialize
method properly negotiates capabilities and setsinitialized
.
Implementation follows typical JSON-RPC patterns. Be aware of the potential data race oninitialized
if used multi-threaded.
158-161
:Ping
method is straightforward.
Sending a simple request and returning the error is fine.
163-178
:ListResources
unmarshalling logic looks good.
Error checks are properly handled with%w
for wrapped errors.
180-199
:ListResourceTemplates
is consistent with the pattern.
Shows the same JSON-RPC pattern and error handling, no issues spotted.
201-211
:ReadResource
helper usage.
Delegating parsing tomcp.ParseReadResourceResult
is clean and reduces duplication.
213-219
:Subscribe
method.
Simple pass-through requesting subscription, no concerns.
221-227
:Unsubscribe
method.
Mirrors the subscribe logic for unsubscribing. Looks consistent.
229-244
:ListPrompts
follows the request/response pattern.
No logical issues present.
246-256
:GetPrompt
is minimal and relies on parser.
No further feedback.
258-273
:ListTools
function.
Error wrapping and unmarshal are consistent. No concerns.
275-285
:CallTool
properly delegates parsing.
Implementation is straightforward.
287-293
:SetLevel
method is correct.
Passes params through in a standard request.
295-310
:Complete
method.
Request/response pattern is consistently applied. No concerns.
312-319
:GetTransport
accessor is good for advanced usage.
Returning the underlying interface is useful for specialized needs.
@ezynda3 What do you think about this? If this approach is ok, I'd like to solve the conflict right now. As the files is moved, every PR merged changing the client will lead to the conflict. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
♻️ Duplicate comments (2)
client/sse.go (1)
26-32
: Revisit potential panic for non-SSE transport.The comment warns that
GetEndpoint
only works with SSE, and the type assertion will panic otherwise. A runtime check might provide a safer failure mode. A past comment suggested the same check; referencing it here:client/client.go (1)
15-24
: Concurrency caution forinitialized
.As previously noted, using a non-atomic bool could lead to data races if multiple goroutines call client methods concurrently. If concurrency is not intended, a clarifying comment would help future maintainers.
🧹 Nitpick comments (4)
client/interface.go (1)
10-109
: Consider splitting interface into smaller segments.The
MCPClient
interface covers a broad range of API calls, which is practical here. However, if these functionalities grow further or become more specialized, splitting them into multiple coherent interfaces (e.g., Resource-related, Prompt-related, Tool-related) could simplify mocking and testing.client/transport/sse.go (3)
71-124
: Consider clarifying or customizing the 30-second endpoint wait.
Your timeout approach on lines 110–120 helps ensure that the connection fails fast if the server never sends an “endpoint” event. If your use cases require more flexibility (e.g., idle intervals longer than 30 seconds, dynamic backoff, or reconnection attempts), you could make this timeout configurable.
131-170
: Handle multi-line data fields in SSE events.
The SSE specification allows transmitting an event’s “data:” field in multiple lines, which should be concatenated with newline characters. Currently, the code only stores the last “data:” line (lines 166–167) instead of accumulating them. If your server or future expansions require multi-line event data, consider extending the parsing logic.Below is an illustrative snippet to accumulate multiple data lines:
var event string -var data string +var dataLines []string // ... if strings.HasPrefix(line, "event:") { event = strings.TrimSpace(strings.TrimPrefix(line, "event:")) } else if strings.HasPrefix(line, "data:") { - data = strings.TrimSpace(strings.TrimPrefix(line, "data:")) + dataLines = append(dataLines, strings.TrimSpace(strings.TrimPrefix(line, "data:"))) } // ... if line == "" { - if event != "" && data != "" { - c.handleSSEEvent(event, data) - event = "" - data = "" + if event != "" && len(dataLines) > 0 { + combinedData := strings.Join(dataLines, "\n") + c.handleSSEEvent(event, combinedData) + dataLines = nil event = "" } continue }
147-149
: Consider a more robust logging mechanism.
Usingfmt.Printf
in production libraries (e.g., lines 147–149, 179, 183) can hamper log aggregation and configurability. Switching to a structured logger or your project’s existing logging framework would help filter logs by severity and integrate with observability platforms.Also applies to: 177-188
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
client/client.go
(2 hunks)client/interface.go
(1 hunks)client/sse.go
(1 hunks)client/transport/sse.go
(1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (3)
client/interface.go (1)
mcp/types.go (14)
InitializeRequest
(266-275)InitializeResult
(279-293)ListResourcesRequest
(400-402)ListResourcesResult
(406-409)ListResourceTemplatesRequest
(413-415)ListResourceTemplatesResult
(419-422)ReadResourceRequest
(426-435)ReadResourceResult
(439-442)SubscribeRequest
(454-461)UnsubscribeRequest
(466-472)SetLevelRequest
(558-566)CompleteRequest
(759-770)CompleteResult
(773-785)JSONRPCNotification
(192-195)
client/transport/sse.go (3)
client/client.go (1)
Client
(16-24)mcp/types.go (1)
JSONRPCNotification
(192-195)client/sse.go (2)
WithHeaders
(9-11)GetEndpoint
(28-32)
client/client.go (1)
mcp/types.go (19)
ServerCapabilities
(319-342)Params
(104-104)Result
(173-177)InitializeRequest
(266-275)InitializeResult
(279-293)Implementation
(345-348)ClientCapabilities
(304-314)ListResourcesRequest
(400-402)ListResourcesResult
(406-409)PaginatedRequest
(379-386)ListResourceTemplatesRequest
(413-415)ListResourceTemplatesResult
(419-422)ReadResourceRequest
(426-435)ReadResourceResult
(439-442)SubscribeRequest
(454-461)UnsubscribeRequest
(466-472)SetLevelRequest
(558-566)CompleteRequest
(759-770)CompleteResult
(773-785)
🔇 Additional comments (10)
client/interface.go (1)
1-2
: Good initial documentation.The package-level comment succinctly introduces the
client
package’s purpose. Keeping this overview concise is helpful.client/sse.go (2)
9-10
: Function forwarding is clear.
WithHeaders
delegates directly to the transport package. This keeps logic centralized and is straightforward to maintain.
15-22
: Error handling and return consistency.When initializing the SSE transport, you succinctly handle potential errors. Returning a wrapped error preserves context, which is good.
client/client.go (5)
26-59
: Transport initialization logic is well-structured.
NewClient
andStart
ensure that the transport is non-nil and started before further actions. Assigning the notification handler insideStart
centralizes the connection workflow.
76-106
: Robust error propagation insendRequest
.The method correctly distinguishes transport errors (
transport error
) from server-side errors (response.Error
) and returns meaningful diagnostics. This approach simplifies debugging.
108-156
: Initialize flow is thorough.You properly handle the request struct, parse the server's response, store capabilities, and send the "initialized" notification. This provides a clear handshake mechanism.
158-246
: API coverage is comprehensive.From
Ping
to resource operations, each method is implemented consistently with similar structure. The usage of pagination helpers (listByPage
) is a nice way to reduce duplication and keep the code concise.
248-409
: Helper methods and typed parsing.Methods like
ParseReadResourceResult
,ParseGetPromptResult
, etc., keep JSON unmarshalling neatly separated. Combined withlistByPage
for pagination, the client design remains modular and extensible.client/transport/sse.go (2)
223-227
: Appreciation for the accessible notification handler registration.
The usage ofSetNotificationHandler
(lines 223–227) is straightforward, and the read-write lock ensures concurrency safety. This is clean and clear.
299-319
: Graceful shutdown logic is well-implemented.
YourClose
method (lines 299–319) properly cancels the SSE stream context and clears pending response channels. This ensures that in-flight requests do not block. Great job!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
🧹 Nitpick comments (4)
client/transport/sse_test.go (4)
18-130
: The mock server implementation is well-structured, but consider removing debug prints.The mock SSE echo server implementation is comprehensive and handles the necessary test scenarios effectively. However, there's a debug print statement on line 33 (
fmt.Printf("SSEHandler ends: %v\n", r.Context().Err())
) that could pollute test output.- fmt.Printf("SSEHandler ends: %v\n", r.Context().Err()) + // Debug print removed for cleaner test output
276-352
: Simplify mutex usage in MultipleRequests test.The mutex locking in this test is unnecessary since each goroutine writes to its own index in the slices. This can be simplified for better readability and performance.
- mu.Lock() responses[idx] = resp errors[idx] = err - mu.Unlock()
138-150
: Consider error details handling in transport.Start failure.The current error handling on transport.Start() just logs the error, but it might be valuable to inspect the specific error type to verify the expected failure mode.
err = trans.Start(ctx) if err != nil { - t.Fatalf("Failed to start transport: %v", err) + t.Fatalf("Failed to start transport: %v (type: %T)", err, err) }
132-389
: Consider adding test for error propagation in concurrent requests.The current tests cover many scenarios but don't specifically test how errors are propagated when multiple concurrent requests are made and some fail. This is important for ensuring robustness.
Consider adding a test case within TestSSE that sends multiple concurrent requests, some of which are designed to fail (e.g., by using "debug/echo_error_string"), and verify that:
- Failures in one request don't affect others
- All errors are properly captured and returned
- The transport remains stable after partial failures
This would strengthen the test suite's coverage of error handling in concurrent scenarios.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
client/transport/sse_test.go
(1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
client/transport/sse_test.go (2)
mcp/types.go (6)
Request
(89-102)Params
(104-104)Result
(173-177)JSONRPCNotification
(192-195)Notification
(106-109)NotificationParams
(111-118)client/transport/sse.go (1)
NewSSE
(50-69)
🔇 Additional comments (1)
client/transport/sse_test.go (1)
1-481
: Overall the test implementation is comprehensive and well-structured.The SSE transport tests cover a wide range of scenarios including success cases, error handling, concurrent requests, and edge cases. The mock server implementation is robust and simulates real-world behavior effectively.
The test coverage is thorough and will help ensure the reliability of the SSE transport implementation as part of the unified transport layer.
@ezynda3 CI is fixed. ( only some races in UT helper server code) |
@leavez just posting to let you know I am not ignoring this PR. It's on my radar just having 1 one or 2 other people have a look at it before merging it. Thanks for your patience. |
Goal
I'm implementing the StreamableHTTP transport protocol for the MCP client. The existing
stdio-client
andsse-client
share a significant amount of duplicated code in the application-layer protocol implementation (e.g.,Initialize
,ListTool
,CallTool
, etc.).To prepare for the new transport protocol, this PR extracts the transport layer from the client. After the refactor, there will be a single
Client
type with a pluggableTransport
parameter representing the transport layer (e.g., stdio, HTTP+SSE, streamable HTTP).This separation of the application-layer protocol from the transport layer results in a clearer and more maintainable architecture.
The PR implementing the streamable HTTP transport will follow once this is merged:
Changes
This PR aims to minimize changes. Although the
git diff
may appear large, most of the code is moved from the existing implementation.Client
type as the unified MCP client implementation.StdioClient
andSSEClient
, keeping a lot of common code.client.go
tointerface.go
Transport.Interface
interface along with two transport implementations:transport/stdio.go
andtransport/see.go
stdio.go
see.go
preserve the original connection implementationsSSEMCPClient
andStdioMCPClient
type, while keeping theNewSSEMCPClient
and similar APIsAPI compatibility
This change is not 100% backward compatible. This PR aims to minimize breaking changes, while avoiding unnecessary complexity just to maintain 100% compatibility.
It replaces
SSEMCPClient
andStdioMCPClient
with the unifiedClient
type. However, theNewSSEMCPClient
andNewStdioMCPClient
constructors remain unchanged to minimize integration effort in downstream projects. They now return the newClient
type, which provides the same common APIs, but transport-specific methods (e.g.,GetEndpoint()
) break.Summary by CodeRabbit
Summary by CodeRabbit