Kubernetes Pod Operator: handle container registry rate limits #61778
johnhoran wants to merge 33 commits into apache:main
Conversation
         yield event
         return
-    except PodLaunchTimeoutException as e:
+    except (PodLaunchTimeoutException, PodLaunchFailedException) as e:
Okay, I understand that in the case of rate limits you do not want to fail fast... but would it not be better to handle this case on the triggerer instead of sending it back? Then you also would not need to defer again.
I thought about doing that but I don't think it fully fixes the problem. There is always going to be a period of time when you are handing back from the triggerer to the operator, and for any of the transient reasons the pod failed, like failing to pull the image or failing to spin up a node in time, the possibility exists that the pod will have started by the time the operator takes over.
The other reason I felt it was better not to add extra logic looking for rate limiting is that detect_pod_terminate_early_issues is looking for both ErrImagePull and ImagePullBackOff. Depending on how long it takes the triggerer to take over, I might only see one of those states, and ImagePullBackOff might not give as verbose a reason. If you feel strongly it's worth doing, maybe I could look at pulling Kubernetes events... or disregarding ImagePullBackOff.
Anyway I marked the PR back to draft because I think I also need to account for the pod starting and terminating while waiting for the operator.
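(For readers following the thread, a minimal sketch of the trigger-side change shown in the diff above. TriggerEvent is real Airflow API; the exception classes here are stand-ins for the provider classes named in the diff, and the wait helper is a hypothetical stub, not the PR's actual code.)

```python
# Illustrative sketch only, not the provider's actual code.
from airflow.triggers.base import TriggerEvent


class PodLaunchTimeoutException(Exception):  # stand-in for the provider class
    pass


class PodLaunchFailedException(Exception):  # stand-in for the provider class
    pass


class PodTriggerSketch:
    async def _wait_until_pod_starts(self):
        ...  # would poll the Kubernetes API and raise one of the above

    async def run(self):
        try:
            await self._wait_until_pod_starts()
        except (PodLaunchTimeoutException, PodLaunchFailedException) as e:
            # Surface the failure as a "timeout" event so the operator can
            # re-check the pod: Kubernetes retries ErrImagePull itself, so
            # the pod may have recovered by the time the worker resumes.
            yield TriggerEvent({"status": "timeout", "message": str(e)})
            return
        yield TriggerEvent({"status": "success"})
```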
I've been trying to think about why I didn't see this issue on the same DAG running in non-deferred mode, but I suspect it's because it's configured to log an init container, and that delays the call to detect_pod_terminate_early_issues until after the init container has completed.
I think there should be some kind of hard limit on the retries when pulling the image: what if the image was deleted? We do not want to keep retrying forever, and as of now it looks like if we have a corrupt image, we will retry indefinitely.
That is, at least, what I see happening now.
No, that shouldn't happen. In that scenario the triggerer would exit with a timeout state because of detect_pod_terminate_early_issues; it would do so the first time it saw the image pull failure, before the startup_timeout expires. The timeout state basically accounts for the gap in time between the triggerer exiting and the operator starting back up, and does a final check to see whether the pod is in a running or terminal state. In this scenario it wouldn't be, so the task fails.
I think there is a case for renaming the timeout state. Basically the triggerer can return one of error, fatal, timeout and success. Timeout is essentially for situations where the pod didn't start up in time, but if it has started by the time it gets to the operator, I think it's better to let it run rather than fail the task and retry. So if I could think of a pithy name for "fatal unless recovered" then I'd rename it to that.
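(A rough sketch of that dispatch, to make the four outcomes concrete. Every name here is illustrative rather than the PR's actual code; the real entry point, trigger_reentry, is discussed further down the thread.)

```python
# Hedged sketch of the four trigger outcomes described above.
def handle_trigger_event(status: str, pod_phase: str, defer_again, fail_task, finalize) -> None:
    if status == "success":
        finalize()
    elif status in ("error", "fatal"):
        fail_task()
    elif status == "timeout":
        # "Fatal unless recovered": the pod may have started, or even
        # finished, between the trigger exiting and the worker resuming.
        if pod_phase == "Running":
            defer_again()
        elif pod_phase == "Succeeded":
            finalize()
        else:
            fail_task()  # e.g. the deleted-image case: the pod never started
```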
I think the triggerer should be the source of truth here: the user defined a startup timeout, and when it's reached, Airflow should behave accordingly. Normally there shouldn't be a large gap between the triggerer finishing and the worker starting the operator, except in overload situations.
Anecdotally, I am using the kubernetes executor in AWS EKS and I regularly see delays of 2-3 minutes for a node to start up and take over the execution.
The startup_timeout itself is a fairly loose amount of time: it doesn't account for the time taken for the triggerer to adopt the task, and it doesn't account for triggerer restarts. I think it's just for situations where you want an early notification if the pod doesn't start, rather than having to wait for a task timeout (or indefinitely if one isn't set). 2-3 minutes in the kubernetes executor could be enough time for some tasks to complete. I don't understand why killing a running pod would be preferable.
I understand your idea behind this but, from my point of view, the triggerer runs quite stably, which means the current startup timeout is valid. If a user sees that the pod needs more time to start due to flaky infrastructure, they can increase startup_timeout. I don't think it makes sense to keep the pod running just because it might recover later. If you want that behavior, I suggest creating a new PR that adds an option to disable startup_timeout.
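(For context, the knob being discussed, in a minimal usage sketch. deferrable and startup_timeout_seconds are real KubernetesPodOperator parameters; the task values and image are made up.)

```python
# Illustrative usage: give flaky infrastructure more time to start the pod.
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

pull_heavy_image = KubernetesPodOperator(
    task_id="pull_heavy_image",
    name="pull-heavy-image",
    image="registry.example.com/team/app:latest",  # hypothetical image
    deferrable=True,
    startup_timeout_seconds=600,  # raised from the 120s default for slow registries
)
```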
Okay, well, clearly I disagree, but I don't want that disagreement to block us from addressing the issue. So I've split that part of the code into a separate PR #62215 and I'll just close this one for now.
@AutomationDev85 we recently also saw issues with rate limits at Bosch, where the "fail fast" on ImagePullBackOff killed the workload. WDYT, from what you saw, does this also resolve the problem we saw?
It's still possible that a pod won't have recovered from ImagePullBackOff by the time the operator takes back over.
Nataneljpwd left a comment:
Looks good. A few questions, and maybe a small change is needed.
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py (comment thread resolved)
| """ | ||
| self.pod = None | ||
| xcom_sidecar_output = None | ||
| skip_cleanup = False |
I think this flag can be avoided, and it is better to avoid it to reduce the mental load when reading this code
I'm not sure how. The cleanup method is in the finally block, so it will be called even if the task defers again, and I don't want the pod to be deleted. How are you suggesting to remove it?
If we skip cleanup only when the task is either deferred or the status is a timeout, I think we can just check that again instead of adding a flag; it will make the code easier to follow, in my opinion. What do you think?
I think it would complicate the code. Right now the timeout code checks whether the pod is running, but also whether it ran and then terminated. In the terminated case it calls invoke_defer_method, which skips the defer code and just calls trigger_reentry again with a completed status, so the pod gets cleaned up and so on. Then we hand back to the initial call, and I guess I'd need to make a k8s API call to check the status again in order to avoid killing a pod that has already been killed.
I don't see how it would complicate things; to me it looks like if we just check that the event is a timeout, we do not clean up, and otherwise we do. (The two options are sketched below.)
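(To make the two cleanup styles concrete, a hedged sketch. All names here are hypothetical stand-ins for the operator's actual code.)

```python
def reentry_with_flag(event, pod_is_running, defer_again, cleanup):
    """Flag-based (what the PR does): record the decision before deferring."""
    skip_cleanup = False
    try:
        if event["status"] == "timeout" and pod_is_running():
            skip_cleanup = True
            defer_again()  # raises to defer, so the finally block still runs
    finally:
        if not skip_cleanup:
            cleanup()


def reentry_with_recheck(event, pod_is_running, defer_again, cleanup):
    """Status-based (the reviewer's suggestion): re-derive the decision."""
    try:
        if event["status"] == "timeout" and pod_is_running():
            defer_again()
    finally:
        # Needs a second pod_is_running() call, i.e. another k8s API hit,
        # and the answer may have changed in the meantime.
        if not (event["status"] == "timeout" and pod_is_running()):
            cleanup()
```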
Nataneljpwd left a comment:
I have left a few more comments; overall, I think a few minor changes are all that is needed.
| """ | ||
| self.pod = None | ||
| xcom_sidecar_output = None | ||
| skip_cleanup = False |
There was a problem hiding this comment.
If we skip cleanup only when the task is either deferred or when the status is a timeout, I think we can just check that again instead of adding a flag, it will make the code easier to follow in my opinion, what do you think?
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py (comment thread resolved)
Nataneljpwd left a comment:
Looks good, added a few questions
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py (outdated; comment thread resolved)
| f"\n{container_waiting.message or ''}" | ||
| ) | ||
|
|
||
| if container_waiting.reason in ["ErrImagePull", "ImagePullBackOff"]: |
Maybe the array here should be given a name to explain why only these two states are checked.
And I think this check, being more specific, should come before the one above; the one above is more of a catch-all and does not even need an if, it can just return.
I added the array. As for moving it up, I have it below the `if not container_waiting:` check so I can avoid either indenting the block below or adding more conditions to it. Personally I think that makes the code cleaner. (The resulting shape is sketched below.)
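(A sketch of the named-array suggestion; the constant and function names are made up, not the PR's actual identifiers.)

```python
from kubernetes.client import V1ContainerStateWaiting

# States Kubernetes itself keeps retrying (e.g. a rate-limited registry),
# unlike permanently broken ones such as InvalidImageName.
TRANSIENT_IMAGE_PULL_REASONS = ("ErrImagePull", "ImagePullBackOff")


def is_transient_pull_failure(waiting: V1ContainerStateWaiting | None) -> bool:
    # Guard first, as in the PR: the reason check stays below the
    # `if not container_waiting` early-out.
    return waiting is not None and waiting.reason in TRANSIENT_IMAGE_PULL_REASONS
```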
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py (outdated; comment thread resolved)
| f"Pod docker image cannot be pulled, unable to start: {container_waiting.reason}" | ||
| f"\n{container_waiting.message}" | ||
| f"Image cannot be pulled, unable to start: {container_waiting.reason}" | ||
| f"\n{container_waiting.message or ''}" |
why not reuse message_lower here?
I think the original capitalization is better.
This PR handles a task failure when running in deferred mode, where the triggerer would mark a task as failed because Kubernetes ran into rate limits when pulling an image from the container registry. Kubernetes will automatically retry pulling the image, and the pod might start running during the time it takes to hand the task from the triggerer back to the operator. When this happens I think it's best to just defer back to the triggerer again.
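(A compact sketch of that defer-back step. self.defer and trigger_reentry are real Airflow/provider names, the latter discussed in the review above; the two helper methods are hypothetical stubs, and this subclass is illustration only, not the PR's implementation.)

```python
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator


class DeferBackPodOperator(KubernetesPodOperator):
    def _pod_is_running_now(self) -> bool:
        ...  # would query the Kubernetes API for the current pod phase

    def _rebuild_trigger(self):
        ...  # would construct a fresh trigger for the same pod

    def trigger_reentry(self, context, event):
        # If Kubernetes' image-pull retries got the pod running during the
        # hand-off, go back to sleep on the trigger instead of failing.
        if event.get("status") == "timeout" and self._pod_is_running_now():
            self.defer(trigger=self._rebuild_trigger(), method_name="trigger_reentry")
        return super().trigger_reentry(context, event)
```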