Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions docs/v3/concepts/automations.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,23 @@ Prefect infers the relevant event whenever possible, but sometimes one does not

Specify a name and, optionally, a description for the automation.

### Tracing automation actions

When an automation fires, it emits events that you can use to trace what happened:

- `prefect.automation.triggered` or `prefect.automation.resolved` - emitted when the trigger condition is met
- `prefect.automation.action.triggered` - emitted when an action starts
- `prefect.automation.action.executed` or `prefect.automation.action.failed` - emitted when an action completes

The action events include related resources that link back to their source events:

| Related resource role | Description |
| --------------------- | ----------- |
| `triggering-event` | The original event that caused the automation to fire |
| `automation-triggered-event` | The `automation.triggered` or `automation.resolved` event that prompted the action |

These links help you trace from an action failure back to the specific trigger and original event that caused it.


## Sending notifications with automations

Expand Down
42 changes: 24 additions & 18 deletions src/prefect/server/events/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,17 @@ async def fail(self, triggered_action: "TriggeredAction", reason: str) -> None:
if abs(time_since_trigger) < TIGHT_TIMING:
follows_id = triggered_action.triggering_event.id

# Build related resources including triggering event reference
# Build related resources including automation.triggered and triggering event
related_resources = list(self._resulting_related_resources)
if triggered_action.automation_triggered_event_id:
related_resources.append(
RelatedResource(
{
"prefect.resource.id": f"prefect.event.{triggered_action.automation_triggered_event_id}",
"prefect.resource.role": "automation-triggered-event",
}
)
)
if triggered_action.triggering_event:
related_resources.append(
RelatedResource(
Expand All @@ -201,7 +210,7 @@ async def fail(self, triggered_action: "TriggeredAction", reason: str) -> None:
occurred=now("UTC"),
event="prefect.automation.action.failed",
resource=resource,
related=self._resulting_related_resources,
related=related_resources,
payload={
**action_details,
"reason": reason,
Expand Down Expand Up @@ -254,8 +263,17 @@ async def succeed(self, triggered_action: "TriggeredAction") -> None:
if abs(time_since_trigger) < TIGHT_TIMING:
follows_id = triggered_action.triggering_event.id

# Build related resources including triggering event reference
# Build related resources including automation.triggered and triggering event
related_resources = list(self._resulting_related_resources)
if triggered_action.automation_triggered_event_id:
related_resources.append(
RelatedResource(
{
"prefect.resource.id": f"prefect.event.{triggered_action.automation_triggered_event_id}",
"prefect.resource.role": "automation-triggered-event",
}
)
)
if triggered_action.triggering_event:
related_resources.append(
RelatedResource(
Expand All @@ -269,13 +287,7 @@ async def succeed(self, triggered_action: "TriggeredAction") -> None:
Event(
occurred=triggered_action.triggered,
event="prefect.automation.action.triggered",
resource=Resource(
{
"prefect.resource.id": automation_resource_id,
"prefect.resource.name": automation.name,
"prefect.trigger-type": automation.trigger.type,
}
),
resource=resource,
related=related_resources,
payload=action_details,
id=triggered_event_id,
Expand All @@ -286,14 +298,8 @@ async def succeed(self, triggered_action: "TriggeredAction") -> None:
Event(
occurred=now("UTC"),
event="prefect.automation.action.executed",
resource=Resource(
{
"prefect.resource.id": automation_resource_id,
"prefect.resource.name": automation.name,
"prefect.trigger-type": automation.trigger.type,
}
),
related=self._resulting_related_resources,
resource=resource,
related=related_resources,
payload={
**action_details,
**self._result_details,
Expand Down
8 changes: 8 additions & 0 deletions src/prefect/server/events/schemas/automations.py
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,14 @@ class TriggeredAction(PrefectBaseModel):
default=0,
description="The index of the action within the automation",
)
automation_triggered_event_id: UUID | None = Field(
default=None,
description=(
"The ID of the automation.triggered or automation.resolved event that "
"prompted this action, used to link automation.action.* events back to "
"the state change event"
),
)

def idempotency_key(self) -> str:
"""Produce a human-friendly idempotency key for this action"""
Expand Down
33 changes: 27 additions & 6 deletions src/prefect/server/events/triggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,21 +436,37 @@ async def act(firing: Firing) -> None:
}
await messaging.publish(state_change_events.values())

# By default, all `automation.actions` are fired
source_actions: List[Tuple[Optional[ReceivedEvent], ServerActionTypes]] = [
(firing.triggering_event, action) for action in automation.actions
# Determine the primary state change event ID for linking action events back to
# the automation.triggered or automation.resolved event. Prefer Triggered over
# Resolved when both are present.
primary_state_change_event = state_change_events.get(
TriggerState.Triggered
) or state_change_events.get(TriggerState.Resolved)
primary_state_change_event_id = (
primary_state_change_event.id if primary_state_change_event else None
)

# By default, all `automation.actions` are fired. Each tuple contains:
# (triggering_event, action, automation_triggered_event_id)
source_actions: List[
Tuple[Optional[ReceivedEvent], ServerActionTypes, UUID | None]
] = [
(firing.triggering_event, action, primary_state_change_event_id)
for action in automation.actions
]

# Conditionally add in actions that fire on specific trigger states
if TriggerState.Triggered in firing.trigger_states:
triggered_event = state_change_events[TriggerState.Triggered]
source_actions += [
(state_change_events[TriggerState.Triggered], action)
(triggered_event, action, triggered_event.id)
for action in automation.actions_on_trigger
]

if TriggerState.Resolved in firing.trigger_states:
resolved_event = state_change_events[TriggerState.Resolved]
source_actions += [
(state_change_events[TriggerState.Resolved], action)
(resolved_event, action, resolved_event.id)
for action in automation.actions_on_resolve
]

Expand All @@ -463,8 +479,13 @@ async def act(firing: Firing) -> None:
triggering_event=action_triggering_event,
action=action,
action_index=index,
automation_triggered_event_id=automation_triggered_event_id,
)
for index, (action_triggering_event, action) in enumerate(source_actions)
for index, (
action_triggering_event,
action,
automation_triggered_event_id,
) in enumerate(source_actions)
]

async with messaging.create_actions_publisher() as publisher:
Expand Down
Loading
Loading