Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
fdd246b
fix(external_task_sensor): fallback to execution_date key for executi…
yeswanth-s1th Jun 28, 2025
13cc3e3
Merge branch 'main' into fix-52236-execution-date-fallback
yeswanth-s1th Jun 28, 2025
7ce900f
test: alias stdlib datetime to dt to avoid shadowing timezone datetime
yeswanth-s1th Jun 29, 2025
44fc7bf
Merge branch 'fix-52236-execution-date-fallback' of https://github.co…
yeswanth-s1th Jun 29, 2025
8d48c2c
chore: apply blank-line fix to docstring and add logical_date tests
yeswanth-s1th Jul 2, 2025
8e8af56
Merge branch 'main' into fix-52236-execution-date-fallback
yeswanth-s1th Jul 2, 2025
03d000c
Merge branch 'main' into fix-52236-execution-date-fallback
yeswanth-s1th Jul 2, 2025
3f27e96
Update external_task.py
yeswanth-s1th Jul 17, 2025
9c3d71d
Merge branch 'main' into fix-52236-execution-date-fallback
yeswanth-s1th Jul 17, 2025
3231425
Merge branch 'main' into fix-52236-execution-date-fallback
yeswanth-s1th Jul 17, 2025
f7c60f4
Merge branch 'main' into fix-52236-execution-date-fallback
yeswanth-s1th Jul 17, 2025
5e02e98
Merge branch 'main' into fix-52236-execution-date-fallback
yeswanth-s1th Jul 18, 2025
b2b8f72
Merge branch 'main' into fix-52236-execution-date-fallback
yeswanth-s1th Jul 18, 2025
1d76376
Merge branch 'main' into fix-52236-execution-date-fallback
yeswanth-s1th Jul 18, 2025
aa641aa
Merge branch 'main' into fix-52236-execution-date-fallback
yeswanth-s1th Jul 23, 2025
e58d64a
Merge branch 'main' into fix-52236-execution-date-fallback
yeswanth-s1th Jul 23, 2025
9a0c06a
Merge branch 'main' into fix-52236-execution-date-fallback
yeswanth-s1th Jul 24, 2025
2a86f7a
Fix key error in _handle_execution_date_fn for ExternalTaskSensor
gopidesupavan Jul 24, 2025
0c82f92
Fix tests
gopidesupavan Jul 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -252,20 +252,15 @@ def __init__(
self.poll_interval = poll_interval

def _get_dttm_filter(self, context):
logical_date = context.get("logical_date")
if AIRFLOW_V_3_0_PLUS:
if logical_date is None:
dag_run = context.get("dag_run")
if TYPE_CHECKING:
assert dag_run
logical_date = self._get_logical_date(context)

logical_date = dag_run.run_after
if self.execution_delta:
dttm = logical_date - self.execution_delta
elif self.execution_date_fn:
dttm = self._handle_execution_date_fn(context=context)
else:
dttm = logical_date

return dttm if isinstance(dttm, list) else [dttm]

def poke(self, context: Context) -> bool:
Expand Down Expand Up @@ -522,6 +517,28 @@ def get_external_task_group_task_ids(self, session, dttm_filter):
dttm_filter, self.external_task_group_id, self.external_dag_id, session
)

def _get_logical_date(self, context) -> datetime.datetime:
"""
Handle backwards- and forwards-compatible retrieval of the date.

to pass as the positional argument to execution_date_fn.
"""
# Airflow 3.x: contexts define "logical_date" (or fall back to dag_run.run_after).
if AIRFLOW_V_3_0_PLUS:
logical_date = context.get("logical_date")
dag_run = context.get("dag_run")
if not (logical_date or (dag_run and dag_run.run_after)):
raise ValueError(
"Either `logical_date` or `dag_run.run_after` must be provided in the context"
)
return logical_date or dag_run.run_after

# Airflow 2.x and earlier: contexts used "execution_date"
execution_date = context.get("execution_date")
if not execution_date:
raise ValueError("Either `execution_date` must be provided in the context`")
return execution_date

def _handle_execution_date_fn(self, context) -> Any:
"""
Handle backward compatibility.
Expand All @@ -534,7 +551,7 @@ def _handle_execution_date_fn(self, context) -> Any:
from airflow.utils.operator_helpers import make_kwargs_callable

# Remove "logical_date" because it is already a mandatory positional argument
logical_date = context["logical_date"]
logical_date = self._get_logical_date(context)
kwargs = {k: v for k, v in context.items() if k not in {"execution_date", "logical_date"}}
# Add "context" in the kwargs for backward compatibility (because context used to be
# an acceptable argument of execution_date_fn)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1024,7 +1024,7 @@ def test_fail__check_for_existence(

@pytest.mark.execution_timeout(10)
def test_external_task_sensor_deferrable(self, dag_maker):
context = {}
context = {"execution_date": DEFAULT_DATE}
with dag_maker() as dag:
op = ExternalTaskSensor(
task_id="test_external_task_sensor_check",
Expand All @@ -1047,6 +1047,30 @@ def test_external_task_sensor_deferrable(self, dag_maker):
assert exc.value.trigger.external_task_ids == ["test_task"]
assert exc.value.trigger.execution_dates == [DEFAULT_DATE]

def test_get_logical_date(self):
"""For AF 2, we check for execution_date in context."""
context = {"execution_date": DEFAULT_DATE}
op = ExternalTaskSensor(
task_id="test_external_task_sensor_check",
external_dag_id="test_dag_parent",
external_task_id="test_task",
)
assert op._get_logical_date(context) == DEFAULT_DATE

def test_handle_execution_date_fn(self):
def func(dt, context):
assert context["execution_date"] == dt
return dt + timedelta(0)

op = ExternalTaskSensor(
task_id="test_external_task_sensor_check",
external_dag_id="test_dag_parent",
external_task_id="test_task",
execution_date_fn=func,
)
context = {"execution_date": DEFAULT_DATE}
assert op._handle_execution_date_fn(context) == DEFAULT_DATE


@pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Different test for AF 2")
@pytest.mark.usefixtures("testing_dag_bundle")
Expand Down Expand Up @@ -1335,6 +1359,43 @@ def test_external_task_sensor_task_group_failed_states(self, dag_maker):
task_group_id="test_group",
)

def test_get_logical_date(self):
"""For AF 3, we check for logical date or dag_run.run_after in context."""

context = {"logical_date": DEFAULT_DATE}
op = ExternalTaskSensor(
task_id="test_external_task_sensor_check",
external_dag_id="test_dag_parent",
external_task_id="test_task",
)
assert op._get_logical_date(context) == DEFAULT_DATE

def test_get_logical_date_with_dag_run_after(self):
"""For AF 3, we check for logical date or dag_run.run_after in context."""
op = ExternalTaskSensor(
task_id="test_external_task_sensor_check",
external_dag_id="test_dag_parent",
external_task_id="test_task",
)
mock_dag_run = mock.MagicMock()
mock_dag_run.run_after = DEFAULT_DATE
context = {"dag_run": mock_dag_run}
assert op._get_logical_date(context) == DEFAULT_DATE

def test_handle_execution_date_fn(self):
def func(dt, context):
assert context["logical_date"] == dt
return dt + timedelta(0)

op = ExternalTaskSensor(
task_id="test_external_task_sensor_check",
external_dag_id="test_dag_parent",
external_task_id="test_task",
execution_date_fn=func,
)
context = {"logical_date": DEFAULT_DATE}
assert op._handle_execution_date_fn(context) == DEFAULT_DATE


class TestExternalTaskAsyncSensor:
TASK_ID = "external_task_sensor_check"
Expand Down