From 4bc1257df4bf1f7391ad8bca3b10d294b2d92e7a Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Tue, 12 Nov 2024 01:08:55 +0100 Subject: [PATCH] Fix duplication of Task tries in the UI (#43891) It was observed that there are moments where the TI tries endpoint returns duplicate TaskInstance. I have observed this to happen when the TI is in up_for_retry state. When the TI is in up_for_retry state, we have already recorded the previous try in TI history and the TI try_number has not incremented at this time, so we must exclude this recorded TI from the taskinstance tries endpoint. We know the TI because its state is in up_for_retry, so we filter TIs with up_for_retry state when querying for the task instance tries. Closes: #41765 --- .../endpoints/task_instance_endpoint.py | 7 ++++++- .../endpoints/test_task_instance_endpoint.py | 17 +++++++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/airflow/api_connexion/endpoints/task_instance_endpoint.py b/airflow/api_connexion/endpoints/task_instance_endpoint.py index f23f37213c3d0..9f94619100828 100644 --- a/airflow/api_connexion/endpoints/task_instance_endpoint.py +++ b/airflow/api_connexion/endpoints/task_instance_endpoint.py @@ -810,7 +810,12 @@ def _query(orm_object): ) return query - task_instances = session.scalars(_query(TIH)).all() + session.scalars(_query(TI)).all() + # Exclude TaskInstance with state UP_FOR_RETRY since they have been recorded in TaskInstanceHistory + tis = session.scalars( + _query(TI).where(or_(TI.state != TaskInstanceState.UP_FOR_RETRY, TI.state.is_(None))) + ).all() + + task_instances = session.scalars(_query(TIH)).all() + tis return task_instance_history_collection_schema.dump( TaskInstanceHistoryCollection(task_instances=task_instances, total_entries=len(task_instances)) ) diff --git a/tests/api_connexion/endpoints/test_task_instance_endpoint.py b/tests/api_connexion/endpoints/test_task_instance_endpoint.py index e1fa6d13b7488..1eeefd698e287 100644 --- a/tests/api_connexion/endpoints/test_task_instance_endpoint.py +++ b/tests/api_connexion/endpoints/test_task_instance_endpoint.py @@ -2874,6 +2874,23 @@ def test_should_respond_200(self, session): assert response.json["total_entries"] == 2 # The task instance and its history assert len(response.json["task_instances"]) == 2 + def test_ti_in_retry_state_not_returned(self, session): + self.create_task_instances( + session=session, task_instances=[{"state": State.SUCCESS}], with_ti_history=True + ) + ti = session.query(TaskInstance).one() + ti.state = State.UP_FOR_RETRY + session.merge(ti) + session.commit() + + response = self.client.get( + "/api/v1/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context/tries", + environ_overrides={"REMOTE_USER": "test"}, + ) + assert response.status_code == 200 + assert response.json["total_entries"] == 1 + assert len(response.json["task_instances"]) == 1 + def test_mapped_task_should_respond_200(self, session): tis = self.create_task_instances(session, task_instances=[{"state": State.FAILED}]) old_ti = tis[0]