Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 4 additions & 39 deletions src/sentry/eventstream/item_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,11 @@

from google.protobuf.timestamp_pb2 import Timestamp
from sentry_protos.snuba.v1.request_common_pb2 import TRACE_ITEM_TYPE_OCCURRENCE
from sentry_protos.snuba.v1.trace_item_pb2 import (
AnyValue,
ArrayValue,
KeyValue,
KeyValueList,
TraceItem,
)
from sentry_protos.snuba.v1.trace_item_pb2 import AnyValue, TraceItem

from sentry.models.project import Project
from sentry.services.eventstore.models import Event, GroupEvent
from sentry.utils.eap import encode_value


def serialize_event_data_as_item(
Expand All @@ -35,36 +30,6 @@ def serialize_event_data_as_item(
)


def _encode_value(value: Any) -> AnyValue:
if isinstance(value, str):
return AnyValue(string_value=value)
elif isinstance(value, bool):
# Note: bool check must come before int check since bool is a subclass of int
return AnyValue(bool_value=value)
elif isinstance(value, int):
return AnyValue(int_value=value)
elif isinstance(value, float):
return AnyValue(double_value=value)
elif isinstance(value, list) or isinstance(value, tuple):
# Not yet processed on EAP side
return AnyValue(
array_value=ArrayValue(values=[_encode_value(v) for v in value if v is not None])
)
elif isinstance(value, dict):
# Not yet processed on EAP side
return AnyValue(
kvlist_value=KeyValueList(
values=[
KeyValue(key=str(kv[0]), value=_encode_value(kv[1]))
for kv in value.items()
if kv[1] is not None
]
)
)
else:
raise NotImplementedError(f"encode not supported for {type(value)}")


def encode_attributes(
event: Event | GroupEvent, event_data: Mapping[str, Any], ignore_fields: set[str] | None = None
) -> Mapping[str, AnyValue]:
Expand All @@ -76,14 +41,14 @@ def encode_attributes(
continue
if value is None:
continue
attributes[key] = _encode_value(value)
attributes[key] = encode_value(value)

if event.group_id:
attributes["group_id"] = AnyValue(int_value=event.group_id)

for key, value in event_data["tags"]:
if value is None:
continue
attributes[f"tags[{key}]"] = _encode_value(value)
attributes[f"tags[{key}]"] = encode_value(value)

return attributes
6 changes: 2 additions & 4 deletions src/sentry/eventstream/kafka/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,19 @@
from confluent_kafka import KafkaError
from confluent_kafka import Message as KafkaMessage
from confluent_kafka import Producer
from sentry_kafka_schemas.codecs import Codec
from sentry_protos.snuba.v1.trace_item_pb2 import TraceItem

from sentry import options
from sentry.conf.types.kafka_definition import Topic, get_topic_codec
from sentry.conf.types.kafka_definition import Topic
from sentry.eventstream.base import GroupStates
from sentry.eventstream.snuba import KW_SKIP_SEMANTIC_PARTITIONING, SnubaProtocolEventStream
from sentry.eventstream.types import EventStreamEventType
from sentry.killswitches import killswitch_matches_context
from sentry.utils import json
from sentry.utils.confluent_producer import get_confluent_producer
from sentry.utils.eap import EAP_ITEMS_CODEC
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition

EAP_ITEMS_CODEC: Codec[TraceItem] = get_topic_codec(Topic.SNUBA_ITEMS)

logger = logging.getLogger(__name__)

if TYPE_CHECKING:
Expand Down
28 changes: 3 additions & 25 deletions src/sentry/replays/lib/eap/write.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,11 @@
from arroyo.backends.kafka import KafkaPayload
from django.conf import settings
from google.protobuf.timestamp_pb2 import Timestamp
from sentry_protos.snuba.v1.trace_item_pb2 import AnyValue, ArrayValue, KeyValue, KeyValueList
from sentry_protos.snuba.v1.trace_item_pb2 import TraceItem as EAPTraceItem

from sentry.conf.types.kafka_definition import Topic
from sentry.replays.lib.eap.snuba_transpiler import TRACE_ITEM_TYPE_MAP, TRACE_ITEM_TYPES
from sentry.replays.lib.kafka import EAP_ITEMS_CODEC, eap_producer
from sentry.utils.eap import EAP_ITEMS_CODEC, eap_items_producer, encode_value
from sentry.utils.kafka_config import get_topic_definition

Value = bool | bytes | str | int | float | Sequence["Value"] | MutableMapping[str, "Value"]
Expand All @@ -33,27 +32,6 @@ class TraceItem(TypedDict):


def new_trace_item(trace_item: TraceItem) -> EAPTraceItem:
def _anyvalue(value: Value) -> AnyValue:
if isinstance(value, bool):
return AnyValue(bool_value=value)
elif isinstance(value, str):
return AnyValue(string_value=value)
elif isinstance(value, int):
return AnyValue(int_value=value)
elif isinstance(value, float):
return AnyValue(double_value=value)
elif isinstance(value, bytes):
return AnyValue(bytes_value=value)
elif isinstance(value, list):
return AnyValue(array_value=ArrayValue(values=[_anyvalue(v) for v in value]))
elif isinstance(value, dict):
return AnyValue(
kvlist_value=KeyValueList(
values=[KeyValue(key=k, value=_anyvalue(v)) for k, v in value.items()]
)
)
else:
raise ValueError(f"Invalid value type for AnyValue: {type(value)}")

timestamp = Timestamp()
timestamp.FromDatetime(trace_item["timestamp"])
Expand All @@ -70,7 +48,7 @@ def _anyvalue(value: Value) -> AnyValue:
item_id=trace_item["trace_item_id"],
received=received,
retention_days=trace_item["retention_days"],
attributes={k: _anyvalue(v) for k, v in trace_item["attributes"].items()},
attributes={k: encode_value(v) for k, v in trace_item["attributes"].items()},
client_sample_rate=trace_item["client_sample_rate"],
server_sample_rate=trace_item["server_sample_rate"],
)
Expand Down Expand Up @@ -99,4 +77,4 @@ def write_trace_items(trace_items: list[EAPTraceItem]) -> None:
topic = get_topic_definition(Topic.SNUBA_ITEMS)["real_topic_name"]
for trace_item in trace_items:
payload = KafkaPayload(None, EAP_ITEMS_CODEC.encode(trace_item), [])
eap_producer.produce(ArroyoTopic(topic), payload)
eap_items_producer.produce(ArroyoTopic(topic), payload)
23 changes: 1 addition & 22 deletions src/sentry/replays/lib/kafka.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,11 @@
from arroyo.backends.kafka import KafkaPayload
from arroyo.types import Topic as ArroyoTopic
from sentry_kafka_schemas.codecs import Codec
from sentry_protos.snuba.v1.trace_item_pb2 import TraceItem

from sentry.conf.types.kafka_definition import Topic, get_topic_codec
from sentry.conf.types.kafka_definition import Topic
from sentry.utils.arroyo_producer import SingletonProducer, get_arroyo_producer
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition
from sentry.utils.pubsub import KafkaPublisher

#
# EAP PRODUCER
#


EAP_ITEMS_CODEC: Codec[TraceItem] = get_topic_codec(Topic.SNUBA_ITEMS)


def _get_eap_items_producer():
"""Get a Kafka producer for EAP TraceItems."""
return get_arroyo_producer(
name="sentry.replays.lib.kafka.eap_items",
topic=Topic.SNUBA_ITEMS,
)


eap_producer = SingletonProducer(_get_eap_items_producer)


#
# REPLAY PRODUCER
#
Expand Down
35 changes: 9 additions & 26 deletions src/sentry/spans/consumers/process_segments/convert.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
from typing import Any, cast
from typing import cast

import orjson
import sentry_sdk
from google.protobuf.timestamp_pb2 import Timestamp
from sentry_kafka_schemas.schema_types.buffered_segments_v1 import SpanLink
from sentry_protos.snuba.v1.request_common_pb2 import TraceItemType
from sentry_protos.snuba.v1.trace_item_pb2 import AnyValue, TraceItem

from sentry.spans.consumers.process_segments.types import CompatibleSpan

I64_MAX = 2**63 - 1
from sentry.utils.eap import encode_value

FIELD_TO_ATTRIBUTE = {
"end_timestamp": "sentry.end_timestamp_precise",
Expand Down Expand Up @@ -41,7 +39,7 @@ def convert_span_to_item(span: CompatibleSpan) -> TraceItem:
continue
try:
# NOTE: This ignores the `type` field of the attribute itself
attributes[k] = _anyvalue(value)
attributes[k] = encode_value(value, dump_arrays=True)
except Exception:
sentry_sdk.capture_exception()
else:
Expand All @@ -58,12 +56,12 @@ def convert_span_to_item(span: CompatibleSpan) -> TraceItem:

# For `is_segment`, we trust the value written by `flush_segments` over a pre-existing attribute:
if (is_segment := span.get("is_segment")) is not None:
attributes["sentry.is_segment"] = _anyvalue(is_segment)
attributes["sentry.is_segment"] = encode_value(is_segment, dump_arrays=True)

for field_name, attribute_name in FIELD_TO_ATTRIBUTE.items():
attribute = span.get(field_name) # type:ignore[assignment]
if attribute is not None:
attributes[attribute_name] = _anyvalue(attribute)
attributes[attribute_name] = encode_value(attribute, dump_arrays=True)

# Rename some attributes from their sentry-conventions name to what the product currently expects.
# Eventually this should all be handled by deprecation policies in sentry-conventions.
Expand All @@ -83,14 +81,16 @@ def convert_span_to_item(span: CompatibleSpan) -> TraceItem:
try:
if attr in RENAME_ATTRIBUTES:
attr = RENAME_ATTRIBUTES[attr]
attributes[f"sentry._meta.fields.attributes.{attr}"] = _anyvalue({"meta": meta})
attributes[f"sentry._meta.fields.attributes.{attr}"] = encode_value(
{"meta": meta}, dump_arrays=True
)
except Exception:
sentry_sdk.capture_exception()

if links := span.get("links"):
try:
sanitized_links = [_sanitize_span_link(link) for link in links if link is not None]
attributes["sentry.links"] = _anyvalue(sanitized_links)
attributes["sentry.links"] = encode_value(sanitized_links, dump_arrays=True)
except Exception:
sentry_sdk.capture_exception()
attributes["sentry.dropped_links_count"] = AnyValue(int_value=len(links))
Expand All @@ -111,23 +111,6 @@ def convert_span_to_item(span: CompatibleSpan) -> TraceItem:
)


def _anyvalue(value: Any) -> AnyValue:
if isinstance(value, str):
return AnyValue(string_value=value)
elif isinstance(value, bool):
return AnyValue(bool_value=value)
elif isinstance(value, int):
if value > I64_MAX:
return AnyValue(double_value=float(value))
return AnyValue(int_value=value)
elif isinstance(value, float):
return AnyValue(double_value=value)
elif isinstance(value, (list, dict)):
return AnyValue(string_value=orjson.dumps(value).decode())

raise ValueError(f"Unknown value type: {type(value)}")


def _timestamp(value: float) -> Timestamp:
return Timestamp(
seconds=int(value),
Expand Down
35 changes: 11 additions & 24 deletions src/sentry/testutils/cases.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@
from sentry.users.models.useremail import UserEmail
from sentry.utils import json
from sentry.utils.auth import SsoSession
from sentry.utils.eap import encode_value
from sentry.utils.json import dumps_htmlsafe
from sentry.utils.not_set import NOT_SET, NotSet, default_if_not_set
from sentry.utils.samples import load_data
Expand Down Expand Up @@ -3316,20 +3317,6 @@ class _OptionalOurLogData(TypedDict, total=False):
item_id: int


def scalar_to_any_value(value: Any) -> AnyValue:
if isinstance(value, str):
return AnyValue(string_value=value)
if isinstance(value, int):
return AnyValue(int_value=value)
if isinstance(value, float):
return AnyValue(double_value=value)
if isinstance(value, bool):
return AnyValue(bool_value=value)
if isinstance(value, dict):
return AnyValue(**value)
raise Exception(f"cannot convert {value} of type {type(value)} to AnyValue")


def span_to_trace_item(span) -> TraceItem:
client_sample_rate = 1.0
server_sample_rate = 1.0
Expand All @@ -3339,15 +3326,15 @@ def span_to_trace_item(span) -> TraceItem:
for k, v in span.get(field, {}).items():
if v is None:
continue
attributes[k] = scalar_to_any_value(v)
attributes[k] = encode_value(v, expand_arrays=True)

for k, v in span.get("sentry_tags", {}).items():
if v is None:
continue
if k == "description":
k = "normalized_description"

attributes[f"sentry.{k}"] = scalar_to_any_value(v)
attributes[f"sentry.{k}"] = encode_value(v, expand_arrays=True)

for k, v in span.get("measurements", {}).items():
if v is None or v["value"] is None:
Expand All @@ -3357,10 +3344,10 @@ def span_to_trace_item(span) -> TraceItem:
elif k == "server_sample_rate":
server_sample_rate = v["value"]
else:
attributes[k] = scalar_to_any_value(float(v["value"]))
attributes[k] = encode_value(float(v["value"]))

if "description" in span and span["description"] is not None:
description = scalar_to_any_value(span["description"])
description = encode_value(span["description"])
attributes["sentry.raw_description"] = description

for field in {
Expand All @@ -3382,7 +3369,7 @@ def span_to_trace_item(span) -> TraceItem:
double_value=float(is_segment),
)
else:
value = scalar_to_any_value(span[field])
value = encode_value(span[field])
attributes[f"sentry.{field}"] = value

timestamp = Timestamp()
Expand Down Expand Up @@ -3453,10 +3440,10 @@ def create_ourlog(
attributes_proto = {}

for k, v in attributes.items():
attributes_proto[k] = scalar_to_any_value(v)
attributes_proto[k] = encode_value(v, expand_arrays=True)

for k, v in extra_data.items():
attributes_proto[f"sentry.{k}"] = scalar_to_any_value(v)
attributes_proto[f"sentry.{k}"] = encode_value(v, expand_arrays=True)

timestamp_proto = Timestamp()

Expand Down Expand Up @@ -3532,7 +3519,7 @@ def create_trace_metric(

if attributes:
for k, v in attributes.items():
attributes_proto[k] = scalar_to_any_value(v)
attributes_proto[k] = encode_value(v, expand_arrays=True)

return TraceItem(
organization_id=organization.id,
Expand Down Expand Up @@ -3577,7 +3564,7 @@ def create_profile_function(

if attributes:
for k, v in attributes.items():
attributes_proto[k] = scalar_to_any_value(v)
attributes_proto[k] = encode_value(v, expand_arrays=True)

return TraceItem(
organization_id=organization.id,
Expand Down Expand Up @@ -3892,7 +3879,7 @@ def create_eap_uptime_result(
attributes_proto = {}
for k, v in attributes_data.items():
if v is not None:
attributes_proto[k] = scalar_to_any_value(v)
attributes_proto[k] = encode_value(v, expand_arrays=True)

timestamp_proto = Timestamp()
timestamp_proto.FromDatetime(scheduled_check_time)
Expand Down
Loading
Loading