Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
8bf46c5
handle transient timeout
johnhoran Feb 11, 2026
b7e8703
add test
johnhoran Feb 11, 2026
54e63d3
remove whitespace
johnhoran Feb 11, 2026
1e44d76
format
johnhoran Feb 11, 2026
9467bcd
cleanup test
johnhoran Feb 11, 2026
898fcad
Merge branch 'main' into 61775_kpo-timeout-retry
johnhoran Feb 11, 2026
9d51eb6
format message
johnhoran Feb 11, 2026
f1d9fb3
revert change
johnhoran Feb 11, 2026
5d99032
move change into triggerer
johnhoran Feb 11, 2026
1579f3d
include terminal states
johnhoran Feb 12, 2026
0d700e6
Merge branch 'main' into 61775_kpo-timeout-retry
johnhoran Feb 12, 2026
225a78d
update comment
johnhoran Feb 12, 2026
fadd814
add return
johnhoran Feb 12, 2026
a73d9bf
Merge branch 'main' into 61775_kpo-timeout-retry
johnhoran Feb 13, 2026
9d227e5
Merge branch 'main' into 61775_kpo-timeout-retry
johnhoran Feb 13, 2026
b8a55d1
Merge branch 'main' into 61775_kpo-timeout-retry
johnhoran Feb 13, 2026
46f396e
Merge branch 'main' into 61775_kpo-timeout-retry
johnhoran Feb 16, 2026
f944cec
Merge branch 'main' into 61775_kpo-timeout-retry
johnhoran Feb 16, 2026
a3040d1
handle transient errors
johnhoran Feb 17, 2026
d14ae3b
Merge branch 'main' into 61775_kpo-timeout-retry
johnhoran Feb 17, 2026
82445c2
Merge branch 'main' into 61775_kpo-timeout-retry
johnhoran Feb 17, 2026
e936591
update message
johnhoran Feb 18, 2026
17b2dee
error arrays
johnhoran Feb 18, 2026
4c64a93
Merge branch 'main' into 61775_kpo-timeout-retry
johnhoran Feb 18, 2026
d95a89c
Merge branch 'main' into 61775_kpo-timeout-retry
johnhoran Feb 18, 2026
99b56dd
account limit
johnhoran Feb 18, 2026
723ce57
fix test
johnhoran Feb 18, 2026
aca214c
ruff format
johnhoran Feb 18, 2026
57726a3
Merge branch 'main' into 61775_kpo-timeout-retry
johnhoran Feb 18, 2026
104527d
Merge branch 'main' into 61775_kpo-timeout-retry
johnhoran Feb 18, 2026
5461eba
Merge branch 'main' into 61775_kpo-timeout-retry
johnhoran Feb 18, 2026
fa62b31
Merge branch 'main' into 61775_kpo-timeout-retry
johnhoran Feb 18, 2026
7148a02
Merge branch 'main' into 61775_kpo-timeout-retry
johnhoran Feb 19, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -946,6 +946,7 @@ def trigger_reentry(self, context: Context, event: dict[str, Any]) -> Any:
"""
self.pod = None
xcom_sidecar_output = None
skip_cleanup = False
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this flag can be avoided, and it is better to avoid it to reduce the mental load when reading this code

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure how, the cleanup method is in the finally block, so it will be called even if the task defers again, and I don't want the pod to be deleted. How are you suggesting to remove it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we skip cleanup only when the task is either deferred or when the status is a timeout, I think we can just check that again instead of adding a flag, it will make the code easier to follow in my opinion, what do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would complicate the code. Right now the timeout code will check to see if the pod is running, but also if it's running and then terminated. In the terminated case it will call the invoke_defer_method, which will skip calling the defer code, and just call trigger_reentry again with a completed status. So the pod will get cleaned up etc. Then we hand back to the initial call, and I guess I'd need to make a k8s API call to check the status again in order to avoid killing the pod if it has already been killed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see how it would complicate things, to me it looks like if we just check that the event is a timeout, we do not clean up, otherwise we do

try:
pod_name = event["name"]
pod_namespace = event["namespace"]
Expand All @@ -958,6 +959,14 @@ def trigger_reentry(self, context: Context, event: dict[str, Any]) -> Any:
follow = self.logging_interval is None
last_log_time = event.get("last_log_time")

if event["status"] == "timeout":
pod_phase = self.pod.status.phase if self.pod.status and self.pod.status.phase else None
if pod_phase in {PodPhase.RUNNING, *PodPhase.terminal_states}:
self.log.info("Pod has transitioned from pending state after timeout, deferring again")
self.invoke_defer_method(last_log_time=last_log_time, context=context)
skip_cleanup = True
return

if event["status"] in ("error", "failed", "timeout", "success"):
if self.get_logs:
self._write_logs(self.pod, follow=follow, since_time=last_log_time)
Expand Down Expand Up @@ -988,12 +997,14 @@ def trigger_reentry(self, context: Context, event: dict[str, Any]) -> Any:
message = event.get("stack_trace", event["message"])
raise AirflowException(message)
except TaskDeferred:
skip_cleanup = True
raise
finally:
self._clean(event=event, context=context, result=xcom_sidecar_output)
if not skip_cleanup:
self._clean(event=event, context=context, result=xcom_sidecar_output)

if self.do_xcom_push and xcom_sidecar_output:
context["ti"].xcom_push(XCOM_RETURN_KEY, xcom_sidecar_output)
if self.do_xcom_push and xcom_sidecar_output:
context["ti"].xcom_push(XCOM_RETURN_KEY, xcom_sidecar_output)

def _clean(self, event: dict[str, Any], result: dict | None, context: Context) -> None:
if self.pod is None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from airflow.providers.cncf.kubernetes.utils.pod_manager import (
AsyncPodManager,
OnFinishAction,
PodLaunchFailedException,
PodLaunchTimeoutException,
PodPhase,
)
Expand Down Expand Up @@ -183,7 +184,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
event = await self._wait_for_container_completion()
yield event
return
except PodLaunchTimeoutException as e:
except (PodLaunchTimeoutException, PodLaunchFailedException) as e:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, I understand that in case of rate limits you do not want to fail fast... but would it not be better to handle this case on triggerer and not sending back? Then also you do not need to defer back again.

Copy link
Contributor Author

@johnhoran johnhoran Feb 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about doing that but I don't think it fully fixes the problem. There is always going to be a period of time when you are handing back from the triggerer to the operator and for any of the transient reasons the pod failed, like failing to pull the image or failure to spin up a node in time, the possibility exists that the pod will have started by the time the operator takes over.

The other reason I felt it was better not to add extra logic looking for rate limiting was because

def detect_pod_terminate_early_issues(pod: V1Pod) -> str | None:
"""
Identify issues that justify terminating the pod early.
:param pod: The pod object to check.
:return: An error message if an issue is detected; otherwise, None.
"""
pod_status = pod.status
if pod_status.container_statuses:
for container_status in pod_status.container_statuses:
container_state: V1ContainerState = container_status.state
container_waiting: V1ContainerStateWaiting | None = container_state.waiting
if container_waiting:
if container_waiting.reason in ["ErrImagePull", "ImagePullBackOff", "InvalidImageName"]:
return (
f"Pod docker image cannot be pulled, unable to start: {container_waiting.reason}"
f"\n{container_waiting.message}"
)
return None

is looking for both ErrImagePull and ImagePullBackOff. Depending on how long it takes the triggerer to take over, I might only see one of those states, and it might not give as verbose a reason for it being in ImagePullBackOff. If you feel strongly it's worth doing, maybe I could look at pulling Kubernetes events... or disregarding ImagePullBackOff.

Anyway I marked the PR back to draft because I think I also need to account for the pod starting and terminating while waiting for the operator.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've been trying to think about why I didn't see this issue on the same DAG running in non-deferred mode, but I suspect it's because it's configured to log an init container, and that delays the call to detect_pod_terminate_early_issues until after the init container has completed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there should be some kind of a hard limit for the tries when pulling the image, as what if the image was deleted? we do not want to keep retrying forever, and as of now, it looks like if we have a corrupt image, we will retry indefinitely.
That is, at least, what I see that will happen now.

Copy link
Contributor Author

@johnhoran johnhoran Feb 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, that shouldn't happen. In that scenario what would happen is the triggerer would exit — because of detect_pod_terminate_early_issues it would happen the first time it saw the image pull failure, and before the startup_timeout expires — with a timeout state. The timeout state basically then accounts for the gap in time between the triggerer exiting and the operator starting back up, and does a final check to see if the pod is in a running or terminal state. In this scenario it wouldn't be, so the task fails.

I think there is a case for renaming the timeout state. Basically the triggerer can return one of error, fatal, timeout, and success. Timeout is essentially for situations where the pod didn't start up in time, but if it has started by the time it gets to the operator, I think it's better to let it run rather than fail the task and retry. So if I could think of a pithy name for "fatal unless recovered" then I'd rename it to that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the triggerer should be the source of truth here: the user defined a startup timeout, and when it’s reached, Airflow should behave accordingly. Normally there shouldn’t be a large gap between the triggerer finishing and the worker starting the operator—except in overload situations.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Anecdotally, I am using the Kubernetes executor in AWS EKS and I regularly see delays of 2–3 minutes for a node to start up to take over the execution.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The startup_timeout itself is a fairly loose amount of time: it doesn't account for the time taken for the triggerer to adopt the task, and it doesn't account for triggerer restarts. I think it's just for situations where you want an early notification if the pod doesn't start, and not have to wait for a task timeout (or indefinitely if one isn't set). 2–3 minutes in the Kubernetes executor could be enough time for some tasks to complete. I don't understand why killing a running pod would be preferable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand your idea behind this but, from my point of view, the triggerer runs quite stable, which means the current startup timeout is valid. If a user sees that the pod needs more time to start due to flaky infrastructure, they can increase startup_timeout. I don’t think it makes sense to keep the pod running just because it might recover later. If you want that behavior, I suggest creating a new PR that adds an option to disable startup_timeout.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, well clearly I disagree, but I don't want that disagreement to block us from addressing the issue. So I've split that part of the code into a separate PR #62215 and I'll just close this one for now.

message = self._format_exception_description(e)
yield TriggerEvent(
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,19 +188,47 @@ def detect_pod_terminate_early_issues(pod: V1Pod) -> str | None:
"""
Identify issues that justify terminating the pod early.

This method distinguishes between permanent failures (e.g., invalid image names)
and transient errors (e.g., rate limits) that should be retried by Kubernetes.

:param pod: The pod object to check.
:return: An error message if an issue is detected; otherwise, None.
"""
# Indicators in error messages that suggest transient issues
TRANSIENT_ERROR_PATTERNS = [
"pull qps exceeded",
"rate limit",
"too many requests",
"quota exceeded",
"temporarily unavailable",
"timeout",
"account limit",
]

FATAL_STATES = ["InvalidImageName", "ErrImageNeverPull"]
TRANSIENT_STATES = ["ErrImagePull", "ImagePullBackOff"]

pod_status = pod.status
if pod_status.container_statuses:
for container_status in pod_status.container_statuses:
container_state: V1ContainerState = container_status.state
container_waiting: V1ContainerStateWaiting | None = container_state.waiting
if container_waiting:
if container_waiting.reason in ["ErrImagePull", "ImagePullBackOff", "InvalidImageName"]:
if not container_waiting:
continue

if container_waiting.reason in FATAL_STATES:
return (
f"Image cannot be pulled, unable to start: {container_waiting.reason}"
f"\n{container_waiting.message or ''}"
)

if container_waiting.reason in TRANSIENT_STATES:
message_lower = (container_waiting.message or "").lower()
is_transient = any(pattern in message_lower for pattern in TRANSIENT_ERROR_PATTERNS)
if not is_transient:
return (
f"Pod docker image cannot be pulled, unable to start: {container_waiting.reason}"
f"\n{container_waiting.message}"
f"Image cannot be pulled, unable to start: {container_waiting.reason}"
f"\n{container_waiting.message or ''}"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not reuse message_lower here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the original capitalization is better.

)
return None

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2928,6 +2928,40 @@ def test_skip_deferral_on_terminated_pod(
k.execute(context)
mocked_trigger_reentry.assert_called_once()

@patch(HOOK_CLASS)
@patch(KUB_OP_PATH.format("pod_manager"))
@patch(f"{KPO_MODULE}.KubernetesPodOperator._clean")
def test_should_defer_on_running_after_timeout(self, mock_clean, mock_manager, mocked_hook, mocker):
k = KubernetesPodOperator(task_id="task", deferrable=True)
running_state = mock.MagicMock(
**{
"metadata.name": TEST_NAME,
"metadata.namespace": TEST_NAMESPACE,
"status.phase": "Running",
"status.container_statuses": [
k8s.V1ContainerStatus(
name=k.base_container_name,
state=k8s.V1ContainerState(running=k8s.V1ContainerStateRunning()),
image="alpine",
image_id="",
ready=True,
restart_count=0,
)
],
},
)
mocked_hook.return_value.get_pod.return_value = running_state
ti_mock = MagicMock()

event = {"status": "timeout", "message": "timeout", "name": TEST_NAME, "namespace": TEST_NAMESPACE}

mock_file = mock_open(read_data='{"a": "b"}')
mocker.patch("builtins.open", mock_file)

with pytest.raises(TaskDeferred):
k.trigger_reentry({"ti": ti_mock}, event)
mock_clean.assert_not_called()


@pytest.mark.parametrize("do_xcom_push", [True, False])
@patch(KUB_OP_PATH.format("extract_xcom"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,9 @@ async def test_start_pod_raises_fast_error_on_image_error(self, fail_reason):
pod_response.status.container_statuses = [container_statuse]

self.mock_kube_client.read_namespaced_pod.return_value = pod_response
expected_msg = f"Pod docker image cannot be pulled, unable to start: {waiting_state.reason}\n{waiting_state.message}"
expected_msg = (
f"Image cannot be pulled, unable to start: {waiting_state.reason}\n{waiting_state.message}"
)
mock_pod = MagicMock()
with pytest.raises(AirflowException, match=expected_msg):
await self.pod_manager.await_pod_start(
Expand Down Expand Up @@ -1262,7 +1264,9 @@ async def test_start_pod_raises_fast_error_on_image_error(self, fail_reason):
container_status.state.waiting = waiting_state
pod_response.status.container_statuses = [container_status]
self.mock_async_hook.get_pod.return_value = pod_response
expected_msg = f"Pod docker image cannot be pulled, unable to start: {waiting_state.reason}\n{waiting_state.message}"
expected_msg = (
f"Image cannot be pulled, unable to start: {waiting_state.reason}\n{waiting_state.message}"
)
mock_pod = mock.MagicMock()
with pytest.raises(AirflowException, match=expected_msg):
await self.async_pod_manager.await_pod_start(
Expand Down