Description
Apache Airflow Provider(s)
cncf-kubernetes
Versions of Apache Airflow Providers
apache-airflow-providers-cncf-kubernetes==8.4.1
Apache Airflow version
2.9.1
Operating System
Linux
Deployment
Astronomer
Deployment details
We originally noticed this on Astronomer in a DAG using astronomer-cosmos. However, I have been able to reliably reproduce the issue in Airflow run through a Python debugger in VS Code, using just the KubernetesPodOperator.
What happened
When running a DAG, we noticed a task that had failed according to the logs output by DBT. However, Airflow still marked the task as successful and then proceeded with the next task.
In the Airflow logs we see a line like:
[2024-08-28, 13:30:10 UTC] {triggerer_job_runner.py:631} INFO - Trigger my_task_group/manual__2024-08-28T11:39:57.188213+00:00/regional_tickets.base_tickets_run/-1/1 (ID 711) fired: TriggerEvent<{'status': 'running', 'last_log_time': DateTime(2024, 8, 28, 13, 29, 1, 820284, tzinfo=Timezone('UTC')), 'namespace': 'sidereal-protostar-2231', 'name': 'my-dag-run-2p3iotfr'}>
Then the DBT logs show that the task has failed, followed by:
[2024-08-28, 13:30:56 UTC] {taskinstance.py:352} INFO - Marking task as SUCCESS. dag_id=my_dag, task_id=regional_tickets.base_tickets_run, run_id=manual__2024-08-28T11:39:57.188213+00:00, execution_date=20240828T113957, start_date=20240828T123001, end_date=20240828T133056
What you think should happen instead
The task should have failed and retried.
How to reproduce
I have created a simple DAG like the following:
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
import airflow
from airflow import DAG
from datetime import timedelta

with DAG(
    'kubernetes_defer_test',
    default_args={
        'start_date': airflow.utils.dates.days_ago(0),
        'retries': 20,
        'retry_delay': timedelta(seconds=5),
    },
    schedule_interval=None,
    catchup=False,
) as dag:
    task1 = KubernetesPodOperator(
        name="hello-world",
        image="python:3.11-slim",
        cmds=['python', '-c'],
        arguments=["import time; time.sleep(5); print('hello'); time.sleep(30); print('world'); import sys; sys.exit(1)"],
        task_id="dummy_run",
        deferrable=True,
        logging_interval=10,
    )

I then placed a breakpoint on the line
exception = None
I let the task wait at this breakpoint until the pod execution has failed, and when it resumes the task is marked as successful.
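As an additional check, here is a minimal sketch of how I confirm the pod itself really terminated with a non-zero exit code while Airflow marks the task as SUCCESS. It assumes the standard kubernetes Python client, a local kubeconfig for the same cluster, and that the pod has not yet been cleaned up; the pod name and namespace are the ones from the TriggerEvent above.

# Sketch: inspect the pod's real outcome with the kubernetes Python client.
# Assumes a local kubeconfig for the cluster the operator ran against and
# that the pod still exists; name/namespace come from the TriggerEvent above.
from kubernetes import client, config

config.load_kube_config()
v1 = client.CoreV1Api()

pod = v1.read_namespaced_pod(
    name="my-dag-run-2p3iotfr",
    namespace="sidereal-protostar-2231",
)

print(pod.status.phase)  # expected "Failed" once the container has exited with 1
for cs in pod.status.container_statuses or []:
    if cs.state.terminated is not None:
        # The base container reports a non-zero exit code here even though
        # the Airflow task instance is marked SUCCESS.
        print(cs.name, cs.state.terminated.exit_code, cs.state.terminated.reason)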
Anything else
I hope this is a faithful reproduction of the bug. I have also seen it simply by running the DAG in an Airflow deployment, but there it is intermittent, only occurring when the timing lines up.
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct