Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
95fb8f7
Add deferrable mode to Kubernetes and GKE operator
MrGeorgeOwl Dec 8, 2022
da29787
Fix import of the dag
MrGeorgeOwl Dec 9, 2022
5928c66
update kubernetesPodOperator to return xcom value
VladaZakharova Dec 9, 2022
e109aad
Change cleanup method to first implementation
MrGeorgeOwl Dec 12, 2022
3bf7750
Add logic to convert config_file content to prevent memory leak
VladaZakharova Dec 21, 2022
b0e464e
Middlework.Change call to db, refactor temporary file creation and li…
MrGeorgeOwl Dec 23, 2022
4cba01a
Middlework.Add caching to async request to db, remove error imports, …
MrGeorgeOwl Dec 23, 2022
afd17ac
Change the way of authentication to GKE cluster
MrGeorgeOwl Dec 29, 2022
c62aa31
Fix marking task failed, pending for additional resources, add failed…
MrGeorgeOwl Dec 29, 2022
dc857e1
Add additional check for timeout event
MrGeorgeOwl Jan 3, 2023
665b0f9
Add caching for Kubernetes client
VladaZakharova Dec 22, 2022
0becd01
Add logging on all stages of pod run
VladaZakharova Jan 10, 2023
c431381
Fix tests for gke operator and kubernetes hook
VladaZakharova Jan 12, 2023
e68cc21
Add asgiref module to cncf provider.yaml
VladaZakharova Jan 12, 2023
ff813af
Change version ranging, add deprecation messages
MrGeorgeOwl Jan 16, 2023
0e438de
Add tests for GKE pod hook, refactor test for GKE operator
MrGeorgeOwl Jan 16, 2023
fb6f2c5
Add tests for GKE trigger + add some params for it for correct inheri…
VladaZakharova Jan 17, 2023
235844b
Fix for passing static checks
VladaZakharova Jan 17, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 162 additions & 0 deletions airflow/providers/cncf/kubernetes/hooks/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,28 @@
# under the License.
from __future__ import annotations

import contextlib
import tempfile
import warnings
from typing import TYPE_CHECKING, Any, Generator

from asgiref.sync import sync_to_async
from kubernetes import client, config, watch
from kubernetes.client.models import V1Pod
from kubernetes.config import ConfigException
from kubernetes_asyncio import client as async_client, config as async_config
from kubernetes_asyncio.client import ApiException
from kubernetes_asyncio.config import load_kube_config_from_dict
from urllib3.exceptions import HTTPError

from airflow.compat.functools import cached_property
from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook
from airflow.kubernetes.kube_client import _disable_verify_ssl, _enable_tcp_keepalive
from airflow.utils import yaml

LOADING_KUBE_CONFIG_FILE_RESOURCE = "Loading Kubernetes configuration file kube_config from {}..."


def _load_body_to_dict(body):
try:
Expand Down Expand Up @@ -396,6 +405,12 @@ def get_pod_logs(
namespace=namespace or self._get_namespace() or self.DEFAULT_NAMESPACE,
)

def get_pod(self, name: str, namespace: str) -> V1Pod:
    """
    Read a single pod through the synchronous core V1 client.

    :param name: Name of the pod.
    :param namespace: Name of the pod's namespace.
    :return: Whatever ``read_namespaced_pod`` returns for that pod.
    """
    pod = self.core_v1_client.read_namespaced_pod(name=name, namespace=namespace)
    return pod

def get_namespaced_pod_list(
self,
label_selector: str | None = "",
Expand Down Expand Up @@ -431,3 +446,150 @@ def _get_bool(val) -> bool | None:
elif val.strip().lower() == "false":
return False
return None


class AsyncKubernetesHook(KubernetesHook):
    """
    Hook to use Kubernetes SDK asynchronously.

    :param config_dict: Optional kubeconfig content given as a dictionary. When set, it is
        loaded with ``load_kube_config_from_dict`` instead of reading a kubeconfig file.
    """

    def __init__(self, config_dict: dict | None = None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.config_dict = config_dict

        # Cache for the connection extras, filled on first use by get_conn_extras().
        self._extras: dict | None = None

    async def _load_config(self) -> async_client.ApiClient:
        """
        Load the Kubernetes client configuration and return an API client.

        Exactly one configuration source is used, checked in this order: in-cluster
        configuration, ``config_dict``, the ``kube_config`` content stored on the connection,
        and finally the default kubeconfig file.

        :return: A new ``kubernetes_asyncio`` ``ApiClient``.
        :raises AirflowException: If more than one of ``in_cluster``, ``kube_config`` and
            ``config_dict`` is set.
        """
        in_cluster = self._coalesce_param(self.in_cluster, await self._get_field("in_cluster"))
        cluster_context = self._coalesce_param(self.cluster_context, await self._get_field("cluster_context"))
        kubeconfig = await self._get_field("kube_config")

        num_selected_configuration = sum(
            1 for option in (in_cluster, kubeconfig, self.config_dict) if option
        )

        if num_selected_configuration > 1:
            # NOTE(review): the message names kube_config_path, while the check above counts
            # config_dict. The text is kept unchanged because callers may match on it.
            raise AirflowException(
                "Invalid connection configuration. Options kube_config_path, "
                "kube_config, in_cluster are mutually exclusive. "
                "You can only use one option at a time."
            )

        if in_cluster:
            self.log.debug(LOADING_KUBE_CONFIG_FILE_RESOURCE.format("within a pod"))
            self._is_in_cluster = True
            async_config.load_incluster_config()
            return async_client.ApiClient()

        if self.config_dict:
            self.log.debug(LOADING_KUBE_CONFIG_FILE_RESOURCE.format("config dictionary"))
            await load_kube_config_from_dict(self.config_dict)
            return async_client.ApiClient()

        if kubeconfig is not None:
            # The loader takes a file path, so the connection's kubeconfig content is written
            # to a temporary file that exists only while the configuration is being loaded.
            with tempfile.NamedTemporaryFile() as temp_config:
                self.log.debug(
                    "Reading kubernetes configuration file from connection "
                    "object and writing temporary config file with its content",
                )
                temp_config.write(kubeconfig.encode())
                temp_config.flush()
                self._is_in_cluster = False
                await async_config.load_kube_config(
                    config_file=temp_config.name,
                    client_configuration=self.client_configuration,
                    context=cluster_context,
                )
            return async_client.ApiClient()

        self.log.debug(LOADING_KUBE_CONFIG_FILE_RESOURCE.format("default configuration file"))
        await async_config.load_kube_config(
            client_configuration=self.client_configuration,
            context=cluster_context,
        )
        # Return a client explicitly so that every branch yields an ApiClient instead of
        # falling off the end with None.
        return async_client.ApiClient()

    async def get_conn_extras(self) -> dict:
        """
        Return the ``extra`` dictionary of the hook's connection, caching it after first use.

        The connection lookup is a synchronous call, so it is wrapped with ``sync_to_async``.
        When the hook has no ``conn_id``, an empty dictionary is cached and returned.
        """
        if self._extras is None:
            if self.conn_id:
                connection = await sync_to_async(self.get_connection)(self.conn_id)
                self._extras = connection.extra_dejson
            else:
                self._extras = {}
        return self._extras

    async def _get_field(self, field_name):
        """
        Look up a field in the connection extras.

        The plain ``field_name`` key is checked first, then ``extra__kubernetes__<field_name>``.
        Falsy values (for example an empty string) are returned as ``None``.

        :param field_name: Un-prefixed name of the extra field.
        :raises ValueError: If ``field_name`` starts with ``extra__``.
        """
        if field_name.startswith("extra__"):
            raise ValueError(
                f"Got prefixed name {field_name}; please remove the 'extra__kubernetes__' prefix "
                f"when using this method."
            )
        extras = await self.get_conn_extras()
        if field_name in extras:
            return extras[field_name] or None
        prefixed_name = f"extra__kubernetes__{field_name}"
        return extras.get(prefixed_name) or None

    @contextlib.asynccontextmanager
    async def get_conn(self) -> async_client.ApiClient:
        """
        Async context manager that yields an API client and closes it on exit.

        The client is closed in ``finally``, so it is released even when the body of the
        ``async with`` block raises.
        """
        kube_client = None
        try:
            # The fallback covers a _load_config override that returns None.
            kube_client = await self._load_config() or async_client.ApiClient()
            yield kube_client
        finally:
            if kube_client is not None:
                await kube_client.close()

    async def get_pod(self, name: str, namespace: str) -> V1Pod:
        """
        Gets pod's object.

        :param name: Name of the pod.
        :param namespace: Name of the pod's namespace.
        """
        async with self.get_conn() as connection:
            v1_api = async_client.CoreV1Api(connection)
            pod: V1Pod = await v1_api.read_namespaced_pod(
                name=name,
                namespace=namespace,
            )
        return pod

    async def delete_pod(self, name: str, namespace: str) -> None:
        """
        Deletes pod's object.

        An ``ApiException`` with status 404 is ignored, so deleting a pod that is already
        gone is not an error. Any other ``ApiException`` is re-raised.

        :param name: Name of the pod.
        :param namespace: Name of the pod's namespace.
        """
        async with self.get_conn() as connection:
            try:
                v1_api = async_client.CoreV1Api(connection)
                await v1_api.delete_namespaced_pod(
                    name=name, namespace=namespace, body=client.V1DeleteOptions()
                )
            except ApiException as e:
                # If the pod is already deleted
                if e.status != 404:
                    raise

    async def read_logs(self, name: str, namespace: str):
        """
        Reads the logs of the pod and writes them to the hook's log.

        The logs are requested once (``follow=False``) with timestamps, so each line can
        still be placed in time after the pod has finished. The method is used for async
        output of the logs only if the pod failed its execution or the task was cancelled
        by the user.

        :param name: Name of the pod.
        :param namespace: Name of the pod's namespace.
        :return: The pod's log split into lines.
        :raises HTTPError: Re-raised after being logged.
        """
        async with self.get_conn() as connection:
            try:
                v1_api = async_client.CoreV1Api(connection)
                logs = await v1_api.read_namespaced_pod_log(
                    name=name,
                    namespace=namespace,
                    follow=False,
                    timestamps=True,
                )
                logs = logs.splitlines()
                for line in logs:
                    self.log.info("Container logs from %s", line)
                return logs
            except HTTPError:
                self.log.exception("There was an error reading the kubernetes API.")
                raise
Loading