Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIP-84 Migrate the public endpoint Get DAG to FastAPI #42848

Merged
merged 2 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 56 additions & 6 deletions airflow/api_fastapi/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -252,13 +252,13 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/details:
/public/dags/{dag_id}:
get:
tags:
- DAG
summary: Get Dag Details
description: Get details of DAG.
operationId: get_dag_details
summary: Get Dag
description: Get basic information about a DAG.
operationId: get_dag
parameters:
- name: dag_id
in: path
Expand All @@ -272,7 +272,7 @@ paths:
content:
application/json:
schema:
$ref: '#/components/schemas/DAGDetailsResponse'
$ref: '#/components/schemas/DAGResponse'
'400':
content:
application/json:
Expand Down Expand Up @@ -303,7 +303,6 @@ paths:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unprocessable Entity
/public/dags/{dag_id}:
patch:
tags:
- DAG
Expand Down Expand Up @@ -370,6 +369,57 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/details:
get:
tags:
- DAG
summary: Get Dag Details
description: Get details of DAG.
operationId: get_dag_details
parameters:
- name: dag_id
in: path
required: true
schema:
type: string
title: Dag Id
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/DAGDetailsResponse'
'400':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Bad Request
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unprocessable Entity
/public/connections/{connection_id}:
delete:
tags:
Expand Down
40 changes: 23 additions & 17 deletions airflow/api_fastapi/serializers/dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
from itsdangerous import URLSafeSerializer
from pendulum.tz.timezone import FixedTimezone, Timezone
from pydantic import (
AliasChoices,
AliasGenerator,
BaseModel,
Field,
ConfigDict,
computed_field,
field_validator,
)
Expand Down Expand Up @@ -77,6 +77,14 @@ def get_owners(cls, v: Any) -> list[str] | None:
return v.split(",")
return v

@field_validator("timetable_summary", mode="before")
@classmethod
def get_timetable_summary(cls, tts: str | None) -> str | None:
"""Validate the string representation of timetable_summary."""
if tts is None or tts == "None":
return None
return str(tts)

# Mypy issue https://github.com/python/mypy/issues/1362
@computed_field # type: ignore[misc]
@property
Expand All @@ -103,9 +111,7 @@ class DAGDetailsResponse(DAGResponse):
"""Specific serializer for DAG Details responses."""

catchup: bool
dag_run_timeout: timedelta | None = Field(
validation_alias=AliasChoices("dag_run_timeout", "dagrun_timeout")
)
dag_run_timeout: timedelta | None
dataset_expression: dict | None
doc_md: str | None
start_date: datetime | None
Expand All @@ -114,11 +120,19 @@ class DAGDetailsResponse(DAGResponse):
orientation: str
params: abc.MutableMapping | None
render_template_as_native_obj: bool
template_search_path: Iterable[str] | None = Field(
validation_alias=AliasChoices("template_search_path", "template_searchpath")
)
template_search_path: Iterable[str] | None
timezone: str | None
last_parsed: datetime | None = Field(validation_alias=AliasChoices("last_parsed", "last_loaded"))
last_parsed: datetime | None

model_config = ConfigDict(
alias_generator=AliasGenerator(
validation_alias=lambda field_name: {
"dag_run_timeout": "dagrun_timeout",
"last_parsed": "last_loaded",
"template_search_path": "template_searchpath",
}.get(field_name, field_name),
)
)

@field_validator("timezone", mode="before")
@classmethod
Expand All @@ -128,14 +142,6 @@ def get_timezone(cls, tz: Timezone | FixedTimezone) -> str | None:
return None
return str(tz)

@field_validator("timetable_summary", mode="before")
@classmethod
def get_timetable_summary(cls, tts: str | None) -> str | None:
"""Validate the string representation of timetable_summary."""
if tts is None or tts == "None":
return None
return str(tts)

@field_validator("params", mode="before")
@classmethod
def get_params(cls, params: abc.MutableMapping | None) -> dict | None:
Expand Down
20 changes: 20 additions & 0 deletions airflow/api_fastapi/views/public/dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,26 @@ async def get_dags(
)


@dags_router.get("/{dag_id}", responses=create_openapi_http_exception_doc([400, 401, 403, 404, 422]))
async def get_dag(
dag_id: str, session: Annotated[Session, Depends(get_session)], request: Request
) -> DAGResponse:
"""Get basic information about a DAG."""
dag: DAG = request.app.state.dag_bag.get_dag(dag_id)
if not dag:
raise HTTPException(404, f"Dag with id {dag_id} was not found")

dag_model: DagModel = session.get(DagModel, dag_id)
if not dag_model:
raise HTTPException(404, f"Unable to obtain dag with id {dag_id} from session")

for key, value in dag.__dict__.items():
if not key.startswith("_") and not hasattr(dag_model, key):
setattr(dag_model, key, value)

return DAGResponse.model_validate(dag_model, from_attributes=True)


@dags_router.get("/{dag_id}/details", responses=create_openapi_http_exception_doc([400, 401, 403, 404, 422]))
async def get_dag_details(
dag_id: str, session: Annotated[Session, Depends(get_session)], request: Request
Expand Down
16 changes: 16 additions & 0 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,22 @@ export const UseDagServiceGetDagsKeyFn = (
},
]),
];
export type DagServiceGetDagDefaultResponse = Awaited<
ReturnType<typeof DagService.getDag>
>;
export type DagServiceGetDagQueryResult<
TData = DagServiceGetDagDefaultResponse,
TError = unknown,
> = UseQueryResult<TData, TError>;
export const useDagServiceGetDagKey = "DagServiceGetDag";
export const UseDagServiceGetDagKeyFn = (
{
dagId,
}: {
dagId: string;
},
queryKey?: Array<unknown>,
) => [useDagServiceGetDagKey, ...(queryKey ?? [{ dagId }])];
export type DagServiceGetDagDetailsDefaultResponse = Awaited<
ReturnType<typeof DagService.getDagDetails>
>;
Expand Down
20 changes: 20 additions & 0 deletions airflow/ui/openapi-gen/queries/prefetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,26 @@ export const prefetchUseDagServiceGetDags = (
tags,
}),
});
/**
* Get Dag
* Get basic information about a DAG.
* @param data The data for the request.
* @param data.dagId
* @returns DAGResponse Successful Response
* @throws ApiError
*/
export const prefetchUseDagServiceGetDag = (
queryClient: QueryClient,
{
dagId,
}: {
dagId: string;
},
) =>
queryClient.prefetchQuery({
queryKey: Common.UseDagServiceGetDagKeyFn({ dagId }),
queryFn: () => DagService.getDag({ dagId }),
});
/**
* Get Dag Details
* Get details of DAG.
Expand Down
26 changes: 26 additions & 0 deletions airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,32 @@ export const useDagServiceGetDags = <
}) as TData,
...options,
});
/**
* Get Dag
* Get basic information about a DAG.
* @param data The data for the request.
* @param data.dagId
* @returns DAGResponse Successful Response
* @throws ApiError
*/
export const useDagServiceGetDag = <
TData = Common.DagServiceGetDagDefaultResponse,
TError = unknown,
TQueryKey extends Array<unknown> = unknown[],
>(
{
dagId,
}: {
dagId: string;
},
queryKey?: TQueryKey,
options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
) =>
useQuery<TData, TError>({
queryKey: Common.UseDagServiceGetDagKeyFn({ dagId }, queryKey),
queryFn: () => DagService.getDag({ dagId }) as TData,
...options,
});
/**
* Get Dag Details
* Get details of DAG.
Expand Down
26 changes: 26 additions & 0 deletions airflow/ui/openapi-gen/queries/suspense.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,32 @@ export const useDagServiceGetDagsSuspense = <
}) as TData,
...options,
});
/**
* Get Dag
* Get basic information about a DAG.
* @param data The data for the request.
* @param data.dagId
* @returns DAGResponse Successful Response
* @throws ApiError
*/
export const useDagServiceGetDagSuspense = <
TData = Common.DagServiceGetDagDefaultResponse,
TError = unknown,
TQueryKey extends Array<unknown> = unknown[],
>(
{
dagId,
}: {
dagId: string;
},
queryKey?: TQueryKey,
options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
) =>
useSuspenseQuery<TData, TError>({
queryKey: Common.UseDagServiceGetDagKeyFn({ dagId }, queryKey),
queryFn: () => DagService.getDag({ dagId }) as TData,
...options,
});
/**
* Get Dag Details
* Get details of DAG.
Expand Down
45 changes: 36 additions & 9 deletions airflow/ui/openapi-gen/requests/services.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ import type {
GetDagsResponse,
PatchDagsData,
PatchDagsResponse,
GetDagDetailsData,
GetDagDetailsResponse,
GetDagData,
GetDagResponse,
PatchDagData,
PatchDagResponse,
GetDagDetailsData,
GetDagDetailsResponse,
DeleteConnectionData,
DeleteConnectionResponse,
GetConnectionData,
Expand Down Expand Up @@ -134,19 +136,17 @@ export class DagService {
}

/**
* Get Dag Details
* Get details of DAG.
* Get Dag
* Get basic information about a DAG.
* @param data The data for the request.
* @param data.dagId
* @returns DAGDetailsResponse Successful Response
* @returns DAGResponse Successful Response
* @throws ApiError
*/
public static getDagDetails(
data: GetDagDetailsData,
): CancelablePromise<GetDagDetailsResponse> {
public static getDag(data: GetDagData): CancelablePromise<GetDagResponse> {
return __request(OpenAPI, {
method: "GET",
url: "/public/dags/{dag_id}/details",
url: "/public/dags/{dag_id}",
path: {
dag_id: data.dagId,
},
Expand Down Expand Up @@ -193,6 +193,33 @@ export class DagService {
},
});
}

/**
* Get Dag Details
* Get details of DAG.
* @param data The data for the request.
* @param data.dagId
* @returns DAGDetailsResponse Successful Response
* @throws ApiError
*/
public static getDagDetails(
data: GetDagDetailsData,
): CancelablePromise<GetDagDetailsResponse> {
return __request(OpenAPI, {
method: "GET",
url: "/public/dags/{dag_id}/details",
path: {
dag_id: data.dagId,
},
errors: {
400: "Bad Request",
401: "Unauthorized",
403: "Forbidden",
404: "Not Found",
422: "Unprocessable Entity",
},
});
}
}

export class ConnectionService {
Expand Down
Loading