From 1c1f25d92e0faeb4f84069056dbdc24c1e53fa93 Mon Sep 17 00:00:00 2001 From: Xiaodong DENG Date: Fri, 2 Dec 2022 16:51:50 -0800 Subject: [PATCH] Address review comment (rename config item) --- airflow/config_templates/config.yml | 5 +++-- airflow/config_templates/default_airflow.cfg | 4 ++-- airflow/executors/kubernetes_executor.py | 10 ++++++---- airflow/kubernetes/kube_config.py | 10 +++++++--- tests/executors/test_kubernetes_executor.py | 14 ++++++++------ 5 files changed, 26 insertions(+), 17 deletions(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index b418db74e5638..1f95cff1c1650 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -2395,12 +2395,13 @@ - name: multi_namespace_mode description: | Allows users to launch pods in multiple namespaces. - Will require creating a cluster-role for the scheduler, or use namespace_list configuration. + Will require creating a cluster-role for the scheduler, + or use multi_namespace_mode_namespace_list configuration. version_added: 1.10.12 type: boolean example: ~ default: "False" - - name: namespace_list + - name: multi_namespace_mode_namespace_list description: | If multi_namespace_mode is True while scheduler does not have a cluster-role, give the list of namespaces where the scheduler will schedule jobs diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index 2a1fb712095b6..ea464934dd2c6 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -1197,13 +1197,13 @@ delete_worker_pods_on_failure = False worker_pods_creation_batch_size = 1 # Allows users to launch pods in multiple namespaces. -# Will require creating a cluster-role for the scheduler, or use namespace_list configuration. +# Will require creating a cluster-role for the scheduler, or use multi_namespace_mode_namespace_list configuration. multi_namespace_mode = False # If multi_namespace_mode is True while scheduler does not have a cluster-role, # give the list of namespaces where the scheduler will schedule jobs # Scheduler needs to have the necessary permissions in these namespaces. -namespace_list = +multi_namespace_mode_namespace_list = # Use the service account kubernetes gives to pods to connect to kubernetes cluster. # It's intended for clients that expect to be running inside a pod running on kubernetes. diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index 29b91c2a245e8..024abaf28a38c 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -141,7 +141,7 @@ def _run( last_resource_version: str | None = None if self.multi_namespace_mode: - if self.kube_config.namespace_list: + if self.kube_config.multi_namespace_mode_namespace_list: list_worker_pods = functools.partial( watcher.stream, kube_client.list_namespaced_pod, self.namespace, **kwargs ) @@ -297,7 +297,9 @@ def _make_kube_watchers(self) -> dict[str | None, KubernetesJobWatcher]: watchers = {} if self.kube_config.multi_namespace_mode: namespaces_to_watch = ( - self.kube_config.namespace_list if self.kube_config.namespace_list else [None] + self.kube_config.multi_namespace_mode_namespace_list + if self.kube_config.multi_namespace_mode_namespace_list + else [None] ) else: namespaces_to_watch = [self.kube_config.kube_namespace] @@ -471,9 +473,9 @@ def __init__(self): def _list_pods(self, query_kwargs): if self.kube_config.multi_namespace_mode: - if self.kube_config.namespace_list: + if self.kube_config.multi_namespace_mode_namespace_list: pods = [] - for namespace in self.kube_config.namespace_list: + for namespace in self.kube_config.multi_namespace_mode_namespace_list: pods.extend( self.kube_client.list_namespaced_pod(namespace=namespace, **query_kwargs).items ) diff --git a/airflow/kubernetes/kube_config.py b/airflow/kubernetes/kube_config.py index 91749cf6f1c71..8d2aa9c2faf5b 100644 --- a/airflow/kubernetes/kube_config.py +++ b/airflow/kubernetes/kube_config.py @@ -57,10 +57,14 @@ def __init__(self): # create, watch, get, and delete pods in this namespace. self.kube_namespace = conf.get(self.kubernetes_section, "namespace") self.multi_namespace_mode = conf.getboolean(self.kubernetes_section, "multi_namespace_mode") - if self.multi_namespace_mode and conf.get(self.kubernetes_section, "namespace_list"): - self.namespace_list = conf.get(self.kubernetes_section, "namespace_list").split(",") + if self.multi_namespace_mode and conf.get( + self.kubernetes_section, "multi_namespace_mode_namespace_list" + ): + self.multi_namespace_mode_namespace_list = conf.get( + self.kubernetes_section, "multi_namespace_mode_namespace_list" + ).split(",") else: - self.namespace_list = None + self.multi_namespace_mode_namespace_list = None # The Kubernetes Namespace in which pods will be created by the executor. Note # that if your # cluster has RBAC enabled, your workers may need service account permissions to diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py index 67056850ac431..eaa3a60687a8f 100644 --- a/tests/executors/test_kubernetes_executor.py +++ b/tests/executors/test_kubernetes_executor.py @@ -522,17 +522,19 @@ def test_change_state_failed_no_deletion( mock_delete_pod.assert_not_called() @pytest.mark.parametrize( - "namespace_list, watchers_keys", + "multi_namespace_mode_namespace_list, watchers_keys", [ pytest.param(["A", "B", "C"], ["A", "B", "C"]), pytest.param(None, [None]), ], ) @mock.patch("airflow.executors.kubernetes_executor.get_kube_client") - def test_watchers_under_multi_namespace_mode(self, mock_get_kube_client, namespace_list, watchers_keys): + def test_watchers_under_multi_namespace_mode( + self, mock_get_kube_client, multi_namespace_mode_namespace_list, watchers_keys + ): executor = self.kubernetes_executor executor.kube_config.multi_namespace_mode = True - executor.kube_config.namespace_list = namespace_list + executor.kube_config.multi_namespace_mode_namespace_list = multi_namespace_mode_namespace_list executor.start() assert list(executor.kube_scheduler.kube_watchers.keys()) == watchers_keys assert all( @@ -726,12 +728,12 @@ def test_kube_config_get_namespace_list( ): config = { ("kubernetes", "multi_namespace_mode"): raw_multi_namespace_mode, - ("kubernetes", "namespace_list"): raw_value_namespace_list, + ("kubernetes", "multi_namespace_mode_namespace_list"): raw_value_namespace_list, } with conf_vars(config): executor = KubernetesExecutor() - assert executor.kube_config.namespace_list == expected_value_in_kube_config + assert executor.kube_config.multi_namespace_mode_namespace_list == expected_value_in_kube_config @mock.patch("airflow.executors.kubernetes_executor.KubernetesJobWatcher") @mock.patch("airflow.executors.kubernetes_executor.get_kube_client") @@ -853,7 +855,7 @@ def list_namespaced_pod(namespace, *args, **kwargs): config = { ("kubernetes", "namespace"): "mynamespace", ("kubernetes", "multi_namespace_mode"): "true", - ("kubernetes", "namespace_list"): "namespace-1,namespace-2,namespace-3", + ("kubernetes", "multi_namespace_mode_namespace_list"): "namespace-1,namespace-2,namespace-3", ("kubernetes", "kube_client_request_args"): '{"sentinel": "foo"}', } with conf_vars(config):