Skip to content

Commit ff7dc38

Browse files
committed
Refactor ensure_resource_health to support statefulset
- Note that LocalExecutor use statefulset for scheduler instead of deployment
1 parent 76ff648 commit ff7dc38

File tree

3 files changed

+28
-8
lines changed

3 files changed

+28
-8
lines changed

kubernetes_tests/test_base.py

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from datetime import datetime, timezone
2424
from pathlib import Path
2525
from subprocess import check_call, check_output
26+
from typing import Literal
2627

2728
import pytest
2829
import re2
@@ -62,7 +63,7 @@ def base_tests_setup(self, request):
6263
self.session = self._get_session_with_retries()
6364

6465
# Ensure the api-server deployment is healthy at kubernetes level before calling the any API
65-
self.ensure_deployment_health("airflow-webserver")
66+
self.ensure_resource_health("airflow-webserver")
6667
try:
6768
self._ensure_airflow_webserver_is_healthy()
6869
yield
@@ -195,12 +196,25 @@ def monitor_task(self, host, dag_run_id, dag_id, task_id, expected_final_state,
195196
assert state == expected_final_state
196197

197198
@staticmethod
198-
def ensure_deployment_health(deployment_name: str, namespace: str = "airflow"):
199-
"""Watch the deployment until it is healthy."""
200-
deployment_rollout_status = check_output(
201-
["kubectl", "rollout", "status", "deployment", deployment_name, "-n", namespace, "--watch"]
199+
def ensure_resource_health(
200+
resource_name: str,
201+
namespace: str = "airflow",
202+
resource_type: Literal["deployment", "statefulset"] = "deployment",
203+
):
204+
"""Watch the resource until it is healthy.
205+
206+
Args:
207+
resource_name (str): Name of the resource to check.
208+
resource_type (str): Type of the resource (e.g., deployment, statefulset).
209+
namespace (str): Kubernetes namespace where the resource is located.
210+
"""
211+
rollout_status = check_output(
212+
["kubectl", "rollout", "status", f"{resource_type}/{resource_name}", "-n", namespace, "--watch"],
202213
).decode()
203-
assert "successfully rolled out" in deployment_rollout_status
214+
if resource_type == "deployment":
215+
assert "successfully rolled out" in rollout_status
216+
else:
217+
assert "roll out complete" in rollout_status
204218

205219
def ensure_dag_expected_state(self, host, execution_date, dag_id, expected_final_state, timeout):
206220
tries = 0

kubernetes_tests/test_kubernetes_executor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ def test_integration_run_dag_with_scheduler_failure(self):
5555

5656
self._delete_airflow_pod("scheduler")
5757

58-
self.ensure_deployment_health("airflow-scheduler")
58+
self.ensure_resource_health("airflow-scheduler")
5959

6060
# Wait some time for the operator to complete
6161
self.monitor_task(

kubernetes_tests/test_other_executors.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,13 @@ def test_integration_run_dag_with_scheduler_failure(self):
5656

5757
self._delete_airflow_pod("scheduler")
5858

59-
self.ensure_deployment_health("airflow-scheduler")
59+
if EXECUTOR == "CeleryExecutor":
60+
scheduler_resource_type = "deployment"
61+
elif EXECUTOR == "LocalExecutor":
62+
scheduler_resource_type = "statefulset"
63+
else:
64+
raise ValueError(f"Unknown executor {EXECUTOR}")
65+
self.ensure_resource_health("airflow-scheduler", resource_type=scheduler_resource_type)
6066

6167
# Wait some time for the operator to complete
6268
self.monitor_task(

0 commit comments

Comments
 (0)