Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ WIP
- [admin] Made organization owners read-only to non-superusers
- `organization permissions <https://github.com/openwisp/openwisp-users#organization-permissions>`_:
[admin] Allowed administrator role to access organization admin
- [model] Added `user permission helpers <https://github.com/openwisp/openwisp-users#permissions-helpers>`_

Version 0.2.2 [2020-05-04]
--------------------------
Expand Down
43 changes: 42 additions & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,47 @@ Usage exmaple:
>>> user.organizations_dict.keys()
... dict_keys(['20135c30-d486-4d68-993f-322b8acb51c4'])

Permissions helpers
-------------------

The ``User`` model provides methods to check permissions in an efficient way
(without generating database queries each time the permissions are accessed).

``permissions``
~~~~~~~~~~~~~~~

The ``permissions`` property helper returns the user's permissions
from the cache, cache invalidation is handled automatically.

.. code-block:: python

>>> user.permissions
... {'account.add_emailaddress',
'account.change_emailaddress',
'account.delete_emailaddress',
'account.view_emailaddress',
'openwisp_users.add_organizationuser',
'openwisp_users.add_user',
'openwisp_users.change_organizationuser',
'openwisp_users.change_user',
'openwisp_users.delete_organizationuser',
'openwisp_users.delete_user'}

``has_permission(permission)``
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

For superusers, the method returns ``True`` regardless of the permission passed to it.
While for other users, the method checks whether the user has the specified permission and
returns ``True`` or ``False`` accordingly.

It uses the `permissions property helper <#permissions>`_ under the hood
to avoid generating database queries each time is called.

.. code-block:: python

>>> user.has_permission('openwisp_users.add_user')
... True

Django REST Framework Permission Classes
----------------------------------------

Expand Down Expand Up @@ -447,7 +488,7 @@ Organization Owners

An organization owner is a user who is designated as the owner
of a particular organization and this owner can not be deleted
or edited by other administrators. Only the superuser has the permissons to do this.
or edited by other administrators. Only the superuser has the permissions to do this.

By default, the first manager of an organization is designated as the owner of that organization.

Expand Down
28 changes: 27 additions & 1 deletion openwisp_users/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@

from django.apps import AppConfig
from django.conf import settings
from django.contrib.auth import get_user_model
from django.core.cache import cache
from django.core.exceptions import ValidationError
from django.db import transaction
from django.db.models.signals import post_delete, post_save
from django.db.models.signals import m2m_changed, post_delete, post_save
from django.utils.translation import ugettext_lazy as _
from openwisp_utils import settings as utils_settings
from swapper import get_model_name, load_model
Expand Down Expand Up @@ -56,6 +57,8 @@ def set_default_settings(self):
def connect_receivers(self):
OrganizationUser = load_model('openwisp_users', 'OrganizationUser')
OrganizationOwner = load_model('openwisp_users', 'OrganizationOwner')
Group = load_model('openwisp_users', 'Group')
User = get_user_model()
signal_tuples = [(post_save, 'post_save'), (post_delete, 'post_delete')]

for model in [OrganizationUser, OrganizationOwner]:
Expand All @@ -72,6 +75,16 @@ def connect_receivers(self):
sender=OrganizationUser,
dispatch_uid='make_first_org_user_org_owner',
)
for model in [
User.user_permissions.through,
User.groups.through,
Group.permissions.through,
]:
m2m_changed.connect(
self.update_user_permissions,
sender=model,
dispatch_uid='update_user_permissions',
)

def update_organizations_dict(cls, instance, **kwargs):
if hasattr(instance, 'user'):
Expand Down Expand Up @@ -110,3 +123,16 @@ def create_organization_owner(cls, instance, created, **kwargs):
f'OrganizationOwner with organization_user {instance} and '
f'organization {instance.organization}'
)

def update_user_permissions(cls, instance, action, sender, **kwargs):
if action == 'post_remove' or action == 'post_add':
if sender.__name__ == 'Group_permissions':
for user in instance.user_set.all():
cls.update_cached_permissions(user)
else:
cls.update_cached_permissions(instance)

def update_cached_permissions(cls, user):
cache_key = f'user_{user.pk}_permissions'
cache.delete(cache_key)
user.permissions
19 changes: 19 additions & 0 deletions openwisp_users/base/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,25 @@ def clean(self):
if self.phone_number == '':
self.phone_number = None

@property
def permissions(self):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's add a docstring:

"""
Returns the user permissions from the cache, if the cache is
empty it will call self.get_all_permissions() and cache the result 
"""

"""
Returns the user permissions from the cache, if the cache is
empty it will call self.get_all_permissions() and cache the result
"""
cache_key = f'user_{self.pk}_permissions'
permissions = cache.get(cache_key)
if permissions is not None:
return permissions
permissions = self.get_all_permissions()
cache.set(cache_key, permissions)
return permissions

def has_permission(self, permission):
if self.is_superuser:
return True
return permission in self.permissions


class BaseGroup(object):
"""
Expand Down
60 changes: 60 additions & 0 deletions openwisp_users/tests/test_models.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from django.contrib.auth import get_user_model
from django.contrib.auth.models import Permission
from django.core.exceptions import ValidationError
from django.test import TestCase
from swapper import load_model
Expand All @@ -7,6 +8,7 @@

OrganizationUser = load_model('openwisp_users', 'OrganizationUser')
OrganizationOwner = load_model('openwisp_users', 'OrganizationOwner')
Group = load_model('openwisp_users', 'Group')
User = get_user_model()


Expand Down Expand Up @@ -181,3 +183,61 @@ def test_add_users_with_empty_phone_numbers(self):
self.assertIsNone(user1.phone_number)
self.assertIsNone(user2.phone_number)
self.assertEqual(self.user_model.objects.filter(phone_number=None).count(), 2)

def test_cache_user_permission(self):
user = self.user_model(
username='user', email='email1@email.com', password='user1', is_staff=True
)
user.full_clean()
user.save()
group = Group.objects.filter(name='Administrator')
user.groups.set(group)

with self.subTest('Test cached permissions'):
with self.assertNumQueries(0):
user.permissions
self.assertEqual(user.get_all_permissions(), user.permissions)

with self.subTest('Test group permissions changed'):
self.assertIn('account.view_emailaddress', user.permissions)
permission = Permission.objects.get(codename='view_emailaddress')
g = group.first()
g.permissions.remove(permission.pk)
g.refresh_from_db()
self.assertNotIn('account.view_emailaddress', user.permissions)

with self.subTest('Test group changed'):
user.groups.remove(group.first().pk)
user.groups.set(Group.objects.filter(name='Operator'))
with self.assertNumQueries(0):
self.assertEqual(user.get_all_permissions(), user.permissions)

with self.subTest('Test user permission changed'):
permission = Permission.objects.filter(codename='add_organization')
user.user_permissions.add(*permission)
with self.assertNumQueries(0):
self.assertEqual(user.get_all_permissions(), user.permissions)

def test_operator_has_permission(self):
app_label = 'account'
user = self.user_model(
username='user', email='email1@email.com', password='user1', is_staff=True
)
user.full_clean()
user.save()
group = Group.objects.filter(name='Administrator')
user.groups.set(group)
self.assertFalse(user.has_permission(f'{app_label}.view_wrong'))
self.assertTrue(user.has_permission(f'{app_label}.view_emailaddress'))

def test_superuser_has_permission(self):
user = self.user_model(
username='superuser',
email='email@email.com',
password='test',
is_staff=True,
is_superuser=True,
)
user.full_clean()
user.save()
self.assertTrue(user.has_permission('not_found.not_found'))