Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,7 @@ def _is_authorized_dag(

if details and details.id:
# Check whether the user has permissions to access a specific DAG
resource_dag_name = self._resource_name(details.id, RESOURCE_DAG)
resource_dag_name = permissions.resource_name(details.id, RESOURCE_DAG)
return self._is_authorized(method=method, resource_type=resource_dag_name, user=user)

return False
Expand All @@ -592,7 +592,7 @@ def _is_authorized_dag_run(

if details and details.id:
# Check whether the user has permissions to access a specific DAG Run permission on a DAG Level
resource_dag_name = self._resource_name(details.id, RESOURCE_DAG_RUN)
resource_dag_name = permissions.resource_name(details.id, RESOURCE_DAG_RUN)
return self._is_authorized(method=method, resource_type=resource_dag_name, user=user)

return False
Expand Down Expand Up @@ -624,19 +624,6 @@ def _get_fab_resource_types(dag_access_entity: DagAccessEntity) -> tuple[str, ..
raise AirflowException(f"Unknown DAG access entity: {dag_access_entity}")
return _MAP_DAG_ACCESS_ENTITY_TO_FAB_RESOURCE_TYPE[dag_access_entity]

def _resource_name(self, dag_id: str, resource_type: str) -> str:
"""
Return the FAB resource name for a DAG id.

:param dag_id: the DAG id

:meta private:
"""
root_dag_id = self._get_root_dag_id(dag_id)
if hasattr(permissions, "resource_name"):
return getattr(permissions, "resource_name")(root_dag_id, resource_type)
return getattr(permissions, "resource_name_for_dag")(root_dag_id)

@staticmethod
def _get_user_permissions(user: User):
"""
Expand All @@ -651,23 +638,6 @@ def _get_user_permissions(user: User):
return []
return getattr(user, "perms") or []

def _get_root_dag_id(self, dag_id: str) -> str:
"""
Return the root DAG id in case of sub DAG, return the DAG id otherwise.

:param dag_id: the DAG id

:meta private:
"""
if not self.appbuilder:
raise AirflowException("AppBuilder is not initialized.")

if "." in dag_id and hasattr(DagModel, "root_dag_id"):
return self.appbuilder.get_session.scalar(
select(DagModel.dag_id, DagModel.root_dag_id).where(DagModel.dag_id == dag_id).limit(1)
)
return dag_id

def _sync_appbuilder_roles(self):
"""
Sync appbuilder roles to DB.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -921,15 +921,14 @@ def create_dag_specific_permissions(self) -> None:
dags = dagbag.dags.values()

for dag in dags:
root_dag_id = dag.dag_id
for resource_name, resource_values in self.RESOURCE_DETAILS_MAP.items():
dag_resource_name = self._resource_name(root_dag_id, resource_name)
dag_resource_name = permissions.resource_name(dag.dag_id, resource_name)
for action_name in resource_values["actions"]:
if (action_name, dag_resource_name) not in perms:
self._merge_perm(action_name, dag_resource_name)

if dag.access_control is not None:
self.sync_perm_for_dag(root_dag_id, dag.access_control)
self.sync_perm_for_dag(dag.dag_id, dag.access_control)

def sync_perm_for_dag(
self,
Expand All @@ -949,7 +948,7 @@ def sync_perm_for_dag(
:return:
"""
for resource_name, resource_values in self.RESOURCE_DETAILS_MAP.items():
dag_resource_name = self._resource_name(dag_id, resource_name)
dag_resource_name = permissions.resource_name(dag_id, resource_name)
for dag_action_name in resource_values["actions"]:
self.create_permission(dag_action_name, dag_resource_name)

Expand All @@ -962,17 +961,6 @@ def sync_perm_for_dag(
dag_id,
)

def _resource_name(self, dag_id: str, resource_name: str) -> str:
"""
Get the resource name from permissions.

This method is to keep compatibility with new FAB versions
running with old airflow versions.
"""
if hasattr(permissions, "resource_name"):
return getattr(permissions, "resource_name")(dag_id, resource_name)
return getattr(permissions, "resource_name_for_dag")(dag_id)

def _sync_dag_view_permissions(
self,
dag_id: str,
Expand Down Expand Up @@ -1000,7 +988,7 @@ def _get_or_create_dag_permission(action_name: str, dag_resource_name: str) -> P

# Revoking stale permissions for all possible DAG level resources
for resource_name in self.RESOURCE_DETAILS_MAP.keys():
dag_resource_name = self._resource_name(dag_id, resource_name)
dag_resource_name = permissions.resource_name(dag_id, resource_name)
if resource := self.get_resource(dag_resource_name):
existing_dag_perms = self.get_resource_permissions(resource)
for perm in existing_dag_perms:
Expand Down Expand Up @@ -1043,7 +1031,7 @@ def _get_or_create_dag_permission(action_name: str, dag_resource_name: str) -> P
f"The set of valid resource names is: {self.RESOURCE_DETAILS_MAP.keys()}"
)

dag_resource_name = self._resource_name(dag_id, resource_name)
dag_resource_name = permissions.resource_name(dag_id, resource_name)
self.log.debug("Syncing DAG-level permissions for DAG '%s'", dag_resource_name)

invalid_actions = set(actions) - self.RESOURCE_DETAILS_MAP[resource_name]["actions"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,16 +101,3 @@ def resource_name(root_dag_id: str, resource: str) -> str:
if root_dag_id.startswith(tuple(PREFIX_RESOURCES_MAP.keys())):
return root_dag_id
return f"{RESOURCE_DETAILS_MAP[resource]['prefix']}{root_dag_id}"


def resource_name_for_dag(root_dag_id: str) -> str:
"""
Return the resource name for a DAG id.

Note: This function is kept for backwards compatibility.
"""
if root_dag_id == RESOURCE_DAG:
return root_dag_id
if root_dag_id.startswith(RESOURCE_DAG_PREFIX):
return root_dag_id
return f"{RESOURCE_DAG_PREFIX}{root_dag_id}"