Skip to content

Commit d1bd9bd

Browse files
authored
Merge pull request #15 from jhgaylor/feat/io-stream-transport
add a transport to support connecting with arbitrary streams
2 parents ae626ae + 971231f commit d1bd9bd

File tree

10 files changed

+843
-1
lines changed

10 files changed

+843
-1
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
# 0.4.1
2+
3+
- Add IOStreamTransport to connect a client and server via dart streams in a single application
4+
15
## 0.4.0
26

37
- Add support for StreamableHTTP client

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ Ensure you have the correct Dart SDK version installed. See <https://dart.dev/ge
1818
- Stdio support (Server and Client)
1919
- StreamableHTTP support (Server and Client)
2020
- SSE support (Server only) - Deprecated
21+
- Stream Transport using dart streams (Server and Client in shared process)
2122
- Tools
2223
- Resources
2324
- Prompts
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
import 'dart:async';
2+
3+
import 'package:mcp_dart/mcp_dart.dart';
4+
5+
Future<McpServer> getServer() async {
6+
final mcpServer = McpServer(
7+
Implementation(name: "example-dart-iostream-server", version: "1.0.0"),
8+
options: ServerOptions(capabilities: ServerCapabilities()),
9+
);
10+
11+
mcpServer.tool(
12+
"calculate",
13+
description: 'Perform basic arithmetic operations',
14+
inputSchemaProperties: {
15+
'operation': {
16+
'type': 'string',
17+
'enum': ['add', 'subtract', 'multiply', 'divide'],
18+
},
19+
'a': {'type': 'number'},
20+
'b': {'type': 'number'},
21+
},
22+
callback: ({args, extra}) async {
23+
final operation = args!['operation'];
24+
final a = args['a'];
25+
final b = args['b'];
26+
return CallToolResult(
27+
content: [
28+
TextContent(
29+
text: switch (operation) {
30+
'add' => 'Result: ${a + b}',
31+
'subtract' => 'Result: ${a - b}',
32+
'multiply' => 'Result: ${a * b}',
33+
'divide' => 'Result: ${a / b}',
34+
_ => throw Exception('Invalid operation'),
35+
},
36+
),
37+
],
38+
);
39+
},
40+
);
41+
return mcpServer;
42+
}
43+
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
import 'dart:async';
2+
3+
import 'package:mcp_dart/mcp_dart.dart';
4+
import 'server_iostream.dart';
5+
/// Creates and returns a client with custom stream transport connected to a server.
6+
Future<void> main() async {
7+
// Create a client
8+
final client = Client(
9+
Implementation(name: "example-dart-iostream-client", version: "1.0.0"),
10+
options: ClientOptions(capabilities: ClientCapabilities()),
11+
);
12+
13+
final server = await getServer();
14+
15+
// Create custom streams for transport
16+
final serverToClientStreamController = StreamController<List<int>>();
17+
final clientToServerStreamController = StreamController<List<int>>();
18+
19+
// Create transport using custom streams
20+
final clientTransport = IOStreamTransport(
21+
stream: serverToClientStreamController.stream,
22+
sink: clientToServerStreamController.sink,
23+
);
24+
25+
final serverTransport = IOStreamTransport(
26+
stream: clientToServerStreamController.stream,
27+
sink: serverToClientStreamController.sink,
28+
);
29+
30+
// Set up listeners for transport events
31+
clientTransport.onclose = () {
32+
print('Client transport closed');
33+
};
34+
35+
clientTransport.onerror = (error) {
36+
print('Client transport error: $error');
37+
};
38+
39+
serverTransport.onclose = () {
40+
print('Server transport closed');
41+
};
42+
43+
serverTransport.onerror = (error) {
44+
print('Server transport error: $error');
45+
};
46+
47+
print('Starting client with custom stream transport...');
48+
49+
await server.connect(serverTransport);
50+
// Connect the client to the transport
51+
await client.connect(clientTransport);
52+
53+
final availableTools = await client.listTools();
54+
final toolNames = availableTools.tools.map((e) => e.name);
55+
print("Client setup complete. Available tools: $toolNames");
56+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
import 'dart:async';
2+
3+
import 'package:mcp_dart/mcp_dart.dart';
4+
import 'server_iostream.dart';
5+
6+
class PipeTransport {
7+
/// The client end of the pipe transport
8+
late final IOStreamTransport client;
9+
10+
/// The server end of the pipe transport
11+
late final IOStreamTransport server;
12+
13+
/// Creates a new pipe transport with in-memory streams.
14+
PipeTransport() {
15+
final clientToServerController = StreamController<List<int>>();
16+
final serverToClientController = StreamController<List<int>>();
17+
18+
// Client reads from server's output, writes to server's input
19+
client = IOStreamTransport(
20+
stream: serverToClientController.stream,
21+
sink: clientToServerController.sink,
22+
);
23+
24+
// Server reads from client's output, writes to client's input
25+
server = IOStreamTransport(
26+
stream: clientToServerController.stream,
27+
sink: serverToClientController.sink,
28+
);
29+
}
30+
}
31+
32+
/// Creates and returns a client with custom stream transport connected to a server.
33+
Future<void> main() async {
34+
// Create a client
35+
final client = Client(
36+
Implementation(name: "example-dart-iostream-client", version: "1.0.0"),
37+
options: ClientOptions(capabilities: ClientCapabilities()),
38+
);
39+
40+
final server = await getServer();
41+
42+
final transport = PipeTransport();
43+
44+
// Set up listeners for transport events
45+
transport.client.onclose = () {
46+
print('Client transport closed');
47+
};
48+
49+
transport.client.onerror = (error) {
50+
print('Client transport error: $error');
51+
};
52+
53+
transport.server.onclose = () {
54+
print('Server transport closed');
55+
};
56+
57+
transport.server.onerror = (error) {
58+
print('Server transport error: $error');
59+
};
60+
61+
print('Starting client with custom stream transport...');
62+
63+
await server.connect(transport.server);
64+
// Connect the client to the transport
65+
await client.connect(transport.client);
66+
67+
final availableTools = await client.listTools();
68+
final toolNames = availableTools.tools.map((e) => e.name);
69+
print("Client setup complete. Available tools: $toolNames");
70+
}

lib/mcp_dart.dart

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,6 @@ library;
1010

1111
export 'src/server/module.dart'; // Exports the server module for handling MCP server logic.
1212
export 'src/client/module.dart'; // Exports the client module for handling MCP client logic.
13+
export 'src/shared/module.dart'; // Exports the shared module for handling MCP shared logic.
1314
export 'src/types.dart'; // Exports shared types used across the MCP protocol.
1415
export 'src/shared/uuid.dart'; // Exports UUID generation utilities.

lib/src/shared/iostream.dart

Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
import 'dart:async';
2+
import 'dart:convert';
3+
import 'dart:typed_data';
4+
5+
import 'package:mcp_dart/src/shared/stdio.dart';
6+
import 'package:mcp_dart/src/shared/transport.dart';
7+
import 'package:mcp_dart/src/types.dart';
8+
9+
/// Transport implementation that uses standard I/O for communication.
10+
class IOStreamTransport implements Transport {
11+
/// The input stream to read from.
12+
final Stream<List<int>> stream;
13+
14+
/// The output sink to write to.
15+
final StreamSink<List<int>> sink;
16+
17+
/// Buffer for incoming data from the stream.
18+
final ReadBuffer _readBuffer = ReadBuffer();
19+
20+
/// Subscription to the input stream
21+
StreamSubscription<List<int>>? _streamSubscription;
22+
23+
/// Whether the transport has been started
24+
bool _started = false;
25+
26+
/// Whether the transport has been closed
27+
bool _closed = false;
28+
29+
/// Callback invoked when the transport is closed
30+
@override
31+
void Function()? onclose;
32+
33+
/// Callback invoked when an error occurs
34+
@override
35+
void Function(Error error)? onerror;
36+
37+
/// Callback invoked when a message is received
38+
@override
39+
void Function(JsonRpcMessage message)? onmessage;
40+
41+
/// Session ID is not applicable to direct transport
42+
@override
43+
String? get sessionId => null;
44+
45+
/// Creates a transport with the provided streams.
46+
///
47+
/// [stream] is the stream to read from.
48+
/// [sink] is the sink to write to.
49+
IOStreamTransport({
50+
required this.stream,
51+
required this.sink,
52+
});
53+
54+
/// Starts the transport by setting up listeners on the input stream.
55+
///
56+
/// This must be called before sending or receiving messages.
57+
/// Throws [StateError] if already started.
58+
@override
59+
Future<void> start() async {
60+
if (_started) {
61+
throw StateError(
62+
"IOStreamTransport already started! Note that server/client .connect() calls start() automatically.",
63+
);
64+
}
65+
_started = true;
66+
_closed = false;
67+
68+
try {
69+
// Listen to input stream for messages
70+
_streamSubscription = stream.listen(
71+
_onStreamData,
72+
onError: _onStreamError,
73+
onDone: _onStreamDone,
74+
cancelOnError: false,
75+
);
76+
77+
return Future.value();
78+
} catch (error, stackTrace) {
79+
_started = false; // Reset state
80+
final startError = StateError(
81+
"Failed to start IOStreamTransport: $error\n$stackTrace",
82+
);
83+
try {
84+
onerror?.call(startError);
85+
} catch (e) {
86+
print("Error in onerror handler: $e");
87+
}
88+
throw startError; // Rethrow to signal failure
89+
}
90+
}
91+
92+
/// Internal handler for data received from the input stream
93+
void _onStreamData(List<int> chunk) {
94+
if (chunk is! Uint8List) chunk = Uint8List.fromList(chunk);
95+
_readBuffer.append(chunk);
96+
_processReadBuffer();
97+
}
98+
99+
/// Internal handler for when the input stream closes
100+
void _onStreamDone() {
101+
print("IOStreamTransport: Input stream closed.");
102+
close(); // Close transport when input ends
103+
}
104+
105+
/// Internal handler for errors on input stream
106+
void _onStreamError(dynamic error, StackTrace stackTrace) {
107+
final Error streamError = (error is Error)
108+
? error
109+
: StateError("Stream error: $error\n$stackTrace");
110+
try {
111+
onerror?.call(streamError);
112+
} catch (e) {
113+
print("Error in onerror handler: $e");
114+
}
115+
close();
116+
}
117+
118+
/// Internal handler processing buffered input data for messages
119+
void _processReadBuffer() {
120+
while (true) {
121+
try {
122+
final message = _readBuffer.readMessage();
123+
if (message == null) break; // No complete message
124+
try {
125+
onmessage?.call(message);
126+
} catch (e) {
127+
print("Error in onmessage handler: $e");
128+
onerror?.call(StateError("Error in onmessage handler: $e"));
129+
}
130+
} catch (error) {
131+
final Error parseError = (error is Error)
132+
? error
133+
: StateError("Message parsing error: $error");
134+
try {
135+
onerror?.call(parseError);
136+
} catch (e) {
137+
print("Error in onerror handler: $e");
138+
}
139+
print(
140+
"IOStreamTransport: Error processing read buffer: $parseError. Skipping data.",
141+
);
142+
break; // Stop processing buffer on error
143+
}
144+
}
145+
}
146+
147+
/// Closes the transport connection and cleans up resources.
148+
@override
149+
Future<void> close() async {
150+
if (_closed || !_started) return; // Already closed or never started
151+
152+
print("IOStreamTransport: Closing transport...");
153+
154+
// Mark as closing immediately to prevent further sends/starts
155+
_started = false;
156+
_closed = true;
157+
158+
// Cancel stream subscription
159+
await _streamSubscription?.cancel();
160+
_streamSubscription = null;
161+
162+
_readBuffer.clear();
163+
164+
// Invoke the onclose callback
165+
try {
166+
onclose?.call();
167+
} catch (e) {
168+
print("Error in onclose handler: $e");
169+
}
170+
print("IOStreamTransport: Transport closed.");
171+
}
172+
173+
/// Sends a message to the output stream.
174+
///
175+
/// Throws [StateError] if the transport is not started.
176+
@override
177+
Future<void> send(JsonRpcMessage message) async {
178+
if (!_started || _closed) {
179+
throw StateError(
180+
"Cannot send message: IOStreamTransport is not running or is closed.",
181+
);
182+
}
183+
184+
try {
185+
final jsonString = "${jsonEncode(message.toJson())}\n";
186+
sink.add(utf8.encode(jsonString));
187+
// No need to flush as StreamSink should handle this
188+
} catch (error, stackTrace) {
189+
print("IOStreamTransport: Error writing to output stream: $error");
190+
final Error sendError = (error is Error)
191+
? error
192+
: StateError("Output stream write error: $error\n$stackTrace");
193+
try {
194+
onerror?.call(sendError);
195+
} catch (e) {
196+
print("Error in onerror handler: $e");
197+
}
198+
close();
199+
throw sendError; // Rethrow after cleanup attempt
200+
}
201+
}
202+
}

0 commit comments

Comments
 (0)