Skip to content

feat(workflow_engine): Hook up activity updates to process_workflows #94678

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 21 additions & 15 deletions src/sentry/workflow_engine/processors/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from sentry import buffer, features
from sentry.eventstore.models import GroupEvent
from sentry.models.activity import Activity
from sentry.models.environment import Environment
from sentry.utils import json
from sentry.workflow_engine.models import (
Expand Down Expand Up @@ -182,7 +183,7 @@ def evaluate_workflow_triggers(
"group_id": event_data.group.id,
"event_id": event_id,
"event_data": asdict(event_data),
"event_environment_id": environment.id,
"event_environment_id": environment.id if environment else None,
"triggered_workflows": [workflow.id for workflow in triggered_workflows],
},
)
Expand Down Expand Up @@ -271,7 +272,7 @@ def evaluate_workflows_action_filters(
return filtered_action_groups


def get_environment_by_event(event_data: WorkflowEventData) -> Environment:
def get_environment_by_event(event_data: WorkflowEventData) -> Environment | None:
if isinstance(event_data.event, GroupEvent):
try:
environment = event_data.event.get_environment()
Expand All @@ -283,22 +284,27 @@ def get_environment_by_event(event_data: WorkflowEventData) -> Environment:
raise Environment.DoesNotExist("Environment does not exist for the event")

return environment
elif isinstance(event_data.event, Activity):
return None

raise TypeError(
"Expected event_data.event to be an instance of GroupEvent, got %s" % type(event_data.event)
)
raise TypeError(f"Cannot access the environment from, {type(event_data.event)}.")


def _get_associated_workflows(
detector: Detector, environment: Environment, event_data: WorkflowEventData
detector: Detector, environment: Environment | None, event_data: WorkflowEventData
) -> set[Workflow]:
"""
This is a wrapper method to get the workflows associated with a detector and environment.
Used in process_workflows to wrap the query + logging into a single method
"""
environment_filter = (
(Q(environment_id=None) | Q(environment_id=environment.id))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we have Q(environment_id=None) when we do have an environment?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case there are workflows configured for "all environments" (environment_id=None) as well as workflows configured for this specific environment — both should match.

if environment
else Q(environment_id=None)
)
workflows = set(
Workflow.objects.filter(
(Q(environment_id=None) | Q(environment_id=environment.id)),
environment_filter,
detectorworkflow__detector_id=detector.id,
enabled=True,
)
Expand All @@ -324,7 +330,7 @@ def _get_associated_workflows(
"group_id": event_data.group.id,
"event_id": event_id,
"event_data": asdict(event_data),
"event_environment_id": environment.id,
"event_environment_id": environment.id if environment else None,
"workflows": [workflow.id for workflow in workflows],
"detector_type": detector.type,
},
Expand All @@ -335,7 +341,7 @@ def _get_associated_workflows(

@log_context.root()
def process_workflows(
event_data: WorkflowEventData, detector_id: DetectorId = None
event_data: WorkflowEventData, detector: Detector | None = None
) -> set[Workflow]:
"""
This method will get the detector based on the event, and then gather the associated workflows.
Expand All @@ -345,13 +351,11 @@ def process_workflows(
Finally, each of the triggered workflows will have their actions evaluated and executed.
"""
try:
detector: Detector
if detector_id is not None:
detector = Detector.objects.get(id=detector_id)
elif isinstance(event_data.event, GroupEvent):
if detector is None and isinstance(event_data.event, GroupEvent):
detector = get_detector_by_event(event_data)
else:
raise ValueError("Unable to determine the detector_id for the event")

if detector is None:
raise ValueError("Unable to determine the detector for the event")

log_context.add_extras(detector_id=detector.id)
organization = detector.project.organization
Expand Down Expand Up @@ -397,9 +401,11 @@ def process_workflows(

actions_to_trigger = evaluate_workflows_action_filters(triggered_workflows, event_data)
actions = filter_recently_fired_workflow_actions(actions_to_trigger, event_data)

if not actions:
# If there aren't any actions on the associated workflows, there's nothing to trigger
return triggered_workflows

create_workflow_fire_histories(detector, actions, event_data)

with sentry_sdk.start_span(op="workflow_engine.process_workflows.trigger_actions"):
Expand Down
47 changes: 37 additions & 10 deletions src/sentry/workflow_engine/tasks.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from django.db import router, transaction
from google.api_core.exceptions import DeadlineExceeded, RetryError, ServiceUnavailable

from sentry import nodestore
Expand All @@ -15,6 +16,7 @@
from sentry.types.activity import ActivityType
from sentry.utils import metrics
from sentry.utils.retries import ConditionalRetryPolicy, exponential_delay
from sentry.workflow_engine.models import Detector
from sentry.workflow_engine.processors.workflow import process_workflows
from sentry.workflow_engine.types import WorkflowEventData
from sentry.workflow_engine.utils import log_context
Expand Down Expand Up @@ -42,17 +44,39 @@
),
),
)
def process_workflow_activity(activity_id: int, group_id: int, detector_id: int) -> None:
    """
    Process a workflow task identified by the given activity, group, and detector.

    The task will get the Activity, Group, and Detector from the database,
    create a WorkflowEventData object, and then process the data in
    `process_workflows`.

    :param activity_id: ID of the Activity (status update) to process.
    :param group_id: ID of the Group the activity belongs to.
    :param detector_id: ID of the Detector whose workflows should be evaluated.
    """
    with transaction.atomic(router.db_for_write(Detector)):
        try:
            activity = Activity.objects.get(id=activity_id)
            group = Group.objects.get(id=group_id)
            detector = Detector.objects.get(id=detector_id)
        except (Activity.DoesNotExist, Group.DoesNotExist, Detector.DoesNotExist):
            # One of the referenced rows is gone (e.g. deleted between enqueue
            # and execution). There is nothing to recover from, so log and bail
            # instead of retrying.
            logger.exception(
                "Unable to fetch data to process workflow activity",
                extra={
                    "activity_id": activity_id,
                    "group_id": group_id,
                    "detector_id": detector_id,
                },
            )
            return

        event_data = WorkflowEventData(
            event=activity,
            group=group,
        )

        process_workflows(event_data, detector)
    # NOTE(review): assuming the metric is meant to be emitted after the
    # transaction commits — confirm intended placement relative to the
    # `with transaction.atomic(...)` block.
    metrics.incr(
        "workflow_engine.process_workflow.activity_update.executed",
        tags={"activity_type": activity.type},
    )


@group_status_update_registry.register("workflow_status_update")
Expand All @@ -65,6 +89,9 @@ def workflow_status_update_handler(
Since this handler is called in process for the activity, we want
to queue a task to process workflows asynchronously.
"""
metrics.incr(
"workflow_engine.process_workflow.activity_update", tags={"activity_type": activity.type}
)
if activity.type not in SUPPORTED_ACTIVITIES:
# If the activity type is not supported, we do not need to process it.
return
Expand All @@ -77,10 +104,10 @@ def workflow_status_update_handler(
metrics.incr("workflow_engine.error.tasks.no_detector_id")
return

# TODO - implement in follow-up PR for now, just track a metric that we are seeing the activities.
# process_workflow_task.delay(activity.id, detector_id)
metrics.incr(
"workflow_engine.process_workflow.activity_update", tags={"activity_type": activity.type}
process_workflow_activity.delay(
activity_id=activity.id,
group_id=group.id,
detector_id=detector_id,
)


Expand Down
93 changes: 92 additions & 1 deletion tests/sentry/workflow_engine/test_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@
from sentry.models.group import GroupStatus
from sentry.testutils.cases import TestCase
from sentry.types.activity import ActivityType
from sentry.workflow_engine.tasks import fetch_event, workflow_status_update_handler
from sentry.workflow_engine.tasks import (
fetch_event,
process_workflow_activity,
workflow_status_update_handler,
)
from sentry.workflow_engine.types import WorkflowEventData


class FetchEventTests(TestCase):
Expand Down Expand Up @@ -57,3 +62,89 @@ def test__no_detector_id(self):
with mock.patch("sentry.workflow_engine.tasks.metrics.incr") as mock_incr:
workflow_status_update_handler(group, message, activity)
mock_incr.assert_called_with("workflow_engine.error.tasks.no_detector_id")


class TestProcessWorkflowActivity(TestCase):
    """Tests for the `process_workflow_activity` task: fetching the activity,
    group, and detector by ID and evaluating the associated workflows."""

    def setUp(self):
        # Persist an Activity (a resolution status update) on a group, plus a
        # detector — the three rows the task looks up by ID.
        self.group = self.create_group(project=self.project)
        self.activity = Activity(
            project=self.project,
            group=self.group,
            type=ActivityType.SET_RESOLVED.value,
            data={"fingerprint": ["test_fingerprint"]},
        )
        self.activity.save()
        self.detector = self.create_detector()

    def test_process_workflow_activity__no_workflows(self):
        # With no workflows connected to the detector, trigger evaluation
        # should never be reached.
        with mock.patch(
            "sentry.workflow_engine.processors.workflow.evaluate_workflow_triggers",
            return_value=set(),
        ) as mock_evaluate:
            process_workflow_activity.run(
                activity_id=self.activity.id,
                group_id=self.group.id,
                detector_id=self.detector.id,
            )
            # Short-circuit evaluation, no workflows associated
            assert mock_evaluate.call_count == 0

    @mock.patch(
        "sentry.workflow_engine.processors.workflow.evaluate_workflow_triggers", return_value=set()
    )
    @mock.patch(
        "sentry.workflow_engine.processors.workflow.evaluate_workflows_action_filters",
        return_value=set(),
    )
    def test_process_workflow_activity__workflows__no_actions(
        self, mock_eval_actions, mock_evaluate
    ):
        # A workflow is associated with the detector, but trigger evaluation
        # returns an empty set — so action filters must not be evaluated.
        self.workflow = self.create_workflow(organization=self.organization)
        self.create_detector_workflow(
            detector=self.detector,
            workflow=self.workflow,
        )

        process_workflow_activity.run(
            activity_id=self.activity.id,
            group_id=self.group.id,
            detector_id=self.detector.id,
        )

        # The task should have built the event payload from the activity and
        # its group, and passed the associated workflow set to evaluation.
        event_data = WorkflowEventData(
            event=self.activity,
            group=self.group,
        )

        mock_evaluate.assert_called_once_with({self.workflow}, event_data)
        assert mock_eval_actions.call_count == 0

    @mock.patch("sentry.workflow_engine.processors.workflow.filter_recently_fired_workflow_actions")
    def test_process_workflow_activity(self, mock_filter_actions):
        # Full path: detector -> workflow -> action condition group -> action.
        # Verifies the action group reaches the recently-fired filter with the
        # expected activity-based event payload.
        self.workflow = self.create_workflow(organization=self.organization)

        self.action_group = self.create_data_condition_group(logic_type="any-short")
        self.action = self.create_action()
        self.create_data_condition_group_action(
            condition_group=self.action_group,
            action=self.action,
        )
        self.create_workflow_data_condition_group(self.workflow, self.action_group)

        self.create_detector_workflow(
            detector=self.detector,
            workflow=self.workflow,
        )

        expected_event_data = WorkflowEventData(
            event=self.activity,
            group=self.group,
        )

        process_workflow_activity.run(
            activity_id=self.activity.id,
            group_id=self.group.id,
            detector_id=self.detector.id,
        )

        mock_filter_actions.assert_called_once_with({self.action_group}, expected_event_data)
4 changes: 2 additions & 2 deletions tests/sentry/workflow_engine/test_task_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def test_handler_invoked__when_update_status_called(self):
with mock.patch("sentry.workflow_engine.tasks.metrics.incr") as mock_incr:
_process_message(message)

mock_incr.assert_called_with(
mock_incr.assert_any_call(
"workflow_engine.process_workflow.activity_update",
tags={"activity_type": ActivityType.SET_RESOLVED.value},
)
Expand All @@ -66,7 +66,7 @@ def test_handler_invoked__when_resolved(self):

with mock.patch("sentry.workflow_engine.tasks.metrics.incr") as mock_incr:
update_status(self.group, message)
mock_incr.assert_called_with(
mock_incr.assert_any_call(
"workflow_engine.process_workflow.activity_update",
tags={"activity_type": ActivityType.SET_RESOLVED.value},
)
Loading