-
Notifications
You must be signed in to change notification settings - Fork 181
fix(sse-client): Skip SSE in StreamableHttpClientTransport when data is empty #433
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
926c487
d90c1ea
2a5dd16
5a839c7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -42,8 +42,6 @@ import kotlin.concurrent.atomics.AtomicBoolean | |
| import kotlin.concurrent.atomics.ExperimentalAtomicApi | ||
| import kotlin.time.Duration | ||
|
|
||
| private val logger = KotlinLogging.logger {} | ||
|
|
||
| private const val MCP_SESSION_ID_HEADER = "mcp-session-id" | ||
| private const val MCP_PROTOCOL_VERSION_HEADER = "mcp-protocol-version" | ||
| private const val MCP_RESUMPTION_TOKEN_HEADER = "Last-Event-ID" | ||
|
|
@@ -67,6 +65,10 @@ public class StreamableHttpClientTransport( | |
| private val requestBuilder: HttpRequestBuilder.() -> Unit = {}, | ||
| ) : AbstractTransport() { | ||
|
|
||
| private companion object { | ||
| private val logger = KotlinLogging.logger {} | ||
| } | ||
|
|
||
| public var sessionId: String? = null | ||
| private set | ||
| public var protocolVersion: String? = null | ||
|
|
@@ -316,11 +318,14 @@ public class StreamableHttpClientTransport( | |
| var id: String? = null | ||
| var eventName: String? = null | ||
|
|
||
| suspend fun dispatch(data: String) { | ||
| suspend fun dispatch(id: String?, eventName: String?, data: String) { | ||
| id?.let { | ||
| lastEventId = it | ||
| onResumptionToken?.invoke(it) | ||
| } | ||
| if (data.isBlank()) { | ||
| return | ||
| } | ||
| if (eventName == null || eventName == "message") { | ||
| runCatching { McpJson.decodeFromString<JSONRPCMessage>(data) } | ||
| .onSuccess { msg -> | ||
|
|
@@ -335,16 +340,16 @@ public class StreamableHttpClientTransport( | |
| throw it | ||
| } | ||
| } | ||
| // reset | ||
| id = null | ||
| eventName = null | ||
| sb.clear() | ||
| } | ||
|
|
||
| while (!channel.isClosedForRead) { | ||
| val line = channel.readUTF8Line() ?: break | ||
| if (line.isEmpty()) { | ||
| dispatch(sb.toString()) | ||
| dispatch(id = id, eventName = eventName, data = sb.toString()) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In that case, you can move
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not quite yet. The "lastEventId" is updated inside. I wanted to avoid a major refactoring for now. |
||
| // reset | ||
| id = null | ||
| eventName = null | ||
| sb.clear() | ||
|
Comment on lines
+349
to
+352
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. less side-effects inside dispath |
||
| continue | ||
| } | ||
| when { | ||
|
|
||
This file was deleted.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,7 +11,9 @@ import io.modelcontextprotocol.kotlin.sdk.types.Implementation | |
| import io.modelcontextprotocol.kotlin.sdk.types.Tool | ||
| import io.modelcontextprotocol.kotlin.sdk.types.ToolSchema | ||
| import kotlinx.coroutines.delay | ||
| import kotlinx.coroutines.flow.emptyFlow | ||
| import kotlinx.coroutines.flow.flow | ||
| import kotlinx.coroutines.flow.flowOf | ||
| import kotlinx.coroutines.runBlocking | ||
| import kotlinx.serialization.json.buildJsonObject | ||
| import kotlinx.serialization.json.put | ||
|
|
@@ -28,6 +30,50 @@ import kotlin.uuid.Uuid | |
| @Suppress("LongMethod") | ||
| internal class StreamableHttpClientTest : AbstractStreamableHttpClientTest() { | ||
|
|
||
| @Test | ||
| fun `Should skip empty SSE`(): Unit = runBlocking { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. plus the following cases?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point. I will update the integration test for multiline |
||
| val client = Client( | ||
| clientInfo = Implementation( | ||
| name = "client1", | ||
| version = "1.0.0", | ||
| ), | ||
| options = ClientOptions( | ||
| capabilities = ClientCapabilities(), | ||
| ), | ||
| ) | ||
| val sessionId = Uuid.random().toString() | ||
|
|
||
| mockMcp.onJSONRPCRequest( | ||
| httpMethod = HttpMethod.Post, | ||
| jsonRpcMethod = "initialize", | ||
| ).respondsWithStream { | ||
| headers += MCP_SESSION_ID_HEADER to sessionId | ||
| flow = flowOf( | ||
| "id: ${Uuid.random()}\n", | ||
| "data: \n", | ||
| "\n", | ||
| "id: ${Uuid.random()}\n", | ||
| "event: message\n", | ||
| @Suppress("MaxLineLength") | ||
| "data: {\"result\":{\"protocolVersion\":\"2025-06-18\",\"capabilities\":{},\"serverInfo\":{\"name\":\"simple-streamable-http-server\",\"version\":\"1.0.0\"}},\"jsonrpc\":\"2.0\",\"id\":\"7ce065b0678f49e5b04ce5a0fcc7d518\"}\n", | ||
| "\n", | ||
|
Comment on lines
52
to
70
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. raw SSE stream data |
||
| ) | ||
| } | ||
|
|
||
| mockMcp.handleJSONRPCRequest( | ||
| jsonRpcMethod = "notifications/initialized", | ||
| expectedSessionId = sessionId, | ||
| sessionId = sessionId, | ||
| statusCode = HttpStatusCode.Accepted, | ||
| ) | ||
|
|
||
| mockMcp.handleSubscribeWithGet(sessionId) { | ||
| emptyFlow() | ||
| } | ||
|
|
||
| connect(client) | ||
| } | ||
|
|
||
| @Test | ||
| fun `test streamableHttpClient`() = runBlocking { | ||
| val client = Client( | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should it be
isEmpty()instead?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree
isBlankwill also skip a string if it contains only whitespace. But such a value is validThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There’s no need to worry about spaces when parsing, since it’s not JSON.