1 change: 1 addition & 0 deletions LICENSE-3rdparty.csv
@@ -22,6 +22,7 @@ cryptography,PyPI,BSD-3-Clause,Copyright (c) Individual contributors.
cryptography,PyPI,PSF,Copyright (c) Individual contributors.
ddtrace,PyPI,BSD-3-Clause,"Copyright 2016 Datadog, Inc."
dnspython,PyPI,ISC,Copyright (C) Dnspython Contributors
fastavro,PyPI,MIT,Copyright (c) 2011 Miki Tebeka
flup,Vendor,BSD-3-Clause,Copyright (c) 2005 Allan Saddi. All Rights Reserved.
flup-py3,Vendor,BSD-3-Clause,"Copyright (c) 2005, 2006 Allan Saddi <allan@saddi.com> All rights reserved."
foundationdb,PyPI,Apache-2.0,Copyright 2017 FoundationDB
1 change: 1 addition & 0 deletions agent_requirements.in
@@ -13,6 +13,7 @@ confluent-kafka==2.8.0
cryptography==45.0.4
ddtrace==3.9.3
dnspython==2.7.0
fastavro==1.11.1
foundationdb==6.3.25
hazelcast-python-client==5.5.0
in-toto==2.0.0
1 change: 1 addition & 0 deletions kafka_consumer/changelog.d/20862.added
@@ -0,0 +1 @@
Add support for Avro and Protobuf formats for the Data Streams live messages feature.
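For context, the new options live under the kafka block of each Data Streams live-messages entry. A minimal sketch of such an entry, expressed as the Python dict the check iterates over (only the field names id, cluster, n_messages, value_format, value_schema, key_format and key_schema are taken from this PR; the remaining fields and the exact nesting are assumptions):

# Hypothetical live-messages configuration entry -- illustrative only.
live_messages_config = {
    "id": "config-1",  # cfg["id"] in the check
    "kafka": {
        "cluster": "my-cluster",
        "topic": "orders",          # assumed field name
        "partition": 0,             # assumed field name
        "start_offset": 0,          # assumed field name
        "n_messages": 10,
        "value_format": "avro",     # json (default), avro, or protobuf
        "value_schema": '{"type": "record", "name": "Order", "fields": [{"name": "id", "type": "string"}]}',
        "key_format": "json",       # json needs no key_schema
    },
}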
38 changes: 33 additions & 5 deletions kafka_consumer/datadog_checks/kafka_consumer/config.py
@@ -148,19 +148,47 @@ def _validate_live_messages_configs(self):
):
self.log.debug('Data Streams live messages configuration missing required kafka parameters.', kafka)
continue
- # Only json format is supported for Data Streams live messages

# Validate value format
if kafka.get('value_format', '') == '':
kafka['value_format'] = 'json'
- if kafka['value_format'] != 'json':
value_format = kafka['value_format']
if value_format not in ['json', 'avro', 'protobuf']:
self.log.debug(
- 'Only json format is supported for Data Streams live messages, got %s', kafka['value_format']
'Unsupported value format for Data Streams live messages, got %s. '
'Supported formats: json, avro, protobuf',
value_format,
)
continue

# Validate key format
if kafka.get('key_format', '') == '':
kafka['key_format'] = 'json'
- if kafka['key_format'] != 'json':
key_format = kafka['key_format']
if key_format not in ['json', 'avro', 'protobuf']:
self.log.debug(
- 'Only json format is supported for Data Streams live messages, got %s', kafka['key_format']
'Unsupported key format for Data Streams live messages, got %s. '
'Supported formats: json, avro, protobuf',
key_format,
)
continue

# Validate schemas for non-JSON formats
if value_format in ['avro', 'protobuf']:
if 'value_schema' not in kafka or not kafka['value_schema']:
self.log.debug(
'Value schema is required for %s format in Data Streams live messages configuration',
value_format,
)
continue

if key_format in ['avro', 'protobuf']:
if 'key_schema' not in kafka or not kafka['key_schema']:
self.log.debug(
'Key schema is required for %s format in Data Streams live messages configuration', key_format
)
continue

live_messages_configs.append(config)
self.live_messages_configs = live_messages_configs
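A condensed sketch of the rules the validation above enforces, illustrative only and not the shipped implementation: formats default to json when empty, only json, avro and protobuf are accepted, and avro or protobuf entries must also carry a schema.

VALID_FORMATS = {'json', 'avro', 'protobuf'}


def is_valid_live_messages_kafka(kafka):
    # Simplified restatement of the checks in _validate_live_messages_configs above.
    value_format = kafka.get('value_format') or 'json'
    key_format = kafka.get('key_format') or 'json'
    if value_format not in VALID_FORMATS or key_format not in VALID_FORMATS:
        return False
    if value_format in ('avro', 'protobuf') and not kafka.get('value_schema'):
        return False
    if key_format in ('avro', 'protobuf') and not kafka.get('key_schema'):
        return False
    return True


assert is_valid_live_messages_kafka({'value_format': 'json'})
assert not is_valid_live_messages_kafka({'value_format': 'avro'})  # missing value_schema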

139 changes: 128 additions & 11 deletions kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py
@@ -1,11 +1,17 @@
# (C) Datadog, Inc. 2019-present
# All rights reserved
# Licensed under Simplified BSD License (see LICENSE)
import base64
import json
from collections import defaultdict
from io import BytesIO
from time import time

from confluent_kafka import TopicPartition
from fastavro import schemaless_reader
from google.protobuf import descriptor_pb2, descriptor_pool, message_factory
from google.protobuf.json_format import MessageToJson
from google.protobuf.message import DecodeError, EncodeError

from datadog_checks.base import AgentCheck, is_affirmative
from datadog_checks.kafka_consumer.client import KafkaClient
@@ -416,6 +422,10 @@ def data_streams_live_message(self, highwater_offsets, cluster_id):
n_messages = kafka["n_messages"]
cluster = kafka["cluster"]
config_id = cfg["id"]
value_format = kafka["value_format"]
value_schema_str = kafka.get("value_schema", "")
key_format = kafka["key_format"]
key_schema_str = kafka.get("key_schema", "")
if self._messages_have_been_retrieved(config_id):
continue
if not cluster or not cluster_id or cluster.lower() != cluster_id.lower():
@@ -437,6 +447,30 @@ def data_streams_live_message(self, highwater_offsets, cluster_id):
)
continue

try:
value_schema, key_schema = (
build_schema(value_format, value_schema_str),
build_schema(key_format, key_schema_str),
)
except (
ValueError,
json.JSONDecodeError,
base64.binascii.Error,
IndexError,
KeyError,
TypeError,
DecodeError,
EncodeError,
) as e:
self.log.error(
"Failed to build schemas for config_id: %s, topic: %s, partition: %s. Error: %s",
config_id,
topic,
partition,
e,
)
continue

self.client.start_collecting_messages(start_offsets)
for _ in range(n_messages):
message = self.client.get_next_message()
@@ -463,7 +497,9 @@ def data_streams_live_message(self, highwater_offsets, cluster_id):
'partition': str(message.partition()),
'offset': str(message.offset()),
}
- decoded_value, value_schema_id, decoded_key, key_schema_id = deserialize_message(message)
decoded_value, value_schema_id, decoded_key, key_schema_id = deserialize_message(
message, value_format, value_schema, key_format, key_schema
)
if decoded_value:
data['message_value'] = decoded_value
else:
@@ -533,32 +569,34 @@ def resolve_start_offsets(highwater_offsets, target_topic, target_partition, start_offset):
return [TopicPartition(target_topic, target_partition, start_offset)]


- def deserialize_message(message):
def deserialize_message(message, value_format, value_schema, key_format, key_schema):
try:
- decoded_value, value_schema_id = _deserialize_bytes_maybe_schema_registry(message.value())
- except (UnicodeDecodeError, json.JSONDecodeError):
decoded_value, value_schema_id = _deserialize_bytes_maybe_schema_registry(
message.value(), value_format, value_schema
)
except (UnicodeDecodeError, json.JSONDecodeError, ValueError):
return None, None, None, None
try:
- decoded_key, key_schema_id = _deserialize_bytes_maybe_schema_registry(message.key())
decoded_key, key_schema_id = _deserialize_bytes_maybe_schema_registry(message.key(), key_format, key_schema)
return decoded_value, value_schema_id, decoded_key, key_schema_id
- except (UnicodeDecodeError, json.JSONDecodeError):
except (UnicodeDecodeError, json.JSONDecodeError, ValueError):
return decoded_value, value_schema_id, None, None
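To illustrate how the new deserialize_message signature is meant to be called, a small sketch using a stub in place of a confluent_kafka message (the stub and the sample payload are made up; deserialize_message is defined just above and build_schema further down in this module):

import json


class _StubMessage:
    # Minimal stand-in for confluent_kafka.Message, for illustration only.
    def __init__(self, value, key):
        self._value, self._key = value, key

    def value(self):
        return self._value

    def key(self):
        return self._key


# JSON value and key, so build_schema returns None and no schema is needed.
msg = _StubMessage(json.dumps({"order_id": 1}).encode('utf-8'), b'"key-1"')
decoded_value, value_schema_id, decoded_key, key_schema_id = deserialize_message(
    msg, 'json', build_schema('json', ''), 'json', build_schema('json', '')
)
# decoded_value == '{"order_id": 1}'; both schema IDs are None because the payload
# carries no schema-registry framing.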


- def _deserialize_bytes_maybe_schema_registry(message):
def _deserialize_bytes_maybe_schema_registry(message, message_format, schema):
try:
- return _deserialize_bytes(message), None
- except (UnicodeDecodeError, json.JSONDecodeError) as e:
return _deserialize_bytes(message, message_format, schema), None
except (UnicodeDecodeError, json.JSONDecodeError, ValueError) as e:
# If the message is not a valid JSON, it might be a schema registry message, that is prefixed
# with a magic byte and a schema ID.
if len(message) < 5 or message[0] != SCHEMA_REGISTRY_MAGIC_BYTE:
raise e
schema_id = int.from_bytes(message[1:5], 'big')
message = message[5:] # Skip the schema ID bytes
- return _deserialize_bytes(message), schema_id
return _deserialize_bytes(message, message_format, schema), schema_id
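The framing unwrapped here is the Confluent schema-registry wire format: a single magic byte followed by a 4-byte big-endian schema ID, then the serialized payload. A short sketch of building and unwrapping such a frame (the magic-byte value 0 is Confluent's convention; SCHEMA_REGISTRY_MAGIC_BYTE is the constant this module compares against):

import json

MAGIC_BYTE = 0  # Confluent's magic byte
payload = json.dumps({"hello": "world"}).encode('utf-8')
schema_id = 42

# Frame: magic byte + 4-byte big-endian schema ID + payload.
framed = bytes([MAGIC_BYTE]) + schema_id.to_bytes(4, 'big') + payload

# Unwrapping, mirroring the slicing above.
assert framed[0] == MAGIC_BYTE
assert int.from_bytes(framed[1:5], 'big') == 42
assert framed[5:] == payload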


- def _deserialize_bytes(message):
def _deserialize_bytes(message, message_format, schema):
"""Deserialize a message from Kafka. Supports JSON format.
Args:
message: Raw message bytes from Kafka
@@ -567,6 +605,85 @@ def _deserialize_bytes(message):
"""
if not message:
return ""
if message_format == 'protobuf':
return _deserialize_protobuf(message, schema)
elif message_format == 'avro':
return _deserialize_avro(message, schema)
else:
return _deserialize_json(message)


def _deserialize_json(message):
decoded = message.decode('utf-8')
json.loads(decoded)
return decoded


def _deserialize_protobuf(message, schema):
"""Deserialize a Protobuf message using google.protobuf."""
try:
schema.ParseFromString(message)
return MessageToJson(schema)
except Exception as e:
raise ValueError(f"Failed to deserialize Protobuf message: {e}")


def _deserialize_avro(message, schema):
"""Deserialize an Avro message using fastavro."""
try:
data = schemaless_reader(BytesIO(message), schema)
return json.dumps(data)
except Exception as e:
raise ValueError(f"Failed to deserialize Avro message: {e}")


def build_schema(message_format, schema_str):
if message_format == 'protobuf':
return build_protobuf_schema(schema_str)
elif message_format == 'avro':
return build_avro_schema(schema_str)
return None


def build_avro_schema(schema_str):
"""Build an Avro schema from a JSON string."""
schema = json.loads(schema_str)

if schema is None:
raise ValueError("Avro schema cannot be None")

return schema
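As a concrete example of the JSON schema string that value_schema / key_schema would hold for Avro, and of how fastavro reads a schemaless payload with it (a sketch; the record layout is invented):

import json
from io import BytesIO

from fastavro import schemaless_reader, schemaless_writer

schema_str = json.dumps({
    "type": "record",
    "name": "Order",
    "fields": [
        {"name": "id", "type": "string"},
        {"name": "amount", "type": "double"},
    ],
})
schema = build_avro_schema(schema_str)

# Write a schemaless Avro payload the way a producer might, then read it back.
buf = BytesIO()
schemaless_writer(buf, schema, {"id": "o-1", "amount": 9.99})
decoded = schemaless_reader(BytesIO(buf.getvalue()), schema)
assert decoded == {"id": "o-1", "amount": 9.99}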


def build_protobuf_schema(schema_str):
# schema is encoded in base64, decode it before passing it to ParseFromString
schema_str = base64.b64decode(schema_str)
descriptor_set = descriptor_pb2.FileDescriptorSet()
descriptor_set.ParseFromString(schema_str)

# Register all the file descriptors in a descriptor pool
pool = descriptor_pool.DescriptorPool()
for fd_proto in descriptor_set.file:
pool.Add(fd_proto)

# Pick the first message type from the first file descriptor
first_fd = descriptor_set.file[0]
# The file descriptor contains a list of message types (DescriptorProto)
first_message_proto = first_fd.message_type[0]

# The fully qualified name includes the package name + message name
package = first_fd.package
message_name = first_message_proto.name
if package:
full_name = f"{package}.{message_name}"
else:
full_name = message_name
# Get the message descriptor
message_descriptor = pool.FindMessageTypeByName(full_name)
# Create a dynamic message class
schema = message_factory.GetMessageClass(message_descriptor)()

if schema is None:
raise ValueError("Protobuf schema cannot be None")

return schema
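For reference, the base64 string this function decodes is a serialized FileDescriptorSet. One way to produce it from a .proto file, using protoc's --descriptor_set_out and --include_imports flags (the file names are illustrative):

import base64
import subprocess

# Compile the .proto into a serialized FileDescriptorSet, then base64-encode it
# so it can be pasted into the value_schema / key_schema configuration field.
subprocess.run(
    ["protoc", "--include_imports", "--descriptor_set_out=order.pb", "order.proto"],
    check=True,
)
with open("order.pb", "rb") as f:
    print(base64.b64encode(f.read()).decode("ascii"))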
2 changes: 2 additions & 0 deletions kafka_consumer/pyproject.toml
@@ -37,6 +37,8 @@ license = "BSD-3-Clause"
[project.optional-dependencies]
deps = [
"confluent-kafka==2.8.0",
"fastavro==1.11.1",
"protobuf==6.31.1",
]

[project.urls]