Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ class DAGDetailsResponse(DAGResponse):
default_args: Mapping | None
owner_links: dict[str, str] | None = None
is_favorite: bool = False
active_runs_count: int = 0

@field_validator("timezone", mode="before")
@classmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10036,6 +10036,10 @@ components:
type: boolean
title: Is Favorite
default: false
active_runs_count:
type: integer
title: Active Runs Count
default: 0
file_token:
type: string
title: File Token
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from fastapi import Depends, HTTPException, Query, Response, status
from fastapi.exceptions import RequestValidationError
from pydantic import ValidationError
from sqlalchemy import delete, insert, select, update
from sqlalchemy import delete, func, insert, select, update

from airflow.api.common import delete_dag as delete_dag_module
from airflow.api_fastapi.common.dagbag import DagBagDep, get_latest_version_of_dag
Expand Down Expand Up @@ -75,6 +75,7 @@
from airflow.models import DagModel
from airflow.models.dag_favorite import DagFavorite
from airflow.models.dagrun import DagRun
from airflow.utils.state import DagRunState

dags_router = AirflowRouter(tags=["DAG"], prefix="/dags")

Expand Down Expand Up @@ -233,8 +234,19 @@ def get_dag_details(
is not None
)

# Add is_favorite field to the DAG model
# Count active (running + queued) DAG runs for this DAG
active_runs_count = (
session.scalar(
select(func.count())
.select_from(DagRun)
.where(DagRun.dag_id == dag_id, DagRun.state.in_([DagRunState.RUNNING, DagRunState.QUEUED]))
)
or 0
)

# Add is_favorite and active_runs_count fields to the DAG model
setattr(dag_model, "is_favorite", is_favorite)
setattr(dag_model, "active_runs_count", active_runs_count)

return DAGDetailsResponse.model_validate(dag_model)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2073,6 +2073,11 @@ export const $DAGDetailsResponse = {
title: 'Is Favorite',
default: false
},
active_runs_count: {
type: 'integer',
title: 'Active Runs Count',
default: 0
},
file_token: {
type: 'string',
title: 'File Token',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,7 @@ export type DAGDetailsResponse = {
[key: string]: (string);
} | null;
is_favorite?: boolean;
active_runs_count?: number;
/**
* Return file token.
*/
Expand Down
23 changes: 18 additions & 5 deletions airflow-core/src/airflow/ui/src/pages/Dag/Dag.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,29 @@ export const Dag = () => {
...externalTabs,
];

const refetchInterval = useAutoRefresh({ dagId });
const [hasPendingRuns, setHasPendingRuns] = useState<boolean | undefined>(false);

const {
data: dag,
error,
isLoading,
} = useDagServiceGetDagDetails({
dagId,
});
} = useDagServiceGetDagDetails(
{
dagId,
},
undefined,
{
refetchInterval: (query) => {
// Auto-refresh when there are active runs or pending runs
if (hasPendingRuns ?? (query.state.data && (query.state.data.active_runs_count ?? 0) > 0)) {
return refetchInterval;
}

const refetchInterval = useAutoRefresh({ dagId });
const [hasPendingRuns, setHasPendingRuns] = useState<boolean | undefined>(false);
return false;
},
},
);

// Ensures continuous refresh to detect new runs when there's no
// pending state and new runs are initiated from other page
Expand Down
7 changes: 7 additions & 0 deletions airflow-core/src/airflow/ui/src/pages/Dag/Header.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@ export const Header = ({
/>
) : undefined,
},
{
label: translate("dagDetails.maxActiveRuns"),
value:
dag?.max_active_runs === undefined
? undefined
: `${dag.active_runs_count ?? 0} of ${dag.max_active_runs}`,
},
{
label: translate("dagDetails.owner"),
value: <DagOwners ownerLinks={dag?.owner_links ?? undefined} owners={dag?.owners} />,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -954,6 +954,7 @@ def test_dag_details(
"timetable_description": "Never, external triggers only",
"timezone": UTC_JSON_REPR,
"is_favorite": False,
"active_runs_count": 0,
}
assert res_json == expected

Expand Down Expand Up @@ -1043,6 +1044,7 @@ def test_dag_details_with_view_url_template(
"timetable_description": "Never, external triggers only",
"timezone": UTC_JSON_REPR,
"is_favorite": False,
"active_runs_count": 0,
}
assert res_json == expected

Expand Down Expand Up @@ -1078,6 +1080,63 @@ def test_dag_details_includes_is_favorite_field(self, session, test_client):
assert isinstance(body["is_favorite"], bool)
assert body["is_favorite"] is False

def test_dag_details_includes_active_runs_count(self, session, test_client):
"""Test that DAG details include the active_runs_count field."""
# Create running and queued DAG runs for DAG2
session.add(
DagRun(
dag_id=DAG2_ID,
run_id="running_run_1",
logical_date=datetime(2021, 6, 15, 1, 0, 0, tzinfo=timezone.utc),
start_date=datetime(2021, 6, 15, 1, 0, 0, tzinfo=timezone.utc),
run_type=DagRunType.MANUAL,
state=DagRunState.RUNNING,
triggered_by=DagRunTriggeredByType.TEST,
)
)
session.add(
DagRun(
dag_id=DAG2_ID,
run_id="queued_run_1",
logical_date=datetime(2021, 6, 15, 2, 0, 0, tzinfo=timezone.utc),
start_date=datetime(2021, 6, 15, 2, 0, 0, tzinfo=timezone.utc),
run_type=DagRunType.MANUAL,
state=DagRunState.QUEUED,
triggered_by=DagRunTriggeredByType.TEST,
)
)
# Add a successful DAG run (should not be counted)
session.add(
DagRun(
dag_id=DAG2_ID,
run_id="success_run_1",
logical_date=datetime(2021, 6, 15, 3, 0, 0, tzinfo=timezone.utc),
start_date=datetime(2021, 6, 15, 3, 0, 0, tzinfo=timezone.utc),
run_type=DagRunType.MANUAL,
state=DagRunState.SUCCESS,
triggered_by=DagRunTriggeredByType.TEST,
)
)
session.commit()

response = test_client.get(f"/dags/{DAG2_ID}/details")
assert response.status_code == 200
body = response.json()

# Verify active_runs_count field is present and correct
assert "active_runs_count" in body
assert isinstance(body["active_runs_count"], int)
assert body["active_runs_count"] == 2 # 1 running + 1 queued

# Test with DAG that has no active runs
response = test_client.get(f"/dags/{DAG1_ID}/details")
assert response.status_code == 200
body = response.json()

assert "active_runs_count" in body
assert isinstance(body["active_runs_count"], int)
assert body["active_runs_count"] == 0


class TestGetDag(TestDagEndpoint):
"""Unit tests for Get DAG."""
Expand Down
1 change: 1 addition & 0 deletions airflow-ctl/src/airflowctl/api/datamodels/generated.py
Original file line number Diff line number Diff line change
Expand Up @@ -1312,6 +1312,7 @@ class DAGDetailsResponse(BaseModel):
default_args: Annotated[dict[str, Any] | None, Field(title="Default Args")] = None
owner_links: Annotated[dict[str, str] | None, Field(title="Owner Links")] = None
is_favorite: Annotated[bool | None, Field(title="Is Favorite")] = False
active_runs_count: Annotated[int | None, Field(title="Active Runs Count")] = 0
file_token: Annotated[str, Field(description="Return file token.", title="File Token")]
concurrency: Annotated[
int,
Expand Down
Loading