From b4f12f8aa0ae4770bb192fffee71f7a79b249c87 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Mon, 16 May 2022 11:36:27 -0700 Subject: [PATCH] Test patch_already_checked directly (and separate from the other tests) The "already_checked" label was added to a few "expected pods" recently, when we changed the operator to patch the pod even when the pod succeeds. Since we are changing the "patch" code to patch with the latest read of the pod that we have (i.e. using the `remote_pod` variable), and no longer the pod object stored on `k.pod`, the label no longer shows up in those tests (that's because `k.pod` isn't actually a read of the remote pod; it just happens to get mutated in the patch function before it is used to actually patch the pod). Further, since `remote_pod` is a local variable, we can't observe it in tests, so we have to read the pod using the k8s API. _But_ our "find pod" function excludes "already checked" pods, so we have to make this configurable. So, now we have a proper integration test for the "already_checked" behavior (there was already a unit test). 
--- .../kubernetes/operators/kubernetes_pod.py | 12 ++-- .../test_kubernetes_pod_operator.py | 55 ++++++++++++++++++- 2 files changed, 60 insertions(+), 7 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py index 7c64875645471..71ab55b97b292 100644 --- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py +++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py @@ -330,9 +330,9 @@ def client(self) -> CoreV1Api: kwargs.update(in_cluster=self.in_cluster) return kube_client.get_kube_client(**kwargs) - def find_pod(self, namespace, context) -> Optional[k8s.V1Pod]: + def find_pod(self, namespace, context, *, exclude_checked=True) -> Optional[k8s.V1Pod]: """Returns an already-running pod for this task instance if one exists.""" - label_selector = self._build_find_pod_label_selector(context) + label_selector = self._build_find_pod_label_selector(context, exclude_checked=exclude_checked) pod_list = self.client.list_namespaced_pod( namespace=namespace, label_selector=label_selector, @@ -436,10 +436,14 @@ def process_pod_deletion(self, pod): else: self.log.info("skipping deleting pod: %s", pod.metadata.name) - def _build_find_pod_label_selector(self, context: Optional[dict] = None) -> str: + def _build_find_pod_label_selector(self, context: Optional[dict] = None, *, exclude_checked=True) -> str: labels = self._get_ti_pod_labels(context, include_try_number=False) label_strings = [f'{label_id}={label}' for label_id, label in sorted(labels.items())] - return ','.join(label_strings) + f',{self.POD_CHECKED_KEY}!=True,!airflow-worker' + labels_value = ','.join(label_strings) + if exclude_checked: + labels_value += f',{self.POD_CHECKED_KEY}!=True' + labels_value += ',!airflow-worker' + return labels_value def _set_name(self, name): if name is None: diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py b/kubernetes_tests/test_kubernetes_pod_operator.py 
index 25c5bab7639e7..56705c7a9d6d1 100644 --- a/kubernetes_tests/test_kubernetes_pod_operator.py +++ b/kubernetes_tests/test_kubernetes_pod_operator.py @@ -169,9 +169,8 @@ def test_config_path_move(self): context = create_context(k) k.execute(context) expected_pod = copy(self.expected_pod) - expected_pod['metadata']['labels']['already_checked'] = 'True' actual_pod = self.api_client.sanitize_for_serialization(k.pod) - assert expected_pod == actual_pod + assert actual_pod == expected_pod def test_working_pod(self): k = KubernetesPodOperator( @@ -210,6 +209,57 @@ def test_delete_operator_pod(self): assert self.expected_pod['spec'] == actual_pod['spec'] assert self.expected_pod['metadata']['labels'] == actual_pod['metadata']['labels'] + def test_already_checked_on_success(self): + """ + When ``is_delete_operator_pod=False``, pod should have 'already_checked' + label, whether pod is successful or not. + """ + pod_name = "test-" + str(random.randint(0, 1000000)) + k = KubernetesPodOperator( + namespace='default', + image="ubuntu:16.04", + cmds=["bash", "-cx"], + arguments=["echo 10"], + labels={"foo": "bar"}, + name=pod_name, + task_id="task" + self.get_current_task_name(), + in_cluster=False, + do_xcom_push=False, + is_delete_operator_pod=False, + ) + context = create_context(k) + k.execute(context) + actual_pod = k.find_pod('default', context, exclude_checked=False) + actual_pod = self.api_client.sanitize_for_serialization(actual_pod) + assert actual_pod['metadata']['labels']['already_checked'] == 'True' + + def test_already_checked_on_failure(self): + """ + When ``is_delete_operator_pod=False``, pod should have 'already_checked' + label, whether pod is successful or not. 
+ """ + pod_name = "test-" + str(random.randint(0, 1000000)) + k = KubernetesPodOperator( + namespace='default', + image="ubuntu:16.04", + cmds=["bash", "-cx"], + arguments=["lalala"], + labels={"foo": "bar"}, + name=pod_name, + task_id="task" + self.get_current_task_name(), + in_cluster=False, + do_xcom_push=False, + is_delete_operator_pod=False, + ) + context = create_context(k) + with pytest.raises(AirflowException): + k.execute(context) + actual_pod = k.find_pod('default', context, exclude_checked=False) + actual_pod = self.api_client.sanitize_for_serialization(actual_pod) + status = next(iter(filter(lambda x: x['name'] == 'base', actual_pod['status']['containerStatuses']))) + assert status['state']['terminated']['reason'] == 'Error' + assert actual_pod['metadata']['labels']['already_checked'] == 'True' + def test_pod_hostnetwork(self): k = KubernetesPodOperator( namespace='default', @@ -763,7 +813,6 @@ def test_full_pod_spec(self): 'kubernetes_pod_operator': 'True', 'task_id': mock.ANY, 'try_number': '1', - 'already_checked': 'True', } assert k.pod.spec.containers[0].env == [k8s.V1EnvVar(name="env_name", value="value")] assert result == {"hello": "world"}