Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions airflow/providers/cncf/kubernetes/operators/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ class KubernetesPodOperator(BaseOperator):
to populate the environment variables with. The contents of the target
ConfigMap's Data field will represent the key-value pairs as environment variables.
Extends env_from.
:param skip_exit_code: If task exits with this exit code, leave the task
:param skip_on_exit_code: If task exits with this exit code, leave the task
in ``skipped`` state (default: None). If set to ``None``, any non-zero
exit code will be treated as a failure.
:param base_container_name: The name of the base container in the pod. This container's logs
Expand Down Expand Up @@ -292,6 +292,7 @@ def __init__(
termination_grace_period: int | None = None,
configmaps: list[str] | None = None,
skip_exit_code: int | None = None,
skip_on_exit_code: int | None = None,
base_container_name: str | None = None,
deferrable: bool = False,
poll_interval: float = 2,
Expand Down Expand Up @@ -361,7 +362,13 @@ def __init__(
self.termination_grace_period = termination_grace_period
self.pod_request_obj: k8s.V1Pod | None = None
self.pod: k8s.V1Pod | None = None
self.skip_exit_code = skip_exit_code
if skip_exit_code is not None:
warnings.warn(
"skip_exit_code is deprecated. Please use skip_on_exit_code", DeprecationWarning, stacklevel=2
)
self.skip_on_exit_code: int | None = skip_exit_code
else:
self.skip_on_exit_code = skip_on_exit_code
self.base_container_name = base_container_name or self.BASE_CONTAINER_NAME
self.deferrable = deferrable
self.poll_interval = poll_interval
Expand Down Expand Up @@ -675,7 +682,7 @@ def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod):

error_message = get_container_termination_message(remote_pod, self.base_container_name)
error_message = "\n" + error_message if error_message else ""
if self.skip_exit_code is not None:
if self.skip_on_exit_code is not None:
container_statuses = (
remote_pod.status.container_statuses if remote_pod and remote_pod.status else None
) or []
Expand All @@ -689,9 +696,10 @@ def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod):
and base_container_status.last_state.terminated
else None
)
if exit_code == self.skip_exit_code:
if exit_code == self.skip_on_exit_code:
raise AirflowSkipException(
f"Pod {pod and pod.metadata.name} returned exit code {self.skip_exit_code}. Skipping."
f"Pod {pod and pod.metadata.name} returned exit code "
f"{self.skip_on_exit_code}. Skipping."
)
raise AirflowException(
f"Pod {pod and pod.metadata.name} returned a failure:\n{error_message}\n"
Expand Down
15 changes: 11 additions & 4 deletions airflow/providers/docker/operators/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ class DockerOperator(BaseOperator):
If rolling the logs creates excess files, the oldest file is removed.
Only effective when max-size is also set. A positive integer. Defaults to 1.
:param ipc_mode: Set the IPC mode for the container.
:param skip_exit_code: If task exits with this exit code, leave the task
:param skip_on_exit_code: If task exits with this exit code, leave the task
in ``skipped`` state (default: None). If set to ``None``, any non-zero
exit code will be treated as a failure.
"""
Expand Down Expand Up @@ -215,6 +215,7 @@ def __init__(
log_opts_max_file: str | None = None,
ipc_mode: str | None = None,
skip_exit_code: int | None = None,
skip_on_exit_code: int | None = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
Expand Down Expand Up @@ -276,7 +277,13 @@ def __init__(
self.log_opts_max_size = log_opts_max_size
self.log_opts_max_file = log_opts_max_file
self.ipc_mode = ipc_mode
self.skip_exit_code = skip_exit_code
if skip_exit_code is not None:
warnings.warn(
"skip_exit_code is deprecated. Please use skip_on_exit_code", DeprecationWarning, stacklevel=2
)
self.skip_on_exit_code: int | None = skip_exit_code
else:
self.skip_on_exit_code = skip_on_exit_code

@cached_property
def hook(self) -> DockerHook:
Expand Down Expand Up @@ -377,9 +384,9 @@ def _run_image_with_mounts(self, target_mounts, add_tmp_variable: bool) -> list[
self.log.info("%s", log_chunk)

result = self.cli.wait(self.container["Id"])
if result["StatusCode"] == self.skip_exit_code:
if result["StatusCode"] == self.skip_on_exit_code:
raise AirflowSkipException(
f"Docker container returned exit code {self.skip_exit_code}. Skipping."
f"Docker container returned exit code {self.skip_on_exit_code}. Skipping."
)
elif result["StatusCode"] != 0:
joined_log_lines = "\n".join(log_lines)
Expand Down
18 changes: 9 additions & 9 deletions tests/providers/cncf/kubernetes/operators/test_pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -1089,16 +1089,16 @@ def test_task_id_as_name_dag_id_is_ignored(self):
"extra_kwargs, actual_exit_code, expected_exc",
[
(None, 99, AirflowException),
({"skip_exit_code": 100}, 100, AirflowSkipException),
({"skip_exit_code": 100}, 101, AirflowException),
({"skip_exit_code": None}, 100, AirflowException),
({"skip_on_exit_code": 100}, 100, AirflowSkipException),
({"skip_on_exit_code": 100}, 101, AirflowException),
({"skip_on_exit_code": None}, 100, AirflowException),
],
)
@patch(f"{POD_MANAGER_CLASS}.await_pod_completion")
def test_task_skip_when_pod_exit_with_certain_code(
self, remote_pod, extra_kwargs, actual_exit_code, expected_exc
):
"""Tests that an AirflowSkipException is raised when the container exits with the skip_exit_code"""
"""Tests that an AirflowSkipException is raised when the container exits with the skip_on_exit_code"""
k = KubernetesPodOperator(
task_id="task", is_delete_operator_pod=True, **(extra_kwargs if extra_kwargs else {})
)
Expand Down Expand Up @@ -1284,13 +1284,13 @@ def test_async_create_pod_should_throw_exception(self, mocked_hook, mocked_clean
[
(None, 0, None, "Succeeded", "success"),
(None, 99, AirflowException, "Failed", "error"),
({"skip_exit_code": 100}, 100, AirflowSkipException, "Failed", "error"),
({"skip_exit_code": 100}, 101, AirflowException, "Failed", "error"),
({"skip_exit_code": None}, 100, AirflowException, "Failed", "error"),
({"skip_on_exit_code": 100}, 100, AirflowSkipException, "Failed", "error"),
({"skip_on_exit_code": 100}, 101, AirflowException, "Failed", "error"),
({"skip_on_exit_code": None}, 100, AirflowException, "Failed", "error"),
],
)
@patch(HOOK_CLASS)
def test_async_create_pod_with_skip_exit_code_should_skip(
def test_async_create_pod_with_skip_on_exit_code_should_skip(
self,
mocked_hook,
extra_kwargs,
Expand All @@ -1299,7 +1299,7 @@ def test_async_create_pod_with_skip_exit_code_should_skip(
pod_status,
event_status,
):
"""Tests that an AirflowSkipException is raised when the container exits with the skip_exit_code"""
"""Tests that an AirflowSkipException is raised when the container exits with the skip_on_exit_code"""

k = KubernetesPodOperator(
task_id=TEST_TASK_ID,
Expand Down
6 changes: 3 additions & 3 deletions tests/providers/docker/decorators/test_docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,9 @@ def do_run():
"extra_kwargs, actual_exit_code, expected_state",
[
(None, 99, TaskInstanceState.FAILED),
({"skip_exit_code": 100}, 100, TaskInstanceState.SKIPPED),
({"skip_exit_code": 100}, 101, TaskInstanceState.FAILED),
({"skip_exit_code": None}, 0, TaskInstanceState.SUCCESS),
({"skip_on_exit_code": 100}, 100, TaskInstanceState.SKIPPED),
({"skip_on_exit_code": 100}, 101, TaskInstanceState.FAILED),
({"skip_on_exit_code": None}, 0, TaskInstanceState.SUCCESS),
],
)
def test_skip_docker_operator(self, extra_kwargs, actual_exit_code, expected_state, dag_maker):
Expand Down
6 changes: 3 additions & 3 deletions tests/providers/docker/operators/test_docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -516,9 +516,9 @@ def test_execute_unicode_logs(self):
"extra_kwargs, actual_exit_code, expected_exc",
[
(None, 99, AirflowException),
({"skip_exit_code": 100}, 100, AirflowSkipException),
({"skip_exit_code": 100}, 101, AirflowException),
({"skip_exit_code": None}, 100, AirflowException),
({"skip_on_exit_code": 100}, 100, AirflowSkipException),
({"skip_on_exit_code": 100}, 101, AirflowException),
({"skip_on_exit_code": None}, 100, AirflowException),
],
)
def test_skip(self, extra_kwargs, actual_exit_code, expected_exc):
Expand Down