Skip to content

Commit

Permalink
Make Kubernetes Executor & Scheduler resilient to error during PMH ex…
Browse files Browse the repository at this point in the history
…ecution (#27611)

Previously, there was no try/except logic around the call to pod_mutation_hook in the Kubernetes Executor. As a result, any error or exception raised while executing a user-defined pod_mutation_hook would crash the whole executor/scheduler.

This change makes the Kubernetes Executor and Scheduler more resilient to such errors during pod_mutation_hook (PMH) execution.
  • Loading branch information
XD-DENG authored Nov 11, 2022
1 parent cc4cde9 commit 16b1001
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 2 deletions.
4 changes: 4 additions & 0 deletions airflow/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,10 @@ class TaskDeferralError(AirflowException):
"""Raised when a task failed during deferral for some reason."""


class PodMutationHookException(AirflowException):
    """Raised when an exception happens during Pod Mutation Hook execution."""


class PodReconciliationError(AirflowException):
"""Raised when an error is encountered while trying to merge pod configs."""

Expand Down
15 changes: 13 additions & 2 deletions airflow/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
from kubernetes.client.rest import ApiException
from urllib3.exceptions import ReadTimeoutError

from airflow.exceptions import AirflowException, PodReconciliationError
from airflow.exceptions import AirflowException, PodMutationHookException, PodReconciliationError
from airflow.executors.base_executor import NOT_STARTED_MESSAGE, BaseExecutor, CommandType
from airflow.kubernetes import pod_generator
from airflow.kubernetes.kube_client import get_kube_client
Expand Down Expand Up @@ -256,7 +256,10 @@ def __init__(

def run_pod_async(self, pod: k8s.V1Pod, **kwargs):
"""Runs POD asynchronously"""
pod_mutation_hook(pod)
try:
pod_mutation_hook(pod)
except Exception as e:
raise PodMutationHookException(e)

sanitized_pod = self.kube_client.api_client.sanitize_for_serialization(pod)
json_pod = json.dumps(sanitized_pod, indent=2)
Expand Down Expand Up @@ -646,6 +649,14 @@ def sync(self) -> None:
json.loads(e.body)["message"],
)
self.task_queue.put(task)
except PodMutationHookException as e:
key, _, _, _ = task
self.log.error(
"Pod Mutation Hook failed for the task %s. Failing task. Details: %s",
key,
e,
)
self.fail(key, e)
finally:
self.task_queue.task_done()
except Empty:
Expand Down
36 changes: 36 additions & 0 deletions tests/executors/test_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,42 @@ def test_run_next_exception_requeue(
assert kubernetes_executor.task_queue.empty()
assert kubernetes_executor.event_buffer[task_instance_key][0] == State.FAILED

@pytest.mark.skipif(
    AirflowKubernetesScheduler is None, reason="kubernetes python package is not installed"
)
@mock.patch("airflow.executors.kubernetes_executor.pod_mutation_hook")
@mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
def test_run_next_pmh_error(self, mock_get_kube_client, mock_pmh):
    """
    Exception during Pod Mutation Hook execution should be handled gracefully.

    The patched ``pod_mutation_hook`` raises, and the test then checks that
    no pod creation is requested, the task is not re-queued, and the task is
    recorded as FAILED in the executor's event buffer together with the
    original exception.
    """
    exception_in_pmh = Exception("Purposely generate error for test")
    mock_pmh.side_effect = exception_in_pmh

    # Use a real mock as the kube client. ``mock.patch(...)`` without
    # ``start()`` or a ``with`` block only returns an inactive patcher object,
    # so it neither patches anything nor behaves like a mock.
    mock_kube_client = mock.MagicMock()
    mock_get_kube_client.return_value = mock_kube_client

    kubernetes_executor = self.kubernetes_executor
    kubernetes_executor.start()
    try_number = 1
    task_instance_key = TaskInstanceKey("dag", "task", "run_id", try_number)
    kubernetes_executor.execute_async(
        key=task_instance_key,
        queue=None,
        command=["airflow", "tasks", "run", "true", "some_parameter"],
    )
    kubernetes_executor.sync()

    # The pod_mutation_hook should have been called once.
    mock_pmh.assert_called_once()
    # There should be no pod creation request sent
    mock_kube_client.create_namespaced_pod.assert_not_called()
    # The task is not re-queued and there is the failed record in event_buffer
    assert kubernetes_executor.task_queue.empty()
    state, info = kubernetes_executor.event_buffer[task_instance_key]
    assert state == State.FAILED
    # The wrapping exception carries the original hook error as its first arg.
    assert info.args[0] == exception_in_pmh

@pytest.mark.skipif(
AirflowKubernetesScheduler is None, reason="kubernetes python package is not installed"
)
Expand Down

0 comments on commit 16b1001

Please sign in to comment.