Skip to content

Commit 62b3461

Browse files
committed
Optimize DAG list query for users with limited access
When users have limited DAG access, the DAG list query was inefficiently grouping all DagRuns in the database before filtering. This caused severe performance degradation in large deployments where a user might access only a few DAGs out of hundreds or thousands. The fix filters both the main DAG query and the DagRun subquery by accessible dag_ids before performing the expensive GROUP BY operation. Before (queries all dagruns): ```sql SELECT ... FROM dag LEFT OUTER JOIN ( SELECT dag_run.dag_id, max(dag_run.id) AS max_dag_run_id FROM dag_run GROUP BY dag_run.dag_id ) AS mrq ON dag.dag_id = mrq.dag_id ``` After (filters to accessible dags): ```sql SELECT ... FROM dag LEFT OUTER JOIN ( SELECT dag_run.dag_id, max(dag_run.id) AS max_dag_run_id FROM dag_run WHERE dag_run.dag_id IN ('accessible_dag_1', 'accessible_dag_2') GROUP BY dag_run.dag_id ) AS mrq ON dag.dag_id = mrq.dag_id WHERE dag.dag_id IN ('accessible_dag_1', 'accessible_dag_2') ``` Performance impact: In a deployment with 100 DAGs (100 runs each) where a user has access to only 2 DAGs, this reduces the subquery from grouping 10,000 rows down to 200 rows (50x improvement), and eliminates fetching 98 unnecessary DAG models. Fixes #57427
1 parent a6d3e2d commit 62b3461

File tree

4 files changed

+93
-6
lines changed

4 files changed

+93
-6
lines changed

airflow-core/src/airflow/api_fastapi/common/db/dags.py

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,30 @@
3333
from sqlalchemy.sql import Select
3434

3535

36-
def generate_dag_with_latest_run_query(max_run_filters: list[BaseParam], order_by: SortParam) -> Select:
36+
def generate_dag_with_latest_run_query(
37+
max_run_filters: list[BaseParam], order_by: SortParam, *, dag_ids: set[str] | None = None
38+
) -> Select:
39+
"""
40+
Generate a query to fetch DAGs with their latest run.
41+
42+
:param max_run_filters: List of filters to apply to the latest run
43+
:param order_by: Sort parameter for ordering results
44+
:param dag_ids: Optional set of DAG IDs to limit the query to. When provided, both the main
45+
DAG query and the subquery for finding the latest runs will be filtered to
46+
only these DAG IDs, improving performance when users have limited DAG access.
47+
:return: SQLAlchemy Select statement
48+
"""
3749
query = select(DagModel).options(selectinload(DagModel.tags))
3850

39-
max_run_id_query = ( # ordering by id will not always be "latest run", but it's a simplifying assumption
40-
select(DagRun.dag_id, func.max(DagRun.id).label("max_dag_run_id"))
41-
.group_by(DagRun.dag_id)
42-
.subquery(name="mrq")
43-
)
51+
# Filter main query by dag_ids if provided
52+
if dag_ids is not None:
53+
query = query.where(DagModel.dag_id.in_(dag_ids or set()))
54+
55+
# Also filter the subquery for finding latest runs
56+
max_run_id_query_stmt = select(DagRun.dag_id, func.max(DagRun.id).label("max_dag_run_id"))
57+
if dag_ids is not None:
58+
max_run_id_query_stmt = max_run_id_query_stmt.where(DagRun.dag_id.in_(dag_ids or set()))
59+
max_run_id_query = max_run_id_query_stmt.group_by(DagRun.dag_id).subquery(name="mrq")
4460

4561
has_max_run_filter = False
4662

airflow-core/src/airflow/api_fastapi/core_api/routes/public/dags.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ def get_dags(
137137
last_dag_run_state,
138138
],
139139
order_by=order_by,
140+
dag_ids=readable_dags_filter.value,
140141
)
141142

142143
dags_select, total_entries = paginated_select(

airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ def get_dags(
125125
last_dag_run_state,
126126
],
127127
order_by=order_by,
128+
dag_ids=readable_dags_filter.value,
128129
)
129130

130131
dags_select, total_entries = paginated_select(

airflow-core/tests/unit/api_fastapi/common/db/test_dags.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,3 +273,72 @@ def test_queued_runs_with_null_start_date_are_properly_joined(
273273
"This suggests the WHERE start_date IS NOT NULL condition is excluding it."
274274
)
275275
assert running_dagrun_state is not None, "Running DAG should have DagRun state joined"
276+
277+
@pytest.mark.usefixtures("testing_dag_bundle")
278+
def test_filters_by_dag_ids_when_provided(self, session):
279+
"""
280+
Verify that when dag_ids is provided, only those DAGs and their runs are queried.
281+
282+
This is a performance optimization: both the main DAG query and the DagRun subquery
283+
should only process accessible DAGs when the user has limited access.
284+
"""
285+
dag_ids = ["dag_accessible_1", "dag_accessible_2", "dag_inaccessible_3"]
286+
287+
for dag_id in dag_ids:
288+
dag_model = DagModel(
289+
dag_id=dag_id,
290+
bundle_name="testing",
291+
is_stale=False,
292+
is_paused=False,
293+
fileloc=f"/tmp/{dag_id}.py",
294+
)
295+
session.add(dag_model)
296+
session.flush()
297+
298+
# Create 2 runs for each DAG
299+
for run_idx in range(2):
300+
dagrun = DagRun(
301+
dag_id=dag_id,
302+
run_id=f"manual__{run_idx}",
303+
run_type="manual",
304+
logical_date=datetime(2024, 1, 1 + run_idx, tzinfo=timezone.utc),
305+
state=DagRunState.SUCCESS,
306+
start_date=datetime(2024, 1, 1 + run_idx, 1, tzinfo=timezone.utc),
307+
)
308+
session.add(dagrun)
309+
session.commit()
310+
311+
# User has access to only 2 DAGs
312+
accessible_dag_ids = {"dag_accessible_1", "dag_accessible_2"}
313+
314+
# Query with dag_ids filter
315+
query_filtered = generate_dag_with_latest_run_query(
316+
max_run_filters=[],
317+
order_by=SortParam(allowed_attrs=["last_run_state"], model=DagModel).set_value(
318+
["last_run_state"]
319+
),
320+
dag_ids=accessible_dag_ids,
321+
)
322+
323+
# Query without dag_ids filter
324+
query_unfiltered = generate_dag_with_latest_run_query(
325+
max_run_filters=[],
326+
order_by=SortParam(allowed_attrs=["last_run_state"], model=DagModel).set_value(
327+
["last_run_state"]
328+
),
329+
)
330+
331+
result_filtered = session.execute(query_filtered.add_columns(DagRun.state)).fetchall()
332+
result_unfiltered = session.execute(query_unfiltered.add_columns(DagRun.state)).fetchall()
333+
334+
# Filtered query should only return accessible DAGs
335+
filtered_dag_ids = {row[0].dag_id for row in result_filtered}
336+
assert filtered_dag_ids == accessible_dag_ids
337+
338+
# Unfiltered query returns all DAGs
339+
unfiltered_dag_ids = {row[0].dag_id for row in result_unfiltered}
340+
assert unfiltered_dag_ids == set(dag_ids)
341+
342+
# All accessible DAGs should have DagRun info
343+
filtered_dags_with_runs = {row[0].dag_id for row in result_filtered if row[1] is not None}
344+
assert filtered_dags_with_runs == accessible_dag_ids

0 commit comments

Comments
 (0)