Skip to content

Commit

Permalink
Speed up most Users/Role CLI commands (#28259)
Browse files Browse the repository at this point in the history
Originally those command were initializing whole Airflow Flask
webserver, which was much more than what was needed - we only
need to initialize Flask Appblication Builder. In case you have
slow DB this limits significantly not only a number of imported
classses but also a number of DB connections.

In some cases the speed of those CLI commands will visibly
go down from 10s of seonds to individual seconds (3x - 5x times).

Follow up after #28242, #28244
  • Loading branch information
potiuk authored Dec 10, 2022
1 parent 5f54009 commit 886d9bd
Show file tree
Hide file tree
Showing 7 changed files with 293 additions and 231 deletions.
149 changes: 77 additions & 72 deletions airflow/cli/commands/role_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,19 @@
import json
import os

from flask import Flask

from airflow.cli.simple_table import AirflowConsole
from airflow.utils import cli as cli_utils
from airflow.utils.cli import suppress_logs_and_warning
from airflow.www.app import cached_app
from airflow.www.extensions.init_appbuilder import init_appbuilder
from airflow.www.fab_security.sqla.models import Action, Permission, Resource, Role
from airflow.www.security import EXISTING_ROLES, AirflowSecurityManager
from airflow.www.security import EXISTING_ROLES


@suppress_logs_and_warning
def roles_list(args):
"""Lists all existing roles."""
flask_app = Flask(__name__)
with flask_app.app_context():
appbuilder = init_appbuilder(flask_app)
from airflow.utils.cli_app_builder import get_application_builder

with get_application_builder() as appbuilder:
roles = appbuilder.sm.get_all_roles()

if not args.permission:
Expand All @@ -64,75 +60,80 @@ def roles_list(args):
@suppress_logs_and_warning
def roles_create(args):
"""Creates new empty role in DB."""
appbuilder = cached_app().appbuilder
for role_name in args.role:
appbuilder.sm.add_role(role_name)
from airflow.utils.cli_app_builder import get_application_builder

with get_application_builder() as appbuilder:
for role_name in args.role:
appbuilder.sm.add_role(role_name)
print(f"Added {len(args.role)} role(s)")


@cli_utils.action_cli
@suppress_logs_and_warning
def roles_delete(args):
"""Deletes role in DB."""
appbuilder = cached_app().appbuilder

for role_name in args.role:
role = appbuilder.sm.find_role(role_name)
if not role:
print(f"Role named '{role_name}' does not exist")
exit(1)

for role_name in args.role:
appbuilder.sm.delete_role(role_name)
from airflow.utils.cli_app_builder import get_application_builder

with get_application_builder() as appbuilder:
for role_name in args.role:
role = appbuilder.sm.find_role(role_name)
if not role:
print(f"Role named '{role_name}' does not exist")
exit(1)
for role_name in args.role:
appbuilder.sm.delete_role(role_name)
print(f"Deleted {len(args.role)} role(s)")


def __roles_add_or_remove_permissions(args):
asm: AirflowSecurityManager = cached_app().appbuilder.sm
is_add: bool = args.subcommand.startswith("add")

role_map = {}
perm_map: dict[tuple[str, str], set[str]] = collections.defaultdict(set)
for name in args.role:
role: Role | None = asm.find_role(name)
if not role:
print(f"Role named '{name}' does not exist")
exit(1)

role_map[name] = role
for permission in role.permissions:
perm_map[(name, permission.resource.name)].add(permission.action.name)

for name in args.resource:
resource: Resource | None = asm.get_resource(name)
if not resource:
print(f"Resource named '{name}' does not exist")
exit(1)

for name in args.action or []:
action: Action | None = asm.get_action(name)
if not action:
print(f"Action named '{name}' does not exist")
exit(1)

permission_count = 0
for (role_name, resource_name, action_name) in list(
itertools.product(args.role, args.resource, args.action or [None])
):
res_key = (role_name, resource_name)
if is_add and action_name not in perm_map[res_key]:
perm: Permission | None = asm.create_permission(action_name, resource_name)
asm.add_permission_to_role(role_map[role_name], perm)
print(f"Added {perm} to role {role_name}")
permission_count += 1
elif not is_add and res_key in perm_map:
for _action_name in perm_map[res_key] if action_name is None else [action_name]:
perm: Permission | None = asm.get_permission(_action_name, resource_name)
asm.remove_permission_from_role(role_map[role_name], perm)
print(f"Deleted {perm} from role {role_name}")
from airflow.utils.cli_app_builder import get_application_builder

with get_application_builder() as appbuilder:
is_add: bool = args.subcommand.startswith("add")

role_map = {}
perm_map: dict[tuple[str, str], set[str]] = collections.defaultdict(set)
asm = appbuilder.sm
for name in args.role:
role: Role | None = asm.find_role(name)
if not role:
print(f"Role named '{name}' does not exist")
exit(1)

role_map[name] = role
for permission in role.permissions:
perm_map[(name, permission.resource.name)].add(permission.action.name)

for name in args.resource:
resource: Resource | None = asm.get_resource(name)
if not resource:
print(f"Resource named '{name}' does not exist")
exit(1)

for name in args.action or []:
action: Action | None = asm.get_action(name)
if not action:
print(f"Action named '{name}' does not exist")
exit(1)

permission_count = 0
for (role_name, resource_name, action_name) in list(
itertools.product(args.role, args.resource, args.action or [None])
):
res_key = (role_name, resource_name)
if is_add and action_name not in perm_map[res_key]:
perm: Permission | None = asm.create_permission(action_name, resource_name)
asm.add_permission_to_role(role_map[role_name], perm)
print(f"Added {perm} to role {role_name}")
permission_count += 1
elif not is_add and res_key in perm_map:
for _action_name in perm_map[res_key] if action_name is None else [action_name]:
perm: Permission | None = asm.get_permission(_action_name, resource_name)
asm.remove_permission_from_role(role_map[role_name], perm)
print(f"Deleted {perm} from role {role_name}")
permission_count += 1

print(f"{'Added' if is_add else 'Deleted'} {permission_count} permission(s)")
print(f"{'Added' if is_add else 'Deleted'} {permission_count} permission(s)")


@cli_utils.action_cli
Expand All @@ -157,9 +158,11 @@ def roles_export(args):
Note, this function does not export the permissions associated for each role.
Strictly, it exports the role names into the passed role json file.
"""
appbuilder = cached_app().appbuilder
roles = appbuilder.sm.get_all_roles()
exporting_roles = [role.name for role in roles if role.name not in EXISTING_ROLES]
from airflow.utils.cli_app_builder import get_application_builder

with get_application_builder() as appbuilder:
roles = appbuilder.sm.get_all_roles()
exporting_roles = [role.name for role in roles if role.name not in EXISTING_ROLES]
filename = os.path.expanduser(args.file)
kwargs = {} if not args.pretty else {"sort_keys": True, "indent": 4}
with open(filename, "w", encoding="utf-8") as f:
Expand All @@ -186,9 +189,11 @@ def roles_import(args):
except ValueError as e:
print(f"File '{json_file}' is not a valid JSON file. Error: {e}")
exit(1)
appbuilder = cached_app().appbuilder
existing_roles = [role.name for role in appbuilder.sm.get_all_roles()]
roles_to_import = [role for role in role_list if role not in existing_roles]
for role_name in roles_to_import:
appbuilder.sm.add_role(role_name)
from airflow.utils.cli_app_builder import get_application_builder

with get_application_builder() as appbuilder:
existing_roles = [role.name for role in appbuilder.sm.get_all_roles()]
roles_to_import = [role for role in role_list if role not in existing_roles]
for role_name in roles_to_import:
appbuilder.sm.add_role(role_name)
print(f"roles '{roles_to_import}' successfully imported")
7 changes: 4 additions & 3 deletions airflow/cli/commands/standalone_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
from airflow.jobs.scheduler_job import SchedulerJob
from airflow.jobs.triggerer_job import TriggererJob
from airflow.utils import db
from airflow.www.app import cached_app


class StandaloneCommand:
Expand Down Expand Up @@ -180,8 +179,10 @@ def initialize_database(self):
# server. Thus, we make a random password and store it in AIRFLOW_HOME,
# with the reasoning that if you can read that directory, you can see
# the database credentials anyway.
appbuilder = cached_app().appbuilder
user_exists = appbuilder.sm.find_user("admin")
from airflow.utils.cli_app_builder import get_application_builder

with get_application_builder() as appbuilder:
user_exists = appbuilder.sm.find_user("admin")
password_path = os.path.join(AIRFLOW_HOME, "standalone_admin_password.txt")
we_know_password = os.path.isfile(password_path)
# If the user does not exist, make a random password and make it
Expand Down
19 changes: 10 additions & 9 deletions airflow/cli/commands/sync_perm_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,18 @@
from __future__ import annotations

from airflow.utils import cli as cli_utils
from airflow.www.app import cached_app


@cli_utils.action_cli
def sync_perm(args):
"""Updates permissions for existing roles and DAGs."""
appbuilder = cached_app().appbuilder
print("Updating actions and resources for all existing roles")
# Add missing permissions for all the Base Views _before_ syncing/creating roles
appbuilder.add_permissions(update_perms=True)
appbuilder.sm.sync_roles()
if args.include_dags:
print("Updating permission on all DAG views")
appbuilder.sm.create_dag_specific_permissions()
from airflow.utils.cli_app_builder import get_application_builder

with get_application_builder() as appbuilder:
print("Updating actions and resources for all existing roles")
# Add missing permissions for all the Base Views _before_ syncing/creating roles
appbuilder.add_permissions(update_perms=True)
appbuilder.sm.sync_roles()
if args.include_dags:
print("Updating permission on all DAG views")
appbuilder.sm.create_dag_specific_permissions()
Loading

0 comments on commit 886d9bd

Please sign in to comment.