Infinite loop on scheduler when kubernetes state event is None along with state in database also None #35888

@tirkarthi

Apache Airflow version

2.7.3

What happened

We are hitting an issue with the Kubernetes Executor where process_watcher_task receives a state of None and pushes it to result_queue. When the result is fetched from the queue in kubernetes_executor.py, it is passed to _change_state. If the state is None, the state is looked up in the database; when that is also None for some reason, TaskInstanceState(state) raises a ValueError. The generic exception handler catches it and puts the same result back on the queue, so the scheduler goes into an infinite loop trying to set the state. We have to restart the scheduler to recover.

If the database query also returns None, we should either skip setting the state, or catch ValueError specifically instead of relying on generic exception handling, so that the same result is not retried by pushing it back to the queue. The validation was introduced by this change: 9556d6d#diff-11bb8713bf2f01502e66ffa91136f939cc8445839517187f818f044233414f7eR459

    def process_watcher_task(self, task: KubernetesWatchType) -> None:
        """Process the task by watcher."""
        pod_name, namespace, state, annotations, resource_version = task
        self.log.debug(
            "Attempting to finish pod; pod_name: %s; state: %s; annotations: %s",
            pod_name,
            state,
            annotations_for_logging_task_metadata(annotations),
        )
        key = annotations_to_key(annotations=annotations)
        if key:
            self.log.debug("finishing job %s - %s (%s)", key, state, pod_name)
            self.result_queue.put((key, state, pod_name, namespace, resource_version))

    results = self.result_queue.get_nowait()
    try:
        key, state, pod_name, namespace, resource_version = results
        last_resource_version[namespace] = resource_version
        self.log.info("Changing state of %s to %s", results, state)
        try:
            self._change_state(key, state, pod_name, namespace)
        except Exception as e:
            self.log.exception(
                "Exception: %s when attempting to change state of %s to %s, re-queueing.",
                e,
                results,
                state,
            )
            self.result_queue.put(results)

    # If we don't have a TI state, look it up from the db. event_buffer expects the TI state
    if state is None:
        from airflow.models.taskinstance import TaskInstance

        state = session.scalar(select(TaskInstance.state).where(TaskInstance.filter_for_tis([key])))
        state = TaskInstanceState(state)
    self.event_buffer[key] = state, None
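
To make the failure mode concrete, here is a minimal standalone sketch of the loop described above. It does not import Airflow: the `TaskInstanceState` enum is a two-member stand-in, and `change_state`, `drain`, and `max_iterations` are hypothetical names used only for this illustration. The iteration cap exists purely so the demo terminates; the real loop has no such cap.

```python
import queue
from enum import Enum


class TaskInstanceState(str, Enum):
    """Stand-in for Airflow's TaskInstanceState (assumption: only two members shown)."""

    SUCCESS = "success"
    FAILED = "failed"


def change_state(state, db_state):
    """Mimic the fallback: use the database value when the event carries no state."""
    if state is None:
        # Enum lookup raises ValueError when db_state is None as well.
        state = TaskInstanceState(db_state)
    return state


def drain(result_queue, db_state, max_iterations=5):
    """Mimic the sync loop with its generic handler; capped so the demo ends."""
    attempts = 0
    while not result_queue.empty() and attempts < max_iterations:
        results = result_queue.get_nowait()
        attempts += 1
        try:
            change_state(results[1], db_state)
        except Exception:
            # The same item goes back on the queue, so it fails again next time.
            result_queue.put(results)
    return attempts


q = queue.Queue()
q.put(("key", None, "pod", "ns", "1"))
attempts = drain(q, db_state=None)
```

After the capped run, every attempt has failed and the item is still sitting on the queue, which is the behavior we see in the scheduler until it is restarted.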

What you think should happen instead

The scheduler should not retry indefinitely.
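
One possible shape for this, as a rough sketch only and not necessarily how it should be implemented in the executor: treat "no state from the event and none in the database" as a result that cannot be resolved, and drop it instead of re-queueing. The code below is standalone and does not import Airflow; the `TaskInstanceState` enum is a stand-in, and `resolve_state`, `drain`, `db_lookup`, and `dropped` are hypothetical names for illustration.

```python
import queue
from enum import Enum


class TaskInstanceState(str, Enum):
    """Stand-in for Airflow's TaskInstanceState (assumption: only two members shown)."""

    SUCCESS = "success"
    FAILED = "failed"


def resolve_state(event_state, db_state):
    """Return a TaskInstanceState, or None when neither source has a state."""
    raw = event_state if event_state is not None else db_state
    if raw is None:
        return None
    return TaskInstanceState(raw)


def drain(result_queue, db_lookup):
    """Process every queued result once; never re-queue an unresolvable one."""
    event_buffer = {}
    dropped = []
    while True:
        try:
            results = result_queue.get_nowait()
        except queue.Empty:
            break
        key, state = results[0], results[1]
        try:
            resolved = resolve_state(state, db_lookup(key))
        except ValueError:
            # Unknown state value: retrying the same item cannot succeed.
            dropped.append(key)
            continue
        if resolved is None:
            # No state from the event or the database: skip setting the state.
            dropped.append(key)
            continue
        event_buffer[key] = (resolved, None)
    return event_buffer, dropped


q = queue.Queue()
q.put(("a", None, "pod-a", "ns", "1"))       # no event state, no database row
q.put(("b", "success", "pod-b", "ns", "2"))  # event carries a state
q.put(("c", None, "pod-c", "ns", "3"))       # state only in the database
event_buffer, dropped = drain(q, {"c": "failed"}.get)
```

Here the queue is fully drained in one pass: "b" and "c" get a state, and "a" is recorded as dropped rather than retried. Whether dropping, logging, or a bounded retry is the right policy is a separate decision.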

How to reproduce

We are not sure of the exact scenario in which this is reproducible. We tried running a task whose pod produces an event with state None, which Kubernetes does in rare cases when a pod is deleted or killed, and we also deleted the task instance so that the database query returns None as well, but we could not consistently reach the case that causes this.

Operating System

Ubuntu

Versions of Apache Airflow Providers

No response

Deployment

Virtualenv installation

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Labels

area:core; kind:bug (this is clearly a bug); needs-triage (label for new issues that we didn't triage yet)
