Skip to content

Commit

Permalink
AIP-84 | Public list tags API (#42959)
Browse files Browse the repository at this point in the history
* AIP-84 | Public list tags API

* refactor: upd resp schema, use paginated_select

* refactor(test): move test to routers/public folder

* refactor: remove OrderBy param, use SortParm
  • Loading branch information
jason810496 authored Oct 21, 2024
1 parent f37f29b commit f3bd2c2
Show file tree
Hide file tree
Showing 12 changed files with 497 additions and 3 deletions.
13 changes: 13 additions & 0 deletions airflow/api_fastapi/common/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,17 @@ def depends(self, last_dag_run_state: DagRunState | None = None) -> _LastDagRunS
return self.set_value(last_dag_run_state)


class _DagTagNamePatternSearch(_SearchParam):
"""Search on dag_tag.name."""

def __init__(self, skip_none: bool = True) -> None:
super().__init__(DagTag.name, skip_none)

def depends(self, tag_name_pattern: str | None = None) -> _DagTagNamePatternSearch:
tag_name_pattern = super().transform_aliases(tag_name_pattern)
return self.set_value(tag_name_pattern)


def _safe_parse_datetime(date_to_check: str) -> datetime:
"""
Parse datetime and raise error for invalid dates.
Expand Down Expand Up @@ -299,3 +310,5 @@ def _safe_parse_datetime(date_to_check: str) -> datetime:
QueryOwnersFilter = Annotated[_OwnersFilter, Depends(_OwnersFilter().depends)]
# DagRun
QueryLastDagRunStateFilter = Annotated[_LastDagRunStateFilter, Depends(_LastDagRunStateFilter().depends)]
# DAGTags
QueryDagTagPatternSearch = Annotated[_DagTagNamePatternSearch, Depends(_DagTagNamePatternSearch().depends)]
78 changes: 78 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,68 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/tags:
get:
tags:
- DAG
summary: Get Dag Tags
description: Get all DAG tags.
operationId: get_dag_tags
parameters:
- name: limit
in: query
required: false
schema:
type: integer
default: 100
title: Limit
- name: offset
in: query
required: false
schema:
type: integer
default: 0
title: Offset
- name: order_by
in: query
required: false
schema:
type: string
default: name
title: Order By
- name: tag_name_pattern
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
title: Tag Name Pattern
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/DAGTagCollectionResponse'
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}:
get:
tags:
Expand Down Expand Up @@ -1713,6 +1775,22 @@ components:
- dataset_triggered
title: DAGRunTypes
description: DAG Run Types for responses.
DAGTagCollectionResponse:
properties:
tags:
items:
type: string
type: array
title: Tags
total_entries:
type: integer
title: Total Entries
type: object
required:
- tags
- total_entries
title: DAGTagCollectionResponse
description: DAG Tags Collection serializer for responses.
DagProcessorInfoSchema:
properties:
status:
Expand Down
39 changes: 37 additions & 2 deletions airflow/api_fastapi/core_api/routes/public/dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from __future__ import annotations

from fastapi import Depends, HTTPException, Query, Request, Response
from sqlalchemy import update
from sqlalchemy import select, update
from sqlalchemy.orm import Session
from typing_extensions import Annotated

Expand All @@ -32,6 +32,7 @@
QueryDagDisplayNamePatternSearch,
QueryDagIdPatternSearch,
QueryDagIdPatternSearchWithNone,
QueryDagTagPatternSearch,
QueryLastDagRunStateFilter,
QueryLimit,
QueryOffset,
Expand All @@ -48,9 +49,10 @@
DAGDetailsResponse,
DAGPatchBody,
DAGResponse,
DAGTagCollectionResponse,
)
from airflow.exceptions import AirflowException, DagNotFound
from airflow.models import DAG, DagModel
from airflow.models import DAG, DagModel, DagTag

dags_router = AirflowRouter(tags=["DAG"], prefix="/dags")

Expand Down Expand Up @@ -95,6 +97,39 @@ async def get_dags(
)


@dags_router.get(
"/tags",
responses=create_openapi_http_exception_doc([401, 403]),
)
async def get_dag_tags(
limit: QueryLimit,
offset: QueryOffset,
order_by: Annotated[
SortParam,
Depends(
SortParam(
["name"],
DagTag,
).dynamic_depends()
),
],
tag_name_pattern: QueryDagTagPatternSearch,
session: Annotated[Session, Depends(get_session)],
) -> DAGTagCollectionResponse:
"""Get all DAG tags."""
base_select = select(DagTag.name).group_by(DagTag.name)
dag_tags_select, total_entries = paginated_select(
base_select=base_select,
filters=[tag_name_pattern],
order_by=order_by,
offset=offset,
limit=limit,
session=session,
)
dag_tags = session.execute(dag_tags_select).scalars().all()
return DAGTagCollectionResponse(tags=[dag_tag for dag_tag in dag_tags], total_entries=total_entries)


@dags_router.get("/{dag_id}", responses=create_openapi_http_exception_doc([400, 401, 403, 404, 422]))
async def get_dag(
dag_id: str, session: Annotated[Session, Depends(get_session)], request: Request
Expand Down
7 changes: 7 additions & 0 deletions airflow/api_fastapi/core_api/serializers/dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,10 @@ def get_params(cls, params: abc.MutableMapping | None) -> dict | None:
def concurrency(self) -> int:
"""Return max_active_tasks as concurrency."""
return self.max_active_tasks


class DAGTagCollectionResponse(BaseModel):
"""DAG Tags Collection serializer for responses."""

tags: list[str]
total_entries: int
25 changes: 25 additions & 0 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,31 @@ export const UseDagServiceGetDagsKeyFn = (
},
]),
];
export type DagServiceGetDagTagsDefaultResponse = Awaited<
ReturnType<typeof DagService.getDagTags>
>;
export type DagServiceGetDagTagsQueryResult<
TData = DagServiceGetDagTagsDefaultResponse,
TError = unknown,
> = UseQueryResult<TData, TError>;
export const useDagServiceGetDagTagsKey = "DagServiceGetDagTags";
export const UseDagServiceGetDagTagsKeyFn = (
{
limit,
offset,
orderBy,
tagNamePattern,
}: {
limit?: number;
offset?: number;
orderBy?: string;
tagNamePattern?: string;
} = {},
queryKey?: Array<unknown>,
) => [
useDagServiceGetDagTagsKey,
...(queryKey ?? [{ limit, offset, orderBy, tagNamePattern }]),
];
export type DagServiceGetDagDefaultResponse = Awaited<
ReturnType<typeof DagService.getDag>
>;
Expand Down
35 changes: 35 additions & 0 deletions airflow/ui/openapi-gen/queries/prefetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,41 @@ export const prefetchUseDagServiceGetDags = (
tags,
}),
});
/**
* Get Dag Tags
* Get all DAG tags.
* @param data The data for the request.
* @param data.limit
* @param data.offset
* @param data.orderBy
* @param data.tagNamePattern
* @returns DAGTagCollectionResponse Successful Response
* @throws ApiError
*/
export const prefetchUseDagServiceGetDagTags = (
queryClient: QueryClient,
{
limit,
offset,
orderBy,
tagNamePattern,
}: {
limit?: number;
offset?: number;
orderBy?: string;
tagNamePattern?: string;
} = {},
) =>
queryClient.prefetchQuery({
queryKey: Common.UseDagServiceGetDagTagsKeyFn({
limit,
offset,
orderBy,
tagNamePattern,
}),
queryFn: () =>
DagService.getDagTags({ limit, offset, orderBy, tagNamePattern }),
});
/**
* Get Dag
* Get basic information about a DAG.
Expand Down
44 changes: 44 additions & 0 deletions airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,50 @@ export const useDagServiceGetDags = <
}) as TData,
...options,
});
/**
* Get Dag Tags
* Get all DAG tags.
* @param data The data for the request.
* @param data.limit
* @param data.offset
* @param data.orderBy
* @param data.tagNamePattern
* @returns DAGTagCollectionResponse Successful Response
* @throws ApiError
*/
export const useDagServiceGetDagTags = <
TData = Common.DagServiceGetDagTagsDefaultResponse,
TError = unknown,
TQueryKey extends Array<unknown> = unknown[],
>(
{
limit,
offset,
orderBy,
tagNamePattern,
}: {
limit?: number;
offset?: number;
orderBy?: string;
tagNamePattern?: string;
} = {},
queryKey?: TQueryKey,
options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
) =>
useQuery<TData, TError>({
queryKey: Common.UseDagServiceGetDagTagsKeyFn(
{ limit, offset, orderBy, tagNamePattern },
queryKey,
),
queryFn: () =>
DagService.getDagTags({
limit,
offset,
orderBy,
tagNamePattern,
}) as TData,
...options,
});
/**
* Get Dag
* Get basic information about a DAG.
Expand Down
44 changes: 44 additions & 0 deletions airflow/ui/openapi-gen/queries/suspense.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,50 @@ export const useDagServiceGetDagsSuspense = <
}) as TData,
...options,
});
/**
* Get Dag Tags
* Get all DAG tags.
* @param data The data for the request.
* @param data.limit
* @param data.offset
* @param data.orderBy
* @param data.tagNamePattern
* @returns DAGTagCollectionResponse Successful Response
* @throws ApiError
*/
export const useDagServiceGetDagTagsSuspense = <
TData = Common.DagServiceGetDagTagsDefaultResponse,
TError = unknown,
TQueryKey extends Array<unknown> = unknown[],
>(
{
limit,
offset,
orderBy,
tagNamePattern,
}: {
limit?: number;
offset?: number;
orderBy?: string;
tagNamePattern?: string;
} = {},
queryKey?: TQueryKey,
options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
) =>
useSuspenseQuery<TData, TError>({
queryKey: Common.UseDagServiceGetDagTagsKeyFn(
{ limit, offset, orderBy, tagNamePattern },
queryKey,
),
queryFn: () =>
DagService.getDagTags({
limit,
offset,
orderBy,
tagNamePattern,
}) as TData,
...options,
});
/**
* Get Dag
* Get basic information about a DAG.
Expand Down
20 changes: 20 additions & 0 deletions airflow/ui/openapi-gen/requests/schemas.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1013,6 +1013,26 @@ export const $DAGRunTypes = {
description: "DAG Run Types for responses.",
} as const;

export const $DAGTagCollectionResponse = {
properties: {
tags: {
items: {
type: "string",
},
type: "array",
title: "Tags",
},
total_entries: {
type: "integer",
title: "Total Entries",
},
},
type: "object",
required: ["tags", "total_entries"],
title: "DAGTagCollectionResponse",
description: "DAG Tags Collection serializer for responses.",
} as const;

export const $DagProcessorInfoSchema = {
properties: {
status: {
Expand Down
Loading

0 comments on commit f3bd2c2

Please sign in to comment.