2 changes: 2 additions & 0 deletions README.md
@@ -42,6 +42,8 @@ Prometheus-kafka-adapter listens for metrics coming from Prometheus and sends th
- `KAFKA_TOPIC`: defines the kafka topic to be used, defaults to `metrics`. It can be a Go template; the metric's labels are passed (as a map) to the template, e.g. `metrics.{{ index . "__name__" }}` for a per-metric topic. Two template functions are available: replace (`{{ index . "__name__" | replace "message" "msg" }}`) and substring (`{{ index . "__name__" | substring 0 5 }}`); see the sketch after this list.
- `KAFKA_COMPRESSION`: defines the compression type to be used, defaults to `none`.
- `KAFKA_BATCH_NUM_MESSAGES`: defines the number of messages to batch write, defaults to `10000`.
- `KAFKA_BATCH_SIZE`: defines the maximum size (in bytes) of all messages batched in one MessageSet, including protocol framing overhead, defaults to `1000000`.
- `KAFKA_LINGER_MS`: defines the delay in milliseconds to wait for messages in the producer queue to accumulate before constructing message batches, defaults to `5`.
- `SERIALIZATION_FORMAT`: defines the serialization format, can be `json` or `avro-json`, defaults to `json`.
- `PORT`: defines the HTTP port to listen on, defaults to `8080`, used directly by [gin](https://github.com/gin-gonic/gin).
- `BASIC_AUTH_USERNAME`: basic auth username to be used for the receive endpoint, default is no basic auth.
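For illustration, a minimal, hypothetical sketch of how such a topic template could resolve against a metric's label map using Go's `text/template`. The helper names mirror the README; the adapter's actual implementation may differ:

```go
package main

import (
	"os"
	"strings"
	"text/template"
)

func main() {
	// Template helpers as documented above. In Go template pipelines the
	// piped value arrives as the last argument, hence the parameter order.
	funcs := template.FuncMap{
		"replace": func(old, new, s string) string {
			return strings.ReplaceAll(s, old, new)
		},
		"substring": func(start, end int, s string) string {
			if end > len(s) {
				end = len(s)
			}
			return s[start:end]
		},
	}

	// e.g. KAFKA_TOPIC=metrics.{{ index . "__name__" | substring 0 5 }}
	tmpl := template.Must(template.New("topic").Funcs(funcs).Parse(
		`metrics.{{ index . "__name__" | substring 0 5 }}`))

	labels := map[string]string{"__name__": "node_cpu_seconds_total"}
	_ = tmpl.Execute(os.Stdout, labels) // prints "metrics.node_"
}
```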
12 changes: 11 additions & 1 deletion config.go
@@ -37,6 +37,8 @@ var (
basicauthPassword = ""
kafkaCompression = "none"
kafkaBatchNumMessages = "10000"
kafkaBatchSize = "1000000"
kafkaLingerMs = "5"
kafkaSslClientCertFile = ""
kafkaSslClientKeyFile = ""
kafkaSslClientKeyPass = ""
@@ -82,6 +84,14 @@ func init() {
kafkaBatchNumMessages = value
}

if value := os.Getenv("KAFKA_BATCH_SIZE"); value != "" {
kafkaBatchSize = value
}

if value := os.Getenv("KAFKA_LINGER_MS"); value != "" {
kafkaLingerMs = value
}

if value := os.Getenv("KAFKA_SSL_CLIENT_CERT_FILE"); value != "" {
kafkaSslClientCertFile = value
}
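Since the new values stay strings and are handed to librdkafka unparsed, a malformed `KAFKA_BATCH_SIZE` or `KAFKA_LINGER_MS` only surfaces when the producer is created. A hypothetical fail-fast check (not part of this PR; assumes `strconv` and `logrus` imports in config.go) could look like:

```go
// Hypothetical early validation, not in this PR: reject non-numeric
// values in init() rather than failing later at kafka.NewProducer.
for name, v := range map[string]string{
	"KAFKA_BATCH_SIZE": kafkaBatchSize,
	"KAFKA_LINGER_MS":  kafkaLingerMs,
} {
	if _, err := strconv.Atoi(v); err != nil {
		logrus.Fatalf("invalid %s: %q is not an integer", name, v)
	}
}
```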
@@ -99,7 +109,7 @@ func init() {
}

if value := os.Getenv("KAFKA_SECURITY_PROTOCOL"); value != "" {
kafkaSecurityProtocol = strings.ToLower(value)
kafkaSecurityProtocol = value
}

if value := os.Getenv("KAFKA_SASL_MECHANISM"); value != "" {
30 changes: 30 additions & 0 deletions handlers.go
@@ -18,6 +18,7 @@ import (
"fmt"
"io/ioutil"
"net/http"
"time"

"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
@@ -75,7 +76,36 @@ func receiveHandler(producer *kafka.Producer, serializer Serializer) func(c *gin
Value: metric,
}, nil)

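// Drain delivery reports and errors from the producer's Events() channel.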
go func() {
for event := range producer.Events() {
switch ev := event.(type) {
case *kafka.Message:
message := ev
if message.TopicPartition.Error != nil {
logrus.WithError(message.TopicPartition.Error).Errorf("failed to deliver message: %v",
message.TopicPartition)
} else {
logrus.Debugf("delivered to topic %s [%d] at offset %v",
*message.TopicPartition.Topic,
message.TopicPartition.Partition,
message.TopicPartition.Offset)
}
case kafka.Error:
logrus.WithError(ev).Error("kafka producer error")
default:
logrus.Infof("Ignored event: %s", ev)
}
}
}()


This spawns a new goroutine on every request the server receives; the Events() loop should move somewhere like main().
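A minimal sketch of that suggestion, assuming the PR's `producer` and `logrus` setup: start a single listener in `main()` right after the producer is created, rather than one per request.

```go
// Sketch (not the PR's code): drain delivery reports once, from main(),
// so each HTTP request does not spawn its own event loop.
go func() {
	for event := range producer.Events() {
		switch ev := event.(type) {
		case *kafka.Message:
			if ev.TopicPartition.Error != nil {
				logrus.WithError(ev.TopicPartition.Error).
					Errorf("failed to deliver message: %v", ev.TopicPartition)
			}
		case kafka.Error:
			logrus.WithError(ev).Error("kafka producer error")
		}
	}
}()
```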

if err != nil {
if err.(kafka.Error).Code() == kafka.ErrQueueFull {
// Producer queue is full; wait 1s for messages to be delivered.
// Maybe we should fail fast instead, as we are losing data...
logrus.Warning("producer queue is full, waiting 1s")
time.Sleep(time.Second)
}

objectsFailed.Add(float64(1))
c.AbortWithStatus(http.StatusInternalServerError)
logrus.WithError(err).Debug(fmt.Sprintf("Failing metric %v", metric))
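On the fail-fast question in the comment above: since Prometheus remote write treats 5xx responses as retryable, one hypothetical alternative (not in this PR) is to skip the sleep and answer `503` immediately, letting the sender back off and retry:

```go
// Hypothetical alternative to the 1s sleep: reply 503 on a full queue
// and rely on the remote-write sender's retry/backoff instead of blocking.
if kerr, ok := err.(kafka.Error); ok && kerr.Code() == kafka.ErrQueueFull {
	objectsFailed.Add(float64(1))
	c.AbortWithStatus(http.StatusServiceUnavailable)
	return
}
```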
19 changes: 11 additions & 8 deletions main.go
@@ -28,20 +28,23 @@ func main() {
logrus.Info("creating kafka producer")

kafkaConfig := kafka.ConfigMap{
"bootstrap.servers": kafkaBrokerList,
"compression.codec": kafkaCompression,
"batch.num.messages": kafkaBatchNumMessages,
"go.batch.producer": true, // Enable batch producer (for increased performance).
"go.delivery.reports": false, // per-message delivery reports to the Events() channel
"acks": kafkaAcks,
"bootstrap.servers": kafkaBrokerList,
"compression.codec": kafkaCompression,
"batch.num.messages": kafkaBatchNumMessages,
"batch.size": kafkaBatchSize,
"linger.ms": kafkaLingerMs,
"go.batch.producer": true, // Enable batch producer (for increased performance).
"go.delivery.reports": true, // per-message delivery reports to the Events() channel
"go.logs.channel.enable": true,
"acks": kafkaAcks,
}

if kafkaSslClientCertFile != "" && kafkaSslClientKeyFile != "" && kafkaSslCACertFile != "" {
if kafkaSecurityProtocol == "" {
kafkaSecurityProtocol = "ssl"
}

if kafkaSecurityProtocol != "ssl" && kafkaSecurityProtocol != "sasl_ssl" {
if kafkaSecurityProtocol != "ssl" && kafkaSecurityProtocol != "SASL_SSL" {
logrus.Fatal("invalid config: kafka security protocol is not ssl based but ssl config is provided")
}

@@ -53,7 +56,7 @@
}

if kafkaSaslMechanism != "" && kafkaSaslUsername != "" && kafkaSaslPassword != "" {
if kafkaSecurityProtocol != "sasl_ssl" && kafkaSecurityProtocol != "sasl_plaintext" {
if kafkaSecurityProtocol != "SASL_SSL" && kafkaSecurityProtocol != "SASL_PLAINTEXT" {
logrus.Fatal("invalid config: kafka security protocol is not sasl based but sasl config is provided")
}
