diff --git a/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/ClientInterceptor.kt b/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/ClientInterceptor.kt index 8b9dc9390..753fa0e1c 100644 --- a/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/ClientInterceptor.kt +++ b/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/ClientInterceptor.kt @@ -30,6 +30,9 @@ import kotlinx.rpc.grpc.descriptor.MethodDescriptor * // Example: add a header before proceeding * requestHeaders[MyKeys.Authorization] = token * + * // Example: modify call options + * callOptions.timeout = 5.seconds + * * // Example: observe response metadata * onHeaders { headers -> /* inspect headers */ } * onClose { status, trailers -> /* log status/trailers */ } diff --git a/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/GrpcClient.kt b/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/GrpcClient.kt index 7e4b00cd5..2e6316503 100644 --- a/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/GrpcClient.kt +++ b/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/GrpcClient.kt @@ -8,7 +8,7 @@ import kotlinx.coroutines.flow.Flow import kotlinx.rpc.RpcCall import kotlinx.rpc.RpcClient import kotlinx.rpc.grpc.GrpcMetadata -import kotlinx.rpc.grpc.client.internal.GrpcDefaultCallOptions +import kotlinx.rpc.grpc.client.internal.GrpcCallOptions import kotlinx.rpc.grpc.client.internal.ManagedChannel import kotlinx.rpc.grpc.client.internal.ManagedChannelBuilder import kotlinx.rpc.grpc.client.internal.bidirectionalStreamingRpc @@ -58,7 +58,7 @@ public class GrpcClient internal constructor( } override suspend fun call(call: RpcCall): T = withGrpcCall(call) { methodDescriptor, request -> - val callOptions = GrpcDefaultCallOptions + val callOptions = GrpcCallOptions() val trailers = GrpcMetadata() return when (methodDescriptor.methodType) { @@ -66,14 +66,14 @@ public class GrpcClient internal constructor( descriptor = methodDescriptor, request = request, callOptions = callOptions, - trailers = trailers, + headers = trailers, ) MethodType.CLIENT_STREAMING -> @Suppress("UNCHECKED_CAST") clientStreamingRpc( descriptor = methodDescriptor, requests = request as Flow, callOptions = callOptions, - trailers = trailers, + headers = trailers, ) else -> error("Wrong method type ${methodDescriptor.methodType}") @@ -81,22 +81,22 @@ public class GrpcClient internal constructor( } override fun callServerStreaming(call: RpcCall): Flow = withGrpcCall(call) { methodDescriptor, request -> - val callOptions = GrpcDefaultCallOptions - val trailers = GrpcMetadata() + val callOptions = GrpcCallOptions() + val headers = GrpcMetadata() when (methodDescriptor.methodType) { MethodType.SERVER_STREAMING -> serverStreamingRpc( descriptor = methodDescriptor, request = request, callOptions = callOptions, - trailers = trailers, + headers = headers, ) MethodType.BIDI_STREAMING -> @Suppress("UNCHECKED_CAST") bidirectionalStreamingRpc( descriptor = methodDescriptor, requests = request as Flow, callOptions = callOptions, - trailers = trailers, + headers = headers, ) else -> error("Wrong method type ${methodDescriptor.methodType}") diff --git a/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcCallOptions.kt b/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcCallOptions.kt index 757a910ba..79fe7c429 100644 --- a/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcCallOptions.kt +++ b/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcCallOptions.kt @@ -4,10 +4,25 @@ package kotlinx.rpc.grpc.client.internal -import kotlinx.rpc.internal.utils.InternalRpcApi +import kotlin.time.Duration -@InternalRpcApi -public expect class GrpcCallOptions - -@InternalRpcApi -public expect val GrpcDefaultCallOptions: GrpcCallOptions +/** + * The collection of runtime options for a new gRPC call. + * + * This class allows configuring per-call behavior such as timeouts. + */ +public class GrpcCallOptions { + /** + * The maximum duration to wait for the RPC to complete. + * + * If set, the RPC will be canceled (with `DEADLINE_EXCEEDED`) + * if it does not complete within the specified duration. + * The timeout is measured from the moment the call is initiated. + * If `null`, no timeout is applied, and the call may run indefinitely. + * + * The default value is `null`. + * + * @see Duration + */ + public var timeout: Duration? = null +} diff --git a/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcChannel.kt b/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcChannel.kt index e064eb6e2..6e378df59 100644 --- a/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcChannel.kt +++ b/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcChannel.kt @@ -8,11 +8,10 @@ import kotlinx.rpc.grpc.descriptor.MethodDescriptor import kotlinx.rpc.internal.utils.InternalRpcApi @InternalRpcApi -public expect abstract class GrpcChannel { - public abstract fun newCall( - methodDescriptor: MethodDescriptor, - callOptions: GrpcCallOptions, - ): ClientCall +public expect abstract class GrpcChannel - public abstract fun authority(): String? -} +@InternalRpcApi +public expect fun GrpcChannel.createCall( + methodDescriptor: MethodDescriptor, + callOptions: GrpcCallOptions, +): ClientCall \ No newline at end of file diff --git a/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/internal/suspendClientCalls.kt b/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/internal/suspendClientCalls.kt index 87635893d..984771534 100644 --- a/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/internal/suspendClientCalls.kt +++ b/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/internal/suspendClientCalls.kt @@ -39,8 +39,8 @@ import kotlinx.rpc.internal.utils.InternalRpcApi public suspend fun GrpcClient.unaryRpc( descriptor: MethodDescriptor, request: Request, - callOptions: GrpcCallOptions = GrpcDefaultCallOptions, - trailers: GrpcMetadata = GrpcMetadata(), + callOptions: GrpcCallOptions = GrpcCallOptions(), + headers: GrpcMetadata = GrpcMetadata(), ): Response { val type = descriptor.methodType require(type == MethodType.UNARY) { @@ -50,7 +50,7 @@ public suspend fun GrpcClient.unaryRpc( return rpcImpl( descriptor = descriptor, callOptions = callOptions, - trailers = trailers, + headers = headers, request = flowOf(request) ).singleOrStatus("request", descriptor) } @@ -59,8 +59,8 @@ public suspend fun GrpcClient.unaryRpc( public fun GrpcClient.serverStreamingRpc( descriptor: MethodDescriptor, request: Request, - callOptions: GrpcCallOptions = GrpcDefaultCallOptions, - trailers: GrpcMetadata = GrpcMetadata(), + callOptions: GrpcCallOptions = GrpcCallOptions(), + headers: GrpcMetadata = GrpcMetadata(), ): Flow { val type = descriptor.methodType require(type == MethodType.SERVER_STREAMING) { @@ -70,7 +70,7 @@ public fun GrpcClient.serverStreamingRpc( return rpcImpl( descriptor = descriptor, callOptions = callOptions, - trailers = trailers, + headers = headers, request = flowOf(request) ) } @@ -79,8 +79,8 @@ public fun GrpcClient.serverStreamingRpc( public suspend fun GrpcClient.clientStreamingRpc( descriptor: MethodDescriptor, requests: Flow, - callOptions: GrpcCallOptions = GrpcDefaultCallOptions, - trailers: GrpcMetadata = GrpcMetadata(), + callOptions: GrpcCallOptions = GrpcCallOptions(), + headers: GrpcMetadata = GrpcMetadata(), ): Response { val type = descriptor.methodType require(type == MethodType.CLIENT_STREAMING) { @@ -90,7 +90,7 @@ public suspend fun GrpcClient.clientStreamingRpc( return rpcImpl( descriptor = descriptor, callOptions = callOptions, - trailers = trailers, + headers = headers, request = requests ).singleOrStatus("response", descriptor) } @@ -99,8 +99,8 @@ public suspend fun GrpcClient.clientStreamingRpc( public fun GrpcClient.bidirectionalStreamingRpc( descriptor: MethodDescriptor, requests: Flow, - callOptions: GrpcCallOptions = GrpcDefaultCallOptions, - trailers: GrpcMetadata = GrpcMetadata(), + callOptions: GrpcCallOptions = GrpcCallOptions(), + headers: GrpcMetadata = GrpcMetadata(), ): Flow { val type = descriptor.methodType check(type == MethodType.BIDI_STREAMING) { @@ -110,7 +110,7 @@ public fun GrpcClient.bidirectionalStreamingRpc( return rpcImpl( descriptor = descriptor, callOptions = callOptions, - trailers = trailers, + headers = headers, request = requests ) } @@ -147,13 +147,13 @@ private sealed interface ClientRequest { private fun GrpcClient.rpcImpl( descriptor: MethodDescriptor, callOptions: GrpcCallOptions, - trailers: GrpcMetadata, + headers: GrpcMetadata, request: Flow, ): Flow { val clientCallScope = ClientCallScopeImpl( client = this, method = descriptor, - requestHeaders = trailers, + requestHeaders = headers, callOptions = callOptions, ) return clientCallScope.proceed(request) @@ -165,8 +165,6 @@ private class ClientCallScopeImpl( override val requestHeaders: GrpcMetadata, override val callOptions: GrpcCallOptions, ) : ClientCallScope { - - val call = client.channel.platformApi.newCall(method, callOptions) val interceptors = client.interceptors val onHeadersFuture = CallbackFuture() val onCloseFuture = CallbackFuture>() @@ -198,6 +196,7 @@ private class ClientCallScopeImpl( private fun doCall(request: Flow): Flow = flow { coroutineScope { + val call = client.channel.platformApi.createCall(method, callOptions) /* * We maintain a buffer of size 1 so onMessage never has to block: it only gets called after @@ -207,7 +206,7 @@ private class ClientCallScopeImpl( val responses = Channel(1) val ready = Ready { call.isReady() } - call.start(channelResponseListener(responses, ready), requestHeaders) + call.start(channelResponseListener(call, responses, ready), requestHeaders) suspend fun Flow.send() { if (method.methodType == MethodType.UNARY || method.methodType == MethodType.SERVER_STREAMING) { @@ -256,6 +255,7 @@ private class ClientCallScopeImpl( } private fun channelResponseListener( + call: ClientCall<*, Response>, responses: Channel, ready: Ready, ) = clientCallListener( diff --git a/grpc/grpc-client/src/jvmMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcCallOptions.jvm.kt b/grpc/grpc-client/src/jvmMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcCallOptions.jvm.kt index e0f8cb985..2fc5ba105 100644 --- a/grpc/grpc-client/src/jvmMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcCallOptions.jvm.kt +++ b/grpc/grpc-client/src/jvmMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcCallOptions.jvm.kt @@ -5,12 +5,13 @@ package kotlinx.rpc.grpc.client.internal import io.grpc.CallOptions -import kotlinx.rpc.grpc.client.internal.GrpcCallOptions import kotlinx.rpc.internal.utils.InternalRpcApi @InternalRpcApi -public actual typealias GrpcCallOptions = CallOptions - -@InternalRpcApi -public actual val GrpcDefaultCallOptions: GrpcCallOptions - get() = GrpcCallOptions.DEFAULT +public fun GrpcCallOptions.toJvm(): CallOptions { + var default = CallOptions.DEFAULT + if (timeout != null) { + default = default.withDeadlineAfter(timeout!!.inWholeMilliseconds, java.util.concurrent.TimeUnit.MILLISECONDS) + } + return default +} \ No newline at end of file diff --git a/grpc/grpc-client/src/jvmMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcChannel.jvm.kt b/grpc/grpc-client/src/jvmMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcChannel.jvm.kt index 98909c3bc..9d16fcde6 100644 --- a/grpc/grpc-client/src/jvmMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcChannel.jvm.kt +++ b/grpc/grpc-client/src/jvmMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcChannel.jvm.kt @@ -5,7 +5,16 @@ package kotlinx.rpc.grpc.client.internal import io.grpc.Channel +import kotlinx.rpc.grpc.descriptor.MethodDescriptor import kotlinx.rpc.internal.utils.InternalRpcApi @InternalRpcApi public actual typealias GrpcChannel = Channel + +@InternalRpcApi +public actual fun GrpcChannel.createCall( + methodDescriptor: MethodDescriptor, + callOptions: GrpcCallOptions, +): ClientCall { + return this.newCall(methodDescriptor, callOptions.toJvm()) +} \ No newline at end of file diff --git a/grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcCallOptions.native.kt b/grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcCallOptions.native.kt index b3e4b0e15..1ca18bb86 100644 --- a/grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcCallOptions.native.kt +++ b/grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcCallOptions.native.kt @@ -2,15 +2,27 @@ * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. */ +@file:OptIn(ExperimentalForeignApi::class) + package kotlinx.rpc.grpc.client.internal -import kotlinx.rpc.grpc.client.internal.GrpcCallOptions +import kotlinx.cinterop.CValue +import kotlinx.cinterop.ExperimentalForeignApi import kotlinx.rpc.internal.utils.InternalRpcApi +import libkgrpc.GPR_CLOCK_REALTIME +import libkgrpc.GPR_TIMESPAN +import libkgrpc.gpr_inf_future +import libkgrpc.gpr_now +import libkgrpc.gpr_time_add +import libkgrpc.gpr_time_from_millis +import libkgrpc.gpr_timespec @InternalRpcApi -public actual class GrpcCallOptions { - // TODO: Do something with it +public fun GrpcCallOptions.rawDeadline(): CValue { + return timeout?.let { + gpr_time_add( + gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_millis(it.inWholeMilliseconds, GPR_TIMESPAN) + ) + } ?: gpr_inf_future(GPR_CLOCK_REALTIME) } - -@InternalRpcApi -public actual val GrpcDefaultCallOptions: GrpcCallOptions = GrpcCallOptions() diff --git a/grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcChannel.native.kt b/grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcChannel.native.kt index 42317122c..9810e0b50 100644 --- a/grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcChannel.native.kt +++ b/grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcChannel.native.kt @@ -9,10 +9,16 @@ import kotlinx.rpc.internal.utils.InternalRpcApi @InternalRpcApi public actual abstract class GrpcChannel { - public actual abstract fun newCall( + public abstract fun newCall( methodDescriptor: MethodDescriptor, callOptions: GrpcCallOptions, ): ClientCall - - public actual abstract fun authority(): String? } + +@InternalRpcApi +public actual fun GrpcChannel.createCall( + methodDescriptor: MethodDescriptor, + callOptions: GrpcCallOptions, +): ClientCall { + return this.newCall(methodDescriptor, callOptions) +} \ No newline at end of file diff --git a/grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/NativeManagedChannel.kt b/grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/NativeManagedChannel.kt index 57ab85a79..39434194a 100644 --- a/grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/NativeManagedChannel.kt +++ b/grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/NativeManagedChannel.kt @@ -20,14 +20,12 @@ import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.cancelChildren import kotlinx.coroutines.withTimeoutOrNull import kotlinx.rpc.grpc.client.ClientCredentials +import kotlinx.rpc.grpc.descriptor.MethodDescriptor import kotlinx.rpc.grpc.internal.CompletionQueue import kotlinx.rpc.grpc.internal.GrpcRuntime -import kotlinx.rpc.grpc.descriptor.MethodDescriptor import kotlinx.rpc.grpc.internal.internalError import kotlinx.rpc.grpc.internal.toGrpcSlice -import libkgrpc.GPR_CLOCK_REALTIME import libkgrpc.GRPC_PROPAGATE_DEFAULTS -import libkgrpc.gpr_inf_future import libkgrpc.grpc_arg import libkgrpc.grpc_arg_type import libkgrpc.grpc_channel_args @@ -158,7 +156,7 @@ internal class NativeManagedChannel( completion_queue = cq.raw, method = methodNameSlice, host = null, - deadline = gpr_inf_future(GPR_CLOCK_REALTIME), + deadline = callOptions.rawDeadline(), reserved = null ) ?: error("Failed to create call") @@ -169,8 +167,4 @@ internal class NativeManagedChannel( ) } - override fun authority(): String? { - return authority - } - } diff --git a/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/CoreClientTest.kt b/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/CoreClientTest.kt index bc22e2f53..1dbef8b5b 100644 --- a/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/CoreClientTest.kt +++ b/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/CoreClientTest.kt @@ -9,19 +9,20 @@ import kotlinx.coroutines.runBlocking import kotlinx.coroutines.test.runTest import kotlinx.coroutines.withTimeout import kotlinx.rpc.grpc.GrpcMetadata -import kotlinx.rpc.grpc.server.GrpcServer -import kotlinx.rpc.grpc.client.internal.ManagedChannel -import kotlinx.rpc.grpc.client.internal.ManagedChannelBuilder import kotlinx.rpc.grpc.Status import kotlinx.rpc.grpc.StatusCode -import kotlinx.rpc.grpc.client.internal.buildChannel import kotlinx.rpc.grpc.client.createInsecureClientCredentials import kotlinx.rpc.grpc.client.internal.ClientCall -import kotlinx.rpc.grpc.client.internal.GrpcDefaultCallOptions +import kotlinx.rpc.grpc.client.internal.GrpcCallOptions +import kotlinx.rpc.grpc.client.internal.ManagedChannel +import kotlinx.rpc.grpc.client.internal.ManagedChannelBuilder +import kotlinx.rpc.grpc.client.internal.buildChannel +import kotlinx.rpc.grpc.client.internal.clientCallListener +import kotlinx.rpc.grpc.client.internal.createCall import kotlinx.rpc.grpc.descriptor.MethodDescriptor import kotlinx.rpc.grpc.descriptor.MethodType -import kotlinx.rpc.grpc.client.internal.clientCallListener import kotlinx.rpc.grpc.descriptor.methodDescriptor +import kotlinx.rpc.grpc.server.GrpcServer import kotlinx.rpc.grpc.statusCode import kotlinx.rpc.registerService import kotlin.test.Test @@ -51,7 +52,7 @@ class GrpcCoreClientTest { ) private fun ManagedChannel.newHelloCall(fullName: String = "kotlinx.rpc.grpc.test.GreeterService/SayHello"): ClientCall = - platformApi.newCall(descriptorFor(fullName), GrpcDefaultCallOptions) + platformApi.createCall(descriptorFor(fullName), GrpcCallOptions()) private fun createChannel(): ManagedChannel = ManagedChannelBuilder( target = "localhost:$PORT", diff --git a/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/proto/GrpcTimeoutTest.kt b/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/proto/GrpcTimeoutTest.kt new file mode 100644 index 000000000..8b51571ae --- /dev/null +++ b/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/proto/GrpcTimeoutTest.kt @@ -0,0 +1,127 @@ +/* + * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.rpc.grpc.test.proto + +import kotlinx.rpc.RpcServer +import kotlinx.rpc.grpc.StatusCode +import kotlinx.rpc.grpc.StatusException +import kotlinx.rpc.grpc.statusCode +import kotlinx.rpc.grpc.test.EchoRequest +import kotlinx.rpc.grpc.test.EchoService +import kotlinx.rpc.grpc.test.EchoServiceImpl +import kotlinx.rpc.grpc.test.invoke +import kotlinx.rpc.registerService +import kotlinx.rpc.withService +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFailsWith +import kotlin.test.assertNull +import kotlin.time.Duration.Companion.milliseconds +import kotlin.time.Duration.Companion.seconds + +class GrpcTimeoutTest : GrpcProtoTest() { + + override fun RpcServer.registerServices() { + registerService { EchoServiceImpl() } + } + + @Test + fun `test timeout causes DEADLINE_EXCEEDED when call exceeds timeout`() { + val exc = assertFailsWith { + runGrpcTest( + clientInterceptors = clientInterceptor { + callOptions.timeout = 500.milliseconds + proceed(it) + } + ) { + // Server will delay for 1 second, but timeout is 0.5 seconds + val request = EchoRequest { message = "Echo"; timeout = 1000u } + it.withService().UnaryEcho(request) + } + } + assertEquals(StatusCode.DEADLINE_EXCEEDED, exc.getStatus().statusCode) + } + + @Test + fun `test timeout does not trigger when call completes within timeout`() { + runGrpcTest( + clientInterceptors = clientInterceptor { + callOptions.timeout = 2.seconds + proceed(it) + } + ) { + // Server will delay for 500ms, timeout is 2 seconds + val request = EchoRequest { message = "Success"; timeout = 500u } + val response = it.withService().UnaryEcho(request) + assertEquals("Success", response.message) + } + } + + @Test + fun `test default timeout is null`() { + runGrpcTest( + clientInterceptors = clientInterceptor { + // Verify default timeout is null + assertNull(callOptions.timeout) + proceed(it) + } + ) { + val request = EchoRequest { message = "Default timeout"; timeout = 100u } + val response = it.withService().UnaryEcho(request) + assertEquals("Default timeout", response.message) + } + } + + @Test + fun `test timeout applies to actual call duration not just processing time`() { + val exc = assertFailsWith { + runGrpcTest( + clientInterceptors = clientInterceptor { + // Set timeout before making the call + callOptions.timeout = 500.milliseconds + proceed(it) + } + ) { + // Server delays for 1 second + val request = EchoRequest { message = "Echo"; timeout = 500u } + it.withService().UnaryEcho(request) + } + } + assertEquals(StatusCode.DEADLINE_EXCEEDED, exc.getStatus().statusCode) + } + + @Test + fun `test timeout set to very short milliseconds triggers immediately`() { + val exc = assertFailsWith { + runGrpcTest( + clientInterceptors = clientInterceptor { + callOptions.timeout = 1.milliseconds + proceed(it) + } + ) { + // Even with no server delay, 0ms timeout should trigger + val request = EchoRequest { message = "Echo"; timeout = 0u } + it.withService().UnaryEcho(request) + } + } + assertEquals(StatusCode.DEADLINE_EXCEEDED, exc.getStatus().statusCode) + } + + @Test + fun `test timeout boundary condition - call completes just before timeout`() { + runGrpcTest( + clientInterceptors = clientInterceptor { + callOptions.timeout = 2.seconds + proceed(it) + } + ) { + // Server delays for slightly less than timeout + val request = EchoRequest { message = "Just in time"; timeout = 1800u } + val response = it.withService().UnaryEcho(request) + assertEquals("Just in time", response.message) + } + } + +} \ No newline at end of file