Skip to content

Commit

Permalink
Reorganize api_fastapi folder into apps (#43062)
Browse files Browse the repository at this point in the history
  • Loading branch information
kaxil authored Oct 16, 2024
1 parent 60c02f5 commit 39a16c0
Show file tree
Hide file tree
Showing 44 changed files with 199 additions and 121 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ repos:
- --fuzzy-match-generates-todo
- id: insert-license
name: Add license for all YAML files except Helm templates
exclude: ^\.github/.*$|^.*/.*_vendor/|^chart/templates/.*|.*/reproducible_build.yaml$|^airflow/api_fastapi/openapi/v1-generated.yaml$|^.*/pnpm-lock.yaml$
exclude: ^\.github/.*$|^.*/.*_vendor/|^chart/templates/.*|.*/reproducible_build.yaml$|^airflow/api_fastapi/core_api/openapi/v1-generated.yaml$|^.*/pnpm-lock.yaml$
types: [yaml]
files: \.ya?ml$
args:
Expand Down
79 changes: 5 additions & 74 deletions airflow/api_fastapi/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,40 +17,25 @@
from __future__ import annotations

import logging
import os
from pathlib import Path
from typing import cast

from fastapi import FastAPI, Request
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates

from airflow.settings import AIRFLOW_PATH
from airflow.www.extensions.init_dagbag import get_dag_bag
from airflow.api_fastapi.core_api.app import init_dag_bag, init_plugins, init_views

log = logging.getLogger(__name__)

app: FastAPI | None = None


def init_dag_bag(app: FastAPI) -> None:
"""
Create global DagBag for the FastAPI application.
To access it use ``request.app.state.dag_bag``.
"""
app.state.dag_bag = get_dag_bag()


def create_app() -> FastAPI:
def create_app(apps: list[str] | None = None) -> FastAPI:
from airflow.configuration import conf

app = FastAPI(
title="Airflow API",
description="Airflow API. All endpoints located under ``/public`` can be used safely, are stable and backward compatible. "
"Endpoints located under ``/ui`` are dedicated to the UI and are subject to breaking change "
"depending on the need of the frontend. Users should not rely on those but use the public ones instead."
"depending on the need of the frontend. Users should not rely on those but use the public ones instead.",
)

init_dag_bag(app)
Expand All @@ -75,60 +60,6 @@ def create_app() -> FastAPI:
return app


def init_views(app: FastAPI) -> None:
"""Init views by registering the different routers."""
from airflow.api_fastapi.routes.public import public_router
from airflow.api_fastapi.routes.ui import ui_router

app.include_router(ui_router)
app.include_router(public_router)

dev_mode = os.environ.get("DEV_MODE", False) == "true"

directory = Path(AIRFLOW_PATH) / ("airflow/ui/dev" if dev_mode else "airflow/ui/dist")

# During python tests or when the backend is run without having the frontend build
# those directories might not exist. App should not fail initializing in those scenarios.
Path(directory).mkdir(exist_ok=True)

templates = Jinja2Templates(directory=directory)

app.mount(
"/static",
StaticFiles(
directory=directory,
html=True,
),
name="webapp_static_folder",
)

@app.get("/webapp/{rest_of_path:path}", response_class=HTMLResponse, include_in_schema=False)
def webapp(request: Request, rest_of_path: str):
return templates.TemplateResponse("/index.html", {"request": request}, media_type="text/html")


def init_plugins(app: FastAPI) -> None:
"""Integrate FastAPI app plugins."""
from airflow import plugins_manager

plugins_manager.initialize_fastapi_plugins()

# After calling initialize_fastapi_plugins, fastapi_apps cannot be None anymore.
for subapp_dict in cast(list, plugins_manager.fastapi_apps):
name = subapp_dict.get("name")
subapp = subapp_dict.get("app")
if subapp is None:
log.error("'app' key is missing for the fastapi app: %s", name)
continue
url_prefix = subapp_dict.get("url_prefix")
if url_prefix is None:
log.error("'url_prefix' key is missing for the fastapi app: %s", name)
continue

log.debug("Adding subapplication %s under prefix %s", name, url_prefix)
app.mount(url_prefix, subapp)


def cached_app(config=None, testing=False) -> FastAPI:
"""Return cached instance of Airflow UI app."""
global app
Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from sqlalchemy.orm import Session
from sqlalchemy.sql import Select

from airflow.api_fastapi.parameters import BaseParam
from airflow.api_fastapi.common.parameters import BaseParam


async def get_session() -> Session:
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
96 changes: 96 additions & 0 deletions airflow/api_fastapi/core_api/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import logging
import os
from pathlib import Path
from typing import cast

from fastapi import FastAPI
from starlette.requests import Request
from starlette.responses import HTMLResponse
from starlette.staticfiles import StaticFiles
from starlette.templating import Jinja2Templates

from airflow.settings import AIRFLOW_PATH
from airflow.www.extensions.init_dagbag import get_dag_bag

log = logging.getLogger(__name__)


def init_dag_bag(app: FastAPI) -> None:
"""
Create global DagBag for the FastAPI application.
To access it use ``request.app.state.dag_bag``.
"""
app.state.dag_bag = get_dag_bag()


def init_views(app: FastAPI) -> None:
"""Init views by registering the different routers."""
from airflow.api_fastapi.core_api.routes.public import public_router
from airflow.api_fastapi.core_api.routes.ui import ui_router

app.include_router(ui_router)
app.include_router(public_router)

dev_mode = os.environ.get("DEV_MODE", False) == "true"

directory = Path(AIRFLOW_PATH) / ("airflow/ui/dev" if dev_mode else "airflow/ui/dist")

# During python tests or when the backend is run without having the frontend build
# those directories might not exist. App should not fail initializing in those scenarios.
Path(directory).mkdir(exist_ok=True)

templates = Jinja2Templates(directory=directory)

app.mount(
"/static",
StaticFiles(
directory=directory,
html=True,
),
name="webapp_static_folder",
)

@app.get("/webapp/{rest_of_path:path}", response_class=HTMLResponse, include_in_schema=False)
def webapp(request: Request, rest_of_path: str):
return templates.TemplateResponse("/index.html", {"request": request}, media_type="text/html")


def init_plugins(app: FastAPI) -> None:
"""Integrate FastAPI app plugins."""
from airflow import plugins_manager

plugins_manager.initialize_fastapi_plugins()

# After calling initialize_fastapi_plugins, fastapi_apps cannot be None anymore.
for subapp_dict in cast(list, plugins_manager.fastapi_apps):
name = subapp_dict.get("name")
subapp = subapp_dict.get("app")
if subapp is None:
log.error("'app' key is missing for the fastapi app: %s", name)
continue
url_prefix = subapp_dict.get("url_prefix")
if url_prefix is None:
log.error("'url_prefix' key is missing for the fastapi app: %s", name)
continue

log.debug("Adding subapplication %s under prefix %s", name, url_prefix)
app.mount(url_prefix, subapp)
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
openapi: 3.1.0
info:
title: FastAPI
title: Airflow API
description: Airflow API. All endpoints located under ``/public`` can be used safely,
are stable and backward compatible. Endpoints located under ``/ui`` are dedicated
to the UI and are subject to breaking change depending on the need of the frontend.
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@

from __future__ import annotations

from airflow.api_fastapi.routes.public.connections import connections_router
from airflow.api_fastapi.routes.public.dag_run import dag_run_router
from airflow.api_fastapi.routes.public.dags import dags_router
from airflow.api_fastapi.routes.public.monitor import monitor_router
from airflow.api_fastapi.routes.public.variables import variables_router
from airflow.api_fastapi.routes.router import AirflowRouter
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.routes.public.connections import connections_router
from airflow.api_fastapi.core_api.routes.public.dag_run import dag_run_router
from airflow.api_fastapi.core_api.routes.public.dags import dags_router
from airflow.api_fastapi.core_api.routes.public.monitor import monitor_router
from airflow.api_fastapi.core_api.routes.public.variables import variables_router

public_router = AirflowRouter(prefix="/public")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@
from sqlalchemy.orm import Session
from typing_extensions import Annotated

from airflow.api_fastapi.db.common import get_session, paginated_select
from airflow.api_fastapi.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.parameters import QueryLimit, QueryOffset, SortParam
from airflow.api_fastapi.routes.router import AirflowRouter
from airflow.api_fastapi.serializers.connections import ConnectionCollectionResponse, ConnectionResponse
from airflow.api_fastapi.common.db.common import get_session, paginated_select
from airflow.api_fastapi.common.parameters import QueryLimit, QueryOffset, SortParam
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.serializers.connections import (
ConnectionCollectionResponse,
ConnectionResponse,
)
from airflow.models import Connection

connections_router = AirflowRouter(tags=["Connection"], prefix="/connections")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
from sqlalchemy.orm import Session
from typing_extensions import Annotated

from airflow.api_fastapi.db.common import get_session
from airflow.api_fastapi.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.routes.router import AirflowRouter
from airflow.api_fastapi.serializers.dag_run import DAGRunResponse
from airflow.api_fastapi.common.db.common import get_session
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.serializers.dag_run import DAGRunResponse
from airflow.models import DagRun

dag_run_router = AirflowRouter(tags=["DagRun"], prefix="/dags/{dag_id}/dagRuns")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,12 @@
from typing_extensions import Annotated

from airflow.api.common import delete_dag as delete_dag_module
from airflow.api_fastapi.db.common import (
from airflow.api_fastapi.common.db.common import (
get_session,
paginated_select,
)
from airflow.api_fastapi.db.dags import dags_select_with_latest_dag_run
from airflow.api_fastapi.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.parameters import (
from airflow.api_fastapi.common.db.dags import dags_select_with_latest_dag_run
from airflow.api_fastapi.common.parameters import (
QueryDagDisplayNamePatternSearch,
QueryDagIdPatternSearch,
QueryDagIdPatternSearchWithNone,
Expand All @@ -42,8 +41,9 @@
QueryTagsFilter,
SortParam,
)
from airflow.api_fastapi.routes.router import AirflowRouter
from airflow.api_fastapi.serializers.dags import (
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.serializers.dags import (
DAGCollectionResponse,
DAGDetailsResponse,
DAGPatchBody,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
from __future__ import annotations

from airflow.api.common.airflow_health import get_airflow_health
from airflow.api_fastapi.routes.router import AirflowRouter
from airflow.api_fastapi.serializers.monitor import HealthInfoSchema
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.serializers.monitor import HealthInfoSchema

monitor_router = AirflowRouter(tags=["Monitor"], prefix="/monitor")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
from sqlalchemy.orm import Session
from typing_extensions import Annotated

from airflow.api_fastapi.db.common import get_session
from airflow.api_fastapi.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.routes.router import AirflowRouter
from airflow.api_fastapi.serializers.variables import VariableBody, VariableResponse
from airflow.api_fastapi.common.db.common import get_session
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.serializers.variables import VariableBody, VariableResponse
from airflow.models.variable import Variable

variables_router = AirflowRouter(tags=["Variable"], prefix="/variables")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
# under the License.
from __future__ import annotations

from airflow.api_fastapi.routes.router import AirflowRouter
from airflow.api_fastapi.routes.ui.assets import assets_router
from airflow.api_fastapi.routes.ui.dashboard import dashboard_router
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.routes.ui.assets import assets_router
from airflow.api_fastapi.core_api.routes.ui.dashboard import dashboard_router

ui_router = AirflowRouter(prefix="/ui")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
from sqlalchemy.orm import Session
from typing_extensions import Annotated

from airflow.api_fastapi.db.common import get_session
from airflow.api_fastapi.routes.router import AirflowRouter
from airflow.api_fastapi.common.db.common import get_session
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.models import DagModel
from airflow.models.asset import AssetDagRunQueue, AssetEvent, AssetModel, DagScheduleAssetReference

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,17 @@
from sqlalchemy.orm import Session
from typing_extensions import Annotated

from airflow.api_fastapi.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.parameters import DateTimeQuery
from airflow.api_fastapi.serializers.dashboard import HistoricalMetricDataResponse
from airflow.api_fastapi.common.parameters import DateTimeQuery
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.serializers.dashboard import HistoricalMetricDataResponse
from airflow.models.dagrun import DagRun, DagRunType
from airflow.models.taskinstance import TaskInstance
from airflow.utils.state import DagRunState, TaskInstanceState

if TYPE_CHECKING:
from sqlalchemy.orm import Session
from airflow.api_fastapi.db.common import get_session
from airflow.api_fastapi.routes.router import AirflowRouter
from airflow.api_fastapi.common.db.common import get_session
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.utils import timezone

dashboard_router = AirflowRouter(tags=["Dashboard"])
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
2 changes: 1 addition & 1 deletion airflow/ui/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
"lint:fix": "eslint --fix && tsc --p tsconfig.app.json",
"format": "pnpm prettier --write .",
"preview": "vite preview",
"codegen": "openapi-rq -i \"../api_fastapi/openapi/v1-generated.yaml\" -c axios --format prettier -o openapi-gen --operationId",
"codegen": "openapi-rq -i \"../api_fastapi/core_api/openapi/v1-generated.yaml\" -c axios --format prettier -o openapi-gen --operationId",
"test": "vitest run",
"coverage": "vitest run --coverage"
},
Expand Down
Loading

0 comments on commit 39a16c0

Please sign in to comment.