Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 6 additions & 17 deletions airflow/www/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -682,25 +682,14 @@ def sync_perm_for_dag(
for dag_action_name in self.DAG_ACTIONS:
self.create_permission(dag_action_name, dag_resource_name)

def _revoke_all_stale_permissions(resource: Resource):
existing_dag_perms = self.get_resource_permissions(resource)
for perm in existing_dag_perms:
non_admin_roles = [role for role in perm.role if role.name != "Admin"]
for role in non_admin_roles:
self.log.info(
"Revoking '%s' on DAG '%s' for role '%s'",
perm.action,
dag_resource_name,
role.name,
)
self.remove_permission_from_role(role, perm)

if access_control:
if access_control is not None:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the most important change in this PR - if access control will proceed into the following block if and only if access_control is a non-empty dict, whereas if access control is not None will proceed given any dict, including an empty one.

self.log.info("Syncing DAG-level permissions for DAG '%s'", dag_resource_name)
self._sync_dag_view_permissions(dag_resource_name, access_control)
else:
resource = self.get_resource(dag_resource_name)
if resource:
_revoke_all_stale_permissions(resource)
self.log.info(
"Not syncing DAG-level permissions for DAG '%s' as access control is unset.",
dag_resource_name,
)

def _sync_dag_view_permissions(self, dag_id: str, access_control: dict[str, Collection[str]]) -> None:
"""
Expand Down
44 changes: 44 additions & 0 deletions docs/apache-airflow/security/access-control.rst
Original file line number Diff line number Diff line change
Expand Up @@ -229,3 +229,47 @@ List Plugins Plugins.can_read
List Task Reschedules Task Reschedules.can_read Admin
List Triggers Triggers.can_read Admin
====================================== ======================================================================= ============

These DAG-level controls can be set directly through the UI / CLI, or encoded in the dags themselves through the access_control arg.

Order of precedence for DAG-level permissions
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Since DAG-level access control can be configured in multiple places, conflicts are inevitable and a clear resolution strategy is required. As a result,
Airflow considers the ``access_control`` argument supplied on a DAG itself to be completely authoritative if present, which has a few effects:

Setting ``access_control`` on a DAG will overwrite any previously existing DAG-level permissions if it is any value other than ``None``:

.. code-block:: python

DAG(
dag_id="example_fine_grained_access",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
access_control={
"Viewer": {"can_edit", "can_create", "can_delete"},
},
)

This also means that setting ``access_control={}`` will wipe any existing DAG-level permissions for a given DAG from the DB:

.. code-block:: python

DAG(
dag_id="example_no_fine_grained_access",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
access_control={},
)

Conversely, removing the access_control block from a DAG altogether (or setting it to ``None``) won't make any changes and can leave dangling permissions.

.. code-block:: python

DAG(
dag_id="example_indifferent_to_fine_grained_access",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
)

In the case that there is no ``access_control`` defined on the DAG itself, Airflow will defer to existing permissions defined in the DB, which
may have been set through the UI, CLI or by previous access_control args on the DAG in question.

In all cases, system-wide roles such as ``Can edit on DAG`` take precedence over dag-level access controls, such that they can be considered ``Can edit on DAG: *``
134 changes: 134 additions & 0 deletions tests/www/test_security.py
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,140 @@ def test_sync_perm_for_dag_creates_permissions_on_resources(security_manager):
assert security_manager.get_permission(permissions.ACTION_CAN_EDIT, prefixed_test_dag_id) is not None


def test_sync_perm_for_dag_creates_permissions_for_specified_roles(app, security_manager):
test_dag_id = "TEST_DAG"
test_role = "limited-role"
security_manager.bulk_sync_roles([{"role": test_role, "perms": []}])
with app.app_context():
with create_user_scope(
app,
username="test_user",
role_name=test_role,
permissions=[],
) as user:
security_manager.sync_perm_for_dag(
test_dag_id, access_control={test_role: {"can_read", "can_edit"}}
)
assert security_manager.can_read_dag(test_dag_id, user)
assert security_manager.can_edit_dag(test_dag_id, user)
assert not security_manager.can_delete_dag(test_dag_id, user)


def test_sync_perm_for_dag_removes_existing_permissions_if_empty(app, security_manager):
test_dag_id = "TEST_DAG"
test_role = "limited-role"

with app.app_context():
with create_user_scope(
app,
username="test_user",
role_name=test_role,
permissions=[],
) as user:

security_manager.bulk_sync_roles(
[
{
"role": test_role,
"perms": [
(permissions.ACTION_CAN_READ, f"DAG:{test_dag_id}"),
(permissions.ACTION_CAN_EDIT, f"DAG:{test_dag_id}"),
(permissions.ACTION_CAN_DELETE, f"DAG:{test_dag_id}"),
],
}
]
)

assert security_manager.can_read_dag(test_dag_id, user)
assert security_manager.can_edit_dag(test_dag_id, user)
assert security_manager.can_delete_dag(test_dag_id, user)

# Need to clear cache on user perms
user._perms = None

security_manager.sync_perm_for_dag(test_dag_id, access_control={test_role: {}})

assert not security_manager.can_read_dag(test_dag_id, user)
assert not security_manager.can_edit_dag(test_dag_id, user)
assert not security_manager.can_delete_dag(test_dag_id, user)


def test_sync_perm_for_dag_removes_permissions_from_other_roles(app, security_manager):
test_dag_id = "TEST_DAG"
test_role = "limited-role"

with app.app_context():
with create_user_scope(
app,
username="test_user",
role_name=test_role,
permissions=[],
) as user:

security_manager.bulk_sync_roles(
[
{
"role": test_role,
"perms": [
(permissions.ACTION_CAN_READ, f"DAG:{test_dag_id}"),
(permissions.ACTION_CAN_EDIT, f"DAG:{test_dag_id}"),
(permissions.ACTION_CAN_DELETE, f"DAG:{test_dag_id}"),
],
},
{"role": "other_role", "perms": []},
]
)

assert security_manager.can_read_dag(test_dag_id, user)
assert security_manager.can_edit_dag(test_dag_id, user)
assert security_manager.can_delete_dag(test_dag_id, user)

# Need to clear cache on user perms
user._perms = None

security_manager.sync_perm_for_dag(test_dag_id, access_control={"other_role": {"can_read"}})

assert not security_manager.can_read_dag(test_dag_id, user)
assert not security_manager.can_edit_dag(test_dag_id, user)
assert not security_manager.can_delete_dag(test_dag_id, user)


def test_sync_perm_for_dag_does_not_prune_roles_when_access_control_unset(app, security_manager):
test_dag_id = "TEST_DAG"
test_role = "limited-role"

with app.app_context():
with create_user_scope(
app,
username="test_user",
role_name=test_role,
permissions=[],
) as user:

security_manager.bulk_sync_roles(
[
{
"role": test_role,
"perms": [
(permissions.ACTION_CAN_READ, f"DAG:{test_dag_id}"),
(permissions.ACTION_CAN_EDIT, f"DAG:{test_dag_id}"),
],
},
]
)

assert security_manager.can_read_dag(test_dag_id, user)
assert security_manager.can_edit_dag(test_dag_id, user)

# Need to clear cache on user perms
user._perms = None

security_manager.sync_perm_for_dag(test_dag_id, access_control=None)

assert security_manager.can_read_dag(test_dag_id, user)
assert security_manager.can_edit_dag(test_dag_id, user)


def test_has_all_dag_access(app, security_manager):
for role_name in ["Admin", "Viewer", "Op", "User"]:
with app.app_context():
Expand Down