Skip to content

K8s tests are flaky: test_integration_run_dag_with_scheduler_failure tests #45145

@potiuk

Description

@potiuk

The test_integration_run_dag_with_scheduler_failure test is flaky and occasionally fails.

For example: https://github.com/apache/airflow/actions/runs/12452312988/job/34761457306

kubernetes_tests/test_other_executors.py .F                              [100%]
  
  =================================== FAILURES ===================================
  __ TestCeleryAndLocalExecutor.test_integration_run_dag_with_scheduler_failure __
  
  self = <kubernetes_tests.test_other_executors.TestCeleryAndLocalExecutor object at 0x7f7c8d269f70>
  
      @pytest.mark.xfail(
          EXECUTOR == "LocalExecutor",
          reason="https://github.com/apache/airflow/issues/44481 needs to be implemented",
      )
      def test_integration_run_dag_with_scheduler_failure(self):
          dag_id = "example_xcom"
      
          dag_run_id, logical_date = self.start_job_in_kubernetes(dag_id, self.host)
      
          self._delete_airflow_pod("scheduler")
      
          time.sleep(10)  # give time for pod to restart
      
          # Wait some time for the operator to complete
  >       self.monitor_task(
              host=self.host,
              dag_run_id=dag_run_id,
              dag_id=dag_id,
              task_id="push",
              expected_final_state="success",
              timeout=40,  # This should fail fast if failing
          )
  
  kubernetes_tests/test_other_executors.py:71: 
  _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
  
  self = <kubernetes_tests.test_other_executors.TestCeleryAndLocalExecutor object at 0x7f7c8d269f70>
  host = 'localhost:41460'
  dag_run_id = 'manual__2024-12-22T08:49:39.130207+00:00', dag_id = 'example_xcom'
  task_id = 'push', expected_final_state = 'success', timeout = 40
  
      def monitor_task(self, host, dag_run_id, dag_id, task_id, expected_final_state, timeout):
          tries = 0
          state = ""
          max_tries = max(int(timeout / 5), 1)
          # Wait some time for the operator to complete
          while tries < max_tries:
              time.sleep(5)
              # Check task state
              try:
                  get_string = (
                      f"http://{host}/api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}"
                  )
                  print(f"Calling [monitor_task]#1 {get_string}")
                  result = self.session.get(get_string)
                  if result.status_code == 404:
                      check_call(["echo", "api returned 404."])
                      tries += 1
                      continue
                  assert result.status_code == 200, "Could not get the status"
                  result_json = result.json()
                  print(f"Received [monitor_task]#2: {result_json}")
                  state = result_json["state"]
                  print(f"Attempt {tries}: Current state of operator is {state}")
      
                  if state == expected_final_state:
                      break
                  if state in {"failed", "upstream_failed", "removed"}:
                      # If the TI is in failed state (and that's not the state we want) there's no point
                      # continuing to poll, it won't change
                      break
                  self._describe_resources(namespace="airflow")
                  self._describe_resources(namespace="default")
                  tries += 1
              except requests.exceptions.ConnectionError as e:
                  check_call(["echo", f"api call failed. trying again. error {e}"])
          if state != expected_final_state:
              print(f"The expected state is wrong {state} != {expected_final_state} (expected)!")
  >       assert state == expected_final_state
  E       AssertionError: assert equals failed
  E         None       'success'
  
  kubernetes_tests/test_base.py:197: AssertionError
  ---------------------------- Captured stdout setup -----------------------------

Metadata

Metadata

Assignees

Labels

Quarantine (Issues that are occasionally failing and are quarantined) · area:CI (Airflow's tests and continuous integration) · provider:cncf-kubernetes (Kubernetes (k8s) provider related issues)

Type

No type

Projects

Status

Done

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions