Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@

import aiofiles
import requests
import tenacity
from asgiref.sync import sync_to_async
from kubernetes import client, config, utils, watch
from kubernetes.client.models import V1Deployment
Expand All @@ -39,7 +38,7 @@
from airflow.models import Connection
from airflow.providers.cncf.kubernetes.exceptions import KubernetesApiError, KubernetesApiPermissionError
from airflow.providers.cncf.kubernetes.kube_client import _disable_verify_ssl, _enable_tcp_keepalive
from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import should_retry_creation
from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import generic_api_retry
from airflow.providers.cncf.kubernetes.utils.container import (
container_is_completed,
container_is_running,
Expand Down Expand Up @@ -390,6 +389,7 @@ def create_custom_object(
self.log.debug("Response: %s", response)
return response

@generic_api_retry
def get_custom_object(
self, group: str, version: str, plural: str, name: str, namespace: str | None = None
):
Expand All @@ -412,6 +412,7 @@ def get_custom_object(
)
return response

@generic_api_retry
def delete_custom_object(
self, group: str, version: str, plural: str, name: str, namespace: str | None = None, **kwargs
):
Expand Down Expand Up @@ -540,12 +541,7 @@ def get_deployment_status(
name=name, namespace=namespace, pretty=True, **kwargs
)

@tenacity.retry(
stop=tenacity.stop_after_attempt(3),
wait=tenacity.wait_random_exponential(),
reraise=True,
retry=tenacity.retry_if_exception(should_retry_creation),
)
@generic_api_retry
def create_job(
self,
job: V1Job,
Expand All @@ -572,6 +568,7 @@ def create_job(
raise e
return resp

@generic_api_retry
def get_job(self, job_name: str, namespace: str) -> V1Job:
"""
Get Job of specified name and namespace.
Expand All @@ -582,6 +579,7 @@ def get_job(self, job_name: str, namespace: str) -> V1Job:
"""
return self.batch_v1_client.read_namespaced_job(name=job_name, namespace=namespace, pretty=True)

@generic_api_retry
def get_job_status(self, job_name: str, namespace: str) -> V1Job:
"""
Get job with status of specified name and namespace.
Expand Down Expand Up @@ -611,6 +609,7 @@ def wait_until_job_complete(self, job_name: str, namespace: str, job_poll_interv
self.log.info("The job '%s' is incomplete. Sleeping for %i sec.", job_name, job_poll_interval)
sleep(job_poll_interval)

@generic_api_retry
def list_jobs_all_namespaces(self) -> V1JobList:
"""
Get list of Jobs from all namespaces.
Expand All @@ -619,6 +618,7 @@ def list_jobs_all_namespaces(self) -> V1JobList:
"""
return self.batch_v1_client.list_job_for_all_namespaces(pretty=True)

@generic_api_retry
def list_jobs_from_namespace(self, namespace: str) -> V1JobList:
"""
Get list of Jobs from dedicated namespace.
Expand Down Expand Up @@ -674,6 +674,7 @@ def is_job_successful(job: V1Job) -> str | bool:
return bool(next((c for c in conditions if c.type == "Complete" and c.status), None))
return False

@generic_api_retry
def patch_namespaced_job(self, job_name: str, namespace: str, body: object) -> V1Job:
"""
Update the specified Job.
Expand Down Expand Up @@ -879,6 +880,7 @@ async def get_conn(self) -> async_client.ApiClient:
if kube_client is not None:
await kube_client.close()

@generic_api_retry
async def get_pod(self, name: str, namespace: str) -> V1Pod:
"""
Get pod's object.
Expand All @@ -899,6 +901,7 @@ async def get_pod(self, name: str, namespace: str) -> V1Pod:
raise KubernetesApiPermissionError("Permission denied (403) from Kubernetes API.") from e
raise KubernetesApiError from e

@generic_api_retry
async def delete_pod(self, name: str, namespace: str):
"""
Delete pod's object.
Expand All @@ -917,6 +920,7 @@ async def delete_pod(self, name: str, namespace: str):
if str(e.status) != "404":
raise

@generic_api_retry
async def read_logs(
self, name: str, namespace: str, container_name: str | None = None, since_seconds: int | None = None
) -> list[str]:
Expand Down Expand Up @@ -949,6 +953,7 @@ async def read_logs(
except HTTPError as e:
raise KubernetesApiError from e

@generic_api_retry
async def get_pod_events(self, name: str, namespace: str) -> CoreV1EventList:
"""Get pod's events."""
async with self.get_conn() as connection:
Expand All @@ -964,6 +969,7 @@ async def get_pod_events(self, name: str, namespace: str) -> CoreV1EventList:
raise KubernetesApiPermissionError("Permission denied (403) from Kubernetes API.") from e
raise KubernetesApiError from e

@generic_api_retry
async def get_job_status(self, name: str, namespace: str) -> V1Job:
"""
Get job's status object.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,14 @@
from typing import TYPE_CHECKING

import pendulum
from kubernetes.client.rest import ApiException
import tenacity
from kubernetes.client.rest import ApiException as SyncApiException
from kubernetes_asyncio.client.exceptions import ApiException as AsyncApiException
from slugify import slugify
from urllib3.exceptions import HTTPError

from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.providers.cncf.kubernetes.backcompat import get_logical_date_key

if TYPE_CHECKING:
Expand All @@ -39,6 +43,62 @@
POD_NAME_MAX_LENGTH = 63 # Matches Linux kernel's HOST_NAME_MAX default value minus 1.


class PodLaunchFailedException(AirflowException):
"""When pod launching fails in KubernetesPodOperator."""


class KubernetesApiException(AirflowException):
"""When communication with kubernetes API fails."""


API_RETRIES = conf.getint("workers", "api_retries", fallback=5)
API_RETRY_WAIT_MIN = conf.getfloat("workers", "api_retry_wait_min", fallback=1)
API_RETRY_WAIT_MAX = conf.getfloat("workers", "api_retry_wait_max", fallback=15)

_default_wait = tenacity.wait_exponential(min=API_RETRY_WAIT_MIN, max=API_RETRY_WAIT_MAX)

TRANSIENT_STATUS_CODES = {409, 429, 500, 502, 503, 504}


def _should_retry_api(exc: BaseException) -> bool:
"""Retry on selected ApiException status codes, plus plain HTTP/timeout errors."""
if isinstance(exc, (SyncApiException, AsyncApiException)):
return exc.status in TRANSIENT_STATUS_CODES
return isinstance(exc, (HTTPError, KubernetesApiException))


class WaitRetryAfterOrExponential(tenacity.wait.wait_base):
"""Wait strategy that honors Retry-After header on 429, else falls back to exponential backoff."""

def __call__(self, retry_state):
exc = retry_state.outcome.exception() if retry_state.outcome else None
if isinstance(exc, (SyncApiException, AsyncApiException)) and exc.status == 429:
retry_after = (exc.headers or {}).get("Retry-After")
if retry_after:
try:
return float(int(retry_after))
except ValueError:
pass
# Inline exponential fallback
return _default_wait(retry_state)


def generic_api_retry(func):
"""
Retry to Kubernetes API calls.

- Retries only transient ApiException status codes.
- Honors Retry-After on 429.
"""
return tenacity.retry(
stop=tenacity.stop_after_attempt(API_RETRIES),
wait=WaitRetryAfterOrExponential(),
retry=tenacity.retry_if_exception(_should_retry_api),
reraise=True,
before_sleep=tenacity.before_sleep_log(log, logging.WARNING),
)(func)


def rand_str(num):
"""
Generate random lowercase alphanumeric string of length num.
Expand Down Expand Up @@ -148,18 +208,3 @@ def annotations_for_logging_task_metadata(annotation_set):
else:
annotations_for_logging = "<omitted>"
return annotations_for_logging


def should_retry_creation(exception: BaseException) -> bool:
"""
Check if an Exception indicates a transient error and warrants retrying.

This function is needed for preventing 'No agent available' error. The error appears time to time
when users try to create a Resource or Job. This issue is inside kubernetes and in the current moment
has no solution. Like a temporary solution we decided to retry Job or Resource creation request each
time when this error appears.
More about this issue here: https://github.com/cert-manager/cert-manager/issues/6457
"""
if isinstance(exception, ApiException):
return str(exception.status) == "500"
return False
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,12 @@
from functools import cached_property
from typing import TYPE_CHECKING

import tenacity
import yaml
from kubernetes.utils import create_from_yaml

from airflow.exceptions import AirflowException
from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import should_retry_creation
from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import generic_api_retry
from airflow.providers.cncf.kubernetes.utils.delete_from import delete_from_yaml
from airflow.providers.cncf.kubernetes.utils.k8s_resource_iterator import k8s_resource_iterator
from airflow.providers.cncf.kubernetes.version_compat import AIRFLOW_V_3_1_PLUS
Expand Down Expand Up @@ -132,12 +131,7 @@ def create_custom_from_yaml_object(self, body: dict):
else:
self.custom_object_client.create_cluster_custom_object(group, version, plural, body)

@tenacity.retry(
stop=tenacity.stop_after_attempt(3),
wait=tenacity.wait_random_exponential(),
reraise=True,
retry=tenacity.retry_if_exception(should_retry_creation),
)
@generic_api_retry
def _create_objects(self, objects):
self.log.info("Starting resource creation")
if not self.custom_resource_definition:
Expand Down
Loading