Skip to content

Commit

Permalink
Migrate the public endpoint Delete DAG to FastAPI
Browse files Browse the repository at this point in the history
  • Loading branch information
omkar-foss committed Oct 15, 2024
1 parent cc0aad0 commit 50a258e
Show file tree
Hide file tree
Showing 9 changed files with 241 additions and 5 deletions.
2 changes: 2 additions & 0 deletions airflow/api_connexion/endpoints/dag_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
from airflow.api_connexion.types import APIResponse, UpdateMask


@mark_fastapi_migration_done
@security.requires_access_dag("GET")
@provide_session
def get_dag(
Expand Down Expand Up @@ -215,6 +216,7 @@ def patch_dags(limit, session, offset=0, only_active=True, tags=None, dag_id_pat
return dags_collection_schema.dump(DAGCollection(dags=dags, total_entries=total_entries))


@mark_fastapi_migration_done
@security.requires_access_dag("DELETE")
@action_logging
@provide_session
Expand Down
49 changes: 49 additions & 0 deletions airflow/api_fastapi/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,55 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
delete:
tags:
- DAG
summary: Delete Dag
description: Delete the specific DAG.
operationId: delete_dag
parameters:
- name: dag_id
in: path
required: true
schema:
type: string
title: Dag Id
responses:
'200':
description: Successful Response
content:
application/json:
schema: {}
'400':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Bad Request
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unprocessable Entity
/public/dags/{dag_id}/details:
get:
tags:
Expand Down
19 changes: 18 additions & 1 deletion airflow/api_fastapi/views/public/dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@

from __future__ import annotations

from fastapi import Depends, HTTPException, Query, Request
from fastapi import Depends, HTTPException, Query, Request, Response
from sqlalchemy import update
from sqlalchemy.orm import Session
from typing_extensions import Annotated

from airflow.api.common import delete_dag as delete_dag_module
from airflow.api_fastapi.db.common import (
get_session,
paginated_select,
Expand All @@ -48,6 +49,7 @@
DAGResponse,
)
from airflow.api_fastapi.views.router import AirflowRouter
from airflow.exceptions import AirflowException, DagNotFound
from airflow.models import DAG, DagModel

dags_router = AirflowRouter(tags=["DAG"], prefix="/dags")
Expand Down Expand Up @@ -204,3 +206,18 @@ async def patch_dags(
dags=[DAGResponse.model_validate(dag, from_attributes=True) for dag in dags],
total_entries=total_entries,
)


@dags_router.delete("/{dag_id}", responses=create_openapi_http_exception_doc([400, 401, 403, 404, 422]))
async def delete_dag(
dag_id: str,
session: Annotated[Session, Depends(get_session)],
) -> Response:
"""Delete the specific DAG."""
try:
delete_dag_module.delete_dag(dag_id, session=session)
except DagNotFound:
raise HTTPException(404, f"Dag with id: {dag_id} was not found")
except AirflowException:
raise HTTPException(409, f"Task instances of dag with id: '{dag_id}' are still running")
return Response(status_code=204)
3 changes: 3 additions & 0 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,9 @@ export type DagServicePatchDagMutationResult = Awaited<
export type VariableServicePatchVariableMutationResult = Awaited<
ReturnType<typeof VariableService.patchVariable>
>;
export type DagServiceDeleteDagMutationResult = Awaited<
ReturnType<typeof DagService.deleteDag>
>;
export type ConnectionServiceDeleteConnectionMutationResult = Awaited<
ReturnType<typeof ConnectionService.deleteConnection>
>;
Expand Down
37 changes: 37 additions & 0 deletions airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,43 @@ export const useVariableServicePatchVariable = <
}) as unknown as Promise<TData>,
...options,
});
/**
* Delete Dag
* Delete the specific DAG.
* @param data The data for the request.
* @param data.dagId
* @returns unknown Successful Response
* @throws ApiError
*/
export const useDagServiceDeleteDag = <
TData = Common.DagServiceDeleteDagMutationResult,
TError = unknown,
TContext = unknown,
>(
options?: Omit<
UseMutationOptions<
TData,
TError,
{
dagId: string;
},
TContext
>,
"mutationFn"
>,
) =>
useMutation<
TData,
TError,
{
dagId: string;
},
TContext
>({
mutationFn: ({ dagId }) =>
DagService.deleteDag({ dagId }) as unknown as Promise<TData>,
...options,
});
/**
* Delete Connection
* Delete a connection entry.
Expand Down
29 changes: 29 additions & 0 deletions airflow/ui/openapi-gen/requests/services.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import type {
GetDagResponse,
PatchDagData,
PatchDagResponse,
DeleteDagData,
DeleteDagResponse,
GetDagDetailsData,
GetDagDetailsResponse,
DeleteConnectionData,
Expand Down Expand Up @@ -234,6 +236,33 @@ export class DagService {
});
}

/**
* Delete Dag
* Delete the specific DAG.
* @param data The data for the request.
* @param data.dagId
* @returns unknown Successful Response
* @throws ApiError
*/
public static deleteDag(
data: DeleteDagData,
): CancelablePromise<DeleteDagResponse> {
return __request(OpenAPI, {
method: "DELETE",
url: "/public/dags/{dag_id}",
path: {
dag_id: data.dagId,
},
errors: {
400: "Bad Request",
401: "Unauthorized",
403: "Forbidden",
404: "Not Found",
422: "Unprocessable Entity",
},
});
}

/**
* Get Dag Details
* Get details of DAG.
Expand Down
35 changes: 35 additions & 0 deletions airflow/ui/openapi-gen/requests/types.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,12 @@ export type PatchDagData = {

export type PatchDagResponse = DAGResponse;

export type DeleteDagData = {
dagId: string;
};

export type DeleteDagResponse = unknown;

export type GetDagDetailsData = {
dagId: string;
};
Expand Down Expand Up @@ -525,6 +531,35 @@ export type $OpenApiTs = {
422: HTTPValidationError;
};
};
delete: {
req: DeleteDagData;
res: {
/**
* Successful Response
*/
200: unknown;
/**
* Bad Request
*/
400: HTTPExceptionResponse;
/**
* Unauthorized
*/
401: HTTPExceptionResponse;
/**
* Forbidden
*/
403: HTTPExceptionResponse;
/**
* Not Found
*/
404: HTTPExceptionResponse;
/**
* Unprocessable Entity
*/
422: HTTPExceptionResponse;
};
};
};
"/public/dags/{dag_id}/details": {
get: {
Expand Down
2 changes: 1 addition & 1 deletion airflow/utils/api_migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
RT = TypeVar("RT")


def mark_fastapi_migration_done(function: Callable[PS, RT]) -> Callable[PS, RT]:
def mark_fastapi_migration_done(function: Callable[..., RT]) -> Callable[..., RT]:
"""
Mark an endpoint as migrated over to the new FastAPI API.
Expand Down
70 changes: 67 additions & 3 deletions tests/api_fastapi/views/public/test_dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from airflow.models.dagrun import DagRun
from airflow.operators.empty import EmptyOperator
from airflow.utils.session import provide_session
from airflow.utils.state import DagRunState
from airflow.utils.state import DagRunState, TaskInstanceState
from airflow.utils.types import DagRunType
from tests_common.test_utils.db import clear_db_dags, clear_db_runs, clear_db_serialized_dags

Expand All @@ -36,11 +36,15 @@
DAG2_ID = "test_dag2"
DAG2_START_DATE = datetime(2021, 6, 15, tzinfo=timezone.utc)
DAG3_ID = "test_dag3"
DAG4_ID = "test_dag4"
DAG4_DISPLAY_NAME = "display4"
DAG5_ID = "test_dag5"
DAG5_DISPLAY_NAME = "display5"
TASK_ID = "op1"
UTC_JSON_REPR = "UTC" if pendulum.__version__.startswith("3") else "Timezone('UTC')"
API_PREFIX = "/public/dags"


@provide_session
def _create_deactivated_paused_dag(session=None):
dag_model = DagModel(
dag_id=DAG3_ID,
Expand Down Expand Up @@ -75,6 +79,28 @@ def _create_deactivated_paused_dag(session=None):
session.add(dagrun_success)


def _create_dag_for_deletion(
session,
dag_maker,
dag_id=None,
dag_display_name=None,
has_running_dagruns=False,
):
with dag_maker(
dag_id,
dag_display_name=dag_display_name,
start_date=datetime(2024, 10, 10, tzinfo=timezone.utc),
):
EmptyOperator(task_id="dummy")

if has_running_dagruns:
dr = dag_maker.create_dagrun()
ti = dr.get_task_instances()[0]
ti.set_state(TaskInstanceState.RUNNING)

dag_maker.dagbag.sync_to_db()


@pytest.fixture(autouse=True)
@provide_session
def setup(dag_maker, session=None) -> None:
Expand Down Expand Up @@ -106,11 +132,12 @@ def setup(dag_maker, session=None) -> None:
):
EmptyOperator(task_id=TASK_ID)

_create_deactivated_paused_dag(session)

dag_maker.dagbag.sync_to_db()
dag_maker.dag_model.has_task_concurrency_limits = True
session.merge(dag_maker.dag_model)
session.commit()
_create_deactivated_paused_dag()


class TestGetDags:
Expand Down Expand Up @@ -359,3 +386,40 @@ def test_get_dag(self, test_client, query_params, dag_id, expected_status_code,
"pickle_id": None,
}
assert res_json == expected


class TestDeleteDAG:
@pytest.mark.parametrize(
"dag_id, dag_display_name, status_code_delete, status_code_details, has_running_dagruns, is_create_dag",
[
("test_nonexistent_dag_id", "nonexistent_display_name", 404, 404, False, False),
(DAG4_ID, DAG4_DISPLAY_NAME, 204, 404, False, True),
(DAG5_ID, DAG5_DISPLAY_NAME, 409, 200, True, True),
],
)
@provide_session
def test_delete_dag(
session,
dag_maker,
test_client,
dag_id,
dag_display_name,
status_code_delete,
status_code_details,
has_running_dagruns,
is_create_dag,
):
if is_create_dag:
_create_dag_for_deletion(
session,
dag_maker,
dag_id=dag_id,
dag_display_name=dag_display_name,
has_running_dagruns=has_running_dagruns,
)

delete_response = test_client.delete(f"{API_PREFIX}/{dag_id}")
assert delete_response.status_code == status_code_delete

details_response = test_client.get(f"{API_PREFIX}/{dag_id}/details")
assert details_response.status_code == status_code_details

0 comments on commit 50a258e

Please sign in to comment.