Skip to content

Commit 7d85aff

Browse files
committed
fix: kpo async kube_config_path
1 parent ef79854 commit 7d85aff

File tree

2 files changed

+36
-2
lines changed

2 files changed

+36
-2
lines changed

providers/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -734,9 +734,11 @@ async def _load_config(self):
734734
"""Return Kubernetes API session for use with requests."""
735735
in_cluster = self._coalesce_param(self.in_cluster, await self._get_field("in_cluster"))
736736
cluster_context = self._coalesce_param(self.cluster_context, await self._get_field("cluster_context"))
737+
kubeconfig_path = self._coalesce_param(self.config_file, self._get_field("kube_config_path"))
737738
kubeconfig = await self._get_field("kube_config")
738-
739-
num_selected_configuration = sum(1 for o in [in_cluster, kubeconfig, self.config_dict] if o)
739+
num_selected_configuration = sum(
740+
1 for o in [in_cluster, kubeconfig, kubeconfig_path, self.config_dict] if o
741+
)
740742

741743
if num_selected_configuration > 1:
742744
raise AirflowException(
@@ -757,6 +759,16 @@ async def _load_config(self):
757759
await async_config.load_kube_config_from_dict(self.config_dict)
758760
return async_client.ApiClient()
759761

762+
if kubeconfig_path is not None:
763+
self.log.debug("loading kube_config from: %s", kubeconfig_path)
764+
self._is_in_cluster = False
765+
await async_config.load_kube_config(
766+
config_file=kubeconfig_path,
767+
client_configuration=self.client_configuration,
768+
context=cluster_context,
769+
)
770+
return async_client.ApiClient()
771+
760772
if kubeconfig is not None:
761773
async with aiofiles.tempfile.NamedTemporaryFile() as temp_config:
762774
self.log.debug(

providers/tests/cncf/kubernetes/hooks/test_kubernetes.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@
2424
from unittest import mock
2525
from unittest.mock import MagicMock, PropertyMock, patch
2626

27+
import anyio
2728
import kubernetes
2829
import pytest
30+
import yaml
2931
from kubernetes.client import V1Deployment, V1DeploymentStatus
3032
from kubernetes.client.rest import ApiException
3133
from kubernetes.config import ConfigException
@@ -860,6 +862,26 @@ async def test_load_config_with_incluster(self, kube_config_merger, kube_config_
860862
assert not kube_config_loader.called
861863
assert not kube_config_merger.called
862864

865+
@pytest.mark.asyncio
866+
@mock.patch(INCLUSTER_CONFIG_LOADER)
867+
@mock.patch(KUBE_CONFIG_MERGER)
868+
async def test_load_config_with_config_file(
869+
self, kube_config_merger, incluster_config, kube_config_loader, tmp_path
870+
):
871+
file_name = f"{tmp_path}/config"
872+
async with await anyio.open_file(file_name, "w+") as f:
873+
yaml.dump({"a": "b"}, f)
874+
hook = AsyncKubernetesHook(
875+
conn_id=None,
876+
in_cluster=False,
877+
config_file=file_name,
878+
cluster_context=None,
879+
)
880+
await hook._load_config()
881+
assert not incluster_config.called
882+
assert hook._is_in_cluster is False
883+
kube_config_loader.assert_called_once()
884+
863885
@pytest.mark.asyncio
864886
@mock.patch(INCLUSTER_CONFIG_LOADER)
865887
@mock.patch(KUBE_CONFIG_MERGER)

0 commit comments

Comments
 (0)