
Kubernetes Pod Operator handle container registry rate limits#61778

Closed
johnhoran wants to merge 33 commits into apache:main from johnhoran:61775_kpo-timeout-retry

Conversation

@johnhoran
Contributor

This PR handles a task failure that occurs in deferred mode: the triggerer would mark a task as failed because Kubernetes hit rate limits when pulling an image from the container registry. Kubernetes automatically retries the pull, so the pod might start running during the time taken to hand the task from the triggerer to the operator. When this happens, I think it's best to defer back to the triggerer again.

@johnhoran johnhoran marked this pull request as draft February 11, 2026 18:35
  yield event
  return
- except PodLaunchTimeoutException as e:
+ except (PodLaunchTimeoutException, PodLaunchFailedException) as e:
Contributor

Okay, I understand that in the case of rate limits you do not want to fail fast... but would it not be better to handle this case on the triggerer instead of sending it back? Then you also would not need to defer again.

Contributor Author

@johnhoran johnhoran Feb 11, 2026

I thought about doing that, but I don't think it fully fixes the problem. There is always a window while the task is handed back from the triggerer to the operator, and for any transient reason the pod failed (failing to pull the image, a node not spinning up in time), the possibility exists that the pod will have started by the time the operator takes over.

The other reason I felt it was better not to add extra rate-limit detection logic is that

def detect_pod_terminate_early_issues(pod: V1Pod) -> str | None:
    """
    Identify issues that justify terminating the pod early.

    :param pod: The pod object to check.
    :return: An error message if an issue is detected; otherwise, None.
    """
    pod_status = pod.status
    if pod_status.container_statuses:
        for container_status in pod_status.container_statuses:
            container_state: V1ContainerState = container_status.state
            container_waiting: V1ContainerStateWaiting | None = container_state.waiting
            if container_waiting:
                if container_waiting.reason in ["ErrImagePull", "ImagePullBackOff", "InvalidImageName"]:
                    return (
                        f"Pod docker image cannot be pulled, unable to start: {container_waiting.reason}"
                        f"\n{container_waiting.message}"
                    )
    return None

is looking for both ErrImagePull and ImagePullBackOff. Depending on how long the triggerer takes to take over, I might only see one of those states, and ImagePullBackOff might not give as verbose a reason as ErrImagePull. If you feel strongly it's worth doing, maybe I could look at pulling Kubernetes events, or at disregarding ImagePullBackOff.

Anyway I marked the PR back to draft because I think I also need to account for the pod starting and terminating while waiting for the operator.
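To make the Kubernetes-events idea concrete, here is a minimal sketch of what pulling events for the pod might look like. The helper name and wiring are hypothetical (not from this PR); `list_namespaced_event` is the standard `CoreV1Api` call, and the client is passed in so the sketch stays testable without a cluster.

```python
# Hypothetical sketch: fetch image-pull related event messages for a pod, so
# an ImagePullBackOff can be reported with the more verbose ErrImagePull
# message from the registry. `core_v1` is expected to behave like a
# kubernetes.client.CoreV1Api instance.

def image_pull_event_messages(core_v1, pod_name: str, namespace: str) -> list[str]:
    """Return messages of image-pull related events for the given pod."""
    field_selector = f"involvedObject.kind=Pod,involvedObject.name={pod_name}"
    events = core_v1.list_namespaced_event(namespace, field_selector=field_selector)
    # kubelet reports pull failures as reason "Failed" (ErrImagePull) and
    # subsequent retries as reason "BackOff" (ImagePullBackOff).
    return [e.message for e in events.items if e.reason in ("Failed", "BackOff") and e.message]
```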

Contributor Author

I've been trying to work out why I didn't see this issue with the same dag running in non-deferred mode. I suspect it's because it's configured to log an init container, and that delays the call to detect_pod_terminate_early_issues until after the init container has completed.

Contributor

I think there should be some kind of hard limit on the retries when pulling the image: what if the image was deleted? We do not want to keep retrying forever, and as of now it looks like a corrupt image would be retried indefinitely.
That is, at least, what I see happening now.

Contributor Author

@johnhoran johnhoran Feb 13, 2026

No, that shouldn't happen. In that scenario the triggerer would exit with a timeout state, because detect_pod_terminate_early_issues would fire the first time it saw the image pull failure, before the startup_timeout expires. The timeout state basically accounts for the gap in time between the triggerer exiting and the operator starting back up, and does a final check to see whether the pod is in a running or terminal state. In this scenario it wouldn't be, so the task fails.

I think there is a case for renaming the timeout state. The triggerer can return one of error, fatal, timeout, and success. Timeout is essentially for situations where the pod didn't start up in time, but if it has started by the time control reaches the operator, I think it's better to let it run than to fail the task and retry. So if I could think of a pithy name for "fatal unless recovered", I'd rename it to that.
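A sketch of the "fatal unless recovered" behaviour described above, with hypothetical names (the real handling lives in the operator's trigger re-entry code; this only illustrates the branching on the pod phase seen after a timeout event):

```python
# Hypothetical sketch of handling a trigger "timeout" event: re-check the
# pod phase before failing, and only fail if the pod never recovered.

def resolve_timeout_event(pod_phase: str) -> str:
    """Map the pod phase observed after a trigger timeout to an action."""
    if pod_phase == "Running":
        return "defer"     # pod recovered during the handover: defer again
    if pod_phase in ("Succeeded", "Failed"):
        return "finalize"  # pod started and finished in the gap: collect the result
    return "fail"          # Pending/Unknown: the startup timeout stands
```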

Contributor

I think the triggerer should be the source of truth here: the user defined a startup timeout, and when it's reached, Airflow should behave accordingly. Normally there shouldn't be a large gap between the triggerer finishing and the worker starting the operator, except in overload situations.

Contributor Author

Anecdotally, I am using the Kubernetes executor in AWS EKS and I regularly see delays of 2-3 minutes for a node to start up to take over the execution.

Contributor Author

The startup_timeout itself is a fairly loose bound: it doesn't account for the time taken for the triggerer to adopt the task, and it doesn't account for triggerer restarts. I think it's just for situations where you want early notification that the pod didn't start, rather than having to wait for a task timeout (or indefinitely if one isn't set). 2-3 minutes in the Kubernetes executor could be enough time for some tasks to complete. I don't understand why killing a running pod would be preferable.

Contributor

I understand your idea behind this, but from my point of view the triggerer runs quite stably, which means the current startup timeout is valid. If a user sees that the pod needs more time to start due to flaky infrastructure, they can increase startup_timeout. I don't think it makes sense to keep the pod running just because it might recover later. If you want that behavior, I suggest creating a new PR that adds an option to disable startup_timeout.

Contributor Author

Okay, well, clearly I disagree, but I don't want that disagreement to block us from addressing the issue. So I've split that part of the code into a separate PR #62215 and I'll just close this one for now.

@jscheffl
Contributor

@AutomationDev85 we recently saw issues with rate limits at Bosch as well, where the "fail fast" on "ImgPullBackOff" killed the workload. WDYT, from what you saw, does this also resolve the problem we hit?

@johnhoran
Contributor Author

> issues with rate limits and the "fail fast" in "ImgPullBackOff" killed the workload.

It's still possible that a pod won't have recovered from ImgPullBackOff by the time execution reaches the check in the operator. Looking at the code, I felt the intention was for this to be handled by Airflow task retries rather than by relying on the k8s retry.

@johnhoran johnhoran marked this pull request as ready for review February 12, 2026 12:38
Contributor

@Nataneljpwd Nataneljpwd left a comment

Looks good. A few questions, and maybe a small change is needed.

"""
self.pod = None
xcom_sidecar_output = None
skip_cleanup = False
Contributor

I think this flag can be avoided, and it is better to avoid it to reduce the mental load when reading this code.

Contributor Author

I'm not sure how: the cleanup method is in the finally block, so it will be called even if the task defers again, and I don't want the pod to be deleted. How are you suggesting to remove it?

Contributor

If we skip cleanup only when the task is either deferred or the status is a timeout, I think we can just check that again instead of adding a flag; it will make the code easier to follow, in my opinion. What do you think?

Contributor Author

I think it would complicate the code. Right now the timeout code checks whether the pod is running, but also whether it ran and then terminated. In the terminated case it calls invoke_defer_method, which skips the defer code and just calls trigger_reentry again with a completed status, so the pod gets cleaned up and so on. Then we hand back to the initial call, and I guess I'd need to make a k8s API call to check the status again in order to avoid killing a pod that has already been killed.

Contributor

I don't see how it would complicate things; to me it looks like if the event is a timeout we do not clean up, and otherwise we do.

Contributor

@Nataneljpwd Nataneljpwd left a comment

I have left a few more comments. Overall, I think a few minor changes are all that is needed.

Contributor

@Nataneljpwd Nataneljpwd left a comment

Looks good, added a few questions

f"\n{container_waiting.message or ''}"
)

if container_waiting.reason in ["ErrImagePull", "ImagePullBackOff"]:
Contributor

Maybe the array here should be given a name to explain why only these two states are checked. Also, I think this check, being more specific, should come before the one above it, while the one above is more of a catch-all and doesn't even need an if, just a return.

Contributor Author

I added the array. As for moving it up, I have it below the if not container_waiting: check so I can avoid either indenting the block below or adding more conditions to it. Personally I think that makes the code cleaner.

- f"Pod docker image cannot be pulled, unable to start: {container_waiting.reason}"
- f"\n{container_waiting.message}"
+ f"Image cannot be pulled, unable to start: {container_waiting.reason}"
+ f"\n{container_waiting.message or ''}"
Contributor

why not reuse message_lower here?

Contributor Author

I think the original capitalization is better.


Labels

area:providers, provider:cncf-kubernetes (Kubernetes (k8s) provider related issues)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Kubernetes Pod Operator: Deferred mode handling of registry rate limiting

4 participants