Skip to content

Commit

Permalink
Address review comment (rename config item)
Browse files Browse the repository at this point in the history
  • Loading branch information
XD-DENG committed Dec 3, 2022
1 parent a23c33e commit 1c1f25d
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 17 deletions.
5 changes: 3 additions & 2 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2395,12 +2395,13 @@
- name: multi_namespace_mode
description: |
Allows users to launch pods in multiple namespaces.
Will require creating a cluster-role for the scheduler, or use namespace_list configuration.
Will require creating a cluster-role for the scheduler,
or use multi_namespace_mode_namespace_list configuration.
version_added: 1.10.12
type: boolean
example: ~
default: "False"
- name: namespace_list
- name: multi_namespace_mode_namespace_list
description: |
If multi_namespace_mode is True while scheduler does not have a cluster-role,
give the list of namespaces where the scheduler will schedule jobs
Expand Down
4 changes: 2 additions & 2 deletions airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -1197,13 +1197,13 @@ delete_worker_pods_on_failure = False
worker_pods_creation_batch_size = 1

# Allows users to launch pods in multiple namespaces.
# Will require creating a cluster-role for the scheduler, or use namespace_list configuration.
# Will require creating a cluster-role for the scheduler, or use multi_namespace_mode_namespace_list configuration.
multi_namespace_mode = False

# If multi_namespace_mode is True while scheduler does not have a cluster-role,
# give the list of namespaces where the scheduler will schedule jobs
# Scheduler needs to have the necessary permissions in these namespaces.
namespace_list =
multi_namespace_mode_namespace_list =

# Use the service account kubernetes gives to pods to connect to kubernetes cluster.
# It's intended for clients that expect to be running inside a pod running on kubernetes.
Expand Down
10 changes: 6 additions & 4 deletions airflow/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ def _run(

last_resource_version: str | None = None
if self.multi_namespace_mode:
if self.kube_config.namespace_list:
if self.kube_config.multi_namespace_mode_namespace_list:
list_worker_pods = functools.partial(
watcher.stream, kube_client.list_namespaced_pod, self.namespace, **kwargs
)
Expand Down Expand Up @@ -297,7 +297,9 @@ def _make_kube_watchers(self) -> dict[str | None, KubernetesJobWatcher]:
watchers = {}
if self.kube_config.multi_namespace_mode:
namespaces_to_watch = (
self.kube_config.namespace_list if self.kube_config.namespace_list else [None]
self.kube_config.multi_namespace_mode_namespace_list
if self.kube_config.multi_namespace_mode_namespace_list
else [None]
)
else:
namespaces_to_watch = [self.kube_config.kube_namespace]
Expand Down Expand Up @@ -471,9 +473,9 @@ def __init__(self):

def _list_pods(self, query_kwargs):
if self.kube_config.multi_namespace_mode:
if self.kube_config.namespace_list:
if self.kube_config.multi_namespace_mode_namespace_list:
pods = []
for namespace in self.kube_config.namespace_list:
for namespace in self.kube_config.multi_namespace_mode_namespace_list:
pods.extend(
self.kube_client.list_namespaced_pod(namespace=namespace, **query_kwargs).items
)
Expand Down
10 changes: 7 additions & 3 deletions airflow/kubernetes/kube_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,14 @@ def __init__(self):
# create, watch, get, and delete pods in this namespace.
self.kube_namespace = conf.get(self.kubernetes_section, "namespace")
self.multi_namespace_mode = conf.getboolean(self.kubernetes_section, "multi_namespace_mode")
if self.multi_namespace_mode and conf.get(self.kubernetes_section, "namespace_list"):
self.namespace_list = conf.get(self.kubernetes_section, "namespace_list").split(",")
if self.multi_namespace_mode and conf.get(
self.kubernetes_section, "multi_namespace_mode_namespace_list"
):
self.multi_namespace_mode_namespace_list = conf.get(
self.kubernetes_section, "multi_namespace_mode_namespace_list"
).split(",")
else:
self.namespace_list = None
self.multi_namespace_mode_namespace_list = None
# The Kubernetes Namespace in which pods will be created by the executor. Note
# that if your
# cluster has RBAC enabled, your workers may need service account permissions to
Expand Down
14 changes: 8 additions & 6 deletions tests/executors/test_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -522,17 +522,19 @@ def test_change_state_failed_no_deletion(
mock_delete_pod.assert_not_called()

@pytest.mark.parametrize(
"namespace_list, watchers_keys",
"multi_namespace_mode_namespace_list, watchers_keys",
[
pytest.param(["A", "B", "C"], ["A", "B", "C"]),
pytest.param(None, [None]),
],
)
@mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
def test_watchers_under_multi_namespace_mode(self, mock_get_kube_client, namespace_list, watchers_keys):
def test_watchers_under_multi_namespace_mode(
self, mock_get_kube_client, multi_namespace_mode_namespace_list, watchers_keys
):
executor = self.kubernetes_executor
executor.kube_config.multi_namespace_mode = True
executor.kube_config.namespace_list = namespace_list
executor.kube_config.multi_namespace_mode_namespace_list = multi_namespace_mode_namespace_list
executor.start()
assert list(executor.kube_scheduler.kube_watchers.keys()) == watchers_keys
assert all(
Expand Down Expand Up @@ -726,12 +728,12 @@ def test_kube_config_get_namespace_list(
):
config = {
("kubernetes", "multi_namespace_mode"): raw_multi_namespace_mode,
("kubernetes", "namespace_list"): raw_value_namespace_list,
("kubernetes", "multi_namespace_mode_namespace_list"): raw_value_namespace_list,
}
with conf_vars(config):
executor = KubernetesExecutor()

assert executor.kube_config.namespace_list == expected_value_in_kube_config
assert executor.kube_config.multi_namespace_mode_namespace_list == expected_value_in_kube_config

@mock.patch("airflow.executors.kubernetes_executor.KubernetesJobWatcher")
@mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
Expand Down Expand Up @@ -853,7 +855,7 @@ def list_namespaced_pod(namespace, *args, **kwargs):
config = {
("kubernetes", "namespace"): "mynamespace",
("kubernetes", "multi_namespace_mode"): "true",
("kubernetes", "namespace_list"): "namespace-1,namespace-2,namespace-3",
("kubernetes", "multi_namespace_mode_namespace_list"): "namespace-1,namespace-2,namespace-3",
("kubernetes", "kube_client_request_args"): '{"sentinel": "foo"}',
}
with conf_vars(config):
Expand Down

0 comments on commit 1c1f25d

Please sign in to comment.