Skip to content

Commit ecaeb03

Browse files
committed
Refactored StreamableHttpServerTransport to improve nullable safety, simplify session handling, and replace LATEST_PROTOCOL_VERSION with DEFAULT_NEGOTIATED_PROTOCOL_VERSION. Adjusted request validation and related imports.
1 parent 1895552 commit ecaeb03

File tree

2 files changed

+18
-30
lines changed

2 files changed

+18
-30
lines changed

kotlin-sdk-core/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/types/common.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
package io.modelcontextprotocol.kotlin.sdk.types
22

3-
import io.modelcontextprotocol.kotlin.sdk.types.Icon.Theme.Dark
4-
import io.modelcontextprotocol.kotlin.sdk.types.Icon.Theme.Light
53
import kotlinx.serialization.SerialName
64
import kotlinx.serialization.Serializable
75
import kotlinx.serialization.json.JsonObject
@@ -12,6 +10,8 @@ import kotlinx.serialization.json.JsonObject
1210

1311
public const val LATEST_PROTOCOL_VERSION: String = "2025-06-18"
1412

13+
public const val DEFAULT_NEGOTIATED_PROTOCOL_VERSION: String = "2025-03-26"
14+
1515
public val SUPPORTED_PROTOCOL_VERSIONS: List<String> = listOf(
1616
LATEST_PROTOCOL_VERSION,
1717
"2025-03-26",

kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/StreamableHttpServerTransport.kt

Lines changed: 16 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,16 @@ import io.ktor.server.request.httpMethod
1212
import io.ktor.server.request.receiveText
1313
import io.ktor.server.response.header
1414
import io.ktor.server.response.respond
15-
import io.ktor.server.response.respondBytes
1615
import io.ktor.server.response.respondNullable
1716
import io.ktor.server.sse.ServerSSESession
1817
import io.ktor.util.collections.ConcurrentMap
1918
import io.modelcontextprotocol.kotlin.sdk.shared.AbstractTransport
19+
import io.modelcontextprotocol.kotlin.sdk.shared.TransportSendOptions
20+
import io.modelcontextprotocol.kotlin.sdk.types.DEFAULT_NEGOTIATED_PROTOCOL_VERSION
2021
import io.modelcontextprotocol.kotlin.sdk.types.JSONRPCError
2122
import io.modelcontextprotocol.kotlin.sdk.types.JSONRPCMessage
2223
import io.modelcontextprotocol.kotlin.sdk.types.JSONRPCRequest
2324
import io.modelcontextprotocol.kotlin.sdk.types.JSONRPCResponse
24-
import io.modelcontextprotocol.kotlin.sdk.types.LATEST_PROTOCOL_VERSION
2525
import io.modelcontextprotocol.kotlin.sdk.types.McpJson
2626
import io.modelcontextprotocol.kotlin.sdk.types.Method
2727
import io.modelcontextprotocol.kotlin.sdk.types.RPCError
@@ -169,7 +169,7 @@ public class StreamableHttpServerTransport(
169169
}
170170
}
171171

172-
override suspend fun send(message: JSONRPCMessage) {
172+
override suspend fun send(message: JSONRPCMessage, options: TransportSendOptions?) {
173173
val requestId: RequestId? = when (message) {
174174
is JSONRPCResponse -> message.id
175175
is JSONRPCError -> message.id
@@ -182,17 +182,16 @@ public class StreamableHttpServerTransport(
182182
"Cannot send a response on a standalone SSE stream unless resuming a previous client request"
183183
}
184184
val standaloneStream = streamsMapping[STANDALONE_SSE_STREAM_ID] ?: return
185-
emitOnStream(STANDALONE_SSE_STREAM_ID, standaloneStream.session!!, message)
185+
emitOnStream(STANDALONE_SSE_STREAM_ID, standaloneStream.session, message)
186186
return
187187
}
188188

189-
val streamId = requestToStreamMapping[requestId]
190-
?: error("No connection established for request ID: $requestId")
189+
val streamId = requestToStreamMapping[requestId] ?: error("No connection established for request id $requestId")
191190
val activeStream = streamsMapping[streamId]
192191

193192
if (!enableJsonResponse) {
194193
activeStream?.let { stream ->
195-
emitOnStream(streamId, stream.session!!, message)
194+
emitOnStream(streamId, stream.session, message)
196195
}
197196
}
198197

@@ -202,26 +201,23 @@ public class StreamableHttpServerTransport(
202201
requestToResponseMapping[requestId] = message
203202
val relatedIds = requestToStreamMapping.filterValues { it == streamId }.keys
204203

205-
val allResponseReady = relatedIds.all { it in requestToResponseMapping }
206-
if (!allResponseReady) return
204+
if (relatedIds.any { it !in requestToResponseMapping }) return
207205

208206
streamMutex.withLock {
209207
if (activeStream == null) error("No connection established for request ID: $requestId")
210208

211209
if (enableJsonResponse) {
212210
activeStream.call.response.header(HttpHeaders.ContentType, ContentType.Application.Json.toString())
213211
sessionId?.let { activeStream.call.response.header(MCP_SESSION_ID_HEADER, it) }
214-
val responses = relatedIds
215-
.mapNotNull { requestToResponseMapping[it] }
216-
.map { McpJson.encodeToString(it) }
212+
val responses = relatedIds.mapNotNull { requestToResponseMapping[it] }
217213
val payload = if (responses.size == 1) {
218214
responses.first()
219215
} else {
220216
responses
221217
}
222218
activeStream.call.respond(payload)
223219
} else {
224-
activeStream.session!!.close()
220+
activeStream.session?.close()
225221
}
226222

227223
// Clean up
@@ -261,7 +257,7 @@ public class StreamableHttpServerTransport(
261257

262258
HttpMethod.Get -> handleGetRequest(session, call)
263259

264-
HttpMethod.Delete -> handleDeleteRequest(session, call)
260+
HttpMethod.Delete -> handleDeleteRequest(call)
265261

266262
else -> call.run {
267263
response.header(HttpHeaders.Allow, "GET, POST, DELETE")
@@ -334,15 +330,15 @@ public class StreamableHttpServerTransport(
334330

335331
val hasRequest = messages.any { it is JSONRPCRequest }
336332
if (!hasRequest) {
337-
call.respondBytes(status = HttpStatusCode.Accepted, bytes = ByteArray(0))
333+
call.respondNullable(status = HttpStatusCode.Accepted, message = null)
338334
messages.forEach { message -> _onMessage(message) }
339335
return
340336
}
341337

342338
val streamId = Uuid.random().toString()
343339
if (!enableJsonResponse) {
344340
call.appendSseHeaders()
345-
session!!.send(data = "") // flush headers immediately
341+
session?.send(data = "") // flush headers immediately
346342
}
347343

348344
streamMutex.withLock {
@@ -407,15 +403,7 @@ public class StreamableHttpServerTransport(
407403
session.coroutineContext.job.invokeOnCompletion { streamsMapping.remove(STANDALONE_SSE_STREAM_ID) }
408404
}
409405

410-
public suspend fun handleDeleteRequest(session: ServerSSESession?, call: ApplicationCall) {
411-
if (enableJsonResponse) {
412-
call.reject(
413-
HttpStatusCode.MethodNotAllowed,
414-
RPCError.ErrorCode.CONNECTION_CLOSED,
415-
"Method not allowed.",
416-
)
417-
}
418-
406+
public suspend fun handleDeleteRequest(call: ApplicationCall) {
419407
if (!validateSession(call) || !validateProtocolVersion(call)) return
420408
sessionId?.let { onSessionClosed?.invoke(it) }
421409
close()
@@ -482,7 +470,7 @@ public class StreamableHttpServerTransport(
482470
}
483471

484472
private suspend fun validateProtocolVersion(call: ApplicationCall): Boolean {
485-
val version = call.request.header(MCP_PROTOCOL_VERSION_HEADER) ?: LATEST_PROTOCOL_VERSION
473+
val version = call.request.header(MCP_PROTOCOL_VERSION_HEADER) ?: DEFAULT_NEGOTIATED_PROTOCOL_VERSION
486474

487475
return when (version) {
488476
!in SUPPORTED_PROTOCOL_VERSIONS -> {
@@ -548,10 +536,10 @@ public class StreamableHttpServerTransport(
548536
return pattern.containsMatchIn(this)
549537
}
550538

551-
private suspend fun emitOnStream(streamId: String, session: ServerSSESession, message: JSONRPCMessage) {
539+
private suspend fun emitOnStream(streamId: String, session: ServerSSESession?, message: JSONRPCMessage) {
552540
val eventId = eventStore?.storeEvent(streamId, message)
553541
try {
554-
session.send(event = "message", id = eventId, data = McpJson.encodeToString(message))
542+
session?.send(event = "message", id = eventId, data = McpJson.encodeToString(message))
555543
} catch (_: Exception) {
556544
streamsMapping.remove(streamId)
557545
}

0 commit comments

Comments
 (0)