Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion airflow/api_fastapi/auth/managers/base_auth_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from sqlalchemy import select

from airflow.api_fastapi.auth.managers.models.base_user import BaseUser
from airflow.api_fastapi.auth.managers.models.resource_details import DagDetails
from airflow.api_fastapi.auth.managers.models.resource_details import BackfillDetails, DagDetails
from airflow.api_fastapi.common.types import MenuItem
from airflow.configuration import conf
from airflow.models import DagModel
Expand Down Expand Up @@ -169,6 +169,22 @@ def is_authorized_dag(
:param details: optional details about the DAG
"""

@abstractmethod
def is_authorized_backfill(
self,
*,
method: ResourceMethod,
user: T,
details: BackfillDetails | None = None,
) -> bool:
"""
Return whether the user is authorized to perform a given action on a backfill.

:param method: the method to perform
:param user: the user to performing the action
:param details: optional details about the backfill
"""

@abstractmethod
def is_authorized_asset(
self,
Expand Down
7 changes: 7 additions & 0 deletions airflow/api_fastapi/auth/managers/models/resource_details.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ class DagDetails:
id: str | None = None


@dataclass
class BackfillDetails:
"""Represents the details of a backfill."""

id: str | None = None


@dataclass
class AssetDetails:
"""Represents the details of an asset."""
Expand Down
15 changes: 15 additions & 0 deletions airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

from airflow.api_fastapi.app import AUTH_MANAGER_FASTAPI_APP_PREFIX
from airflow.api_fastapi.auth.managers.base_auth_manager import BaseAuthManager
from airflow.api_fastapi.auth.managers.models.resource_details import BackfillDetails
from airflow.api_fastapi.auth.managers.simple.user import SimpleAuthManagerUser
from airflow.configuration import AIRFLOW_HOME, conf

Expand Down Expand Up @@ -175,6 +176,20 @@ def is_authorized_dag(
user=user,
)

def is_authorized_backfill(
self,
*,
method: ResourceMethod,
user: SimpleAuthManagerUser,
details: BackfillDetails | None = None,
) -> bool:
return self._is_authorized(
method=method,
allow_get_role=SimpleAuthManagerRole.VIEWER,
allow_role=SimpleAuthManagerRole.OP,
user=user,
)

def is_authorized_asset(
self,
*,
Expand Down
14 changes: 14 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1370,6 +1370,8 @@ paths:
- Backfill
summary: List Backfills
operationId: list_backfills
security:
- OAuth2PasswordBearer: []
parameters:
- name: dag_id
in: query
Expand Down Expand Up @@ -1430,6 +1432,8 @@ paths:
- Backfill
summary: Create Backfill
operationId: create_backfill
security:
- OAuth2PasswordBearer: []
requestBody:
required: true
content:
Expand Down Expand Up @@ -1479,6 +1483,8 @@ paths:
- Backfill
summary: Get Backfill
operationId: get_backfill
security:
- OAuth2PasswordBearer: []
parameters:
- name: backfill_id
in: path
Expand Down Expand Up @@ -1523,6 +1529,8 @@ paths:
- Backfill
summary: Pause Backfill
operationId: pause_backfill
security:
- OAuth2PasswordBearer: []
parameters:
- name: backfill_id
in: path
Expand Down Expand Up @@ -1572,6 +1580,8 @@ paths:
- Backfill
summary: Unpause Backfill
operationId: unpause_backfill
security:
- OAuth2PasswordBearer: []
parameters:
- name: backfill_id
in: path
Expand Down Expand Up @@ -1621,6 +1631,8 @@ paths:
- Backfill
summary: Cancel Backfill
operationId: cancel_backfill
security:
- OAuth2PasswordBearer: []
parameters:
- name: backfill_id
in: path
Expand Down Expand Up @@ -1713,6 +1725,8 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
security:
- OAuth2PasswordBearer: []
/public/connections/{connection_id}:
delete:
tags:
Expand Down
26 changes: 24 additions & 2 deletions airflow/api_fastapi/core_api/routes/public/backfills.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from fastapi.exceptions import RequestValidationError
from sqlalchemy import select, update

from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity
from airflow.api_fastapi.common.db.common import (
AsyncSessionDep,
SessionDep,
Expand All @@ -39,6 +40,7 @@
from airflow.api_fastapi.core_api.openapi.exceptions import (
create_openapi_http_exception_doc,
)
from airflow.api_fastapi.core_api.security import requires_access_backfill, requires_access_dag
from airflow.api_fastapi.logging.decorators import action_logging
from airflow.exceptions import DagNotFound
from airflow.models import DagRun
Expand All @@ -61,6 +63,9 @@

@backfills_router.get(
path="",
dependencies=[
Depends(requires_access_backfill(method="GET")),
],
)
async def list_backfills(
dag_id: str,
Expand Down Expand Up @@ -89,6 +94,9 @@ async def list_backfills(
@backfills_router.get(
path="/{backfill_id}",
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
dependencies=[
Depends(requires_access_backfill(method="GET")),
],
)
def get_backfill(
backfill_id: str,
Expand All @@ -108,6 +116,7 @@ def get_backfill(
status.HTTP_409_CONFLICT,
]
),
dependencies=[Depends(requires_access_backfill(method="PUT"))],
)
def pause_backfill(backfill_id, session: SessionDep) -> BackfillResponse:
b = session.get(Backfill, backfill_id)
Expand All @@ -129,6 +138,7 @@ def pause_backfill(backfill_id, session: SessionDep) -> BackfillResponse:
status.HTTP_409_CONFLICT,
]
),
dependencies=[Depends(requires_access_backfill(method="PUT"))],
)
def unpause_backfill(backfill_id, session: SessionDep) -> BackfillResponse:
b = session.get(Backfill, backfill_id)
Expand All @@ -149,7 +159,11 @@ def unpause_backfill(backfill_id, session: SessionDep) -> BackfillResponse:
status.HTTP_409_CONFLICT,
]
),
dependencies=[Depends(action_logging())],
dependencies=[
Depends(action_logging()),
Depends(requires_access_dag(method="PUT", access_entity=DagAccessEntity.RUN)),
Depends(requires_access_backfill(method="PUT")),
],
)
def cancel_backfill(backfill_id, session: SessionDep) -> BackfillResponse:
b: Backfill = session.get(Backfill, backfill_id)
Expand Down Expand Up @@ -190,7 +204,11 @@ def cancel_backfill(backfill_id, session: SessionDep) -> BackfillResponse:
@backfills_router.post(
path="",
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND, status.HTTP_409_CONFLICT]),
dependencies=[Depends(action_logging())],
dependencies=[
Depends(action_logging()),
Depends(requires_access_dag(method="POST", access_entity=DagAccessEntity.RUN)),
Depends(requires_access_backfill(method="POST")),
],
)
def create_backfill(
backfill_request: BackfillPostBody,
Expand Down Expand Up @@ -232,6 +250,10 @@ def create_backfill(
@backfills_router.post(
path="/dry_run",
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND, status.HTTP_409_CONFLICT]),
dependencies=[
Depends(requires_access_dag(method="POST", access_entity=DagAccessEntity.RUN)),
Depends(requires_access_backfill(method="POST")),
],
)
def create_backfill_dry_run(
body: BackfillPostBody,
Expand Down
17 changes: 17 additions & 0 deletions airflow/api_fastapi/core_api/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
AccessView,
AssetAliasDetails,
AssetDetails,
BackfillDetails,
ConfigurationDetails,
ConnectionDetails,
DagAccessEntity,
Expand Down Expand Up @@ -137,6 +138,22 @@ def depends_permitted_dags_filter(
ReadableDagsFilterDep = Annotated[PermittedDagFilter, Depends(permitted_dag_filter_factory("GET"))]


def requires_access_backfill(method: ResourceMethod) -> Callable:
def inner(
request: Request,
user: Annotated[BaseUser | None, Depends(get_user)] = None,
) -> None:
backfill_id: str | None = request.path_params.get("backfill_id")

_requires_access(
is_authorized_callback=lambda: get_auth_manager().is_authorized_backfill(
method=method, details=BackfillDetails(id=backfill_id), user=user
),
)

return inner


def requires_access_pool(method: ResourceMethod) -> Callable[[Request, BaseUser], None]:
def inner(
request: Request,
Expand Down
1 change: 1 addition & 0 deletions airflow/security/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
RESOURCE_ACTION = "Permissions"
RESOURCE_ADMIN_MENU = "Admin"
RESOURCE_AUDIT_LOG = "Audit Logs"
RESOURCE_BACKFILL = "Backfills"
RESOURCE_BROWSE_MENU = "Browse"
RESOURCE_CONFIG = "Configurations"
RESOURCE_CONNECTION = "Connections"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class AvpEntities(Enum):
# Resource types
ASSET = "Asset"
ASSET_ALIAS = "AssetAlias"
BACKFILL = "Backfills"
CONFIGURATION = "Configuration"
CONNECTION = "Connection"
CUSTOM = "Custom"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from airflow.api_fastapi.auth.managers.base_auth_manager import BaseAuthManager
from airflow.api_fastapi.auth.managers.models.resource_details import (
AccessView,
BackfillDetails,
ConnectionDetails,
DagAccessEntity,
DagDetails,
Expand Down Expand Up @@ -156,6 +157,14 @@ def is_authorized_dag(
context=context,
)

def is_authorized_backfill(
self, *, method: ResourceMethod, user: AwsAuthManagerUser, details: BackfillDetails | None = None
) -> bool:
backfill_id = details.id if details else None
return self.avp_facade.is_authorized(
method=method, entity_type=AvpEntities.BACKFILL, user=user, entity_id=backfill_id
)

def is_authorized_asset(
self, *, method: ResourceMethod, user: AwsAuthManagerUser, details: AssetDetails | None = None
) -> bool:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from airflow.api_fastapi.app import AUTH_MANAGER_FASTAPI_APP_PREFIX
from airflow.api_fastapi.auth.managers.models.resource_details import (
AccessView,
BackfillDetails,
ConfigurationDetails,
ConnectionDetails,
DagAccessEntity,
Expand All @@ -44,7 +45,10 @@

if TYPE_CHECKING:
from airflow.api_fastapi.auth.managers.base_auth_manager import ResourceMethod
from airflow.api_fastapi.auth.managers.models.resource_details import AssetAliasDetails, AssetDetails
from airflow.api_fastapi.auth.managers.models.resource_details import (
AssetAliasDetails,
AssetDetails,
)
else:
from airflow.providers.common.compat.assets import AssetAliasDetails, AssetDetails

Expand Down Expand Up @@ -202,6 +206,34 @@ def test_is_authorized_dag(
)
assert result

@pytest.mark.parametrize(
"details, user, expected_user, expected_entity_id",
[
(None, mock, ANY, None),
(BackfillDetails(id="1"), mock, mock, "1"),
],
)
@patch.object(AwsAuthManager, "avp_facade")
def test_is_authorized_backfill(
self,
mock_avp_facade,
details,
user,
expected_user,
expected_entity_id,
auth_manager,
):
is_authorized = Mock(return_value=True)
mock_avp_facade.is_authorized = is_authorized

method: ResourceMethod = "GET"
result = auth_manager.is_authorized_backfill(method=method, details=details, user=user)

is_authorized.assert_called_once_with(
method=method, entity_type=AvpEntities.BACKFILL, user=expected_user, entity_id=expected_entity_id
)
assert result

@pytest.mark.parametrize(
"details, user, expected_user, expected_entity_id",
[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from airflow.security.permissions import RESOURCE_ASSET, RESOURCE_ASSET_ALIAS
from airflow.security.permissions import RESOURCE_ASSET, RESOURCE_ASSET_ALIAS, RESOURCE_BACKFILL
else:
try:
from airflow.security.permissions import RESOURCE_ASSET, RESOURCE_ASSET_ALIAS
from airflow.security.permissions import RESOURCE_ASSET, RESOURCE_ASSET_ALIAS, RESOURCE_BACKFILL
except ImportError:
from airflow.security.permissions import RESOURCE_DATASET as RESOURCE_ASSET


__all__ = ["RESOURCE_ASSET", "RESOURCE_ASSET_ALIAS"]
__all__ = ["RESOURCE_ASSET", "RESOURCE_ASSET_ALIAS", "RESOURCE_BACKFILL"]
Loading