Skip to content

Commit 19418e6

Browse files
committed
feat: Kafka producer config, ability to set batch.size
feat: handling producer events
1 parent 982925c commit 19418e6

File tree

4 files changed

+40
-13
lines changed

4 files changed

+40
-13
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ Prometheus-kafka-adapter listens for metrics coming from Prometheus and sends th
4242
- `KAFKA_TOPIC`: defines kafka topic to be used, defaults to `metrics`. Could use go template, labels are passed (as a map) to the template: e.g: `metrics.{{ index . "__name__" }}` to use per-metric topic. Two template functions are available: replace (`{{ index . "__name__" | replace "message" "msg" }}`) and substring (`{{ index . "__name__" | substring 0 5 }}`)
4343
- `KAFKA_COMPRESSION`: defines the compression type to be used, defaults to `none`.
4444
- `KAFKA_BATCH_NUM_MESSAGES`: defines the number of messages to batch write, defaults to `10000`.
45+
- `KAFKA_BATCH_SIZE`: defines the maximum size (in bytes) of all messages batched in one MessageSet, including protocol framing overhead, defaults to `1000000`.
4546
- `KAFKA_LINGER_MS`: Delay in milliseconds to wait for messages in the producer queue to accumulate before constructing message batches, defaults to `5`.
4647
- `SERIALIZATION_FORMAT`: defines the serialization format, can be `json`, `avro-json`, defaults to `json`.
4748
- `PORT`: defines http port to listen, defaults to `8080`, used directly by [gin](https://github.com/gin-gonic/gin).

config.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ var (
3737
basicauthPassword = ""
3838
kafkaCompression = "none"
3939
kafkaBatchNumMessages = "10000"
40+
kafkaBatchSize = "1000000"
4041
kafkaLingerMs = "5"
4142
kafkaSslClientCertFile = ""
4243
kafkaSslClientKeyFile = ""
@@ -83,6 +84,10 @@ func init() {
8384
kafkaBatchNumMessages = value
8485
}
8586

87+
if value := os.Getenv("KAFKA_BATCH_SIZE"); value != "" {
88+
kafkaBatchSize = value
89+
}
90+
8691
if value := os.Getenv("KAFKA_LINGER_MS"); value != "" {
8792
kafkaLingerMs = value
8893
}

handlers.go

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,13 +76,34 @@ func receiveHandler(producer *kafka.Producer, serializer Serializer) func(c *gin
7676
Value: metric,
7777
}, nil)
7878

79+
go func() {
80+
for event := range producer.Events() {
81+
switch ev := event.(type) {
82+
case *kafka.Message:
83+
message := ev
84+
if message.TopicPartition.Error != nil {
85+
logrus.WithError(message.TopicPartition.Error).Errorf("failed to deliver message: %v",
86+
message.TopicPartition)
87+
} else {
88+
logrus.Debugf("delivered to topic %s [%d] at offset %v",
89+
*message.TopicPartition.Topic,
90+
message.TopicPartition.Partition,
91+
message.TopicPartition.Offset)
92+
}
93+
case kafka.Error:
94+
logrus.WithError(err).Errorf("Error: %v", ev)
95+
default:
96+
logrus.Infof("Ignored event: %s", ev)
97+
}
98+
}
99+
}()
100+
79101
if err != nil {
80102
if err.(kafka.Error).Code() == kafka.ErrQueueFull {
81-
// Producer queue is full, wait 1s for messages
82-
// to be delivered then try again.
83-
logrus.Info("producer queue is full, waiting 1s")
103+
// Producer queue is full, wait 1s for messages to be delivered
104+
// Maybe we should fail fast instead, since we are losing data here?
105+
logrus.Warning("producer queue is full, waiting 1s")
84106
time.Sleep(time.Second)
85-
continue
86107
}
87108

88109
objectsFailed.Add(float64(1))

main.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,15 @@ func main() {
2828
logrus.Info("creating kafka producer")
2929

3030
kafkaConfig := kafka.ConfigMap{
31-
"bootstrap.servers": kafkaBrokerList,
32-
"compression.codec": kafkaCompression,
33-
"batch.num.messages": kafkaBatchNumMessages,
34-
"queue.buffering.max.messages": kafkaBatchNumMessages,
35-
"enable.idempotence": true,
36-
"linger.ms": kafkaLingerMs,
37-
"go.batch.producer": true, // Enable batch producer (for increased performance).
38-
"go.delivery.reports": false, // per-message delivery reports to the Events() channel
39-
"acks": kafkaAcks,
31+
"bootstrap.servers": kafkaBrokerList,
32+
"compression.codec": kafkaCompression,
33+
"batch.num.messages": kafkaBatchNumMessages,
34+
"batch.size": kafkaBatchSize,
35+
"linger.ms": kafkaLingerMs,
36+
"go.batch.producer": true, // Enable batch producer (for increased performance).
37+
"go.delivery.reports": true, // per-message delivery reports to the Events() channel
38+
"go.logs.channel.enable": true,
39+
"acks": kafkaAcks,
4040
}
4141

4242
if kafkaSslClientCertFile != "" && kafkaSslClientKeyFile != "" && kafkaSslCACertFile != "" {

0 commit comments

Comments
 (0)