-
Notifications
You must be signed in to change notification settings - Fork 16.4k
Closed
Closed
Copy link
Labels
Quarantine (Issues that are occasionally failing and are quarantined), area:CI (Airflow's tests and continuous integration), provider:cncf-kubernetes (Kubernetes (k8s) provider related issues)
Description
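The `test_integration_run_dag_with_scheduler_failure` test is flaky and fails intermittently. In the failing run below, the test deletes the scheduler pod. When `monitor_task` gives up after its 40-second timeout, the `push` task of the `example_xcom` DAG still has state `None` instead of the expected `success`.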
For example https://github.com/apache/airflow/actions/runs/12452312988/job/34761457306
kubernetes_tests/test_other_executors.py .F [100%]
=================================== FAILURES ===================================
__ TestCeleryAndLocalExecutor.test_integration_run_dag_with_scheduler_failure __
self = <kubernetes_tests.test_other_executors.TestCeleryAndLocalExecutor object at 0x7f7c8d269f70>
@pytest.mark.xfail(
EXECUTOR == "LocalExecutor",
reason="https://github.com/apache/airflow/issues/44481 needs to be implemented",
)
def test_integration_run_dag_with_scheduler_failure(self):
dag_id = "example_xcom"
dag_run_id, logical_date = self.start_job_in_kubernetes(dag_id, self.host)
self._delete_airflow_pod("scheduler")
time.sleep(10) # give time for pod to restart
# Wait some time for the operator to complete
> self.monitor_task(
host=self.host,
dag_run_id=dag_run_id,
dag_id=dag_id,
task_id="push",
expected_final_state="success",
timeout=40, # This should fail fast if failing
)
kubernetes_tests/test_other_executors.py:71:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <kubernetes_tests.test_other_executors.TestCeleryAndLocalExecutor object at 0x7f7c8d269f70>
host = 'localhost:41460'
dag_run_id = 'manual__2024-12-22T08:49:39.130207+00:00', dag_id = 'example_xcom'
task_id = 'push', expected_final_state = 'success', timeout = 40
def monitor_task(self, host, dag_run_id, dag_id, task_id, expected_final_state, timeout):
tries = 0
state = ""
max_tries = max(int(timeout / 5), 1)
# Wait some time for the operator to complete
while tries < max_tries:
time.sleep(5)
# Check task state
try:
get_string = (
f"http://{host}/api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}"
)
print(f"Calling [monitor_task]#1 {get_string}")
result = self.session.get(get_string)
if result.status_code == 404:
check_call(["echo", "api returned 404."])
tries += 1
continue
assert result.status_code == 200, "Could not get the status"
result_json = result.json()
print(f"Received [monitor_task]#2: {result_json}")
state = result_json["state"]
print(f"Attempt {tries}: Current state of operator is {state}")
if state == expected_final_state:
break
if state in {"failed", "upstream_failed", "removed"}:
# If the TI is in failed state (and that's not the state we want) there's no point
# continuing to poll, it won't change
break
self._describe_resources(namespace="airflow")
self._describe_resources(namespace="default")
tries += 1
except requests.exceptions.ConnectionError as e:
check_call(["echo", f"api call failed. trying again. error {e}"])
if state != expected_final_state:
print(f"The expected state is wrong {state} != {expected_final_state} (expected)!")
> assert state == expected_final_state
E AssertionError: assert equals failed
E None 'success'
kubernetes_tests/test_base.py:197: AssertionError
---------------------------- Captured stdout setup -----------------------------
Metadata
Assignees
Labels
Quarantine (Issues that are occasionally failing and are quarantined), area:CI (Airflow's tests and continuous integration), provider:cncf-kubernetes (Kubernetes (k8s) provider related issues)
Type
Projects
Status
Done