From 2ec7b6dbf400a9f3cb2048072fd7c24c22d11479 Mon Sep 17 00:00:00 2001 From: Omkar P <45419097+omkar-foss@users.noreply.github.com> Date: Fri, 11 Oct 2024 22:13:31 +0530 Subject: [PATCH] Migrate the public endpoint Delete DAG to FastAPI --- airflow/api_fastapi/openapi/v1-generated.yaml | 49 ++++++++++++ airflow/api_fastapi/views/public/dags.py | 19 ++++- airflow/ui/openapi-gen/queries/common.ts | 3 + airflow/ui/openapi-gen/queries/queries.ts | 37 +++++++++ .../ui/openapi-gen/requests/services.gen.ts | 29 +++++++ airflow/ui/openapi-gen/requests/types.gen.ts | 35 +++++++++ tests/api_fastapi/views/public/test_dags.py | 78 +++++++++++++++++-- 7 files changed, 242 insertions(+), 8 deletions(-) diff --git a/airflow/api_fastapi/openapi/v1-generated.yaml b/airflow/api_fastapi/openapi/v1-generated.yaml index 7debfbb1008afd..9047b8ec2a64d5 100644 --- a/airflow/api_fastapi/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/openapi/v1-generated.yaml @@ -408,6 +408,55 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + delete: + tags: + - DAG + summary: Delete Dag + description: Delete the specific DAG. + operationId: delete_dag + parameters: + - name: dag_id + in: path + required: true + schema: + type: string + title: Dag Id + responses: + '200': + description: Successful Response + content: + application/json: + schema: {} + '400': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Bad Request + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unprocessable Entity /public/dags/{dag_id}/details: get: tags: diff --git a/airflow/api_fastapi/views/public/dags.py b/airflow/api_fastapi/views/public/dags.py index ca0f44162eb2de..eb8233a7f700dc 100644 --- a/airflow/api_fastapi/views/public/dags.py +++ b/airflow/api_fastapi/views/public/dags.py @@ -17,11 +17,12 @@ from __future__ import annotations -from fastapi import Depends, HTTPException, Query, Request +from fastapi import Depends, HTTPException, Query, Request, Response from sqlalchemy import update from sqlalchemy.orm import Session from typing_extensions import Annotated +from airflow.api.common import delete_dag as delete_dag_module from airflow.api_fastapi.db.common import ( get_session, paginated_select, @@ -48,6 +49,7 @@ DAGResponse, ) from airflow.api_fastapi.views.router import AirflowRouter +from airflow.exceptions import AirflowException, DagNotFound from airflow.models import DAG, DagModel dags_router = AirflowRouter(tags=["DAG"], prefix="/dags") @@ -204,3 +206,18 @@ async def patch_dags( dags=[DAGResponse.model_validate(dag, from_attributes=True) for dag in dags], total_entries=total_entries, ) + + +@dags_router.delete("/{dag_id}", responses=create_openapi_http_exception_doc([400, 401, 403, 404, 422])) +async def delete_dag( + dag_id: str, + session: Annotated[Session, Depends(get_session)], +) -> Response: + """Delete the specific DAG.""" + try: + delete_dag_module.delete_dag(dag_id, session=session) + except DagNotFound: + raise HTTPException(404, f"Dag with id: {dag_id} was not found") + except AirflowException: + raise HTTPException(409, f"Task instances of dag with id: '{dag_id}' are still running") + return Response(status_code=204) diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index aaff196c0791d5..8f24df817c6e2a 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -191,6 +191,9 @@ export type DagServicePatchDagsMutationResult = Awaited< export type DagServicePatchDagMutationResult = Awaited< ReturnType >; +export type DagServiceDeleteDagMutationResult = Awaited< + ReturnType +>; export type ConnectionServiceDeleteConnectionMutationResult = Awaited< ReturnType >; diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index 19bb17b342a848..2348f3cb44c890 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -428,6 +428,43 @@ export const useDagServicePatchDag = < }) as unknown as Promise, ...options, }); +/** + * Delete Dag + * Delete the specific DAG. + * @param data The data for the request. + * @param data.dagId + * @returns unknown Successful Response + * @throws ApiError + */ +export const useDagServiceDeleteDag = < + TData = Common.DagServiceDeleteDagMutationResult, + TError = unknown, + TContext = unknown, +>( + options?: Omit< + UseMutationOptions< + TData, + TError, + { + dagId: string; + }, + TContext + >, + "mutationFn" + >, +) => + useMutation< + TData, + TError, + { + dagId: string; + }, + TContext + >({ + mutationFn: ({ dagId }) => + DagService.deleteDag({ dagId }) as unknown as Promise, + ...options, + }); /** * Delete Connection * Delete a connection entry. diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index 9a126aef25fbc3..20980af2462993 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -15,6 +15,8 @@ import type { GetDagResponse, PatchDagData, PatchDagResponse, + DeleteDagData, + DeleteDagResponse, GetDagDetailsData, GetDagDetailsResponse, DeleteConnectionData, @@ -228,6 +230,33 @@ export class DagService { }); } + /** + * Delete Dag + * Delete the specific DAG. + * @param data The data for the request. + * @param data.dagId + * @returns unknown Successful Response + * @throws ApiError + */ + public static deleteDag( + data: DeleteDagData, + ): CancelablePromise { + return __request(OpenAPI, { + method: "DELETE", + url: "/public/dags/{dag_id}", + path: { + dag_id: data.dagId, + }, + errors: { + 400: "Bad Request", + 401: "Unauthorized", + 403: "Forbidden", + 404: "Not Found", + 422: "Unprocessable Entity", + }, + }); + } + /** * Get Dag Details * Get details of DAG. diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 45bfa51aec9c4e..e5c19df36b42d7 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -318,6 +318,12 @@ export type PatchDagData = { export type PatchDagResponse = DAGResponse; +export type DeleteDagData = { + dagId: string; +}; + +export type DeleteDagResponse = unknown; + export type GetDagDetailsData = { dagId: string; }; @@ -495,6 +501,35 @@ export type $OpenApiTs = { 422: HTTPValidationError; }; }; + delete: { + req: DeleteDagData; + res: { + /** + * Successful Response + */ + 200: unknown; + /** + * Bad Request + */ + 400: HTTPExceptionResponse; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Unprocessable Entity + */ + 422: HTTPExceptionResponse; + }; + }; }; "/public/dags/{dag_id}/details": { get: { diff --git a/tests/api_fastapi/views/public/test_dags.py b/tests/api_fastapi/views/public/test_dags.py index 13520b37b3ff27..6210dd5c214d40 100644 --- a/tests/api_fastapi/views/public/test_dags.py +++ b/tests/api_fastapi/views/public/test_dags.py @@ -25,7 +25,7 @@ from airflow.models.dagrun import DagRun from airflow.operators.empty import EmptyOperator from airflow.utils.session import provide_session -from airflow.utils.state import DagRunState +from airflow.utils.state import DagRunState, TaskInstanceState from airflow.utils.types import DagRunType from dev.tests_common.test_utils.db import clear_db_dags, clear_db_runs, clear_db_serialized_dags @@ -37,11 +37,15 @@ DAG2_ID = "test_dag2" DAG2_START_DATE = datetime(2021, 6, 15, tzinfo=timezone.utc) DAG3_ID = "test_dag3" +DAG4_ID = "test_dag4" +DAG4_DISPLAY_NAME = "display4" +DAG5_ID = "test_dag5" +DAG5_DISPLAY_NAME = "display5" TASK_ID = "op1" UTC_JSON_REPR = "UTC" if pendulum.__version__.startswith("3") else "Timezone('UTC')" +API_PREFIX = "/public/dags" -@provide_session def _create_deactivated_paused_dag(session=None): dag_model = DagModel( dag_id=DAG3_ID, @@ -76,6 +80,29 @@ def _create_deactivated_paused_dag(session=None): session.add(dagrun_success) +def _create_dag_for_deletion( + session, + dag_maker, + dag_id=None, + dag_display_name=None, + has_running_dagruns=False, +): + with dag_maker( + dag_id, + dag_display_name=dag_display_name, + start_date=datetime(2024, 10, 10, tzinfo=timezone.utc), + ): + EmptyOperator(task_id="dummy") + + if has_running_dagruns: + dr = dag_maker.create_dagrun() + ti = dr.get_task_instances()[0] + ti.set_state(TaskInstanceState.RUNNING) + + dag_maker.dagbag.sync_to_db() + session.commit() + + @pytest.fixture(autouse=True) @provide_session def setup(dag_maker, session=None) -> None: @@ -107,11 +134,12 @@ def setup(dag_maker, session=None) -> None: ): EmptyOperator(task_id=TASK_ID) + _create_deactivated_paused_dag(session) + dag_maker.dagbag.sync_to_db() dag_maker.dag_model.has_task_concurrency_limits = True session.merge(dag_maker.dag_model) session.commit() - _create_deactivated_paused_dag() @pytest.mark.parametrize( @@ -152,7 +180,7 @@ def setup(dag_maker, session=None) -> None: ], ) def test_get_dags(test_client, query_params, expected_total_entries, expected_ids): - response = test_client.get("/public/dags", params=query_params) + response = test_client.get(API_PREFIX, params=query_params) assert response.status_code == 200 body = response.json() @@ -173,7 +201,7 @@ def test_get_dags(test_client, query_params, expected_total_entries, expected_id ], ) def test_patch_dag(test_client, query_params, dag_id, body, expected_status_code, expected_is_paused): - response = test_client.patch(f"/public/dags/{dag_id}", json=body, params=query_params) + response = test_client.patch(f"{API_PREFIX}/{dag_id}", json=body, params=query_params) assert response.status_code == expected_status_code if expected_status_code == 200: @@ -223,7 +251,7 @@ def test_patch_dag(test_client, query_params, dag_id, body, expected_status_code ], ) def test_patch_dags(test_client, query_params, body, expected_status_code, expected_ids, expected_paused_ids): - response = test_client.patch("/public/dags", json=body, params=query_params) + response = test_client.patch(API_PREFIX, json=body, params=query_params) assert response.status_code == expected_status_code if expected_status_code == 200: @@ -241,7 +269,7 @@ def test_patch_dags(test_client, query_params, body, expected_status_code, expec ], ) def test_dag_details(test_client, query_params, dag_id, expected_status_code, dag_display_name, start_date): - response = test_client.get(f"/public/dags/{dag_id}/details", params=query_params) + response = test_client.get(f"{API_PREFIX}/{dag_id}/details", params=query_params) assert response.status_code == expected_status_code if expected_status_code != 200: return @@ -349,3 +377,39 @@ def test_get_dag(test_client, query_params, dag_id, expected_status_code, dag_di "pickle_id": None, } assert res_json == expected + + +@pytest.mark.parametrize( + "dag_id, dag_display_name, status_code_delete, status_code_details, has_running_dagruns, is_create_dag", + [ + ("test_nonexistent_dag_id", "nonexistent_display_name", 404, 404, False, False), + (DAG4_ID, DAG4_DISPLAY_NAME, 204, 404, False, True), + (DAG5_ID, DAG5_DISPLAY_NAME, 409, 200, True, True), + ], +) +@provide_session +def test_delete_dag( + session, + dag_maker, + test_client, + dag_id, + dag_display_name, + status_code_delete, + status_code_details, + has_running_dagruns, + is_create_dag, +): + if is_create_dag: + _create_dag_for_deletion( + session, + dag_maker, + dag_id=dag_id, + dag_display_name=dag_display_name, + has_running_dagruns=has_running_dagruns, + ) + + delete_response = test_client.delete(f"{API_PREFIX}/{dag_id}") + assert delete_response.status_code == status_code_delete + + details_response = test_client.get(f"{API_PREFIX}/{dag_id}/details") + assert details_response.status_code == status_code_details