Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion task-sdk/src/airflow/sdk/execution_time/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1101,7 +1101,11 @@ def _handle_trigger_dag_run(
trigger=DagStateTrigger(
dag_id=drte.trigger_dag_id,
states=drte.allowed_states + drte.failed_states, # type: ignore[arg-type]
execution_dates=[drte.logical_date] if drte.logical_date else None,
# Don't filter by execution_dates when run_ids is provided.
# run_id uniquely identifies a DAG run, and when reset_dag_run=True,
# drte.logical_date might be a newly calculated value that doesn't match
# the persisted logical_date in the database, causing the trigger to never find the run.
execution_dates=None,
run_ids=[drte.dag_run_id],
poll_interval=drte.poke_interval,
),
Expand Down
53 changes: 53 additions & 0 deletions task-sdk/tests/task_sdk/execution_time/test_task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -3309,3 +3309,56 @@ def test_handle_trigger_dag_run_deferred(
state, msg, _ = run(ti, ti.get_template_context(), log)

assert state == intermediate_state

@time_machine.travel("2025-01-01 00:00:00", tick=False)
def test_handle_trigger_dag_run_deferred_with_reset_uses_run_id_only(
self, create_runtime_ti, mock_supervisor_comms
):
"""
Test that TriggerDagRunOperator with deferrable=True and reset_dag_run=True
creates a DagStateTrigger with execution_dates=None.

This prevents the bug where reset_dag_run preserves the original logical_date
in the database but the trigger queries with a newly calculated logical_date,
causing a mismatch that makes the trigger never find the dag run.
"""
from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator

task = TriggerDagRunOperator(
task_id="test_task",
trigger_dag_id="test_dag",
trigger_run_id="fixed_run_id",
wait_for_completion=True,
deferrable=True,
reset_dag_run=True,
poke_interval=5,
)
ti = create_runtime_ti(
dag_id="test_handle_trigger_dag_run_deferred_reset", run_id="test_run", task=task
)

log = mock.MagicMock()
state, msg, _ = run(ti, ti.get_template_context(), log)

# Task should be deferred
assert state == TaskInstanceState.DEFERRED
assert isinstance(msg, DeferTask)

# Verify the DeferTask message structure
assert msg.classpath == "airflow.providers.standard.triggers.external_task.DagStateTrigger"
assert msg.next_method == "execute_complete"

# Critical assertion: execution_dates should be None to avoid logical_date mismatch
# when reset_dag_run=True. The run_id alone is sufficient and unique.
trigger_kwargs = msg.trigger_kwargs
assert trigger_kwargs["execution_dates"] is None, (
"execution_dates should be None when using run_ids. "
"When reset_dag_run=True, the logical_date in the database may differ from "
"the newly calculated logical_date, causing the trigger to never find the run."
)
assert trigger_kwargs["run_ids"] == ["fixed_run_id"]
assert trigger_kwargs["dag_id"] == "test_dag"
assert trigger_kwargs["poll_interval"] == 5

# Also verify it was sent to supervisor
mock_supervisor_comms.send.assert_any_call(msg)