Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@
AirflowKubernetesScheduler,
)


# CLI Args
ARG_NAMESPACE = Arg(
("--namespace",),
Expand Down Expand Up @@ -577,7 +576,22 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task
for pod in pod_list:
self.adopt_launched_task(kube_client, pod, tis_to_flush_by_key)
self._adopt_completed_pods(kube_client)
tis_to_flush.extend(tis_to_flush_by_key.values())

# This method can be retried within a short time frame: it is wrapped in
# run_with_db_retries by scheduler_job_runner and may be retried after,
# for example, an OperationalError.
# On a second attempt, adopt_launched_task may not be called even once,
# because all pods were already adopted during the first attempt.
# In that case tis_to_flush_by_key still contains TIs that are already adopted.
# Therefore, we need to detect the TIs adopted by the first attempt and exclude them.
def _iter_tis_to_flush():
    """Yield each TI left in ``tis_to_flush_by_key`` whose key is not in ``self.running``.

    TIs whose key is in ``self.running`` are logged and skipped rather than yielded.
    """
    for ti_key, task_instance in tis_to_flush_by_key.items():
        if ti_key not in self.running:
            yield task_instance
            continue
        # Key is in self.running: treated as already adopted, so do not flush.
        self.log.info("%s is already adopted, no need to flush.", task_instance)

tis_to_flush.extend(_iter_tis_to_flush())
return tis_to_flush

def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -991,8 +991,35 @@ def test_try_adopt_task_instances_no_matching_pods(

tis_to_flush = executor.try_adopt_task_instances([mock_ti])
assert tis_to_flush == [mock_ti]
assert executor.running == set()
mock_adopt_launched_task.assert_not_called()
mock_adopt_completed_pods.assert_called_once()

# NOTE: stacked mock.patch decorators are applied bottom-up, so the mocks are
# passed to the test in reverse decorator order (the last patch is the first argument).
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
@mock.patch(
"airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor.adopt_launched_task"
)
@mock.patch(
"airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor._adopt_completed_pods"
)
def test_try_adopt_already_adopted_task_instances(
self, mock_adopt_completed_pods, mock_adopt_launched_task, mock_kube_dynamic_client
):
"""TIs whose key is already in ``executor.running`` must not be returned for flushing.

The TI's key is placed in ``executor.running`` before the call, and the test
checks that ``try_adopt_task_instances`` returns an empty list and leaves
``executor.running`` unchanged.
"""
# The patched DynamicClient's get() result exposes an empty ``items`` list
# (presumably the pod listing the executor iterates -- confirm against the executor code).
mock_kube_dynamic_client.return_value = mock.MagicMock()
mock_kube_dynamic_client.return_value.get.return_value.items = []
mock_kube_client = mock.MagicMock()
executor = self.kubernetes_executor
executor.kube_client = mock_kube_client
ti_key = TaskInstanceKey("dag", "task", "run_id", 1)
mock_ti = mock.MagicMock(queued_by_job_id="1", external_executor_id="1", key=ti_key)
# Pre-populate ``running`` with the TI's key so the executor sees it as already running.
executor.running = {ti_key}

tis_to_flush = executor.try_adopt_task_instances([mock_ti])
# No launched-pod adoption is expected; the completed-pod pass still runs exactly once.
mock_adopt_launched_task.assert_not_called()
mock_adopt_completed_pods.assert_called_once()
# Nothing is returned for flushing, and the running set is left untouched.
assert tis_to_flush == []
assert executor.running == {ti_key}

@mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
def test_adopt_launched_task(self, mock_kube_client):
Expand Down