feat(workflow_engine): Hook up activity updates to process_workflows #94678
Changes from all commits: 2837b7d, 6c3286c, 019b460, e764719, 454f152, 3219e14
```diff
@@ -1,3 +1,4 @@
+from django.db import router, transaction
 from google.api_core.exceptions import DeadlineExceeded, RetryError, ServiceUnavailable
 
 from sentry import nodestore
@@ -15,6 +16,7 @@
 from sentry.types.activity import ActivityType
 from sentry.utils import metrics
 from sentry.utils.retries import ConditionalRetryPolicy, exponential_delay
+from sentry.workflow_engine.models import Detector
 from sentry.workflow_engine.processors.workflow import process_workflows
 from sentry.workflow_engine.types import WorkflowEventData
 from sentry.workflow_engine.utils import log_context
@@ -42,17 +44,39 @@
         ),
     ),
 )
-def process_workflow_activity(activity_id: int, detector_id: int) -> None:
+def process_workflow_activity(activity_id: int, group_id: int, detector_id: int) -> None:
     """
-    Process a workflow task identified by the given Activity ID and Detector ID.
+    Process a workflow task identified by the given activity, group, and detector.
 
     The task will get the Activity from the database, create a WorkflowEventData object,
     and then process the data in `process_workflows`.
     """
-    # TODO - @saponifi3d - implement this in a follow-up PR. This update will require WorkflowEventData
-    # to allow for an activity in the `event` attribute. That refactor is a bit noisy
-    # and will be done in a subsequent pr.
-    pass
+    with transaction.atomic(router.db_for_write(Detector)):
+        try:
+            activity = Activity.objects.get(id=activity_id)
+            group = Group.objects.get(id=group_id)
```
Review thread on the `Group.objects.get(id=group_id)` lookup:

Review comment: Could we fetch the group through the activity?

Reply: Ish - the typing is super funky when I tried that, since the group isn't required on an activity update. I could do:

```python
activities = Activity.objects.filter(id=activity_id).select_related('group')
activity = activities.first()
group = activity.group
```

but the alternative I can think of is to use the guaranteed group (that is already provided by the Activity registry) and enforce it that way rather than through the data model. In the end, these should be a similar db load, since they'd both be scanning an indexed value on the table. (Note this is happening in a transaction, so AFAIK it shouldn't incur any over-the-wire delays either.)
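To make that trade-off concrete, here is a small sketch of the two lookup options discussed above. The helper function names are hypothetical and only for illustration; the imports assume the usual Sentry model paths.

```python
from sentry.models.activity import Activity
from sentry.models.group import Group


def fetch_via_activity(activity_id: int) -> tuple[Activity, Group] | None:
    # Reviewer's suggestion: derive the group from the activity in one query.
    # Activity.group is nullable, which is where the typing gets awkward.
    activity = Activity.objects.filter(id=activity_id).select_related("group").first()
    if activity is None or activity.group is None:
        return None
    return activity, activity.group


def fetch_explicitly(activity_id: int, group_id: int) -> tuple[Activity, Group]:
    # Approach taken in the PR: the group id is already guaranteed by the
    # activity registry handler, so both rows are fetched by primary key.
    return Activity.objects.get(id=activity_id), Group.objects.get(id=group_id)
```

Either way the lookups hit indexed columns, which is the "similar db load" point made in the reply.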
The diff then continues:

```diff
+            detector = Detector.objects.get(id=detector_id)
+        except (Activity.DoesNotExist, Group.DoesNotExist, Detector.DoesNotExist):
+            logger.exception(
+                "Unable to fetch data to process workflow activity",
+                extra={
+                    "activity_id": activity_id,
+                    "group_id": group_id,
+                    "detector_id": detector_id,
+                },
+            )
+            return  # Exit execution that we cannot recover from
+
+    event_data = WorkflowEventData(
+        event=activity,
+        group=group,
+    )
+
+    process_workflows(event_data, detector)
+    metrics.incr(
+        "workflow_engine.process_workflow.activity_update.executed",
+        tags={"activity_type": activity.type},
+    )
 
 
 @group_status_update_registry.register("workflow_status_update")
@@ -65,6 +89,9 @@ def workflow_status_update_handler(
     Since this handler is called in process for the activity, we want
     to queue a task to process workflows asynchronously.
     """
+    metrics.incr(
+        "workflow_engine.process_workflow.activity_update", tags={"activity_type": activity.type}
+    )
     if activity.type not in SUPPORTED_ACTIVITIES:
         # If the activity type is not supported, we do not need to process it.
         return
@@ -77,10 +104,10 @@ def workflow_status_update_handler(
         metrics.incr("workflow_engine.error.tasks.no_detector_id")
         return
 
-    # TODO - implement in follow-up PR for now, just track a metric that we are seeing the activities.
-    # process_workflow_task.delay(activity.id, detector_id)
-    metrics.incr(
-        "workflow_engine.process_workflow.activity_update", tags={"activity_type": activity.type}
+    process_workflow_activity.delay(
+        activity_id=activity.id,
+        group_id=group.id,
+        detector_id=detector_id,
     )
```
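As a rough illustration of how the new task could be exercised, here is a minimal test sketch. The module path `sentry.workflow_engine.tasks`, the test class name, and the `create_group`/`create_detector` fixtures are assumptions for illustration only; the task signature, the `WorkflowEventData` fields, and the `process_workflows(event_data, detector)` call come from the diff above.

```python
from unittest import mock

from sentry.models.activity import Activity
from sentry.testutils.cases import TestCase
from sentry.types.activity import ActivityType
from sentry.workflow_engine.tasks import process_workflow_activity  # assumed module path


class ProcessWorkflowActivityTest(TestCase):
    def test_forwards_activity_to_process_workflows(self) -> None:
        group = self.create_group()
        activity = Activity.objects.create(
            project=group.project,
            group=group,
            type=ActivityType.SET_RESOLVED.value,
            data={},
        )
        detector = self.create_detector(project=group.project)  # assumed fixture

        with mock.patch(
            "sentry.workflow_engine.tasks.process_workflows"  # assumed patch target
        ) as mock_process_workflows:
            process_workflow_activity(
                activity_id=activity.id,
                group_id=group.id,
                detector_id=detector.id,
            )

        # The task should build a WorkflowEventData from the activity/group pair
        # and hand it to process_workflows together with the detector.
        event_data, passed_detector = mock_process_workflows.call_args[0]
        assert event_data.event == activity
        assert event_data.group == group
        assert passed_detector == detector
```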
A separate review thread:

Review comment: Why do we have `Q(environment_id=None)` when we do have an environment?

Reply: In case there are workflows with "all environments" or the specific environment configured.
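For context, the pattern under discussion looks roughly like the sketch below. This is a hypothetical helper with assumed model/field names (the actual query is not part of this excerpt): workflows configured for "all environments" store a NULL environment, so the filter has to match those in addition to workflows pinned to the event's environment.

```python
from django.db.models import Q, QuerySet

from sentry.workflow_engine.models import Workflow


def workflows_for_environment(environment_id: int | None) -> QuerySet[Workflow]:
    # Workflows with environment_id=None apply to "all environments", so they
    # always match; workflows pinned to a specific environment only match when
    # the event's environment lines up.
    env_filter = Q(environment_id=None)
    if environment_id is not None:
        env_filter |= Q(environment_id=environment_id)
    return Workflow.objects.filter(env_filter)
```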