Skip to content

feat(replays): Emit breadcrumbs to EAP #94648

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 15 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,8 @@ def as_log_message(event: dict[str, Any]) -> str | None:
return None
case EventType.OPTIONS:
return None
case EventType.MEMORY:
return None


def make_seer_request(request_data: str) -> bytes:
Expand Down
24 changes: 23 additions & 1 deletion src/sentry/replays/lib/kafka.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,29 @@
from sentry.conf.types.kafka_definition import Topic
from arroyo.backends.kafka import KafkaProducer, build_kafka_configuration
from sentry_kafka_schemas.codecs import Codec
from sentry_protos.snuba.v1.trace_item_pb2 import TraceItem

from sentry.conf.types.kafka_definition import Topic, get_topic_codec
from sentry.utils.arroyo_producer import SingletonProducer
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition
from sentry.utils.pubsub import KafkaPublisher

# EAP PRODUCER

# Codec looked up for Topic.SNUBA_ITEMS; annotated as a codec of TraceItem messages.
EAP_ITEMS_CODEC: Codec[TraceItem] = get_topic_codec(Topic.SNUBA_ITEMS)


def _get_eap_items_producer() -> KafkaProducer:
    """Build the Kafka producer used to publish EAP TraceItems.

    The producer is configured from the producer options of the cluster that the
    ``Topic.SNUBA_ITEMS`` topic definition points at, with any ``compression.type``
    entry removed from those options.
    """
    topic_definition = get_topic_definition(Topic.SNUBA_ITEMS)
    options = get_kafka_producer_cluster_options(topic_definition["cluster"])
    # Remove the cluster-level compression setting when one is present.
    options.pop("compression.type", None)
    kafka_configuration = build_kafka_configuration(default_config=options)
    return KafkaProducer(kafka_configuration)


# Module-level producer handle wrapping the _get_eap_items_producer factory.
# NOTE(review): presumably SingletonProducer defers calling the factory until first use — confirm.
eap_producer = SingletonProducer(_get_eap_items_producer)

# REPLAY PRODUCERS

# We keep a synchronous and asynchronous singleton because a shared singleton could lead
# to synchronous publishing when asynchronous publishing was desired and vice-versa.
sync_publisher: KafkaPublisher | None = None
Expand Down
15 changes: 15 additions & 0 deletions src/sentry/replays/usecases/ingest/event_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,13 @@
from typing import Any, TypedDict

import sentry_sdk
from arroyo import Topic as ArroyoTopic
from arroyo.backends.kafka import KafkaPayload
from sentry_protos.snuba.v1.trace_item_pb2 import TraceItem

from sentry.conf.types.kafka_definition import Topic
from sentry.models.project import Project
from sentry.replays.lib.kafka import EAP_ITEMS_CODEC, eap_producer
from sentry.replays.usecases.ingest.dom_index import (
ReplayActionsEvent,
ReplayActionsEventPayload,
Expand All @@ -18,6 +23,7 @@
report_rage_click_issue_with_replay_event,
)
from sentry.utils import json, metrics
from sentry.utils.kafka_config import get_topic_definition

# NOTE(review): getLogger() without a name returns the root logger; the usual
# module-level form is logging.getLogger(__name__) — confirm this is intentional.
logger = logging.getLogger()

Expand Down Expand Up @@ -229,6 +235,15 @@ def report_rage_click(
)


@sentry_sdk.trace
def emit_trace_items_to_eap(trace_items: list[TraceItem]) -> None:
    """Publish every trace-item in ``trace_items`` to the EAP items topic.

    Each item is encoded with ``EAP_ITEMS_CODEC`` and produced as a key-less,
    header-less Kafka payload on the real topic name configured for
    ``Topic.SNUBA_ITEMS``.
    """
    topic_name = get_topic_definition(Topic.SNUBA_ITEMS)["real_topic_name"]
    for item in trace_items:
        encoded_item = EAP_ITEMS_CODEC.encode(item)
        message = KafkaPayload(None, encoded_item, [])
        eap_producer.produce(ArroyoTopic(topic_name), message)


@sentry_sdk.trace
def _should_report_hydration_error_issue(project: Project) -> bool:
"""
Expand Down
250 changes: 250 additions & 0 deletions src/sentry/replays/usecases/ingest/event_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@

import logging
import random
import uuid
from collections.abc import Iterator, MutableMapping
from dataclasses import dataclass
from enum import Enum
from typing import Any, TypedDict

import sentry_sdk
from google.protobuf.timestamp_pb2 import Timestamp
from sentry_protos.snuba.v1.request_common_pb2 import TraceItemType
from sentry_protos.snuba.v1.trace_item_pb2 import AnyValue, TraceItem

from sentry.utils import json

Expand Down Expand Up @@ -76,6 +81,8 @@ class EventType(Enum):
UNKNOWN = 13
CANVAS = 14
OPTIONS = 15
FEEDBACK = 16
MEMORY = 17


def which(event: dict[str, Any]) -> EventType:
Expand Down Expand Up @@ -151,6 +158,8 @@ def which(event: dict[str, Any]) -> EventType:
return EventType.FCP
else:
return EventType.UNKNOWN
elif op == "memory":
return EventType.MEMORY
else:
return EventType.UNKNOWN
elif event["data"]["tag"] == "options":
Expand All @@ -166,6 +175,247 @@ def which(event: dict[str, Any]) -> EventType:
return EventType.UNKNOWN


def which_iter(events: list[dict[str, Any]]) -> Iterator[tuple[EventType, dict[str, Any]]]:
    """Lazily yield ``(event_type, event)`` pairs, classifying each event with ``which``."""
    yield from ((which(raw_event), raw_event) for raw_event in events)


class MessageContext(TypedDict):
    """Metadata shared by every trace-item built from one list of events.

    A single context is passed to ``iter_trace_items`` and applied to each event's
    trace-item by ``as_trace_item``.
    """

    organization_id: int
    project_id: int
    # Passed through int() to Timestamp.FromSeconds in as_trace_item, i.e. handled as seconds.
    received: float
    retention_days: int
    # May be None; as_trace_item then uses replay_id as the trace id instead.
    trace_id: str | None
    # Also copied into every trace-item's attributes under "replay_id".
    replay_id: str
    segment_id: int


class TraceItemContext(TypedDict):
    """Event-specific portion of a trace-item, as returned by ``as_trace_item_context``."""

    # Attribute name -> scalar value; as_trace_item wraps each value in an AnyValue.
    attributes: MutableMapping[str, str | int | bool | float]
    # Used as the TraceItem's item_id.
    event_hash: bytes
    # Multiplied by 1000 and fed to Timestamp.FromMilliseconds in as_trace_item,
    # i.e. handled as seconds.
    timestamp: float


def iter_trace_items(context: MessageContext, events: list[dict[str, Any]]) -> Iterator[TraceItem]:
    """Yield a trace-item for every event that can be converted into one.

    Events whose conversion raises ``KeyError``, ``TypeError`` or ``ValueError`` are
    logged at warning level and skipped; events that convert to a falsy result
    (such as ``None``) are skipped silently.
    """
    for kind, raw_event in which_iter(events):
        try:
            item = as_trace_item(context, kind, raw_event)
        except (KeyError, TypeError, ValueError) as exc:
            # A malformed event must not prevent the remaining events from being emitted.
            logger.warning("Could not transform breadcrumb to trace-item", exc_info=exc)
        else:
            if item:
                yield item


def as_trace_item(
context: MessageContext, event_type: EventType, event: dict[str, Any]
) -> TraceItem | None:
def _anyvalue(value: bool | str | int | float) -> AnyValue:
if isinstance(value, bool):
return AnyValue(bool_value=value)
elif isinstance(value, str):
return AnyValue(string_value=value)
elif isinstance(value, int):
return AnyValue(int_value=value)
elif isinstance(value, float):
return AnyValue(double_value=value)
else:
raise ValueError(f"Invalid value type for AnyValue: {type(value)}")

trace_item_context = as_trace_item_context(event_type, event)

# Not every event produces a trace-item.
if trace_item_context is None:
return None

# Extend the attributes with the replay_id to make it queryable by replay_id after we
# eventually use the trace_id in its rightful position.
trace_item_context["attributes"]["replay_id"] = context["replay_id"]

timestamp = Timestamp()
timestamp.FromMilliseconds(int(trace_item_context["timestamp"] * 1000))

received = Timestamp()
received.FromSeconds(int(context["received"]))

return TraceItem(
organization_id=context["organization_id"],
project_id=context["project_id"],
trace_id=context["trace_id"] or context["replay_id"],
item_id=trace_item_context["event_hash"],
item_type=TraceItemType.TRACE_ITEM_TYPE_REPLAY,
timestamp=timestamp,
attributes={k: _anyvalue(v) for k, v in trace_item_context["attributes"].items()},
client_sample_rate=1.0,
server_sample_rate=1.0,
retention_days=context["retention_days"],
received=received,
)


def as_trace_item_context(event_type: EventType, event: dict[str, Any]) -> TraceItemContext | None:
"""Returns a trace-item row or null for each event."""
match event_type:
case EventType.CLICK | EventType.DEAD_CLICK | EventType.RAGE_CLICK:
payload = event["data"]["payload"]

node = payload["data"]["node"]
node_attributes = node.get("attributes", {})
click_attributes = {
"node_id": int(node["id"]),
"tag": to_string(node["tagName"]),
"text": to_string(node["textContent"][:1024]),
"is_dead": event_type in (EventType.DEAD_CLICK, EventType.RAGE_CLICK),
"is_rage": event_type == EventType.RAGE_CLICK,
"selector": to_string(payload["message"]),
"category": "ui.click",
}
if "alt" in node_attributes:
click_attributes["alt"] = to_string(node_attributes["alt"])
if "aria-label" in node_attributes:
click_attributes["aria_label"] = to_string(node_attributes["aria-label"])
if "class" in node_attributes:
click_attributes["class"] = to_string(node_attributes["class"])
if "data-sentry-component" in node_attributes:
click_attributes["component_name"] = to_string(
node_attributes["data-sentry-component"]
)
if "id" in node_attributes:
click_attributes["id"] = to_string(node_attributes["id"])
if "role" in node_attributes:
click_attributes["role"] = to_string(node_attributes["role"])
if "title" in node_attributes:
click_attributes["title"] = to_string(node_attributes["title"])
if _get_testid(node_attributes):
click_attributes["testid"] = _get_testid(node_attributes)
if "url" in payload:
click_attributes["url"] = to_string(payload["url"])

return {
"attributes": click_attributes, # type: ignore[typeddict-item]
"event_hash": uuid.uuid4().bytes,
"timestamp": float(payload["timestamp"]),
}
case EventType.NAVIGATION:
payload = event["data"]["payload"]
payload_data = payload["data"]

navigation_attributes = {"category": "navigation"}
if "from" in payload_data:
navigation_attributes["from"] = to_string(payload_data["from"])
if "to" in payload_data:
navigation_attributes["to"] = to_string(payload_data["to"])

return {
"attributes": navigation_attributes, # type: ignore[typeddict-item]
"event_hash": uuid.uuid4().bytes,
"timestamp": float(payload["timestamp"]),
}
case EventType.CONSOLE:
return None
case EventType.UI_BLUR:
return None
case EventType.UI_FOCUS:
return None
case EventType.RESOURCE_FETCH | EventType.RESOURCE_XHR:
resource_attributes = {
"category": (
"resource.xhr" if event_type == EventType.RESOURCE_XHR else "resource.fetch"
),
}

request_size, response_size = parse_network_content_lengths(event)
if request_size:
resource_attributes["request_size"] = request_size # type: ignore[assignment]
if response_size:
resource_attributes["response_size"] = response_size # type: ignore[assignment]

return {
"attributes": resource_attributes, # type: ignore[typeddict-item]
"event_hash": uuid.uuid4().bytes,
"timestamp": float(event["data"]["payload"]["timestamp"]),
}
case EventType.LCP | EventType.FCP:
payload = event["data"]["payload"]
return {
"attributes": {
"category": "web-vital.fcp" if event_type == EventType.FCP else "web-vital.lcp",
"rating": to_string(payload["data"]["rating"]),
"size": int(payload["data"]["size"]),
"value": int(payload["data"]["value"]),
},
"event_hash": uuid.uuid4().bytes,
"timestamp": float(payload["timestamp"]),
}
case EventType.HYDRATION_ERROR:
payload = event["data"]["payload"]
return {
"attributes": {
"category": "replay.hydrate-error",
"url": to_string(payload["data"]["url"]),
},
"event_hash": uuid.uuid4().bytes,
"timestamp": float(event["data"]["payload"]["timestamp"]),
}
case EventType.MUTATIONS:
payload = event["data"]["payload"]
return {
"attributes": {
"category": "replay.mutations",
"count": int(payload["data"]["count"]),
},
"event_hash": uuid.uuid4().bytes,
"timestamp": event["timestamp"],
}
case EventType.UNKNOWN:
return None
case EventType.CANVAS:
return None
case EventType.OPTIONS:
payload = event["data"]["payload"]
return {
"attributes": {
"category": "sdk.options",
"shouldRecordCanvas": bool(payload["shouldRecordCanvas"]),
"sessionSampleRate": float(payload["sessionSampleRate"]),
"errorSampleRate": float(payload["errorSampleRate"]),
"useCompressionOption": bool(payload["useCompressionOption"]),
"blockAllMedia": bool(payload["blockAllMedia"]),
"maskAllText": bool(payload["maskAllText"]),
"maskAllInputs": bool(payload["maskAllInputs"]),
"useCompression": bool(payload["useCompression"]),
"networkDetailHasUrls": bool(payload["networkDetailHasUrls"]),
"networkCaptureBodies": bool(payload["networkCaptureBodies"]),
"networkRequestHasHeaders": bool(payload["networkRequestHasHeaders"]),
"networkResponseHasHeaders": bool(payload["networkResponseHasHeaders"]),
},
"event_hash": uuid.uuid4().bytes,
"timestamp": event["timestamp"] / 1000,
}
case EventType.FEEDBACK:
return None
case EventType.MEMORY:
payload = event["data"]["payload"]
return {
"attributes": {
"category": "memory",
"jsHeapSizeLimit": int(payload["data"]["jsHeapSizeLimit"]),
"totalJSHeapSize": int(payload["data"]["totalJSHeapSize"]),
"usedJSHeapSize": int(payload["data"]["usedJSHeapSize"]),
"endTimestamp": float(payload["endTimestamp"]),
},
"event_hash": uuid.uuid4().bytes,
"timestamp": float(payload["startTimestamp"]),
}


def to_string(value: Any) -> str:
    """Return ``value`` unchanged when it is a ``str``.

    Raises:
        ValueError: If ``value`` is not a string.
    """
    if not isinstance(value, str):
        raise ValueError("Value was not a string.")
    return value


class HighlightedEvents(TypedDict, total=False):
canvas_sizes: list[int]
hydration_errors: list[HydrationError]
Expand Down
Loading
Loading