Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
1810f24
add asset query param to dag endpoint
vatsrahul1001 Aug 8, 2025
4e654eb
Merge branch 'main' into add-assets-filter-to-dag-endpoint
vatsrahul1001 Aug 8, 2025
07dcdde
Merge branch 'main' into add-assets-filter-to-dag-endpoint
vatsrahul1001 Aug 8, 2025
9e5da87
implement review comments
vatsrahul1001 Aug 12, 2025
832b242
Merge branch 'add-assets-filter-to-dag-endpoint' of github.com:astron…
vatsrahul1001 Aug 12, 2025
a86fb4a
Merge branch 'main' into add-assets-filter-to-dag-endpoint
vatsrahul1001 Aug 12, 2025
166a858
Merge branch 'main' into add-assets-filter-to-dag-endpoint
vatsrahul1001 Aug 12, 2025
6ce0325
fix tests
vatsrahul1001 Aug 12, 2025
b6564b4
Merge branch 'add-assets-filter-to-dag-endpoint' of github.com:astron…
vatsrahul1001 Aug 12, 2025
e8afa15
fix review comments
vatsrahul1001 Aug 13, 2025
3ce67c5
Merge branch 'main' into add-assets-filter-to-dag-endpoint
vatsrahul1001 Aug 13, 2025
431c44a
Merge branch 'main' into add-assets-filter-to-dag-endpoint
vatsrahul1001 Aug 13, 2025
a84d60c
Merge branch 'main' into add-assets-filter-to-dag-endpoint
vatsrahul1001 Aug 14, 2025
02bca12
Merge branch 'main' into add-assets-filter-to-dag-endpoint
vatsrahul1001 Aug 18, 2025
fd89bb2
Merge branch 'main' of github.com:astronomer/airflow into add-assets-…
vatsrahul1001 Aug 21, 2025
8161d6b
add filters to UI endpoint
vatsrahul1001 Aug 21, 2025
4432aa5
Merge branch 'main' into add-assets-filter-to-dag-endpoint
vatsrahul1001 Aug 21, 2025
427d3a9
Merge branch 'main' into add-assets-filter-to-dag-endpoint
vatsrahul1001 Aug 21, 2025
d8a6980
resolving conflicts
vatsrahul1001 Aug 25, 2025
4cdaf95
Merge branch 'main' into add-assets-filter-to-dag-endpoint
vatsrahul1001 Aug 25, 2025
9e65616
Merge branch 'main' into add-assets-filter-to-dag-endpoint
vatsrahul1001 Aug 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 56 additions & 2 deletions airflow-core/src/airflow/api_fastapi/common/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from fastapi import Depends, HTTPException, Query, status
from pendulum.parsing.exceptions import ParserError
from pydantic import AfterValidator, BaseModel, NonNegativeInt
from sqlalchemy import Column, and_, case, func, not_, or_, select
from sqlalchemy import Column, and_, case, func, not_, or_, select as sql_select
from sqlalchemy.inspection import inspect

from airflow._shared.timezones import timezone
Expand Down Expand Up @@ -131,7 +131,7 @@ def to_orm(self, select_stmt: Select) -> Select:
else:
select_stmt = select_stmt.where(
not_(
select(DagFavorite)
sql_select(DagFavorite)
.where(and_(DagFavorite.dag_id == DagModel.dag_id, DagFavorite.user_id == self.user_id))
.exists()
)
Expand Down Expand Up @@ -636,6 +636,60 @@ def depends_float(
QueryTagsFilter = Annotated[_TagsFilter, Depends(_TagsFilter.depends)]
QueryOwnersFilter = Annotated[_OwnersFilter, Depends(_OwnersFilter.depends)]


class _HasAssetScheduleFilter(BaseParam[bool]):
"""Filter DAGs that have asset-based scheduling."""

def to_orm(self, select: Select) -> Select:
if self.value is None and self.skip_none:
return select

asset_ref_subquery = sql_select(DagScheduleAssetReference.dag_id).distinct()

if self.value:
# Filter DAGs that have asset-based scheduling
return select.where(DagModel.dag_id.in_(asset_ref_subquery))

# Filter DAGs that do NOT have asset-based scheduling
return select.where(DagModel.dag_id.notin_(asset_ref_subquery))

@classmethod
def depends(
cls,
has_asset_schedule: bool | None = Query(None, description="Filter DAGs with asset-based scheduling"),
) -> _HasAssetScheduleFilter:
return cls().set_value(has_asset_schedule)


class _AssetDependencyFilter(BaseParam[str]):
"""Filter DAGs by specific asset dependencies."""

def to_orm(self, select: Select) -> Select:
if self.value is None and self.skip_none:
return select

asset_dag_subquery = (
sql_select(DagScheduleAssetReference.dag_id)
.join(AssetModel, DagScheduleAssetReference.asset_id == AssetModel.id)
.where(or_(AssetModel.name.ilike(f"%{self.value}%"), AssetModel.uri.ilike(f"%{self.value}%")))
.distinct()
)

return select.where(DagModel.dag_id.in_(asset_dag_subquery))

@classmethod
def depends(
cls,
asset_dependency: str | None = Query(
None, description="Filter DAGs by asset dependency (name or URI)"
),
) -> _AssetDependencyFilter:
return cls().set_value(asset_dependency)


QueryHasAssetScheduleFilter = Annotated[_HasAssetScheduleFilter, Depends(_HasAssetScheduleFilter.depends)]
QueryAssetDependencyFilter = Annotated[_AssetDependencyFilter, Depends(_AssetDependencyFilter.depends)]

# DagRun
QueryLastDagRunStateFilter = Annotated[
FilterParam[DagRunState | None],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,26 @@ paths:
- type: boolean
- type: 'null'
title: Is Favorite
- name: has_asset_schedule
in: query
required: false
schema:
anyOf:
- type: boolean
- type: 'null'
description: Filter DAGs with asset-based scheduling
title: Has Asset Schedule
description: Filter DAGs with asset-based scheduling
- name: asset_dependency
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
description: Filter DAGs by asset dependency (name or URI)
title: Asset Dependency
description: Filter DAGs by asset dependency (name or URI)
responses:
'200':
description: Successful Response
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3004,6 +3004,26 @@ paths:
- type: string
- type: 'null'
title: Bundle Version
- name: has_asset_schedule
in: query
required: false
schema:
anyOf:
- type: boolean
- type: 'null'
description: Filter DAGs with asset-based scheduling
title: Has Asset Schedule
description: Filter DAGs with asset-based scheduling
- name: asset_dependency
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
description: Filter DAGs by asset dependency (name or URI)
title: Asset Dependency
description: Filter DAGs by asset dependency (name or URI)
- name: dag_run_start_date_gte
in: query
required: false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@
from airflow.api_fastapi.common.parameters import (
FilterOptionEnum,
FilterParam,
QueryAssetDependencyFilter,
QueryBundleNameFilter,
QueryBundleVersionFilter,
QueryDagDisplayNamePatternSearch,
QueryDagIdPatternSearch,
QueryDagIdPatternSearchWithNone,
QueryExcludeStaleFilter,
QueryFavoriteFilter,
QueryHasAssetScheduleFilter,
QueryLastDagRunStateFilter,
QueryLimit,
QueryOffset,
Expand Down Expand Up @@ -89,6 +91,8 @@ def get_dags(
last_dag_run_state: QueryLastDagRunStateFilter,
bundle_name: QueryBundleNameFilter,
bundle_version: QueryBundleVersionFilter,
has_asset_schedule: QueryHasAssetScheduleFilter,
asset_dependency: QueryAssetDependencyFilter,
dag_run_start_date_range: Annotated[
RangeFilter, Depends(datetime_range_filter_factory("dag_run_start_date", DagRun, "start_date"))
],
Expand Down Expand Up @@ -146,6 +150,8 @@ def get_dags(
readable_dags_filter,
bundle_name,
bundle_version,
has_asset_schedule,
asset_dependency,
],
order_by=order_by,
offset=offset,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@
from airflow.api_fastapi.common.parameters import (
FilterOptionEnum,
FilterParam,
QueryAssetDependencyFilter,
QueryBundleNameFilter,
QueryBundleVersionFilter,
QueryDagDisplayNamePatternSearch,
QueryDagIdPatternSearch,
QueryExcludeStaleFilter,
QueryFavoriteFilter,
QueryHasAssetScheduleFilter,
QueryLastDagRunStateFilter,
QueryLimit,
QueryOffset,
Expand Down Expand Up @@ -101,6 +103,8 @@ def get_dags(
),
],
is_favorite: QueryFavoriteFilter,
has_asset_schedule: QueryHasAssetScheduleFilter,
asset_dependency: QueryAssetDependencyFilter,
readable_dags_filter: ReadableDagsFilterDep,
session: SessionDep,
dag_runs_limit: int = 10,
Expand All @@ -126,6 +130,8 @@ def get_dags(
owners,
last_dag_run_state,
is_favorite,
has_asset_schedule,
asset_dependency,
readable_dags_filter,
bundle_name,
bundle_version,
Expand Down
12 changes: 8 additions & 4 deletions airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,8 @@ export const UseDagWarningServiceListDagWarningsKeyFn = ({ dagId, limit, offset,
export type DagServiceGetDagsDefaultResponse = Awaited<ReturnType<typeof DagService.getDags>>;
export type DagServiceGetDagsQueryResult<TData = DagServiceGetDagsDefaultResponse, TError = unknown> = UseQueryResult<TData, TError>;
export const useDagServiceGetDagsKey = "DagServiceGetDags";
export const UseDagServiceGetDagsKeyFn = ({ bundleName, bundleVersion, dagDisplayNamePattern, dagIdPattern, dagRunEndDateGt, dagRunEndDateGte, dagRunEndDateLt, dagRunEndDateLte, dagRunStartDateGt, dagRunStartDateGte, dagRunStartDateLt, dagRunStartDateLte, dagRunState, excludeStale, isFavorite, lastDagRunState, limit, offset, orderBy, owners, paused, tags, tagsMatchMode }: {
export const UseDagServiceGetDagsKeyFn = ({ assetDependency, bundleName, bundleVersion, dagDisplayNamePattern, dagIdPattern, dagRunEndDateGt, dagRunEndDateGte, dagRunEndDateLt, dagRunEndDateLte, dagRunStartDateGt, dagRunStartDateGte, dagRunStartDateLt, dagRunStartDateLte, dagRunState, excludeStale, hasAssetSchedule, isFavorite, lastDagRunState, limit, offset, orderBy, owners, paused, tags, tagsMatchMode }: {
assetDependency?: string;
bundleName?: string;
bundleVersion?: string;
dagDisplayNamePattern?: string;
Expand All @@ -257,6 +258,7 @@ export const UseDagServiceGetDagsKeyFn = ({ bundleName, bundleVersion, dagDispla
dagRunStartDateLte?: string;
dagRunState?: string[];
excludeStale?: boolean;
hasAssetSchedule?: boolean;
isFavorite?: boolean;
lastDagRunState?: DagRunState;
limit?: number;
Expand All @@ -266,7 +268,7 @@ export const UseDagServiceGetDagsKeyFn = ({ bundleName, bundleVersion, dagDispla
paused?: boolean;
tags?: string[];
tagsMatchMode?: "any" | "all";
} = {}, queryKey?: Array<unknown>) => [useDagServiceGetDagsKey, ...(queryKey ?? [{ bundleName, bundleVersion, dagDisplayNamePattern, dagIdPattern, dagRunEndDateGt, dagRunEndDateGte, dagRunEndDateLt, dagRunEndDateLte, dagRunStartDateGt, dagRunStartDateGte, dagRunStartDateLt, dagRunStartDateLte, dagRunState, excludeStale, isFavorite, lastDagRunState, limit, offset, orderBy, owners, paused, tags, tagsMatchMode }])];
} = {}, queryKey?: Array<unknown>) => [useDagServiceGetDagsKey, ...(queryKey ?? [{ assetDependency, bundleName, bundleVersion, dagDisplayNamePattern, dagIdPattern, dagRunEndDateGt, dagRunEndDateGte, dagRunEndDateLt, dagRunEndDateLte, dagRunStartDateGt, dagRunStartDateGte, dagRunStartDateLt, dagRunStartDateLte, dagRunState, excludeStale, hasAssetSchedule, isFavorite, lastDagRunState, limit, offset, orderBy, owners, paused, tags, tagsMatchMode }])];
export type DagServiceGetDagDefaultResponse = Awaited<ReturnType<typeof DagService.getDag>>;
export type DagServiceGetDagQueryResult<TData = DagServiceGetDagDefaultResponse, TError = unknown> = UseQueryResult<TData, TError>;
export const useDagServiceGetDagKey = "DagServiceGetDag";
Expand All @@ -291,14 +293,16 @@ export const UseDagServiceGetDagTagsKeyFn = ({ limit, offset, orderBy, tagNamePa
export type DagServiceGetDagsUiDefaultResponse = Awaited<ReturnType<typeof DagService.getDagsUi>>;
export type DagServiceGetDagsUiQueryResult<TData = DagServiceGetDagsUiDefaultResponse, TError = unknown> = UseQueryResult<TData, TError>;
export const useDagServiceGetDagsUiKey = "DagServiceGetDagsUi";
export const UseDagServiceGetDagsUiKeyFn = ({ bundleName, bundleVersion, dagDisplayNamePattern, dagIdPattern, dagIds, dagRunsLimit, excludeStale, isFavorite, lastDagRunState, limit, offset, orderBy, owners, paused, tags, tagsMatchMode }: {
export const UseDagServiceGetDagsUiKeyFn = ({ assetDependency, bundleName, bundleVersion, dagDisplayNamePattern, dagIdPattern, dagIds, dagRunsLimit, excludeStale, hasAssetSchedule, isFavorite, lastDagRunState, limit, offset, orderBy, owners, paused, tags, tagsMatchMode }: {
assetDependency?: string;
bundleName?: string;
bundleVersion?: string;
dagDisplayNamePattern?: string;
dagIdPattern?: string;
dagIds?: string[];
dagRunsLimit?: number;
excludeStale?: boolean;
hasAssetSchedule?: boolean;
isFavorite?: boolean;
lastDagRunState?: DagRunState;
limit?: number;
Expand All @@ -308,7 +312,7 @@ export const UseDagServiceGetDagsUiKeyFn = ({ bundleName, bundleVersion, dagDisp
paused?: boolean;
tags?: string[];
tagsMatchMode?: "any" | "all";
} = {}, queryKey?: Array<unknown>) => [useDagServiceGetDagsUiKey, ...(queryKey ?? [{ bundleName, bundleVersion, dagDisplayNamePattern, dagIdPattern, dagIds, dagRunsLimit, excludeStale, isFavorite, lastDagRunState, limit, offset, orderBy, owners, paused, tags, tagsMatchMode }])];
} = {}, queryKey?: Array<unknown>) => [useDagServiceGetDagsUiKey, ...(queryKey ?? [{ assetDependency, bundleName, bundleVersion, dagDisplayNamePattern, dagIdPattern, dagIds, dagRunsLimit, excludeStale, hasAssetSchedule, isFavorite, lastDagRunState, limit, offset, orderBy, owners, paused, tags, tagsMatchMode }])];
export type DagServiceGetLatestRunInfoDefaultResponse = Awaited<ReturnType<typeof DagService.getLatestRunInfo>>;
export type DagServiceGetLatestRunInfoQueryResult<TData = DagServiceGetLatestRunInfoDefaultResponse, TError = unknown> = UseQueryResult<TData, TError>;
export const useDagServiceGetLatestRunInfoKey = "DagServiceGetLatestRunInfo";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,8 @@ export const ensureUseDagWarningServiceListDagWarningsData = (queryClient: Query
* @param data.lastDagRunState
* @param data.bundleName
* @param data.bundleVersion
* @param data.hasAssetSchedule Filter DAGs with asset-based scheduling
* @param data.assetDependency Filter DAGs by asset dependency (name or URI)
* @param data.dagRunStartDateGte
* @param data.dagRunStartDateGt
* @param data.dagRunStartDateLte
Expand All @@ -474,7 +476,8 @@ export const ensureUseDagWarningServiceListDagWarningsData = (queryClient: Query
* @returns DAGCollectionResponse Successful Response
* @throws ApiError
*/
export const ensureUseDagServiceGetDagsData = (queryClient: QueryClient, { bundleName, bundleVersion, dagDisplayNamePattern, dagIdPattern, dagRunEndDateGt, dagRunEndDateGte, dagRunEndDateLt, dagRunEndDateLte, dagRunStartDateGt, dagRunStartDateGte, dagRunStartDateLt, dagRunStartDateLte, dagRunState, excludeStale, isFavorite, lastDagRunState, limit, offset, orderBy, owners, paused, tags, tagsMatchMode }: {
export const ensureUseDagServiceGetDagsData = (queryClient: QueryClient, { assetDependency, bundleName, bundleVersion, dagDisplayNamePattern, dagIdPattern, dagRunEndDateGt, dagRunEndDateGte, dagRunEndDateLt, dagRunEndDateLte, dagRunStartDateGt, dagRunStartDateGte, dagRunStartDateLt, dagRunStartDateLte, dagRunState, excludeStale, hasAssetSchedule, isFavorite, lastDagRunState, limit, offset, orderBy, owners, paused, tags, tagsMatchMode }: {
assetDependency?: string;
bundleName?: string;
bundleVersion?: string;
dagDisplayNamePattern?: string;
Expand All @@ -489,6 +492,7 @@ export const ensureUseDagServiceGetDagsData = (queryClient: QueryClient, { bundl
dagRunStartDateLte?: string;
dagRunState?: string[];
excludeStale?: boolean;
hasAssetSchedule?: boolean;
isFavorite?: boolean;
lastDagRunState?: DagRunState;
limit?: number;
Expand All @@ -498,7 +502,7 @@ export const ensureUseDagServiceGetDagsData = (queryClient: QueryClient, { bundl
paused?: boolean;
tags?: string[];
tagsMatchMode?: "any" | "all";
} = {}) => queryClient.ensureQueryData({ queryKey: Common.UseDagServiceGetDagsKeyFn({ bundleName, bundleVersion, dagDisplayNamePattern, dagIdPattern, dagRunEndDateGt, dagRunEndDateGte, dagRunEndDateLt, dagRunEndDateLte, dagRunStartDateGt, dagRunStartDateGte, dagRunStartDateLt, dagRunStartDateLte, dagRunState, excludeStale, isFavorite, lastDagRunState, limit, offset, orderBy, owners, paused, tags, tagsMatchMode }), queryFn: () => DagService.getDags({ bundleName, bundleVersion, dagDisplayNamePattern, dagIdPattern, dagRunEndDateGt, dagRunEndDateGte, dagRunEndDateLt, dagRunEndDateLte, dagRunStartDateGt, dagRunStartDateGte, dagRunStartDateLt, dagRunStartDateLte, dagRunState, excludeStale, isFavorite, lastDagRunState, limit, offset, orderBy, owners, paused, tags, tagsMatchMode }) });
} = {}) => queryClient.ensureQueryData({ queryKey: Common.UseDagServiceGetDagsKeyFn({ assetDependency, bundleName, bundleVersion, dagDisplayNamePattern, dagIdPattern, dagRunEndDateGt, dagRunEndDateGte, dagRunEndDateLt, dagRunEndDateLte, dagRunStartDateGt, dagRunStartDateGte, dagRunStartDateLt, dagRunStartDateLte, dagRunState, excludeStale, hasAssetSchedule, isFavorite, lastDagRunState, limit, offset, orderBy, owners, paused, tags, tagsMatchMode }), queryFn: () => DagService.getDags({ assetDependency, bundleName, bundleVersion, dagDisplayNamePattern, dagIdPattern, dagRunEndDateGt, dagRunEndDateGte, dagRunEndDateLt, dagRunEndDateLte, dagRunStartDateGt, dagRunStartDateGte, dagRunStartDateLt, dagRunStartDateLte, dagRunState, excludeStale, hasAssetSchedule, isFavorite, lastDagRunState, limit, offset, orderBy, owners, paused, tags, tagsMatchMode }) });
/**
* Get Dag
* Get basic information about a DAG.
Expand Down Expand Up @@ -558,17 +562,21 @@ export const ensureUseDagServiceGetDagTagsData = (queryClient: QueryClient, { li
* @param data.bundleVersion
* @param data.orderBy
* @param data.isFavorite
* @param data.hasAssetSchedule Filter DAGs with asset-based scheduling
* @param data.assetDependency Filter DAGs by asset dependency (name or URI)
* @returns DAGWithLatestDagRunsCollectionResponse Successful Response
* @throws ApiError
*/
export const ensureUseDagServiceGetDagsUiData = (queryClient: QueryClient, { bundleName, bundleVersion, dagDisplayNamePattern, dagIdPattern, dagIds, dagRunsLimit, excludeStale, isFavorite, lastDagRunState, limit, offset, orderBy, owners, paused, tags, tagsMatchMode }: {
export const ensureUseDagServiceGetDagsUiData = (queryClient: QueryClient, { assetDependency, bundleName, bundleVersion, dagDisplayNamePattern, dagIdPattern, dagIds, dagRunsLimit, excludeStale, hasAssetSchedule, isFavorite, lastDagRunState, limit, offset, orderBy, owners, paused, tags, tagsMatchMode }: {
assetDependency?: string;
bundleName?: string;
bundleVersion?: string;
dagDisplayNamePattern?: string;
dagIdPattern?: string;
dagIds?: string[];
dagRunsLimit?: number;
excludeStale?: boolean;
hasAssetSchedule?: boolean;
isFavorite?: boolean;
lastDagRunState?: DagRunState;
limit?: number;
Expand All @@ -578,7 +586,7 @@ export const ensureUseDagServiceGetDagsUiData = (queryClient: QueryClient, { bun
paused?: boolean;
tags?: string[];
tagsMatchMode?: "any" | "all";
} = {}) => queryClient.ensureQueryData({ queryKey: Common.UseDagServiceGetDagsUiKeyFn({ bundleName, bundleVersion, dagDisplayNamePattern, dagIdPattern, dagIds, dagRunsLimit, excludeStale, isFavorite, lastDagRunState, limit, offset, orderBy, owners, paused, tags, tagsMatchMode }), queryFn: () => DagService.getDagsUi({ bundleName, bundleVersion, dagDisplayNamePattern, dagIdPattern, dagIds, dagRunsLimit, excludeStale, isFavorite, lastDagRunState, limit, offset, orderBy, owners, paused, tags, tagsMatchMode }) });
} = {}) => queryClient.ensureQueryData({ queryKey: Common.UseDagServiceGetDagsUiKeyFn({ assetDependency, bundleName, bundleVersion, dagDisplayNamePattern, dagIdPattern, dagIds, dagRunsLimit, excludeStale, hasAssetSchedule, isFavorite, lastDagRunState, limit, offset, orderBy, owners, paused, tags, tagsMatchMode }), queryFn: () => DagService.getDagsUi({ assetDependency, bundleName, bundleVersion, dagDisplayNamePattern, dagIdPattern, dagIds, dagRunsLimit, excludeStale, hasAssetSchedule, isFavorite, lastDagRunState, limit, offset, orderBy, owners, paused, tags, tagsMatchMode }) });
/**
* Get Latest Run Info
* Get latest run.
Expand Down
Loading