Description
Apache Airflow version
3.1.3
If "Other Airflow 2/3 version" selected, which one?
3.1.1
What happened?
Issue
When a task is in the running state and the broker redelivers the same task, it is marked as failed.
Redelivery
Redis redelivery:
- after visibility_timeout (default 6 hours) if the task hasn't been acked. On a connectivity restart to Redis, Celery will not ack the task even though it succeeded in Airflow (see the config sketch after this list)
- if the broker exited abruptly and didn't get a chance to dump its snapshot to disk (by default Redis creates a snapshot every 300 operations or every 5 minutes, as I recall); see the scenario below:
- the task was pushed by the scheduler to the Celery queue in Redis
- Redis wrote a snapshot of the in-memory DB to disk
- a worker pulled and started the task
- Redis crashed
- Redis started and loaded the state from step 2, so the task is back in the Celery queue instead of in unacked_index
- the task is redelivered to a worker
PS: this may currently happen in the Helm chart on pod restart, as SIGTERM is not propagated correctly to Redis, see helm: ensure graceful redis shutdown #58432
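For context, here is a minimal sketch of where the Redis visibility_timeout lives; this only delays redelivery for long-running tasks, it does not fix the failure described here, and the 24-hour value is an arbitrary example:

```python
# Hedged sketch: Celery's Redis transport redelivers any message that stays
# unacked longer than visibility_timeout (default 21600 s = 6 h). Raising it
# only postpones redelivery; it is a mitigation for long tasks, not a fix.
from celery import Celery

app = Celery("airflow.providers.celery.executors.celery_executor")
app.conf.broker_transport_options = {
    "visibility_timeout": 86400,  # seconds; arbitrary example value
}
```

In an Airflow deployment this knob is usually set through the `[celery_broker_transport_options]` section of airflow.cfg rather than on the Celery app directly.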
RabbitMQ:
- As I recall, it redelivers the task after a connection restart without any delay; in k8s this happens on pod restart
Scenario
- Task is running on some worker
- Redelivery happens, and the supervisor calls:
ti_context = self.client.task_instances.start(ti.id, self.pid, start_date)
- task_instances.start throws ServerResponseError due to a 409 HTTP code, as the task is already running:
airflow/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
Line 195 in b4b9418
"message": "TI was not in a state where it could be marked as running",
- CeleryExecutor on the scheduler process gets the failure event from Celery
- The scheduler processes the events from the executor within process_executor_events; the task instance fulfills the condition below (ti.state == TaskInstanceState.RUNNING) and is marked as failed:
ti_queued = ti.try_number == buffer_key.try_number and ti.state in (
In Airflow < 3 this scenario did not cause task failures, as only TaskInstanceState.QUEUED was considered in this condition (a simplified sketch of the difference follows below):
airflow/airflow/jobs/scheduler_job_runner.py
Line 904 in b93c3db
ti_queued = ti.try_number == buffer_key.try_number and ti.state == TaskInstanceState.QUEUED
PS: it was changed in this PR #41625
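To make the condition change concrete, here is a simplified, self-contained sketch of the two checks. The names mirror scheduler_job_runner.py, but the exact state tuple in Airflow 3 is truncated in the quote above, so treat this as an illustration rather than the actual implementation:

```python
# Simplified sketch of the process_executor_events check; `ti` is assumed to be
# a TaskInstance-like object and `buffer_key` a TaskInstanceKey-like object.
from airflow.utils.state import TaskInstanceState

def ti_queued_airflow_2(ti, buffer_key) -> bool:
    # Airflow < 3: only a QUEUED task instance could be failed from a
    # mismatching executor event, so a spurious Celery failure for a task
    # that was already RUNNING was effectively ignored.
    return ti.try_number == buffer_key.try_number and ti.state == TaskInstanceState.QUEUED

def ti_queued_airflow_3(ti, buffer_key) -> bool:
    # Airflow 3 (after PR #41625): RUNNING is also accepted, so the failure
    # event produced by the rejected redelivered Celery task now marks the
    # still-running task instance as failed.
    return ti.try_number == buffer_key.try_number and ti.state in (
        TaskInstanceState.QUEUED,
        TaskInstanceState.RUNNING,
    )
```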
Logs
Worker:
2025-11-18T18:05:33.195572Z [info [] starting stale bundle cleanup process [airflow.providers.celery.cli.celery_command] loc=celery_command.py:141
2025-11-18T18:05:33.198872Z [info [] Starting log server on http://[::[]:8793 [airflow.utils.serve_logs.core] loc=core.py:50
WARNING: ASGI app factory detected. Using it, but please consider setting the --factory flag explicitly.
INFO: Started server process [15]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://:8793 (Press CTRL+C to quit)
-------------- celery@airflow-worker-78d74bfcff-77qj2 v5.5.3 (immunity)
--- ***** -----
-- ******* ---- Linux-5.15.0-1097-azure-x86_64-with-glibc2.36 2025-11-18 18:05:34
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: airflow.providers.celery.executors.celery_executor:0x7f86fdf94fe0
- ** ---------- .> transport: redis://:**@airflow-redis:6379/0
- ** ---------- .> results: postgresql://pgadmin:**@airflow-pgbouncer.airflow-ap-d1:6543/airflow-metadata
- *** --- * --- .> concurrency: 16 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> default exchange=default(direct) key=default
[tasks]
. execute_workload
2025-11-18T18:05:37.512920Z [info [] Connected to redis://:**@airflow-redis:6379/0 [celery.worker.consumer.connection] loc=connection.py:22
2025-11-18T18:05:37.541429Z [info [] celery@airflow-worker-78d74bfcff-77qj2 ready. [celery.apps.worker] loc=worker.py:176
2025-11-18T18:05:38.617744Z [info [] Events of group {task} enabled by remote. [celery.worker.control] loc=control.py:343
2025-11-18T18:07:05.324375Z [info [] Task execute_workload[5fc1cd6a-2920-4a70-bea1-696704f17a75[] received [celery.worker.strategy] loc=strategy.py:161
2025-11-18T18:07:05.423828Z [info [] [5fc1cd6a-2920-4a70-bea1-696704f17a75[] Executing workload in Celery: token='eyJ***' ti=TaskInstance(id=UUID('019a9826-13fa-7352-abcc-138aee8aaa86'), dag_version_id=UUID('019a54e9-057e-7849-bcc9-ccf1af83c6d3'), task_id='test_4', dag_id='test', run_id='scheduled__2025-11-18T18:00:00+00:00', try_number=2, map_index=-1, pool_slots=1, queue='default', priority_weight=1, executor_config=None, parent_context_carrier={}, context_carrier={}) dag_rel_path=PurePosixPath('test_dag.py') bundle_info=BundleInfo(name='dags-folder', version=None) log_path='dag_id=test/run_id=scheduled__2025-11-18T18:00:00+00:00/task_id=test_4/attempt=2.log' type='ExecuteTask' [airflow.providers.celery.executors.celery_executor_utils] loc=celery_executor_utils.py:166
2025-11-18T18:07:05.467438Z [info [] Secrets backends loaded for worker [supervisor] backend_classes=['EnvironmentVariablesBackend'] count=1 loc=supervisor.py:1917
2025-11-18T18:08:57.547218Z [info [] Task execute_workload[5fc1cd6a-2920-4a70-bea1-696704f17a75[] received [celery.worker.strategy] loc=strategy.py:161
2025-11-18T18:08:57.610058Z [info [] [5fc1cd6a-2920-4a70-bea1-696704f17a75[] Executing workload in Celery: token='eyJ***' ti=TaskInstance(id=UUID('019a9826-13fa-7352-abcc-138aee8aaa86'), dag_version_id=UUID('019a54e9-057e-7849-bcc9-ccf1af83c6d3'), task_id='test_4', dag_id='test', run_id='scheduled__2025-11-18T18:00:00+00:00', try_number=2, map_index=-1, pool_slots=1, queue='default', priority_weight=1, executor_config=None, parent_context_carrier={}, context_carrier={}) dag_rel_path=PurePosixPath('test_dag.py') bundle_info=BundleInfo(name='dags-folder', version=None) log_path='dag_id=test/run_id=scheduled__2025-11-18T18:00:00+00:00/task_id=test_4/attempt=2.log' type='ExecuteTask' [airflow.providers.celery.executors.celery_executor_utils] loc=celery_executor_utils.py:166
2025-11-18T18:08:57.652183Z [info [] Secrets backends loaded for worker [supervisor] backend_classes=['EnvironmentVariablesBackend'] count=1 loc=supervisor.py:1917
2025-11-18T18:08:57.678714Z [warning [] Server error [airflow.sdk.api.client] detail={'detail': {'reason': 'invalid_state', 'message': 'TI was not in a state where it could be marked as running', 'previous_state': 'running'}} loc=client.py:171
2025-11-18T18:08:57.686821Z [info [] Process exited [supervisor] exit_code=-9 loc=supervisor.py:708 pid=86 signal_sent=SIGKILL
2025-11-18T18:08:57.705082Z [error [] Task execute_workload[5fc1cd6a-2920-4a70-bea1-696704f17a75[] raised unexpected: ServerResponseError('Server returned error') [celery.app.trace] loc=trace.py:267
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.12/site-packages/celery/app/trace.py", line 453, in trace_task
R = retval = fun(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/celery/app/trace.py", line 736, in __protected_call__
return self.run(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/celery/executors/celery_executor_utils.py", line 174, in execute_workload
supervise(
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/supervisor.py", line 1926, in supervise
process = ActivitySubprocess.start(
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/supervisor.py", line 950, in start
proc._on_child_started(ti=what, dag_rel_path=dag_rel_path, bundle_info=bundle_info)
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/supervisor.py", line 961, in _on_child_started
ti_context = self.client.task_instances.start(ti.id, self.pid, start_date)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/api/client.py", line 211, in start
resp = self.client.patch(f"task-instances/{id}/run", content=body.model_dump_json())
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/httpx/_client.py", line 1218, in patch
return self.request(
^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 338, in wrapped_f
return copy(f, *args, **kw)
^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 477, in __call__
do = self.iter(retry_state=retry_state)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 378, in iter
result = action(retry_state)
^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 400, in <lambda>
self._add_action_func(lambda rs: rs.outcome.result())
^^^^^^^^^^^^^^^^^^^
File "/usr/python/lib/python3.12/concurrent/futures/_base.py", line 449, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/usr/python/lib/python3.12/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 480, in __call__
result = fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/api/client.py", line 867, in request
return super().request(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/httpx/_client.py", line 825, in request
return self.send(request, auth=auth, follow_redirects=follow_redirects)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/httpx/_client.py", line 914, in send
response = self._send_handling_auth(
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/httpx/_client.py", line 942, in _send_handling_auth
response = self._send_handling_redirects(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/httpx/_client.py", line 999, in _send_handling_redirects
raise exc
File "/home/airflow/.local/lib/python3.12/site-packages/httpx/_client.py", line 982, in _send_handling_redirects
hook(response)
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/api/client.py", line 182, in raise_on_4xx_5xx_with_note
return get_json_error(response) or response.raise_for_status()
^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/api/client.py", line 172, in get_json_error
raise err
airflow.sdk.api.client.ServerResponseError: Server returned error
Correlation-id=019a9827-d13d-7175-b0fb-b69a017a9ffd
INFO: 10.0.49.54:40460 - "GET /log/dag_id%3Dtest/run_id%3Dscheduled__2025-11-18T18%3A00%3A00%2B00%3A00/task_id%3Dtest_4/attempt%3D2.log HTTP/1.1" 200 OK
2025-11-18T18:09:01.229047Z [warning [] Server error [airflow.sdk.api.client] detail={'detail': {'reason': 'not_running', 'message': 'TI is no longer in the running state and task should terminate', 'current_state': 'failed'}} loc=client.py:171
2025-11-18T18:09:01.229556Z [error [] Server indicated the task shouldn't be running anymore [supervisor] detail={'detail': {'reason': 'not_running', 'message': 'TI is no longer in the running state and task should terminate', 'current_state': 'failed'}} loc=supervisor.py:1109 status_code=409 ti_id=UUID('019a9826-13fa-7352-abcc-138aee8aaa86')
2025-11-18T18:09:06.235061Z [warning [] Process did not terminate in time; escalating [supervisor] loc=supervisor.py:716 pid=64 signal=SIGTERM
2025-11-18T18:09:06.247180Z [info [] Process exited [supervisor] exit_code=-9 loc=supervisor.py:708 pid=64 signal_sent=SIGKILL
2025-11-18T18:09:06.247710Z [info [] Task finished [supervisor] duration=120.79638931999943 exit_code=-9 final_state=SERVER_TERMINATED loc=supervisor.py:1937 task_instance_id=019a9826-13fa-7352-abcc-138aee8aaa86
2025-11-18T18:09:06.263830Z [info [] Task execute_workload[5fc1cd6a-2920-4a70-bea1-696704f17a75[] succeeded in 120.9372150579984s: None [celery.app.trace] loc=trace.py:128
Scheduler:
____________ _____________
____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/
2025-11-18T18:06:18.629850Z [info [] Starting the scheduler [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] loc=scheduler_job_runner.py:1054
2025-11-18T18:06:19.812843Z [info [] Loaded executor: :CeleryExecutor: [airflow.executors.executor_loader] loc=executor_loader.py:281
2025-11-18T18:06:19.921818Z [info [] Loaded executor: :KubernetesExecutor: [airflow.executors.executor_loader] loc=executor_loader.py:281
2025-11-18T18:06:19.922646Z [info [] Start Kubernetes executor [airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor] loc=kubernetes_executor.py:238
2025-11-18T18:06:20.013982Z [info [] Adopting or resetting orphaned tasks for active dag runs [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] loc=scheduler_job_runner.py:2276
2025-11-18T18:06:20.020902Z [info [] Event: and now my watch begins starting at resource_version: 0 [airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher] loc=kubernetes_executor_utils.py:132
2025-11-18T18:06:20.025287Z [info [] Marked 1 SchedulerJob instances as failed [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] loc=scheduler_job_runner.py:2299
2025-11-18T18:07:04.809254Z [info ] 1 tasks up for execution:
<TaskInstance: test.test_4 scheduled__2025-11-18T18:00:00+00:00 [scheduled[]> [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] loc=scheduler_job_runner.py:444
2025-11-18T18:07:04.809453Z [info [] DAG test has 0/16 running and queued tasks [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] loc=scheduler_job_runner.py:516
2025-11-18T18:07:04.810400Z [info ] Setting the following tasks to queued state:
<TaskInstance: test.test_4 scheduled__2025-11-18T18:00:00+00:00 [scheduled[]> [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] loc=scheduler_job_runner.py:655
2025-11-18T18:07:04.813201Z [info [] Trying to enqueue tasks: [<TaskInstance: test.test_4 scheduled__2025-11-18T18:00:00+00:00 [scheduled[]>] for executor: CeleryExecutor(parallelism=32) [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] loc=scheduler_job_runner.py:740
2025-11-18T18:07:05.348969Z [info [] Received executor event with state queued for task instance TaskInstanceKey(dag_id='test', task_id='test_4', run_id='scheduled__2025-11-18T18:00:00+00:00', try_number=2, map_index=-1) [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] loc=scheduler_job_runner.py:818
2025-11-18T18:07:05.369501Z [info [] Setting external_executor_id for <TaskInstance: test.test_4 scheduled__2025-11-18T18:00:00+00:00 [queued[]> to 5fc1cd6a-2920-4a70-bea1-696704f17a75 [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] loc=scheduler_job_runner.py:854
2025-11-18T18:08:58.283462Z [info [] Received executor event with state failed for task instance TaskInstanceKey(dag_id='test', task_id='test_4', run_id='scheduled__2025-11-18T18:00:00+00:00', try_number=2, map_index=-1) [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] loc=scheduler_job_runner.py:818
2025-11-18T18:08:58.294573Z [info [] TaskInstance Finished: dag_id=test, task_id=test_4, run_id=scheduled__2025-11-18T18:00:00+00:00, map_index=-1, run_start_date=2025-11-18 18:07:05.473530+00:00, run_end_date=None, run_duration=262.232856, state=running, executor=CeleryExecutor(parallelism=32), executor_state=failed, try_number=2, max_tries=1, pool=default_pool, queue=default, priority_weight=1, operator=PythonOperator, queued_dttm=2025-11-18 18:07:04.810721+00:00, scheduled_dttm=2025-11-18 18:07:04.781059+00:00,queued_by_job_id=215, pid=64 [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] loc=scheduler_job_runner.py:864
2025-11-18T18:08:58.297131Z [error [] Executor CeleryExecutor(parallelism=32) reported that the task instance <TaskInstance: test.test_4 scheduled__2025-11-18T18:00:00+00:00 [running[]> finished with state failed, but the task instance's state attribute is running. Learn more: https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally [airflow.task] loc=taskinstance.py:1505
2025-11-18T18:08:58.301159Z [info [] Marking task as FAILED. dag_id=test, task_id=test_4, run_id=scheduled__2025-11-18T18:00:00+00:00, logical_date=20251118T180000, start_date=20251118T180705, end_date=20251118T180858 [airflow.models.taskinstance] loc=taskinstance.py:1595
2025-11-18T18:08:59.374603Z [info [] Marking run <DagRun test @ 2025-11-18 18:00:00+00:00: scheduled__2025-11-18T18:00:00+00:00, state:running, queued_at: 2025-11-18 18:07:03.683680+00:00. run_type: scheduled> failed [airflow.models.dagrun.DagRun] loc=dagrun.py:1171
2025-11-18T18:08:59.374813Z [info [] DagRun Finished: dag_id=test, logical_date=2025-11-18 18:00:00+00:00, run_id=scheduled__2025-11-18T18:00:00+00:00, run_start_date=2025-11-18 18:07:04.747229+00:00, run_end_date=2025-11-18 18:08:59.374738+00:00, run_duration=114.627509, state=failed, run_type=scheduled, data_interval_start=2025-11-18 18:00:00+00:00, data_interval_end=2025-11-18 18:00:00+00:00, [airflow.models.dagrun.DagRun] loc=dagrun.py:1274
2025-11-18T18:08:59.381120Z [info [] Setting next_dagrun for test to 2025-11-18 19:00:00+00:00, run_after=2025-11-18 19:00:00+00:00 [airflow.models.dag] loc=dag.py:688
What you think should happen instead?
No response
How to reproduce
With redis:
- Start some dummy sleep task for 10 minutes
- Log into redis-cli
- Get the id from:
zrange unacked_index 0 -1 withscores
- Run
ZINCRBY unacked_index -{{visibility_timeout}} {{id from previous command}}
to force redelivery, e.g. ZINCRBY unacked_index -86400 38ff5954-4edd-46fb-aca4-7749b8702977
- Wait; the worker should receive the same message soon and the running task will be marked as failed (a scripted version of these steps follows after this list)
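The same steps can be scripted; below is a hedged redis-py equivalent. The connection parameters are illustrative, and the unacked_index key may carry a prefix depending on the kombu configuration:

```python
# Force redelivery of every delivered-but-unacked Celery message by pushing its
# score in the unacked_index sorted set back past visibility_timeout.
import redis

r = redis.Redis(host="airflow-redis", port=6379, db=0)  # illustrative connection

for member, score in r.zrange("unacked_index", 0, -1, withscores=True):
    delivery_tag = member.decode()
    # Subtract more than visibility_timeout (86400 s here) from the delivery
    # timestamp so kombu treats the message as stale and redelivers it.
    r.zincrby("unacked_index", -86400, delivery_tag)
    print(f"forced redelivery of {delivery_tag} (previous score: {score})")
```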
With RabbitMQ:
- Restarting RabbitMQ while the task is running should be sufficient in theory, but I didn't test this
Operating System
Kubernetes
Versions of Apache Airflow Providers
No response
Deployment
Official Apache Airflow Helm Chart
Deployment details
No response
Anything else?
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct