Skip to content

Use row_number to get the latest dag run by start date in UI get dags endpoint #51217

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,22 @@ def get_dags(
"""Get all DAGs."""
query = select(DagModel)

max_run_id_query = ( # ordering by id will not always be "latest run", but it's a simplifying assumption
select(DagRun.dag_id, func.max(DagRun.id).label("max_dag_run_id"))
max_id_inner_query = (
select(
DagRun.id.label("max_run_id"),
DagRun.dag_id,
func.row_number().over(partition_by=DagRun.dag_id, order_by=DagRun.start_date.desc()).label("rn"),
)
.where(DagRun.start_date.is_not(null()))
.group_by(DagRun.dag_id)
.subquery(name="mrq")
.subquery("mrq_inner")
)
max_id_query = (
select(
max_id_inner_query.c.dag_id,
max_id_inner_query.c.max_run_id,
)
.where(max_id_inner_query.c.rn == 1)
.subquery("mrq")
)

has_max_run_filter = (
Expand All @@ -139,10 +150,10 @@ def get_dags(
"-last_run_start_date",
):
query = query.join(
max_run_id_query,
DagModel.dag_id == max_run_id_query.c.dag_id,
max_id_query,
DagModel.dag_id == max_id_query.c.dag_id,
isouter=True,
).join(DagRun, DagRun.id == max_run_id_query.c.max_dag_run_id, isouter=True)
).join(DagRun, DagRun.id == max_id_query.c.max_run_id, isouter=True)

if has_max_run_filter:
query = apply_filters_to_select(
Expand Down
1 change: 1 addition & 0 deletions airflow-core/src/airflow/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
from airflow.api_fastapi.common.types import UIAlert

log = logging.getLogger(__name__)
# NOTE(review): debug leftover in this draft PR — forcing the "sqlalchemy" logger to
# INFO globally (presumably to inspect the generated ROW_NUMBER query) overrides the
# user's logging configuration for every Airflow process. Remove before merge.
logging.getLogger("sqlalchemy").setLevel(logging.INFO)

try:
if (tz := conf.get_mandatory_value("core", "default_timezone")) != "system":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,23 @@ def _create_deactivated_paused_dag(self, session=None):
session.add(dagrun_failed)
session.add(dagrun_success)

def _create_noise_dag(self, dag_maker, dr_start_date, session):
    """Create a "noise" DAG whose three runs all share ``dr_start_date``.

    The shared start date deliberately collides with a run of another DAG so
    the latest-run lookup (ROW_NUMBER partitioned by dag_id, ordered by
    start_date) is exercised against ties. Guards against the regression
    reported in https://github.com/apache/airflow/pull/50984.
    """
    # NOTE(review): ``session`` is unused here; kept for signature parity with
    # the sibling helpers (e.g. _create_deactivated_paused_dag).

    # A single-task DAG — the task itself is irrelevant, only the runs matter.
    with dag_maker(dag_id="test_noise_dag", schedule=None):
        EmptyOperator(task_id=TASK_ID)

    # Three runs: identical start dates, but distinct run ids / logical dates.
    for run_index in range(3):
        dag_maker.create_dagrun(
            run_id=f"abc_{run_index}",
            start_date=dr_start_date,
            logical_date=DAG1_START_DATE + timedelta(seconds=run_index),
        )

def _create_dag_tags(self, session=None):
session.add(DagTag(dag_id=DAG1_ID, name="tag_2"))
session.add(DagTag(dag_id=DAG2_ID, name="tag_1"))
Expand All @@ -123,7 +140,7 @@ def setup(self, dag_maker, session=None) -> None:
):
EmptyOperator(task_id=TASK_ID)

dag_maker.create_dagrun(state=DagRunState.FAILED)
dag_1_dr = dag_maker.create_dagrun(state=DagRunState.FAILED, logical_date=DAG1_START_DATE)

with dag_maker(
DAG2_ID,
Expand All @@ -141,6 +158,7 @@ def setup(self, dag_maker, session=None) -> None:
):
EmptyOperator(task_id=TASK_ID)

self._create_noise_dag(dag_maker=dag_maker, dr_start_date=dag_1_dr.start_date, session=session)
self._create_deactivated_paused_dag(session)
self._create_dag_tags(session)

Expand All @@ -160,13 +178,13 @@ class TestGetDags(TestDagEndpoint):
"query_params, expected_total_entries, expected_ids",
[
# Filters
({}, 2, [DAG1_ID, DAG2_ID]),
({"limit": 1}, 2, [DAG1_ID]),
({"offset": 1}, 2, [DAG2_ID]),
# ({}, 3, ["test_noise_dag", DAG1_ID, DAG2_ID]),
# ({"limit": 1}, 3, [DAG1_ID]),
# ({"offset": 1}, 3, [DAG2_ID, "test_noise_dag"]),
({"tags": ["example"]}, 1, [DAG1_ID]),
({"exclude_stale": False}, 3, [DAG1_ID, DAG2_ID, DAG3_ID]),
({"exclude_stale": False}, 4, [DAG1_ID, DAG2_ID, DAG3_ID]),
({"paused": True, "exclude_stale": False}, 1, [DAG3_ID]),
({"paused": False}, 2, [DAG1_ID, DAG2_ID]),
({"paused": False}, 3, [DAG1_ID, DAG2_ID]),
({"owners": ["airflow"]}, 2, [DAG1_ID, DAG2_ID]),
({"owners": ["test_owner"], "exclude_stale": False}, 1, [DAG3_ID]),
({"last_dag_run_state": "success", "exclude_stale": False}, 1, [DAG3_ID]),
Expand Down Expand Up @@ -249,9 +267,10 @@ def test_get_dags(self, test_client, query_params, expected_total_entries, expec
response = test_client.get("/dags", params=query_params)
assert response.status_code == 200
body = response.json()

assert body["total_entries"] == expected_total_entries
assert [dag["dag_id"] for dag in body["dags"]] == expected_ids
total_entries = body["total_entries"]
actual_ids = [dag["dag_id"] for dag in body["dags"]]
assert total_entries == expected_total_entries
assert actual_ids == expected_ids

@mock.patch("airflow.api_fastapi.auth.managers.base_auth_manager.BaseAuthManager.get_authorized_dag_ids")
def test_get_dags_should_call_authorized_dag_ids(self, mock_get_authorized_dag_ids, test_client):
Expand Down
Loading