Skip to content

Commit

Permalink
Migrate the public endpoint Delete DAG to FastAPI
Browse files Browse the repository at this point in the history
  • Loading branch information
omkar-foss committed Oct 10, 2024
1 parent 420b24a commit 2978c05
Show file tree
Hide file tree
Showing 7 changed files with 244 additions and 10 deletions.
49 changes: 49 additions & 0 deletions airflow/api_fastapi/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,55 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
delete:
tags:
- DAG
summary: Delete Dag
description: Delete the specific DAG.
operationId: delete_dag
parameters:
- name: dag_id
in: path
required: true
schema:
type: string
title: Dag Id
responses:
'200':
description: Successful Response
content:
application/json:
schema: {}
'400':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Bad Request
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unprocessable Entity
/public/connections/{connection_id}:
delete:
tags:
Expand Down
19 changes: 18 additions & 1 deletion airflow/api_fastapi/views/public/dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@

from __future__ import annotations

from fastapi import Depends, HTTPException, Query, Request
from fastapi import Depends, HTTPException, Query, Request, Response
from sqlalchemy import update
from sqlalchemy.orm import Session
from typing_extensions import Annotated

from airflow.api.common import delete_dag as delete_dag_module
from airflow.api_fastapi.db.common import (
get_session,
paginated_select,
Expand All @@ -48,6 +49,7 @@
DAGResponse,
)
from airflow.api_fastapi.views.router import AirflowRouter
from airflow.exceptions import AirflowException, DagNotFound
from airflow.models import DAG, DagModel

dags_router = AirflowRouter(tags=["DAG"], prefix="/dags")
Expand Down Expand Up @@ -184,3 +186,18 @@ async def patch_dags(
dags=[DAGResponse.model_validate(dag, from_attributes=True) for dag in dags],
total_entries=total_entries,
)


@dags_router.delete("/{dag_id}", responses=create_openapi_http_exception_doc([400, 401, 403, 404, 422]))
async def delete_dag(
dag_id: str,
session: Annotated[Session, Depends(get_session)],
) -> Response:
"""Delete the specific DAG."""
try:
delete_dag_module.delete_dag(dag_id, session=session)
except DagNotFound:
raise HTTPException(404, f"Dag with id: {dag_id} was not found")
except AirflowException:
raise HTTPException(409, f"Task instances of dag with id: '{dag_id}' are still running")
return Response(status_code=204)
3 changes: 3 additions & 0 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ export type DagServicePatchDagsMutationResult = Awaited<
export type DagServicePatchDagMutationResult = Awaited<
ReturnType<typeof DagService.patchDag>
>;
export type DagServiceDeleteDagMutationResult = Awaited<
ReturnType<typeof DagService.deleteDag>
>;
export type ConnectionServiceDeleteConnectionMutationResult = Awaited<
ReturnType<typeof ConnectionService.deleteConnection>
>;
Expand Down
37 changes: 37 additions & 0 deletions airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,43 @@ export const useDagServicePatchDag = <
}) as unknown as Promise<TData>,
...options,
});
/**
* Delete Dag
* Delete the specific DAG.
* @param data The data for the request.
* @param data.dagId
* @returns unknown Successful Response
* @throws ApiError
*/
export const useDagServiceDeleteDag = <
TData = Common.DagServiceDeleteDagMutationResult,
TError = unknown,
TContext = unknown,
>(
options?: Omit<
UseMutationOptions<
TData,
TError,
{
dagId: string;
},
TContext
>,
"mutationFn"
>,
) =>
useMutation<
TData,
TError,
{
dagId: string;
},
TContext
>({
mutationFn: ({ dagId }) =>
DagService.deleteDag({ dagId }) as unknown as Promise<TData>,
...options,
});
/**
* Delete Connection
* Delete a connection entry.
Expand Down
29 changes: 29 additions & 0 deletions airflow/ui/openapi-gen/requests/services.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import type {
GetDagDetailsResponse,
PatchDagData,
PatchDagResponse,
DeleteDagData,
DeleteDagResponse,
DeleteConnectionData,
DeleteConnectionResponse,
GetConnectionData,
Expand Down Expand Up @@ -225,6 +227,33 @@ export class DagService {
},
});
}

/**
* Delete Dag
* Delete the specific DAG.
* @param data The data for the request.
* @param data.dagId
* @returns unknown Successful Response
* @throws ApiError
*/
public static deleteDag(
data: DeleteDagData,
): CancelablePromise<DeleteDagResponse> {
return __request(OpenAPI, {
method: "DELETE",
url: "/public/dags/{dag_id}",
path: {
dag_id: data.dagId,
},
errors: {
400: "Bad Request",
401: "Unauthorized",
403: "Forbidden",
404: "Not Found",
422: "Unprocessable Entity",
},
});
}
}

export class ConnectionService {
Expand Down
35 changes: 35 additions & 0 deletions airflow/ui/openapi-gen/requests/types.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,12 @@ export type PatchDagData = {

export type PatchDagResponse = DAGResponse;

export type DeleteDagData = {
dagId: string;
};

export type DeleteDagResponse = unknown;

export type DeleteConnectionData = {
connectionId: string;
};
Expand Down Expand Up @@ -440,6 +446,35 @@ export type $OpenApiTs = {
422: HTTPValidationError;
};
};
delete: {
req: DeleteDagData;
res: {
/**
* Successful Response
*/
200: unknown;
/**
* Bad Request
*/
400: HTTPExceptionResponse;
/**
* Unauthorized
*/
401: HTTPExceptionResponse;
/**
* Forbidden
*/
403: HTTPExceptionResponse;
/**
* Not Found
*/
404: HTTPExceptionResponse;
/**
* Unprocessable Entity
*/
422: HTTPExceptionResponse;
};
};
};
"/public/connections/{connection_id}": {
delete: {
Expand Down
82 changes: 73 additions & 9 deletions tests/api_fastapi/views/public/test_dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from airflow.models.dagrun import DagRun
from airflow.operators.empty import EmptyOperator
from airflow.utils.session import provide_session
from airflow.utils.state import DagRunState
from airflow.utils.state import DagRunState, TaskInstanceState
from airflow.utils.types import DagRunType

from dev.tests_common.test_utils.db import clear_db_dags, clear_db_runs, clear_db_serialized_dags
Expand All @@ -38,11 +38,15 @@
DAG2_DISPLAY_NAME = "display2"
DAG2_START_DATE = datetime(2021, 6, 15, tzinfo=timezone.utc)
DAG3_ID = "test_dag3"
DAG4_ID = "test_dag4"
DAG4_DISPLAY_NAME = "display4"
DAG5_ID = "test_dag5"
DAG5_DISPLAY_NAME = "display5"
TASK_ID = "op1"
UTC_JSON_REPR = "UTC" if pendulum.__version__.startswith("3") else "Timezone('UTC')"
API_PREFIX = "/public/dags"


@provide_session
def _create_deactivated_paused_dag(session=None):
dag_model = DagModel(
dag_id=DAG3_ID,
Expand Down Expand Up @@ -77,6 +81,29 @@ def _create_deactivated_paused_dag(session=None):
session.add(dagrun_success)


def _create_dag_for_deletion(
session,
dag_maker,
dag_id=None,
dag_display_name=None,
has_running_dagruns=False,
):
with dag_maker(
dag_id,
dag_display_name=dag_display_name,
start_date=datetime(2024, 10, 10, tzinfo=timezone.utc),
):
EmptyOperator(task_id="dummy")

if has_running_dagruns:
dr = dag_maker.create_dagrun()
ti = dr.get_task_instances()[0]
ti.set_state(TaskInstanceState.RUNNING)

dag_maker.dagbag.sync_to_db()
session.commit()


@pytest.fixture(autouse=True)
@provide_session
def setup(dag_maker, session=None) -> None:
Expand Down Expand Up @@ -109,11 +136,12 @@ def setup(dag_maker, session=None) -> None:
):
EmptyOperator(task_id=TASK_ID)

_create_deactivated_paused_dag(session)

dag_maker.dagbag.sync_to_db()
dag_maker.dag_model.has_task_concurrency_limits = True
session.merge(dag_maker.dag_model)
session.commit()
_create_deactivated_paused_dag()


@pytest.mark.parametrize(
Expand Down Expand Up @@ -154,7 +182,7 @@ def setup(dag_maker, session=None) -> None:
],
)
def test_get_dags(test_client, query_params, expected_total_entries, expected_ids):
response = test_client.get("/public/dags", params=query_params)
response = test_client.get(API_PREFIX, params=query_params)

assert response.status_code == 200
body = response.json()
Expand All @@ -175,7 +203,7 @@ def test_get_dags(test_client, query_params, expected_total_entries, expected_id
],
)
def test_patch_dag(test_client, query_params, dag_id, body, expected_status_code, expected_is_paused):
response = test_client.patch(f"/public/dags/{dag_id}", json=body, params=query_params)
response = test_client.patch(f"{API_PREFIX}/{dag_id}", json=body, params=query_params)

assert response.status_code == expected_status_code
if expected_status_code == 200:
Expand Down Expand Up @@ -225,7 +253,7 @@ def test_patch_dag(test_client, query_params, dag_id, body, expected_status_code
],
)
def test_patch_dags(test_client, query_params, body, expected_status_code, expected_ids, expected_paused_ids):
response = test_client.patch("/public/dags", json=body, params=query_params)
response = test_client.patch(API_PREFIX, json=body, params=query_params)

assert response.status_code == expected_status_code
if expected_status_code == 200:
Expand All @@ -238,12 +266,12 @@ def test_patch_dags(test_client, query_params, body, expected_status_code, expec
@pytest.mark.parametrize(
"query_params, dag_id, expected_status_code, dag_display_name, start_date",
[
({}, "fake_dag_id", 404, "fake_dag", datetime(2023, 12, 31, tzinfo=timezone.utc)),
({}, DAG2_ID, 200, DAG2_DISPLAY_NAME, DAG2_START_DATE),
({}, "fake_dag_id", 404, "fake_dag", datetime(2021, 6, 15, tzinfo=timezone.utc)),
({}, DAG2_ID, 200, DAG2_DISPLAY_NAME, datetime(2021, 6, 15, tzinfo=timezone.utc)),
],
)
def test_dag_details(test_client, query_params, dag_id, expected_status_code, dag_display_name, start_date):
response = test_client.get(f"/public/dags/{dag_id}/details", params=query_params)
response = test_client.get(f"{API_PREFIX}/{dag_id}/details", params=query_params)
assert response.status_code == expected_status_code
if expected_status_code != 200:
return
Expand Down Expand Up @@ -303,3 +331,39 @@ def test_dag_details(test_client, query_params, dag_id, expected_status_code, da
"timezone": UTC_JSON_REPR,
}
assert res_json == expected


@pytest.mark.parametrize(
"dag_id, dag_display_name, status_code_delete, status_code_details, has_running_dagruns, is_create_dag",
[
("test_nonexistent_dag_id", "nonexistent_display_name", 404, 404, False, False),
(DAG4_ID, DAG4_DISPLAY_NAME, 204, 404, False, True),
(DAG5_ID, DAG5_DISPLAY_NAME, 409, 200, True, True),
],
)
@provide_session
def test_delete_dag(
session,
dag_maker,
test_client,
dag_id,
dag_display_name,
status_code_delete,
status_code_details,
has_running_dagruns,
is_create_dag,
):
if is_create_dag:
_create_dag_for_deletion(
session,
dag_maker,
dag_id=dag_id,
dag_display_name=dag_display_name,
has_running_dagruns=has_running_dagruns,
)

delete_response = test_client.delete(f"{API_PREFIX}/{dag_id}")
assert delete_response.status_code == status_code_delete

details_response = test_client.get(f"{API_PREFIX}/{dag_id}/details")
assert details_response.status_code == status_code_details

0 comments on commit 2978c05

Please sign in to comment.