Skip to content

Commit

Permalink
AIP-84 | add UI batch recent dag runs
Browse files Browse the repository at this point in the history
  • Loading branch information
jason810496 committed Oct 20, 2024
1 parent 0145642 commit 0b552c7
Show file tree
Hide file tree
Showing 6 changed files with 158 additions and 48 deletions.
14 changes: 7 additions & 7 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1848,9 +1848,6 @@ components:
description: Historical Metric Data serializer for responses.
RecentDAGRun:
properties:
dag_id:
type: string
title: Dag Id
start_date:
anyOf:
- type: string
Expand Down Expand Up @@ -1885,7 +1882,6 @@ components:
title: Data Interval End
type: object
required:
- dag_id
- start_date
- end_date
- state
Expand All @@ -1901,13 +1897,17 @@ components:
$ref: '#/components/schemas/RecentDAGRunsResponse'
type: array
title: Recent Dag Runs
total_entries:
total_dag_ids:
type: integer
title: Total Entries
title: Total Dag Ids
total_dag_runs:
type: integer
title: Total Dag Runs
type: object
required:
- recent_dag_runs
- total_entries
- total_dag_ids
- total_dag_runs
title: RecentDAGRunsCollectionResponse
description: Recent DAG Runs collection response serializer.
RecentDAGRunsResponse:
Expand Down
76 changes: 51 additions & 25 deletions airflow/api_fastapi/core_api/routes/ui/dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,66 +59,92 @@ async def recent_dag_runs(
only_active: QueryOnlyActiveFilter,
paused: QueryPausedFilter,
last_dag_run_state: QueryLastDagRunStateFilter,
dag_runs_limit: QueryLimit,
session: Annotated[Session, Depends(get_session)],
) -> RecentDAGRunsCollectionResponse:
"""Get recent DAG runs."""
recent_runs_subquery = (
select(
DagRun.dag_id,
func.max(DagRun.execution_date).label("max_execution_date"),
DagRun.execution_date,
func.rank()
.over(
partition_by=DagRun.dag_id,
order_by=DagRun.execution_date.desc(),
)
.label("rank"),
)
.group_by(DagRun.dag_id)
.subquery("last_runs")
.order_by(DagRun.execution_date.desc())
.subquery()
)
dags_select_with_recent_dag_run = (
dags_with_recent_dag_runs_select = (
select(
DagModel.dag_id,
# should select DagModel fields that `get_dags` in public endpoint will filter at
DagModel.dag_display_name,
DagModel.next_dagrun,
recent_runs_subquery.c.execution_date,
DagRun.start_date,
DagRun.end_date,
DagRun.state,
DagRun.execution_date,
DagRun.data_interval_start,
DagRun.data_interval_end,
)
.join(DagModel, DagModel.dag_id == recent_runs_subquery.c.dag_id)
# `last_run_state` and `last_run_start_date` query params of`get_dags` endpoint
# needs `DagRun` to order_by
.join(
DagRun,
DagModel.dag_id == DagRun.dag_id,
)
.join(
recent_runs_subquery,
and_(
recent_runs_subquery.c.dag_id == DagModel.dag_id,
recent_runs_subquery.c.max_execution_date == DagRun.execution_date,
DagRun.dag_id == DagModel.dag_id,
DagRun.execution_date == recent_runs_subquery.c.execution_date,
),
)
.where(recent_runs_subquery.c.rank <= dag_runs_limit.value)
.group_by(
DagModel.dag_id,
recent_runs_subquery.c.execution_date,
DagRun.start_date,
DagRun.end_date,
DagRun.state,
DagRun.data_interval_start,
DagRun.data_interval_end,
)
)
recent_dags_select, total_entries = paginated_select(
dags_select_with_recent_dag_run,
dags_with_recent_dag_runs_select_filter, total_entries = paginated_select(
dags_with_recent_dag_runs_select,
[only_active, paused, dag_id_pattern, dag_display_name_pattern, tags, owners, last_dag_run_state],
None,
offset,
limit,
)
dags_with_recent_dag_runs = session.execute(dags_with_recent_dag_runs_select_filter).all()
# aggregate rows by dag_id
dag_runs_by_dag_id: dict[str, list] = {}
for row in dags_with_recent_dag_runs:
dag_id = row.dag_id
if dag_id not in dag_runs_by_dag_id:
dag_runs_by_dag_id[dag_id] = []
dag_runs_by_dag_id[dag_id].append(row)

dag_runs = session.execute(recent_dags_select).all()
print("dag_runs\n", dag_runs)
return RecentDAGRunsCollectionResponse(
total_entries=total_entries,
total_dag_runs=total_entries,
total_dag_ids=len(dag_runs_by_dag_id),
recent_dag_runs=[
RecentDAGRunsResponse(
dag_id=row.dag_id,
dag_id=dag_id,
dag_runs=[
RecentDAGRun(
dag_id=row.dag_id,
start_date=row.start_date,
end_date=row.end_date,
state=row.state,
execution_date=row.execution_date,
data_interval_start=row.data_interval_start,
data_interval_end=row.data_interval_end,
start_date=dag_run.start_date,
end_date=dag_run.end_date,
state=dag_run.state,
execution_date=dag_run.execution_date,
data_interval_start=dag_run.data_interval_start,
data_interval_end=dag_run.data_interval_end,
)
for dag_run in dag_runs
],
)
for row in dag_runs
for dag_id, dag_runs in dag_runs_by_dag_id.items()
],
)
4 changes: 2 additions & 2 deletions airflow/api_fastapi/core_api/serializers/ui/dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
class RecentDAGRun(BaseModel):
"""Run serializer for Recent DAG Runs."""

dag_id: str
start_date: datetime | None
end_date: datetime | None
state: DagRunState
Expand All @@ -47,4 +46,5 @@ class RecentDAGRunsCollectionResponse(BaseModel):
"""Recent DAG Runs collection response serializer."""

recent_dag_runs: list[RecentDAGRunsResponse]
total_entries: int
total_dag_ids: int
total_dag_runs: int
15 changes: 7 additions & 8 deletions airflow/ui/openapi-gen/requests/schemas.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1171,10 +1171,6 @@ export const $HistoricalMetricDataResponse = {

export const $RecentDAGRun = {
properties: {
dag_id: {
type: "string",
title: "Dag Id",
},
start_date: {
anyOf: [
{
Expand Down Expand Up @@ -1241,7 +1237,6 @@ export const $RecentDAGRun = {
},
type: "object",
required: [
"dag_id",
"start_date",
"end_date",
"state",
Expand All @@ -1262,13 +1257,17 @@ export const $RecentDAGRunsCollectionResponse = {
type: "array",
title: "Recent Dag Runs",
},
total_entries: {
total_dag_ids: {
type: "integer",
title: "Total Entries",
title: "Total Dag Ids",
},
total_dag_runs: {
type: "integer",
title: "Total Dag Runs",
},
},
type: "object",
required: ["recent_dag_runs", "total_entries"],
required: ["recent_dag_runs", "total_dag_ids", "total_dag_runs"],
title: "RecentDAGRunsCollectionResponse",
description: "Recent DAG Runs collection response serializer.",
} as const;
Expand Down
4 changes: 2 additions & 2 deletions airflow/ui/openapi-gen/requests/types.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,6 @@ export type HistoricalMetricDataResponse = {
* Run serializer for Recent DAG Runs.
*/
export type RecentDAGRun = {
dag_id: string;
start_date: string | null;
end_date: string | null;
state: DagRunState;
Expand All @@ -276,7 +275,8 @@ export type RecentDAGRun = {
*/
export type RecentDAGRunsCollectionResponse = {
recent_dag_runs: Array<RecentDAGRunsResponse>;
total_entries: number;
total_dag_ids: number;
total_dag_runs: number;
};

/**
Expand Down
93 changes: 89 additions & 4 deletions tests/api_fastapi/core_api/routes/ui/test_dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,96 @@
# under the License.
from __future__ import annotations

from tests.api_fastapi.core_api.routes.public.test_dags import TestDagEndpoint
from datetime import datetime, timezone

import pendulum
import pytest

class TestRecentDagRuns(TestDagEndpoint):
def test_recent_dag_runs(self, test_client, query_params, expected_total_entries, expected_ids):
from airflow.models import DagRun
from airflow.utils.session import provide_session
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunType

from tests.api_fastapi.core_api.routes.public.test_dags import (
DAG1_ID,
DAG2_ID,
DAG3_ID,
DAG4_ID,
DAG5_ID,
TestDagEndpoint as TestPublicDagEndpoint,
)


class TestRecentDagRuns(TestPublicDagEndpoint):
@pytest.fixture(autouse=True)
@provide_session
def setup_dag_runs(self, session=None) -> None:
# Create DAG Runs
for dag_id in [DAG1_ID, DAG2_ID, DAG3_ID, DAG4_ID, DAG5_ID]:
dag_runs_count = 5 if dag_id in [DAG1_ID, DAG2_ID] else 2
for i in range(dag_runs_count):
start_date = datetime(2021 + i, 1, 1, 0, 0, 0, tzinfo=timezone.utc)
dag_run = DagRun(
dag_id=dag_id,
run_id=f"run_id_{i+1}",
run_type=DagRunType.MANUAL,
start_date=start_date,
execution_date=start_date,
state=(DagRunState.FAILED if i % 2 == 0 else DagRunState.SUCCESS),
)
dag_run.end_date = dag_run.start_date + pendulum.duration(hours=1)
session.add(dag_run)
session.commit()

@pytest.mark.parametrize(
"query_params, expected_ids,expected_total_dag_runs",
[
# Filters
({}, [DAG1_ID, DAG2_ID], 11),
({"limit": 1}, [DAG1_ID], 2),
({"offset": 1}, [DAG1_ID, DAG2_ID], 11),
({"tags": ["example"]}, [DAG1_ID], 6),
({"only_active": False}, [DAG1_ID, DAG2_ID, DAG3_ID], 15),
({"paused": True, "only_active": False}, [DAG3_ID], 4),
({"paused": False}, [DAG1_ID, DAG2_ID], 11),
({"owners": ["airflow"]}, [DAG1_ID, DAG2_ID], 11),
({"owners": ["test_owner"], "only_active": False}, [DAG3_ID], 4),
({"last_dag_run_state": "success", "only_active": False}, [DAG1_ID, DAG2_ID, DAG3_ID], 6),
({"last_dag_run_state": "failed", "only_active": False}, [DAG1_ID, DAG2_ID, DAG3_ID], 9),
# # Sort
({"order_by": "-dag_id"}, [DAG2_ID, DAG1_ID], 11),
({"order_by": "-dag_display_name"}, [DAG2_ID, DAG1_ID], 11),
({"order_by": "dag_display_name"}, [DAG1_ID, DAG2_ID], 11),
({"order_by": "next_dagrun", "only_active": False}, [DAG3_ID, DAG1_ID, DAG2_ID], 15),
({"order_by": "last_run_state", "only_active": False}, [DAG1_ID, DAG3_ID, DAG2_ID], 15),
({"order_by": "-last_run_state", "only_active": False}, [DAG3_ID, DAG1_ID, DAG2_ID], 15),
({"order_by": "last_run_start_date", "only_active": False}, [DAG1_ID, DAG3_ID, DAG2_ID], 15),
({"order_by": "-last_run_start_date", "only_active": False}, [DAG3_ID, DAG1_ID, DAG2_ID], 15),
# Search
({"dag_id_pattern": "1"}, [DAG1_ID], 6),
({"dag_display_name_pattern": "test_dag2"}, [DAG2_ID], 5),
],
)
def test_recent_dag_runs(self, test_client, query_params, expected_ids, expected_total_dag_runs):
response = test_client.get("/ui/dags/recent_dag_runs", params=query_params)
assert response.status_code == 200
print(response.json())
body = response.json()
assert body["total_dag_runs"] == expected_total_dag_runs
assert body["total_dag_ids"] == len(expected_ids)

for recent_dag_runs in body["recent_dag_runs"]:
dag_runs = recent_dag_runs["dag_runs"]
# check date ordering
previous_execution_date = None
for dag_run in dag_runs:
assert [
"start_date",
"end_date",
"state",
"execution_date",
"data_interval_start",
"data_interval_end",
] == list(dag_run.keys())
if previous_execution_date:
assert previous_execution_date < dag_run["execution_date"]
previous_execution_date = dag_run["execution_date"]

0 comments on commit 0b552c7

Please sign in to comment.