Description
Apache Airflow version
2.7.3
What happened
We are facing an issue with the Kubernetes Executor where process_watcher_task receives a state of None and pushes it to result_queue. When the result is fetched from the queue in kubernetes_executor.py, it is passed to _change_state. If the state is None, the state is looked up in the database. If that lookup also returns None for some reason, TaskInstanceState(state) raises a ValueError. The generic exception handler catches it and puts the same result back on the queue, so the scheduler goes into an infinite loop trying to set the state. We need to restart the scheduler to get it running again. If the database query also returns None, we should either not set the state, or catch ValueError separately from the generic exception handler so that the same result is not pushed back to the queue. The validation was introduced by this change: 9556d6d#diff-11bb8713bf2f01502e66ffa91136f939cc8445839517187f818f044233414f7eR459
airflow/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
Lines 453 to 465 in 5d74ffb
    def process_watcher_task(self, task: KubernetesWatchType) -> None:
        """Process the task by watcher."""
        pod_name, namespace, state, annotations, resource_version = task
        self.log.debug(
            "Attempting to finish pod; pod_name: %s; state: %s; annotations: %s",
            pod_name,
            state,
            annotations_for_logging_task_metadata(annotations),
        )
        key = annotations_to_key(annotations=annotations)
        if key:
            self.log.debug("finishing job %s - %s (%s)", key, state, pod_name)
            self.result_queue.put((key, state, pod_name, namespace, resource_version))
airflow/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
Lines 379 to 393 in f3ddefc
    results = self.result_queue.get_nowait()
    try:
        key, state, pod_name, namespace, resource_version = results
        last_resource_version[namespace] = resource_version
        self.log.info("Changing state of %s to %s", results, state)
        try:
            self._change_state(key, state, pod_name, namespace)
        except Exception as e:
            self.log.exception(
                "Exception: %s when attempting to change state of %s to %s, re-queueing.",
                e,
                results,
                state,
            )
            self.result_queue.put(results)
airflow/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
Lines 478 to 485 in 5d74ffb
    # If we don't have a TI state, look it up from the db. event_buffer expects the TI state
    if state is None:
        from airflow.models.taskinstance import TaskInstance
        state = session.scalar(select(TaskInstance.state).where(TaskInstance.filter_for_tis([key])))
        state = TaskInstanceState(state)
    self.event_buffer[key] = state, None
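The loop can be illustrated in isolation, without Airflow or a cluster. This is only a minimal sketch under assumptions: `FakeTaskInstanceState` is a hypothetical stand-in for `TaskInstanceState`, `change_state` and `drain` are hypothetical names that mimic the snippets above, a plain `queue.Queue` replaces the executor's result queue, and the `max_iterations` cap exists only so the example terminates.

```python
import queue
from enum import Enum


class FakeTaskInstanceState(str, Enum):
    """Hypothetical stand-in for TaskInstanceState (a string-valued enum)."""

    SUCCESS = "success"
    FAILED = "failed"


def change_state(state, db_state=None):
    """Mimic the None handling in _change_state; db_state simulates the DB lookup."""
    if state is None:
        state = db_state
        # Enum lookup by value raises ValueError when the value is None.
        state = FakeTaskInstanceState(state)
    return state


def drain(result_queue, max_iterations=5):
    """Mimic the sync loop: any exception puts the same result back on the queue."""
    attempts = 0
    while attempts < max_iterations:  # cap added so that this sketch terminates
        try:
            results = result_queue.get_nowait()
        except queue.Empty:
            break
        attempts += 1
        key, state = results
        try:
            change_state(state)
        except Exception:
            # Same behaviour as the generic handler: re-queue unconditionally.
            result_queue.put(results)
    return attempts


q = queue.Queue()
q.put(("task-key", None))  # watcher event with state None, DB also returns None
attempts = drain(q)
remaining = q.qsize()
```

Every attempt fails the same way and the item is still on the queue afterwards, so without the cap the loop would never finish.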
What you think should happen instead
The scheduler should not retry infinitely.
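One possible shape for this, as a rough sketch rather than an actual patch: skip the enum conversion when the database lookup also returns None, and drop the result instead of re-queueing it. The names `FakeTaskInstanceState`, `resolve_state` and `drain` are hypothetical, and a plain `queue.Queue` stands in for the executor's result queue.

```python
import logging
import queue
from enum import Enum

log = logging.getLogger(__name__)


class FakeTaskInstanceState(str, Enum):
    """Hypothetical stand-in for TaskInstanceState (a string-valued enum)."""

    SUCCESS = "success"
    FAILED = "failed"


def resolve_state(state, db_state):
    """Fall back to the DB value, but only convert when a value actually exists."""
    if state is None:
        state = db_state
    return FakeTaskInstanceState(state) if state is not None else None


def drain(result_queue, db_state=None):
    """Process every queued result once; results with no state are dropped."""
    processed, dropped = [], []
    while True:
        try:
            key, state = result_queue.get_nowait()
        except queue.Empty:
            break
        resolved = resolve_state(state, db_state)
        if resolved is None:
            # Nothing to set: log and move on instead of re-queueing forever.
            log.warning("No state found for %s, skipping", key)
            dropped.append(key)
            continue
        processed.append((key, resolved))
    return processed, dropped


q = queue.Queue()
q.put(("task-a", None))       # no state from the watcher or the DB
q.put(("task-b", "success"))  # normal event
processed, dropped = drain(q)
```

Catching ValueError separately from the generic handler and not re-queueing in that branch would be an alternative with the same effect on the queue.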
How to reproduce
We are not sure of the exact scenario in which this is reproducible. We tried running a task whose pod produces an event with a None state, which Kubernetes returns in rare cases when the pod is deleted or killed. We also deleted the task instance to make sure the database query returns None as well, but we are not able to consistently reach the case that causes this.
Operating System
Ubuntu
Versions of Apache Airflow Providers
No response
Deployment
Virtualenv installation
Deployment details
No response
Anything else
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct