Skip to content

Commit 68e334a

Browse files
committed
feat: Support AVRO with Schema Registry
feat: bump confluent-kafka-go/v2
1 parent 19418e6 commit 68e334a

File tree

513 files changed

+84843
-11405
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

513 files changed

+84843
-11405
lines changed

README.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,11 @@ Prometheus-kafka-adapter listens for metrics coming from Prometheus and sends th
4444
- `KAFKA_BATCH_NUM_MESSAGES`: defines the number of messages to batch write, defaults to `10000`.
4545
- `KAFKA_BATCH_SIZE`: Maximum size (in bytes) of all messages batched in one MessageSet, including protocol framing overhead, defaults to `1000000`.
4646
- `KAFKA_LINGER_MS`: Delay in milliseconds to wait for messages in the producer queue to accumulate before constructing message batches, defaults to `5`.
47-
- `SERIALIZATION_FORMAT`: defines the serialization format, can be `json`, `avro-json`, defaults to `json`.
47+
- `SERIALIZATION_FORMAT`: defines the serialization format, can be `json`, `avro-json`, `avro-schema-registry`, defaults to `json`.
48+
- `SCHEMA_REGISTRY_URL`: defines the schema registry url to be used, only used if `SERIALIZATION_FORMAT=avro-schema-registry`.
49+
- `SCHEMA_REGISTRY_USERNAME`: defines the schema registry username to be used, only used if `SERIALIZATION_FORMAT=avro-schema-registry`.
50+
- `SCHEMA_REGISTRY_PASSWORD`: defines the schema registry password to be used, only used if `SERIALIZATION_FORMAT=avro-schema-registry`.
51+
- `SCHEMA_REGISTRY_AUTO_REGISTRY_SCHEMAS`: defines the schema registry auto registry schema, only used if `SERIALIZATION_FORMAT=avro-schema-registry`, defaults to `false`.
4852
- `PORT`: defines http port to listen, defaults to `8080`, used directly by [gin](https://github.com/gin-gonic/gin).
4953
- `BASIC_AUTH_USERNAME`: basic auth username to be used for receive endpoint, defaults is no basic auth.
5054
- `BASIC_AUTH_PASSWORD`: basic auth password to be used for receive endpoint, defaults is no basic auth.

config.go

Lines changed: 46 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package main
1717
import (
1818
"fmt"
1919
"os"
20+
"strconv"
2021
"strings"
2122
"text/template"
2223

@@ -28,27 +29,31 @@ import (
2829
)
2930

3031
var (
31-
kafkaBrokerList = "kafka:9092"
32-
kafkaTopic = "metrics"
33-
topicTemplate *template.Template
34-
match = make(map[string]*dto.MetricFamily, 0)
35-
basicauth = false
36-
basicauthUsername = ""
37-
basicauthPassword = ""
38-
kafkaCompression = "none"
39-
kafkaBatchNumMessages = "10000"
40-
kafkaBatchSize = "1000000"
41-
kafkaLingerMs = "5"
42-
kafkaSslClientCertFile = ""
43-
kafkaSslClientKeyFile = ""
44-
kafkaSslClientKeyPass = ""
45-
kafkaSslCACertFile = ""
46-
kafkaSecurityProtocol = ""
47-
kafkaSaslMechanism = ""
48-
kafkaSaslUsername = ""
49-
kafkaSaslPassword = ""
50-
serializer Serializer
51-
kafkaAcks = "all"
32+
kafkaBrokerList = "kafka:9092"
33+
kafkaTopic = "metrics"
34+
topicTemplate *template.Template
35+
match = make(map[string]*dto.MetricFamily, 0)
36+
basicauth = false
37+
basicauthUsername = ""
38+
basicauthPassword = ""
39+
kafkaCompression = "none"
40+
kafkaBatchNumMessages = "10000"
41+
kafkaBatchSize = "1000000"
42+
kafkaLingerMs = "5"
43+
kafkaSslClientCertFile = ""
44+
kafkaSslClientKeyFile = ""
45+
kafkaSslClientKeyPass = ""
46+
kafkaSslCACertFile = ""
47+
kafkaSecurityProtocol = ""
48+
kafkaSaslMechanism = ""
49+
kafkaSaslUsername = ""
50+
kafkaSaslPassword = ""
51+
serializer Serializer
52+
kafkaAcks = "all"
53+
schemaRegistryUrl = ""
54+
schemaRegistryUsername = ""
55+
schemaRegistryPassword = ""
56+
schemaRegistryAutoRegisterSchemas = false
5257
)
5358

5459
func init() {
@@ -135,6 +140,24 @@ func init() {
135140
match = matchList
136141
}
137142

143+
if value := os.Getenv("SCHEMA_REGISTRY_URL"); value != "" {
144+
schemaRegistryUrl = value
145+
}
146+
if value := os.Getenv("SCHEMA_REGISTRY_USERNAME"); value != "" {
147+
schemaRegistryUsername = value
148+
}
149+
if value := os.Getenv("SCHEMA_REGISTRY_PASSWORD"); value != "" {
150+
schemaRegistryPassword = value
151+
}
152+
if value := os.Getenv("SCHEMA_REGISTRY_AUTO_REGISTRY_SCHEMAS"); value != "" {
153+
v, err := strconv.ParseBool(value)
154+
if err != nil {
155+
logrus.WithError(err).Fatalln("couldn't parse SCHEMA_REGISTRY_AUTO_REGISTRY_SCHEMAS to bool, using false")
156+
v = false
157+
}
158+
schemaRegistryAutoRegisterSchemas = v
159+
}
160+
138161
var err error
139162
serializer, err = parseSerializationFormat(os.Getenv("SERIALIZATION_FORMAT"))
140163
if err != nil {
@@ -185,6 +208,8 @@ func parseSerializationFormat(value string) (Serializer, error) {
185208
return NewJSONSerializer()
186209
case "avro-json":
187210
return NewAvroJSONSerializer("schemas/metric.avsc")
211+
case "avro-schema-registry":
212+
return NewAvroSchemaRegistrySerializer(schemaRegistryUrl, schemaRegistryUsername, schemaRegistryPassword)
188213
default:
189214
logrus.WithField("serialization-format-value", value).Warningln("invalid serialization format, using json")
190215
return NewJSONSerializer()

go.mod

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,18 @@ module github.com/Telefonica/prometheus-kafka-adapter
33
go 1.22.3
44

55
require (
6-
github.com/confluentinc/confluent-kafka-go v1.9.2
6+
github.com/confluentinc/confluent-kafka-go/v2 v2.6.1
77
github.com/gin-gonic/contrib v0.0.0-20240508051311-c1c6bf0061b0
88
github.com/gin-gonic/gin v1.10.0
99
github.com/gogo/protobuf v1.3.2
1010
github.com/golang/snappy v0.0.4
1111
github.com/linkedin/goavro v2.1.0+incompatible
12-
github.com/prometheus/client_golang v1.19.1
12+
github.com/prometheus/client_golang v1.20.5
1313
github.com/prometheus/client_model v0.6.1
14-
github.com/prometheus/common v0.53.0
15-
github.com/prometheus/prometheus v0.52.1
14+
github.com/prometheus/common v0.61.0
15+
github.com/prometheus/prometheus v0.300.1
1616
github.com/sirupsen/logrus v1.9.3
17-
github.com/stretchr/testify v1.9.0
17+
github.com/stretchr/testify v1.10.0
1818
gopkg.in/yaml.v2 v2.4.0
1919
)
2020

@@ -32,23 +32,30 @@ require (
3232
github.com/go-playground/universal-translator v0.18.1 // indirect
3333
github.com/go-playground/validator/v10 v10.20.0 // indirect
3434
github.com/goccy/go-json v0.10.2 // indirect
35+
github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc // indirect
36+
github.com/hamba/avro/v2 v2.24.0 // indirect
3537
github.com/json-iterator/go v1.1.12 // indirect
38+
github.com/klauspost/compress v1.17.10 // indirect
3639
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
40+
github.com/kr/text v0.2.0 // indirect
3741
github.com/leodido/go-urn v1.4.0 // indirect
3842
github.com/mattn/go-isatty v0.0.20 // indirect
43+
github.com/mitchellh/mapstructure v1.5.0 // indirect
44+
github.com/moby/sys/userns v0.1.0 // indirect
3945
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
4046
github.com/modern-go/reflect2 v1.0.2 // indirect
47+
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
4148
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
4249
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
43-
github.com/prometheus/procfs v0.12.0 // indirect
50+
github.com/prometheus/procfs v0.15.1 // indirect
4451
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
4552
github.com/ugorji/go/codec v1.2.12 // indirect
4653
golang.org/x/arch v0.8.0 // indirect
47-
golang.org/x/crypto v0.23.0 // indirect
48-
golang.org/x/net v0.25.0 // indirect
49-
golang.org/x/sys v0.20.0 // indirect
50-
golang.org/x/text v0.15.0 // indirect
51-
google.golang.org/protobuf v1.34.1 // indirect
54+
golang.org/x/crypto v0.30.0 // indirect
55+
golang.org/x/net v0.32.0 // indirect
56+
golang.org/x/sys v0.28.0 // indirect
57+
golang.org/x/text v0.21.0 // indirect
58+
google.golang.org/protobuf v1.35.2 // indirect
5259
gopkg.in/linkedin/goavro.v1 v1.0.5 // indirect
5360
gopkg.in/yaml.v3 v3.0.1 // indirect
5461
)

0 commit comments

Comments
 (0)