
ref(spans): Detect performance issues directly in segments consumer #86595


Merged 2 commits on Mar 10, 2025
6 changes: 2 additions & 4 deletions src/sentry/event_manager.py
@@ -2451,12 +2451,10 @@ def _calculate_span_grouping(jobs: Sequence[Job], projects: ProjectsMapping) ->


@sentry_sdk.tracing.trace
def _detect_performance_problems(
jobs: Sequence[Job], projects: ProjectsMapping, is_standalone_spans: bool = False
) -> None:
def _detect_performance_problems(jobs: Sequence[Job], projects: ProjectsMapping) -> None:
Member Author:

We no longer need the is_standalone_spans flag here since we don't call this from the consumer anymore.

for job in jobs:
job["performance_problems"] = detect_performance_problems(
job["data"], projects[job["project_id"]], is_standalone_spans=is_standalone_spans
job["data"], projects[job["project_id"]]
)


273 changes: 90 additions & 183 deletions src/sentry/spans/consumers/process_segments/message.py
@@ -1,20 +1,13 @@
import logging
import uuid
from collections.abc import Sequence, Set
from copy import deepcopy
from typing import Any, cast

from django.core.exceptions import ValidationError
from sentry_kafka_schemas.schema_types.buffered_segments_v1 import SegmentSpan as SchemaSpan

from sentry import options
from sentry.event_manager import (
Job,
ProjectsMapping,
_detect_performance_problems,
_pull_out_data,
_record_transaction_info,
)
from sentry.event_manager import Job, _pull_out_data, _record_transaction_info
from sentry.issues.grouptype import PerformanceStreamedSpansGroupTypeExperimental
from sentry.issues.issue_occurrence import IssueOccurrence
from sentry.issues.producer import PayloadType, produce_occurrence_to_kafka
@@ -26,6 +19,7 @@
from sentry.spans.grouping.api import load_span_grouping_config
from sentry.utils import metrics
from sentry.utils.dates import to_datetime
from sentry.utils.performance_issues.performance_detection import detect_performance_problems

logger = logging.getLogger(__name__)

@@ -37,114 +31,6 @@ class Span(SchemaSpan, total=False):
hash: str | None # Added in enrichment


@metrics.wraps("save_event.send_occurrence_to_platform")
def _send_occurrence_to_platform(jobs: Sequence[Job], projects: ProjectsMapping) -> None:
for job in jobs:
event = job["event"]
project = event.project
event_id = event.event_id

performance_problems = job["performance_problems"]
for problem in performance_problems:
occurrence = IssueOccurrence(
id=uuid.uuid4().hex,
resource_id=None,
project_id=project.id,
event_id=event_id,
fingerprint=[problem.fingerprint],
type=problem.type,
issue_title=problem.title,
subtitle=problem.desc,
culprit=event.transaction,
evidence_data=problem.evidence_data,
evidence_display=problem.evidence_display,
detection_time=event.datetime,
level=job["level"],
)

produce_occurrence_to_kafka(
payload_type=PayloadType.OCCURRENCE,
occurrence=occurrence,
event_data=job["event_data"],
is_buffered_spans=True,
)


def build_tree(spans) -> tuple[dict[str, Any], str | None]:
span_tree = {}
root_span_id = None

for span in spans:
span_id = span["span_id"]
is_root = span["is_segment"]
if is_root:
root_span_id = span_id
if span_id not in span_tree:
span_tree[span_id] = span
span_tree[span_id]["children"] = []

for span in span_tree.values():
parent_id = span.get("parent_span_id")
if parent_id is not None and parent_id in span_tree:
parent_span = span_tree[parent_id]
children = parent_span["children"]
children.append(span)

return span_tree, root_span_id


def dfs(visited, flattened_spans, tree, span_id):
stack = [span_id]

while len(stack):
span_id = stack.pop()

span = deepcopy(tree[span_id])
children = span.pop("children")

if span_id not in visited:
flattened_spans.append(span)
tree.pop(span_id)
visited.add(span_id)

for child in sorted(children, key=lambda span: span["start_timestamp"], reverse=True):
if child["span_id"] not in visited:
stack.append(child["span_id"])


def flatten_tree(tree: dict[str, Any], root_span_id: str | None) -> list[Span]:
visited: Set[str] = set()
flattened_spans: list[Span] = []

if root_span_id:
dfs(visited, flattened_spans, tree, root_span_id)

# Catch all for orphan spans
remaining = sorted(tree.items(), key=lambda span: span[1]["start_timestamp"])
for span_id, _ in remaining:
if span_id not in visited:
dfs(visited, flattened_spans, tree, span_id)

return flattened_spans


def _update_occurrence_group_type(jobs: Sequence[Job], projects: ProjectsMapping) -> None:
"""
Exclusive to the segments consumer: Updates group type and fingerprint of
all performance problems so they don't double write occurrences as we test.
"""

for job in jobs:
updated_problems = []
performance_problems = job.pop("performance_problems")
for performance_problem in performance_problems:
performance_problem.type = PerformanceStreamedSpansGroupTypeExperimental
performance_problem.fingerprint = f"{performance_problem.fingerprint}-{PerformanceStreamedSpansGroupTypeExperimental.type_id}"
updated_problems.append(performance_problem)

job["performance_problems"] = updated_problems


def _find_segment_span(spans: list[Span]) -> Span | None:
"""
Finds the span in the list that has ``is_segment`` set to
@@ -213,63 +99,100 @@ def _create_models(segment: Span, project: Project) -> None:
)


def transform_spans_to_event_dict(segment_span: Span, spans: list[Span]) -> dict[str, Any]:
event_spans: list[dict[str, Any]] = []
def _detect_performance_problems(segment_span: Span, spans: list[Span], project: Project) -> None:
Member Author:

Please have a look at what this function does. It now:

  • Creates an event payload with as many fields populated as possible.
  • Runs that event payload through the issue detectors.
  • Trims the event payload down to a smaller subset.
  • Creates an issue occurrence and sends it to the issue platform.

I would suggest keeping this for now, until we've disabled issue detection in the old transactions pipeline. That will make it easier for the performance/issues teams to refactor the issue detectors to work on span payloads and create this flow instead:

  • Run the segment's spans directly through the issue detectors. Internally, detectors pull top-level attributes (like "browser.name") from the segment span.
  • Create a minimal event payload for the issue occurrence.
  • Send it to the issue platform.
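For illustration, a rough sketch of that target flow could look like the code below. Note that detect_performance_problems_on_spans is a hypothetical detector entry point that does not exist in this PR, the minimal payload fields are an assumption, and the snippet reuses the imports already at the top of this module:

import uuid

def _detect_performance_problems_on_spans_sketch(
    segment_span: Span, spans: list[Span], project: Project
) -> None:
    # 1. Run the segment's spans directly through the issue detectors. Detectors
    #    would pull top-level attributes (e.g. "browser.name") from the segment span.
    #    (Hypothetical entry point, not part of this PR.)
    performance_problems = detect_performance_problems_on_spans(segment_span, spans, project)

    # 2. Build a minimal event payload for the occurrence instead of a shimmed
    #    transaction event. (Field list is an assumption.)
    event_data = {
        "event_id": uuid.uuid4().hex,
        "project_id": segment_span["project_id"],
        "timestamp": segment_span["end_timestamp_precise"],
        "received": segment_span["received"],
    }

    # 3. Send each problem to the issue platform, mirroring the current code below.
    for problem in performance_problems:
        occurrence = IssueOccurrence(
            id=uuid.uuid4().hex,
            resource_id=None,
            project_id=project.id,
            event_id=event_data["event_id"],
            fingerprint=[problem.fingerprint],
            type=problem.type,
            issue_title=problem.title,
            subtitle=problem.desc,
            culprit=segment_span.get("sentry_tags", {}).get("transaction"),
            evidence_data=problem.evidence_data or {},
            evidence_display=problem.evidence_display,
            detection_time=to_datetime(segment_span["end_timestamp_precise"]),
            level="info",
        )
        produce_occurrence_to_kafka(
            payload_type=PayloadType.OCCURRENCE,
            occurrence=occurrence,
            event_data=event_data,
            is_buffered_spans=True,
        )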

if not options.get("standalone-spans.detect-performance-problems.enable"):
return

event_data = _build_shim_event_data(segment_span, spans)
performance_problems = detect_performance_problems(event_data, project, standalone=True)

if not options.get("standalone-spans.send-occurrence-to-platform.enable"):
return

# Prepare a slimmer event payload for the occurrence consumer. This event
# will be persisted by the consumer. Once issue detectors can run on
# standalone spans, we should directly build a minimal occurrence event
# payload here, instead.
event_data["spans"] = []
event_data["timestamp"] = event_data["datetime"]

for problem in performance_problems:
problem.type = PerformanceStreamedSpansGroupTypeExperimental
problem.fingerprint = (
f"{problem.fingerprint}-{PerformanceStreamedSpansGroupTypeExperimental.type_id}"
)

occurrence = IssueOccurrence(
id=uuid.uuid4().hex,
resource_id=None,
project_id=project.id,
event_id=event_data["event_id"],
fingerprint=[problem.fingerprint],
type=problem.type,
issue_title=problem.title,
subtitle=problem.desc,
culprit=event_data["transaction"],
evidence_data=problem.evidence_data or {},
evidence_display=problem.evidence_display,
detection_time=to_datetime(segment_span["end_timestamp_precise"]),
level="info",
)

produce_occurrence_to_kafka(
payload_type=PayloadType.OCCURRENCE,
occurrence=occurrence,
event_data=event_data,
is_buffered_spans=True,
Member Author, on lines +144 to +145:
This sends the trimmed-down event payload along with the is_buffered_spans flag into the issue platform. @mrduncan we can further trim down the payload based on what the created issues actually need in order to work (a possible starting point is sketched after this call).

Note that the issues are still created with PerformanceStreamedSpansGroupTypeExperimental. I didn't check exactly what this type does differently; I just noticed that when I remove it, no occurrences are created anymore, at least in my test.

)
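As a starting point for that trimming, a minimal payload might keep only the fields below. This is a sketch, not a verified list; which keys the occurrence consumer actually requires still needs to be checked against its event schema. All keys referenced here are produced by _build_shim_event_data below:

# Hypothetical minimal occurrence payload; the field selection is an assumption.
minimal_event_data = {
    "event_id": event_data["event_id"],
    "project_id": event_data["project_id"],
    "platform": event_data["platform"],
    "environment": event_data["environment"],
    "release": event_data["release"],
    "transaction": event_data["transaction"],
    "tags": event_data["tags"],
    "contexts": {"trace": event_data["contexts"]["trace"]},
    "received": event_data["received"],
    "timestamp": event_data["datetime"],
    "spans": [],
}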


def _build_shim_event_data(segment_span: Span, spans: list[Span]) -> dict[str, Any]:
sentry_tags = segment_span.get("sentry_tags", {})

event: dict[str, Any] = {"type": "transaction", "contexts": {}, "level": "info"}
event["event_id"] = segment_span.get("event_id")
event["project_id"] = segment_span["project_id"]
event["transaction"] = sentry_tags.get("transaction")
event["release"] = sentry_tags.get("release")
event["dist"] = sentry_tags.get("dist")
event["environment"] = sentry_tags.get("environment")
event["platform"] = sentry_tags.get("platform")
event["tags"] = [["environment", sentry_tags.get("environment")]]

event["contexts"]["trace"] = {
"trace_id": segment_span["trace_id"],
"type": "trace",
"op": sentry_tags.get("transaction.op"),
"span_id": segment_span["span_id"],
"hash": segment_span["hash"],
event: dict[str, Any] = {
"type": "transaction",
"level": "info",
"contexts": {
"trace": {
"trace_id": segment_span["trace_id"],
"type": "trace",
"op": sentry_tags.get("transaction.op"),
"span_id": segment_span["span_id"],
"hash": segment_span["hash"],
},
},
"event_id": uuid.uuid4().hex,
"project_id": segment_span["project_id"],
"transaction": sentry_tags.get("transaction"),
"release": sentry_tags.get("release"),
"dist": sentry_tags.get("dist"),
"environment": sentry_tags.get("environment"),
"platform": sentry_tags.get("platform"),
"tags": [["environment", sentry_tags.get("environment")]],
"received": segment_span["received"],
"timestamp": segment_span["end_timestamp_precise"],
"start_timestamp": segment_span["start_timestamp_precise"],
"datetime": to_datetime(segment_span["end_timestamp_precise"]).strftime(
"%Y-%m-%dT%H:%M:%SZ"
),
"spans": [],
}

if (profile_id := segment_span.get("profile_id")) is not None:
event["contexts"]["profile"] = {"profile_id": profile_id, "type": "profile"}

# Add legacy span attributes required only by issue detectors. As opposed to
# real event payloads, this also adds the segment span so detectors can run
# topological sorting on the span tree.
for span in spans:
event_span = cast(dict[str, Any], deepcopy(span))
event_span["start_timestamp"] = span["start_timestamp_ms"] / 1000
event_span["timestamp"] = (span["start_timestamp_ms"] + span["duration_ms"]) / 1000
event_spans.append(event_span)

# The performance detectors expect the span list to be ordered/flattened in the way they
# are structured in the tree. This is an implicit assumption in the performance detectors.
# So we build a tree and flatten it depth first.
# TODO: See if we can update the detectors to work without this assumption so we can
# just pass it a list of spans.
tree, root_span_id = build_tree(event_spans)
flattened_spans = flatten_tree(tree, root_span_id)
event["spans"] = flattened_spans

root_span = flattened_spans[0]
event["received"] = root_span["received"]
event["timestamp"] = (root_span["start_timestamp_ms"] + root_span["duration_ms"]) / 1000
event["start_timestamp"] = root_span["start_timestamp_ms"] / 1000
event["datetime"] = to_datetime(event["timestamp"]).strftime("%Y-%m-%dT%H:%M:%SZ")
event_span["start_timestamp"] = span["start_timestamp_precise"]
event_span["timestamp"] = span["end_timestamp_precise"]
Member Author, on lines +189 to +190:
Issue detectors don't read the _precise timestamps from standalone spans yet, since Relay doesn't produce them for embedded spans in transaction events.

  1. We can inject the _precise fields into Event spans in Relay, so that we can update the detectors and get rid of this modification here.
  2. Alternatively, we always create timestamp and start_timestamp, operate on those throughout the pipeline, and pull the attributes from there in EAP ingest.

@mjq Is there already a definition of how spans should look with respect to this? (A sketch of option 2 follows after this function.)

event["spans"].append(event_span)

return event
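To make option 2 from the comment above concrete, a normalization helper along the following lines could run before the detectors so they only ever see start_timestamp and timestamp in float seconds. This is a sketch under the assumption that the _precise fields are float seconds and the _ms fields are milliseconds, as used in the code above; the helper name is hypothetical:

from typing import Any

def _normalize_span_timestamps(span: dict[str, Any]) -> dict[str, Any]:
    # Prefer the precise float-second fields emitted for standalone spans.
    if "start_timestamp_precise" in span:
        span["start_timestamp"] = span["start_timestamp_precise"]
        span["timestamp"] = span["end_timestamp_precise"]
    else:
        # Fall back to the millisecond fields used by embedded transaction spans.
        span["start_timestamp"] = span["start_timestamp_ms"] / 1000
        span["timestamp"] = (span["start_timestamp_ms"] + span["duration_ms"]) / 1000
    return span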


def prepare_event_for_occurrence_consumer(event):
event_light = deepcopy(event)
event_light["spans"] = []
event_light["timestamp"] = event["datetime"]
return event_light


def process_segment(spans: list[Span]) -> list[Span]:
segment_span = _find_segment_span(spans)
if segment_span is None:
@@ -293,43 +216,27 @@ def process_segment(spans: list[Span]) -> list[Span]:
# - [ ] _materialize_metadata_many
# - [X] _get_or_create_environment_many -> _create_models
# - [X] _get_or_create_release_associated_models -> _create_models
# - [X] _tsdb_record_all_metrics
# - [ ] _tsdb_record_all_metrics
# - [ ] _materialize_event_metrics
# - [ ] _nodestore_save_many
# - [ ] _eventstream_insert_many
# - [ ] _track_outcome_accepted_many
# - [X] _detect_performance_problems
# - [X] _send_occurrence_to_platform
# - [X] _detect_performance_problems -> _detect_performance_problems
# - [X] _send_occurrence_to_platform -> _detect_performance_problems
# - [X] _record_transaction_info

_enrich_spans(segment_span, spans)
_create_models(segment_span, project)
_detect_performance_problems(segment_span, spans, project)

# XXX: Below are old-style functions imported from EventManager that rely on
# the Event schema:

event = transform_spans_to_event_dict(segment_span, spans)
event_light = prepare_event_for_occurrence_consumer(event)
event = _build_shim_event_data(segment_span, spans)
projects = {project.id: project}
job: Job = {"data": event, "project_id": project.id, "raw": False, "start_time": None}

jobs: Sequence[Job] = [
{
"data": event,
"project_id": project.id,
"raw": False,
"start_time": None,
"event_data": event_light,
}
]

_pull_out_data(jobs, projects)

if options.get("standalone-spans.detect-performance-problems.enable"):
_detect_performance_problems(jobs, projects, is_standalone_spans=True)
_update_occurrence_group_type(jobs, projects) # NB: exclusive to spans consumer
if options.get("standalone-spans.send-occurrence-to-platform.enable"):
_send_occurrence_to_platform(jobs, projects)

_record_transaction_info(jobs, projects)
_pull_out_data([job], projects)
_record_transaction_info([job], projects)

return spans
6 changes: 2 additions & 4 deletions src/sentry/testutils/cases.py
@@ -577,11 +577,9 @@ def create_performance_issue(
perf_event_manager.normalize()

def detect_performance_problems_interceptor(
data: Event, project: Project, is_standalone_spans: bool = False
data: Event, project: Project, standalone: bool = False
):
perf_problems = detect_performance_problems(
data, project, is_standalone_spans=is_standalone_spans
)
perf_problems = detect_performance_problems(data, project, standalone=standalone)
if fingerprint:
for perf_problem in perf_problems:
perf_problem.fingerprint = fingerprint