From a791c857f7b35ed59dea9757cc19b10e3f17af77 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Thu, 13 Jun 2024 10:40:14 +0100 Subject: [PATCH] Don't add pod key label from get go --- .../executors/kubernetes_executor.py | 4 ++-- .../executors/kubernetes_executor_utils.py | 13 ++++------ .../cncf/kubernetes/pod_generator.py | 3 --- .../executors/test_kubernetes_executor.py | 24 ++++++------------- .../cncf/kubernetes/test_pod_generator.py | 2 -- .../kubernetes/test_template_rendering.py | 1 - 6 files changed, 13 insertions(+), 34 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py index f65122f83350e0..40a27d70fdecfb 100644 --- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py +++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py @@ -569,7 +569,7 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task "field_selector": "status.phase!=Succeeded", "label_selector": ( "kubernetes_executor=True," - f"airflow-worker={scheduler_job_id},{POD_EXECUTOR_DONE_KEY}=False" + f"airflow-worker={scheduler_job_id},{POD_EXECUTOR_DONE_KEY}!=True" ), } pod_list = self._list_pods(query_kwargs) @@ -687,7 +687,7 @@ def _adopt_completed_pods(self, kube_client: client.CoreV1Api) -> None: "field_selector": "status.phase=Succeeded", "label_selector": ( "kubernetes_executor=True," - f"airflow-worker!={new_worker_id_label},{POD_EXECUTOR_DONE_KEY}=False" + f"airflow-worker!={new_worker_id_label},{POD_EXECUTOR_DONE_KEY}!=True" ), } pod_list = self._list_pods(query_kwargs) diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py index b4da5d303c7be4..5da947752254ad 100644 --- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py +++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py @@ -140,7 +140,7 @@ def _run( self.log.info("Event: and now my watch begins starting at resource_version: %s", resource_version) kwargs: dict[str, Any] = { - "label_selector": f"airflow-worker={scheduler_job_id},{POD_EXECUTOR_DONE_KEY}=False", + "label_selector": f"airflow-worker={scheduler_job_id},{POD_EXECUTOR_DONE_KEY}!=True", } if resource_version: kwargs["resource_version"] = resource_version @@ -273,14 +273,9 @@ def process_status( elif status == "Succeeded": # We get multiple events once the pod hits a terminal state, and we only want to # send it along to the scheduler once. - # If our event type is DELETED, we have the POD_EXECUTOR_DONE_KEY, or the pod has - # a deletion timestamp, we've already seen the initial Succeeded event and sent it - # along to the scheduler. - if ( - event["type"] == "DELETED" - or pod.metadata.labels[POD_EXECUTOR_DONE_KEY] == "True" - or pod.metadata.deletion_timestamp - ): + # If our event type is DELETED, the pod has a deletion timestamp, we've already + # seen the initial Succeeded event and sent it along to the scheduler. + if event["type"] == "DELETED" or pod.metadata.deletion_timestamp: self.log.info( "Skipping event for Succeeded pod %s - event for this pod already sent to executor", pod_name, diff --git a/airflow/providers/cncf/kubernetes/pod_generator.py b/airflow/providers/cncf/kubernetes/pod_generator.py index b8811d15e3924a..a04eb5aa7e20e4 100644 --- a/airflow/providers/cncf/kubernetes/pod_generator.py +++ b/airflow/providers/cncf/kubernetes/pod_generator.py @@ -56,8 +56,6 @@ from airflow.utils.hashlib_wrapper import md5 from airflow.version import version as airflow_version -from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import POD_EXECUTOR_DONE_KEY - if TYPE_CHECKING: import datetime @@ -523,7 +521,6 @@ def build_labels_for_k8s_executor_pod( "try_number": str(try_number), "kubernetes_executor": "True", "airflow_version": airflow_version.replace("+", "-"), - POD_EXECUTOR_DONE_KEY: "False", } if airflow_worker is not None: labels["airflow-worker"] = make_safe_label_value(str(airflow_worker)) diff --git a/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py b/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py index 3ec0e80250ad8c..1ed9399b59950c 100644 --- a/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py +++ b/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py @@ -39,7 +39,6 @@ ) from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import ( ADOPTED, - POD_EXECUTOR_DONE_KEY, ) from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils import ( AirflowKubernetesScheduler, @@ -650,7 +649,6 @@ def test_pod_template_file_override_in_executor_config( "release": "stable", "task_id": "task", "try_number": "1", - "airflow_executor_done": "False", }, ), spec=k8s.V1PodSpec( @@ -895,7 +893,7 @@ def test_try_adopt_task_instances( resource=mock_pod_resource, namespace="default", field_selector="status.phase!=Succeeded", - label_selector="kubernetes_executor=True,airflow-worker=1,airflow_executor_done=False", + label_selector="kubernetes_executor=True,airflow-worker=1,airflow_executor_done!=True", header_params={"Accept": "application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"}, ) mock_adopt_launched_task.assert_called_once_with(mock_kube_client, pod, {ti_key: mock_ti}) @@ -919,7 +917,7 @@ def test_try_adopt_task_instances( resource=mock_pod_resource, namespace="default", field_selector="status.phase!=Succeeded", - label_selector="kubernetes_executor=True,airflow-worker=10,airflow_executor_done=False", + label_selector="kubernetes_executor=True,airflow-worker=10,airflow_executor_done!=True", header_params={"Accept": "application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"}, ) mock_adopt_launched_task.assert_called_once() # Won't check args this time around as they get mutated @@ -955,7 +953,7 @@ def test_try_adopt_task_instances_multiple_scheduler_ids( resource=mock_pod_resource, namespace="default", field_selector="status.phase!=Succeeded", - label_selector="kubernetes_executor=True,airflow-worker=10,airflow_executor_done=False", + label_selector="kubernetes_executor=True,airflow-worker=10,airflow_executor_done!=True", header_params={ "Accept": "application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io" }, @@ -964,7 +962,7 @@ def test_try_adopt_task_instances_multiple_scheduler_ids( resource=mock_pod_resource, namespace="default", field_selector="status.phase!=Succeeded", - label_selector="kubernetes_executor=True,airflow-worker=40,airflow_executor_done=False", + label_selector="kubernetes_executor=True,airflow-worker=40,airflow_executor_done!=True", header_params={ "Accept": "application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io" }, @@ -1112,7 +1110,7 @@ def get_annotations(pod_name): resource=mock_pod_resource, namespace="somens", field_selector="status.phase=Succeeded", - label_selector="kubernetes_executor=True,airflow-worker!=modified,airflow_executor_done=False", + label_selector="kubernetes_executor=True,airflow-worker!=modified,airflow_executor_done!=True", header_params={"Accept": "application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"}, ) assert len(pod_names) == mock_kube_client.patch_namespaced_pod.call_count @@ -1531,7 +1529,7 @@ def setup_method(self): annotations={"airflow-worker": "bar", **self.core_annotations}, namespace="airflow", resource_version="456", - labels={POD_EXECUTOR_DONE_KEY: "False"}, + labels={}, ), status=k8s.V1PodStatus(phase="Pending"), ) @@ -1556,7 +1554,7 @@ def _run(self): mock_pod_events.assert_called_once_with( kube_client=self.kube_client, query_kwargs={ - "label_selector": "airflow-worker=123,airflow_executor_done=False", + "label_selector": "airflow-worker=123,airflow_executor_done!=True", "resource_version": "0", "_request_timeout": 30, "timeout_seconds": 3600, @@ -1777,14 +1775,6 @@ def test_process_status_succeeded(self): # We don't know the TI state, so we send in None self.assert_watcher_queue_called_once_with_state(None) - def test_process_status_succeeded_dedup_label(self): - self.pod.status.phase = "Succeeded" - self.pod.metadata.labels[POD_EXECUTOR_DONE_KEY] = "True" - self.events.append({"type": "MODIFIED", "object": self.pod}) - - self._run() - self.watcher.watcher_queue.put.assert_not_called() - def test_process_status_succeeded_dedup_timestamp(self): self.pod.status.phase = "Succeeded" self.pod.metadata.deletion_timestamp = timezone.utcnow() diff --git a/tests/providers/cncf/kubernetes/test_pod_generator.py b/tests/providers/cncf/kubernetes/test_pod_generator.py index 9cfa09fb362599..27d4ef9c862fd3 100644 --- a/tests/providers/cncf/kubernetes/test_pod_generator.py +++ b/tests/providers/cncf/kubernetes/test_pod_generator.py @@ -83,7 +83,6 @@ def setup_method(self): "try_number": str(self.try_number), "airflow_version": __version__.replace("+", "-"), "kubernetes_executor": "True", - "airflow_executor_done": "False", } self.annotations = { "dag_id": self.dag_id, @@ -819,7 +818,6 @@ def test_build_labels_for_k8s_executor_pod(self, extra, extra_expected): try_number="1", airflow_version=airflow_version, kubernetes_executor="True", - airflow_executor_done="False", ) labels = PodGenerator.build_labels_for_k8s_executor_pod(**kwargs, **extra) assert labels == {**expected, **extra_expected} diff --git a/tests/providers/cncf/kubernetes/test_template_rendering.py b/tests/providers/cncf/kubernetes/test_template_rendering.py index dbf19547f63ae2..98764a2f1faa01 100644 --- a/tests/providers/cncf/kubernetes/test_template_rendering.py +++ b/tests/providers/cncf/kubernetes/test_template_rendering.py @@ -58,7 +58,6 @@ def test_render_k8s_pod_yaml(pod_mutation_hook, create_task_instance): "kubernetes_executor": "True", "task_id": "op1", "try_number": mock.ANY, - "airflow_executor_done": "False", }, "name": mock.ANY, "namespace": "default",