34 changes: 29 additions & 5 deletions airflow/api_ui/app.py
@@ -16,12 +16,18 @@
# under the License.
from __future__ import annotations

import logging

from fastapi import APIRouter, FastAPI

from airflow.configuration import conf
from airflow.www.app import create_app as create_flask_app
from airflow.www.extensions.init_dagbag import get_dag_bag

app: FastAPI | None = None

log = logging.getLogger(__name__)


def init_dag_bag(app: FastAPI) -> None:
"""
@@ -32,17 +38,35 @@ def init_dag_bag(app: FastAPI) -> None:
app.state.dag_bag = get_dag_bag()


def create_app() -> FastAPI:
def init_flask_app(app: FastAPI, testing: bool = False) -> None:
"""
Auth providers and permission logic are tightly coupled to Flask.
Member

Question. Is it entirely true for Airflow 3? Do we want to base all our authentication on Flask and have to create a Flask App (cc: @vincbeck ?). I do not think there is (maybe due to compatibility but can be removed) any requirement to use flask in our Auth Manager?

Not that I have another proposal, as I have little knowledge of Flask/ASGI/WSGI etc., but I am not sure we really need to use Flask. Flask is essentially synchronous, and writing APIs is not its main goal (producing HTML output is). If we get rid of Flask, we would be free to use sync/async requests as we want, and some of the problems of joining Starlette, ASGI, gunicorn, uvicorn, along with all the resulting complexity, could be avoided.

If we are going FastAPI-first with async built in, getting rid of Flask should only bring benefits, IMHO.

Member

(Maybe I do not understand the tight-coupling here) - but I don't think we are in any way tightly coupled with flask (other than compatibility requirements for the old plugins).

Member Author

@pierrejeambrun pierrejeambrun Sep 5, 2024

Yes indeed, I believe we really do not want to have Flask around anymore when we release Airflow 3. There is no reason to if all of our APIs are on FastAPI.

Yes, FastAPI has native async support, but we cannot get that 'for free' right now because most of Airflow's db utility code (how to create sessions and provide them via a decorator, how to create the db engine, context managers, etc.) is not yet written in an async way, so we would need to write that (I think that would be part of AIP-70). Being fully async is definitely the end goal for a FastAPI app. Maybe we don't need AIP-70 for that, but rewriting all of those is a significant effort and not critical for the POC to work. Also, migrating endpoints from the old API to the new API will be a bit easier if both are sync: it is basically close to a copy and paste (I am exaggerating a bit, but you get the idea). More code will need to be adapted if the new endpoints are async while the old ones are not. Maybe it is better to do that in a second step, once everything is working in FastAPI synchronously.

> do not think there is (maybe due to compatibility but can be removed) any requirement to use flask in our Auth Manager?
> (Maybe I do not understand the tight-coupling here) - but I don't think we are in any way tightly coupled with flask (other than compatibility requirements for the old plugins).

Those are the problems at the moment, I believe. In multiple places in our backend_auth, permission code, etc., we use the Flask app context to retrieve global things such as the connected user, the sessions, and the current request. Those are not available when running FastAPI. To give some examples:

  • FABAuthManager uses flask_login.current_user and connexion.FlaskAPI.
  • Almost all of the backend_auth code accesses the Flask app or app context.
  • FabAirflowSecurityManagerOverride calls Flask libraries to handle JWT or OpenID, or accesses the Flask g global object directly to retrieve the user. It also uses flask_login.LoginManager, flask.flash, or the session global object.
  • The AnonymousUser uses flask.current_app() and a flask_login mixin.
  • In most places where an AirflowApp or AppBuilder is required, we need a Flask app underneath.

Those are just some examples; there are plenty more in the code. Maybe I am misunderstanding something, and Vincent/Jed will have more information on that, and we can simply write another SecurityManager / AuthManager that is agnostic of Flask, but I am not sure yet what the best approach is.

Also, just to mention that FastAPI and Flask work in two different paradigms. Flask uses global objects and application context accessors: you import g or current_user and use them where you need them in the views / providers / utilities / managers. In contrast, FastAPI uses dependency injection, where a function signature declares its dependencies and FastAPI injects them at runtime. There might be some refactoring involved due to this shift.
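
To make the contrast concrete, here is a minimal, framework-free sketch. It imitates a Flask-style context global with a `contextvars.ContextVar` and the FastAPI style with an explicit parameter. The names `current_user`, `view_with_global`, and `view_with_injection` are hypothetical, and neither framework is actually imported, so this only illustrates the shape of the two styles, not their real APIs.

```python
import contextvars

# Hypothetical stand-in for a context global such as flask_login.current_user.
current_user = contextvars.ContextVar("current_user", default=None)


def view_with_global() -> str:
    """Global-accessor style: the view reaches into ambient context."""
    user = current_user.get()
    if user is None:
        raise RuntimeError("no user in context (nothing populated it)")
    return f"hello {user}"


def view_with_injection(user: str) -> str:
    """Injection style: the caller (the framework) supplies the dependency."""
    return f"hello {user}"


# The global style only works after something has populated the context.
token = current_user.set("alice")
try:
    global_result = view_with_global()
finally:
    current_user.reset(token)

# The injection style needs no ambient state at all.
injected_result = view_with_injection("alice")
```

Code written in the first style fails as soon as nothing has set up the context, which is the situation the auth backends are in when called from FastAPI.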

Member

There is definitely going to have to be some decoupling/refactoring anyways. For example, auth managers expect an appbuilder today, and it's already on my "must change" list for AIP-79 :)

But yes, FAB / our security manager / auth managers, it's all pretty intertwined and messy today. I believe we can solve it in 3, but in the meantime I think initing like this is okay.

Contributor

I have quickly looked at the code. I will look deeper and provide inline comments to help, but overall:

  • We must implement this new endpoint in an auth-manager-agnostic way. In other words, we should not assume the environment is going to use the FAB auth manager or any other auth manager. FABAuthManager, FabAirflowSecurityManagerOverride, and AnonymousUser are all specific to the FAB auth manager.
  • Even though we want to remove FAB from Airflow 3, we want the FAB auth manager to be usable with Airflow 3. I know it might sound contradictory, and I don't have a real solution here; this will need to be done in AIP-79. The way I see it (please correct me if I am wrong, @jedcunningham), for the moment we might need to initialize some resources (such as the Flask app) just for the FAB auth manager to work properly. Another solution would be to make the UI REST API compatible only with the simple auth manager; once AIP-79 is complete, we can get rid of this limitation. But that would mean we need to make the simple auth manager the default one before we can merge this PR.

Contributor

I don't want to block your PR though; this is not blocking and can be solved later, but it is something we definitely need to figure out.

Member Author

@pierrejeambrun pierrejeambrun Sep 6, 2024

If we do not rely on session auth, we need the front end to pass a Basic auth header, i.e. the base64-encoded username and password, on each request. There are utilities in the front end to do that, and we could maybe store the username/password in cookies in the front end. 🤔 There are definitely solutions for a 'basic auth' workflow on the front end, but this is just for development indeed.
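
For reference, a minimal sketch of what that header looks like, using only the standard library. The helper names `basic_auth_header` and `parse_basic_auth_header` are hypothetical and are not part of Airflow or FastAPI; the encoding itself follows the standard Basic scheme.

```python
import base64


def basic_auth_header(username: str, password: str) -> str:
    """Build the value of an HTTP ``Authorization`` header for Basic auth."""
    raw = f"{username}:{password}".encode("utf-8")
    return "Basic " + base64.b64encode(raw).decode("ascii")


def parse_basic_auth_header(value: str) -> tuple[str, str]:
    """Recover (username, password) from a Basic ``Authorization`` header value."""
    scheme, _, encoded = value.partition(" ")
    if scheme.lower() != "basic" or not encoded:
        raise ValueError("not a Basic authorization header")
    decoded = base64.b64decode(encoded).decode("utf-8")
    # Split on the first colon only: the password may itself contain colons.
    username, sep, password = decoded.partition(":")
    if not sep:
        raise ValueError("malformed credentials")
    return username, password


header = basic_auth_header("admin", "s3cr:et")
```

Base64 is an encoding, not encryption, so anyone who sees the header can recover the credentials. That is one reason this workflow is reasonable for development only.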

For production, I am not even sure that we want session-cookie-based auth for a modern FastAPI app. A JWT Bearer token might be more appropriate. But we do not have that kind of auth backend available yet?

Contributor

Yes I think we need to align on the authentication storage. I assumed we were going to use a session but we might want to take a different approach.

Contributor

After a discussion with @bbovenzi at the Airflow Summit, it seems we are aiming for identification via a JWT token, with the session as storage for the token.
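
As a rough illustration of what a signed JWT involves, here is a standard-library-only sketch of HS256 signing and verification. It is not the design agreed on in this thread: `encode_jwt`, `decode_jwt`, and the claim names in the usage are hypothetical, session storage is not covered, and a real backend would more likely rely on a vetted JWT library.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional


def _b64url(data: bytes) -> str:
    # JWTs use URL-safe base64 without padding.
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    padding = "=" * (-len(text) % 4)
    return base64.urlsafe_b64decode(text + padding)


def encode_jwt(payload: dict, secret: bytes) -> str:
    """Serialize header and payload, then sign them with HMAC-SHA256."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    signature = hmac.new(secret, signing_input.encode("ascii"), hashlib.sha256).digest()
    return f"{signing_input}.{_b64url(signature)}"


def decode_jwt(token: str, secret: bytes, now: Optional[float] = None) -> dict:
    """Verify the signature and expiry, then return the payload."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
    except ValueError:
        raise ValueError("token must have three dot-separated parts") from None
    signing_input = f"{header_b64}.{payload_b64}".encode("ascii")
    expected = hmac.new(secret, signing_input, hashlib.sha256).digest()
    # Constant-time comparison avoids leaking how many bytes matched.
    if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
        raise ValueError("bad signature")
    if json.loads(_b64url_decode(header_b64)).get("alg") != "HS256":
        raise ValueError("unexpected algorithm")
    payload = json.loads(_b64url_decode(payload_b64))
    current = time.time() if now is None else now
    if "exp" in payload and current >= payload["exp"]:
        raise ValueError("token expired")
    return payload
```

The server can then trust the claims in the payload without a database lookup, as long as the signature checks out and the token has not expired.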

Member Author

Great, thanks for the heads-up. Keep me posted if there is work done in that direction that would allow me to move this PR forward (no hurry for now; it can stay here and we move on without auth for development).


Leverage the create_app for flask to initialize the api_auth, app_builder, auth_manager.
Also limit the backend auth in production mode.
"""
flask_app = create_flask_app(testing=testing)

app.state.flask_app = flask_app

if conf.get("api", "auth_backends") != "airflow.providers.fab.auth_manager.api.auth.backend.basic_auth":
log.warning(
"FastAPI UI API only supports 'airflow.providers.fab.auth_manager.api.auth.backend.basic_auth' other backends will be ignored",
)


def create_app(testing: bool = False) -> FastAPI:
app = FastAPI(
description="Internal Rest API for the UI frontend. It is subject to breaking change "
"depending on the need of the frontend. Users should not rely on this API but use the "
"public API instead."
"public API instead.",
)

init_dag_bag(app)

init_views(app)

init_flask_app(app, testing)

return app


Expand All @@ -57,15 +81,15 @@ def init_views(app) -> None:
app.include_router(root_router)


def cached_app(config=None, testing=False):
def cached_app(config=None, testing=False) -> FastAPI:
"""Return cached instance of Airflow UI app."""
global app
if not app:
app = create_app()
return app


def purge_cached_app():
def purge_cached_app() -> None:
"""Remove the cached version of the app in global state."""
global app
app = None
43 changes: 43 additions & 0 deletions airflow/api_ui/db.py
@@ -0,0 +1,43 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from typing import TYPE_CHECKING

from airflow.utils.session import create_session

if TYPE_CHECKING:
    from sqlalchemy.orm import Session


async def get_session() -> Session:
    """
    Dependency for providing a session.

    For non-route functions, please use the :class:`airflow.utils.session.provide_session` decorator.

    Example usage:

    .. code:: python

        @router.get("/your_path")
        def your_route(session: Annotated[Session, Depends(get_session)]):
            pass
    """
    with create_session() as session:
        yield session
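
The `yield` in `get_session` is what lets cleanup run after the route has returned. Below is a framework-free sketch of that lifecycle. `FakeSession`, `fake_create_session`, and `call_with_dependency` are hypothetical stand-ins: the real code uses `airflow.utils.session.create_session` and FastAPI's `Depends`, and the commit/rollback/close behaviour shown here is an assumption about what such a session context manager does.

```python
from contextlib import contextmanager


class FakeSession:
    """Hypothetical stand-in for a SQLAlchemy session; records lifecycle events."""

    def __init__(self, events):
        self.events = events

    def query(self, what):
        self.events.append(f"query {what}")
        return what.upper()


@contextmanager
def fake_create_session(events):
    # Assumed behaviour: commit on success, roll back on error, always close.
    session = FakeSession(events)
    events.append("open")
    try:
        yield session
        events.append("commit")
    except Exception:
        events.append("rollback")
        raise
    finally:
        events.append("close")


def get_session(events):
    """Generator dependency: everything after ``yield`` is teardown."""
    with fake_create_session(events) as session:
        yield session


def call_with_dependency(route, events):
    # Wrapping the generator in a context manager gives enter/exit semantics:
    # code before ``yield`` runs on enter, code after it runs on exit.
    with contextmanager(get_session)(events) as session:
        return route(session)
```

A route that receives the session this way never has to open or close it, which is why the `with create_session()` block can be dropped from the view in `datasets.py`.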
104 changes: 104 additions & 0 deletions airflow/api_ui/security.py
@@ -0,0 +1,104 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import Callable, cast

from fastapi import Depends, HTTPException, Request
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from typing_extensions import Annotated

from airflow.auth.managers.base_auth_manager import ResourceMethod
from airflow.auth.managers.models.base_user import BaseUser
from airflow.auth.managers.models.resource_details import DagAccessEntity, DagDetails, DatasetDetails
from airflow.providers.fab.auth_manager.api.auth.backend.basic_auth import auth_current_user
from airflow.providers.fab.auth_manager.models import User
from airflow.www.extensions.init_auth_manager import get_auth_manager

security = HTTPBasic()


def method(request: Request) -> ResourceMethod:
    return cast(ResourceMethod, request.method)


def check_authentication(
    credentials: Annotated[HTTPBasicCredentials, Depends(security)],
) -> User | None:
    """Check that the request has valid authorization information."""
    # TODO: Handle other auth backends when they support FastAPI. (session, kerberos, etc.)
    user = auth_current_user(credentials)
Contributor

Instead of relying on basic auth (that is part of FAB auth manager), I would do it the same way as in https://github.com/apache/airflow/blob/main/airflow/api_connexion/security.py#L44

Member Author

@pierrejeambrun pierrejeambrun Sep 6, 2024

This is not possible at the moment.

This is what I did at first, with the exact same code. The problem is in the implementation: none of the backends work with FastAPI, for the same reason. They all need a Flask app context to access flask.g and flask.request. So I modified what was necessary in basic_auth to make it work, then limited the backend to basic auth because the others (mostly the kerberos and session backends) won't work. It is too much effort to make that work cleanly, so I 'hacked' my way around basic_auth.

Member Author

cf this comment for a more detailed example with the session backend for instance.

Member Author

The PR has been merged. Just removing the # Handle AUTH_ROLE_PUBLIC todo is enough. The backend now takes care of that.

    if user is not None:
        return user

    # since this handler only checks authentication, not authorization,
    # we should always return 401
    raise HTTPException(401, headers={"WWW-Authenticate": "Basic"})


def _requires_access(
    *,
    is_authorized_callback: Callable[[], bool],
) -> None:
    if not is_authorized_callback():
        raise HTTPException(403, "Forbidden")


def requires_access_dataset(
    method: Annotated[ResourceMethod, Depends(method)],
    uri: str | None = None,
    user: Annotated[BaseUser | None, Depends(check_authentication)] = None,
) -> None:
    _requires_access(
        is_authorized_callback=lambda: get_auth_manager().is_authorized_dataset(
            user=user,
            method=method,
            details=DatasetDetails(uri=uri),
        )
    )


def requires_access_dag(access_entity: DagAccessEntity | None = None) -> Callable:
    def inner(
        method: Annotated[ResourceMethod, Depends(method)],
        dag_id: str | None = None,
        user: Annotated[BaseUser | None, Depends(check_authentication)] = None,
    ) -> None:
        def callback():
            access = get_auth_manager().is_authorized_dag(
                method=method, access_entity=access_entity, details=DagDetails(id=dag_id), user=user
            )

            # ``access`` means here:
            # - if a DAG id is provided (``dag_id`` not None): is the user authorized to access this DAG
            # - if no DAG id is provided: is the user authorized to access all DAGs
            if dag_id or access or access_entity:
                return access

            # No DAG id is provided, the user is not authorized to access all DAGs and authorization is done
            # on DAG level
            # If method is "GET", return whether the user has read access to any DAGs
            # If method is "PUT", return whether the user has edit access to any DAGs
            return (method == "GET" and any(get_auth_manager().get_permitted_dag_ids(methods=["GET"]))) or (
                method == "PUT" and any(get_auth_manager().get_permitted_dag_ids(methods=["PUT"]))
            )

        _requires_access(
            is_authorized_callback=callback,
        )

    return inner
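
The fallback branch in `callback` is easy to misread, so here is the same decision restated as a pure function that can be exercised without an auth manager. `decide_dag_access` and its parameters are hypothetical: `access` stands for the result of `is_authorized_dag`, and `permitted_dag_ids` is a stand-in callable for `get_permitted_dag_ids`.

```python
from typing import Callable, Iterable, Optional


def decide_dag_access(
    method: str,
    dag_id: Optional[str],
    access: bool,
    access_entity: Optional[str],
    permitted_dag_ids: Callable[[str], Iterable[str]],
) -> bool:
    # A specific DAG, a positive answer, or an entity-scoped check:
    # the auth manager's answer is final.
    if dag_id or access or access_entity:
        return access
    # No DAG id and no blanket access: for GET/PUT, fall back to
    # "is the user permitted on at least one DAG for this method".
    if method in ("GET", "PUT"):
        return any(permitted_dag_ids(method))
    return False


# Usage with a hypothetical permission table.
permitted = {"GET": ["dag_a"], "PUT": []}
can_list = decide_dag_access("GET", None, False, None, lambda m: permitted.get(m, []))
```

Methods other than GET and PUT never reach the fallback, so a user without blanket access is refused for them when no DAG id is given.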
103 changes: 57 additions & 46 deletions airflow/api_ui/views/datasets.py
@@ -14,69 +14,80 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from fastapi import APIRouter, HTTPException, Request
from fastapi import APIRouter, Depends, HTTPException, Request
from sqlalchemy import and_, func, select
from sqlalchemy.orm import Session
from typing_extensions import Annotated

from airflow.api_ui.db import get_session
from airflow.api_ui.security import (
requires_access_dag,
requires_access_dataset,
)
from airflow.auth.managers.models.resource_details import DagAccessEntity
from airflow.models import DagModel
from airflow.models.dataset import DagScheduleDatasetReference, DatasetDagRunQueue, DatasetEvent, DatasetModel
from airflow.utils.session import create_session

dataset_router = APIRouter(tags=["Dataset"])


# Ultimately we want async routes, with async sqlalchemy session / context manager.
# Additional effort to make airflow utility code async, not handled for now and most likely part of the AIP-70
@dataset_router.get("/next_run_datasets/{dag_id}")
async def next_run_datasets(dag_id: str, request: Request) -> dict:
@dataset_router.get(
"/next_run_datasets/{dag_id}",
dependencies=[
Depends(requires_access_dataset),
Depends(requires_access_dag(access_entity=DagAccessEntity.RUN)),
],
)
async def next_run_datasets(
dag_id: str, request: Request, session: Annotated[Session, Depends(get_session)]
) -> dict:
dag = request.app.state.dag_bag.get_dag(dag_id)

if not dag:
raise HTTPException(404, f"can't find dag {dag_id}")

with create_session() as session:
dag_model = DagModel.get_dagmodel(dag_id, session=session)
dag_model = DagModel.get_dagmodel(dag_id, session=session)

if dag_model is None:
raise HTTPException(404, f"can't find associated dag_model {dag_id}")
if dag_model is None:
raise HTTPException(404, f"can't find associated dag_model {dag_id}")

latest_run = dag_model.get_last_dagrun(session=session)
latest_run = dag_model.get_last_dagrun(session=session)

events = [
dict(info._mapping)
for info in session.execute(
select(
DatasetModel.id,
DatasetModel.uri,
func.max(DatasetEvent.timestamp).label("lastUpdate"),
)
.join(DagScheduleDatasetReference, DagScheduleDatasetReference.dataset_id == DatasetModel.id)
.join(
DatasetDagRunQueue,
and_(
DatasetDagRunQueue.dataset_id == DatasetModel.id,
DatasetDagRunQueue.target_dag_id == DagScheduleDatasetReference.dag_id,
),
isouter=True,
)
.join(
DatasetEvent,
and_(
DatasetEvent.dataset_id == DatasetModel.id,
(
DatasetEvent.timestamp >= latest_run.execution_date
if latest_run and latest_run.execution_date
else True
),
events = [
dict(info._mapping)
for info in session.execute(
select(
DatasetModel.id,
DatasetModel.uri,
func.max(DatasetEvent.timestamp).label("lastUpdate"),
)
.join(DagScheduleDatasetReference, DagScheduleDatasetReference.dataset_id == DatasetModel.id)
.join(
DatasetDagRunQueue,
and_(
DatasetDagRunQueue.dataset_id == DatasetModel.id,
DatasetDagRunQueue.target_dag_id == DagScheduleDatasetReference.dag_id,
),
isouter=True,
)
.join(
DatasetEvent,
and_(
DatasetEvent.dataset_id == DatasetModel.id,
(
DatasetEvent.timestamp >= latest_run.execution_date
if latest_run and latest_run.execution_date
else True
),
isouter=True,
)
.where(DagScheduleDatasetReference.dag_id == dag_id, ~DatasetModel.is_orphaned)
.group_by(DatasetModel.id, DatasetModel.uri)
.order_by(DatasetModel.uri)
),
isouter=True,
)
]
data = {"dataset_expression": dag_model.dataset_expression, "events": events}
return data
.where(DagScheduleDatasetReference.dag_id == dag_id, ~DatasetModel.is_orphaned)
.group_by(DatasetModel.id, DatasetModel.uri)
.order_by(DatasetModel.uri)
)
]
data = {"dataset_expression": dag_model.dataset_expression, "events": events}
return data