Description
Apache Airflow version
3.0.0b1
If "Other Airflow 2 version" selected, which one?
No response
What happened?
Triggering an ExternalTaskSensor DAG with logical_date set to null fails with a KeyError.
ERROR - Task failed with exception logger="task" error_detail=[{"exc_type":"KeyError","exc_value":"'logical_date'","syntax_error":null,"is_cause":false,"frames":[{"filename":"/opt/airflow/task_sdk/src/airflow/sdk/execution_time/task_runner.py","lineno":605,"name":"run"},{"filename":"/opt/airflow/task_sdk/src/airflow/sdk/execution_time/task_runner.py","lineno":726,"name":"_execute_task"},{"filename":"/opt/airflow/airflow/models/baseoperator.py","lineno":168,"name":"wrapper"},{"filename":"/opt/airflow/providers/standard/src/airflow/providers/standard/sensors/external_task.py","lineno":351,"name":"execute"},{"filename":"/opt/airflow/airflow/models/baseoperator.py","lineno":168,"name":"wrapper"},{"filename":"/opt/airflow/airflow/sensors/base.py","lineno":287,"name":"execute"},{"filename":"/opt/airflow/airflow/sensors/base.py","lineno":267,"name":"execute"},{"filename":"/opt/airflow/airflow/utils/session.py","lineno":101,"name":"wrapper"},{"filename":"/opt/airflow/providers/standard/src/airflow/providers/standard/sensors/external_task.py","lineno":257,"name":"poke"},{"filename":"/opt/airflow/providers/standard/src/airflow/providers/standard/sensors/external_task.py","lineno":244,"name":"_get_dttm_filter"}]}]
What you think should happen instead?
The ExternalTaskSensor DAG should run successfully when logical_date is null.
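To illustrate the failure mode and the expected behavior, here is a minimal sketch in plain Python. It is not Airflow's implementation: `get_dttm_filter` is a hypothetical stand-in for the sensor's `_get_dttm_filter`, the context is modeled as an ordinary dict, and the `run_after` fallback key is an assumption made for this example. The traceback suggests the sensor indexes the context with `"logical_date"` directly, which raises when a run has no logical date.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone


def get_dttm_filter(context: dict, execution_delta: timedelta | None = None) -> list[datetime]:
    """Return the datetime(s) to look up in the external DAG.

    Hypothetical helper: reads "logical_date" with .get() and falls back to
    a "run_after" key (an assumption of this sketch) when it is missing.
    """
    base = context.get("logical_date") or context.get("run_after")
    if base is None:
        raise ValueError("context has neither 'logical_date' nor 'run_after'")
    if execution_delta is not None:
        # Shift back by the delta, as the sensor does for execution_delta.
        base = base - execution_delta
    return [base]


# Modeled context for a run triggered with logical_date = null.
manual_context = {"run_after": datetime(2025, 3, 1, 3, 35, tzinfo=timezone.utc)}

try:
    manual_context["logical_date"]  # direct indexing fails like the traceback
except KeyError as exc:
    print(f"KeyError: {exc}")

# The tolerant lookup still produces a usable datetime filter.
print(get_dttm_filter(manual_context, timedelta(minutes=7)))
```

Whether the fallback should be the run's `run_after` value or something else is a design decision for the maintainers; the sketch only shows that a tolerant lookup avoids the KeyError.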
How to reproduce
Run the DAGs below:
Parent DAG:
from datetime import datetime, timedelta

from airflow.providers.standard.operators.python import PythonOperator
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.models import DAG
from airflow.utils.state import TaskInstanceState

# Placeholder: the report references `docs` without defining it.
docs = "ExternalTaskSensor parent DAG"


def ex_date(dt):
    # Only used by the commented-out execution_date_fn argument below.
    new_dt = dt - timedelta(days=1)
    return new_dt


with DAG(
    dag_id="external_task_sensor_parent",
    start_date=datetime(2022, 3, 12, 3, 35, 0),
    schedule=timedelta(days=1),
    # is_paused_upon_creation=False,
    doc_md=docs,
    tags=["core", "sensor"],
) as dag:
    ets1 = ExternalTaskSensor(
        task_id="check_ids",
        external_dag_id="external_task_sensor_child_dag",
        external_task_ids=["child_dummy1", "child_dummy2"],
        allowed_states=[TaskInstanceState.SUCCESS, TaskInstanceState.SKIPPED],
        failed_states=[TaskInstanceState.FAILED, TaskInstanceState.REMOVED],
        execution_delta=timedelta(minutes=7),
        # execution_date_fn=ex_date,
        check_existence=True,
        poke_interval=20.0,
        timeout=420.0,
    )

    ets1

Child DAG:
from datetime import datetime, timedelta

from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.models import DAG

# Placeholder: the report references `docs` without defining it.
docs = "ExternalTaskSensor child DAG"

with DAG(
    dag_id="external_task_sensor_child_dag",
    start_date=datetime(2022, 3, 12, 3, 28, 0),
    schedule=timedelta(days=1),
    is_paused_upon_creation=False,
    doc_md=docs,
    tags=["core", "sensor"],
) as dag:
    d0 = EmptyOperator(task_id="child_dummy1")
    d2 = EmptyOperator(task_id="child_dummy2")

    d0 >> d2

Operating System
Linux
Versions of Apache Airflow Providers
No response
Deployment
Other
Deployment details
No response
Anything else?
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct