diff --git a/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/ClientInterceptor.kt b/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/ClientInterceptor.kt index 753fa0e1c..f8c4e318e 100644 --- a/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/ClientInterceptor.kt +++ b/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/ClientInterceptor.kt @@ -7,7 +7,7 @@ package kotlinx.rpc.grpc.client import kotlinx.coroutines.flow.Flow import kotlinx.rpc.grpc.GrpcMetadata import kotlinx.rpc.grpc.Status -import kotlinx.rpc.grpc.client.internal.GrpcCallOptions +import kotlinx.rpc.grpc.client.GrpcCallOptions import kotlinx.rpc.grpc.descriptor.MethodDescriptor /** diff --git a/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/GrpcCallOptions.kt b/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/GrpcCallOptions.kt new file mode 100644 index 000000000..8bf380bad --- /dev/null +++ b/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/GrpcCallOptions.kt @@ -0,0 +1,52 @@ +/* + * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.rpc.grpc.client + +import kotlinx.rpc.grpc.GrpcCompression +import kotlin.time.Duration + +/** + * The collection of runtime options for a new gRPC call. + * + * This class allows configuring per-call behavior such as timeouts. + */ +public class GrpcCallOptions { + /** + * The maximum duration to wait for the RPC to complete. + * + * If set, the RPC will be canceled (with `DEADLINE_EXCEEDED`) + * if it does not complete within the specified duration. + * The timeout is measured from the moment the call is initiated. + * If `null`, no timeout is applied, and the call may run indefinitely. + * + * The default value is `null`. + * + * @see kotlin.time.Duration + */ + public var timeout: Duration? = null + + /** + * The compression algorithm to use for encoding outgoing messages in this call. + * + * When set to a value other than [GrpcCompression.None], the client will compress request messages + * using the specified algorithm before sending them to the server. The chosen compression algorithm + * is communicated to the server via the `grpc-encoding` header. + * + * ## Default Behavior + * Defaults to [GrpcCompression.None], meaning no compression is applied to messages. + * + * ## Server Compatibility + * **Important**: It is the caller's responsibility to ensure the server supports the chosen + * compression algorithm. There is no automatic negotiation performed. If the server does not + * support the requested compression, the call will fail. + * + * ## Available Algorithms + * - [GrpcCompression.None]: No compression (identity encoding) - **default** + * - [GrpcCompression.Gzip]: GZIP compression, widely supported + * + * @see GrpcCompression + */ + public var compression: GrpcCompression = GrpcCompression.None +} \ No newline at end of file diff --git a/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/GrpcClient.kt b/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/GrpcClient.kt index 2e6316503..0e0426d6c 100644 --- a/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/GrpcClient.kt +++ b/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/GrpcClient.kt @@ -8,7 +8,7 @@ import kotlinx.coroutines.flow.Flow import kotlinx.rpc.RpcCall import kotlinx.rpc.RpcClient import kotlinx.rpc.grpc.GrpcMetadata -import kotlinx.rpc.grpc.client.internal.GrpcCallOptions +import kotlinx.rpc.grpc.client.GrpcCallOptions import kotlinx.rpc.grpc.client.internal.ManagedChannel import kotlinx.rpc.grpc.client.internal.ManagedChannelBuilder import kotlinx.rpc.grpc.client.internal.bidirectionalStreamingRpc diff --git a/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcCallOptions.kt b/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcCallOptions.kt deleted file mode 100644 index 79fe7c429..000000000 --- a/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcCallOptions.kt +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. - */ - -package kotlinx.rpc.grpc.client.internal - -import kotlin.time.Duration - -/** - * The collection of runtime options for a new gRPC call. - * - * This class allows configuring per-call behavior such as timeouts. - */ -public class GrpcCallOptions { - /** - * The maximum duration to wait for the RPC to complete. - * - * If set, the RPC will be canceled (with `DEADLINE_EXCEEDED`) - * if it does not complete within the specified duration. - * The timeout is measured from the moment the call is initiated. - * If `null`, no timeout is applied, and the call may run indefinitely. - * - * The default value is `null`. - * - * @see Duration - */ - public var timeout: Duration? = null -} diff --git a/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcChannel.kt b/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcChannel.kt index 6e378df59..e86a3b1f8 100644 --- a/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcChannel.kt +++ b/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcChannel.kt @@ -4,6 +4,7 @@ package kotlinx.rpc.grpc.client.internal +import kotlinx.rpc.grpc.client.GrpcCallOptions import kotlinx.rpc.grpc.descriptor.MethodDescriptor import kotlinx.rpc.internal.utils.InternalRpcApi diff --git a/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/internal/suspendClientCalls.kt b/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/internal/suspendClientCalls.kt index 984771534..47fb7e60f 100644 --- a/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/internal/suspendClientCalls.kt +++ b/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/internal/suspendClientCalls.kt @@ -22,6 +22,7 @@ import kotlinx.rpc.grpc.Status import kotlinx.rpc.grpc.StatusCode import kotlinx.rpc.grpc.StatusException import kotlinx.rpc.grpc.client.ClientCallScope +import kotlinx.rpc.grpc.client.GrpcCallOptions import kotlinx.rpc.grpc.client.GrpcClient import kotlinx.rpc.grpc.descriptor.MethodDescriptor import kotlinx.rpc.grpc.descriptor.MethodType diff --git a/grpc/grpc-client/src/jvmMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcCallOptions.jvm.kt b/grpc/grpc-client/src/jvmMain/kotlin/kotlinx/rpc/grpc/client/GrpcCallOptions.jvm.kt similarity index 62% rename from grpc/grpc-client/src/jvmMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcCallOptions.jvm.kt rename to grpc/grpc-client/src/jvmMain/kotlin/kotlinx/rpc/grpc/client/GrpcCallOptions.jvm.kt index 2fc5ba105..ba337ef58 100644 --- a/grpc/grpc-client/src/jvmMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcCallOptions.jvm.kt +++ b/grpc/grpc-client/src/jvmMain/kotlin/kotlinx/rpc/grpc/client/GrpcCallOptions.jvm.kt @@ -2,16 +2,21 @@ * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. */ -package kotlinx.rpc.grpc.client.internal +package kotlinx.rpc.grpc.client import io.grpc.CallOptions +import kotlinx.rpc.grpc.GrpcCompression import kotlinx.rpc.internal.utils.InternalRpcApi +import java.util.concurrent.TimeUnit @InternalRpcApi public fun GrpcCallOptions.toJvm(): CallOptions { var default = CallOptions.DEFAULT if (timeout != null) { - default = default.withDeadlineAfter(timeout!!.inWholeMilliseconds, java.util.concurrent.TimeUnit.MILLISECONDS) + default = default.withDeadlineAfter(timeout!!.inWholeMilliseconds, TimeUnit.MILLISECONDS) + } + if (compression !is GrpcCompression.None) { + default = default.withCompression(compression.name) } return default } \ No newline at end of file diff --git a/grpc/grpc-client/src/jvmMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcChannel.jvm.kt b/grpc/grpc-client/src/jvmMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcChannel.jvm.kt index 9d16fcde6..b5e1b016d 100644 --- a/grpc/grpc-client/src/jvmMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcChannel.jvm.kt +++ b/grpc/grpc-client/src/jvmMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcChannel.jvm.kt @@ -5,6 +5,8 @@ package kotlinx.rpc.grpc.client.internal import io.grpc.Channel +import kotlinx.rpc.grpc.client.GrpcCallOptions +import kotlinx.rpc.grpc.client.toJvm import kotlinx.rpc.grpc.descriptor.MethodDescriptor import kotlinx.rpc.internal.utils.InternalRpcApi diff --git a/grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcCallOptions.native.kt b/grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/GrpcCallOptions.native.kt similarity index 94% rename from grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcCallOptions.native.kt rename to grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/GrpcCallOptions.native.kt index 1ca18bb86..3d0f05372 100644 --- a/grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcCallOptions.native.kt +++ b/grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/GrpcCallOptions.native.kt @@ -4,7 +4,7 @@ @file:OptIn(ExperimentalForeignApi::class) -package kotlinx.rpc.grpc.client.internal +package kotlinx.rpc.grpc.client import kotlinx.cinterop.CValue import kotlinx.cinterop.ExperimentalForeignApi @@ -25,4 +25,4 @@ public fun GrpcCallOptions.rawDeadline(): CValue { gpr_time_from_millis(it.inWholeMilliseconds, GPR_TIMESPAN) ) } ?: gpr_inf_future(GPR_CLOCK_REALTIME) -} +} \ No newline at end of file diff --git a/grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcChannel.native.kt b/grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcChannel.native.kt index 9810e0b50..d14356296 100644 --- a/grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcChannel.native.kt +++ b/grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcChannel.native.kt @@ -4,6 +4,7 @@ package kotlinx.rpc.grpc.client.internal +import kotlinx.rpc.grpc.client.GrpcCallOptions import kotlinx.rpc.grpc.descriptor.MethodDescriptor import kotlinx.rpc.internal.utils.InternalRpcApi diff --git a/grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/NativeClientCall.kt b/grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/NativeClientCall.kt index bf32a27e0..cfc95970d 100644 --- a/grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/NativeClientCall.kt +++ b/grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/NativeClientCall.kt @@ -26,6 +26,7 @@ import kotlinx.coroutines.CompletableJob import kotlinx.rpc.grpc.GrpcMetadata import kotlinx.rpc.grpc.Status import kotlinx.rpc.grpc.StatusCode +import kotlinx.rpc.grpc.append import kotlinx.rpc.grpc.descriptor.MethodDescriptor import kotlinx.rpc.grpc.internal.BatchResult import kotlinx.rpc.grpc.internal.CompletionQueue @@ -36,6 +37,8 @@ import kotlinx.rpc.grpc.internal.toGrpcByteBuffer import kotlinx.rpc.grpc.internal.toKotlin import kotlinx.rpc.protobuf.input.stream.asInputStream import kotlinx.rpc.protobuf.input.stream.asSource +import kotlinx.rpc.grpc.GrpcCompression +import kotlinx.rpc.grpc.client.GrpcCallOptions import libkgrpc.GRPC_OP_RECV_INITIAL_METADATA import libkgrpc.GRPC_OP_RECV_MESSAGE import libkgrpc.GRPC_OP_RECV_STATUS_ON_CLIENT @@ -63,6 +66,7 @@ internal class NativeClientCall( private val cq: CompletionQueue, internal val raw: CPointer, private val methodDescriptor: MethodDescriptor, + private val callOptions: GrpcCallOptions, private val callJob: CompletableJob, ) : ClientCall() { @@ -308,6 +312,16 @@ internal class NativeClientCall( val opsNum = 2uL val ops = arena.allocArray(opsNum.convert()) + // add compression algorithm to the call metadata. + // the gRPC core will read the header and perform the compression (compression_filter.cc). + if (callOptions.compression !is GrpcCompression.None) { + if (callOptions.compression !is GrpcCompression.Gzip) { + // to match the behavior of grpc-java, we throw an error if the compression algorithm is not supported. + cancelInternal(grpc_status_code.GRPC_STATUS_INTERNAL, "Unable to find compressor by name ${callOptions.compression.name}") + } + headers.append("grpc-internal-encoding-request", callOptions.compression.name) + } + // turn given headers into a grpc_metadata_array. val sendInitialMetadata: grpc_metadata_array = with(headers) { arena.allocRawGrpcMetadata() @@ -460,4 +474,3 @@ internal class NativeClientCall( } } } - diff --git a/grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/NativeManagedChannel.kt b/grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/NativeManagedChannel.kt index 39434194a..5cf6563b9 100644 --- a/grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/NativeManagedChannel.kt +++ b/grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/NativeManagedChannel.kt @@ -20,6 +20,8 @@ import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.cancelChildren import kotlinx.coroutines.withTimeoutOrNull import kotlinx.rpc.grpc.client.ClientCredentials +import kotlinx.rpc.grpc.client.GrpcCallOptions +import kotlinx.rpc.grpc.client.rawDeadline import kotlinx.rpc.grpc.descriptor.MethodDescriptor import kotlinx.rpc.grpc.internal.CompletionQueue import kotlinx.rpc.grpc.internal.GrpcRuntime @@ -163,7 +165,7 @@ internal class NativeManagedChannel( grpc_slice_unref(methodNameSlice) return NativeClientCall( - cq, rawCall, methodDescriptor, callJob + cq, rawCall, methodDescriptor, callOptions, callJob ) } diff --git a/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/GrpcCompression.kt b/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/GrpcCompression.kt new file mode 100644 index 000000000..01a7a0ca3 --- /dev/null +++ b/grpc/grpc-core/src/commonMain/kotlin/kotlinx/rpc/grpc/GrpcCompression.kt @@ -0,0 +1,55 @@ +/* + * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.rpc.grpc + +/** + * Represents a compression algorithm for gRPC message encoding. + * + * Compression can be applied to gRPC messages to reduce bandwidth usage during transmission. + * + * ## Supported Algorithms + * - [None] (identity): No compression is applied. + * - [Gzip]: GZIP compression algorithm, widely supported and provides good compression ratios. + * + * This interface is not meant to be implemented by users. + * + * @property name The compression algorithm identifier sent in the `grpc-encoding` header. + * + * @see kotlinx.rpc.grpc.client.GrpcCallOptions.compression + * @see GrpcCompression.None + * @see GrpcCompression.Gzip + */ +@OptIn(ExperimentalSubclassOptIn::class) +@SubclassOptInRequired +public interface GrpcCompression { + + /** + * The name of the compression algorithm as it appears in the `grpc-encoding` header. + */ + public val name: String + + /** + * Represents no compression (identity encoding). + * + * This is the default compression setting. When used, messages are transmitted without + * any compression applied. + */ + public object None : GrpcCompression { + override val name: String = "identity" + } + + /** + * Represents GZIP compression. + * + * GZIP is a widely supported compression algorithm that provides good compression ratios + * for most data types. + * + * **Note**: Ensure the server supports GZIP compression before using this option, + * as the call will fail if the server cannot handle the requested compression algorithm. + */ + public object Gzip : GrpcCompression { + override val name: String = "gzip" + } +} \ No newline at end of file diff --git a/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/CoreClientTest.kt b/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/CoreClientTest.kt index 1dbef8b5b..d24d197f9 100644 --- a/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/CoreClientTest.kt +++ b/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/CoreClientTest.kt @@ -13,7 +13,7 @@ import kotlinx.rpc.grpc.Status import kotlinx.rpc.grpc.StatusCode import kotlinx.rpc.grpc.client.createInsecureClientCredentials import kotlinx.rpc.grpc.client.internal.ClientCall -import kotlinx.rpc.grpc.client.internal.GrpcCallOptions +import kotlinx.rpc.grpc.client.GrpcCallOptions import kotlinx.rpc.grpc.client.internal.ManagedChannel import kotlinx.rpc.grpc.client.internal.ManagedChannelBuilder import kotlinx.rpc.grpc.client.internal.buildChannel diff --git a/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/proto/GrpcCompressionTest.kt b/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/proto/GrpcCompressionTest.kt new file mode 100644 index 000000000..e14276919 --- /dev/null +++ b/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/proto/GrpcCompressionTest.kt @@ -0,0 +1,239 @@ +/* + * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.rpc.grpc.test.proto + +import kotlinx.coroutines.test.runTest +import kotlinx.rpc.RpcServer +import kotlinx.rpc.grpc.GrpcCompression +import kotlinx.rpc.grpc.GrpcMetadata +import kotlinx.rpc.grpc.Status +import kotlinx.rpc.grpc.StatusCode +import kotlinx.rpc.grpc.get +import kotlinx.rpc.grpc.keys +import kotlinx.rpc.grpc.test.EchoRequest +import kotlinx.rpc.grpc.test.EchoService +import kotlinx.rpc.grpc.test.EchoServiceImpl +import kotlinx.rpc.grpc.test.Runtime +import kotlinx.rpc.grpc.test.assertContainsAll +import kotlinx.rpc.grpc.test.assertGrpcFailure +import kotlinx.rpc.grpc.test.captureStdErr +import kotlinx.rpc.grpc.test.clearNativeEnv +import kotlinx.rpc.grpc.test.invoke +import kotlinx.rpc.grpc.test.runtime +import kotlinx.rpc.grpc.test.setNativeEnv +import kotlinx.rpc.registerService +import kotlinx.rpc.withService +import kotlin.collections.emptyList +import kotlin.test.Test +import kotlin.test.assertEquals + +/** + * Tests that the client can configure the compression of requests. + * + * This test is hard to realize on native, as the gRPC-Core doesn't expose internal headers like + * `grpc-encoding` to the user application. This means we cannot verify that the client or sever + * actually sent those headers on native. Instead, we capture the grpc trace output (written to stderr) + * and verify that the client and server actually used the compression algorithm. + */ +class GrpcCompressionTest : GrpcProtoTest() { + override fun RpcServer.registerServices() { + return registerService { EchoServiceImpl() } + } + + @Test + fun `test gzip client compression - should succeed`() = runTest { + testCompression( + clientCompression = GrpcCompression.Gzip, + expectedEncoding = "gzip", + expectedRequestCompressionAlg = 2, + expectedRequestDecompressionAlg = 2 + ) + } + + @Test + fun `test identity compression - should not compress`() = runTest { + testCompression( + clientCompression = GrpcCompression.None, + expectedEncoding = null, + expectedRequestCompressionAlg = 0, + expectedRequestDecompressionAlg = 0 + ) + } + + @Test + fun `test no compression set - should not compress`() = runTest { + testCompression( + clientCompression = null, + expectedEncoding = null, + expectedRequestCompressionAlg = 0, + expectedRequestDecompressionAlg = 0 + ) + } + + @Test + fun `test unknown compression - should fail`() = assertGrpcFailure( + StatusCode.INTERNAL, + "Unable to find compressor by name unknownCompressionName" + ) { + runGrpcTest( + clientInterceptors = clientInterceptor { + callOptions.compression = object : GrpcCompression { + override val name: String + get() = "unknownCompressionName" + + } + proceed(it) + } + ) { client -> + client.withService().UnaryEcho(EchoRequest.invoke { message = "Unknown compression" }) + } + } + + private suspend fun testCompression( + clientCompression: GrpcCompression?, + expectedEncoding: String?, + expectedRequestCompressionAlg: Int, + expectedRequestDecompressionAlg: Int, + expectedResponseCompressionAlg: Int = 0, + expectedResponseDecompressionAlg: Int = 0 + ) { + var reqHeaders = emptyMap() + var respHeaders = emptyMap() + val logs = captureNativeGrpcLogs { + runGrpcTest( + clientInterceptors = clientInterceptor { + clientCompression?.let { compression -> + callOptions.compression = compression + } + onHeaders { headers -> respHeaders = headers.toMap() } + proceed(it) + }, + serverInterceptors = serverInterceptor { + reqHeaders = requestHeaders.toMap() + proceed(it) + } + ) { + val message = "Echo with ${clientCompression?.name}" + val response = it.withService().UnaryEcho(EchoRequest.invoke { this.message = message }) + + // Verify the call succeeded and data is correct + assertEquals(message, response.message) + } + } + + if (runtime == Runtime.NATIVE) { + // if we are on native, we need to parse the logs manually to get the `grpc-` prefixed headers + val traceHeaders = HeadersTrace.fromTrace(logs) + reqHeaders = traceHeaders.requestHeaders + respHeaders = traceHeaders.responseHeaders + + // verify that the client and server actually used the expected compression algorithm + val compression = CompressionTrace.fromTrace(logs) + assertEquals(expectedRequestCompressionAlg, compression.requestCompressionAlg) + assertEquals(expectedRequestDecompressionAlg, compression.requestDecompressionAlg) + assertEquals(expectedResponseCompressionAlg, compression.responseCompressionAlg) + assertEquals(expectedResponseDecompressionAlg, compression.responseDecompressionAlg) + } + + fun Map.grpcAcceptEncoding() = + this["grpc-accept-encoding"]?.split(",")?.map { it.trim() } ?: emptyList() + + // check request headers + if (expectedEncoding != null) { + assertEquals(expectedEncoding, reqHeaders["grpc-encoding"]) + } + assertContainsAll(listOf("gzip"), reqHeaders.grpcAcceptEncoding()) + + assertContainsAll(listOf("gzip"), respHeaders.grpcAcceptEncoding()) + } + + private suspend fun captureNativeGrpcLogs(block: suspend () -> Unit): String { + try { + return captureStdErr { + setNativeEnv("GRPC_TRACE", "compression,http") + block() + } + } finally { + clearNativeEnv("GRPC_GRACE") + } + } + + private fun GrpcMetadata.toMap(): Map { + return keys().mapNotNull { key -> + if (!key.endsWith("-bin")) { + key to this@toMap[key]!! + } else null + }.toMap() + } + + data class CompressionTrace( + val requestCompressionAlg: Int, + val requestDecompressionAlg: Int, + val responseCompressionAlg: Int, + val responseDecompressionAlg: Int + ) { + companion object { + fun fromTrace(logs: String): CompressionTrace { + val compressMessageRegex = Regex("""CompressMessage: len=\d+ alg=(\d+)""") + val decompressMessageRegex = Regex("""DecompressMessage: len=\d+ max=\d+ alg=(\d+)""") + + val compressions = compressMessageRegex.findAll(logs).map { it.groupValues[1].toInt() }.toList() + val decompressions = decompressMessageRegex.findAll(logs).map { it.groupValues[1].toInt() }.toList() + + require(compressions.size == 2) { + "Expected exactly 2 CompressMessage entries, but found ${compressions.size}" + } + require(decompressions.size == 2) { + "Expected exactly 2 DecompressMessage entries, but found ${decompressions.size}" + } + + return CompressionTrace( + requestCompressionAlg = compressions[0], + requestDecompressionAlg = decompressions[0], + responseCompressionAlg = compressions[1], + responseDecompressionAlg = decompressions[1] + ) + } + } + } + + data class HeadersTrace( + val requestHeaders: Map, + val responseHeaders: Map + ) { + companion object { + fun fromTrace(logs: String): HeadersTrace { + val metadataRegex = Regex( + """perform_stream_op\[.*SEND_INITIAL_METADATA\{([^}]+)\}""", + RegexOption.MULTILINE + ) + + val metadataBlocks = metadataRegex.findAll(logs).map { it.groupValues[1] }.toList() + + require(metadataBlocks.size == 2) { + "Expected exactly 2 SEND_INITIAL_METADATA entries, but found ${metadataBlocks.size}" + } + + return HeadersTrace( + requestHeaders = parseHeaders(metadataBlocks[0]), + responseHeaders = parseHeaders(metadataBlocks[1]) + ) + } + + private fun parseHeaders(metadataBlock: String): Map { + val headers = mutableMapOf() + val headerRegex = Regex("""([^:,]+):\s*([^,]+(?:,\s*[^:,]+)*)(?=,\s+[^:,]+:|${'$'})""") + + for (match in headerRegex.findAll(metadataBlock)) { + val key = match.groupValues[1].trim() + val value = match.groupValues[2].trim() + headers[key] = value + } + + return headers + } + } + } +} \ No newline at end of file diff --git a/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/utils.kt b/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/utils.kt index 57282bb57..f4b0ea9cf 100644 --- a/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/utils.kt +++ b/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/utils.kt @@ -11,7 +11,6 @@ import kotlin.test.assertContains import kotlin.test.assertEquals import kotlin.test.assertFailsWith - fun assertGrpcFailure(statusCode: StatusCode, message: String? = null, block: () -> Unit) { val exc = assertFailsWith(message) { block() } assertEquals(statusCode, exc.getStatus().statusCode) @@ -19,3 +18,30 @@ fun assertGrpcFailure(statusCode: StatusCode, message: String? = null, block: () assertContains(message, exc.getStatus().getDescription() ?: "") } } + +fun assertContainsAll(actual: Iterable, expected: Iterable) { + val expectedSet = expected.toSet() + for (element in actual) { + require(element in expectedSet) { + "Actual element '$element' not found in expected collection" + } + } +} + +enum class Runtime { + JVM, + NATIVE +} +expect val runtime: Runtime + +expect fun setNativeEnv(key: String, value: String) +expect fun clearNativeEnv(key: String) + +/** + * Captures the standard error output written during the execution of the provided suspending block. + * + * @param block A suspending lambda function whose standard error output will be captured. + * @return A string containing the captured standard error output. + */ +expect suspend fun captureStdErr(block: suspend () -> Unit): String + diff --git a/grpc/grpc-core/src/jvmTest/kotlin/kotlinx/rpc/grpc/test/utils.jvm.kt b/grpc/grpc-core/src/jvmTest/kotlin/kotlinx/rpc/grpc/test/utils.jvm.kt new file mode 100644 index 000000000..ef56db05d --- /dev/null +++ b/grpc/grpc-core/src/jvmTest/kotlin/kotlinx/rpc/grpc/test/utils.jvm.kt @@ -0,0 +1,32 @@ +/* + * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.rpc.grpc.test + +import java.io.ByteArrayOutputStream +import java.io.PrintStream + +actual val runtime: Runtime + get() = Runtime.JVM + +actual fun setNativeEnv(key: String, value: String) { + // Nothing to do on JVM +} + +actual fun clearNativeEnv(key: String) { + // Nothing to do on JVM +} + +actual suspend fun captureStdErr(block: suspend () -> Unit): String { + val orig = System.out + val baos = ByteArrayOutputStream() + System.setOut(PrintStream(baos)) + try { + block() + return baos.toString() + } finally { + System.setOut(orig) + } +} + diff --git a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/GrpcMetadata.native.kt b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/GrpcMetadata.native.kt index f25c89249..6f9437776 100644 --- a/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/GrpcMetadata.native.kt +++ b/grpc/grpc-core/src/nativeMain/kotlin/kotlinx/rpc/grpc/GrpcMetadata.native.kt @@ -2,7 +2,7 @@ * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. */ -@file:OptIn(ExperimentalForeignApi::class, ExperimentalNativeApi::class) +@file:OptIn(ExperimentalForeignApi::class, ExperimentalNativeApi::class, ExperimentalEncodingApi::class) package kotlinx.rpc.grpc @@ -32,6 +32,8 @@ import libkgrpc.grpc_slice_ref import libkgrpc.grpc_slice_unref import libkgrpc.kgrpc_metadata_array_append import kotlin.experimental.ExperimentalNativeApi +import kotlin.io.encoding.Base64 +import kotlin.io.encoding.ExperimentalEncodingApi public actual class GrpcMetadataKey actual constructor(name: String, public val codec: MessageCodec) { public val name: String = name.lowercase() @@ -105,6 +107,28 @@ public actual class GrpcMetadata actual constructor() { return raw } + + @OptIn(ExperimentalEncodingApi::class) + override fun toString(): String { + val sb = StringBuilder("Metadata(") + var first = true + for ((key, values) in map) { + for (value in values) { + if (!first) { + sb.append(',') + } + first = false + sb.append(key).append('=') + if (key.endsWith("-bin")) { + sb.append(Base64.encode(value)) + } else { + sb.append(value.toAsciiString()) + } + } + } + return sb.append(')').toString() + } + } public actual operator fun GrpcMetadata.get(key: String): String? { @@ -276,8 +300,6 @@ private val VALID_KEY_CHARS by lazy { @OptIn(ObsoleteNativeApi::class) private fun GrpcMetadataKey.validateName() { - require(!name.startsWith("grpc-")) { "Header is named $name. It must not start with 'grpc-' as it is reserved for internal use." } - for (char in name) { require(VALID_KEY_CHARS[char.code]) { "Header is named $name. It contains illegal character $char." } } diff --git a/grpc/grpc-core/src/nativeTest/kotlin/kotlinx/rpc/grpc/test/utils.native.kt b/grpc/grpc-core/src/nativeTest/kotlin/kotlinx/rpc/grpc/test/utils.native.kt new file mode 100644 index 000000000..17565693a --- /dev/null +++ b/grpc/grpc-core/src/nativeTest/kotlin/kotlinx/rpc/grpc/test/utils.native.kt @@ -0,0 +1,77 @@ +/* + * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + */ + +@file:OptIn(ExperimentalForeignApi::class) + +package kotlinx.rpc.grpc.test + +import kotlinx.cinterop.ExperimentalForeignApi +import kotlinx.cinterop.IntVar +import kotlinx.cinterop.allocArray +import kotlinx.cinterop.convert +import kotlinx.cinterop.get +import kotlinx.cinterop.memScoped +import kotlinx.cinterop.refTo +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.IO +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.launch +import platform.posix.STDERR_FILENO +import platform.posix.close +import platform.posix.dup +import platform.posix.dup2 +import platform.posix.fflush +import platform.posix.fprintf +import platform.posix.pipe +import platform.posix.read +import platform.posix.stderr + +actual val runtime: Runtime + get() = Runtime.NATIVE + +actual fun setNativeEnv(key: String, value: String) { + platform.posix.setenv(key, value, 1) +} + +actual fun clearNativeEnv(key: String) { + platform.posix.unsetenv(key) +} + +actual suspend fun captureStdErr(block: suspend () -> Unit): String = coroutineScope { + memScoped { + val pipeErr = allocArray(2) + check(pipe(pipeErr) == 0) { "pipe stderr failed" } + + val savedStderr = dup(STDERR_FILENO) + + // redirect stderr write end + check(dup2(pipeErr[1], STDERR_FILENO) != -1) { "dup2 stderr failed" } + close(pipeErr[1]) + + val outputBuf = StringBuilder() + val readJob = launch(Dispatchers.IO) { + val buf = ByteArray(4096) + var r: Long + do { + r = read(pipeErr[0], buf.refTo(0), buf.size.convert()) + if (r > 0) outputBuf.append(buf.decodeToString(0, r.convert())) + } while (r > 0) + close(pipeErr[0]) + } + + try { + block() + } finally { + fflush(stderr) + // restore stderr + dup2(savedStderr, STDERR_FILENO) + close(savedStderr) + } + + // wait reading to finish + readJob.join() + outputBuf.toString() + } +} +