Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -734,9 +734,11 @@ async def _load_config(self):
"""Return Kubernetes API session for use with requests."""
in_cluster = self._coalesce_param(self.in_cluster, await self._get_field("in_cluster"))
cluster_context = self._coalesce_param(self.cluster_context, await self._get_field("cluster_context"))
kubeconfig_path = await self._get_field("kube_config_path")
kubeconfig = await self._get_field("kube_config")

num_selected_configuration = sum(1 for o in [in_cluster, kubeconfig, self.config_dict] if o)
num_selected_configuration = sum(
1 for o in [in_cluster, kubeconfig, kubeconfig_path, self.config_dict] if o
)

if num_selected_configuration > 1:
raise AirflowException(
Expand All @@ -757,6 +759,16 @@ async def _load_config(self):
await async_config.load_kube_config_from_dict(self.config_dict)
return async_client.ApiClient()

if kubeconfig_path is not None:
self.log.debug("loading kube_config from: %s", kubeconfig_path)
self._is_in_cluster = False
await async_config.load_kube_config(
config_file=kubeconfig_path,
client_configuration=self.client_configuration,
context=cluster_context,
)
return async_client.ApiClient()

if kubeconfig is not None:
async with aiofiles.tempfile.NamedTemporaryFile() as temp_config:
self.log.debug(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
from unittest import mock
from unittest.mock import MagicMock, PropertyMock, patch

import anyio
import kubernetes
import pytest
import yaml
from kubernetes.client import V1Deployment, V1DeploymentStatus
from kubernetes.client.rest import ApiException
from kubernetes.config import ConfigException
Expand Down Expand Up @@ -898,6 +900,39 @@ async def test_load_config_with_conn_id(
kube_config_loader.assert_called_once()
kube_config_merger.assert_called_once()

@pytest.mark.asyncio
@mock.patch(INCLUSTER_CONFIG_LOADER)
@mock.patch(KUBE_CONFIG_MERGER)
async def test_load_config_with_conn_id_kube_config_path(
self, kube_config_merger, incluster_config, kube_config_loader, tmp_path
):
file_name = f"{tmp_path}/config"
extra = {"kube_config_path": file_name}
try:
merge_conn(
Connection(
conn_type="kubernetes",
conn_id=CONN_ID,
extra=json.dumps(extra),
),
)
async with await anyio.open_file(file_name, "w+") as f:
yaml.dump({"a": "b"}, f)
hook = AsyncKubernetesHook(
conn_id=CONN_ID,
in_cluster=False,
config_file=None,
cluster_context=None,
)
await hook._load_config()
assert not incluster_config.called
kube_config_loader.assert_called_once()
kube_config_merger.assert_called_once()
except:
raise
finally:
clear_db_connections()

@pytest.mark.asyncio
@mock.patch(INCLUSTER_CONFIG_LOADER)
@mock.patch(KUBE_CONFIG_MERGER)
Expand Down