airflow-scheduler deadlock and crash #41992

@zchunhai

Description

Apache Airflow version

2.10.0

If "Other Airflow 2 version" selected, which one?

No response

What happened?

[2024-09-04T16:08:33.735+0800] {kubernetes_executor.py:356} INFO - Changing state of (TaskInstanceKey(dag_id='7402a0ed-4746-44b6-9a91-5e1682f32cf0', task_id='table_counts-child-3', run_id='scheduled__2024-09-03T08:03:23.150918+00:00', try_number=1, map_index=-1), None, '7402a0ed-4746-44b6-9a91-5e1682f32cf0-table-counts-child-3-p3lreggg', 'airflow', '41210824') to None
[2024-09-04T16:08:33.741+0800] {kubernetes_executor.py:441} INFO - Deleted pod: TaskInstanceKey(dag_id='7402a0ed-4746-44b6-9a91-5e1682f32cf0', task_id='table_counts-child-3', run_id='scheduled__2024-09-03T08:03:23.150918+00:00', try_number=1, map_index=-1) in namespace airflow
[2024-09-04T16:08:33.744+0800] {kubernetes_executor.py:360} ERROR - Exception: None is not a valid TaskInstanceState when attempting to change state of (TaskInstanceKey(dag_id='7402a0ed-4746-44b6-9a91-5e1682f32cf0', task_id='table_counts-child-3', run_id='scheduled__2024-09-03T08:03:23.150918+00:00', try_number=1, map_index=-1), None, '7402a0ed-4746-44b6-9a91-5e1682f32cf0-table-counts-child-3-p3lreggg', 'airflow', '41210824') to None, re-queueing.
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py", line 358, in sync
self._change_state(key, state, pod_name, namespace)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 97, in wrapper
return func(*args, session=session, **kwargs)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py", line 456, in _change_state
state = TaskInstanceState(state)
File "/usr/local/lib/python3.10/enum.py", line 385, in call
return cls.new(cls, value)
File "/usr/local/lib/python3.10/enum.py", line 710, in new
raise ve_exc
ValueError: None is not a valid TaskInstanceState
[2024-09-04T16:08:33.746+0800] {kubernetes_executor.py:356} INFO - Changing state of (TaskInstanceKey(dag_id='7402a0ed-4746-44b6-9a91-5e1682f32cf0', task_id='table_counts-child-3', run_id='scheduled__2024-09-03T08:03:23.150918+00:00', try_number=1, map_index=-1), None, '7402a0ed-4746-44b6-9a91-5e1682f32cf0-table-counts-child-3-p3lreggg', 'airflow', '41210824') to None
[2024-09-04T16:08:33.747+0800] {scheduler_job_runner.py:260} INFO - Exiting gracefully upon receiving signal 15
[2024-09-04T16:08:34.750+0800] {process_utils.py:132} INFO - Sending Signals.SIGTERM to group 2540. PIDs of all processes in the group: [20146, 20284, 2540]
[2024-09-04T16:08:34.750+0800] {process_utils.py:87} INFO - Sending the signal Signals.SIGTERM to group 2540
[2024-09-04T16:08:37.565+0800] {process_utils.py:266} INFO - Waiting up to 5 seconds for processes to exit...
[2024-09-04T16:08:37.572+0800] {process_utils.py:80} INFO - Process psutil.Process(pid=20146, status='terminated', started='16:08:33') (20146) terminated with exit code None
[2024-09-04T16:08:37.586+0800] {process_utils.py:266} INFO - Waiting up to 5 seconds for processes to exit...
[2024-09-04T16:08:37.586+0800] {process_utils.py:80} INFO - Process psutil.Process(pid=20284, status='terminated', started='16:08:34') (20284) terminated with exit code None
[2024-09-04T16:08:37.613+0800] {process_utils.py:80} INFO - Process psutil.Process(pid=2540, status='terminated', exitcode=0, started='16:04:42') (2540) terminated with exit code 0
[2024-09-04T16:08:37.615+0800] {scheduler_job_runner.py:1001} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py", line 358, in sync
self._change_state(key, state, pod_name, namespace)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 97, in wrapper
return func(*args, session=session, **kwargs)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py", line 440, in _change_state
self.kube_scheduler.delete_pod(pod_name=pod_name, namespace=namespace)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py", line 416, in delete_pod
body=client.V1DeleteOptions(**self.kube_config.delete_option_kwargs),
File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/models/v1_delete_options.py", line 58, in init
local_vars_configuration = Configuration()
File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/configuration.py", line 126, in init
self.debug = False
File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/configuration.py", line 271, in debug
logger.setLevel(logging.WARNING)
File "/usr/local/lib/python3.10/logging/init.py", line 1453, in setLevel
self.manager._clear_cache()
File "/usr/local/lib/python3.10/logging/init.py", line 1412, in _clear_cache
logger._cache.clear()
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py", line 263, in _exit_gracefully
sys.exit(os.EX_OK)
SystemExit: 0

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py", line 984, in _execute
self._run_scheduler_loop()
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py", line 1127, in _run_scheduler_loop
executor.heartbeat()
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/traces/tracer.py", line 58, in wrapper
return func(*args, **kwargs)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/executors/base_executor.py", line 247, in heartbeat
self.sync()
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py", line 368, in sync
self.result_queue.task_done()
File "", line 2, in task_done
File "/usr/local/lib/python3.10/multiprocessing/managers.py", line 817, in _callmethod
conn.send((self._id, methodname, args, kwds))
File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 411, in _send_bytes
self._send(header + buf)
File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 368, in _send
n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
[2024-09-04T16:08:37.619+0800] {kubernetes_executor.py:695} INFO - Shutting down Kubernetes executor
[2024-09-04T16:08:37.620+0800] {scheduler_job_runner.py:1008} ERROR - Exception when executing Executor.end on KubernetesExecutor(parallelism=32)
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py", line 358, in sync
self._change_state(key, state, pod_name, namespace)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 97, in wrapper
return func(*args, session=session, **kwargs)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py", line 440, in _change_state
self.kube_scheduler.delete_pod(pod_name=pod_name, namespace=namespace)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py", line 416, in delete_pod
body=client.V1DeleteOptions(**self.kube_config.delete_option_kwargs),
File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/models/v1_delete_options.py", line 58, in init
local_vars_configuration = Configuration()
File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/configuration.py", line 126, in init
self.debug = False
File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/configuration.py", line 271, in debug
logger.setLevel(logging.WARNING)
File "/usr/local/lib/python3.10/logging/init.py", line 1453, in setLevel
self.manager._clear_cache()
File "/usr/local/lib/python3.10/logging/init.py", line 1412, in _clear_cache
logger._cache.clear()
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py", line 263, in _exit_gracefully
sys.exit(os.EX_OK)
SystemExit: 0

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py", line 984, in _execute
self._run_scheduler_loop()
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py", line 1127, in _run_scheduler_loop
executor.heartbeat()
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/traces/tracer.py", line 58, in wrapper
return func(*args, **kwargs)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/executors/base_executor.py", line 247, in heartbeat
self.sync()
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py", line 368, in sync
self.result_queue.task_done()
File "", line 2, in task_done
File "/usr/local/lib/python3.10/multiprocessing/managers.py", line 817, in _callmethod
conn.send((self._id, methodname, args, kwds))
File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 411, in _send_bytes
self._send(header + buf)
File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 368, in _send
n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py", line 1006, in _execute
executor.end()
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py", line 698, in end
self._flush_task_queue()
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py", line 654, in _flush_task_queue
self.log.debug("Executor shutting down, task_queue approximate size=%d", self.task_queue.qsize())
File "", line 2, in qsize
File "/usr/local/lib/python3.10/multiprocessing/managers.py", line 817, in _callmethod
conn.send((self._id, methodname, args, kwds))
File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 411, in _send_bytes
self._send(header + buf)
File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 368, in _send
n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
[2024-09-04T16:08:37.624+0800] {process_utils.py:132} INFO - Sending Signals.SIGTERM to group 2540. PIDs of all processes in the group: []
[2024-09-04T16:08:37.625+0800] {process_utils.py:87} INFO - Sending the signal Signals.SIGTERM to group 2540
[2024-09-04T16:08:37.625+0800] {process_utils.py:101} INFO - Sending the signal Signals.SIGTERM to process 2540 as process group is missing.
[2024-09-04T16:08:37.626+0800] {scheduler_job_runner.py:1014} INFO - Exited execute loop
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py", line 358, in sync
self._change_state(key, state, pod_name, namespace)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 97, in wrapper
return func(*args, session=session, **kwargs)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py", line 440, in _change_state
self.kube_scheduler.delete_pod(pod_name=pod_name, namespace=namespace)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py", line 416, in delete_pod
body=client.V1DeleteOptions(**self.kube_config.delete_option_kwargs),
File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/models/v1_delete_options.py", line 58, in init
local_vars_configuration = Configuration()
File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/configuration.py", line 126, in init
self.debug = False
File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/configuration.py", line 271, in debug
logger.setLevel(logging.WARNING)
File "/usr/local/lib/python3.10/logging/init.py", line 1453, in setLevel
self.manager._clear_cache()
File "/usr/local/lib/python3.10/logging/init.py", line 1412, in _clear_cache
logger._cache.clear()
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py", line 263, in _exit_gracefully
sys.exit(os.EX_OK)
SystemExit: 0

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "/home/airflow/.local/bin/airflow", line 8, in
sys.exit(main())
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/main.py", line 62, in main
args.func(args)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/cli_config.py", line 49, in command
return func(*args, **kwargs)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/cli.py", line 115, in wrapper
return f(*args, **kwargs)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/providers_configuration_loader.py", line 55, in wrapped_function
return func(*args, **kwargs)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/commands/scheduler_command.py", line 59, in scheduler
run_command_with_daemon_option(
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/commands/daemon_utils.py", line 86, in run_command_with_daemon_option
callback()
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/commands/scheduler_command.py", line 62, in
callback=lambda: _run_scheduler_job(args),
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/commands/scheduler_command.py", line 48, in _run_scheduler_job
run_job(job=job_runner.job, execute_callable=job_runner._execute)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 97, in wrapper
return func(*args, session=session, **kwargs)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/job.py", line 421, in run_job
return execute_job(job, execute_callable=execute_callable)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/job.py", line 450, in execute_job
ret = execute_callable()
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py", line 984, in _execute
self._run_scheduler_loop()
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py", line 1127, in _run_scheduler_loop
executor.heartbeat()
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/traces/tracer.py", line 58, in wrapper
return func(*args, **kwargs)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/executors/base_executor.py", line 247, in heartbeat
self.sync()
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py", line 368, in sync
self.result_queue.task_done()
File "", line 2, in task_done
File "/usr/local/lib/python3.10/multiprocessing/managers.py", line 817, in _callmethod
conn.send((self._id, methodname, args, kwds))
File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 411, in _send_bytes
self._send(header + buf)
File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 368, in _send
n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
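
Note: the first exception in the chain comes from the executor calling TaskInstanceState(state) (kubernetes_executor.py line 456 in the traceback above) while state is None. A minimal snippet, independent of the executor code and using only the public TaskInstanceState enum, reproduces that specific error message:

```python
from airflow.utils.state import TaskInstanceState

# Constructing the enum from None fails exactly as in the log above,
# where _change_state runs: state = TaskInstanceState(state)
try:
    TaskInstanceState(None)
except ValueError as err:
    print(err)  # -> None is not a valid TaskInstanceState
```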

What you think should happen instead?

No response

How to reproduce

Use Airflow's KubernetesExecutor to run a DAG that includes a PythonOperator.
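
For illustration, a minimal DAG of the kind described, assuming the deployment is configured with executor = KubernetesExecutor; the DAG and task names below are placeholders, not taken from the report:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def count_rows():
    # Stand-in for real task logic; any Python callable exercises the same
    # pod-per-task code path in the KubernetesExecutor.
    print("counting rows")


with DAG(
    dag_id="example_python_on_kubernetes",  # placeholder name
    start_date=datetime(2024, 9, 1),
    schedule="@daily",
    catchup=False,
):
    PythonOperator(task_id="table_counts", python_callable=count_rows)
```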

Operating System

CentOS 7.9

Versions of Apache Airflow Providers

apache-airflow-providers-cncf-kubernetes 7.8.0

Deployment

Other 3rd-party Helm chart

Deployment details

https://github.com/open-metadata/openmetadata-helm-charts

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct

Labels

area:Scheduler, area:core, kind:bug, needs-triage, provider:cncf-kubernetes
