Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions bindings/kafka/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,12 @@ authenticationProfiles:
example: |
{"cluster":"kafka","poolid":"kafkapool"}
type: string
- name: oidcKid
type: string
required: false
description: |
The JWT key ID (kid) to use for the client assertion.
example: '"1234567890"'
- title: "SASL Authentication"
description: |
Authenticate using SASL.
Expand Down
1 change: 1 addition & 0 deletions common/component/kafka/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ type KafkaMetadata struct {
OidcClientAssertionKey string `mapstructure:"oidcClientAssertionKey"`
OidcResource string `mapstructure:"oidcResource"`
OidcAudience string `mapstructure:"oidcAudience"`
OidcKid string `mapstructure:"oidcKid"`
internalOidcScopes []string `mapstructure:"-"`
TLSDisable bool `mapstructure:"disableTls"`
TLSSkipVerify bool `mapstructure:"skipVerify"`
Expand Down
5 changes: 5 additions & 0 deletions common/component/kafka/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,11 @@ func TestMissingOidcPrivateKeyJwtValues(t *testing.T) {
meta, err = k.getKafkaMetadata(m)
require.NoError(t, err)
require.Contains(t, meta.internalOidcScopes, "openid")

m["oidcKid"] = "1234567890"
meta, err = k.getKafkaMetadata(m)
require.NoError(t, err)
require.Equal(t, "1234567890", meta.OidcKid)
}

func TestPresentSaslValues(t *testing.T) {
Expand Down
13 changes: 12 additions & 1 deletion common/component/kafka/sasl_oauthbearer_private_key_jwt.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/IBM/sarama"
"github.com/google/uuid"
"github.com/lestrrat-go/jwx/v2/jwa"
"github.com/lestrrat-go/jwx/v2/jws"
"github.com/lestrrat-go/jwx/v2/jwt"
"golang.org/x/oauth2"
)
Expand All @@ -50,6 +51,7 @@ type OAuthTokenSourcePrivateKeyJWT struct {
ClientAssertionKey string
Resource string
Audience string
Kid string
}

type tokenResponse struct {
Expand All @@ -70,6 +72,7 @@ func (m KafkaMetadata) getOAuthTokenSourcePrivateKeyJWT() *OAuthTokenSourcePriva
ClientAssertionKey: m.OidcClientAssertionKey,
Resource: m.OidcResource,
Audience: m.OidcAudience,
Kid: m.OidcKid,
}
}

Expand Down Expand Up @@ -166,7 +169,15 @@ func (ts *OAuthTokenSourcePrivateKeyJWT) Token() (*sarama.AccessToken, error) {
return nil, fmt.Errorf("failed to build token: %w", err)
}

assertion, err := jwt.Sign(token, jwt.WithKey(jwa.RS256, rsaKey))
var signOptions []jwt.Option
if ts.Kid != "" {
headers := jws.NewHeaders()
if err := headers.Set("kid", ts.Kid); err != nil {
return nil, fmt.Errorf("error setting JWT kid header: %w", err)
}
signOptions = append(signOptions, jws.WithProtectedHeaders(headers))
}
assertion, err := jwt.Sign(token, jwt.WithKey(jwa.RS256, rsaKey, signOptions...))
if err != nil {
return nil, fmt.Errorf("error signing client assertion: %w", err)
}
Expand Down
6 changes: 6 additions & 0 deletions pubsub/kafka/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,12 @@ authenticationProfiles:
example: |
{"cluster":"kafka","poolid":"kafkapool"}
type: string
- name: oidcKid
type: string
required: false
description: |
The JWT key ID (kid) to use for the client assertion.
example: '"1234567890"'
- title: "SASL Authentication"
description: |
Authenticate using SASL.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ spec:
value: "openid"
- name: disableTls
value: "true"
- name: oidcKid
secretKeyRef:
name: OIDC_CLIENT_KID
key: OIDC_CLIENT_KID

auth:
secretStore: envvar-secret-store
4 changes: 2 additions & 2 deletions tests/certification/pubsub/kafka/data/realm-export.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
"secret": "dapr-kafka-secret",
"clientAuthenticatorType": "client-jwt",
"attributes": {
"token.endpoint.auth.signing.alg": "RS256",
"jwt.credential.certificate": "${OIDC_CLIENT_ASSERTION_CERT_ONELINE}"
"use.jwks.url": "true",
"jwks.url": "http://jwks:80/jwks.json"
},
"enabled": true,
"protocol": "openid-connect",
Expand Down
11 changes: 11 additions & 0 deletions tests/certification/pubsub/kafka/docker-compose.auth.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,14 @@ services:
- "8080:8080"
volumes:
- ./data/realm-export.json:/opt/keycloak/data/import/local-realm.json

jwks:
image: nginx:1.27-alpine
hostname: jwks
container_name: kafka_jwks
environment:
- OIDC_CLIENT_KID
- OIDC_CLIENT_JWK_N
- OIDC_CLIENT_JWK_E
command: >
/bin/sh -c 'printf "{\n \"keys\": [\n {\n \"kty\": \"RSA\",\n \"use\": \"sig\",\n \"alg\": \"RS256\",\n \"kid\": \"%s\",\n \"n\": \"%s\",\n \"e\": \"%s\"\n }\n ]\n}\n" "$$OIDC_CLIENT_KID" "$$OIDC_CLIENT_JWK_N" "$$OIDC_CLIENT_JWK_E" > /usr/share/nginx/html/jwks.json && exec nginx -g "daemon off;"'
7 changes: 7 additions & 0 deletions tests/certification/pubsub/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"crypto/rsa"
"crypto/x509"
"crypto/x509/pkix"
"encoding/base64"
"encoding/json"
"encoding/pem"
"fmt"
Expand Down Expand Up @@ -522,6 +523,12 @@ func TestKafkaAuth(t *testing.T) {
os.Setenv("OIDC_CLIENT_ASSERTION_KEY", string(keyPEM))
os.Setenv("OIDC_CLIENT_ASSERTION_CERT_ONELINE", strings.ReplaceAll(string(certPEM), "\n", "\\n"))

modulus := key.PublicKey.N.Bytes()
os.Setenv("OIDC_CLIENT_JWK_N", base64.RawURLEncoding.EncodeToString(modulus))
exponent := big.NewInt(int64(key.PublicKey.E)).Bytes()
os.Setenv("OIDC_CLIENT_JWK_E", base64.RawURLEncoding.EncodeToString(exponent))
os.Setenv("OIDC_CLIENT_KID", uuid.New().String())

flow.New(t, "kafka authentication").
Step(dockercompose.Run(clusterNameAuth, dockerComposeYAMLAuth)).
Step("wait for broker sockets",
Expand Down
Loading