diff --git a/.gitignore b/.gitignore index ce6763fa..4617838a 100644 --- a/.gitignore +++ b/.gitignore @@ -14,3 +14,4 @@ prometheus-kafka-adapter-libc prometheus-kafka-adapter-musl prometheus-kafka-adapter +mycode.go.txt diff --git a/README.md b/README.md index b5b93237..99ad4f75 100644 --- a/README.md +++ b/README.md @@ -6,8 +6,10 @@ Prometheus-kafka-adapter is a service which receives [Prometheus](https://github ## output -It is able to write JSON or Avro-JSON messages in a kafka topic, depending on the `SERIALIZATION_FORMAT` configuration variable. +It is able to write JSON or Avro-JSON messages in a kafka topic, depending on the `SERIALIZATION_FORMAT` configuration variable. +Metric metadata can be included in the JSON output, if the `PROM_METADATA_ENDPOINT` is set to the correct API endpoint of the prometheus service, e.g. http://localhost:9090/api/v1/metadata. +Only the metrics which are available at application startup will have the metadata included. To include metadata for newer metrics, the application will need to be restarted. ### JSON ```json @@ -15,12 +17,14 @@ It is able to write JSON or Avro-JSON messages in a kafka topic, depending on th "timestamp": "1970-01-01T00:00:00Z", "value": "9876543210", "name": "up", - "labels": { "__name__": "up", "label1": "value1", "label2": "value2" - } + }, + "type": "type", + "help": "help", + "unit": "unit" } ``` @@ -38,6 +42,8 @@ There is a docker image `telefonica/prometheus-kafka-adapter:1.8.0` [available o Prometheus-kafka-adapter listens for metrics coming from Prometheus and sends them to Kafka. This behaviour can be configured with the following environment variables: +- `PROM_METADATA_ENDPOINT`: defines the prometheus metric metadata endpoint, not set by default and hence metadata won't be included. +- `INLCUDED_METADATA`: specifies which attributes are to be exported. The attributes should be comma separated. Permitted values are _type_, _help_ and _unit_. 
Only _type_ is included by default - `KAFKA_BROKER_LIST`: defines kafka endpoint and port, defaults to `kafka:9092`. - `KAFKA_TOPIC`: defines kafka topic to be used, defaults to `metrics`. Could use go template, labels are passed (as a map) to the template: e.g: `metrics.{{ index . "__name__" }}` to use per-metric topic. Two template functions are available: replace (`{{ index . "__name__" | replace "message" "msg" }}`) and substring (`{{ index . "__name__" | substring 0 5 }}`) - `KAFKA_COMPRESSION`: defines the compression type to be used, defaults to `none`. diff --git a/config.go b/config.go index 08cb2cd7..f51e30a0 100644 --- a/config.go +++ b/config.go @@ -16,18 +16,27 @@ package main import ( "fmt" - dto "github.com/prometheus/client_model/go" - "github.com/prometheus/common/expfmt" - "gopkg.in/yaml.v2" "os" "strings" "text/template" + dto "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" + "gopkg.in/yaml.v2" + "github.com/sirupsen/logrus" ) +type MetricMetadata struct { + metricType string + metricHelp string + metricUnit string +} + var ( kafkaBrokerList = "kafka:9092" + promMetaDataEndPoint = "" + getMetricMetadata = false kafkaTopic = "metrics" topicTemplate *template.Template match = make(map[string]*dto.MetricFamily, 0) @@ -45,6 +54,8 @@ var ( kafkaSaslUsername = "" kafkaSaslPassword = "" serializer Serializer + metricsList = make(map[string]MetricMetadata) + includedMetaData = "type" ) func init() { @@ -55,6 +66,15 @@ func init() { logrus.SetLevel(parseLogLevel(value)) } + if value := os.Getenv("PROM_METADATA_ENDPOINT"); value != "" { + promMetaDataEndPoint = value + getMetricMetadata = true + } + + if value := os.Getenv("INLCUDED_METADATA"); value != "" { + includedMetaData = value + } + if value := os.Getenv("KAFKA_BROKER_LIST"); value != "" { kafkaBrokerList = value } diff --git a/helm/prometheus-kafka-adapter/templates/deployment.yaml b/helm/prometheus-kafka-adapter/templates/deployment.yaml index 51be8a7c..ad7a7b18 
100644 --- a/helm/prometheus-kafka-adapter/templates/deployment.yaml +++ b/helm/prometheus-kafka-adapter/templates/deployment.yaml @@ -91,6 +91,10 @@ spec: name: {{ include "prometheus-kafka-adapter.fullname" . }} key: KAFKA_SSL_CLIENT_KEY_PASS {{- end }} + - name: PROM_METADATA_ENDPOINT + value: {{ .Values.environment.PROM_METADATA_ENDPOINT | quote }} # may want customizable service references + - name: INLCUDED_METADATA + value: {{ .Values.environment.INLCUDED_METADATA | quote }} # may want customizable service references ports: - name: http containerPort: {{ .Values.environment.PORT }} diff --git a/helm/prometheus-kafka-adapter/values.yaml b/helm/prometheus-kafka-adapter/values.yaml index 473400d0..4a2e670d 100644 --- a/helm/prometheus-kafka-adapter/values.yaml +++ b/helm/prometheus-kafka-adapter/values.yaml @@ -65,6 +65,10 @@ environment: KAFKA_SSL_CA_CERT_FILE: # defines the match rules, simple metric name match and label match MATCH: + # defines the API endpoint of the prometheus server to read the metadata of the metrics from, e.g. http://localhost:9090/api/v1/metadata. If this is not set, metadata is not included in the topic message. + PROM_METADATA_ENDPOINT: "" + # defines the metadata to be included in the topic if PROM_METADATA_ENDPOINT is set. Permitted values are type, unit & help. They need to be in CSV format. 
Defaulted to type + INLCUDED_METADATA: "type" serviceAccount: # Specifies whether a service account should be created diff --git a/main.go b/main.go index a9173ed6..888298b9 100644 --- a/main.go +++ b/main.go @@ -63,6 +63,10 @@ func main() { } producer, err := kafka.NewProducer(&kafkaConfig) + if getMetricMetadata == true { + logrus.Debugf("Prometheus API URL is %s", promMetaDataEndPoint) + GetAllMetricMetadata(promMetaDataEndPoint, metricsList) + } if err != nil { logrus.WithError(err).Fatal("couldn't create kafka producer") diff --git a/metadata.go b/metadata.go new file mode 100644 index 00000000..06b07c67 --- /dev/null +++ b/metadata.go @@ -0,0 +1,60 @@ +package main + +import ( + "encoding/json" + "io/ioutil" + "net/http" + "strings" + + "github.com/sirupsen/logrus" +) + +func GetAllMetricMetadata(promMetaDataEndPoint string, metricsList map[string]MetricMetadata) { + + // Make a GET request to the Prometheus metadata API + response, err := http.Get(promMetaDataEndPoint) + if err != nil { + logrus.WithError(err).Errorln("Error making request") + return + } + defer response.Body.Close() + + // Read the response body + body, err := ioutil.ReadAll(response.Body) + if err != nil { + logrus.WithError(err).Errorln("Error reading response body") + // logrus.error("Error reading response body: %s\n", err.Error()) + return + } + + // Parse the JSON data into a map + var data map[string]interface{} + err = json.Unmarshal([]byte(body), &data) + if err != nil { + logrus.WithError(err).Errorln("Error parsing json") + return + } + // var metricList = make(map[string]MetricMetadata) + for key, metrics := range data["data"].(map[string]interface{}) { + for _, metric := range metrics.([]interface{}) { + var metricMetadata MetricMetadata + logrus.Debugf("Processing Metric %s, Metadata to be included %s", key, includedMetaData) + + if strings.Contains(strings.ToLower(includedMetaData), "type") { + metricMetadata.metricType = metric.(map[string]interface{})["type"].(string) + 
logrus.Debugf("Type is %s", metricMetadata.metricType) + } + if strings.Contains(strings.ToLower(includedMetaData), "help") { + metricMetadata.metricHelp = metric.(map[string]interface{})["help"].(string) + logrus.Debugf("Help is %s", metricMetadata.metricHelp) + } + if strings.Contains(strings.ToLower(includedMetaData), "unit") { + metricMetadata.metricUnit = metric.(map[string]interface{})["unit"].(string) + logrus.Debugf("Unit is %s", metricMetadata.metricUnit) + } + metricsList[key] = metricMetadata + // fmt.Printf("Metric: %s, Type: %s, Help: %s, Unit: %s", key, metricMetadata.metricType, metricMetadata.metricHelp, metricMetadata.metricUnit) + } + } + logrus.Debugf("Total number of metrics parsed is %v", len(metricsList)) +} diff --git a/serializers.go b/serializers.go index e75e86dd..c510b895 100644 --- a/serializers.go +++ b/serializers.go @@ -49,11 +49,11 @@ func Serialize(s Serializer, req *prompb.WriteRequest) (map[string][][]byte, err for _, sample := range ts.Samples { name := string(labels["__name__"]) + if !filter(name, labels) { objectsFiltered.Add(float64(1)) continue } - epoch := time.Unix(sample.Timestamp/1000, 0).UTC() m := map[string]interface{}{ "timestamp": epoch.Format(time.RFC3339), @@ -62,6 +62,22 @@ func Serialize(s Serializer, req *prompb.WriteRequest) (map[string][][]byte, err "labels": labels, } + if getMetricMetadata { + val, ok := metricsList[name] + + if ok { + if val.metricType != "" { + m["type"] = val.metricType + } + if val.metricUnit != "" { + m["unit"] = val.metricUnit + } + if val.metricHelp != "" { + m["help"] = val.metricHelp + } + } + } + data, err := s.Marshal(m) if err != nil { serializeFailed.Add(float64(1))