diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py index cee1f77202ccc..9f4fe76baccf2 100644 --- a/tests/executors/test_kubernetes_executor.py +++ b/tests/executors/test_kubernetes_executor.py @@ -162,9 +162,11 @@ def test_delete_pod_successfully(self, mock_watcher, mock_client, mock_kube_clie kube_executor = KubernetesExecutor() kube_executor.job_id = 1 kube_executor.start() - kube_executor.kube_scheduler.delete_pod(pod_id, namespace) - - mock_delete_namespace.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions()) + try: + kube_executor.kube_scheduler.delete_pod(pod_id, namespace) + mock_delete_namespace.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions()) + finally: + kube_executor.end() @unittest.skipIf(AirflowKubernetesScheduler is None, "kubernetes python package is not installed") @mock.patch("airflow.executors.kubernetes_executor.get_kube_client") @@ -203,9 +205,11 @@ def test_delete_pod_404_not_raised(self, mock_watcher, mock_client, mock_kube_cl kube_executor = KubernetesExecutor() kube_executor.job_id = 1 kube_executor.start() - - kube_executor.kube_scheduler.delete_pod(pod_id, namespace) - mock_delete_namespace.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions()) + try: + kube_executor.kube_scheduler.delete_pod(pod_id, namespace) + mock_delete_namespace.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions()) + finally: + kube_executor.end() class TestKubernetesExecutor: @@ -266,32 +270,35 @@ def test_run_next_exception_requeue( with conf_vars(config): kubernetes_executor = self.kubernetes_executor kubernetes_executor.start() - # Execute a task while the Api Throws errors - try_number = 1 - task_instance_key = TaskInstanceKey("dag", "task", "run_id", try_number) - kubernetes_executor.execute_async( - key=task_instance_key, - queue=None, - command=["airflow", "tasks", "run", "true", "some_parameter"], - ) - kubernetes_executor.sync() + try: + # Execute a task while the Api Throws errors + try_number = 1 + task_instance_key = TaskInstanceKey("dag", "task", "run_id", try_number) + kubernetes_executor.execute_async( + key=task_instance_key, + queue=None, + command=["airflow", "tasks", "run", "true", "some_parameter"], + ) + kubernetes_executor.sync() - assert mock_kube_client.create_namespaced_pod.call_count == 1 + assert mock_kube_client.create_namespaced_pod.call_count == 1 - if should_requeue: - assert not kubernetes_executor.task_queue.empty() + if should_requeue: + assert not kubernetes_executor.task_queue.empty() - # Disable the ApiException - mock_kube_client.create_namespaced_pod.side_effect = None + # Disable the ApiException + mock_kube_client.create_namespaced_pod.side_effect = None - # Execute the task without errors should empty the queue - mock_kube_client.create_namespaced_pod.reset_mock() - kubernetes_executor.sync() - assert mock_kube_client.create_namespaced_pod.called - assert kubernetes_executor.task_queue.empty() - else: - assert kubernetes_executor.task_queue.empty() - assert kubernetes_executor.event_buffer[task_instance_key][0] == State.FAILED + # Execute the task without errors should empty the queue + mock_kube_client.create_namespaced_pod.reset_mock() + kubernetes_executor.sync() + assert mock_kube_client.create_namespaced_pod.called + assert kubernetes_executor.task_queue.empty() + else: + assert kubernetes_executor.task_queue.empty() + assert kubernetes_executor.event_buffer[task_instance_key][0] == State.FAILED + finally: + kubernetes_executor.end() @pytest.mark.skipif( AirflowKubernetesScheduler is None, reason="kubernetes python package is not installed" @@ -311,23 +318,26 @@ def test_run_next_pmh_error(self, mock_get_kube_client, mock_pmh): kubernetes_executor = self.kubernetes_executor kubernetes_executor.start() - try_number = 1 - task_instance_key = TaskInstanceKey("dag", "task", "run_id", try_number) - kubernetes_executor.execute_async( - key=task_instance_key, - queue=None, - command=["airflow", "tasks", "run", "true", "some_parameter"], - ) - kubernetes_executor.sync() + try: + try_number = 1 + task_instance_key = TaskInstanceKey("dag", "task", "run_id", try_number) + kubernetes_executor.execute_async( + key=task_instance_key, + queue=None, + command=["airflow", "tasks", "run", "true", "some_parameter"], + ) + kubernetes_executor.sync() - # The pod_mutation_hook should have been called once. - assert mock_pmh.call_count == 1 - # There should be no pod creation request sent - assert mock_kube_client.create_namespaced_pod.call_count == 0 - # The task is not re-queued and there is the failed record in event_buffer - assert kubernetes_executor.task_queue.empty() - assert kubernetes_executor.event_buffer[task_instance_key][0] == State.FAILED - assert kubernetes_executor.event_buffer[task_instance_key][1].args[0] == exception_in_pmh + # The pod_mutation_hook should have been called once. + assert mock_pmh.call_count == 1 + # There should be no pod creation request sent + assert mock_kube_client.create_namespaced_pod.call_count == 0 + # The task is not re-queued and there is the failed record in event_buffer + assert kubernetes_executor.task_queue.empty() + assert kubernetes_executor.event_buffer[task_instance_key][0] == State.FAILED + assert kubernetes_executor.event_buffer[task_instance_key][1].args[0] == exception_in_pmh + finally: + kubernetes_executor.end() @pytest.mark.skipif( AirflowKubernetesScheduler is None, reason="kubernetes python package is not installed" @@ -351,19 +361,22 @@ def test_run_next_pod_reconciliation_error(self, mock_get_kube_client, mock_kube with conf_vars(config): kubernetes_executor = self.kubernetes_executor kubernetes_executor.start() - # Execute a task while the Api Throws errors - try_number = 1 - task_instance_key = TaskInstanceKey("dag", "task", "run_id", try_number) - kubernetes_executor.execute_async( - key=task_instance_key, - queue=None, - command=["airflow", "tasks", "run", "true", "some_parameter"], - ) - kubernetes_executor.sync() + try: + # Execute a task while the Api Throws errors + try_number = 1 + task_instance_key = TaskInstanceKey("dag", "task", "run_id", try_number) + kubernetes_executor.execute_async( + key=task_instance_key, + queue=None, + command=["airflow", "tasks", "run", "true", "some_parameter"], + ) + kubernetes_executor.sync() - assert kubernetes_executor.task_queue.empty() - assert kubernetes_executor.event_buffer[task_instance_key][0] == State.FAILED - assert kubernetes_executor.event_buffer[task_instance_key][1].args[0] == fail_msg + assert kubernetes_executor.task_queue.empty() + assert kubernetes_executor.event_buffer[task_instance_key][0] == State.FAILED + assert kubernetes_executor.event_buffer[task_instance_key][1].args[0] == fail_msg + finally: + kubernetes_executor.end() @mock.patch("airflow.executors.kubernetes_executor.KubeConfig") @mock.patch("airflow.executors.kubernetes_executor.KubernetesExecutor.sync") @@ -384,20 +397,22 @@ def test_gauge_executor_metrics(self, mock_stats_gauge, mock_trigger_tasks, mock def test_invalid_executor_config(self, mock_get_kube_client, mock_kubernetes_job_watcher): executor = self.kubernetes_executor executor.start() + try: + assert executor.event_buffer == {} + executor.execute_async( + key=("dag", "task", datetime.utcnow(), 1), + queue=None, + command=["airflow", "tasks", "run", "true", "some_parameter"], + executor_config=k8s.V1Pod( + spec=k8s.V1PodSpec( + containers=[k8s.V1Container(name="base", image="myimage", image_pull_policy="Always")] + ) + ), + ) - assert executor.event_buffer == {} - executor.execute_async( - key=("dag", "task", datetime.utcnow(), 1), - queue=None, - command=["airflow", "tasks", "run", "true", "some_parameter"], - executor_config=k8s.V1Pod( - spec=k8s.V1PodSpec( - containers=[k8s.V1Container(name="base", image="myimage", image_pull_policy="Always")] - ) - ), - ) - - assert list(executor.event_buffer.values())[0][1] == "Invalid executor_config passed" + assert list(executor.event_buffer.values())[0][1] == "Invalid executor_config passed" + finally: + executor.end() @pytest.mark.execution_timeout(10) @pytest.mark.skipif( @@ -417,83 +432,88 @@ def test_pod_template_file_override_in_executor_config(self, mock_get_kube_clien with conf_vars({("kubernetes", "pod_template_file"): ""}): executor = self.kubernetes_executor executor.start() + try: + assert executor.event_buffer == {} + assert executor.task_queue.empty() + + executor.execute_async( + key=TaskInstanceKey("dag", "task", "run_id", 1), + queue=None, + command=["airflow", "tasks", "run", "true", "some_parameter"], + executor_config={ + "pod_template_file": template_file, + "pod_override": k8s.V1Pod( + metadata=k8s.V1ObjectMeta(labels={"release": "stable"}), + spec=k8s.V1PodSpec( + containers=[k8s.V1Container(name="base", image="airflow:3.6")], + ), + ), + }, + ) - assert executor.event_buffer == {} - assert executor.task_queue.empty() - - executor.execute_async( - key=TaskInstanceKey("dag", "task", "run_id", 1), - queue=None, - command=["airflow", "tasks", "run", "true", "some_parameter"], - executor_config={ - "pod_template_file": template_file, - "pod_override": k8s.V1Pod( - metadata=k8s.V1ObjectMeta(labels={"release": "stable"}), + assert not executor.task_queue.empty() + task = executor.task_queue.get_nowait() + _, _, expected_executor_config, expected_pod_template_file = task + executor.task_queue.task_done() + # Test that the correct values have been put to queue + assert expected_executor_config.metadata.labels == {"release": "stable"} + assert expected_pod_template_file == template_file + + self.kubernetes_executor.kube_scheduler.run_next(task) + mock_run_pod_async.assert_called_once_with( + k8s.V1Pod( + api_version="v1", + kind="Pod", + metadata=k8s.V1ObjectMeta( + name=mock.ANY, + namespace="default", + annotations={ + "dag_id": "dag", + "run_id": "run_id", + "task_id": "task", + "try_number": "1", + }, + labels={ + "airflow-worker": "5", + "airflow_version": mock.ANY, + "dag_id": "dag", + "run_id": "run_id", + "kubernetes_executor": "True", + "mylabel": "foo", + "release": "stable", + "task_id": "task", + "try_number": "1", + }, + ), spec=k8s.V1PodSpec( - containers=[k8s.V1Container(name="base", image="airflow:3.6")], + containers=[ + k8s.V1Container( + name="base", + image="airflow:3.6", + args=["airflow", "tasks", "run", "true", "some_parameter"], + env=[k8s.V1EnvVar(name="AIRFLOW_IS_K8S_EXECUTOR_POD", value="True")], + ) + ], + image_pull_secrets=[k8s.V1LocalObjectReference(name="airflow-registry")], + scheduler_name="default-scheduler", + security_context=k8s.V1PodSecurityContext(fs_group=50000, run_as_user=50000), ), - ), - }, - ) - - assert not executor.task_queue.empty() - task = executor.task_queue.get_nowait() - _, _, expected_executor_config, expected_pod_template_file = task - - # Test that the correct values have been put to queue - assert expected_executor_config.metadata.labels == {"release": "stable"} - assert expected_pod_template_file == template_file - - self.kubernetes_executor.kube_scheduler.run_next(task) - mock_run_pod_async.assert_called_once_with( - k8s.V1Pod( - api_version="v1", - kind="Pod", - metadata=k8s.V1ObjectMeta( - name=mock.ANY, - namespace="default", - annotations={ - "dag_id": "dag", - "run_id": "run_id", - "task_id": "task", - "try_number": "1", - }, - labels={ - "airflow-worker": "5", - "airflow_version": mock.ANY, - "dag_id": "dag", - "run_id": "run_id", - "kubernetes_executor": "True", - "mylabel": "foo", - "release": "stable", - "task_id": "task", - "try_number": "1", - }, - ), - spec=k8s.V1PodSpec( - containers=[ - k8s.V1Container( - name="base", - image="airflow:3.6", - args=["airflow", "tasks", "run", "true", "some_parameter"], - env=[k8s.V1EnvVar(name="AIRFLOW_IS_K8S_EXECUTOR_POD", value="True")], - ) - ], - image_pull_secrets=[k8s.V1LocalObjectReference(name="airflow-registry")], - scheduler_name="default-scheduler", - security_context=k8s.V1PodSecurityContext(fs_group=50000, run_as_user=50000), - ), + ) ) - ) + finally: + executor.end() @mock.patch("airflow.executors.kubernetes_executor.KubernetesJobWatcher") @mock.patch("airflow.executors.kubernetes_executor.get_kube_client") def test_change_state_running(self, mock_get_kube_client, mock_kubernetes_job_watcher): executor = self.kubernetes_executor executor.start() - key = ("dag_id", "task_id", "run_id", "try_number1") - executor._change_state(key, State.RUNNING, "pod_id", "default") - assert executor.event_buffer[key][0] == State.RUNNING + try: + key = ("dag_id", "task_id", "run_id", "try_number1") + executor._change_state(key, State.RUNNING, "pod_id", "default") + assert executor.event_buffer[key][0] == State.RUNNING + finally: + executor.end() @mock.patch("airflow.executors.kubernetes_executor.KubernetesJobWatcher") @mock.patch("airflow.executors.kubernetes_executor.get_kube_client") @@ -501,10 +521,13 @@ def test_change_state_running(self, mock_get_kube_client, mock_kubernetes_job_wa def test_change_state_success(self, mock_delete_pod, mock_get_kube_client, mock_kubernetes_job_watcher): executor = self.kubernetes_executor executor.start() - key = ("dag_id", "task_id", "run_id", "try_number2") - executor._change_state(key, State.SUCCESS, "pod_id", "default") - assert executor.event_buffer[key][0] == State.SUCCESS - mock_delete_pod.assert_called_once_with("pod_id", "default") + try: + key = ("dag_id", "task_id", "run_id", "try_number2") + executor._change_state(key, State.SUCCESS, "pod_id", "default") + assert executor.event_buffer[key][0] == State.SUCCESS + mock_delete_pod.assert_called_once_with("pod_id", "default") + finally: + executor.end() @mock.patch("airflow.executors.kubernetes_executor.KubernetesJobWatcher") @mock.patch("airflow.executors.kubernetes_executor.get_kube_client") @@ -516,10 +539,13 @@ def test_change_state_failed_no_deletion( executor.kube_config.delete_worker_pods = False executor.kube_config.delete_worker_pods_on_failure = False executor.start() - key = ("dag_id", "task_id", "run_id", "try_number3") - executor._change_state(key, State.FAILED, "pod_id", "default") - assert executor.event_buffer[key][0] == State.FAILED - mock_delete_pod.assert_not_called() + try: + key = ("dag_id", "task_id", "run_id", "try_number3") + executor._change_state(key, State.FAILED, "pod_id", "default") + assert executor.event_buffer[key][0] == State.FAILED + mock_delete_pod.assert_not_called() + finally: + executor.end() @pytest.mark.parametrize( "multi_namespace_mode_namespace_list, watchers_keys", @@ -536,11 +562,13 @@ def test_watchers_under_multi_namespace_mode( executor.kube_config.multi_namespace_mode = True executor.kube_config.multi_namespace_mode_namespace_list = multi_namespace_mode_namespace_list executor.start() - assert list(executor.kube_scheduler.kube_watchers.keys()) == watchers_keys - assert all( - isinstance(v, KubernetesJobWatcher) for v in executor.kube_scheduler.kube_watchers.values() - ) - executor.end() + try: + assert list(executor.kube_scheduler.kube_watchers.keys()) == watchers_keys + assert all( + isinstance(v, KubernetesJobWatcher) for v in executor.kube_scheduler.kube_watchers.values() + ) + finally: + executor.end() @mock.patch("airflow.executors.kubernetes_executor.KubernetesJobWatcher") @mock.patch("airflow.executors.kubernetes_executor.get_kube_client") @@ -553,10 +581,13 @@ def test_change_state_skip_pod_deletion( executor.kube_config.delete_worker_pods_on_failure = False executor.start() - key = ("dag_id", "task_id", "run_id", "try_number2") - executor._change_state(key, State.SUCCESS, "pod_id", "default") - assert executor.event_buffer[key][0] == State.SUCCESS - mock_delete_pod.assert_not_called() + try: + key = ("dag_id", "task_id", "run_id", "try_number2") + executor._change_state(key, State.SUCCESS, "pod_id", "default") + assert executor.event_buffer[key][0] == State.SUCCESS + mock_delete_pod.assert_not_called() + finally: + executor.end() @mock.patch("airflow.executors.kubernetes_executor.KubernetesJobWatcher") @mock.patch("airflow.executors.kubernetes_executor.get_kube_client") @@ -568,10 +599,13 @@ def test_change_state_failed_pod_deletion( executor.kube_config.delete_worker_pods_on_failure = True executor.start() - key = ("dag_id", "task_id", "run_id", "try_number2") - executor._change_state(key, State.FAILED, "pod_id", "test-namespace") - assert executor.event_buffer[key][0] == State.FAILED - mock_delete_pod.assert_called_once_with("pod_id", "test-namespace") + try: + key = ("dag_id", "task_id", "run_id", "try_number2") + executor._change_state(key, State.FAILED, "pod_id", "test-namespace") + assert executor.event_buffer[key][0] == State.FAILED + mock_delete_pod.assert_called_once_with("pod_id", "test-namespace") + finally: + executor.end() @mock.patch("airflow.executors.kubernetes_executor.KubernetesExecutor.adopt_launched_task") @mock.patch("airflow.executors.kubernetes_executor.KubernetesExecutor._adopt_completed_pods") @@ -773,8 +807,11 @@ def test_pending_pod_timeout(self, mock_kubescheduler, mock_get_kube_client, moc executor = KubernetesExecutor() executor.job_id = 123 executor.start() - assert 2 == len(executor.event_scheduler.queue) - executor._check_worker_pods_pending_timeout() + try: + assert 2 == len(executor.event_scheduler.queue) + executor._check_worker_pods_pending_timeout() + finally: + executor.end() mock_kube_client.list_namespaced_pod.assert_called_once_with( namespace="mynamespace", @@ -815,7 +852,10 @@ def test_pending_pod_timeout_multi_namespace_mode( executor = KubernetesExecutor() executor.job_id = 123 executor.start() - executor._check_worker_pods_pending_timeout() + try: + executor._check_worker_pods_pending_timeout() + finally: + executor.end() mock_kube_client.list_pod_for_all_namespaces.assert_called_once_with( field_selector="status.phase=Pending", @@ -863,8 +903,10 @@ def list_namespaced_pod(namespace, *args, **kwargs): executor = KubernetesExecutor() executor.job_id = "123" executor.start() - executor._check_worker_pods_pending_timeout() - executor.end() + try: + executor._check_worker_pods_pending_timeout() + finally: + executor.end() assert mock_kube_client.list_namespaced_pod.call_count == 3 mock_kube_client.list_namespaced_pod.assert_has_calls(