Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
554fe7b
Update methods to use Connexion v3, Ginucorn command and encoding
Nov 23, 2023
9a6b00a
Fix static checks
Dec 21, 2023
cfee0cf
Update setup.cfg and encoding
MaksYermak Dec 29, 2023
fd00ba9
Update migrations files for
MaksYermak Jan 5, 2024
f48673d
Update configuration for tests
MaksYermak Jan 9, 2024
1be1a27
Fix problem with static checks
MaksYermak Jan 9, 2024
912e026
Followed vincbeck instruction on the issue #36052
Satoshi-Sh Feb 19, 2024
13bc94a
Refactor dataset class inheritance (#37590)
sunank200 Feb 22, 2024
a0aa56c
Check if Python 3.12 is used for release management commands as well …
potiuk Feb 22, 2024
50edc4e
Enhancing breeze commands with PACKAGE_LIST env variable (#37502)
amoghrajesh Feb 22, 2024
5e7bf22
Introduce new config variable to control whether DAG processor output…
Owen-CH-Leung Feb 22, 2024
de059c5
Clarify skip-message for test requiring database. (#37612)
hterik Feb 22, 2024
585111b
Use new exception type inheriting BaseException for SIGTERMs (#37613)
hterik Feb 22, 2024
803e3e1
Implement `batch_is_authorized_*` APIs in AWS auth manager (#37430)
vincbeck Feb 22, 2024
ec7bccc
chore: Update comments and logging in OpenLineage ExtractorManager (#…
kacpermuda Feb 22, 2024
c8b3118
Merge branch 'main' into fix/api-endpoints-registration
sudiptob2 Feb 22, 2024
8a3a013
fix: add connexion v3 and missing dependencies in pyproject.toml.
sudiptob2 Feb 22, 2024
8fbded2
fix: use appbuilder.app instead of FlaskApp
sudiptob2 Feb 22, 2024
a5dfee7
feat: return connexion.app to keep the return type as FlaskApp
sudiptob2 Feb 22, 2024
46274e4
Show dataset events above task/run details in grid view (#37603)
bbovenzi Feb 22, 2024
4a674b6
Update min kubernetes version in docs and update release instructions…
potiuk Feb 22, 2024
a236484
Prepare docs ad hoc Teradata provider RC2 February 2024 (#37624)
eladkal Feb 22, 2024
e11579e
Add datasets to dag graph (#37604)
bbovenzi Feb 22, 2024
58976f8
Filter datasets graph by dag_id (#37464)
bbovenzi Feb 22, 2024
c3ca633
Fix task duration selection (#37630)
bbovenzi Feb 22, 2024
6c01338
Fix Async GCSObjectsWithPrefixExistenceSensor xcom push (#37634)
pankajastro Feb 22, 2024
194f3c7
Fix few small release issues found during 2.8.2 preparation (#37633)
potiuk Feb 22, 2024
affb3d8
Merge branch 'main' into fix/api-endpoints-registration
sudiptob2 Feb 22, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 23 additions & 32 deletions airflow/api_connexion/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,12 @@
from http import HTTPStatus
from typing import TYPE_CHECKING, Any

import werkzeug
from connexion import FlaskApi, ProblemException, problem
from connexion import ProblemException, problem

from airflow.utils.docs import get_docs_url

if TYPE_CHECKING:
import flask
from connexion.lifecycle import ConnexionRequest, ConnexionResponse

doc_link = get_docs_url("stable-rest-api-ref.html")

Expand All @@ -40,37 +39,29 @@
}


def common_error_handler(exception: BaseException) -> flask.Response:
def problem_error_handler(_request: ConnexionRequest, exception: ProblemException) -> ConnexionResponse:
"""Use to capture connexion exceptions and add link to the type field."""
if isinstance(exception, ProblemException):
link = EXCEPTIONS_LINK_MAP.get(exception.status)
if link:
response = problem(
status=exception.status,
title=exception.title,
detail=exception.detail,
type=link,
instance=exception.instance,
headers=exception.headers,
ext=exception.ext,
)
else:
response = problem(
status=exception.status,
title=exception.title,
detail=exception.detail,
type=exception.type,
instance=exception.instance,
headers=exception.headers,
ext=exception.ext,
)
link = EXCEPTIONS_LINK_MAP.get(exception.status)
if link:
return problem(
status=exception.status,
title=exception.title,
detail=exception.detail,
type=link,
instance=exception.instance,
headers=exception.headers,
ext=exception.ext,
)
else:
if not isinstance(exception, werkzeug.exceptions.HTTPException):
exception = werkzeug.exceptions.InternalServerError()

response = problem(title=exception.name, detail=exception.description, status=exception.code)

return FlaskApi.get_response(response)
return problem(
status=exception.status,
title=exception.title,
detail=exception.detail,
type=exception.type,
instance=exception.instance,
headers=exception.headers,
ext=exception.ext,
)


class NotFound(ProblemException):
Expand Down
6 changes: 3 additions & 3 deletions airflow/auth/managers/base_auth_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from airflow.utils.session import NEW_SESSION, provide_session

if TYPE_CHECKING:
from flask import Blueprint
import connexion
from flask_appbuilder.menu import MenuItem
from sqlalchemy.orm import Session

Expand Down Expand Up @@ -81,8 +81,8 @@ def get_cli_commands() -> list[CLICommand]:
"""
return []

def get_api_endpoints(self) -> None | Blueprint:
"""Return API endpoint(s) definition for the auth manager."""
def set_api_endpoints(self, connexion_app: connexion.FlaskApp) -> None:
"""Set API endpoint(s) definition for the auth manager."""
return None

def get_user_name(self) -> str:
Expand Down
15 changes: 8 additions & 7 deletions airflow/cli/commands/internal_api_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
from tempfile import gettempdir
from time import sleep

import connexion
import psutil
from flask import Flask
from flask_appbuilder import SQLA
from flask_caching import Cache
from flask_wtf.csrf import CSRFProtect
Expand All @@ -54,7 +54,7 @@
from airflow.www.extensions.init_views import init_api_internal, init_error_handlers

log = logging.getLogger(__name__)
app: Flask | None = None
app: connexion.FlaskApp | None = None


@cli_utils.action_cli
Expand All @@ -73,8 +73,8 @@ def internal_api(args):
log.info(f"Starting the Internal API server on port {args.port} and host {args.hostname}.")
app = create_app(testing=conf.getboolean("core", "unit_test_mode"))
app.run(
debug=True, # nosec
use_reloader=not app.config["TESTING"],
log_level="debug",
# reload=not app.app.config["TESTING"],
Comment on lines -76 to +77
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the change here?

port=args.port,
host=args.hostname,
)
Expand All @@ -101,7 +101,7 @@ def internal_api(args):
"--workers",
str(num_workers),
"--worker-class",
str(args.workerclass),
"uvicorn.workers.UvicornWorker",
"--timeout",
str(worker_timeout),
"--bind",
Expand Down Expand Up @@ -195,7 +195,8 @@ def start_and_monitor_gunicorn(args):

def create_app(config=None, testing=False):
"""Create a new instance of Airflow Internal API app."""
flask_app = Flask(__name__)
connexion_app = connexion.FlaskApp(__name__)
flask_app = connexion_app.app

flask_app.config["APP_NAME"] = "Airflow Internal API"
flask_app.config["TESTING"] = testing
Expand Down Expand Up @@ -240,7 +241,7 @@ def create_app(config=None, testing=False):

with flask_app.app_context():
init_error_handlers(flask_app)
init_api_internal(flask_app, standalone_api=True)
init_api_internal(connexion_app, standalone_api=True)

init_jinja_globals(flask_app)
init_xframe_protection(flask_app)
Expand Down
8 changes: 4 additions & 4 deletions airflow/cli/commands/webserver_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,11 +355,11 @@ def webserver(args):
print(f"Starting the web server on port {args.port} and host {args.hostname}.")
app = create_app(testing=conf.getboolean("core", "unit_test_mode"))
app.run(
debug=True,
use_reloader=not app.config["TESTING"],
log_level="debug",
port=args.port,
host=args.hostname,
ssl_context=(ssl_cert, ssl_key) if ssl_cert and ssl_key else None,
ssl_keyfile=ssl_key if ssl_cert and ssl_key else None,
ssl_certfile=ssl_cert if ssl_cert and ssl_key else None,
)
else:
print(
Expand All @@ -383,7 +383,7 @@ def webserver(args):
"--workers",
str(num_workers),
"--worker-class",
str(args.workerclass),
"uvicorn.workers.UvicornWorker",
"--timeout",
str(worker_timeout),
"--bind",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@

def remap_permissions():
"""Apply Map Airflow permissions."""
appbuilder = cached_app(config={"FAB_UPDATE_PERMS": False}).appbuilder
appbuilder = cached_app(config={"FAB_UPDATE_PERMS": False}).app.appbuilder
for old, new in mapping.items():
(old_resource_name, old_action_name) = old
old_permission = appbuilder.sm.get_permission(old_action_name, old_resource_name)
Expand All @@ -313,7 +313,7 @@ def remap_permissions():

def undo_remap_permissions():
"""Unapply Map Airflow permissions"""
appbuilder = cached_app(config={"FAB_UPDATE_PERMS": False}).appbuilder
appbuilder = cached_app(config={"FAB_UPDATE_PERMS": False}).app.appbuilder
for old, new in mapping.items():
(new_resource_name, new_action_name) = new[0]
new_permission = appbuilder.sm.get_permission(new_action_name, new_resource_name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def upgrade():
log = logging.getLogger()
handlers = log.handlers[:]

appbuilder = cached_app(config={"FAB_UPDATE_PERMS": False}).appbuilder
appbuilder = cached_app(config={"FAB_UPDATE_PERMS": False}).app.appbuilder
roles_to_modify = [role for role in appbuilder.sm.get_all_roles() if role.name in ["User", "Viewer"]]
can_read_on_config_perm = appbuilder.sm.get_permission(
permissions.ACTION_CAN_READ, permissions.RESOURCE_CONFIG
Expand All @@ -59,7 +59,7 @@ def upgrade():

def downgrade():
"""Add can_read action on config resource for User and Viewer role"""
appbuilder = cached_app(config={"FAB_UPDATE_PERMS": False}).appbuilder
appbuilder = cached_app(config={"FAB_UPDATE_PERMS": False}).app.appbuilder
roles_to_modify = [role for role in appbuilder.sm.get_all_roles() if role.name in ["User", "Viewer"]]
can_read_on_config_perm = appbuilder.sm.get_permission(
permissions.ACTION_CAN_READ, permissions.RESOURCE_CONFIG
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@

def remap_permissions():
"""Apply Map Airflow permissions."""
appbuilder = cached_app(config={"FAB_UPDATE_PERMS": False}).appbuilder
appbuilder = cached_app(config={"FAB_UPDATE_PERMS": False}).app.appbuilder
for old, new in mapping.items():
(old_resource_name, old_action_name) = old
old_permission = appbuilder.sm.get_permission(old_action_name, old_resource_name)
Expand All @@ -165,7 +165,7 @@ def remap_permissions():

def undo_remap_permissions():
"""Unapply Map Airflow permissions"""
appbuilder = cached_app(config={"FAB_UPDATE_PERMS": False}).appbuilder
appbuilder = cached_app(config={"FAB_UPDATE_PERMS": False}).app.appbuilder
for old, new in mapping.items():
(new_resource_name, new_action_name) = new[0]
new_permission = appbuilder.sm.get_permission(new_action_name, new_resource_name)
Expand Down
30 changes: 21 additions & 9 deletions airflow/providers/fab/auth_manager/fab_auth_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
from pathlib import Path
from typing import TYPE_CHECKING, Container

from connexion import FlaskApi
from flask import Blueprint, url_for
from connexion.options import SwaggerUIOptions
from flask import url_for
from sqlalchemy import select
from sqlalchemy.orm import Session, joinedload

Expand Down Expand Up @@ -84,10 +84,11 @@
)
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.yaml import safe_load
from airflow.www.constants import SWAGGER_BUNDLE, SWAGGER_ENABLED
from airflow.www.extensions.init_views import _CustomErrorRequestBodyValidator, _LazyResolver
from airflow.www.extensions.init_views import _LazyResolver

if TYPE_CHECKING:
import connexion

from airflow.auth.managers.models.base_user import BaseUser
from airflow.cli.cli_config import (
CLICommand,
Expand Down Expand Up @@ -149,19 +150,30 @@ def get_cli_commands() -> list[CLICommand]:
SYNC_PERM_COMMAND, # not in a command group
]

def get_api_endpoints(self) -> None | Blueprint:
def set_api_endpoints(self, connexion_app: connexion.FlaskApp) -> None:
folder = Path(__file__).parents[0].resolve() # this is airflow/auth/managers/fab/
with folder.joinpath("openapi", "v1.yaml").open() as f:
specification = safe_load(f)
return FlaskApi(

swagger_ui_options = SwaggerUIOptions(
swagger_ui=conf.getboolean("webserver", "enable_swagger_ui", fallback=True),
)
Comment on lines +158 to +160
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this come from another PR? If so please rebase to make the review easier


api = connexion_app.add_api(
specification=specification,
resolver=_LazyResolver(),
base_path="/auth/fab/v1",
options={"swagger_ui": SWAGGER_ENABLED, "swagger_path": SWAGGER_BUNDLE.__fspath__()},
swagger_ui_options=swagger_ui_options,
strict_validation=True,
validate_responses=True,
validator_map={"body": _CustomErrorRequestBodyValidator},
).blueprint
)

# Exempt the API blueprint from CSRF protection
# Ref: https://github.com/apache/airflow/pull/36052#issuecomment-1898786641
if api:
self.appbuilder.app.extensions["csrf"].exempt(api.blueprint)

return api.blueprint if api else None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In which case api can be Falsy?


def get_user_display_name(self) -> str:
"""Return the user's display name associated to the user in session."""
Expand Down
3 changes: 2 additions & 1 deletion airflow/utils/json.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ class AirflowJsonProvider(JSONProvider):
def dumps(self, obj, **kwargs):
kwargs.setdefault("ensure_ascii", self.ensure_ascii)
kwargs.setdefault("sort_keys", self.sort_keys)
return json.dumps(obj, **kwargs, cls=WebEncoder)
kwargs.setdefault("cls", WebEncoder)
return json.dumps(obj, **kwargs)

def loads(self, s: str | bytes, **kwargs):
return json.loads(s, **kwargs)
Expand Down
30 changes: 22 additions & 8 deletions airflow/www/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@
import warnings
from datetime import timedelta

from flask import Flask
import connexion
from flask_appbuilder import SQLA
from flask_wtf.csrf import CSRFProtect
from markupsafe import Markup
from sqlalchemy.engine.url import make_url
from starlette.middleware.cors import CORSMiddleware

from airflow import settings
from airflow.api_internal.internal_api_call import InternalApiConfig
Expand Down Expand Up @@ -61,7 +62,7 @@
)
from airflow.www.extensions.init_wsgi_middlewares import init_wsgi_middleware

app: Flask | None = None
app: connexion.FlaskApp | None = None

# Initializes at the module level, so plugins can access it.
# See: /docs/plugins.rst
Expand All @@ -70,7 +71,18 @@

def create_app(config=None, testing=False):
"""Create a new instance of Airflow WWW app."""
flask_app = Flask(__name__)
connexion_app = connexion.FlaskApp(__name__)

connexion_app.add_middleware(
CORSMiddleware,
connexion.middleware.MiddlewarePosition.BEFORE_ROUTING,
allow_origins=conf.get("api", "access_control_allow_origins"),
allow_credentials=True,
allow_methods=conf.get("api", "access_control_allow_methods"),
allow_headers=conf.get("api", "access_control_allow_headers"),
)

flask_app = connexion_app.app
flask_app.secret_key = conf.get("webserver", "SECRET_KEY")

flask_app.config["PERMANENT_SESSION_LIFETIME"] = timedelta(minutes=settings.get_session_lifetime_config())
Expand Down Expand Up @@ -158,22 +170,24 @@ def create_app(config=None, testing=False):
init_appbuilder_links(flask_app)
init_plugins(flask_app)
init_error_handlers(flask_app)
init_api_connexion(flask_app)
init_api_connexion(connexion_app)
if conf.getboolean("webserver", "run_internal_api", fallback=False):
if not _ENABLE_AIP_44:
raise RuntimeError("The AIP_44 is not enabled so you cannot use it.")
init_api_internal(flask_app)
init_api_internal(connexion_app)
init_api_experimental(flask_app)
init_api_auth_provider(flask_app)
init_api_error_handlers(flask_app) # needs to be after all api inits to let them add their path first
init_api_auth_provider(connexion_app)
init_api_error_handlers(
connexion_app
) # needs to be after all api inits to let them add their path first

get_auth_manager().init()

init_jinja_globals(flask_app)
init_xframe_protection(flask_app)
init_airflow_session_interface(flask_app)
init_check_user_active(flask_app)
return flask_app
return connexion_app


def cached_app(config=None, testing=False):
Expand Down
Loading