Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@
from fastapi import Depends, HTTPException, Query, Response, status
from fastapi.exceptions import RequestValidationError
from pydantic import ValidationError
from sqlalchemy import select, update
from sqlalchemy import func, null, select, update

from airflow.api.common import delete_dag as delete_dag_module
from airflow.api_fastapi.common.dagbag import DagBagDep
from airflow.api_fastapi.common.db.common import (
SessionDep,
apply_filters_to_select,
paginated_select,
)
from airflow.api_fastapi.common.db.dags import generate_dag_with_latest_run_query
Expand Down Expand Up @@ -115,30 +116,54 @@ def get_dags(
session: SessionDep,
) -> DAGCollectionResponse:
"""Get all DAGs."""
dag_runs_select = None
query = select(DagModel)

if dag_run_state.value or dag_run_start_date_range.is_active() or dag_run_end_date_range.is_active():
dag_runs_select, _ = paginated_select(
statement=select(DagRun),
max_run_id_query = ( # ordering by id will not always be "latest run", but it's a simplifying assumption
select(DagRun.dag_id, func.max(DagRun.id).label("max_dag_run_id"))
.where(DagRun.start_date.is_not(null()))
.group_by(DagRun.dag_id)
.subquery(name="mrq")
)

has_max_run_filter = (
dag_run_state.value
or last_dag_run_state.value
or dag_run_start_date_range.is_active()
or dag_run_end_date_range.is_active()
)

if has_max_run_filter or order_by.value in (
"last_run_state",
"last_run_start_date",
"-last_run_state",
"-last_run_start_date",
):
query = query.join(
max_run_id_query,
DagModel.dag_id == max_run_id_query.c.dag_id,
isouter=True,
).join(DagRun, DagRun.id == max_run_id_query.c.max_dag_run_id, isouter=True)

if has_max_run_filter:
query = apply_filters_to_select(
statement=query,
filters=[
dag_run_start_date_range,
dag_run_end_date_range,
dag_run_state,
last_dag_run_state,
],
session=session,
)
dag_runs_select = dag_runs_select.cte()

dags_select, total_entries = paginated_select(
statement=generate_dag_with_latest_run_query(dag_runs_select),
statement=query,
filters=[
exclude_stale,
paused,
dag_id_pattern,
dag_display_name_pattern,
tags,
owners,
last_dag_run_state,
readable_dags_filter,
],
order_by=order_by,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ class TestGetDags(TestDagEndpoint):
({"last_dag_run_state": "success", "exclude_stale": False}, 1, [DAG3_ID]),
({"last_dag_run_state": "failed", "exclude_stale": False}, 1, [DAG1_ID]),
({"dag_run_state": "failed"}, 1, [DAG1_ID]),
({"dag_run_state": "failed", "exclude_stale": False}, 2, [DAG1_ID, DAG3_ID]),
({"dag_run_state": "failed", "exclude_stale": False}, 1, [DAG1_ID]),
(
{"dag_run_start_date_gte": DAG3_START_DATE_2.isoformat(), "exclude_stale": False},
1,
Expand Down Expand Up @@ -210,10 +210,10 @@ class TestGetDags(TestDagEndpoint):
"dag_run_state": "failed",
"exclude_stale": False,
},
1,
[DAG3_ID],
0,
[],
),
# # Sort
# Sort
({"order_by": "-dag_id"}, 2, [DAG2_ID, DAG1_ID]),
({"order_by": "-dag_display_name"}, 2, [DAG2_ID, DAG1_ID]),
({"order_by": "dag_display_name"}, 2, [DAG1_ID, DAG2_ID]),
Expand Down