Kubernetes Pod Operator: race condition when using deferrable and logging_interval #41867

Description

Apache Airflow Provider(s)

cncf-kubernetes

Versions of Apache Airflow Providers

apache-airflow-providers-cncf-kubernetes==8.4.1

Apache Airflow version

2.9.1

Operating System

Linux

Deployment

Astronomer

Deployment details

We originally noticed this on Astronomer in a DAG using astronomer-cosmos. However, I have been able to reliably reproduce the issue in Airflow, running it through a Python debugger in VS Code, using just the KubernetesPodOperator.

What happened

When running a DAG, we noticed a task that had failed according to the log output from dbt. However, Airflow still marked the task as successful and proceeded to the next task.

In the Airflow logs we see a line like:

[2024-08-28, 13:30:10 UTC] {triggerer_job_runner.py:631} INFO - Trigger my_task_group/manual__2024-08-28T11:39:57.188213+00:00/regional_tickets.base_tickets_run/-1/1 (ID 711) fired: TriggerEvent<{'status': 'running', 'last_log_time': DateTime(2024, 8, 28, 13, 29, 1, 820284, tzinfo=Timezone('UTC')), 'namespace': 'sidereal-protostar-2231', 'name': 'my-dag-run-2p3iotfr'}>

Then come logs from dbt showing that the task has failed, followed by:

[2024-08-28, 13:30:56 UTC] {taskinstance.py:352} INFO - Marking task as SUCCESS. dag_id=my_dag, task_id=regional_tickets.base_tickets_run, run_id=manual__2024-08-28T11:39:57.188213+00:00, execution_date=20240828T113957, start_date=20240828T123001, end_date=20240828T133056
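
To make the suspected sequence concrete, here is a toy asyncio model of the race (illustrative only; the names, timings, and the snapshot logic are mine, not the provider's actual code). The trigger emits a 'running' event for the logging interval, the pod then fails before the worker resumes, and a resume path that trusts the stale event misses the failure:

import asyncio

async def pod(state):
    # The pod runs for a while, then exits non-zero.
    await asyncio.sleep(0.3)
    state["phase"] = "Failed"

async def trigger(state, events):
    # Emit a "running" event each logging interval while the pod runs.
    # Each event carries a snapshot of the pod taken at emit time.
    while state["phase"] == "Running":
        await events.put({"status": "running", "phase_at_emit": state["phase"]})
        await asyncio.sleep(0.2)  # stand-in for logging_interval

async def worker(state, events):
    # The deferred task resumes when an event fires...
    event = await events.get()
    await asyncio.sleep(0.4)  # ...but only after a delay: the race window
    # Buggy resume path: trusts the stale snapshot instead of re-reading
    # the live pod, so a failure that happened in between is lost.
    if event["phase_at_emit"] != "Failed":
        print(f"Marking task as SUCCESS (live pod phase: {state['phase']})")

async def main():
    state = {"phase": "Running"}
    events = asyncio.Queue()
    await asyncio.gather(pod(state), trigger(state, events), worker(state, events))

asyncio.run(main())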

What you think should happen instead

The task should have been marked as failed and then retried.

How to reproduce

I created a simple DAG like this:

from datetime import timedelta

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.utils.dates import days_ago

with DAG(
        'kubernetes_defer_test',
        default_args={
            'start_date': days_ago(0),
            'retries': 20,
            'retry_delay': timedelta(seconds=5)
        },
        schedule_interval=None,
        catchup=False,
) as dag:

    task1 = KubernetesPodOperator(
        name="hello-world",
        image="python:3.11-slim",
        cmds=['python', '-c'],
        # The pod prints after ~5s and ~35s, then exits non-zero.
        arguments=["import time; time.sleep(5); print('hello'); time.sleep(30); print('world'); import sys; sys.exit(1)"],
        task_id="dummy_run",
        # Defer to the triggerer; resume roughly every 10s to fetch logs.
        deferrable=True,
        logging_interval=10,
    )
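
With these arguments the pod prints after roughly 5 and 35 seconds and then exits 1, while logging_interval=10 causes the trigger to fire a 'running' event about every 10 seconds, so several events are emitted before the failure and there is a wide window for the race to line up.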

I then place a breakpoint on:


I let it wait there until the pod execution fails, and when the task resumes it is marked as successful.

Anything else

I hope this is a faithful reproduction of the bug. I have also seen it just by running the DAG in an Airflow deployment, but it is intermittent, since it only occurs when the timing lines up correctly.
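
For what it's worth, one possible direction for a fix, sketched in plain Python (read_live_pod_phase is a hypothetical helper, not the provider's API): on every resume, derive the outcome from a fresh read of the pod rather than from the event payload alone.

def resume(event, read_live_pod_phase):
    # Re-read the pod at resume time instead of trusting the event,
    # which may be stale by the time the worker picks the task back up.
    live_phase = read_live_pod_phase()
    if live_phase == "Failed":
        raise RuntimeError("pod failed while the task was deferred")
    if live_phase == "Succeeded":
        return "success"
    return "defer_again"  # still running: defer once more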

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct
