diff --git a/src/sentry/eventstream/item_helpers.py b/src/sentry/eventstream/item_helpers.py index a5387b7f6f79a9..b2eb9fd2467aed 100644 --- a/src/sentry/eventstream/item_helpers.py +++ b/src/sentry/eventstream/item_helpers.py @@ -3,16 +3,11 @@ from google.protobuf.timestamp_pb2 import Timestamp from sentry_protos.snuba.v1.request_common_pb2 import TRACE_ITEM_TYPE_OCCURRENCE -from sentry_protos.snuba.v1.trace_item_pb2 import ( - AnyValue, - ArrayValue, - KeyValue, - KeyValueList, - TraceItem, -) +from sentry_protos.snuba.v1.trace_item_pb2 import AnyValue, TraceItem from sentry.models.project import Project from sentry.services.eventstore.models import Event, GroupEvent +from sentry.utils.eap import encode_value def serialize_event_data_as_item( @@ -35,36 +30,6 @@ def serialize_event_data_as_item( ) -def _encode_value(value: Any) -> AnyValue: - if isinstance(value, str): - return AnyValue(string_value=value) - elif isinstance(value, bool): - # Note: bool check must come before int check since bool is a subclass of int - return AnyValue(bool_value=value) - elif isinstance(value, int): - return AnyValue(int_value=value) - elif isinstance(value, float): - return AnyValue(double_value=value) - elif isinstance(value, list) or isinstance(value, tuple): - # Not yet processed on EAP side - return AnyValue( - array_value=ArrayValue(values=[_encode_value(v) for v in value if v is not None]) - ) - elif isinstance(value, dict): - # Not yet processed on EAP side - return AnyValue( - kvlist_value=KeyValueList( - values=[ - KeyValue(key=str(kv[0]), value=_encode_value(kv[1])) - for kv in value.items() - if kv[1] is not None - ] - ) - ) - else: - raise NotImplementedError(f"encode not supported for {type(value)}") - - def encode_attributes( event: Event | GroupEvent, event_data: Mapping[str, Any], ignore_fields: set[str] | None = None ) -> Mapping[str, AnyValue]: @@ -76,7 +41,7 @@ def encode_attributes( continue if value is None: continue - attributes[key] = _encode_value(value) + attributes[key] = encode_value(value) if event.group_id: attributes["group_id"] = AnyValue(int_value=event.group_id) @@ -84,6 +49,6 @@ def encode_attributes( for key, value in event_data["tags"]: if value is None: continue - attributes[f"tags[{key}]"] = _encode_value(value) + attributes[f"tags[{key}]"] = encode_value(value) return attributes diff --git a/src/sentry/eventstream/kafka/backend.py b/src/sentry/eventstream/kafka/backend.py index d6f163d14b9a9d..ec22de248892cc 100644 --- a/src/sentry/eventstream/kafka/backend.py +++ b/src/sentry/eventstream/kafka/backend.py @@ -10,21 +10,19 @@ from confluent_kafka import KafkaError from confluent_kafka import Message as KafkaMessage from confluent_kafka import Producer -from sentry_kafka_schemas.codecs import Codec from sentry_protos.snuba.v1.trace_item_pb2 import TraceItem from sentry import options -from sentry.conf.types.kafka_definition import Topic, get_topic_codec +from sentry.conf.types.kafka_definition import Topic from sentry.eventstream.base import GroupStates from sentry.eventstream.snuba import KW_SKIP_SEMANTIC_PARTITIONING, SnubaProtocolEventStream from sentry.eventstream.types import EventStreamEventType from sentry.killswitches import killswitch_matches_context from sentry.utils import json from sentry.utils.confluent_producer import get_confluent_producer +from sentry.utils.eap import EAP_ITEMS_CODEC from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition -EAP_ITEMS_CODEC: Codec[TraceItem] = get_topic_codec(Topic.SNUBA_ITEMS) - logger = logging.getLogger(__name__) if TYPE_CHECKING: diff --git a/src/sentry/replays/lib/eap/write.py b/src/sentry/replays/lib/eap/write.py index 4f26685dcea68a..1cc3b62b47d679 100644 --- a/src/sentry/replays/lib/eap/write.py +++ b/src/sentry/replays/lib/eap/write.py @@ -7,12 +7,11 @@ from arroyo.backends.kafka import KafkaPayload from django.conf import settings from google.protobuf.timestamp_pb2 import Timestamp -from sentry_protos.snuba.v1.trace_item_pb2 import AnyValue, ArrayValue, KeyValue, KeyValueList from sentry_protos.snuba.v1.trace_item_pb2 import TraceItem as EAPTraceItem from sentry.conf.types.kafka_definition import Topic from sentry.replays.lib.eap.snuba_transpiler import TRACE_ITEM_TYPE_MAP, TRACE_ITEM_TYPES -from sentry.replays.lib.kafka import EAP_ITEMS_CODEC, eap_producer +from sentry.utils.eap import EAP_ITEMS_CODEC, eap_items_producer, encode_value from sentry.utils.kafka_config import get_topic_definition Value = bool | bytes | str | int | float | Sequence["Value"] | MutableMapping[str, "Value"] @@ -33,27 +32,6 @@ class TraceItem(TypedDict): def new_trace_item(trace_item: TraceItem) -> EAPTraceItem: - def _anyvalue(value: Value) -> AnyValue: - if isinstance(value, bool): - return AnyValue(bool_value=value) - elif isinstance(value, str): - return AnyValue(string_value=value) - elif isinstance(value, int): - return AnyValue(int_value=value) - elif isinstance(value, float): - return AnyValue(double_value=value) - elif isinstance(value, bytes): - return AnyValue(bytes_value=value) - elif isinstance(value, list): - return AnyValue(array_value=ArrayValue(values=[_anyvalue(v) for v in value])) - elif isinstance(value, dict): - return AnyValue( - kvlist_value=KeyValueList( - values=[KeyValue(key=k, value=_anyvalue(v)) for k, v in value.items()] - ) - ) - else: - raise ValueError(f"Invalid value type for AnyValue: {type(value)}") timestamp = Timestamp() timestamp.FromDatetime(trace_item["timestamp"]) @@ -70,7 +48,7 @@ def _anyvalue(value: Value) -> AnyValue: item_id=trace_item["trace_item_id"], received=received, retention_days=trace_item["retention_days"], - attributes={k: _anyvalue(v) for k, v in trace_item["attributes"].items()}, + attributes={k: encode_value(v) for k, v in trace_item["attributes"].items()}, client_sample_rate=trace_item["client_sample_rate"], server_sample_rate=trace_item["server_sample_rate"], ) @@ -99,4 +77,4 @@ def write_trace_items(trace_items: list[EAPTraceItem]) -> None: topic = get_topic_definition(Topic.SNUBA_ITEMS)["real_topic_name"] for trace_item in trace_items: payload = KafkaPayload(None, EAP_ITEMS_CODEC.encode(trace_item), []) - eap_producer.produce(ArroyoTopic(topic), payload) + eap_items_producer.produce(ArroyoTopic(topic), payload) diff --git a/src/sentry/replays/lib/kafka.py b/src/sentry/replays/lib/kafka.py index c4d2f658f4add2..e3bcf2573e4620 100644 --- a/src/sentry/replays/lib/kafka.py +++ b/src/sentry/replays/lib/kafka.py @@ -1,32 +1,11 @@ from arroyo.backends.kafka import KafkaPayload from arroyo.types import Topic as ArroyoTopic -from sentry_kafka_schemas.codecs import Codec -from sentry_protos.snuba.v1.trace_item_pb2 import TraceItem -from sentry.conf.types.kafka_definition import Topic, get_topic_codec +from sentry.conf.types.kafka_definition import Topic from sentry.utils.arroyo_producer import SingletonProducer, get_arroyo_producer from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition from sentry.utils.pubsub import KafkaPublisher -# -# EAP PRODUCER -# - - -EAP_ITEMS_CODEC: Codec[TraceItem] = get_topic_codec(Topic.SNUBA_ITEMS) - - -def _get_eap_items_producer(): - """Get a Kafka producer for EAP TraceItems.""" - return get_arroyo_producer( - name="sentry.replays.lib.kafka.eap_items", - topic=Topic.SNUBA_ITEMS, - ) - - -eap_producer = SingletonProducer(_get_eap_items_producer) - - # # REPLAY PRODUCER # diff --git a/src/sentry/spans/consumers/process_segments/convert.py b/src/sentry/spans/consumers/process_segments/convert.py index 7897ef3b99c7f3..a443bdb0cf8536 100644 --- a/src/sentry/spans/consumers/process_segments/convert.py +++ b/src/sentry/spans/consumers/process_segments/convert.py @@ -1,6 +1,5 @@ -from typing import Any, cast +from typing import cast -import orjson import sentry_sdk from google.protobuf.timestamp_pb2 import Timestamp from sentry_kafka_schemas.schema_types.buffered_segments_v1 import SpanLink @@ -8,8 +7,7 @@ from sentry_protos.snuba.v1.trace_item_pb2 import AnyValue, TraceItem from sentry.spans.consumers.process_segments.types import CompatibleSpan - -I64_MAX = 2**63 - 1 +from sentry.utils.eap import encode_value FIELD_TO_ATTRIBUTE = { "end_timestamp": "sentry.end_timestamp_precise", @@ -41,7 +39,7 @@ def convert_span_to_item(span: CompatibleSpan) -> TraceItem: continue try: # NOTE: This ignores the `type` field of the attribute itself - attributes[k] = _anyvalue(value) + attributes[k] = encode_value(value, dump_arrays=True) except Exception: sentry_sdk.capture_exception() else: @@ -58,12 +56,12 @@ def convert_span_to_item(span: CompatibleSpan) -> TraceItem: # For `is_segment`, we trust the value written by `flush_segments` over a pre-existing attribute: if (is_segment := span.get("is_segment")) is not None: - attributes["sentry.is_segment"] = _anyvalue(is_segment) + attributes["sentry.is_segment"] = encode_value(is_segment, dump_arrays=True) for field_name, attribute_name in FIELD_TO_ATTRIBUTE.items(): attribute = span.get(field_name) # type:ignore[assignment] if attribute is not None: - attributes[attribute_name] = _anyvalue(attribute) + attributes[attribute_name] = encode_value(attribute, dump_arrays=True) # Rename some attributes from their sentry-conventions name to what the product currently expects. # Eventually this should all be handled by deprecation policies in sentry-conventions. @@ -83,14 +81,16 @@ def convert_span_to_item(span: CompatibleSpan) -> TraceItem: try: if attr in RENAME_ATTRIBUTES: attr = RENAME_ATTRIBUTES[attr] - attributes[f"sentry._meta.fields.attributes.{attr}"] = _anyvalue({"meta": meta}) + attributes[f"sentry._meta.fields.attributes.{attr}"] = encode_value( + {"meta": meta}, dump_arrays=True + ) except Exception: sentry_sdk.capture_exception() if links := span.get("links"): try: sanitized_links = [_sanitize_span_link(link) for link in links if link is not None] - attributes["sentry.links"] = _anyvalue(sanitized_links) + attributes["sentry.links"] = encode_value(sanitized_links, dump_arrays=True) except Exception: sentry_sdk.capture_exception() attributes["sentry.dropped_links_count"] = AnyValue(int_value=len(links)) @@ -111,23 +111,6 @@ def convert_span_to_item(span: CompatibleSpan) -> TraceItem: ) -def _anyvalue(value: Any) -> AnyValue: - if isinstance(value, str): - return AnyValue(string_value=value) - elif isinstance(value, bool): - return AnyValue(bool_value=value) - elif isinstance(value, int): - if value > I64_MAX: - return AnyValue(double_value=float(value)) - return AnyValue(int_value=value) - elif isinstance(value, float): - return AnyValue(double_value=value) - elif isinstance(value, (list, dict)): - return AnyValue(string_value=orjson.dumps(value).decode()) - - raise ValueError(f"Unknown value type: {type(value)}") - - def _timestamp(value: float) -> Timestamp: return Timestamp( seconds=int(value), diff --git a/src/sentry/testutils/cases.py b/src/sentry/testutils/cases.py index 4363acb02527dc..07d67230f0f814 100644 --- a/src/sentry/testutils/cases.py +++ b/src/sentry/testutils/cases.py @@ -150,6 +150,7 @@ from sentry.users.models.useremail import UserEmail from sentry.utils import json from sentry.utils.auth import SsoSession +from sentry.utils.eap import encode_value from sentry.utils.json import dumps_htmlsafe from sentry.utils.not_set import NOT_SET, NotSet, default_if_not_set from sentry.utils.samples import load_data @@ -3316,20 +3317,6 @@ class _OptionalOurLogData(TypedDict, total=False): item_id: int -def scalar_to_any_value(value: Any) -> AnyValue: - if isinstance(value, str): - return AnyValue(string_value=value) - if isinstance(value, int): - return AnyValue(int_value=value) - if isinstance(value, float): - return AnyValue(double_value=value) - if isinstance(value, bool): - return AnyValue(bool_value=value) - if isinstance(value, dict): - return AnyValue(**value) - raise Exception(f"cannot convert {value} of type {type(value)} to AnyValue") - - def span_to_trace_item(span) -> TraceItem: client_sample_rate = 1.0 server_sample_rate = 1.0 @@ -3339,7 +3326,7 @@ def span_to_trace_item(span) -> TraceItem: for k, v in span.get(field, {}).items(): if v is None: continue - attributes[k] = scalar_to_any_value(v) + attributes[k] = encode_value(v, expand_arrays=True) for k, v in span.get("sentry_tags", {}).items(): if v is None: @@ -3347,7 +3334,7 @@ def span_to_trace_item(span) -> TraceItem: if k == "description": k = "normalized_description" - attributes[f"sentry.{k}"] = scalar_to_any_value(v) + attributes[f"sentry.{k}"] = encode_value(v, expand_arrays=True) for k, v in span.get("measurements", {}).items(): if v is None or v["value"] is None: @@ -3357,10 +3344,10 @@ def span_to_trace_item(span) -> TraceItem: elif k == "server_sample_rate": server_sample_rate = v["value"] else: - attributes[k] = scalar_to_any_value(float(v["value"])) + attributes[k] = encode_value(float(v["value"])) if "description" in span and span["description"] is not None: - description = scalar_to_any_value(span["description"]) + description = encode_value(span["description"]) attributes["sentry.raw_description"] = description for field in { @@ -3382,7 +3369,7 @@ def span_to_trace_item(span) -> TraceItem: double_value=float(is_segment), ) else: - value = scalar_to_any_value(span[field]) + value = encode_value(span[field]) attributes[f"sentry.{field}"] = value timestamp = Timestamp() @@ -3453,10 +3440,10 @@ def create_ourlog( attributes_proto = {} for k, v in attributes.items(): - attributes_proto[k] = scalar_to_any_value(v) + attributes_proto[k] = encode_value(v, expand_arrays=True) for k, v in extra_data.items(): - attributes_proto[f"sentry.{k}"] = scalar_to_any_value(v) + attributes_proto[f"sentry.{k}"] = encode_value(v, expand_arrays=True) timestamp_proto = Timestamp() @@ -3532,7 +3519,7 @@ def create_trace_metric( if attributes: for k, v in attributes.items(): - attributes_proto[k] = scalar_to_any_value(v) + attributes_proto[k] = encode_value(v, expand_arrays=True) return TraceItem( organization_id=organization.id, @@ -3577,7 +3564,7 @@ def create_profile_function( if attributes: for k, v in attributes.items(): - attributes_proto[k] = scalar_to_any_value(v) + attributes_proto[k] = encode_value(v, expand_arrays=True) return TraceItem( organization_id=organization.id, @@ -3892,7 +3879,7 @@ def create_eap_uptime_result( attributes_proto = {} for k, v in attributes_data.items(): if v is not None: - attributes_proto[k] = scalar_to_any_value(v) + attributes_proto[k] = encode_value(v, expand_arrays=True) timestamp_proto = Timestamp() timestamp_proto.FromDatetime(scheduled_check_time) diff --git a/src/sentry/uptime/consumers/eap_converter.py b/src/sentry/uptime/consumers/eap_converter.py index b6ac099adc53ea..ed1f5d5878a621 100644 --- a/src/sentry/uptime/consumers/eap_converter.py +++ b/src/sentry/uptime/consumers/eap_converter.py @@ -23,25 +23,13 @@ from sentry import quotas from sentry.models.project import Project from sentry.uptime.types import IncidentStatus +from sentry.utils.eap import encode_value logger = logging.getLogger(__name__) UPTIME_NAMESPACE = uuid.UUID("f8d7a4e2-5b3c-4a9d-8e1f-3c2b1a0d9f8e") -def _anyvalue(value: bool | str | int | float) -> AnyValue: - if isinstance(value, bool): - return AnyValue(bool_value=value) - elif isinstance(value, str): - return AnyValue(string_value=value) - elif isinstance(value, int): - return AnyValue(int_value=value) - elif isinstance(value, float): - return AnyValue(double_value=value) - else: - raise ValueError(f"Invalid value type for AnyValue: {type(value)}") - - def ms_to_us(milliseconds: float | int) -> int: """Convert milliseconds to microseconds.""" return int(milliseconds * 1000) @@ -72,63 +60,65 @@ def convert_uptime_request_to_trace_item( """ attributes: MutableMapping[str, AnyValue] = {} - attributes["guid"] = _anyvalue(result["guid"]) - attributes["subscription_id"] = _anyvalue(result["subscription_id"]) - attributes["check_status"] = _anyvalue(result["status"]) + attributes["guid"] = encode_value(result["guid"]) + attributes["subscription_id"] = encode_value(result["subscription_id"]) + attributes["check_status"] = encode_value(result["status"]) if "region" in result: - attributes["region"] = _anyvalue(result["region"]) + attributes["region"] = encode_value(result["region"]) - attributes["scheduled_check_time_us"] = _anyvalue(ms_to_us(result["scheduled_check_time_ms"])) - attributes["actual_check_time_us"] = _anyvalue(ms_to_us(result["actual_check_time_ms"])) + attributes["scheduled_check_time_us"] = encode_value( + ms_to_us(result["scheduled_check_time_ms"]) + ) + attributes["actual_check_time_us"] = encode_value(ms_to_us(result["actual_check_time_ms"])) duration_ms = result["duration_ms"] if duration_ms is not None: - attributes["check_duration_us"] = _anyvalue(ms_to_us(duration_ms)) + attributes["check_duration_us"] = encode_value(ms_to_us(duration_ms)) status_reason = result["status_reason"] if status_reason is not None: - attributes["status_reason_type"] = _anyvalue(status_reason["type"]) - attributes["status_reason_description"] = _anyvalue(status_reason["description"]) + attributes["status_reason_type"] = encode_value(status_reason["type"]) + attributes["status_reason_description"] = encode_value(status_reason["description"]) if "request_info_list" in result and result["request_info_list"]: first_request = result["request_info_list"][0] - attributes["method"] = _anyvalue(first_request["request_type"]) + attributes["method"] = encode_value(first_request["request_type"]) if "url" in first_request: # This should always be here once we start passing url, but for backwards compat # we should be cautious here - attributes["original_url"] = _anyvalue(first_request["url"]) + attributes["original_url"] = encode_value(first_request["url"]) - attributes["check_id"] = _anyvalue(result["guid"]) - attributes["request_sequence"] = _anyvalue(request_sequence) - attributes["incident_status"] = _anyvalue(incident_status.value) - attributes["span_id"] = _anyvalue(result["span_id"]) + attributes["check_id"] = encode_value(result["guid"]) + attributes["request_sequence"] = encode_value(request_sequence) + attributes["incident_status"] = encode_value(incident_status.value) + attributes["span_id"] = encode_value(result["span_id"]) if request_info is not None: - attributes["request_type"] = _anyvalue(request_info["request_type"]) + attributes["request_type"] = encode_value(request_info["request_type"]) http_status_code = request_info["http_status_code"] if http_status_code is not None: - attributes["http_status_code"] = _anyvalue(http_status_code) + attributes["http_status_code"] = encode_value(http_status_code) if "url" in request_info: - attributes["request_url"] = _anyvalue(request_info["url"]) + attributes["request_url"] = encode_value(request_info["url"]) if "request_body_size_bytes" in request_info: - attributes["request_body_size_bytes"] = _anyvalue( + attributes["request_body_size_bytes"] = encode_value( request_info["request_body_size_bytes"] ) if "response_body_size_bytes" in request_info: - attributes["response_body_size_bytes"] = _anyvalue( + attributes["response_body_size_bytes"] = encode_value( request_info["response_body_size_bytes"] ) if "request_duration_us" in request_info: - attributes["request_duration_us"] = _anyvalue(request_info["request_duration_us"]) + attributes["request_duration_us"] = encode_value(request_info["request_duration_us"]) if "durations" in request_info: durations = request_info["durations"] for phase_name in RequestDurations.__annotations__.keys(): if phase_name in durations: timing = durations[phase_name] # type: ignore[literal-required] - attributes[f"{phase_name}_start_us"] = _anyvalue(timing["start_us"]) - attributes[f"{phase_name}_duration_us"] = _anyvalue(timing["duration_us"]) + attributes[f"{phase_name}_start_us"] = encode_value(timing["start_us"]) + attributes[f"{phase_name}_duration_us"] = encode_value(timing["duration_us"]) return TraceItem( organization_id=project.organization_id, diff --git a/src/sentry/uptime/consumers/eap_producer.py b/src/sentry/uptime/consumers/eap_producer.py index f63341a6d73858..5cf1985a02b62f 100644 --- a/src/sentry/uptime/consumers/eap_producer.py +++ b/src/sentry/uptime/consumers/eap_producer.py @@ -2,34 +2,18 @@ from arroyo import Topic as ArroyoTopic from arroyo.backends.kafka import KafkaPayload -from sentry_kafka_schemas.codecs import Codec from sentry_kafka_schemas.schema_types.uptime_results_v1 import CheckResult -from sentry_protos.snuba.v1.trace_item_pb2 import TraceItem -from sentry.conf.types.kafka_definition import Topic, get_topic_codec +from sentry.conf.types.kafka_definition import Topic from sentry.uptime.consumers.eap_converter import convert_uptime_result_to_trace_items from sentry.uptime.types import IncidentStatus from sentry.utils import metrics -from sentry.utils.arroyo_producer import SingletonProducer, get_arroyo_producer +from sentry.utils.eap import EAP_ITEMS_CODEC, eap_items_producer from sentry.utils.kafka_config import get_topic_definition from sentry.workflow_engine.models.detector import Detector logger = logging.getLogger(__name__) -EAP_ITEMS_CODEC: Codec[TraceItem] = get_topic_codec(Topic.SNUBA_ITEMS) - - -def _get_eap_items_producer(): - """Get a Kafka producer for EAP TraceItems.""" - return get_arroyo_producer( - "sentry.uptime.consumers.eap_producer", - Topic.SNUBA_ITEMS, - exclude_config_keys=["compression.type", "message.max.bytes"], - ) - - -_eap_items_producer = SingletonProducer(_get_eap_items_producer) - def produce_eap_uptime_result( detector: Detector, @@ -56,7 +40,7 @@ def produce_eap_uptime_result( for trace_item in trace_items: payload = KafkaPayload(None, EAP_ITEMS_CODEC.encode(trace_item), []) - _eap_items_producer.produce(ArroyoTopic(topic), payload) + eap_items_producer.produce(ArroyoTopic(topic), payload) metrics.incr( "uptime.result_processor.eap_message_produced", diff --git a/src/sentry/utils/eap.py b/src/sentry/utils/eap.py new file mode 100644 index 00000000000000..919c91d18237da --- /dev/null +++ b/src/sentry/utils/eap.py @@ -0,0 +1,70 @@ +import logging +from typing import Any + +import orjson +from sentry_kafka_schemas.codecs import Codec +from sentry_protos.snuba.v1.trace_item_pb2 import ( + AnyValue, + ArrayValue, + KeyValue, + KeyValueList, + TraceItem, +) + +from sentry.conf.types.kafka_definition import Topic, get_topic_codec +from sentry.db.models.fields.bounded import I64_MAX +from sentry.utils.arroyo_producer import SingletonProducer, get_arroyo_producer + +logger = logging.getLogger(__name__) + +EAP_ITEMS_CODEC: Codec[TraceItem] = get_topic_codec(Topic.SNUBA_ITEMS) + + +def _get_eap_items_producer(): + """Get a Kafka producer for EAP TraceItems.""" + return get_arroyo_producer( + "sentry.utils.eap_producer", + Topic.SNUBA_ITEMS, + exclude_config_keys=["compression.type", "message.max.bytes"], + ) + + +eap_items_producer = SingletonProducer(_get_eap_items_producer) + + +def encode_value(value: Any, *, dump_arrays: bool = False, expand_arrays: bool = False) -> AnyValue: + if isinstance(value, str): + return AnyValue(string_value=value) + elif isinstance(value, bool): + # Note: bool check must come before int check since bool is a subclass of int + return AnyValue(bool_value=value) + elif isinstance(value, int): + if value > I64_MAX: + return AnyValue(double_value=float(value)) + return AnyValue(int_value=value) + elif isinstance(value, float): + return AnyValue(double_value=value) + elif isinstance(value, bytes): + return AnyValue(bytes_value=value) + elif dump_arrays and isinstance(value, (list, tuple, dict)): + return AnyValue(string_value=orjson.dumps(value).decode()) + elif expand_arrays and isinstance(value, dict): + return AnyValue(**value) + elif isinstance(value, list) or isinstance(value, tuple): + # Not yet processed on EAP side + return AnyValue( + array_value=ArrayValue(values=[encode_value(v) for v in value if v is not None]) + ) + elif isinstance(value, dict): + # Not yet processed on EAP side + return AnyValue( + kvlist_value=KeyValueList( + values=[ + KeyValue(key=str(kv[0]), value=encode_value(kv[1])) + for kv in value.items() + if kv[1] is not None + ] + ) + ) + else: + raise NotImplementedError(f"encode not supported for {type(value)}") diff --git a/tests/sentry/uptime/consumers/test_eap_converter.py b/tests/sentry/uptime/consumers/test_eap_converter.py index 37a6ec9ec284ea..5475fb5aaa3779 100644 --- a/tests/sentry/uptime/consumers/test_eap_converter.py +++ b/tests/sentry/uptime/consumers/test_eap_converter.py @@ -6,34 +6,34 @@ from sentry.testutils.cases import TestCase as SentryTestCase from sentry.uptime.consumers.eap_converter import ( - _anyvalue, convert_uptime_request_to_trace_item, convert_uptime_result_to_trace_items, ms_to_us, ) from sentry.uptime.types import IncidentStatus +from sentry.utils.eap import encode_value class TestHelperFunctions(TestCase): def test_anyvalue_string(self) -> None: - result = _anyvalue("test") + result = encode_value("test") assert result.string_value == "test" def test_anyvalue_int(self) -> None: - result = _anyvalue(123) + result = encode_value(123) assert result.int_value == 123 def test_anyvalue_float(self) -> None: - result = _anyvalue(123.45) + result = encode_value(123.45) assert result.double_value == 123.45 def test_anyvalue_bool(self) -> None: - result = _anyvalue(True) + result = encode_value(True) assert result.bool_value is True def test_anyvalue_fallback(self) -> None: - with pytest.raises(ValueError): - _anyvalue([1, 2, 3]) # type: ignore[arg-type] # Test with unsupported type + with pytest.raises(NotImplementedError): + encode_value({1, 2, 3}) # Test with unsupported type def test_microseconds_conversion(self) -> None: assert ms_to_us(1000) == 1000000 diff --git a/tests/sentry/uptime/consumers/test_eap_producer.py b/tests/sentry/uptime/consumers/test_eap_producer.py index fe8692b630ac1a..e0f6f7b7dec06c 100644 --- a/tests/sentry/uptime/consumers/test_eap_producer.py +++ b/tests/sentry/uptime/consumers/test_eap_producer.py @@ -40,7 +40,7 @@ def create_check_result(self) -> CheckResult: } return result - @patch("sentry.uptime.consumers.eap_producer._eap_items_producer") + @patch("sentry.uptime.consumers.eap_producer.eap_items_producer") @patch("sentry.uptime.consumers.eap_producer.get_topic_definition") def test_produce_eap_uptime_result_success( self, mock_get_topic: MagicMock, mock_producer: MagicMock @@ -72,7 +72,7 @@ def test_produce_eap_uptime_result_success( codec = get_topic_codec(Topic.SNUBA_ITEMS) assert [codec.decode(payload.value)] == expected_trace_items - @patch("sentry.uptime.consumers.eap_producer._eap_items_producer") + @patch("sentry.uptime.consumers.eap_producer.eap_items_producer") @patch("sentry.uptime.consumers.eap_producer.logger") def test_produce_eap_uptime_result_error_handling( self, mock_logger: MagicMock, mock_producer: MagicMock @@ -90,7 +90,7 @@ def test_produce_eap_uptime_result_error_handling( ) @patch("sentry.uptime.consumers.eap_producer.metrics") - @patch("sentry.uptime.consumers.eap_producer._eap_items_producer") + @patch("sentry.uptime.consumers.eap_producer.eap_items_producer") def test_metrics_tracking(self, mock_producer: MagicMock, mock_metrics: MagicMock) -> None: result = self.create_check_result() metric_tags = {"status": "success", "region": "us-east-1"} @@ -106,7 +106,7 @@ def test_metrics_tracking(self, mock_producer: MagicMock, mock_metrics: MagicMoc ) @patch("sentry.uptime.consumers.eap_producer.metrics") - @patch("sentry.uptime.consumers.eap_producer._eap_items_producer") + @patch("sentry.uptime.consumers.eap_producer.eap_items_producer") def test_error_metrics_tracking( self, mock_producer: MagicMock, mock_metrics: MagicMock ) -> None: @@ -124,7 +124,7 @@ def test_error_metrics_tracking( tags=metric_tags, ) - @patch("sentry.uptime.consumers.eap_producer._eap_items_producer") + @patch("sentry.uptime.consumers.eap_producer.eap_items_producer") @patch("sentry.uptime.consumers.eap_producer.get_topic_definition") def test_produce_with_triggered_detector_state( self, mock_get_topic: MagicMock, mock_producer: MagicMock diff --git a/tests/sentry/uptime/consumers/test_results_consumer.py b/tests/sentry/uptime/consumers/test_results_consumer.py index 1444162679ca42..8f027d75d9122f 100644 --- a/tests/sentry/uptime/consumers/test_results_consumer.py +++ b/tests/sentry/uptime/consumers/test_results_consumer.py @@ -374,7 +374,7 @@ def test_missed_check_updated_interval(self) -> None: extra={"num_missed_checks": 1, **result}, ) - @mock.patch("sentry.uptime.consumers.eap_producer._eap_items_producer.produce") + @mock.patch("sentry.utils.eap.eap_items_producer.produce") def test_no_missed_check_for_disabled(self, mock_produce: MagicMock) -> None: result = self.create_uptime_result(self.subscription.subscription_id) @@ -405,7 +405,7 @@ def test_no_missed_check_for_disabled(self, mock_produce: MagicMock) -> None: assert check.attributes["check_status"].string_value == "failure" - @mock.patch("sentry.uptime.consumers.eap_producer._eap_items_producer.produce") + @mock.patch("sentry.utils.eap.eap_items_producer.produce") def test_missed_check_true_positive(self, mock_produce: MagicMock) -> None: result = self.create_uptime_result(self.subscription.subscription_id) @@ -1079,7 +1079,7 @@ def test_provider_stats(self) -> None: ] ) - @mock.patch("sentry.uptime.consumers.eap_producer._eap_items_producer.produce") + @mock.patch("sentry.utils.eap.eap_items_producer.produce") def test_produces_snuba_uptime_results(self, mock_produce: MagicMock) -> None: """ Validates that the consumer produces a message to Snuba's Kafka topic for uptime check results @@ -1100,7 +1100,7 @@ def test_produces_snuba_uptime_results(self, mock_produce: MagicMock) -> None: assert trace_item.attributes["incident_status"].int_value == 0 assert trace_item.retention_days == 90 - @mock.patch("sentry.uptime.consumers.eap_producer._eap_items_producer.produce") + @mock.patch("sentry.utils.eap.eap_items_producer.produce") def test_produces_snuba_uptime_results_in_incident(self, mock_produce: MagicMock) -> None: """ Validates that the consumer produces a message to Snuba's Kafka topic for uptime check results @@ -1120,7 +1120,7 @@ def test_produces_snuba_uptime_results_in_incident(self, mock_produce: MagicMock trace_item = self.decode_trace_item(mock_produce.call_args.args[1].value) assert trace_item.attributes["incident_status"].int_value == 1 - @mock.patch("sentry.uptime.consumers.eap_producer._eap_items_producer.produce") + @mock.patch("sentry.utils.eap.eap_items_producer.produce") def test_produces_eap_uptime_results(self, mock_produce: MagicMock) -> None: """ Validates that the consumer produces TraceItems to EAP's Kafka topic for uptime check results