
Conversation

@sean-rose sean-rose commented Apr 17, 2025

Description

Delete the Kubernetes pod during task cleanup if it's still running, so pods don't keep running when the associated Airflow task has been stopped.

Related Tickets & Documents

  • DENG-8290: Airflow execution timeout doesn't stop the k8s pod
  • DENG-7968: Airflow tasks occasionally fail due to pod start timeout but the pod still runs

        KubernetesEnginePodLink.persist(context=context, task_instance=self)
        return pod

    def process_pod_deletion(self, pod: k8s.V1Pod, *, reraise=True):
@sean-rose (author) commented:

I would have preferred to use an on_pod_cleanup callback rather than overriding this internal-ish method, but unfortunately on_pod_cleanup callbacks aren't called when Airflow tasks fail (maybe a bug?).

A reviewer commented:

Any idea why on_pod_cleanup callbacks aren't called in the event of a task failure? Did you look in the Airflow project to see if this is a known issue/bug?

A reviewer asked:

Why do you need the * boundary?

@sean-rose (author) replied:

> Any idea why on_pod_cleanup callbacks aren't called in the event of a task failure?

It's because the on_pod_cleanup callback is only called after the cleanup method completes, but the cleanup method raises an exception if the task failed.
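
Roughly, the flow looks like this (a simplified sketch with approximated names, not the actual provider source):

    # Simplified sketch; the method and callback-invocation names here are
    # approximations, not the provider's actual identifiers.
    def _finish_pod_execution(self, pod, remote_pod):
        # cleanup() raises an AirflowException whenever the pod didn't end in
        # the "Succeeded" state, i.e. whenever the task failed or timed out...
        self.cleanup(pod=pod, remote_pod=remote_pod)
        # ...so on failure this line is never reached and the on_pod_cleanup
        # callback never fires.
        self._invoke_on_pod_cleanup_callbacks(pod)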

> Did you look in the Airflow project to see if this is a known issue/bug?

Yes. I didn't see an existing issue for this, and the problem still exists on the main branch so it hasn't been fixed yet (though it's possible this is an intentional design decision).

I did just submit this Airflow PR with a possible fix, so we'll see how that goes (though even if it's accepted, it'd likely be a while before it's included in an apache-airflow-providers-cncf-kubernetes package release that we can upgrade to).

> Why do you need the * boundary?

This matches the argument signature of the original process_pod_deletion method that this is overriding, and the * makes all subsequent arguments keyword-only.
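
For example (illustrative):

    class Example:
        # The bare * makes every parameter after it keyword-only.
        def process_pod_deletion(self, pod, *, reraise=True):
            return reraise

    Example().process_pod_deletion("pod", reraise=False)  # OK
    Example().process_pod_deletion("pod", False)          # TypeError: too many positional arguments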

        # or a task execution timeout (this could be considered a bug in the Kubernetes provider).
        # As a workaround we delete the pod during cleanup if it's still running.
        try:
            remote_pod: k8s.V1Pod = self.client.read_namespaced_pod(
A reviewer (Contributor) commented:

What's the difference between pod and remote_pod? remote_pod is passed into this method https://github.com/apache/airflow/blob/2723508345d5cf074aeb673955ce72996785f2bc/providers/src/airflow/providers/cncf/kubernetes/operators/pod.py#L905

Can you just use that?

@sean-rose (author) replied:

self.remote_pod is indeed passed in as pod to this method, but when task execution stopped due to a pod startup timeout or a task execution timeout, it will only reflect the state of the pod immediately after it started, because that was the last time self.remote_pod would have been updated (self.remote_pod isn't updated again until after waiting for the pod to terminate).

So here I'm setting remote_pod to a more up-to-date copy of the pod's state to have more accurate information, so that this will log "Deleting running pod: ..." if the pod was in fact running at the time (without re-fetching the pod state it would always log "Deleting pending pod: ..." even for running pods).

While this isn't technically required for the core functionality of cleaning up unterminated pods, I figured it was worth doing to record what actually happened. However, if you think it's overkill, I'm not opposed to simplifying this and using the out-of-date self.remote_pod/pod value (in which case I'd change the logging to the more generic "Deleting pod: ...", since we wouldn't really know its state).
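
For reference, the rough shape of the override is (a simplified sketch based on the diff context above, not the exact change; the class name is hypothetical and the deletion and error handling are approximated):

    from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
    from airflow.providers.cncf.kubernetes.utils.pod_manager import PodPhase
    from kubernetes.client import models as k8s

    class PodCleanupOperator(KubernetesPodOperator):  # hypothetical name
        def process_pod_deletion(self, pod: k8s.V1Pod, *, reraise=True):
            # Re-read the pod so the logging and the terminal-state check
            # reflect its current phase rather than the stale self.remote_pod
            # snapshot taken right after startup.
            try:
                remote_pod: k8s.V1Pod = self.client.read_namespaced_pod(
                    name=pod.metadata.name,
                    namespace=pod.metadata.namespace,
                )
            except Exception:
                remote_pod = pod  # fall back to the possibly stale copy

            if remote_pod.status.phase not in PodPhase.terminal_states:
                # e.g. "Deleting running pod: ..." or "Deleting pending pod: ..."
                self.log.info(
                    "Deleting %s pod: %s",
                    remote_pod.status.phase.lower(),
                    pod.metadata.name,
                )
                # PodManager.delete_pod() ignores 404s, so this is safe even
                # if the pod is already gone by the time we get here.
                self.pod_manager.delete_pod(pod)

            super().process_pod_deletion(pod, reraise=reraise)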

The reviewer replied:

OK, that makes sense. I guess you would need the updated state for the PodPhase.terminal_states check below anyway.

@BenWu BenWu (Contributor) left a comment:

I think this LGTM, and it's the best we can do apart from changes to the operator, which are mostly out of our control.

One thing I'm not sure about is why on_kill(), which would delete the pod, isn't being called. It seems to do that if the task is manually killed, and it looks like it should when the task times out: https://github.com/apache/airflow/blob/42024188b9553dd50072e717af754cb95c901972/airflow-core/src/airflow/models/taskinstance.py#L2192

Commit added: …s_pod_deletion()`.
Co-authored-by: Ben Wu <12437227+BenWu@users.noreply.github.com>
@sean-rose (author) commented:

> One thing I'm not sure about is why on_kill(), which would delete the pod, isn't being called. It seems to do that if the task is manually killed and it looks like it should when the task times out: https://github.com/apache/airflow/blob/42024188b9553dd50072e717af754cb95c901972/airflow-core/src/airflow/models/taskinstance.py#L2192

That's because while the AirflowTaskTimeout is being raised, KubernetesPodOperator.cleanup() gets called and raises an AirflowException because the pod didn't end in the "Succeeded" state, so the core Airflow code that's looking for an AirflowTaskTimeout only ends up seeing the subsequent AirflowException (IMO this could also be considered a bug in the Kubernetes operator).
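
A minimal standalone illustration of that masking (hypothetical classes, not the Airflow source):

    class AirflowTaskTimeout(Exception): ...
    class AirflowException(Exception): ...

    def run_task():
        try:
            # Raised mid-task by Airflow's execution-timeout handling.
            raise AirflowTaskTimeout("execution timeout")
        finally:
            # cleanup() raising here replaces the in-flight AirflowTaskTimeout.
            raise AirflowException("Pod did not reach the 'Succeeded' state")

    try:
        run_task()
    except AirflowTaskTimeout:
        print("timeout handling (which calls on_kill()) would run")  # never reached
    except AirflowException as exc:
        print(f"core Airflow sees this instead: {exc}")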

@sean-rose sean-rose merged commit 9a886e3 into main Apr 21, 2025
6 checks passed
@sean-rose sean-rose deleted the DENG-8290-cleanup-running-pods branch April 21, 2025 20:24