clear_task_instances function not clearing tasks #30228

@nicklip

Apache Airflow version

2.5.2

What happened

The on_failure_callback called clear_task_instances, but the task instance passed to it was not cleared.

What you think should happen instead

clear_task_instances should have cleared the task passed to it. Instead, the task's state was set to 'failed' in the task_instance table in the database, and the task was never cleared or restarted.

How to reproduce

Run the following DAG and observe that failing_task only fails; it is never cleared, restarted, or retried.
In the XCom list, the task instance object is in the running state before the clear_task_instances call and in the restarting state after it. However, in the task_instance table in the database, the task's state only goes from running to failed and is never set to restarting. In version 2.3.1, the same code works as expected: the task's state in the task_instance table goes from running to failed to null, and the task is cleared.

import random
from datetime import datetime
from time import sleep

from airflow import DAG
from airflow.models.taskinstance import clear_task_instances
from airflow.operators.python import PythonOperator
from airflow.utils.session import NEW_SESSION, provide_session


@provide_session
def clear_tasks_fn(tis, session=NEW_SESSION, dag=None):
    sleep_time = random.uniform(30.0, 60.0)  # sleep between 30 seconds and 1 minute
    sleep(sleep_time)
    # Record each task instance's state before clearing
    for ti in tis:
        ti.xcom_push(key="before_state", value=ti.state)
    clear_task_instances(tis=tis, session=session, dag=dag)
    # Record each task instance's state after clearing
    for ti in tis:
        session.commit()
        ti.xcom_push(key="after_state", value=ti.state)


def clear_tasks_callback(context):
    all_tasks = context["dag_run"].get_task_instances()
    dag_inner = context["dag"]
    task_ids_to_clear = context["params"].get("task_ids_to_clear", [])

    tasks_to_clear = [ti for ti in all_tasks if ti.task_id in task_ids_to_clear]

    clear_tasks_fn(tasks_to_clear, dag=dag_inner)


def failing_task_fn():
    raise Exception("Forcing task to fail")


def succeeding_task_fn():
    sleep_time = random.uniform(10.0, 20.0)  # sleep between 10 and 20 seconds
    sleep(sleep_time)
    return


default_args = {"owner": "revenge", "start_date": datetime(2022, 10, 1)}
schedule = "@once"

the_dag = DAG("clear_task_test", schedule_interval=schedule, default_args=default_args, catchup=False, is_paused_upon_creation=True)

with the_dag:
    # On failure, the callback tries to clear this same task
    failing_task_t = PythonOperator(
        task_id="failing_task",
        python_callable=failing_task_fn,
        params={"task_ids_to_clear": ["failing_task"]},
        on_failure_callback=clear_tasks_callback,
    )

    succeeding_task_t = PythonOperator(
        task_id="succeeding_task",
        python_callable=succeeding_task_fn,
    )

    failing_task_t.set_downstream(succeeding_task_t)
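
For anyone who wants to watch the task_instance row while the DAG above runs, here is a rough sketch of the kind of polling that shows the state transitions. It is only an illustration: an in-memory SQLite table stands in for the real metadata database, the helper names fetch_ti_states and collapse_transitions are hypothetical, and the run_id value is made up. Against a real deployment, the same SELECT would be run through that database's own client.

```python
import sqlite3


def fetch_ti_states(conn, dag_id, task_id):
    """Return (run_id, state) rows for one task from the task_instance table."""
    cur = conn.execute(
        "SELECT run_id, state FROM task_instance "
        "WHERE dag_id = ? AND task_id = ? ORDER BY run_id",
        (dag_id, task_id),
    )
    return cur.fetchall()


def collapse_transitions(observed):
    """Drop consecutive duplicates so only the state changes remain."""
    transitions = []
    for state in observed:
        if not transitions or transitions[-1] != state:
            transitions.append(state)
    return transitions


# Assumption: a stand-in table with only the columns this sketch needs.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance (dag_id TEXT, task_id TEXT, run_id TEXT, state TEXT)"
)
conn.execute(
    "INSERT INTO task_instance VALUES "
    "('clear_task_test', 'failing_task', 'demo_run', 'running')"
)

# Simulate the states described for 2.5.2 and poll after each change.
observed = []
for new_state in ("running", "failed", "failed"):
    conn.execute(
        "UPDATE task_instance SET state = ? WHERE task_id = 'failing_task'",
        (new_state,),
    )
    rows = fetch_ti_states(conn, "clear_task_test", "failing_task")
    observed.append(rows[0][1])

print(collapse_transitions(observed))
```

With the simulated 2.5.2 behavior, the collapsed sequence ends at failed; on 2.3.1, per the description above, a final null (None) state would be expected after failed.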

Operating System

Debian GNU/Linux 11 (bullseye)

Versions of Apache Airflow Providers

No response

Deployment

Docker-Compose

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Labels

  • area:core
  • kind:bug (This is a clearly a bug)
  • needs-triage (label for new issues that we didn't triage yet)
