dev/breeze/src/airflow_breeze/commands/kubernetes_commands.py (50 additions, 1 deletion)
@@ -42,7 +42,12 @@
option_verbose,
)
from airflow_breeze.commands.production_image_commands import run_build_production_image
from airflow_breeze.global_constants import ALLOWED_EXECUTORS, ALLOWED_KUBERNETES_VERSIONS
from airflow_breeze.global_constants import (
ALLOWED_EXECUTORS,
ALLOWED_KUBERNETES_VERSIONS,
CELERY_EXECUTOR,
KUBERNETES_EXECUTOR,
)
from airflow_breeze.params.build_prod_params import BuildProdParams
from airflow_breeze.utils.ci_group import ci_group
from airflow_breeze.utils.click_utils import BreezeGroup
@@ -782,6 +787,12 @@ def upload_k8s_image(
if return_code == 0:
get_console().print("\n[warning]NEXT STEP:[/][info] You might now deploy airflow by:\n")
get_console().print("\nbreeze k8s deploy-airflow\n")
get_console().print(
"\n[warning]Note:[/]\nIf you want to run tests with [info]--executor KubernetesExecutor[/], you should deploy airflow with [info]--multi-namespace-mode --executor KubernetesExecutor[/] flag.\n"
)
get_console().print(
"\nbreeze k8s deploy-airflow --multi-namespace-mode --executor KubernetesExecutor\n"
)
sys.exit(return_code)


@@ -1406,6 +1417,30 @@ def _get_parallel_test_args(
return combo_titles, combos, pytest_args, short_combo_titles


def _is_deployed_with_same_executor(python: str, kubernetes_version: str, executor: str) -> bool:
"""Check if the current cluster is deployed with the same executor that the current tests are using.
This is especially useful when running tests with executors like KubernetesExecutor, CeleryExecutor, etc.
    It verifies this by checking the executor label on the airflow-scheduler deployment.
"""
result = run_command_with_k8s_env(
[
"kubectl",
"get",
"deployment",
"-n",
"airflow",
"airflow-scheduler",
"-o",
"jsonpath='{.metadata.labels.executor}'",
],
python=python,
kubernetes_version=kubernetes_version,
capture_output=True,
check=False,
)
return executor == result.stdout.decode().strip().replace("'", "")


def _run_tests(
python: str,
kubernetes_version: str,
@@ -1422,6 +1457,20 @@ def _run_tests(
extra_shell_args.append("--no-rcs")
elif shell_binary.endswith("bash"):
extra_shell_args.extend(["--norc", "--noprofile"])

# Check if the current cluster is deployed with the same executor
if (
executor == KUBERNETES_EXECUTOR or executor == CELERY_EXECUTOR
) and not _is_deployed_with_same_executor(python, kubernetes_version, executor):
get_console(output=output).print(
f"[warning]{executor} not deployed. Please deploy airflow with {executor} first."
)
get_console(output=output).print(
f"[info]You can deploy airflow with {executor} by running:[/]\nbreeze k8s configure-cluster\nbreeze k8s deploy-airflow --multi-namespace-mode --executor {executor}"
)
return 1, f"Tests {kubectl_cluster_name}"

# run tests
the_tests: list[str] = ["kubernetes_tests/"]
command_to_run = " ".join([quote(arg) for arg in ["uv", "run", "pytest", *the_tests, *test_args]])
get_console(output).print(f"[info] Command to run:[/] {command_to_run}")
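
For context on the check added to _run_tests above: the helper reads an executor label off the airflow-scheduler deployment and compares it to the executor the tests were asked to use. Below is a minimal standalone sketch of that query, assuming the deployment carries an executor label (which is what the helper relies on) and that kubectl already points at the test cluster; it is illustrative only and not part of the PR.

import subprocess


def deployed_executor(namespace: str = "airflow") -> str:
    # Return the value of the "executor" label on the airflow-scheduler deployment (empty string if unset).
    result = subprocess.run(
        [
            "kubectl",
            "get",
            "deployment",
            "airflow-scheduler",
            "-n",
            namespace,
            "-o",
            "jsonpath={.metadata.labels.executor}",
        ],
        capture_output=True,
        check=False,
    )
    return result.stdout.decode().strip()


# Example: bail out early if the cluster was deployed with a different executor.
if deployed_executor() != "KubernetesExecutor":
    print("breeze k8s deploy-airflow --multi-namespace-mode --executor KubernetesExecutor")
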
kubernetes_tests/test_base.py (25 additions, 0 deletions)
@@ -23,6 +23,7 @@
from datetime import datetime, timezone
from pathlib import Path
from subprocess import check_call, check_output
from typing import Literal

import pytest
import re2
@@ -60,6 +61,9 @@ def base_tests_setup(self, request):
# Replacement for unittests.TestCase.id()
self.test_id = f"{request.node.cls.__name__}_{request.node.name}"
self.session = self._get_session_with_retries()

        # Ensure the api-server deployment is healthy at the Kubernetes level before calling any API
self.ensure_resource_health("airflow-webserver")
try:
self._ensure_airflow_webserver_is_healthy()
yield
@@ -191,6 +195,27 @@ def monitor_task(self, host, dag_run_id, dag_id, task_id, expected_final_state,
print(f"The expected state is wrong {state} != {expected_final_state} (expected)!")
assert state == expected_final_state

@staticmethod
def ensure_resource_health(
resource_name: str,
namespace: str = "airflow",
resource_type: Literal["deployment", "statefulset"] = "deployment",
):
"""Watch the resource until it is healthy.

Args:
            resource_name (str): Name of the resource to check.
            namespace (str): Kubernetes namespace where the resource is located.
            resource_type (str): Type of the resource (e.g., deployment, statefulset).
"""
rollout_status = check_output(
["kubectl", "rollout", "status", f"{resource_type}/{resource_name}", "-n", namespace, "--watch"],
).decode()
if resource_type == "deployment":
assert "successfully rolled out" in rollout_status
else:
assert "roll out complete" in rollout_status

def ensure_dag_expected_state(self, host, execution_date, dag_id, expected_final_state, timeout):
tries = 0
state = ""
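
A brief usage sketch of the new ensure_resource_health helper, as a test built on BaseK8STest might call it. The test class, method name, and the StatefulSet example are illustrative assumptions; in this PR only airflow-webserver and airflow-scheduler are checked.

from kubernetes_tests.test_base import BaseK8STest


class ExampleSmokeTest(BaseK8STest):
    def test_components_are_rolled_out(self):
        # Each call blocks until `kubectl rollout status ... --watch` reports success.
        self.ensure_resource_health("airflow-webserver")  # "deployment" is the default resource_type
        self.ensure_resource_health("airflow-scheduler", resource_type="statefulset")  # e.g. LocalExecutor layout
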
kubernetes_tests/test_kubernetes_executor.py (1 addition, 3 deletions)
@@ -16,8 +16,6 @@
# under the License.
from __future__ import annotations

import time

import pytest

from kubernetes_tests.test_base import EXECUTOR, BaseK8STest # isort:skip (needed to workaround isort bug)
@@ -57,7 +55,7 @@ def test_integration_run_dag_with_scheduler_failure(self):

self._delete_airflow_pod("scheduler")

time.sleep(10) # give time for pod to restart
self.ensure_resource_health("airflow-scheduler")

# Wait some time for the operator to complete
self.monitor_task(
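
The change above replaces a fixed time.sleep(10) with a deterministic wait on the scheduler rollout. One caveat: kubectl rollout status --watch blocks until the rollout completes, with no upper bound by default. If a bounded wait were preferred, kubectl's --timeout flag could cap it; a hedged sketch of that variant (not what the PR does):

from subprocess import check_output

# Wait for the scheduler Deployment rollout, but give up after 5 minutes instead of watching forever.
status = check_output(
    ["kubectl", "rollout", "status", "deployment/airflow-scheduler", "-n", "airflow", "--watch", "--timeout=300s"]
).decode()
assert "successfully rolled out" in status
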
kubernetes_tests/test_other_executors.py (7 additions, 3 deletions)
@@ -16,8 +16,6 @@
# under the License.
from __future__ import annotations

import time

import pytest

from kubernetes_tests.test_base import EXECUTOR, BaseK8STest # isort:skip (needed to workaround isort bug)
@@ -58,7 +56,13 @@ def test_integration_run_dag_with_scheduler_failure(self):

self._delete_airflow_pod("scheduler")

time.sleep(10) # give time for pod to restart
if EXECUTOR == "CeleryExecutor":
scheduler_resource_type = "deployment"
elif EXECUTOR == "LocalExecutor":
scheduler_resource_type = "statefulset"
else:
raise ValueError(f"Unknown executor {EXECUTOR}")
self.ensure_resource_health("airflow-scheduler", resource_type=scheduler_resource_type)

# Wait some time for the operator to complete
self.monitor_task(
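
The if/elif above encodes which Kubernetes resource backs the scheduler for each executor (a Deployment for CeleryExecutor, a StatefulSet for LocalExecutor). If more executors were ever covered, the same mapping could live in a small dict; the sketch below is illustrative and not part of the PR.

# Illustrative alternative to the if/elif chain (assumed helper, not in the PR):
SCHEDULER_RESOURCE_TYPE = {
    "CeleryExecutor": "deployment",
    "LocalExecutor": "statefulset",
}


def scheduler_resource_type(executor: str) -> str:
    try:
        return SCHEDULER_RESOURCE_TYPE[executor]
    except KeyError:
        raise ValueError(f"Unknown executor {executor}") from None


# Usage inside the test method:
# self.ensure_resource_health("airflow-scheduler", resource_type=scheduler_resource_type(EXECUTOR))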