Skip to content

Commit

Permalink
Don't add pod key label from get go
Browse files Browse the repository at this point in the history
  • Loading branch information
ephraimbuddy committed Jun 13, 2024
1 parent 258d4dc commit a791c85
Show file tree
Hide file tree
Showing 6 changed files with 13 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task
"field_selector": "status.phase!=Succeeded",
"label_selector": (
"kubernetes_executor=True,"
f"airflow-worker={scheduler_job_id},{POD_EXECUTOR_DONE_KEY}=False"
f"airflow-worker={scheduler_job_id},{POD_EXECUTOR_DONE_KEY}!=True"
),
}
pod_list = self._list_pods(query_kwargs)
Expand Down Expand Up @@ -687,7 +687,7 @@ def _adopt_completed_pods(self, kube_client: client.CoreV1Api) -> None:
"field_selector": "status.phase=Succeeded",
"label_selector": (
"kubernetes_executor=True,"
f"airflow-worker!={new_worker_id_label},{POD_EXECUTOR_DONE_KEY}=False"
f"airflow-worker!={new_worker_id_label},{POD_EXECUTOR_DONE_KEY}!=True"
),
}
pod_list = self._list_pods(query_kwargs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ def _run(
self.log.info("Event: and now my watch begins starting at resource_version: %s", resource_version)

kwargs: dict[str, Any] = {
"label_selector": f"airflow-worker={scheduler_job_id},{POD_EXECUTOR_DONE_KEY}=False",
"label_selector": f"airflow-worker={scheduler_job_id},{POD_EXECUTOR_DONE_KEY}!=True",
}
if resource_version:
kwargs["resource_version"] = resource_version
Expand Down Expand Up @@ -273,14 +273,9 @@ def process_status(
elif status == "Succeeded":
# We get multiple events once the pod hits a terminal state, and we only want to
# send it along to the scheduler once.
# If our event type is DELETED, we have the POD_EXECUTOR_DONE_KEY, or the pod has
# a deletion timestamp, we've already seen the initial Succeeded event and sent it
# along to the scheduler.
if (
event["type"] == "DELETED"
or pod.metadata.labels[POD_EXECUTOR_DONE_KEY] == "True"
or pod.metadata.deletion_timestamp
):
# If our event type is DELETED, the pod has a deletion timestamp, we've already
# seen the initial Succeeded event and sent it along to the scheduler.
if event["type"] == "DELETED" or pod.metadata.deletion_timestamp:
self.log.info(
"Skipping event for Succeeded pod %s - event for this pod already sent to executor",
pod_name,
Expand Down
3 changes: 0 additions & 3 deletions airflow/providers/cncf/kubernetes/pod_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@
from airflow.utils.hashlib_wrapper import md5
from airflow.version import version as airflow_version

from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import POD_EXECUTOR_DONE_KEY

if TYPE_CHECKING:
import datetime

Expand Down Expand Up @@ -523,7 +521,6 @@ def build_labels_for_k8s_executor_pod(
"try_number": str(try_number),
"kubernetes_executor": "True",
"airflow_version": airflow_version.replace("+", "-"),
POD_EXECUTOR_DONE_KEY: "False",
}
if airflow_worker is not None:
labels["airflow-worker"] = make_safe_label_value(str(airflow_worker))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
)
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import (
ADOPTED,
POD_EXECUTOR_DONE_KEY,
)
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils import (
AirflowKubernetesScheduler,
Expand Down Expand Up @@ -650,7 +649,6 @@ def test_pod_template_file_override_in_executor_config(
"release": "stable",
"task_id": "task",
"try_number": "1",
"airflow_executor_done": "False",
},
),
spec=k8s.V1PodSpec(
Expand Down Expand Up @@ -895,7 +893,7 @@ def test_try_adopt_task_instances(
resource=mock_pod_resource,
namespace="default",
field_selector="status.phase!=Succeeded",
label_selector="kubernetes_executor=True,airflow-worker=1,airflow_executor_done=False",
label_selector="kubernetes_executor=True,airflow-worker=1,airflow_executor_done!=True",
header_params={"Accept": "application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"},
)
mock_adopt_launched_task.assert_called_once_with(mock_kube_client, pod, {ti_key: mock_ti})
Expand All @@ -919,7 +917,7 @@ def test_try_adopt_task_instances(
resource=mock_pod_resource,
namespace="default",
field_selector="status.phase!=Succeeded",
label_selector="kubernetes_executor=True,airflow-worker=10,airflow_executor_done=False",
label_selector="kubernetes_executor=True,airflow-worker=10,airflow_executor_done!=True",
header_params={"Accept": "application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"},
)
mock_adopt_launched_task.assert_called_once() # Won't check args this time around as they get mutated
Expand Down Expand Up @@ -955,7 +953,7 @@ def test_try_adopt_task_instances_multiple_scheduler_ids(
resource=mock_pod_resource,
namespace="default",
field_selector="status.phase!=Succeeded",
label_selector="kubernetes_executor=True,airflow-worker=10,airflow_executor_done=False",
label_selector="kubernetes_executor=True,airflow-worker=10,airflow_executor_done!=True",
header_params={
"Accept": "application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"
},
Expand All @@ -964,7 +962,7 @@ def test_try_adopt_task_instances_multiple_scheduler_ids(
resource=mock_pod_resource,
namespace="default",
field_selector="status.phase!=Succeeded",
label_selector="kubernetes_executor=True,airflow-worker=40,airflow_executor_done=False",
label_selector="kubernetes_executor=True,airflow-worker=40,airflow_executor_done!=True",
header_params={
"Accept": "application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"
},
Expand Down Expand Up @@ -1112,7 +1110,7 @@ def get_annotations(pod_name):
resource=mock_pod_resource,
namespace="somens",
field_selector="status.phase=Succeeded",
label_selector="kubernetes_executor=True,airflow-worker!=modified,airflow_executor_done=False",
label_selector="kubernetes_executor=True,airflow-worker!=modified,airflow_executor_done!=True",
header_params={"Accept": "application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"},
)
assert len(pod_names) == mock_kube_client.patch_namespaced_pod.call_count
Expand Down Expand Up @@ -1531,7 +1529,7 @@ def setup_method(self):
annotations={"airflow-worker": "bar", **self.core_annotations},
namespace="airflow",
resource_version="456",
labels={POD_EXECUTOR_DONE_KEY: "False"},
labels={},
),
status=k8s.V1PodStatus(phase="Pending"),
)
Expand All @@ -1556,7 +1554,7 @@ def _run(self):
mock_pod_events.assert_called_once_with(
kube_client=self.kube_client,
query_kwargs={
"label_selector": "airflow-worker=123,airflow_executor_done=False",
"label_selector": "airflow-worker=123,airflow_executor_done!=True",
"resource_version": "0",
"_request_timeout": 30,
"timeout_seconds": 3600,
Expand Down Expand Up @@ -1777,14 +1775,6 @@ def test_process_status_succeeded(self):
# We don't know the TI state, so we send in None
self.assert_watcher_queue_called_once_with_state(None)

def test_process_status_succeeded_dedup_label(self):
self.pod.status.phase = "Succeeded"
self.pod.metadata.labels[POD_EXECUTOR_DONE_KEY] = "True"
self.events.append({"type": "MODIFIED", "object": self.pod})

self._run()
self.watcher.watcher_queue.put.assert_not_called()

def test_process_status_succeeded_dedup_timestamp(self):
self.pod.status.phase = "Succeeded"
self.pod.metadata.deletion_timestamp = timezone.utcnow()
Expand Down
2 changes: 0 additions & 2 deletions tests/providers/cncf/kubernetes/test_pod_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ def setup_method(self):
"try_number": str(self.try_number),
"airflow_version": __version__.replace("+", "-"),
"kubernetes_executor": "True",
"airflow_executor_done": "False",
}
self.annotations = {
"dag_id": self.dag_id,
Expand Down Expand Up @@ -819,7 +818,6 @@ def test_build_labels_for_k8s_executor_pod(self, extra, extra_expected):
try_number="1",
airflow_version=airflow_version,
kubernetes_executor="True",
airflow_executor_done="False",
)
labels = PodGenerator.build_labels_for_k8s_executor_pod(**kwargs, **extra)
assert labels == {**expected, **extra_expected}
Expand Down
1 change: 0 additions & 1 deletion tests/providers/cncf/kubernetes/test_template_rendering.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ def test_render_k8s_pod_yaml(pod_mutation_hook, create_task_instance):
"kubernetes_executor": "True",
"task_id": "op1",
"try_number": mock.ANY,
"airflow_executor_done": "False",
},
"name": mock.ANY,
"namespace": "default",
Expand Down

0 comments on commit a791c85

Please sign in to comment.