Description
Apache Airflow version
2.5.2
What happened
An on_failure_callback calls clear_task_instances, but the task instance passed to it is not cleared.
What you think should happen instead
clear_task_instances should clear the task instance passed to it. Instead, that task's state in the task_instance table in the database is set to 'failed', and the task is never cleared or restarted.
How to reproduce
Run the following DAG and observe that failing_task fails and is never cleared, restarted, or retried.
The XCom list shows that the task instance object was in the running state before the clear_task_instances call and in the restarting state after it. However, monitoring the task_instance table in the database during the run shows the task's state going only from running to failed; it is never set to restarting.
In version 2.3.1 the same code works as expected: the task's state in the task_instance table goes from running to failed to null, i.e. the task is successfully cleared.
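The gap between the XCom values and the database can be illustrated with a simplified model. This is not Airflow code: it assumes a toy in-memory SQLite table named task_instance with only task_id and state columns (the real table has more), and a hypothetical InMemoryTaskInstance class standing in for the ORM object. It only reproduces the observed pattern (the object attribute recorded by the XComs says 'restarting' while a fresh read of the row says 'failed') and makes no claim about why 2.5.2 behaves this way.

```python
import sqlite3
from dataclasses import dataclass
from typing import Optional


@dataclass
class InMemoryTaskInstance:
    """Hypothetical stand-in for an ORM object loaded from task_instance."""
    task_id: str
    state: Optional[str]


def make_db() -> sqlite3.Connection:
    # Toy schema with only the columns this model needs (not Airflow's real schema).
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE task_instance (task_id TEXT PRIMARY KEY, state TEXT)")
    conn.execute("INSERT INTO task_instance VALUES ('failing_task', 'running')")
    conn.commit()
    return conn


def read_db_state(conn: sqlite3.Connection, task_id: str) -> Optional[str]:
    # Re-read the persisted row instead of trusting the in-memory attribute.
    row = conn.execute(
        "SELECT state FROM task_instance WHERE task_id = ?", (task_id,)
    ).fetchone()
    return row[0] if row else None


def simulate_reported_behaviour(conn: sqlite3.Connection) -> dict:
    ti = InMemoryTaskInstance("failing_task", read_db_state(conn, "failing_task"))
    before_state = ti.state  # corresponds to the "before_state" XCom
    ti.state = "restarting"  # in-memory change, corresponds to the "after_state" XCom
    # Assumption mirroring the report: the row ends up 'failed' and the
    # in-memory 'restarting' value is never persisted.
    conn.execute("UPDATE task_instance SET state = 'failed' WHERE task_id = 'failing_task'")
    conn.commit()
    return {
        "before_state": before_state,
        "after_state": ti.state,
        "db_state": read_db_state(conn, "failing_task"),
    }


print(simulate_reported_behaviour(make_db()))
# prints {'before_state': 'running', 'after_state': 'restarting', 'db_state': 'failed'}
```

In other words, an after_state XCom of 'restarting' only shows what the Python object holds; whether the task was actually cleared has to be checked against the task_instance row.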
import random
from datetime import datetime
from time import sleep

from airflow import DAG
from airflow.models.taskinstance import clear_task_instances
from airflow.operators.python import PythonOperator
from airflow.utils.session import NEW_SESSION, provide_session


@provide_session
def clear_tasks_fn(tis, session=NEW_SESSION, dag=None):
    sleep_time = random.uniform(30.0, 60.0)  # sleep between 30 seconds and 1 minute
    sleep(sleep_time)
    # Record each task instance's state before clearing
    for ti in tis:
        ti.xcom_push(key="before_state", value=ti.state)
    clear_task_instances(tis=tis, session=session, dag=dag)
    # Commit, then record the state held by the in-memory object after clearing
    for ti in tis:
        session.commit()
        ti.xcom_push(key="after_state", value=ti.state)


def clear_tasks_callback(context):
    all_tasks = context["dag_run"].get_task_instances()
    dag_inner = context["dag"]
    # The task IDs to clear are passed in through the operator's params
    task_ids_to_clear = context["params"].get("task_ids_to_clear", [])
    tasks_to_clear = [ti for ti in all_tasks if ti.task_id in task_ids_to_clear]
    clear_tasks_fn(tasks_to_clear, dag=dag_inner)


def failing_task_fn():
    raise Exception("Forcing task to fail")


def succeeding_task_fn():
    sleep_time = random.uniform(10.0, 20.0)  # sleep between 10 and 20 seconds
    sleep(sleep_time)
    return


default_args = {"owner": "revenge", "start_date": datetime(2022, 10, 1)}
schedule = "@once"
the_dag = DAG(
    "clear_task_test",
    schedule_interval=schedule,
    default_args=default_args,
    catchup=False,
    is_paused_upon_creation=True,
)

with the_dag:
    failing_task_t = PythonOperator(
        task_id="failing_task",
        python_callable=failing_task_fn,
        params={"task_ids_to_clear": ["failing_task"]},
        # On failure, try to clear this same task so that it runs again
        on_failure_callback=clear_tasks_callback,
    )
    succeeding_task_t = PythonOperator(
        task_id="succeeding_task",
        python_callable=succeeding_task_fn,
    )
    failing_task_t.set_downstream(succeeding_task_t)
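When watching the task_instance table while the DAG above runs (for example by repeatedly running a query such as SELECT state FROM task_instance WHERE dag_id = 'clear_task_test' AND task_id = 'failing_task'), the raw samples repeat the same state many times. The sketch below is a hypothetical helper, not part of Airflow: it collapses consecutive duplicates into a list of transitions and checks whether a 'failed' state was later followed by NULL (None), i.e. whether the task was cleared. The sample lists are illustrative, shaped like the sequences described in this report, and are not recorded data.

```python
from itertools import groupby
from typing import Iterable, List, Optional


def state_transitions(samples: Iterable[Optional[str]]) -> List[Optional[str]]:
    # Collapse consecutive repeats: running, running, failed -> running, failed
    return [state for state, _ in groupby(samples)]


def was_cleared_after_failure(samples: Iterable[Optional[str]]) -> bool:
    # "Cleared" here means some 'failed' state is later followed by NULL (None)
    transitions = state_transitions(samples)
    for i, state in enumerate(transitions):
        if state == "failed" and None in transitions[i + 1:]:
            return True
    return False


# Illustrative samples, not measurements
expected_like_2_3_1 = ["running", "running", "failed", None, None]
observed_like_2_5_2 = ["running", "running", "failed", "failed", "failed"]
print(state_transitions(expected_like_2_3_1), was_cleared_after_failure(expected_like_2_3_1))
# prints ['running', 'failed', None] True
print(state_transitions(observed_like_2_5_2), was_cleared_after_failure(observed_like_2_5_2))
# prints ['running', 'failed'] False
```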
Operating System
Debian GNU/Linux 11 (bullseye)
Versions of Apache Airflow Providers
No response
Deployment
Docker-Compose
Deployment details
No response
Anything else
No response
Are you willing to submit a PR?
- Yes, I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct