Skip to content

ExternalTaskSensor is failing when logical_date is null #47447

@atul-astronomer

Description

@atul-astronomer

Apache Airflow version

3.0.0b1

If "Other Airflow 2 version" selected, which one?

No response

What happened?

Triggering a ExternalTaskSensor DAG with logical_date as null is failing.

ERROR - Task failed with exception logger="task" error_detail=[{"exc_type":"KeyError","exc_value":"'logical_date'","syntax_error":null,"is_cause":false,"frames":[{"filename":"/opt/airflow/task_sdk/src/airflow/sdk/execution_time/task_runner.py","lineno":605,"name":"run"},{"filename":"/opt/airflow/task_sdk/src/airflow/sdk/execution_time/task_runner.py","lineno":726,"name":"_execute_task"},{"filename":"/opt/airflow/airflow/models/baseoperator.py","lineno":168,"name":"wrapper"},{"filename":"/opt/airflow/providers/standard/src/airflow/providers/standard/sensors/external_task.py","lineno":351,"name":"execute"},{"filename":"/opt/airflow/airflow/models/baseoperator.py","lineno":168,"name":"wrapper"},{"filename":"/opt/airflow/airflow/sensors/base.py","lineno":287,"name":"execute"},{"filename":"/opt/airflow/airflow/sensors/base.py","lineno":267,"name":"execute"},{"filename":"/opt/airflow/airflow/utils/session.py","lineno":101,"name":"wrapper"},{"filename":"/opt/airflow/providers/standard/src/airflow/providers/standard/sensors/external_task.py","lineno":257,"name":"poke"},{"filename":"/opt/airflow/providers/standard/src/airflow/providers/standard/sensors/external_task.py","lineno":244,"name":"_get_dttm_filter"}]}]

What you think should happen instead?

ExternalTaskSensor DAG should pass with logical_date as null

How to reproduce

Run the below DAG:

Parent DAG:

from datetime import datetime, timedelta

from airflow.providers.standard.operators.python import PythonOperator
from airflow.sensors.external_task import ExternalTaskSensor

from airflow.models import DAG
from airflow.utils.state import TaskInstanceState


def ex_date(dt):
    new_dt = dt - timedelta(days=1)
    return new_dt

with DAG(
    dag_id="external_task_sensor_parent",
    start_date=datetime(2022, 3, 12, 3, 35, 0),
    schedule=timedelta(days=1),
  #  is_paused_upon_creation=False,
    doc_md=docs,
    tags=["core", "sensor"]
) as dag:

    ets1 = ExternalTaskSensor(
        task_id="check_ids",
        external_dag_id="external_task_sensor_child_dag",
        external_task_ids=["child_dummy1", "child_dummy2"],
        allowed_states=[TaskInstanceState.SUCCESS, TaskInstanceState.SKIPPED],
        failed_states=[TaskInstanceState.FAILED, TaskInstanceState.REMOVED,],
        execution_delta=timedelta(minutes=7),
        #execution_date_fn=ex_date,
        check_existence=True,
        poke_interval=20.0,
        timeout=420.0,
    )


ets1

Child DAG:

from datetime import datetime, timedelta

from airflow.providers.standard.operators.empty import EmptyOperator

from airflow.models import DAG

with DAG(
    dag_id="external_task_sensor_child_dag",
    start_date=datetime(2022, 3, 12, 3, 28, 0),
    schedule=timedelta(days=1),
    is_paused_upon_creation=False,
    doc_md=docs,
    tags=["core", "sensor"],
) as dag:

    d0 = EmptyOperator(task_id="child_dummy1")

    d2 = EmptyOperator(task_id="child_dummy2")

d0 >> d2

Operating System

Linux

Versions of Apache Airflow Providers

No response

Deployment

Other

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Metadata

Metadata

Assignees

Labels

area:corekind:bugThis is a clearly a bugpriority:highHigh priority bug that should be patched quickly but does not require immediate new release

Type

No type

Projects

Status

Done

Milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions