ref(spans): Detect performance issues directly in segments consumer #86595
@@ -1,20 +1,13 @@
import logging
import uuid
from collections.abc import Sequence, Set
from copy import deepcopy
from typing import Any, cast

from django.core.exceptions import ValidationError
from sentry_kafka_schemas.schema_types.buffered_segments_v1 import SegmentSpan as SchemaSpan

from sentry import options
from sentry.event_manager import (
    Job,
    ProjectsMapping,
    _detect_performance_problems,
    _pull_out_data,
    _record_transaction_info,
)
from sentry.event_manager import Job, _pull_out_data, _record_transaction_info
from sentry.issues.grouptype import PerformanceStreamedSpansGroupTypeExperimental
from sentry.issues.issue_occurrence import IssueOccurrence
from sentry.issues.producer import PayloadType, produce_occurrence_to_kafka

@@ -26,6 +19,7 @@
from sentry.spans.grouping.api import load_span_grouping_config
from sentry.utils import metrics
from sentry.utils.dates import to_datetime
from sentry.utils.performance_issues.performance_detection import detect_performance_problems

logger = logging.getLogger(__name__)

@@ -37,114 +31,6 @@ class Span(SchemaSpan, total=False):
    hash: str | None  # Added in enrichment


@metrics.wraps("save_event.send_occurrence_to_platform")
def _send_occurrence_to_platform(jobs: Sequence[Job], projects: ProjectsMapping) -> None:
    for job in jobs:
        event = job["event"]
        project = event.project
        event_id = event.event_id

        performance_problems = job["performance_problems"]
        for problem in performance_problems:
            occurrence = IssueOccurrence(
                id=uuid.uuid4().hex,
                resource_id=None,
                project_id=project.id,
                event_id=event_id,
                fingerprint=[problem.fingerprint],
                type=problem.type,
                issue_title=problem.title,
                subtitle=problem.desc,
                culprit=event.transaction,
                evidence_data=problem.evidence_data,
                evidence_display=problem.evidence_display,
                detection_time=event.datetime,
                level=job["level"],
            )

            produce_occurrence_to_kafka(
                payload_type=PayloadType.OCCURRENCE,
                occurrence=occurrence,
                event_data=job["event_data"],
                is_buffered_spans=True,
            )


def build_tree(spans) -> tuple[dict[str, Any], str | None]:
    span_tree = {}
    root_span_id = None

    for span in spans:
        span_id = span["span_id"]
        is_root = span["is_segment"]
        if is_root:
            root_span_id = span_id
        if span_id not in span_tree:
            span_tree[span_id] = span
            span_tree[span_id]["children"] = []

    for span in span_tree.values():
        parent_id = span.get("parent_span_id")
        if parent_id is not None and parent_id in span_tree:
            parent_span = span_tree[parent_id]
            children = parent_span["children"]
            children.append(span)

    return span_tree, root_span_id


def dfs(visited, flattened_spans, tree, span_id):
    stack = [span_id]

    while len(stack):
        span_id = stack.pop()

        span = deepcopy(tree[span_id])
        children = span.pop("children")

        if span_id not in visited:
            flattened_spans.append(span)
            tree.pop(span_id)
            visited.add(span_id)

        for child in sorted(children, key=lambda span: span["start_timestamp"], reverse=True):
            if child["span_id"] not in visited:
                stack.append(child["span_id"])


def flatten_tree(tree: dict[str, Any], root_span_id: str | None) -> list[Span]:
    visited: Set[str] = set()
    flattened_spans: list[Span] = []

    if root_span_id:
        dfs(visited, flattened_spans, tree, root_span_id)

    # Catch all for orphan spans
    remaining = sorted(tree.items(), key=lambda span: span[1]["start_timestamp"])
    for span_id, _ in remaining:
        if span_id not in visited:
            dfs(visited, flattened_spans, tree, span_id)

    return flattened_spans


def _update_occurrence_group_type(jobs: Sequence[Job], projects: ProjectsMapping) -> None:
    """
    Exclusive to the segments consumer: Updates group type and fingerprint of
    all performance problems so they don't double write occurrences as we test.
    """

    for job in jobs:
        updated_problems = []
        performance_problems = job.pop("performance_problems")
        for performance_problem in performance_problems:
            performance_problem.type = PerformanceStreamedSpansGroupTypeExperimental
            performance_problem.fingerprint = f"{performance_problem.fingerprint}-{PerformanceStreamedSpansGroupTypeExperimental.type_id}"
            updated_problems.append(performance_problem)

        job["performance_problems"] = updated_problems


def _find_segment_span(spans: list[Span]) -> Span | None:
    """
    Finds the segment in the span in the list that has ``is_segment`` set to
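As an aside on the removed helpers above: a comment later in this diff notes that the performance detectors implicitly assume the span list is flattened depth-first. A small worked example (illustrative span dicts carrying only the fields these helpers actually read) makes that ordering concrete:

# Three spans: root "a" (the segment) with children "b" and "c". build_tree
# links children to parents; flatten_tree then emits spans depth-first, with
# the earliest-started child popped first.
spans = [
    {"span_id": "a", "parent_span_id": None, "is_segment": True, "start_timestamp": 1.0},
    {"span_id": "c", "parent_span_id": "a", "is_segment": False, "start_timestamp": 3.0},
    {"span_id": "b", "parent_span_id": "a", "is_segment": False, "start_timestamp": 2.0},
]
tree, root_span_id = build_tree(spans)
assert root_span_id == "a"
assert [s["span_id"] for s in flatten_tree(tree, root_span_id)] == ["a", "b", "c"]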

@@ -213,63 +99,100 @@ def _create_models(segment: Span, project: Project) -> None:
    )


def transform_spans_to_event_dict(segment_span: Span, spans: list[Span]) -> dict[str, Any]:
    event_spans: list[dict[str, Any]] = []
def _detect_performance_problems(segment_span: Span, spans: list[Span], project: Project) -> None:

Please have a look at what this function does. It now:

- builds a transaction-shaped shim event payload from the segment span and its child spans,
- runs detect_performance_problems on that shim, and
- rewrites the detected problems to PerformanceStreamedSpansGroupTypeExperimental before producing occurrences to Kafka.

I would suggest keeping this for now, until we've disabled issue detection in the old transactions pipeline. That will make it easier for the performance/issues teams to refactor issue detectors on span payloads and create this flow instead: detect performance problems directly on the standalone spans, then produce occurrences, without building a transaction event in between.
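To make that suggestion concrete, a rough sketch of the eventual flow. Note that detect_performance_problems_on_spans is a hypothetical span-native detector API that does not exist yet; the occurrence fields mirror the ones built in the function below:

def _detect_and_produce_occurrences(
    segment_span: Span, spans: list[Span], project: Project
) -> None:
    # Hypothetical: detectors accept standalone spans, so no transaction-shaped
    # shim event is required.
    performance_problems = detect_performance_problems_on_spans(spans, project)

    for problem in performance_problems:
        occurrence = IssueOccurrence(
            id=uuid.uuid4().hex,
            resource_id=None,
            project_id=project.id,
            event_id=uuid.uuid4().hex,
            fingerprint=[problem.fingerprint],
            type=problem.type,
            issue_title=problem.title,
            subtitle=problem.desc,
            culprit=segment_span.get("sentry_tags", {}).get("transaction"),
            evidence_data=problem.evidence_data or {},
            evidence_display=problem.evidence_display,
            detection_time=to_datetime(segment_span["end_timestamp_precise"]),
            level="info",
        )
        # A minimal occurrence event payload would replace the full shim here;
        # the exact required fields are an open question in this refactor.
        produce_occurrence_to_kafka(
            payload_type=PayloadType.OCCURRENCE,
            occurrence=occurrence,
            event_data={"event_id": occurrence.event_id, "project_id": project.id},
            is_buffered_spans=True,
        )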
    if not options.get("standalone-spans.detect-performance-problems.enable"):
        return

    event_data = _build_shim_event_data(segment_span, spans)
    performance_problems = detect_performance_problems(event_data, project, standalone=True)

    if not options.get("standalone-spans.send-occurrence-to-platform.enable"):
        return

    # Prepare a slimmer event payload for the occurrence consumer. This event
    # will be persisted by the consumer. Once issue detectors can run on
    # standalone spans, we should directly build a minimal occurrence event
    # payload here, instead.
    event_data["spans"] = []
    event_data["timestamp"] = event_data["datetime"]

    for problem in performance_problems:
        problem.type = PerformanceStreamedSpansGroupTypeExperimental
        problem.fingerprint = (
            f"{problem.fingerprint}-{PerformanceStreamedSpansGroupTypeExperimental.type_id}"
        )

        occurrence = IssueOccurrence(
            id=uuid.uuid4().hex,
            resource_id=None,
            project_id=project.id,
            event_id=event_data["event_id"],
            fingerprint=[problem.fingerprint],
            type=problem.type,
            issue_title=problem.title,
            subtitle=problem.desc,
            culprit=event_data["transaction"],
            evidence_data=problem.evidence_data or {},
            evidence_display=problem.evidence_display,
            detection_time=to_datetime(segment_span["end_timestamp_precise"]),
            level="info",
        )

        produce_occurrence_to_kafka(
            payload_type=PayloadType.OCCURRENCE,
            occurrence=occurrence,
            event_data=event_data,
            is_buffered_spans=True,
        )

Comment on lines +144 to +145:
This sends the trimmed down event payload along with the occurrence. Note that the issues are still created with PerformanceStreamedSpansGroupTypeExperimental.
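For illustration, under the assumptions of this diff, the trimmed payload produced alongside the occurrence has roughly this shape (all values are placeholders, and the remaining shim fields such as contexts and tags are omitted here):

trimmed_event_data = {
    "type": "transaction",
    "level": "info",
    "event_id": "94fb86dcdcc44566a5da3d0a0d6ff296",  # placeholder
    "project_id": 42,  # placeholder
    "transaction": "/api/0/projects/",  # placeholder
    "spans": [],  # emptied before producing
    "timestamp": "2025-02-28T12:00:00Z",  # overwritten with the ISO "datetime" string
}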


def _build_shim_event_data(segment_span: Span, spans: list[Span]) -> dict[str, Any]:
    sentry_tags = segment_span.get("sentry_tags", {})

    event: dict[str, Any] = {"type": "transaction", "contexts": {}, "level": "info"}
    event["event_id"] = segment_span.get("event_id")
    event["project_id"] = segment_span["project_id"]
    event["transaction"] = sentry_tags.get("transaction")
    event["release"] = sentry_tags.get("release")
    event["dist"] = sentry_tags.get("dist")
    event["environment"] = sentry_tags.get("environment")
    event["platform"] = sentry_tags.get("platform")
    event["tags"] = [["environment", sentry_tags.get("environment")]]

    event["contexts"]["trace"] = {
        "trace_id": segment_span["trace_id"],
        "type": "trace",
        "op": sentry_tags.get("transaction.op"),
        "span_id": segment_span["span_id"],
        "hash": segment_span["hash"],
    event: dict[str, Any] = {
        "type": "transaction",
        "level": "info",
        "contexts": {
            "trace": {
                "trace_id": segment_span["trace_id"],
                "type": "trace",
                "op": sentry_tags.get("transaction.op"),
                "span_id": segment_span["span_id"],
                "hash": segment_span["hash"],
            },
        },
        "event_id": uuid.uuid4().hex,
        "project_id": segment_span["project_id"],
        "transaction": sentry_tags.get("transaction"),
        "release": sentry_tags.get("release"),
        "dist": sentry_tags.get("dist"),
        "environment": sentry_tags.get("environment"),
        "platform": sentry_tags.get("platform"),
        "tags": [["environment", sentry_tags.get("environment")]],
        "received": segment_span["received"],
        "timestamp": segment_span["end_timestamp_precise"],
        "start_timestamp": segment_span["start_timestamp_precise"],
        "datetime": to_datetime(segment_span["end_timestamp_precise"]).strftime(
            "%Y-%m-%dT%H:%M:%SZ"
        ),
        "spans": [],
    }

    if (profile_id := segment_span.get("profile_id")) is not None:
        event["contexts"]["profile"] = {"profile_id": profile_id, "type": "profile"}

    # Add legacy span attributes required only by issue detectors. As opposed to
    # real event payloads, this also adds the segment span so detectors can run
    # topological sorting on the span tree.
    for span in spans:
        event_span = cast(dict[str, Any], deepcopy(span))
        event_span["start_timestamp"] = span["start_timestamp_ms"] / 1000
        event_span["timestamp"] = (span["start_timestamp_ms"] + span["duration_ms"]) / 1000
        event_spans.append(event_span)

    # The performance detectors expect the span list to be ordered/flattened in the way they
    # are structured in the tree. This is an implicit assumption in the performance detectors.
    # So we build a tree and flatten it depth first.
    # TODO: See if we can update the detectors to work without this assumption so we can
    # just pass it a list of spans.
    tree, root_span_id = build_tree(event_spans)
    flattened_spans = flatten_tree(tree, root_span_id)
    event["spans"] = flattened_spans

    root_span = flattened_spans[0]
    event["received"] = root_span["received"]
    event["timestamp"] = (root_span["start_timestamp_ms"] + root_span["duration_ms"]) / 1000
    event["start_timestamp"] = root_span["start_timestamp_ms"] / 1000
    event["datetime"] = to_datetime(event["timestamp"]).strftime("%Y-%m-%dT%H:%M:%SZ")
        event_span["start_timestamp"] = span["start_timestamp_precise"]
        event_span["timestamp"] = span["end_timestamp_precise"]

Comment on lines +189 to +190:
Issue detectors don't read the precise timestamp fields directly. @mjq Is there already a definition of how spans should look wrt this?
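For reference, a side-by-side of how the shim derives these two detector-facing fields before and after this change (both yield float seconds since the epoch; helper names are illustrative only):

def _legacy_event_span_times(span: dict[str, Any]) -> tuple[float, float]:
    # Before: derive seconds from the millisecond fields.
    start = span["start_timestamp_ms"] / 1000
    return start, start + span["duration_ms"] / 1000


def _precise_event_span_times(span: dict[str, Any]) -> tuple[float, float]:
    # After: use the precise float-second fields directly.
    return span["start_timestamp_precise"], span["end_timestamp_precise"]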

        event["spans"].append(event_span)

    return event


def prepare_event_for_occurrence_consumer(event):
    event_light = deepcopy(event)
    event_light["spans"] = []
    event_light["timestamp"] = event["datetime"]
    return event_light


def process_segment(spans: list[Span]) -> list[Span]:
    segment_span = _find_segment_span(spans)
    if segment_span is None:

@@ -293,43 +216,27 @@ def process_segment(spans: list[Span]) -> list[Span]:
    # - [ ] _materialize_metadata_many
    # - [X] _get_or_create_environment_many -> _create_models
    # - [X] _get_or_create_release_associated_models -> _create_models
    # - [X] _tsdb_record_all_metrics
    # - [ ] _tsdb_record_all_metrics
    # - [ ] _materialize_event_metrics
    # - [ ] _nodestore_save_many
    # - [ ] _eventstream_insert_many
    # - [ ] _track_outcome_accepted_many
    # - [X] _detect_performance_problems
    # - [X] _send_occurrence_to_platform
    # - [X] _detect_performance_problems -> _detect_performance_problems
    # - [X] _send_occurrence_to_platform -> _detect_performance_problems
    # - [X] _record_transaction_info

    _enrich_spans(segment_span, spans)
    _create_models(segment_span, project)
    _detect_performance_problems(segment_span, spans, project)

    # XXX: Below are old-style functions imported from EventManager that rely on
    # the Event schema:

    event = transform_spans_to_event_dict(segment_span, spans)
    event_light = prepare_event_for_occurrence_consumer(event)
    event = _build_shim_event_data(segment_span, spans)
    projects = {project.id: project}
    job: Job = {"data": event, "project_id": project.id, "raw": False, "start_time": None}

    jobs: Sequence[Job] = [
        {
            "data": event,
            "project_id": project.id,
            "raw": False,
            "start_time": None,
            "event_data": event_light,
        }
    ]

    _pull_out_data(jobs, projects)

    if options.get("standalone-spans.detect-performance-problems.enable"):
        _detect_performance_problems(jobs, projects, is_standalone_spans=True)
        _update_occurrence_group_type(jobs, projects)  # NB: exclusive to spans consumer
    if options.get("standalone-spans.send-occurrence-to-platform.enable"):
        _send_occurrence_to_platform(jobs, projects)

    _record_transaction_info(jobs, projects)
    _pull_out_data([job], projects)
    _record_transaction_info([job], projects)

    return spans

We no longer need the is_standalone_spans flag here since we don't call this from the consumer anymore.