Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIP-84 | Public list tags API #42959

Merged
merged 4 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions airflow/api_fastapi/common/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,17 @@ def depends(self, last_dag_run_state: DagRunState | None = None) -> _LastDagRunS
return self.set_value(last_dag_run_state)


class _DagTagNamePatternSearch(_SearchParam):
"""Search on dag_tag.name."""

def __init__(self, skip_none: bool = True) -> None:
super().__init__(DagTag.name, skip_none)

def depends(self, tag_name_pattern: str | None = None) -> _DagTagNamePatternSearch:
tag_name_pattern = super().transform_aliases(tag_name_pattern)
return self.set_value(tag_name_pattern)


def _safe_parse_datetime(date_to_check: str) -> datetime:
"""
Parse datetime and raise error for invalid dates.
Expand Down Expand Up @@ -299,3 +310,5 @@ def _safe_parse_datetime(date_to_check: str) -> datetime:
QueryOwnersFilter = Annotated[_OwnersFilter, Depends(_OwnersFilter().depends)]
# DagRun
QueryLastDagRunStateFilter = Annotated[_LastDagRunStateFilter, Depends(_LastDagRunStateFilter().depends)]
# DAGTags
QueryDagTagPatternSearch = Annotated[_DagTagNamePatternSearch, Depends(_DagTagNamePatternSearch().depends)]
78 changes: 78 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,68 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/tags:
get:
tags:
- DAG
summary: Get Dag Tags
description: Get all DAG tags.
operationId: get_dag_tags
parameters:
- name: limit
in: query
required: false
schema:
type: integer
default: 100
title: Limit
- name: offset
in: query
required: false
schema:
type: integer
default: 0
title: Offset
- name: order_by
in: query
required: false
schema:
type: string
default: name
title: Order By
- name: tag_name_pattern
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
title: Tag Name Pattern
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/DAGTagCollectionResponse'
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}:
get:
tags:
Expand Down Expand Up @@ -1678,6 +1740,22 @@ components:
- dataset_triggered
title: DAGRunTypes
description: DAG Run Types for responses.
DAGTagCollectionResponse:
properties:
tags:
items:
type: string
type: array
title: Tags
total_entries:
type: integer
title: Total Entries
type: object
required:
- tags
- total_entries
title: DAGTagCollectionResponse
description: DAG Tags Collection serializer for responses.
DagProcessorInfoSchema:
properties:
status:
Expand Down
39 changes: 37 additions & 2 deletions airflow/api_fastapi/core_api/routes/public/dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from __future__ import annotations

from fastapi import Depends, HTTPException, Query, Request, Response
from sqlalchemy import update
from sqlalchemy import select, update
from sqlalchemy.orm import Session
from typing_extensions import Annotated

Expand All @@ -32,6 +32,7 @@
QueryDagDisplayNamePatternSearch,
QueryDagIdPatternSearch,
QueryDagIdPatternSearchWithNone,
QueryDagTagPatternSearch,
QueryLastDagRunStateFilter,
QueryLimit,
QueryOffset,
Expand All @@ -48,9 +49,10 @@
DAGDetailsResponse,
DAGPatchBody,
DAGResponse,
DAGTagCollectionResponse,
)
from airflow.exceptions import AirflowException, DagNotFound
from airflow.models import DAG, DagModel
from airflow.models import DAG, DagModel, DagTag

dags_router = AirflowRouter(tags=["DAG"], prefix="/dags")

Expand Down Expand Up @@ -95,6 +97,39 @@ async def get_dags(
)


@dags_router.get(
"/tags",
responses=create_openapi_http_exception_doc([401, 403]),
)
async def get_dag_tags(
limit: QueryLimit,
offset: QueryOffset,
order_by: Annotated[
SortParam,
Depends(
SortParam(
["name"],
DagTag,
).dynamic_depends()
),
],
tag_name_pattern: QueryDagTagPatternSearch,
session: Annotated[Session, Depends(get_session)],
) -> DAGTagCollectionResponse:
"""Get all DAG tags."""
base_select = select(DagTag.name).group_by(DagTag.name)
dag_tags_select, total_entries = paginated_select(
base_select=base_select,
filters=[tag_name_pattern],
order_by=order_by,
offset=offset,
limit=limit,
session=session,
)
dag_tags = session.execute(dag_tags_select).scalars().all()
return DAGTagCollectionResponse(tags=[dag_tag for dag_tag in dag_tags], total_entries=total_entries)


@dags_router.get("/{dag_id}", responses=create_openapi_http_exception_doc([400, 401, 403, 404, 422]))
async def get_dag(
dag_id: str, session: Annotated[Session, Depends(get_session)], request: Request
Expand Down
7 changes: 7 additions & 0 deletions airflow/api_fastapi/core_api/serializers/dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,10 @@ def get_params(cls, params: abc.MutableMapping | None) -> dict | None:
def concurrency(self) -> int:
"""Return max_active_tasks as concurrency."""
return self.max_active_tasks


class DAGTagCollectionResponse(BaseModel):
"""DAG Tags Collection serializer for responses."""

tags: list[str]
total_entries: int
25 changes: 25 additions & 0 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,31 @@ export const UseDagServiceGetDagsKeyFn = (
},
]),
];
export type DagServiceGetDagTagsDefaultResponse = Awaited<
ReturnType<typeof DagService.getDagTags>
>;
export type DagServiceGetDagTagsQueryResult<
TData = DagServiceGetDagTagsDefaultResponse,
TError = unknown,
> = UseQueryResult<TData, TError>;
export const useDagServiceGetDagTagsKey = "DagServiceGetDagTags";
export const UseDagServiceGetDagTagsKeyFn = (
{
limit,
offset,
orderBy,
tagNamePattern,
}: {
limit?: number;
offset?: number;
orderBy?: string;
tagNamePattern?: string;
} = {},
queryKey?: Array<unknown>,
) => [
useDagServiceGetDagTagsKey,
...(queryKey ?? [{ limit, offset, orderBy, tagNamePattern }]),
];
export type DagServiceGetDagDefaultResponse = Awaited<
ReturnType<typeof DagService.getDag>
>;
Expand Down
35 changes: 35 additions & 0 deletions airflow/ui/openapi-gen/queries/prefetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,41 @@ export const prefetchUseDagServiceGetDags = (
tags,
}),
});
/**
* Get Dag Tags
* Get all DAG tags.
* @param data The data for the request.
* @param data.limit
* @param data.offset
* @param data.orderBy
* @param data.tagNamePattern
* @returns DAGTagCollectionResponse Successful Response
* @throws ApiError
*/
export const prefetchUseDagServiceGetDagTags = (
queryClient: QueryClient,
{
limit,
offset,
orderBy,
tagNamePattern,
}: {
limit?: number;
offset?: number;
orderBy?: string;
tagNamePattern?: string;
} = {},
) =>
queryClient.prefetchQuery({
queryKey: Common.UseDagServiceGetDagTagsKeyFn({
limit,
offset,
orderBy,
tagNamePattern,
}),
queryFn: () =>
DagService.getDagTags({ limit, offset, orderBy, tagNamePattern }),
});
/**
* Get Dag
* Get basic information about a DAG.
Expand Down
44 changes: 44 additions & 0 deletions airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,50 @@ export const useDagServiceGetDags = <
}) as TData,
...options,
});
/**
* Get Dag Tags
* Get all DAG tags.
* @param data The data for the request.
* @param data.limit
* @param data.offset
* @param data.orderBy
* @param data.tagNamePattern
* @returns DAGTagCollectionResponse Successful Response
* @throws ApiError
*/
export const useDagServiceGetDagTags = <
TData = Common.DagServiceGetDagTagsDefaultResponse,
TError = unknown,
TQueryKey extends Array<unknown> = unknown[],
>(
{
limit,
offset,
orderBy,
tagNamePattern,
}: {
limit?: number;
offset?: number;
orderBy?: string;
tagNamePattern?: string;
} = {},
queryKey?: TQueryKey,
options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
) =>
useQuery<TData, TError>({
queryKey: Common.UseDagServiceGetDagTagsKeyFn(
{ limit, offset, orderBy, tagNamePattern },
queryKey,
),
queryFn: () =>
DagService.getDagTags({
limit,
offset,
orderBy,
tagNamePattern,
}) as TData,
...options,
});
/**
* Get Dag
* Get basic information about a DAG.
Expand Down
44 changes: 44 additions & 0 deletions airflow/ui/openapi-gen/queries/suspense.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,50 @@ export const useDagServiceGetDagsSuspense = <
}) as TData,
...options,
});
/**
* Get Dag Tags
* Get all DAG tags.
* @param data The data for the request.
* @param data.limit
* @param data.offset
* @param data.orderBy
* @param data.tagNamePattern
* @returns DAGTagCollectionResponse Successful Response
* @throws ApiError
*/
export const useDagServiceGetDagTagsSuspense = <
TData = Common.DagServiceGetDagTagsDefaultResponse,
TError = unknown,
TQueryKey extends Array<unknown> = unknown[],
>(
{
limit,
offset,
orderBy,
tagNamePattern,
}: {
limit?: number;
offset?: number;
orderBy?: string;
tagNamePattern?: string;
} = {},
queryKey?: TQueryKey,
options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
) =>
useSuspenseQuery<TData, TError>({
queryKey: Common.UseDagServiceGetDagTagsKeyFn(
{ limit, offset, orderBy, tagNamePattern },
queryKey,
),
queryFn: () =>
DagService.getDagTags({
limit,
offset,
orderBy,
tagNamePattern,
}) as TData,
...options,
});
/**
* Get Dag
* Get basic information about a DAG.
Expand Down
20 changes: 20 additions & 0 deletions airflow/ui/openapi-gen/requests/schemas.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1013,6 +1013,26 @@ export const $DAGRunTypes = {
description: "DAG Run Types for responses.",
} as const;

export const $DAGTagCollectionResponse = {
properties: {
tags: {
items: {
type: "string",
},
type: "array",
title: "Tags",
},
total_entries: {
type: "integer",
title: "Total Entries",
},
},
type: "object",
required: ["tags", "total_entries"],
title: "DAGTagCollectionResponse",
description: "DAG Tags Collection serializer for responses.",
} as const;

export const $DagProcessorInfoSchema = {
properties: {
status: {
Expand Down
Loading
Loading