Kubernetes Pod Operator handle container registry rate limits #61778
The first diff touches the trigger module, importing `PodLaunchFailedException` and adding it to the handled exceptions:

```diff
@@ -31,6 +31,7 @@
 from airflow.providers.cncf.kubernetes.utils.pod_manager import (
     AsyncPodManager,
     OnFinishAction,
+    PodLaunchFailedException,
     PodLaunchTimeoutException,
     PodPhase,
 )
@@ -183,7 +184,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
             event = await self._wait_for_container_completion()
             yield event
             return
-        except PodLaunchTimeoutException as e:
+        except (PodLaunchTimeoutException, PodLaunchFailedException) as e:
```
|
Contributor
Okay, I understand that in the case of rate limits you do not want to fail fast... but would it not be better to handle this case on the triggerer instead of sending it back? Then you also would not need to defer again.
Contributor (Author)
I thought about doing that, but I don't think it fully fixes the problem. There is always going to be a period of time when you are handing back from the triggerer to the operator, and for any of the transient reasons the pod failed (failing to pull the image, or failure to spin up a node in time), the possibility exists that the pod will have started by the time the operator takes over. The other reason I felt it was better not to add extra logic looking for rate limiting is that airflow/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py (lines 187 to 205 in 1c41180) is looking for both ErrImagePull and ImagePullBackOff. Depending on how long it takes the triggerer to take over, I might only see one of those states, and ImagePullBackOff might not give as verbose a reason. If you feel strongly it's worth doing, maybe I could look at pulling kubernetes events... or disregarding ImagePullBackOff.

Anyway, I marked the PR back to draft because I think I also need to account for the pod starting and terminating while waiting for the operator.
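The "pulling kubernetes events" idea mentioned above could look roughly like this. In a real implementation the events would come from the kubernetes client (e.g. `CoreV1Api.list_namespaced_event` with a field selector on the pod name); here a hypothetical pure helper filters already-fetched event dicts, so the sketch runs without a cluster:

```python
# Hypothetical helper: given pod events (as would be fetched from the
# Kubernetes API), pick out image-pull failures and their messages to
# build a more verbose error than the container waiting reason alone.

def image_pull_events(events):
    """Filter event dicts down to image-pull failures (reason 'Failed'/'BackOff')."""
    return [
        (e["reason"], e["message"])
        for e in events
        if e["reason"] in ("Failed", "BackOff") and "image" in e["message"].lower()
    ]

sample = [
    {"reason": "Scheduled", "message": "Successfully assigned default/mypod to node-1"},
    {"reason": "Failed", "message": 'Failed to pull image "busybox": rate limit exceeded'},
    {"reason": "BackOff", "message": 'Back-off pulling image "busybox"'},
]
for reason, msg in image_pull_events(sample):
    print(reason, msg)
```

This would surface the original pull-failure message even when the pod has already moved on to ImagePullBackOff.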
Contributor (Author)
I've been trying to think about why I didn't see this issue on the same DAG running in non-deferred mode, but I suspect it's because it's configured to log an init container, and that delays the call to
Contributor
I think there should be some kind of hard limit on the retries when pulling the image: what if the image was deleted? We do not want to keep retrying forever, and as of now it looks like if we have a corrupt image, we will retry indefinitely.
Contributor (Author)
No, that shouldn't happen. In that scenario the triggerer would exit. I think there is a case for renaming the timeout state. Basically the triggerer can return one of
Contributor
I think the triggerer should be the source of truth here: the user defined a startup timeout, and when it's reached, Airflow should behave accordingly. Normally there shouldn't be a large gap between the triggerer finishing and the worker starting the operator, except in overload situations.
Contributor (Author)
Anecdotally, I am using the kubernetes executor in AWS EKS and I regularly see delays of 2-3 minutes for a node to start up and take over the execution.
Contributor (Author)
The startup_timeout itself is a fairly loose amount of time: it doesn't account for the time taken for the triggerer to adopt the task, and it doesn't account for triggerer restarts. I think it's just for situations where you want an early notification if the pod doesn't start, rather than having to wait for a task timeout (or indefinitely if one isn't set). 2-3 minutes in the kubernetes executor could be enough time for some tasks to complete. I don't understand why killing a running pod would be preferable.
Contributor
I understand your idea behind this, but from my point of view the triggerer runs quite stably, which means the current startup timeout is valid. If a user sees that the pod needs more time to start due to flaky infrastructure, they can increase startup_timeout. I don't think it makes sense to keep the pod running just because it might recover later. If you want that behavior, I suggest creating a new PR that adds an option to disable startup_timeout.
Contributor (Author)
Okay, well clearly I disagree, but I don't want that disagreement to block us from addressing the issue. So I've split that part of the code to a separate PR #62215 and I'll just close this one for now.
The hunk continues:

```diff
             message = self._format_exception_description(e)
             yield TriggerEvent(
                 {
```
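The shape of the change above can be exercised in isolation. This is a minimal sketch, not Airflow's actual trigger: the exception classes and the event dict keys are stand-ins, illustrating how both launch-timeout and launch-failure errors get mapped to the same event payload handed back to the operator:

```python
# Hedged sketch with stand-in exception classes (names mirror the PR,
# but these are local definitions, not imports from Airflow).

class PodLaunchTimeoutException(Exception):
    """Pod did not start within the startup timeout."""

class PodLaunchFailedException(Exception):
    """Pod could not start, e.g. the image pull failed."""

def run_trigger_step(wait_for_completion):
    """Run one step of the trigger loop, converting launch errors to events."""
    try:
        return wait_for_completion()
    except (PodLaunchTimeoutException, PodLaunchFailedException) as e:
        # Both error types are handed back to the operator as a structured
        # event rather than failing the trigger outright.
        return {"status": "timeout", "message": str(e)}

def failing_wait():
    raise PodLaunchFailedException("back-off pulling image: rate limit exceeded")

event = run_trigger_step(failing_wait)
print(event["status"])  # timeout
```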
The second diff reworks `detect_pod_terminate_early_issues` in pod_manager.py to distinguish fatal image errors from transient ones:

```diff
@@ -188,19 +188,47 @@ def detect_pod_terminate_early_issues(pod: V1Pod) -> str | None:
     """
     Identify issues that justify terminating the pod early.
 
+    This method distinguishes between permanent failures (e.g., invalid image names)
+    and transient errors (e.g., rate limits) that should be retried by Kubernetes.
+
     :param pod: The pod object to check.
     :return: An error message if an issue is detected; otherwise, None.
     """
+    # Indicators in error messages that suggest transient issues
+    TRANSIENT_ERROR_PATTERNS = [
+        "pull qps exceeded",
+        "rate limit",
+        "too many requests",
+        "quota exceeded",
+        "temporarily unavailable",
+        "timeout",
+        "account limit",
+    ]
+
+    FATAL_STATES = ["InvalidImageName", "ErrImageNeverPull"]
+    TRANSIENT_STATES = ["ErrImagePull", "ImagePullBackOff"]
+
     pod_status = pod.status
     if pod_status.container_statuses:
         for container_status in pod_status.container_statuses:
             container_state: V1ContainerState = container_status.state
             container_waiting: V1ContainerStateWaiting | None = container_state.waiting
-            if container_waiting:
-                if container_waiting.reason in ["ErrImagePull", "ImagePullBackOff", "InvalidImageName"]:
-                    return (
-                        f"Pod docker image cannot be pulled, unable to start: {container_waiting.reason}"
-                        f"\n{container_waiting.message}"
-                    )
+            if not container_waiting:
+                continue
+
+            if container_waiting.reason in FATAL_STATES:
+                return (
+                    f"Image cannot be pulled, unable to start: {container_waiting.reason}"
+                    f"\n{container_waiting.message or ''}"
+                )
+
+            if container_waiting.reason in TRANSIENT_STATES:
+                message_lower = (container_waiting.message or "").lower()
+                is_transient = any(pattern in message_lower for pattern in TRANSIENT_ERROR_PATTERNS)
+                if not is_transient:
+                    return (
+                        f"Image cannot be pulled, unable to start: {container_waiting.reason}"
+                        f"\n{container_waiting.message or ''}"
+                    )
     return None
```

An inline thread on the returned message:

Contributor
Why not reuse message_lower here?

Contributor (Author)
I think the original capitalization is better.
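The fatal-vs-transient classification in the diff above can be exercised on its own. This standalone sketch uses plain strings in place of the kubernetes client's `V1ContainerStateWaiting` objects, so the names and structure here are illustrative, not the provider's real API:

```python
# Standalone sketch of the transient-vs-fatal waiting-reason classification.
# Reasons and message patterns mirror the PR; the function signature is a
# simplification that takes (reason, message) directly.

TRANSIENT_ERROR_PATTERNS = [
    "pull qps exceeded",
    "rate limit",
    "too many requests",
    "quota exceeded",
    "temporarily unavailable",
    "timeout",
    "account limit",
]
FATAL_STATES = ["InvalidImageName", "ErrImageNeverPull"]
TRANSIENT_STATES = ["ErrImagePull", "ImagePullBackOff"]

def classify_waiting_state(reason, message):
    """Return an error string when early termination is justified, else None."""
    if reason in FATAL_STATES:
        # Permanent failure: fail fast regardless of the message
        return f"Image cannot be pulled, unable to start: {reason}\n{message or ''}"
    if reason in TRANSIENT_STATES:
        message_lower = (message or "").lower()
        if not any(p in message_lower for p in TRANSIENT_ERROR_PATTERNS):
            # Pull error with no recognized transient cause: fail fast
            return f"Image cannot be pulled, unable to start: {reason}\n{message or ''}"
    # Recognized transient error (or a benign reason): let Kubernetes retry
    return None

print(classify_waiting_state("ImagePullBackOff", "429 Too Many Requests"))  # None
print(classify_waiting_state("InvalidImageName", "couldn't parse image name"))
```

Note the design choice debated in the thread: an unrecognized message under ErrImagePull/ImagePullBackOff still terminates early, so a deleted or corrupt image does not retry forever.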
Contributor
I think this flag can be avoided, and it is better to avoid it to reduce the mental load when reading this code.
Contributor (Author)
I'm not sure how. The cleanup method is in the finally block, so it will be called even if the task defers again, and I don't want the pod to be deleted. How are you suggesting to remove it?
Contributor
If we skip cleanup only when the task is deferred or when the status is a timeout, I think we can just check that again instead of adding a flag. It would make the code easier to follow, in my opinion. What do you think?
Contributor (Author)
I think it would complicate the code. Right now the timeout code checks whether the pod is running, but also whether it ran and then terminated. In the terminated case it will call invoke_defer_method, which will skip calling the defer code and just call trigger_reentry again with a completed status, so the pod will get cleaned up, etc. Then we hand back to the initial call, and I guess I'd need to make a k8s API call to check the status again in order to avoid killing the pod if it has already been killed.
Contributor
I don't see how it would complicate things. To me it looks like if we just check that the event is a timeout we do not clean up; otherwise we do.
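The reviewer's flag-less suggestion reduces to deriving the cleanup decision from the trigger event itself. A minimal sketch, assuming a hypothetical event dict whose `status` key can be `"timeout"` (the statuses and keys are illustrative, not the operator's actual event schema):

```python
# Hedged sketch: decide cleanup from the event instead of a separate flag.
# "timeout" here means the trigger gave up but the pod may still come up,
# so the operator should not delete it.

def should_cleanup(event):
    """Skip pod cleanup when the trigger reported a startup timeout."""
    return event.get("status") != "timeout"

print(should_cleanup({"status": "success"}))  # True
print(should_cleanup({"status": "timeout"}))  # False
```

The author's counterpoint is that by the time the operator re-checks, the pod state may have changed, which is what the extra flag was tracking.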