Celery: Running task marked as failed on broker redelivery #58441

@arkadiuszbach

Apache Airflow version

3.1.3

If "Other Airflow 2/3 version" selected, which one?

3.1.1

What happened?

Issue

When a task is in the running state and the broker redelivers the same task, the task is marked as failed.

Redelivery

Redis redelivery:

  • after visibility_timeout (default 6 hours) if the task hasn't been acked. After a connection restart to Redis, Celery will not ack the task even though it succeeded in Airflow (see the config sketch below)

  • if the broker exited abruptly and didn't get a chance to dump a snapshot to disk (by default it creates a snapshot every 300 operations or 5 minutes, as I recall); see the scenario below:

    1. the task was pushed by the scheduler to the Celery queue in Redis
    2. Redis wrote a snapshot of the in-memory DB to disk
    3. the worker pulled and started the task
    4. Redis crashed
    5. Redis restarted and loaded the state from step 2, so the task is back in the Celery queue instead of in unacked_index
    6. the task is redelivered to the worker

    PS: this may currently happen in the Helm chart on pod restart, as SIGTERM is not propagated correctly to Redis; see helm: ensure graceful redis shutdown #58432
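
For the Redis transport, the redelivery window from the first bullet is controlled by Celery's visibility_timeout. A minimal sketch of how it is set on a plain Celery app (in Airflow it is normally configured via the [celery_broker_transport_options] section of airflow.cfg); the broker URL and the value are illustrative:

from celery import Celery

# Hedged sketch: configuring the Redis transport's visibility timeout on a Celery app.
app = Celery("example", broker="redis://localhost:6379/0")
app.conf.broker_transport_options = {
    # Seconds Redis waits for an ack before redelivering an in-flight task.
    # Default is 21600 (6 hours); it should exceed your longest-running task.
    "visibility_timeout": 21600,
}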

RabbitMQ:

  • As I recall, it redelivers the task after a connection restart without any delay; in k8s this will happen on pod restart

Scenario

  1. A task is running on some worker
  2. Redelivery happens and the supervisor calls:
    ti_context = self.client.task_instances.start(ti.id, self.pid, start_date)
  3. task_instances.start raises ServerResponseError due to a 409 HTTP status code, because the task is already running:
    "message": "TI was not in a state where it could be marked as running",
  4. CeleryExecutor on the Scheduler process gets the failure event from Celery
  5. The Scheduler processes the events from the executor within process_executor_events; the task instance fulfills the condition below (ti.state = TaskInstanceState.RUNNING) and is marked as failed:
    ti_queued = ti.try_number == buffer_key.try_number and ti.state in (

In Airflow < 3 this scenario did not cause task failures, as only TaskInstanceState.QUEUED was considered in this condition:

ti_queued = ti.try_number == buffer_key.try_number and ti.state == TaskInstanceState.QUEUED

PS: this was changed in PR #41625
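
A minimal sketch (not the actual Airflow code; the state class is a stripped-down stand-in) illustrating why the Airflow 3 condition matches a running task while the Airflow 2 one did not:

from enum import Enum

# Hedged stand-in for Airflow's TaskInstanceState, reduced to the two states that matter here.
class TaskInstanceState(str, Enum):
    QUEUED = "queued"
    RUNNING = "running"

ti_state = TaskInstanceState.RUNNING  # state of the TI when the redelivered copy fails
try_numbers_match = True              # ti.try_number == buffer_key.try_number

# Airflow < 3: only QUEUED was considered, so a RUNNING TI was left alone.
old_ti_queued = try_numbers_match and ti_state == TaskInstanceState.QUEUED

# Airflow 3 (after PR #41625): RUNNING is considered as well, so the TI gets marked as failed.
new_ti_queued = try_numbers_match and ti_state in (
    TaskInstanceState.QUEUED,
    TaskInstanceState.RUNNING,
)

print(old_ti_queued)  # False -> task untouched in Airflow < 3
print(new_ti_queued)  # True  -> task marked as failed in Airflow 3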

Logs

Worker:


2025-11-18T18:05:33.195572Z [info     [] starting stale bundle cleanup process [airflow.providers.celery.cli.celery_command] loc=celery_command.py:141
2025-11-18T18:05:33.198872Z [info     [] Starting log server on http://[::[]:8793 [airflow.utils.serve_logs.core] loc=core.py:50
WARNING:  ASGI app factory detected. Using it, but please consider setting the --factory flag explicitly.
INFO:     Started server process [15]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://:8793 (Press CTRL+C to quit)
 
 -------------- celery@airflow-worker-78d74bfcff-77qj2 v5.5.3 (immunity)
--- ***** ----- 
-- ******* ---- Linux-5.15.0-1097-azure-x86_64-with-glibc2.36 2025-11-18 18:05:34
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         airflow.providers.celery.executors.celery_executor:0x7f86fdf94fe0
- ** ---------- .> transport:   redis://:**@airflow-redis:6379/0
- ** ---------- .> results:     postgresql://pgadmin:**@airflow-pgbouncer.airflow-ap-d1:6543/airflow-metadata
- *** --- * --- .> concurrency: 16 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> default          exchange=default(direct) key=default
                

[tasks]
  . execute_workload

2025-11-18T18:05:37.512920Z [info     [] Connected to redis://:**@airflow-redis:6379/0 [celery.worker.consumer.connection] loc=connection.py:22
2025-11-18T18:05:37.541429Z [info     [] celery@airflow-worker-78d74bfcff-77qj2 ready. [celery.apps.worker] loc=worker.py:176
2025-11-18T18:05:38.617744Z [info     [] Events of group {task} enabled by remote. [celery.worker.control] loc=control.py:343
2025-11-18T18:07:05.324375Z [info     [] Task execute_workload[5fc1cd6a-2920-4a70-bea1-696704f17a75[] received [celery.worker.strategy] loc=strategy.py:161
2025-11-18T18:07:05.423828Z [info     [] [5fc1cd6a-2920-4a70-bea1-696704f17a75[] Executing workload in Celery: token='eyJ***' ti=TaskInstance(id=UUID('019a9826-13fa-7352-abcc-138aee8aaa86'), dag_version_id=UUID('019a54e9-057e-7849-bcc9-ccf1af83c6d3'), task_id='test_4', dag_id='test', run_id='scheduled__2025-11-18T18:00:00+00:00', try_number=2, map_index=-1, pool_slots=1, queue='default', priority_weight=1, executor_config=None, parent_context_carrier={}, context_carrier={}) dag_rel_path=PurePosixPath('test_dag.py') bundle_info=BundleInfo(name='dags-folder', version=None) log_path='dag_id=test/run_id=scheduled__2025-11-18T18:00:00+00:00/task_id=test_4/attempt=2.log' type='ExecuteTask' [airflow.providers.celery.executors.celery_executor_utils] loc=celery_executor_utils.py:166
2025-11-18T18:07:05.467438Z [info     [] Secrets backends loaded for worker [supervisor] backend_classes=['EnvironmentVariablesBackend'] count=1 loc=supervisor.py:1917
2025-11-18T18:08:57.547218Z [info     [] Task execute_workload[5fc1cd6a-2920-4a70-bea1-696704f17a75[] received [celery.worker.strategy] loc=strategy.py:161
2025-11-18T18:08:57.610058Z [info     [] [5fc1cd6a-2920-4a70-bea1-696704f17a75[] Executing workload in Celery: token='eyJ***' ti=TaskInstance(id=UUID('019a9826-13fa-7352-abcc-138aee8aaa86'), dag_version_id=UUID('019a54e9-057e-7849-bcc9-ccf1af83c6d3'), task_id='test_4', dag_id='test', run_id='scheduled__2025-11-18T18:00:00+00:00', try_number=2, map_index=-1, pool_slots=1, queue='default', priority_weight=1, executor_config=None, parent_context_carrier={}, context_carrier={}) dag_rel_path=PurePosixPath('test_dag.py') bundle_info=BundleInfo(name='dags-folder', version=None) log_path='dag_id=test/run_id=scheduled__2025-11-18T18:00:00+00:00/task_id=test_4/attempt=2.log' type='ExecuteTask' [airflow.providers.celery.executors.celery_executor_utils] loc=celery_executor_utils.py:166
2025-11-18T18:08:57.652183Z [info     [] Secrets backends loaded for worker [supervisor] backend_classes=['EnvironmentVariablesBackend'] count=1 loc=supervisor.py:1917
2025-11-18T18:08:57.678714Z [warning  [] Server error                   [airflow.sdk.api.client] detail={'detail': {'reason': 'invalid_state', 'message': 'TI was not in a state where it could be marked as running', 'previous_state': 'running'}} loc=client.py:171
2025-11-18T18:08:57.686821Z [info     [] Process exited                 [supervisor] exit_code=-9 loc=supervisor.py:708 pid=86 signal_sent=SIGKILL
2025-11-18T18:08:57.705082Z [error    [] Task execute_workload[5fc1cd6a-2920-4a70-bea1-696704f17a75[] raised unexpected: ServerResponseError('Server returned error') [celery.app.trace] loc=trace.py:267
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.12/site-packages/celery/app/trace.py", line 453, in trace_task
    R = retval = fun(*args, **kwargs)
                 ^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/celery/app/trace.py", line 736, in __protected_call__
    return self.run(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/celery/executors/celery_executor_utils.py", line 174, in execute_workload
    supervise(
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/supervisor.py", line 1926, in supervise
    process = ActivitySubprocess.start(
              ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/supervisor.py", line 950, in start
    proc._on_child_started(ti=what, dag_rel_path=dag_rel_path, bundle_info=bundle_info)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/supervisor.py", line 961, in _on_child_started
    ti_context = self.client.task_instances.start(ti.id, self.pid, start_date)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/api/client.py", line 211, in start
    resp = self.client.patch(f"task-instances/{id}/run", content=body.model_dump_json())
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/httpx/_client.py", line 1218, in patch
    return self.request(
           ^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 338, in wrapped_f
    return copy(f, *args, **kw)
           ^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 477, in __call__
    do = self.iter(retry_state=retry_state)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 378, in iter
    result = action(retry_state)
             ^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 400, in <lambda>
    self._add_action_func(lambda rs: rs.outcome.result())
                                     ^^^^^^^^^^^^^^^^^^^
  File "/usr/python/lib/python3.12/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/python/lib/python3.12/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 480, in __call__
    result = fn(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/api/client.py", line 867, in request
    return super().request(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/httpx/_client.py", line 825, in request
    return self.send(request, auth=auth, follow_redirects=follow_redirects)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/httpx/_client.py", line 914, in send
    response = self._send_handling_auth(
               ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/httpx/_client.py", line 942, in _send_handling_auth
    response = self._send_handling_redirects(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/httpx/_client.py", line 999, in _send_handling_redirects
    raise exc
  File "/home/airflow/.local/lib/python3.12/site-packages/httpx/_client.py", line 982, in _send_handling_redirects
    hook(response)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/api/client.py", line 182, in raise_on_4xx_5xx_with_note
    return get_json_error(response) or response.raise_for_status()
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/api/client.py", line 172, in get_json_error
    raise err
airflow.sdk.api.client.ServerResponseError: Server returned error
Correlation-id=019a9827-d13d-7175-b0fb-b69a017a9ffd
INFO:     10.0.49.54:40460 - "GET /log/dag_id%3Dtest/run_id%3Dscheduled__2025-11-18T18%3A00%3A00%2B00%3A00/task_id%3Dtest_4/attempt%3D2.log HTTP/1.1" 200 OK
2025-11-18T18:09:01.229047Z [warning  [] Server error                   [airflow.sdk.api.client] detail={'detail': {'reason': 'not_running', 'message': 'TI is no longer in the running state and task should terminate', 'current_state': 'failed'}} loc=client.py:171
2025-11-18T18:09:01.229556Z [error    [] Server indicated the task shouldn't be running anymore [supervisor] detail={'detail': {'reason': 'not_running', 'message': 'TI is no longer in the running state and task should terminate', 'current_state': 'failed'}} loc=supervisor.py:1109 status_code=409 ti_id=UUID('019a9826-13fa-7352-abcc-138aee8aaa86')
2025-11-18T18:09:06.235061Z [warning  [] Process did not terminate in time; escalating [supervisor] loc=supervisor.py:716 pid=64 signal=SIGTERM
2025-11-18T18:09:06.247180Z [info     [] Process exited                 [supervisor] exit_code=-9 loc=supervisor.py:708 pid=64 signal_sent=SIGKILL
2025-11-18T18:09:06.247710Z [info     [] Task finished                  [supervisor] duration=120.79638931999943 exit_code=-9 final_state=SERVER_TERMINATED loc=supervisor.py:1937 task_instance_id=019a9826-13fa-7352-abcc-138aee8aaa86
2025-11-18T18:09:06.263830Z [info     [] Task execute_workload[5fc1cd6a-2920-4a70-bea1-696704f17a75[] succeeded in 120.9372150579984s: None [celery.app.trace] loc=trace.py:128

Scheduler:


  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
2025-11-18T18:06:18.629850Z [info     [] Starting the scheduler         [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] loc=scheduler_job_runner.py:1054
2025-11-18T18:06:19.812843Z [info     [] Loaded executor: :CeleryExecutor: [airflow.executors.executor_loader] loc=executor_loader.py:281
2025-11-18T18:06:19.921818Z [info     [] Loaded executor: :KubernetesExecutor: [airflow.executors.executor_loader] loc=executor_loader.py:281
2025-11-18T18:06:19.922646Z [info     [] Start Kubernetes executor      [airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor] loc=kubernetes_executor.py:238
2025-11-18T18:06:20.013982Z [info     [] Adopting or resetting orphaned tasks for active dag runs [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] loc=scheduler_job_runner.py:2276
2025-11-18T18:06:20.020902Z [info     [] Event: and now my watch begins starting at resource_version: 0 [airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher] loc=kubernetes_executor_utils.py:132
2025-11-18T18:06:20.025287Z [info     [] Marked 1 SchedulerJob instances as failed [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] loc=scheduler_job_runner.py:2299
2025-11-18T18:07:04.809254Z [info     ] 1 tasks up for execution:
    <TaskInstance: test.test_4 scheduled__2025-11-18T18:00:00+00:00 [scheduled[]> [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] loc=scheduler_job_runner.py:444
2025-11-18T18:07:04.809453Z [info     [] DAG test has 0/16 running and queued tasks [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] loc=scheduler_job_runner.py:516
2025-11-18T18:07:04.810400Z [info     ] Setting the following tasks to queued state:
    <TaskInstance: test.test_4 scheduled__2025-11-18T18:00:00+00:00 [scheduled[]> [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] loc=scheduler_job_runner.py:655
2025-11-18T18:07:04.813201Z [info     [] Trying to enqueue tasks: [<TaskInstance: test.test_4 scheduled__2025-11-18T18:00:00+00:00 [scheduled[]>] for executor: CeleryExecutor(parallelism=32) [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] loc=scheduler_job_runner.py:740
2025-11-18T18:07:05.348969Z [info     [] Received executor event with state queued for task instance TaskInstanceKey(dag_id='test', task_id='test_4', run_id='scheduled__2025-11-18T18:00:00+00:00', try_number=2, map_index=-1) [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] loc=scheduler_job_runner.py:818
2025-11-18T18:07:05.369501Z [info     [] Setting external_executor_id for <TaskInstance: test.test_4 scheduled__2025-11-18T18:00:00+00:00 [queued[]> to 5fc1cd6a-2920-4a70-bea1-696704f17a75 [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] loc=scheduler_job_runner.py:854
2025-11-18T18:08:58.283462Z [info     [] Received executor event with state failed for task instance TaskInstanceKey(dag_id='test', task_id='test_4', run_id='scheduled__2025-11-18T18:00:00+00:00', try_number=2, map_index=-1) [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] loc=scheduler_job_runner.py:818
2025-11-18T18:08:58.294573Z [info     [] TaskInstance Finished: dag_id=test, task_id=test_4, run_id=scheduled__2025-11-18T18:00:00+00:00, map_index=-1, run_start_date=2025-11-18 18:07:05.473530+00:00, run_end_date=None, run_duration=262.232856, state=running, executor=CeleryExecutor(parallelism=32), executor_state=failed, try_number=2, max_tries=1, pool=default_pool, queue=default, priority_weight=1, operator=PythonOperator, queued_dttm=2025-11-18 18:07:04.810721+00:00, scheduled_dttm=2025-11-18 18:07:04.781059+00:00,queued_by_job_id=215, pid=64 [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] loc=scheduler_job_runner.py:864
2025-11-18T18:08:58.297131Z [error    [] Executor CeleryExecutor(parallelism=32) reported that the task instance <TaskInstance: test.test_4 scheduled__2025-11-18T18:00:00+00:00 [running[]> finished with state failed, but the task instance's state attribute is running. Learn more: https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally [airflow.task] loc=taskinstance.py:1505
2025-11-18T18:08:58.301159Z [info     [] Marking task as FAILED. dag_id=test, task_id=test_4, run_id=scheduled__2025-11-18T18:00:00+00:00, logical_date=20251118T180000, start_date=20251118T180705, end_date=20251118T180858 [airflow.models.taskinstance] loc=taskinstance.py:1595
2025-11-18T18:08:59.374603Z [info     [] Marking run <DagRun test @ 2025-11-18 18:00:00+00:00: scheduled__2025-11-18T18:00:00+00:00, state:running, queued_at: 2025-11-18 18:07:03.683680+00:00. run_type: scheduled> failed [airflow.models.dagrun.DagRun] loc=dagrun.py:1171
2025-11-18T18:08:59.374813Z [info     [] DagRun Finished: dag_id=test, logical_date=2025-11-18 18:00:00+00:00, run_id=scheduled__2025-11-18T18:00:00+00:00, run_start_date=2025-11-18 18:07:04.747229+00:00, run_end_date=2025-11-18 18:08:59.374738+00:00, run_duration=114.627509, state=failed, run_type=scheduled, data_interval_start=2025-11-18 18:00:00+00:00, data_interval_end=2025-11-18 18:00:00+00:00, [airflow.models.dagrun.DagRun] loc=dagrun.py:1274
2025-11-18T18:08:59.381120Z [info     [] Setting next_dagrun for test to 2025-11-18 19:00:00+00:00, run_after=2025-11-18 19:00:00+00:00 [airflow.models.dag] loc=dag.py:688

What you think should happen instead?

No response

How to reproduce

With redis:

  1. Start some dummy sleep task that runs for 10 minutes
  2. Log into redis-cli
  3. Get the id from zrange unacked_index 0 -1 withscores
  4. Run ZINCRBY unacked_index -{{visibility_timeout}} {{id from previous command}} to force redelivery, e.g. ZINCRBY unacked_index -86400 38ff5954-4edd-46fb-aca4-7749b8702977
  5. Wait; the worker should receive the same message soon and the running task will be marked as failed (a redis-py version of steps 3-4 is sketched below)
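
The same redelivery can also be forced from Python. A hedged redis-py sketch of steps 3-4 above; the host, port, db and the hard-coded delivery tag are assumptions about the deployment:

import redis

# Assumed connection details; adjust to your Redis broker (add a password if one is set).
r = redis.Redis(host="airflow-redis", port=6379, db=0)

# Step 3: list the in-flight Celery deliveries and their visibility deadlines.
for member, score in r.zrange("unacked_index", 0, -1, withscores=True):
    print(member.decode(), score)

# Step 4: pick one delivery tag from the listing and push its deadline into the past.
delivery_tag = "38ff5954-4edd-46fb-aca4-7749b8702977"  # example tag; use one printed above
r.zincrby("unacked_index", -86400, delivery_tag)  # -86400 comfortably exceeds the default 6h timeout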

With RabbitMQ:

  1. Restarting RabbitMQ while the task is running should in theory be sufficient, but I didn't test this

Operating System

Kubernetes

Versions of Apache Airflow Providers

No response

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
