From 38ed47ff30f9913ba05b241b39ea6c1505ae9784 Mon Sep 17 00:00:00 2001 From: Albert Callarisa Date: Tue, 25 Nov 2025 10:23:09 +0100 Subject: [PATCH 1/2] Add `kid` header to kafka jwt auth Signed-off-by: Albert Callarisa --- bindings/kafka/metadata.yaml | 6 ++++++ common/component/kafka/metadata.go | 1 + common/component/kafka/metadata_test.go | 5 +++++ .../kafka/sasl_oauthbearer_private_key_jwt.go | 13 ++++++++++++- pubsub/kafka/metadata.yaml | 6 ++++++ .../kafka/components/auth_oidc_certs/kafka.yaml | 4 ++++ .../pubsub/kafka/data/realm-export.json | 4 ++-- .../pubsub/kafka/docker-compose.auth.yml | 11 +++++++++++ tests/certification/pubsub/kafka/kafka_test.go | 7 +++++++ 9 files changed, 54 insertions(+), 3 deletions(-) diff --git a/bindings/kafka/metadata.yaml b/bindings/kafka/metadata.yaml index c04ab9a3e4..79ddc7bd8c 100644 --- a/bindings/kafka/metadata.yaml +++ b/bindings/kafka/metadata.yaml @@ -187,6 +187,12 @@ authenticationProfiles: example: | {"cluster":"kafka","poolid":"kafkapool"} type: string + - name: oidcKid + type: string + required: false + description: | + The JWT key ID (kid) to use for the client assertion. + example: '"1234567890"' - title: "SASL Authentication" description: | Authenticate using SASL. diff --git a/common/component/kafka/metadata.go b/common/component/kafka/metadata.go index 2e8def21f2..9fa87fb45e 100644 --- a/common/component/kafka/metadata.go +++ b/common/component/kafka/metadata.go @@ -86,6 +86,7 @@ type KafkaMetadata struct { OidcClientAssertionKey string `mapstructure:"oidcClientAssertionKey"` OidcResource string `mapstructure:"oidcResource"` OidcAudience string `mapstructure:"oidcAudience"` + OidcKid string `mapstructure:"oidcKid"` internalOidcScopes []string `mapstructure:"-"` TLSDisable bool `mapstructure:"disableTls"` TLSSkipVerify bool `mapstructure:"skipVerify"` diff --git a/common/component/kafka/metadata_test.go b/common/component/kafka/metadata_test.go index 9dfd6edaf9..f1c49a2c1c 100644 --- a/common/component/kafka/metadata_test.go +++ b/common/component/kafka/metadata_test.go @@ -273,6 +273,11 @@ func TestMissingOidcPrivateKeyJwtValues(t *testing.T) { meta, err = k.getKafkaMetadata(m) require.NoError(t, err) require.Contains(t, meta.internalOidcScopes, "openid") + + m["oidcKid"] = "1234567890" + meta, err = k.getKafkaMetadata(m) + require.NoError(t, err) + require.Equal(t, "1234567890", meta.OidcKid) } func TestPresentSaslValues(t *testing.T) { diff --git a/common/component/kafka/sasl_oauthbearer_private_key_jwt.go b/common/component/kafka/sasl_oauthbearer_private_key_jwt.go index 6f5c6b092c..97df1d2548 100644 --- a/common/component/kafka/sasl_oauthbearer_private_key_jwt.go +++ b/common/component/kafka/sasl_oauthbearer_private_key_jwt.go @@ -31,6 +31,7 @@ import ( "github.com/IBM/sarama" "github.com/google/uuid" "github.com/lestrrat-go/jwx/v2/jwa" + "github.com/lestrrat-go/jwx/v2/jws" "github.com/lestrrat-go/jwx/v2/jwt" "golang.org/x/oauth2" ) @@ -50,6 +51,7 @@ type OAuthTokenSourcePrivateKeyJWT struct { ClientAssertionKey string Resource string Audience string + Kid string } type tokenResponse struct { @@ -70,6 +72,7 @@ func (m KafkaMetadata) getOAuthTokenSourcePrivateKeyJWT() *OAuthTokenSourcePriva ClientAssertionKey: m.OidcClientAssertionKey, Resource: m.OidcResource, Audience: m.OidcAudience, + Kid: m.OidcKid, } } @@ -166,7 +169,15 @@ func (ts *OAuthTokenSourcePrivateKeyJWT) Token() (*sarama.AccessToken, error) { return nil, fmt.Errorf("failed to build token: %w", err) } - assertion, err := jwt.Sign(token, jwt.WithKey(jwa.RS256, rsaKey)) + var signOptions []jwt.Option + if ts.Kid != "" { + headers := jws.NewHeaders() + if err := headers.Set("kid", ts.Kid); err != nil { + return nil, fmt.Errorf("error setting JWT kid header: %w", err) + } + signOptions = append(signOptions, jws.WithProtectedHeaders(headers)) + } + assertion, err := jwt.Sign(token, jwt.WithKey(jwa.RS256, rsaKey, signOptions...)) if err != nil { return nil, fmt.Errorf("error signing client assertion: %w", err) } diff --git a/pubsub/kafka/metadata.yaml b/pubsub/kafka/metadata.yaml index 2c08d3e56c..1f1d298e48 100644 --- a/pubsub/kafka/metadata.yaml +++ b/pubsub/kafka/metadata.yaml @@ -181,6 +181,12 @@ authenticationProfiles: example: | {"cluster":"kafka","poolid":"kafkapool"} type: string + - name: oidcKid + type: string + required: false + description: | + The JWT key ID (kid) to use for the client assertion. + example: '"1234567890"' - title: "SASL Authentication" description: | Authenticate using SASL. diff --git a/tests/certification/pubsub/kafka/components/auth_oidc_certs/kafka.yaml b/tests/certification/pubsub/kafka/components/auth_oidc_certs/kafka.yaml index 0d662c3a14..37237e0999 100644 --- a/tests/certification/pubsub/kafka/components/auth_oidc_certs/kafka.yaml +++ b/tests/certification/pubsub/kafka/components/auth_oidc_certs/kafka.yaml @@ -28,6 +28,10 @@ spec: value: "openid" - name: disableTls value: "true" + - name: oidcKid + secretKeyRef: + name: OIDC_CLIENT_KID + key: OIDC_CLIENT_KID auth: secretStore: envvar-secret-store diff --git a/tests/certification/pubsub/kafka/data/realm-export.json b/tests/certification/pubsub/kafka/data/realm-export.json index 5e310f0958..bf87a12009 100644 --- a/tests/certification/pubsub/kafka/data/realm-export.json +++ b/tests/certification/pubsub/kafka/data/realm-export.json @@ -8,8 +8,8 @@ "secret": "dapr-kafka-secret", "clientAuthenticatorType": "client-jwt", "attributes": { - "token.endpoint.auth.signing.alg": "RS256", - "jwt.credential.certificate": "${OIDC_CLIENT_ASSERTION_CERT_ONELINE}" + "use.jwks.url": "true", + "jwks.url": "http://jwks:80/jwks.json" }, "enabled": true, "protocol": "openid-connect", diff --git a/tests/certification/pubsub/kafka/docker-compose.auth.yml b/tests/certification/pubsub/kafka/docker-compose.auth.yml index fc34a53ee9..41bf1a36dc 100644 --- a/tests/certification/pubsub/kafka/docker-compose.auth.yml +++ b/tests/certification/pubsub/kafka/docker-compose.auth.yml @@ -60,3 +60,14 @@ services: - "8080:8080" volumes: - ./data/realm-export.json:/opt/keycloak/data/import/local-realm.json + + jwks: + image: nginx:1.27-alpine + hostname: jwks + container_name: kafka_jwks + environment: + - OIDC_CLIENT_KID + - OIDC_CLIENT_JWK_N + - OIDC_CLIENT_JWK_E + command: > + /bin/sh -c 'printf "{\n \"keys\": [\n {\n \"kty\": \"RSA\",\n \"use\": \"sig\",\n \"alg\": \"RS256\",\n \"kid\": \"%s\",\n \"n\": \"%s\",\n \"e\": \"%s\"\n }\n ]\n}\n" "$$OIDC_CLIENT_KID" "$$OIDC_CLIENT_JWK_N" "$$OIDC_CLIENT_JWK_E" > /usr/share/nginx/html/jwks.json && exec nginx -g "daemon off;"' diff --git a/tests/certification/pubsub/kafka/kafka_test.go b/tests/certification/pubsub/kafka/kafka_test.go index a0cb1c51bf..6344852cae 100644 --- a/tests/certification/pubsub/kafka/kafka_test.go +++ b/tests/certification/pubsub/kafka/kafka_test.go @@ -19,6 +19,7 @@ import ( "crypto/rsa" "crypto/x509" "crypto/x509/pkix" + "encoding/base64" "encoding/json" "encoding/pem" "fmt" @@ -522,6 +523,12 @@ func TestKafkaAuth(t *testing.T) { os.Setenv("OIDC_CLIENT_ASSERTION_KEY", string(keyPEM)) os.Setenv("OIDC_CLIENT_ASSERTION_CERT_ONELINE", strings.ReplaceAll(string(certPEM), "\n", "\\n")) + modulus := key.PublicKey.N.Bytes() + os.Setenv("OIDC_CLIENT_JWK_N", base64.RawURLEncoding.EncodeToString(modulus)) + exponent := big.NewInt(int64(key.PublicKey.E)).Bytes() + os.Setenv("OIDC_CLIENT_JWK_E", base64.RawURLEncoding.EncodeToString(exponent)) + os.Setenv("OIDC_CLIENT_KID", uuid.New().String()) + flow.New(t, "kafka authentication"). Step(dockercompose.Run(clusterNameAuth, dockerComposeYAMLAuth)). Step("wait for broker sockets", From 734457713898ab38749571ee9169aee5bb485157 Mon Sep 17 00:00:00 2001 From: Albert Callarisa Date: Thu, 27 Nov 2025 12:35:56 +0100 Subject: [PATCH 2/2] Update common/component/kafka/sasl_oauthbearer_private_key_jwt.go Co-authored-by: Mike Nguyen Signed-off-by: Albert Callarisa --- common/component/kafka/sasl_oauthbearer_private_key_jwt.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/component/kafka/sasl_oauthbearer_private_key_jwt.go b/common/component/kafka/sasl_oauthbearer_private_key_jwt.go index 97df1d2548..f7480c30d0 100644 --- a/common/component/kafka/sasl_oauthbearer_private_key_jwt.go +++ b/common/component/kafka/sasl_oauthbearer_private_key_jwt.go @@ -172,7 +172,7 @@ func (ts *OAuthTokenSourcePrivateKeyJWT) Token() (*sarama.AccessToken, error) { var signOptions []jwt.Option if ts.Kid != "" { headers := jws.NewHeaders() - if err := headers.Set("kid", ts.Kid); err != nil { + if err = headers.Set("kid", ts.Kid); err != nil { return nil, fmt.Errorf("error setting JWT kid header: %w", err) } signOptions = append(signOptions, jws.WithProtectedHeaders(headers))