fix: Have GKEPodOperator delete running pods during cleanup (DENG-8290)
#2192
Conversation
    KubernetesEnginePodLink.persist(context=context, task_instance=self)
    return pod

    def process_pod_deletion(self, pod: k8s.V1Pod, *, reraise=True):
I would have preferred to use an on_pod_cleanup callback rather than overriding this internal-ish method, but unfortunately on_pod_cleanup callbacks aren't called when Airflow tasks fail (maybe a bug?).
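For context, the callback route would have looked roughly like this (a sketch only; DeleteLingeringPodCallback is hypothetical, the deletion logic is illustrative, and the exact on_pod_cleanup signature may differ slightly between provider versions). It would be wired in via the operator's callbacks parameter, but since the hook never fires on failure, this PR overrides process_pod_deletion instead.

```python
from kubernetes.client import CoreV1Api, models as k8s

from airflow.providers.cncf.kubernetes.callbacks import KubernetesPodOperatorCallback


class DeleteLingeringPodCallback(KubernetesPodOperatorCallback):
    """Hypothetical callback that deletes a pod still present at cleanup time."""

    @staticmethod
    def on_pod_cleanup(*, pod: k8s.V1Pod, client: CoreV1Api, mode: str, **kwargs) -> None:
        # Would delete the pod here; in practice this hook is never reached when the
        # task fails, because cleanup() raises before the callbacks run.
        client.delete_namespaced_pod(
            name=pod.metadata.name, namespace=pod.metadata.namespace
        )
```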
Any idea why on_pod_cleanup callbacks aren't called in the event of a task failure? Did you look in the Airflow project to see if this is a known issue/bug?
Why do you need the * boundary?
> Any idea why on_pod_cleanup callbacks aren't called in the event of a task failure?
It's because the on_pod_cleanup callback is only called after the cleanup method completes, but the cleanup method raises an exception if the task failed.
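Schematically, the ordering looks like this (a stand-in sketch, not the provider's actual code; finish_execution and the stub cleanup are illustrative names):

```python
from airflow.exceptions import AirflowException


def cleanup(pod, remote_pod):
    # Stand-in for the provider's cleanup(): it raises when the pod failed.
    raise AirflowException("Pod returned a failure.")


def finish_execution(pod, remote_pod, callbacks):
    cleanup(pod=pod, remote_pod=remote_pod)  # raises on task failure...
    for callback in callbacks:               # ...so this loop, and on_pod_cleanup, never run
        callback.on_pod_cleanup(pod=pod)
```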
> Did you look in the Airflow project to see if this is a known issue/bug?
Yes. I didn't see an existing issue for this, and the problem still exists on the main branch so it hasn't been fixed yet (though it's possible this is an intentional design decision).
I did just submit this Airflow PR with a possible fix, so we'll see how that goes (though even if it's accepted, it would likely be a while before it's included in an apache-airflow-providers-cncf-kubernetes package release that we can upgrade to).
> Why do you need the * boundary?
This matches the argument signature of the original process_pod_deletion method that this is overriding, and the * makes it so that subsequent arguments can only be specified as keyword arguments.
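For example, the bare * makes everything after it keyword-only:

```python
def process_pod_deletion(pod, *, reraise=True):
    """Arguments after the bare * can only be passed by keyword."""
    print(pod, reraise)


process_pod_deletion("my-pod", reraise=False)  # OK
process_pod_deletion("my-pod", False)          # TypeError: takes 1 positional argument but 2 were given
```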
    # or a task execution timeout (this could be considered a bug in the Kubernetes provider).
    # As a workaround we delete the pod during cleanup if it's still running.
    try:
        remote_pod: k8s.V1Pod = self.client.read_namespaced_pod(
What's the difference between pod and remote_pod? remote_pod is passed into this method https://github.com/apache/airflow/blob/2723508345d5cf074aeb673955ce72996785f2bc/providers/src/airflow/providers/cncf/kubernetes/operators/pod.py#L905
Can you just use that?
self.remote_pod is indeed passed in as pod to this method, but when task execution stopped due to a pod startup timeout or a task execution timeout, it only reflects the state of the pod immediately after startup, because that was the last time self.remote_pod was updated (it isn't updated again until after waiting for the pod to terminate).
So here I'm setting remote_pod to a more up-to-date copy of the pod's state to have more accurate information, so that this will log "Deleting running pod: ..." if the pod was in fact running at the time (without re-fetching the pod state it would always log "Deleting pending pod: ..." even for running pods).
While this isn't technically required for the core functionality of cleaning up unterminated pods, I figured it was worth doing for recording what actually happened. However, if you think it's overkill I'm not opposed to simplifying this and using the out-of-date self.remote_pod/pod value (in which case I'd change the logging to the more generic "Deleting pod: ..." since we don't really know its state).
Ok that makes sense. I guess you would need the updated state for the PodPhase.terminal_states check below anyway
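Putting the pieces together, the override under discussion looks roughly like this (a sketch, not the exact diff; it assumes the provider's GKEStartPodOperator base class, PodPhase.terminal_states, and pod_manager.delete_pod helpers):

```python
from kubernetes.client import models as k8s

from airflow.providers.cncf.kubernetes.utils.pod_manager import PodPhase
from airflow.providers.google.cloud.operators.kubernetes_engine import GKEStartPodOperator


class GKEPodOperator(GKEStartPodOperator):
    """Illustrative subclass; the real operator in this repo carries more logic."""

    def process_pod_deletion(self, pod: k8s.V1Pod, *, reraise=True):
        # Re-read the pod so the log line reflects its current phase rather than the
        # stale self.remote_pod captured right after startup.
        remote_pod: k8s.V1Pod = self.client.read_namespaced_pod(
            name=pod.metadata.name, namespace=pod.metadata.namespace
        )
        if remote_pod.status.phase not in PodPhase.terminal_states:
            self.log.info(
                "Deleting %s pod: %s", remote_pod.status.phase.lower(), pod.metadata.name
            )
            self.pod_manager.delete_pod(remote_pod)
        super().process_pod_deletion(pod, reraise=reraise)
```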
BenWu left a comment
I think this LGTM, and it's the best we can do apart from changes to the operator itself, which are mostly out of our control.
One thing I'm not sure about is why on_kill(), which would delete the pod, isn't being called. It seems to do that if the task is manually killed and it looks like it should when the task times out: https://github.com/apache/airflow/blob/42024188b9553dd50072e717af754cb95c901972/airflow-core/src/airflow/models/taskinstance.py#L2192
That's because while the …
Description
This ensures pods don't keep running after the associated Airflow task has been stopped.
Related Tickets & Documents