Skip to content

Commit 8628143

Browse files
committed
grpc: Implement native side
1 parent 7acf30c commit 8628143

File tree

12 files changed

+170
-67
lines changed

12 files changed

+170
-67
lines changed

grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcChannel.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package kotlinx.rpc.grpc.client.internal
66

7+
import kotlinx.coroutines.CoroutineScope
78
import kotlinx.rpc.grpc.client.GrpcCallOptions
89
import kotlinx.rpc.grpc.descriptor.MethodDescriptor
910
import kotlinx.rpc.internal.utils.InternalRpcApi

grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/internal/suspendClientCalls.kt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import kotlinx.rpc.grpc.Status
2222
import kotlinx.rpc.grpc.StatusCode
2323
import kotlinx.rpc.grpc.StatusException
2424
import kotlinx.rpc.grpc.client.ClientCallScope
25+
import kotlinx.rpc.grpc.client.EmptyCallCredentials
26+
import kotlinx.rpc.grpc.client.GrpcCallCredentials
2527
import kotlinx.rpc.grpc.client.GrpcCallOptions
2628
import kotlinx.rpc.grpc.client.GrpcClient
2729
import kotlinx.rpc.grpc.descriptor.MethodDescriptor
@@ -197,10 +199,11 @@ private class ClientCallScopeImpl<Request, Response>(
197199

198200
private fun doCall(request: Flow<Request>): Flow<Response> = flow {
199201
coroutineScope {
202+
200203
val call = client.channel.platformApi.createCall(method, callOptions)
201204

202205
/*
203-
* We maintain a buffer of size 1 so onMessage never has to block: it only gets called after
206+
* We maintain a buffer of size 1, so onMessage never has to block: it only gets called after
204207
* we request a response from the server, which only happens when responses is empty and
205208
* there is room in the buffer.
206209
*/

grpc/grpc-client/src/jvmMain/kotlin/kotlinx/rpc/grpc/client/credentials.jvm.kt

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,16 +69,18 @@ internal fun GrpcCallCredentials.toJvm(): CallCredentials {
6969
CoroutineScope(dispatcher).launch {
7070
try {
7171
check(!requiresTransportSecurity || requestInfo.securityLevel != SecurityLevel.NONE) {
72-
"Transport security required but not present"
72+
"Established channel does not have a sufficient security level to transfer call credential."
7373
}
7474

75-
val context = GrpcCallCredentials.Context(requestInfo.methodDescriptor, requestInfo.authority)
75+
val context = GrpcCallCredentials.Context(requestInfo.authority, requestInfo.methodDescriptor.fullMethodName)
7676
val metadata = context.getRequestMetadata()
7777
applier.apply(metadata)
78-
} catch (e: StatusException) {
79-
applier.fail(e.status)
8078
} catch (e: Exception) {
81-
applier.fail(Status.UNAUTHENTICATED.withCause(e))
79+
// we are not treating StatusExceptions separately, as currently there is no
80+
// clean way to support the same feature on native. So for the sake of similar behavior,
81+
// we always fail with Status.UNAVAILABLE. (KRPC-233)
82+
val description = "Getting metadata from call credentials failed with error: ${e.message}"
83+
applier.fail(Status.UNAVAILABLE.withDescription(description).withCause(e))
8284
}
8385
}
8486
}

grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/GrpcCallCredentials.native.kt

Lines changed: 54 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@ package kotlinx.rpc.grpc.client
88

99
import cnames.structs.grpc_call_credentials
1010
import kotlinx.cinterop.*
11+
import kotlinx.coroutines.CancellationException
12+
import kotlinx.coroutines.CoroutineDispatcher
1113
import kotlinx.coroutines.CoroutineScope
14+
import kotlinx.coroutines.Job
1215
import kotlinx.coroutines.launch
1316
import kotlinx.rpc.grpc.GrpcMetadata
1417
import kotlinx.rpc.grpc.StatusException
@@ -21,68 +24,74 @@ import platform.posix.size_tVar
2124
// Stable reference holder for Kotlin objects
2225
private class CredentialsPluginState(
2326
val kotlinCreds: GrpcCallCredentials,
24-
val scope: CoroutineScope
27+
val parentJob: Job,
28+
val coroutineDispatcher: CoroutineDispatcher
2529
)
2630

2731
private fun getMetadataCallback(
2832
state: COpaquePointer?,
2933
context: CValue<grpc_auth_metadata_context>,
3034
cb: grpc_credentials_plugin_metadata_cb?,
31-
user_data: COpaquePointer?,
32-
creds_md: CValuesRef<grpc_metadata>?,
33-
num_creds_md: CPointer<size_tVar>?,
35+
userData: COpaquePointer?,
36+
credsMd: CPointer<grpc_metadata>?,
37+
numCredMd: CPointer<size_tVar>?,
3438
status: CPointer<grpc_status_code.Var>?,
35-
error_details: CPointer<CPointerVar<ByteVar>>?
39+
errorDetails: CPointer<CPointerVar<ByteVar>>?
3640
): Int {
3741
val pluginState = state!!.asStableRef<CredentialsPluginState>().get()
3842

39-
// Launch coroutine to call suspend function asynchronously
40-
pluginState.scope.launch {
43+
fun notifyResult(metadata: GrpcMetadata, status: grpc_status_code, errorDetails: String?) {
44+
memScoped {
45+
// Convert GrpcMetadata to grpc_metadata array
46+
val metadataArray = with(metadata) {
47+
this@memScoped.allocRawGrpcMetadata()
48+
}
49+
50+
try {
51+
// Invoke the callback with success
52+
cb?.invoke(
53+
userData,
54+
metadataArray.metadata,
55+
metadataArray.count,
56+
status,
57+
errorDetails?.cstr?.ptr
58+
)
59+
} finally {
60+
metadataArray.destroyEntries()
61+
}
62+
}
63+
}
64+
65+
val scope = CoroutineScope(Job(pluginState.parentJob) + pluginState.coroutineDispatcher)
66+
67+
// Launch coroutine to call a suspend function asynchronously
68+
scope.launch {
4169
// Extract context information
4270
val serviceUrl = context.useContents { service_url?.toKString() ?: "" }
4371
val methodName = context.useContents { method_name?.toKString() ?: "" }
4472
val authority = extractAuthority(serviceUrl)
73+
val serviceFq = serviceUrl.removeUntil("$authority/")
4574

4675
// Create Kotlin context
4776
val kotlinContext = GrpcCallCredentials.Context(
4877
authority = authority,
49-
methodName = methodName,
78+
methodName = "$serviceFq/$methodName"
5079
)
5180

5281
var metadata = GrpcMetadata()
53-
var status = grpc_status_code.GRPC_STATUS_OK
54-
var errorDetails: String? = null
5582
try {
5683
// Call the Kotlin suspend function
5784
metadata = with(pluginState.kotlinCreds) {
5885
kotlinContext.getRequestMetadata()
5986
}
87+
notifyResult(metadata, grpc_status_code.GRPC_STATUS_OK, null)
6088
} catch (e: StatusException) {
61-
status = e.getStatus().statusCode.toRaw()
62-
errorDetails = e.message
89+
notifyResult(metadata, e.getStatus().statusCode.toRaw(), e.message)
90+
} catch (e: CancellationException) {
91+
notifyResult(metadata, grpc_status_code.GRPC_STATUS_CANCELLED, e.message)
92+
throw e
6393
} catch (e: Exception) {
64-
status = grpc_status_code.GRPC_STATUS_UNAUTHENTICATED
65-
errorDetails = e.message
66-
}
67-
68-
// Convert GrpcMetadata to grpc_metadata array
69-
memScoped {
70-
val metadataArray = with(metadata) {
71-
this@memScoped.allocRawGrpcMetadata()
72-
}
73-
74-
try {
75-
// Invoke the callback with success
76-
cb?.invoke(
77-
user_data,
78-
metadataArray.metadata,
79-
metadataArray.count,
80-
status,
81-
errorDetails?.cstr?.ptr
82-
)
83-
} finally {
84-
metadataArray.destroyEntries()
85-
}
94+
notifyResult(metadata, grpc_status_code.GRPC_STATUS_UNAVAILABLE, e.message)
8695
}
8796
}
8897

@@ -95,22 +104,27 @@ private fun debugStringCallback(state: COpaquePointer?): CPointer<ByteVar>? {
95104
}
96105

97106
private fun extractAuthority(serviceUrl: String): String {
98-
// service_url format: "https://example.com:443/service"
107+
// service_url format: "://example.com:443/with.package.service"
99108
return serviceUrl
100-
.removePrefix("http://")
101-
.removePrefix("https://")
109+
.removeUntil("://")
102110
.substringBefore("/")
103111
}
104112

113+
private fun String.removeUntil(pattern: String): String {
114+
val idx = indexOf(pattern)
115+
return if (idx == -1) this else removeRange(0, idx + pattern.length)
116+
}
117+
105118
private fun destroyCallback(state: COpaquePointer?) {
106119
state?.asStableRef<CredentialsPluginState>()?.dispose()
107120
}
108121

109122
internal fun GrpcCallCredentials.createRaw(
110-
scope: CoroutineScope
123+
parentJob: Job,
124+
coroutineDispatcher: CoroutineDispatcher
111125
): CPointer<grpc_call_credentials>? = memScoped {
112-
// Create stable reference to keep Kotlin object alive
113-
val pluginState = CredentialsPluginState(this@createRaw, scope)
126+
// Create a stable reference to keep the Kotlin object alive
127+
val pluginState = CredentialsPluginState(this@createRaw, parentJob, coroutineDispatcher)
114128
val stableRef = StableRef.create(pluginState)
115129

116130
// Create plugin structure

grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/GrpcChannel.native.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
package kotlinx.rpc.grpc.client.internal
66

7+
import kotlinx.coroutines.CoroutineScope
8+
import kotlinx.rpc.grpc.GrpcMetadata
79
import kotlinx.rpc.grpc.client.GrpcCallOptions
810
import kotlinx.rpc.grpc.descriptor.MethodDescriptor
911
import kotlinx.rpc.internal.utils.InternalRpcApi

grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/ManagedChannel.native.kt

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,23 +32,21 @@ public actual abstract class ManagedChannelBuilder<T : ManagedChannelBuilder<T>>
3232

3333
internal class NativeManagedChannelBuilder(
3434
private val target: String,
35-
private var credentials: Lazy<ClientCredentials>,
35+
private val credentials: Lazy<ClientCredentials>,
3636
) : ManagedChannelBuilder<NativeManagedChannelBuilder>() {
3737
fun buildChannel(): NativeManagedChannel {
3838
val keepAlive = config?.keepAlive
3939
keepAlive?.run {
4040
require(time.isPositive()) { "keepalive time must be positive" }
4141
require(timeout.isPositive()) { "keepalive timeout must be positive" }
4242
}
43+
4344
return NativeManagedChannel(
4445
target,
45-
authority = config?.overrideAuthority,
46+
overrideAuthority = config?.overrideAuthority,
4647
keepAlive = config?.keepAlive,
47-
rawChannelCredentials = credentials.value.clientCredentials.takeRaw(),
48-
rawCallCredentials = credentials.value.clientCredentials.callCredentials?.createRaw(
49-
// TODO: Replace GlobalScope with something more appropriate.
50-
GlobalScope,
51-
)
48+
clientCredentials = credentials.value.clientCredentials,
49+
callCredentials = credentials.value.callCredentials
5250
)
5351
}
5452

grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/NativeClientCall.kt

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@ import kotlinx.cinterop.toKString
2323
import kotlinx.cinterop.value
2424
import kotlinx.coroutines.CancellationException
2525
import kotlinx.coroutines.CompletableJob
26+
import kotlinx.coroutines.CoroutineScope
27+
import kotlinx.coroutines.Dispatchers
28+
import kotlinx.coroutines.Job
29+
import kotlinx.coroutines.coroutineScope
30+
import kotlinx.coroutines.launch
2631
import kotlinx.rpc.grpc.GrpcMetadata
2732
import kotlinx.rpc.grpc.Status
2833
import kotlinx.rpc.grpc.StatusCode
@@ -38,7 +43,14 @@ import kotlinx.rpc.grpc.internal.toKotlin
3843
import kotlinx.rpc.protobuf.input.stream.asInputStream
3944
import kotlinx.rpc.protobuf.input.stream.asSource
4045
import kotlinx.rpc.grpc.GrpcCompression
46+
import kotlinx.rpc.grpc.StatusException
47+
import kotlinx.rpc.grpc.client.EmptyCallCredentials
48+
import kotlinx.rpc.grpc.client.GrpcCallCredentials
4149
import kotlinx.rpc.grpc.client.GrpcCallOptions
50+
import kotlinx.rpc.grpc.client.createRaw
51+
import kotlinx.rpc.grpc.internal.toRaw
52+
import kotlinx.rpc.grpc.merge
53+
import kotlinx.rpc.grpc.statusCode
4254
import libkgrpc.GRPC_OP_RECV_INITIAL_METADATA
4355
import libkgrpc.GRPC_OP_RECV_MESSAGE
4456
import libkgrpc.GRPC_OP_RECV_STATUS_ON_CLIENT
@@ -49,8 +61,11 @@ import libkgrpc.gpr_free
4961
import libkgrpc.grpc_byte_buffer
5062
import libkgrpc.grpc_byte_buffer_destroy
5163
import libkgrpc.grpc_call_cancel_with_status
64+
import libkgrpc.grpc_call_credentials_release
5265
import libkgrpc.grpc_call_error
66+
import libkgrpc.grpc_call_set_credentials
5367
import libkgrpc.grpc_call_unref
68+
import libkgrpc.grpc_channel_credentials_release
5469
import libkgrpc.grpc_metadata_array
5570
import libkgrpc.grpc_metadata_array_destroy
5671
import libkgrpc.grpc_metadata_array_init
@@ -75,6 +90,15 @@ internal class NativeClientCall<Request, Response>(
7590
grpc_call_unref(it)
7691
}
7792

93+
private val rawCallCredentials = callOptions.callCredentials.let {
94+
if (it is EmptyCallCredentials) null else it.createRaw(callJob, Dispatchers.Default)
95+
}
96+
97+
@Suppress("unused")
98+
private val rawCallCredentialsCleaner = createCleaner(rawCallCredentials) {
99+
if (it != null) grpc_call_credentials_release(it)
100+
}
101+
78102
init {
79103
// cancel the call if the job is canceled.
80104
callJob.invokeOnCompletion {
@@ -181,12 +205,16 @@ internal class NativeClientCall<Request, Response>(
181205

182206
listener = responseListener
183207

208+
// attach call credentials to the call.
209+
if (rawCallCredentials != null) {
210+
grpc_call_set_credentials(raw, rawCallCredentials)
211+
}
212+
184213
// start receiving the status from the completion queue,
185214
// which is bound to the lifetime of the call.
186215
val success = startRecvStatus()
187216
if (!success) return
188217

189-
// send and receive initial headers to/from the server
190218
sendAndReceiveInitialMetadata(headers)
191219
}
192220

@@ -473,4 +501,21 @@ internal class NativeClientCall<Request, Response>(
473501
cancel(cancelMsg, e)
474502
}
475503
}
504+
505+
private fun Job.cancelCallOnFailure() {
506+
invokeOnCompletion { cause ->
507+
when (cause) {
508+
is CancellationException -> {
509+
cancelInternal(grpc_status_code.GRPC_STATUS_CANCELLED, "Call cancelled")
510+
}
511+
is StatusException -> {
512+
cancelInternal(cause.getStatus().statusCode.toRaw(), cause.getStatus().getDescription() ?: "StatusException")
513+
}
514+
is Exception -> {
515+
cancelInternal(grpc_status_code.GRPC_STATUS_INTERNAL, "Call failed: ${cause.message}")
516+
}
517+
}
518+
}
519+
}
520+
476521
}

0 commit comments

Comments
 (0)