Skip to content

Commit

Permalink
Add simple "Task API" server
Browse files Browse the repository at this point in the history
[skip ci]

closes apache#43009
  • Loading branch information
kaxil committed Oct 16, 2024
1 parent 69e4a5e commit 93fbd45
Show file tree
Hide file tree
Showing 7 changed files with 126 additions and 5 deletions.
22 changes: 18 additions & 4 deletions airflow/api_fastapi/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
from __future__ import annotations

import logging
import os

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from airflow.api_fastapi.core_api.app import init_dag_bag, init_plugins, init_views
from airflow.api_fastapi.execution_api.app import create_task_execution_api_app

log = logging.getLogger(__name__)

Expand All @@ -31,18 +33,30 @@
def create_app(apps: list[str] | None = None) -> FastAPI:
from airflow.configuration import conf

# TODO: Need to figure out how to get the 'apps' from the command line
# and pass it to the create_app function
# Passing it to the gunicorn command does not seem straightforward
# One possible option is to pass via environment variable (--env to gunicorn)
# https://docs.gunicorn.org/en/stable/settings.html#raw-env
apps_env = os.environ.get("AIRFLOW__API__APPS")
apps_list = apps_env.split(",") if apps_env else []
apps = apps or apps_list

app = FastAPI(
title="Airflow API",
description="Airflow API. All endpoints located under ``/public`` can be used safely, are stable and backward compatible. "
"Endpoints located under ``/ui`` are dedicated to the UI and are subject to breaking change "
"depending on the need of the frontend. Users should not rely on those but use the public ones instead.",
)

init_dag_bag(app)

init_views(app)
if not apps or "ui" in apps:
init_dag_bag(app)
init_views(app)
init_plugins(app)

init_plugins(app)
if not apps or "task-sdk" in apps:
task_exec_api_app = create_task_execution_api_app(app)
app.mount("/execution", task_exec_api_app)

allow_origins = conf.getlist("api", "access_control_allow_origins")
allow_methods = conf.getlist("api", "access_control_allow_methods")
Expand Down
16 changes: 16 additions & 0 deletions airflow/api_fastapi/execution_api/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
34 changes: 34 additions & 0 deletions airflow/api_fastapi/execution_api/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from fastapi import FastAPI


def create_task_execution_api_app(app: FastAPI) -> FastAPI:
"""Create FastAPI app for task execution API."""
from airflow.api_fastapi.execution_api.routes import execution_api_router

task_exec_api_app = FastAPI(
title="Airflow Task Execution API",
description="The private Airflow Task Execution API.",
include_in_schema=False,
)

task_exec_api_app.include_router(execution_api_router)
return task_exec_api_app
23 changes: 23 additions & 0 deletions airflow/api_fastapi/execution_api/routes/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.execution_api.routes.health import health_router

execution_api_router = AirflowRouter()
execution_api_router.include_router(health_router)
27 changes: 27 additions & 0 deletions airflow/api_fastapi/execution_api/routes/health.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from airflow.api_fastapi.common.router import AirflowRouter

health_router = AirflowRouter(tags=["Task SDK"])


@health_router.get("/health")
async def health() -> dict:
return {"status": "healthy"}
4 changes: 4 additions & 0 deletions airflow/cli/cli_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -769,6 +769,9 @@ def string_lower_type(val):
("-L", "--access-logformat"),
help="The access log format for gunicorn logs",
)
ARG_FASTAPI_API_APPS = Arg(
("--apps",), help="Applications to run. Default is all", type=string_list_type, default=["ui", "task-sdk"]
)


# scheduler
Expand Down Expand Up @@ -1936,6 +1939,7 @@ class GroupCommand(NamedTuple):
ARG_FASTAPI_API_ACCESS_LOGFILE,
ARG_FASTAPI_API_ERROR_LOGFILE,
ARG_FASTAPI_API_ACCESS_LOGFORMAT,
ARG_FASTAPI_API_APPS,
ARG_LOG_FILE,
ARG_SSL_CERT,
ARG_SSL_KEY,
Expand Down
5 changes: 4 additions & 1 deletion airflow/cli/commands/fastapi_api_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def fastapi_api(args):
"""Start Airflow FastAPI API."""
print(settings.HEADER)

apps = args.apps or []
access_logfile = args.access_logfile or "-"
error_logfile = args.error_logfile or "-"
access_logformat = args.access_logformat
Expand Down Expand Up @@ -101,6 +102,8 @@ def fastapi_api(args):
)

pid_file, _, _, _ = setup_locations("fastapi-api", pid=args.pid)

# TODO: Need to pass "apps" to create_app somehow
run_args = [
sys.executable,
"-m",
Expand Down Expand Up @@ -192,7 +195,7 @@ def start_and_monitor_gunicorn(args):
if args.daemon:
# This makes possible errors get reported before daemonization
os.environ["SKIP_DAGS_PARSING"] = "True"
create_app()
create_app(apps)
os.environ.pop("SKIP_DAGS_PARSING")

pid_file_path = Path(pid_file)
Expand Down

0 comments on commit 93fbd45

Please sign in to comment.