Skip to content

Commit

Permalink
Fix model permissions for UserCredential.
Browse files Browse the repository at this point in the history
ECOM-3978
  • Loading branch information
jsa committed Apr 1, 2016
1 parent 5fa592a commit c6be5d7
Show file tree
Hide file tree
Showing 6 changed files with 260 additions and 11 deletions.
55 changes: 55 additions & 0 deletions credentials/apps/api/permissions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
"""
Custom permissions classes for use with DRF.
"""
from django.http import Http404
from rest_framework import permissions


class UserCredentialViewSetPermissions(permissions.DjangoModelPermissions):
"""
Custom extension to DjangoModelPermissions for use with the
UserCredentialViewSet.
This permissions class uses an explicit 'view' permission, enabling support
for:
- explicitly granting certain users read access to any UserCredential
- implicitly granting any user read access to UserCredentials that were
awarded specifically to them
- denying read access (obscured by HTTP 404) in any other case
NOTE: users are allowed to read their own UserCredential records regardless
of their 'status' (i.e. even if revoked).
WARNING: this permissions implementation does not cover the 'list' method
of access. The access control required under DRF for that use case is
presently implemented in the `list` method of the viewset itself.
"""

# refer to the super() for more context on what this override is doing.
perms_map = permissions.DjangoModelPermissions.perms_map
perms_map.update({method: ['%(app_label)s.view_%(model_name)s'] for method in permissions.SAFE_METHODS})

def has_permission(self, request, view):
"""
Relax the base's view-level permissions in the case of 'safe'
(read-only) methods, requiring only that the user be authenticated.
This lets us delay deciding whether or not read permission should be
implicitly granted, until after DRF has fetched the requested object.
"""
return super(UserCredentialViewSetPermissions, self).has_permission(request, view) or (
request.user.is_authenticated() and request.method in permissions.SAFE_METHODS
)

def has_object_permission(self, request, view, obj):
"""
Allow access to specific objects when granted explicitly (via model
permissions) or, if a read-only request, implicitly (via matching
username).
"""
if super(UserCredentialViewSetPermissions, self).has_permission(request, view):
return True
elif request.user.username.lower() == obj.username.lower():
return True
else:
raise Http404
151 changes: 147 additions & 4 deletions credentials/apps/api/tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@
import json

import ddt
from django.contrib.auth.models import Permission
from django.contrib.auth.models import Group, Permission
from django.core.urlresolvers import reverse
from rest_framework.test import APITestCase, APIRequestFactory
from testfixtures import LogCapture

from credentials.apps.api.serializers import UserCredentialSerializer
from credentials.apps.api.tests import factories
from credentials.apps.core.constants import Role
from credentials.apps.credentials.models import UserCredential


Expand Down Expand Up @@ -40,6 +41,11 @@ def setUp(self):
self.username = "test_user"
self.request = APIRequestFactory().get('/')

def _add_permission(self, perm):
""" DRY helper to add usercredential model permissions to self.user """
# pylint: disable=no-member
self.user.user_permissions.add(Permission.objects.get(codename='{}_usercredential'.format(perm)))

def _attempt_update_user_credential(self, data):
""" Helper method that attempts to patch an existing credential object.
Expand All @@ -50,13 +56,13 @@ def _attempt_update_user_credential(self, data):
Response: HTTP response from the API.
"""
# pylint: disable=no-member
self.user.user_permissions.add(Permission.objects.get(codename="change_usercredential"))
self._add_permission('change')
path = reverse("api:v1:usercredential-detail", args=[self.user_credential.id])
return self.client.patch(path=path, data=json.dumps(data), content_type=JSON_CONTENT_TYPE)

def test_get(self):
""" Verify a single user credential is returned. """

self._add_permission('view')
path = reverse("api:v1:usercredential-detail", args=[self.user_credential.id])
response = self.client.get(path)
self.assertEqual(response.status_code, 200)
Expand Down Expand Up @@ -115,7 +121,7 @@ def _attempt_create_user_credentials(self, data):
Response: HTTP response from the API.
"""
# pylint: disable=no-member
self.user.user_permissions.add(Permission.objects.get(codename="add_usercredential"))
self._add_permission('add')
path = self.list_path
return self.client.post(path=path, data=json.dumps(data), content_type=JSON_CONTENT_TYPE)

Expand Down Expand Up @@ -268,6 +274,7 @@ def test_create_with_empty_attributes(self):

def test_list_with_username_filter(self):
""" Verify the list endpoint supports filter data by username."""
self._add_permission('view')
factories.UserCredentialFactory(username="dummy-user")
response = self.client.get(self.list_path, data={'username': self.user_credential.username})
self.assertEqual(response.status_code, 200)
Expand All @@ -281,6 +288,7 @@ def test_list_with_username_filter(self):

def test_list_with_status_filter(self):
""" Verify the list endpoint supports filtering by status."""
self._add_permission('view')
factories.UserCredentialFactory.create_batch(2, status="revoked", username=self.user_credential.username)
response = self.client.get(self.list_path, data={'status': self.user_credential.status})
self.assertEqual(response.status_code, 400)
Expand Down Expand Up @@ -441,6 +449,122 @@ def test_users_lists_access_by_authenticated_users(self):
self.assertEqual(response.status_code, 401)


@ddt.ddt
class UserCredentialViewSetPermissionsTests(APITestCase):
"""
Thoroughly exercise the custom view- and object-level permissions for this viewset.
"""

def make_user(self, group=None, perm=None, **kwargs):
""" DRY helper to create users with specific groups and/or permissions. """
# pylint: disable=no-member
user = factories.UserFactory(**kwargs)
if group:
user.groups.add(Group.objects.get(name=group))
if perm:
user.user_permissions.add(Permission.objects.get(codename='{}_usercredential'.format(perm)))
return user

@ddt.data(
({'group': Role.ADMINS}, 200),
({'perm': 'view'}, 200),
({'perm': 'add'}, 404),
({'perm': 'change'}, 404),
({'username': 'test-user'}, 200),
({'username': 'TeSt-uSeR'}, 200),
({'username': 'other'}, 404),
)
@ddt.unpack
def test_list(self, user_kwargs, expected_status):
"""
The list method (GET) requires either 'view' permission, or for the
'username' query parameter to match that of the requesting user.
"""
list_path = reverse("api:v1:usercredential-list")

self.client.force_authenticate(self.make_user(**user_kwargs)) # pylint: disable=no-member
response = self.client.get(list_path, {'username': 'test-user'})
self.assertEqual(response.status_code, expected_status)

@ddt.data(
({'group': Role.ADMINS}, 201),
({'perm': 'add'}, 201),
({'perm': 'view'}, 403),
({'perm': 'change'}, 403),
({}, 403),
({'username': 'test-user'}, 403),
)
@ddt.unpack
def test_create(self, user_kwargs, expected_status):
"""
The creation (POST) method requires the 'add' permission.
"""
list_path = reverse('api:v1:usercredential-list')
program_certificate = factories.ProgramCertificateFactory(program_id=2)
post_data = {
'username': 'test-user',
'credential': {
'program_id': program_certificate.program_id
},
'attributes': [],
}

self.client.force_authenticate(self.make_user(**user_kwargs)) # pylint: disable=no-member
response = self.client.post(list_path, data=json.dumps(post_data), content_type=JSON_CONTENT_TYPE)
self.assertEqual(response.status_code, expected_status)

@ddt.data(
({'group': Role.ADMINS}, 200),
({'perm': 'view'}, 200),
({'perm': 'add'}, 404),
({'perm': 'change'}, 404),
({'username': 'test-user'}, 200),
({'username': 'TeSt-uSeR'}, 200),
({'username': 'other-user'}, 404),
)
@ddt.unpack
def test_retrieve(self, user_kwargs, expected_status):
"""
The retrieve (GET) method requires the 'view' permission, or for the
requested object to be associated with the username of the requesting
user.
"""
program_cert = factories.ProgramCertificateFactory()
user_credential = factories.UserCredentialFactory.create(credential=program_cert, username='test-user')
detail_path = reverse("api:v1:usercredential-detail", args=[user_credential.id])

self.client.force_authenticate(self.make_user(**user_kwargs)) # pylint: disable=no-member
response = self.client.get(detail_path)
self.assertEqual(response.status_code, expected_status)

@ddt.data(
({'group': Role.ADMINS}, 200),
({'perm': 'view'}, 403),
({'perm': 'add'}, 403),
({'perm': 'change'}, 200),
({'username': 'test-user'}, 403),
({}, 403),
)
@ddt.unpack
def test_partial_update(self, user_kwargs, expected_status):
"""
The partial update (PATCH) method requires the 'change' permission.
"""
program_cert = factories.ProgramCertificateFactory()
user_credential = factories.UserCredentialFactory.create(credential=program_cert, username='test-user')
detail_path = reverse("api:v1:usercredential-detail", args=[user_credential.id])
post_data = {
'username': 'test-user',
'credential': {
'program_id': program_cert.program_id
},
'attributes': [{"name": "dummy-attr-name", "value": "dummy-attr-value"}],
}
self.client.force_authenticate(self.make_user(**user_kwargs)) # pylint: disable=no-member
response = self.client.patch(path=detail_path, data=json.dumps(post_data), content_type=JSON_CONTENT_TYPE)
self.assertEqual(response.status_code, expected_status)


class CredentialViewSetTests(APITestCase):
""" Base Class for ProgramCredentialViewSetTests and CourseCredentialViewSetTests. """

Expand All @@ -450,10 +574,21 @@ class CredentialViewSetTests(APITestCase):
def setUp(self):
super(CredentialViewSetTests, self).setUp()

# pylint: disable=no-member
self.user = factories.UserFactory()
self.user.groups.add(Group.objects.get(name=Role.ADMINS))
self.client.force_authenticate(self.user) # pylint: disable=no-member
self.request = APIRequestFactory().get('/')

def assert_permission_required(self, data):
"""
Ensure access to these APIs is restricted to those with explicit model
permissions.
"""
self.client.force_authenticate(user=factories.UserFactory()) # pylint: disable=no-member
response = self.client.get(self.list_path, data)
self.assertEqual(response.status_code, 403)

def assert_list_without_id_filter(self, path, expected):
"""Helper method used for making request and assertions. """
response = self.client.get(path)
Expand Down Expand Up @@ -508,6 +643,10 @@ def test_list_with_status_filter(self):
factories.UserCredentialFactory.create_batch(2, status="revoked", username=self.user_credential.username)
self.assert_list_with_status_filter(data={'program_id': self.program_id, 'status': UserCredential.AWARDED}, )

def test_permission_required(self):
""" Verify that requests require explicit model permissions. """
self.assert_permission_required({'program_id': self.program_id, 'status': UserCredential.AWARDED})


class CourseCredentialViewSetTests(CredentialViewSetTests):
""" Tests for CourseCredentialViewSetTests. """
Expand Down Expand Up @@ -555,3 +694,7 @@ def test_list_with_certificate_type(self):
json.loads(response.content),
{'count': 1, 'next': None, 'previous': None, 'results': [expected]}
)

def test_permission_required(self):
""" Verify that requests require explicit model permissions. """
self.assert_permission_required({'course_id': self.course_id, 'status': UserCredential.AWARDED})
19 changes: 12 additions & 7 deletions credentials/apps/api/v1/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
"""
import logging

from rest_framework import filters, mixins, viewsets
from django.http import Http404
from rest_framework import mixins, viewsets
from rest_framework.exceptions import ValidationError
from rest_framework.permissions import DjangoModelPermissions
from credentials.apps.api.filters import ProgramFilter, CourseFilter

from credentials.apps.api.permissions import UserCredentialViewSetPermissions
from credentials.apps.api.serializers import UserCredentialCreationSerializer, UserCredentialSerializer
from credentials.apps.credentials.models import UserCredential

Expand All @@ -19,16 +20,22 @@ class UserCredentialViewSet(viewsets.ModelViewSet):
""" UserCredentials endpoints. """

queryset = UserCredential.objects.all()
filter_backends = (filters.DjangoFilterBackend,)
filter_fields = ('username', 'status')
serializer_class = UserCredentialSerializer
permission_classes = (DjangoModelPermissions,)
permission_classes = (UserCredentialViewSetPermissions,)

def list(self, request, *args, **kwargs):
if not self.request.query_params.get('username'):
if not request.query_params.get('username'):
raise ValidationError(
{'error': 'A username query string parameter is required for filtering user credentials.'})

# provide an additional permission check related to the username
# query string parameter. See also `UserCredentialViewSetPermissions`
if not request.user.has_perm('credentials.view_usercredential') and (
request.user.username.lower() != self.request.query_params['username'].lower()
):
raise Http404

return super(UserCredentialViewSet, self).list(request, *args, **kwargs) # pylint: disable=maybe-no-member

def create(self, request, *args, **kwargs):
Expand All @@ -39,7 +46,6 @@ def create(self, request, *args, **kwargs):
class ProgramsCredentialsViewSet(mixins.ListModelMixin, viewsets.GenericViewSet):
"""It will return the all credentials for programs."""
queryset = UserCredential.objects.all()
filter_backends = (filters.DjangoFilterBackend,)
filter_class = ProgramFilter
serializer_class = UserCredentialSerializer

Expand All @@ -55,7 +61,6 @@ def list(self, request, *args, **kwargs):
class CourseCredentialsViewSet(mixins.ListModelMixin, viewsets.GenericViewSet):
"""It will return the all credentials for courses."""
queryset = UserCredential.objects.all()
filter_backends = (filters.DjangoFilterBackend,)
filter_class = CourseFilter
serializer_class = UserCredentialSerializer

Expand Down
44 changes: 44 additions & 0 deletions credentials/apps/core/migrations/0003_auto_20160331_0218.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals

from django.contrib.auth.models import Group, Permission
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ObjectDoesNotExist
from django.db import migrations

from credentials.apps.core.constants import Role


def create_view_permission(apps, schema_editor):
"""
"""
content_type = ContentType.objects.get(app_label="credentials", model="usercredential")
permission, created = Permission.objects.get_or_create(
content_type=content_type,
codename='view_usercredential',
name='Can view any user credential',
)
if created:
Group.objects.get(name=Role.ADMINS).permissions.add(permission)


def destroy_view_permission(apps, schema_editor):
"""
"""
try:
content_type = ContentType.objects.get(app_label='credentials', model='usercredential')
permission = Permission.objects.get(content_type=content_type, codename='view_usercredential')
permission.delete()
except ObjectDoesNotExist:
pass


class Migration(migrations.Migration):

dependencies = [
('core', '0002_auto_20160111_1251'),
]

operations = [
migrations.RunPython(code=create_view_permission, reverse_code=destroy_view_permission),
]
1 change: 1 addition & 0 deletions credentials/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@
),
'DEFAULT_FILTER_BACKENDS': ('rest_framework.filters.DjangoFilterBackend',),
'DEFAULT_PAGINATION_CLASS': 'rest_framework.pagination.LimitOffsetPagination',
'DEFAULT_PERMISSION_CLASSES': ('rest_framework.permissions.DjangoModelPermissions',),
'PAGE_SIZE': 20,
'DATETIME_FORMAT': '%Y-%m-%dT%H:%M:%SZ'
}
Loading

0 comments on commit c6be5d7

Please sign in to comment.