@@ -816,48 +816,54 @@ def __init__(
self.config_dict = config_dict
self._extras: dict | None = connection_extras
self._event_polling_fallback = False
self._config_loaded = False

async def _load_config(self):
"""Return Kubernetes API session for use with requests."""
"""Load Kubernetes configuration once per hook instance."""
if self._config_loaded:
return

in_cluster = self._coalesce_param(self.in_cluster, await self._get_field("in_cluster"))
cluster_context = self._coalesce_param(self.cluster_context, await self._get_field("cluster_context"))
kubeconfig_path = await self._get_field("kube_config_path")
kubeconfig = await self._get_field("kube_config")

num_selected_configuration = sum(
1 for o in [in_cluster, kubeconfig, kubeconfig_path, self.config_dict] if o
)

async def api_client_from_kubeconfig_file(_kubeconfig_path: str | None):
await async_config.load_kube_config(
config_file=_kubeconfig_path,
client_configuration=self.client_configuration,
context=cluster_context,
)
return _TimeoutAsyncK8sApiClient()

if num_selected_configuration > 1:
raise AirflowException(
"Invalid connection configuration. Options kube_config_path, "
"kube_config, in_cluster are mutually exclusive. "
"kube_config, in_cluster, and config_dict are mutually exclusive. "
"You can only use one option at a time."
)

if in_cluster:
self.log.debug(LOADING_KUBE_CONFIG_FILE_RESOURCE.format("within a pod"))
self._is_in_cluster = True
async_config.load_incluster_config()
return _TimeoutAsyncK8sApiClient()
self._is_in_cluster = True
self._config_loaded = True
return

# If the above block does not return, we are not in a cluster.
self._is_in_cluster = False

if self.config_dict:
self.log.debug(LOADING_KUBE_CONFIG_FILE_RESOURCE.format("config dictionary"))
self._is_in_cluster = False
await async_config.load_kube_config_from_dict(self.config_dict, context=cluster_context)
return _TimeoutAsyncK8sApiClient()
self._config_loaded = True
return

if kubeconfig_path is not None:
self.log.debug("loading kube_config from: %s", kubeconfig_path)
self._is_in_cluster = False
return await api_client_from_kubeconfig_file(kubeconfig_path)
await async_config.load_kube_config(
config_file=kubeconfig_path,
client_configuration=self.client_configuration,
context=cluster_context,
)
self._config_loaded = True
return

if kubeconfig is not None:
async with aiofiles.tempfile.NamedTemporaryFile() as temp_config:
@@ -874,13 +880,21 @@ async def api_client_from_kubeconfig_file(_kubeconfig_path: str | None):
kubeconfig = json.dumps(kubeconfig)
await temp_config.write(kubeconfig.encode())
await temp_config.flush()
self._is_in_cluster = False
return await api_client_from_kubeconfig_file(temp_config.name)

await async_config.load_kube_config(
config_file=temp_config.name,
client_configuration=self.client_configuration,
context=cluster_context,
)
self._config_loaded = True
return

self.log.debug(LOADING_KUBE_CONFIG_FILE_RESOURCE.format("default configuration file"))
await async_config.load_kube_config(
client_configuration=self.client_configuration,
context=cluster_context,
)
self._config_loaded = True

async def get_conn_extras(self) -> dict:
if self._extras is None:
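
The `_load_config` hunk above makes configuration loading idempotent (via the `_config_loaded` flag) and enforces that only one configuration source is supplied. A minimal standalone sketch of the mutual-exclusion check, with hypothetical values standing in for the hook's fields:

# Hypothetical stand-ins for the hook's configuration sources; only config_dict is set.
in_cluster = None
kubeconfig = None
kubeconfig_path = None
config_dict = {"apiVersion": "v1", "kind": "Config"}

num_selected_configuration = sum(
    1 for o in (in_cluster, kubeconfig, kubeconfig_path, config_dict) if o
)
if num_selected_configuration > 1:
    raise ValueError(
        "Options kube_config_path, kube_config, in_cluster, and config_dict "
        "are mutually exclusive. You can only use one option at a time."
    )
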
@@ -907,7 +921,8 @@ async def _get_field(self, field_name):
async def get_conn(self) -> AsyncGenerator[async_client.ApiClient, None]:
kube_client = None
try:
kube_client = await self._load_config() or _TimeoutAsyncK8sApiClient()
await self._load_config()
kube_client = _TimeoutAsyncK8sApiClient()
yield kube_client
finally:
if kube_client is not None:
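
The reworked `get_conn` now loads configuration through `_load_config()` and always hands back a `_TimeoutAsyncK8sApiClient`. A hedged usage sketch of consuming it, assuming `get_conn` is exposed as an async context manager (it is used as `async with self.get_conn() as connection` in `watch_pod_events` below) and that `hook` is an instance of this hook class:

from kubernetes_asyncio import client as async_client

async def list_pods(hook, namespace: str = "default"):
    # Borrow a configured ApiClient from the hook for the duration of the call.
    async with hook.get_conn() as connection:
        v1 = async_client.CoreV1Api(connection)
        return await v1.list_namespaced_pod(namespace=namespace)
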
@@ -1021,48 +1036,101 @@ async def watch_pod_events(
timeout_seconds: int = 30,
) -> AsyncGenerator[CoreV1Event]:
"""
Watch pod events using Kubernetes Watch API.
Watch Kubernetes events for a pod.

Reconnects on watch termination and resumes from the last observed
resourceVersion. The watch stops when the pod is terminal or deleted,
and falls back to polling if watch access is denied.

:param name: Pod name to watch events for
:param namespace: Kubernetes namespace
:param resource_version: Only return events not older than this resource version
:param timeout_seconds: Timeout in seconds for the watch stream. A small additional buffer may be applied internally.
This does not limit the total duration of event streaming.
"""
if self._event_polling_fallback:
async for event_polled in self.watch_pod_events_polling_fallback(
name, namespace, resource_version, timeout_seconds
):
yield event_polled

try:
w = async_watch.Watch()
async with self.get_conn() as connection:
v1_api = async_client.CoreV1Api(connection)
last_rv = resource_version

async for event_watched in w.stream(
v1_api.list_namespaced_event,
namespace=namespace,
field_selector=f"involvedObject.name={name}",
resource_version=resource_version,
timeout_seconds=timeout_seconds,
):
event: CoreV1Event = event_watched.get("object")
yield event

except async_client.exceptions.ApiException as e:
if hasattr(e, "status") and e.status == 403:
self.log.warning(
"Triggerer does not have Kubernetes API permission to 'watch' events: %s Falling back to polling.",
str(e),
)
self._event_polling_fallback = True
while True:
# If watch is known to be unavailable, use polling fallback
if self._event_polling_fallback:
async for event_polled in self.watch_pod_events_polling_fallback(
name, namespace, resource_version, timeout_seconds
name, namespace, last_rv, timeout_seconds
):
yield event_polled
return

finally:
w.stop()
# Watch may not be created if pod inspection triggers early return.
w = None

try:
# Pod lifecycle is authoritative; events alone are not.
pod = await self.get_pod(name=name, namespace=namespace)
if pod.status and pod.status.phase in ("Succeeded", "Failed"):
self.log.info(
"Pod '%s' reached terminal phase '%s'; stopping event watch",
name,
pod.status.phase,
)
return

w = async_watch.Watch()
async with self.get_conn() as connection:
v1_api = async_client.CoreV1Api(connection)

async for event_watched in w.stream(
v1_api.list_namespaced_event,
namespace=namespace,
field_selector=f"involvedObject.name={name}",
resource_version=last_rv,
timeout_seconds=timeout_seconds,
):
event = event_watched.get("object")
if not event or not event.metadata:
continue

if event.metadata.resource_version:
last_rv = event.metadata.resource_version

yield event

# Never swallow cancellation.
except asyncio.CancelledError:
raise

except async_client.exceptions.ApiException as e:
status = getattr(e, "status", None)

if status == 403:
# Permanently fall back to polling when watch is not permitted.
self.log.warning(
"Kubernetes API does not permit watching events; falling back to polling: %s",
str(e),
)
self._event_polling_fallback = True
continue

if status == 404:
# Terminate the watch if pod no longer exists.
self.log.info("Pod '%s' no longer exists; stopping event watch", name)
return

if status == 410:
# Restart watch from current state if resourceVersion is too old.
self.log.info(
"resourceVersion too old while watching pod '%s'; restarting watch",
name,
)
last_rv = None
continue

# Other API errors are either transient or configuration/programming errors.
# Re-raise so generic_api_retry can apply centralized retry/backoff for
# transient failures, and fail fast for non-retryable ones.
raise

finally:
if w is not None:
w.stop()
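
The rewritten `watch_pod_events` above resumes from the last observed resourceVersion across reconnects, stops once the pod is terminal or deleted, and permanently degrades to polling on a 403. A hedged consumption sketch, assuming an instance of this hook named `hook` and the parameter defaults shown in the signature:

async def log_pod_events(hook, name: str, namespace: str = "default"):
    # Stream CoreV1Event objects until the pod reaches a terminal phase or is deleted.
    async for event in hook.watch_pod_events(name=name, namespace=namespace):
        print(f"{event.reason}: {event.message}")
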

async def watch_pod_events_polling_fallback(
self,