Skip to content

ref(alerts): Pull code out into smaller functions #94654

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
241 changes: 145 additions & 96 deletions src/sentry/incidents/subscription_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
from sentry.utils.dates import to_datetime
from sentry.workflow_engine.models import DataPacket, Detector
from sentry.workflow_engine.processors.data_packet import process_data_packets
from sentry.workflow_engine.types import DetectorEvaluationResult, DetectorGroupKey

logger = logging.getLogger(__name__)
REDIS_TTL = int(timedelta(days=7).total_seconds())
Expand Down Expand Up @@ -235,7 +236,7 @@ def get_crash_rate_alert_metrics_aggregation_value(
return aggregation_value

def get_aggregation_value(
self, subscription_update: QuerySubscriptionUpdate, comparison_delta: int | None = None
self, subscription_update: QuerySubscriptionUpdate, comparison_delta: int | None
) -> float | None:
if self.subscription.snuba_query.dataset == Dataset.Metrics.value:
aggregation_value = self.get_crash_rate_alert_metrics_aggregation_value(
Expand Down Expand Up @@ -268,24 +269,159 @@ def handle_trigger_anomalies(
tags={"detection_type": self.alert_rule.detection_type},
)
incident_trigger = self.trigger_alert_threshold(trigger, aggregation_value)
if features.has(
"organizations:workflow-engine-metric-alert-dual-processing-logs",
self.subscription.project.organization,
):
logger.info(
"Firing dynamic rule",
extra={
"rule_id": self.alert_rule.id,
"aggregation_value": aggregation_value,
},
)
if incident_trigger is not None:
fired_incident_triggers.append(incident_trigger)
else:
self.trigger_alert_counts[trigger.id] = 0

if not has_anomaly and self.active_incident and trigger_matches_status:
incident_trigger = self.trigger_resolve_threshold(trigger, aggregation_value)
metrics.incr(
"incidents.alert_rules.threshold.resolve",
tags={"detection_type": self.alert_rule.detection_type},
)
if features.has(
"organizations:workflow-engine-metric-alert-dual-processing-logs",
self.subscription.project.organization,
):
logger.info(
"Resolving dynamic rule",
extra={
"rule_id": self.alert_rule.id,
"aggregation_value": aggregation_value,
},
)
if incident_trigger is not None:
fired_incident_triggers.append(incident_trigger)

else:
self.trigger_resolve_counts[trigger.id] = 0

return fired_incident_triggers

def handle_trigger_alerts(
self,
trigger: AlertRuleTrigger,
aggregation_value: float,
fired_incident_triggers: list[IncidentTrigger],
metrics_incremented: bool,
) -> tuple[list[IncidentTrigger], bool]:
# OVER/UNDER value trigger
alert_operator, resolve_operator = self.THRESHOLD_TYPE_OPERATORS[
AlertRuleThresholdType(self.alert_rule.threshold_type)
]
trigger_matches_status = self.check_trigger_matches_status(trigger, TriggerStatus.ACTIVE)
if (
alert_operator(aggregation_value, trigger.alert_threshold)
and not trigger_matches_status
):
# If the value has breached our threshold (above/below)
# And the trigger is not yet active
metrics.incr(
"incidents.alert_rules.threshold.alert",
tags={"detection_type": self.alert_rule.detection_type},
)
if (
features.has(
"organizations:workflow-engine-metric-alert-dual-processing-logs",
self.subscription.project.organization,
)
and not metrics_incremented
):
metrics.incr("dual_processing.alert_rules.fire")
metrics_incremented = True
# triggering a threshold will create an incident and set the status to active
incident_trigger = self.trigger_alert_threshold(trigger, aggregation_value)
if incident_trigger is not None:
fired_incident_triggers.append(incident_trigger)
else:
self.trigger_alert_counts[trigger.id] = 0

if (
resolve_operator(aggregation_value, self.calculate_resolve_threshold(trigger))
and self.active_incident
and trigger_matches_status
):
metrics.incr(
"incidents.alert_rules.threshold.resolve",
tags={"detection_type": self.alert_rule.detection_type},
)
if features.has(
"organizations:workflow-engine-metric-alert-dual-processing-logs",
self.subscription.project.organization,
):
metrics.incr("dual_processing.alert_rules.resolve")
incident_trigger = self.trigger_resolve_threshold(trigger, aggregation_value)

if incident_trigger is not None:
fired_incident_triggers.append(incident_trigger)
else:
self.trigger_resolve_counts[trigger.id] = 0

return fired_incident_triggers
return fired_incident_triggers, metrics_incremented

def get_comparison_delta(self, detector: Detector | None) -> int | None:
comparison_delta = None

if detector:
comparison_delta = detector.config.get("comparison_delta")
else:
comparison_delta = self.alert_rule.comparison_delta

return comparison_delta

def get_detector(self, has_metric_alert_processing: bool) -> Detector | None:
detector = None
if has_metric_alert_processing:
try:
detector = Detector.objects.get(
data_sources__source_id=str(self.subscription.id),
data_sources__type=DATA_SOURCE_SNUBA_QUERY_SUBSCRIPTION,
)
except Detector.DoesNotExist:
logger.exception(
"Detector not found", extra={"subscription_id": self.subscription.id}
)
return detector

def process_results_workflow_engine(
self, subscription_update: QuerySubscriptionUpdate, aggregation_value: float
) -> list[tuple[Detector, dict[DetectorGroupKey, DetectorEvaluationResult]]]:
packet = QuerySubscriptionUpdate(
entity=subscription_update.get("entity", ""),
subscription_id=subscription_update["subscription_id"],
values={"value": aggregation_value},
timestamp=self.last_update,
)
data_packet = DataPacket[QuerySubscriptionUpdate](
source_id=str(self.subscription.id), packet=packet
)
results = process_data_packets([data_packet], DATA_SOURCE_SNUBA_QUERY_SUBSCRIPTION)
if features.has(
"organizations:workflow-engine-metric-alert-dual-processing-logs",
self.alert_rule.organization,
):
logger.info(
"dual processing results for alert rule",
extra={
"results": results,
"num_results": len(results),
"value": aggregation_value,
"rule_id": self.alert_rule.id,
},
)
return results

def process_update(self, subscription_update: QuerySubscriptionUpdate) -> None:
"""
Expand Down Expand Up @@ -349,54 +485,17 @@ def process_update(self, subscription_update: QuerySubscriptionUpdate) -> None:
"organizations:anomaly-detection-alerts", organization
) and features.has("organizations:anomaly-detection-rollout", organization)

comparison_delta = None
detector = None

if has_metric_alert_processing:
try:
detector = Detector.objects.get(
data_sources__source_id=str(self.subscription.id),
data_sources__type=DATA_SOURCE_SNUBA_QUERY_SUBSCRIPTION,
)
comparison_delta = detector.config.get("comparison_delta")
except Detector.DoesNotExist:
logger.exception(
"Detector not found", extra={"subscription_id": self.subscription.id}
)

else:
comparison_delta = self.alert_rule.comparison_delta

detector = self.get_detector(has_metric_alert_processing)
comparison_delta = self.get_comparison_delta(detector)
aggregation_value = self.get_aggregation_value(subscription_update, comparison_delta)

if aggregation_value is not None:
# temporarily skip processing dynamic alerts
if (
has_metric_alert_processing
and not self.alert_rule.detection_type == AlertRuleDetectionType.DYNAMIC
):
packet = QuerySubscriptionUpdate(
entity=subscription_update.get("entity", ""),
subscription_id=subscription_update["subscription_id"],
values={"value": aggregation_value},
timestamp=self.last_update,
)
data_packet = DataPacket[QuerySubscriptionUpdate](
source_id=str(self.subscription.id), packet=packet
)
results = process_data_packets([data_packet], DATA_SOURCE_SNUBA_QUERY_SUBSCRIPTION)
if features.has(
"organizations:workflow-engine-metric-alert-dual-processing-logs",
self.alert_rule.organization,
):
logger.info(
"dual processing results for alert rule",
extra={
"results": results,
"num_results": len(results),
"value": aggregation_value,
"rule_id": self.alert_rule.id,
},
)
self.process_results_workflow_engine(subscription_update, aggregation_value)

potential_anomalies = None
if (
Expand Down Expand Up @@ -444,59 +543,9 @@ def process_update(self, subscription_update: QuerySubscriptionUpdate) -> None:
is_anomalous, trigger, aggregation_value, fired_incident_triggers
)
else:
# OVER/UNDER value trigger
alert_operator, resolve_operator = self.THRESHOLD_TYPE_OPERATORS[
AlertRuleThresholdType(self.alert_rule.threshold_type)
]
if alert_operator(
aggregation_value, trigger.alert_threshold
) and not self.check_trigger_matches_status(trigger, TriggerStatus.ACTIVE):
# If the value has breached our threshold (above/below)
# And the trigger is not yet active
metrics.incr(
"incidents.alert_rules.threshold.alert",
tags={"detection_type": self.alert_rule.detection_type},
)
if (
features.has(
"organizations:workflow-engine-metric-alert-dual-processing-logs",
self.subscription.project.organization,
)
and not metrics_incremented
):
metrics.incr("dual_processing.alert_rules.fire")
metrics_incremented = True
# triggering a threshold will create an incident and set the status to active
incident_trigger = self.trigger_alert_threshold(trigger, aggregation_value)
if incident_trigger is not None:
fired_incident_triggers.append(incident_trigger)
else:
self.trigger_alert_counts[trigger.id] = 0

if (
resolve_operator(
aggregation_value, self.calculate_resolve_threshold(trigger)
)
and self.active_incident
and self.check_trigger_matches_status(trigger, TriggerStatus.ACTIVE)
):
metrics.incr(
"incidents.alert_rules.threshold.resolve",
tags={"detection_type": self.alert_rule.detection_type},
)
if features.has(
"organizations:workflow-engine-metric-alert-dual-processing-logs",
self.subscription.project.organization,
):
metrics.incr("dual_processing.alert_rules.resolve")
incident_trigger = self.trigger_resolve_threshold(
trigger, aggregation_value
)

if incident_trigger is not None:
fired_incident_triggers.append(incident_trigger)
else:
self.trigger_resolve_counts[trigger.id] = 0
fired_incident_triggers, metrics_incremented = self.handle_trigger_alerts(
trigger, aggregation_value, fired_incident_triggers, metrics_incremented
)

if fired_incident_triggers:
# For all the newly created incidents
Expand Down
Loading