Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion airflow/providers/cncf/kubernetes/operators/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,15 @@ def __init__(
namespace: str | None = None,
kubernetes_conn_id: str | None = KubernetesHook.default_conn_name,
custom_resource_definition: bool = False,
config_file: str | None = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
self._namespace = namespace
self.kubernetes_conn_id = kubernetes_conn_id
self.yaml_conf = yaml_conf
self.custom_resource_definition = custom_resource_definition
self.config_file = config_file

@cached_property
def client(self) -> ApiClient:
Expand All @@ -76,7 +78,7 @@ def custom_object_client(self) -> CustomObjectsApi:

@cached_property
def hook(self) -> KubernetesHook:
hook = KubernetesHook(conn_id=self.kubernetes_conn_id)
hook = KubernetesHook(conn_id=self.kubernetes_conn_id, config_file=self.config_file)
return hook

def get_namespace(self) -> str:
Expand Down
14 changes: 11 additions & 3 deletions tests/providers/cncf/kubernetes/operators/test_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,17 @@ def setup_method(self):
args = {"owner": "airflow", "start_date": timezone.datetime(2020, 2, 1)}
self.dag = DAG("test_dag_id", default_args=args)

@patch("kubernetes.config.load_kube_config")
@patch("kubernetes.client.api.CoreV1Api.create_namespaced_persistent_volume_claim")
def test_create_application_from_yaml(self, mock_create_namespaced_persistent_volume_claim, context):
def test_create_application_from_yaml(
self, mock_create_namespaced_persistent_volume_claim, mock_load_kube_config, context
):
op = KubernetesCreateResourceOperator(
yaml_conf=TEST_VALID_RESOURCE_YAML,
dag=self.dag,
kubernetes_conn_id="kubernetes_default",
task_id="test_task_id",
config_file="/foo/bar",
)

op.execute(context)
Expand All @@ -112,30 +116,34 @@ def test_create_application_from_yaml_list(self, mock_create_namespaced_persiste

assert mock_create_namespaced_persistent_volume_claim.call_count == 2

@patch("kubernetes.config.load_kube_config")
@patch("kubernetes.client.api.CoreV1Api.delete_namespaced_persistent_volume_claim")
def test_single_delete_application_from_yaml(
self, mock_delete_namespaced_persistent_volume_claim, context
self, mock_delete_namespaced_persistent_volume_claim, mock_load_kube_config, context
):
op = KubernetesDeleteResourceOperator(
yaml_conf=TEST_VALID_RESOURCE_YAML,
dag=self.dag,
kubernetes_conn_id="kubernetes_default",
task_id="test_task_id",
config_file="/foo/bar",
)

op.execute(context)

mock_delete_namespaced_persistent_volume_claim.assert_called()

@patch("kubernetes.config.load_kube_config")
@patch("kubernetes.client.api.CoreV1Api.delete_namespaced_persistent_volume_claim")
def test_multi_delete_application_from_yaml(
self, mock_delete_namespaced_persistent_volume_claim, context
self, mock_delete_namespaced_persistent_volume_claim, mock_load_kube_config, context
):
op = KubernetesDeleteResourceOperator(
yaml_conf=TEST_VALID_LIST_RESOURCE_YAML,
dag=self.dag,
kubernetes_conn_id="kubernetes_default",
task_id="test_task_id",
config_file="/foo/bar",
)

op.execute(context)
Expand Down