11package io.modelcontextprotocol.kotlin.sdk.client
22
33import io.github.oshai.kotlinlogging.KotlinLogging
4+ import io.modelcontextprotocol.kotlin.sdk.client.StdioClientTransport.StderrSeverity.DEBUG
5+ import io.modelcontextprotocol.kotlin.sdk.client.StdioClientTransport.StderrSeverity.FATAL
6+ import io.modelcontextprotocol.kotlin.sdk.client.StdioClientTransport.StderrSeverity.IGNORE
7+ import io.modelcontextprotocol.kotlin.sdk.client.StdioClientTransport.StderrSeverity.INFO
8+ import io.modelcontextprotocol.kotlin.sdk.client.StdioClientTransport.StderrSeverity.WARNING
49import io.modelcontextprotocol.kotlin.sdk.internal.IODispatcher
510import io.modelcontextprotocol.kotlin.sdk.shared.AbstractTransport
611import io.modelcontextprotocol.kotlin.sdk.shared.ReadBuffer
@@ -39,28 +44,50 @@ import kotlin.coroutines.CoroutineContext
3944import kotlin.jvm.JvmOverloads
4045
4146/* *
42- * A transport implementation for JSON-RPC communication that leverages standard input and output streams.
47+ * A transport implementation for JSON-RPC communication over standard I/O streams.
4348 *
44- * This class reads from an input stream to process incoming JSON-RPC messages and writes JSON-RPC messages
45- * to an output stream .
49+ * Reads JSON-RPC messages from [ input] and writes messages to [output]. Optionally monitors
50+ * [error] stream for stderr output with configurable severity handling .
4651 *
47- * Uses structured concurrency principles:
52+ * ## Structured Concurrency
4853 * - Parent job controls all child coroutines
4954 * - Proper cancellation propagation
50- * - Resource cleanup guaranteed via structured concurrency
55+ * - Resource cleanup guaranteed
56+ *
57+ * ## Usage Example
58+ * ```kotlin
59+ * val process = ProcessBuilder("mcp-server").start()
60+ *
61+ * val transport = StdioClientTransport(
62+ * input = process.inputStream.asSource().buffered(),
63+ * output = process.outputStream.asSink().buffered(),
64+ * error = process.errorStream.asSource().buffered()
65+ * ) { stderrLine ->
66+ * when {
67+ * stderrLine.contains("error", ignoreCase = true) -> StderrSeverity.FATAL
68+ * stderrLine.contains("warning", ignoreCase = true) -> StderrSeverity.WARNING
69+ * else -> StderrSeverity.INFO
70+ * }
71+ * }
72+ *
73+ * transport.start()
74+ * ```
5175 *
5276 * @param input The input stream where messages are received.
5377 * @param output The output stream where messages are sent.
54- * @param error Optional error stream for stderr processing.
55- * @param processStdError Callback for stderr lines. Returns true for fatal errors.
78+ * @param error Optional error stream for stderr monitoring.
79+ * @param sendChannel Channel for outbound messages. Default: buffered channel (capacity 64).
80+ * @param classifyStderr Callback to classify stderr lines. Return [StderrSeverity.FATAL] to fail transport,
81+ * or [StderrSeverity.WARNING]/[INFO]/[DEBUG] to log, or [IGNORE] to discard.
82+ * Default: treats all stderr as [FATAL].
5683 */
5784@OptIn(ExperimentalAtomicApi ::class )
5885public class StdioClientTransport @JvmOverloads public constructor(
5986 private val input : Source ,
6087 private val output : Sink ,
6188 private val error : Source ? = null ,
6289 private val sendChannel : Channel <JSONRPCMessage > = Channel (Channel .BUFFERED ),
63- private val processStdError : (String ) -> Boolean = { true },
90+ private val classifyStderr : (String ) -> StderrSeverity = { FATAL },
6491) : AbstractTransport() {
6592
6693 private companion object {
@@ -69,9 +96,20 @@ public class StdioClientTransport @JvmOverloads public constructor(
6996 * 8KB is optimal for most systems (matches default page size).
7097 */
7198 const val BUFFER_SIZE = 8 * 1024L
99+
100+ private val logger = KotlinLogging .logger {}
72101 }
73102
74- private val logger = KotlinLogging .logger {}
103+ /* *
104+ * Severity classification for stderr messages.
105+ *
106+ * - [FATAL]: Calls error handler and terminates transport.
107+ * - [WARNING]: Logs at WARN level, transport continues.
108+ * - [INFO]: Logs at INFO level, transport continues.
109+ * - [DEBUG]: Logs at DEBUG level, transport continues.
110+ * - [IGNORE]: Discards message silently, transport continues.
111+ */
112+ public enum class StderrSeverity { FATAL , WARNING , INFO , DEBUG , IGNORE }
75113
76114 private val ioCoroutineContext: CoroutineContext = IODispatcher
77115 private val scope = CoroutineScope (SupervisorJob () + Dispatchers .Default )
@@ -141,11 +179,32 @@ public class StdioClientTransport @JvmOverloads public constructor(
141179 }
142180
143181 is Event .StderrEvent -> {
144- if (processStdError(event.message)) {
145- runCatching {
146- _onError (McpException (INTERNAL_ERROR , " Message in StdErr: ${event.message} " ))
182+ val errorSeverity = classifyStderr(event.message)
183+ when (errorSeverity) {
184+ FATAL -> {
185+ runCatching {
186+ _onError (
187+ McpException (INTERNAL_ERROR , " Message in StdErr: ${event.message} " ),
188+ )
189+ }
190+ stopProcessing(" Fatal STDERR message received" )
191+ }
192+
193+ WARNING -> {
194+ logger.warn { " STDERR message received: ${event.message} " }
195+ }
196+
197+ INFO -> {
198+ logger.info { " STDERR message received: ${event.message} " }
199+ }
200+
201+ DEBUG -> {
202+ logger.debug { " STDERR message received: ${event.message} " }
203+ }
204+
205+ IGNORE -> {
206+ // do nothing
147207 }
148- stopProcessing(" STDERR message received" )
149208 }
150209 }
151210
0 commit comments