From de9cd9c0d803f8fcb6d1c000df6ae4c49f1b8b92 Mon Sep 17 00:00:00 2001 From: jsa Date: Wed, 30 Mar 2016 10:40:08 -0400 Subject: [PATCH] Fix model permissions for UserCredential. ECOM-3978 --- credentials/apps/api/permissions.py | 55 +++++++ credentials/apps/api/tests/test_views.py | 151 +++++++++++++++++- credentials/apps/api/v1/views.py | 19 ++- .../migrations/0003_auto_20160331_0218.py | 48 ++++++ credentials/settings/base.py | 1 + requirements/test.txt | 1 + 6 files changed, 264 insertions(+), 11 deletions(-) create mode 100644 credentials/apps/api/permissions.py create mode 100644 credentials/apps/core/migrations/0003_auto_20160331_0218.py diff --git a/credentials/apps/api/permissions.py b/credentials/apps/api/permissions.py new file mode 100644 index 000000000..79a73c5e9 --- /dev/null +++ b/credentials/apps/api/permissions.py @@ -0,0 +1,55 @@ +""" +Custom permissions classes for use with DRF. +""" +from django.http import Http404 +from rest_framework import permissions + + +class UserCredentialViewSetPermissions(permissions.DjangoModelPermissions): + """ + Custom extension to DjangoModelPermissions for use with the + UserCredentialViewSet. + + This permissions class uses an explicit 'view' permission, enabling support + for: + - explicitly granting certain users read access to any UserCredential + - implicitly granting any user read access to UserCredentials that were + awarded specifically to them + - denying read access (obscured by HTTP 404) in any other case + + NOTE: users are allowed to read their own UserCredential records regardless + of their 'status' (i.e. even if revoked). + + WARNING: this permissions implementation does not cover the 'list' method + of access. The access control required under DRF for that use case is + presently implemented in the `list` method of the viewset itself. + """ + + # refer to the super() for more context on what this override is doing. + perms_map = permissions.DjangoModelPermissions.perms_map + perms_map.update({method: ['%(app_label)s.view_%(model_name)s'] for method in permissions.SAFE_METHODS}) + + def has_permission(self, request, view): + """ + Relax the base's view-level permissions in the case of 'safe' + (read-only) methods, requiring only that the user be authenticated. + + This lets us delay deciding whether or not read permission should be + implicitly granted, until after DRF has fetched the requested object. + """ + return super(UserCredentialViewSetPermissions, self).has_permission(request, view) or ( + request.user.is_authenticated() and request.method in permissions.SAFE_METHODS + ) + + def has_object_permission(self, request, view, obj): + """ + Allow access to specific objects when granted explicitly (via model + permissions) or, if a read-only request, implicitly (via matching + username). + """ + if super(UserCredentialViewSetPermissions, self).has_permission(request, view): + return True + elif request.user.username.lower() == obj.username.lower(): + return True + else: + raise Http404 diff --git a/credentials/apps/api/tests/test_views.py b/credentials/apps/api/tests/test_views.py index 3b96db5b9..44b6c0023 100644 --- a/credentials/apps/api/tests/test_views.py +++ b/credentials/apps/api/tests/test_views.py @@ -5,13 +5,14 @@ import json import ddt -from django.contrib.auth.models import Permission +from django.contrib.auth.models import Group, Permission from django.core.urlresolvers import reverse from rest_framework.test import APITestCase, APIRequestFactory from testfixtures import LogCapture from credentials.apps.api.serializers import UserCredentialSerializer from credentials.apps.api.tests import factories +from credentials.apps.core.constants import Role from credentials.apps.credentials.models import UserCredential @@ -40,6 +41,11 @@ def setUp(self): self.username = "test_user" self.request = APIRequestFactory().get('/') + def _add_permission(self, perm): + """ DRY helper to add usercredential model permissions to self.user """ + # pylint: disable=no-member + self.user.user_permissions.add(Permission.objects.get(codename='{}_usercredential'.format(perm))) + def _attempt_update_user_credential(self, data): """ Helper method that attempts to patch an existing credential object. @@ -50,13 +56,13 @@ def _attempt_update_user_credential(self, data): Response: HTTP response from the API. """ # pylint: disable=no-member - self.user.user_permissions.add(Permission.objects.get(codename="change_usercredential")) + self._add_permission('change') path = reverse("api:v1:usercredential-detail", args=[self.user_credential.id]) return self.client.patch(path=path, data=json.dumps(data), content_type=JSON_CONTENT_TYPE) def test_get(self): """ Verify a single user credential is returned. """ - + self._add_permission('view') path = reverse("api:v1:usercredential-detail", args=[self.user_credential.id]) response = self.client.get(path) self.assertEqual(response.status_code, 200) @@ -115,7 +121,7 @@ def _attempt_create_user_credentials(self, data): Response: HTTP response from the API. """ # pylint: disable=no-member - self.user.user_permissions.add(Permission.objects.get(codename="add_usercredential")) + self._add_permission('add') path = self.list_path return self.client.post(path=path, data=json.dumps(data), content_type=JSON_CONTENT_TYPE) @@ -268,6 +274,7 @@ def test_create_with_empty_attributes(self): def test_list_with_username_filter(self): """ Verify the list endpoint supports filter data by username.""" + self._add_permission('view') factories.UserCredentialFactory(username="dummy-user") response = self.client.get(self.list_path, data={'username': self.user_credential.username}) self.assertEqual(response.status_code, 200) @@ -281,6 +288,7 @@ def test_list_with_username_filter(self): def test_list_with_status_filter(self): """ Verify the list endpoint supports filtering by status.""" + self._add_permission('view') factories.UserCredentialFactory.create_batch(2, status="revoked", username=self.user_credential.username) response = self.client.get(self.list_path, data={'status': self.user_credential.status}) self.assertEqual(response.status_code, 400) @@ -441,6 +449,122 @@ def test_users_lists_access_by_authenticated_users(self): self.assertEqual(response.status_code, 401) +@ddt.ddt +class UserCredentialViewSetPermissionsTests(APITestCase): + """ + Thoroughly exercise the custom view- and object-level permissions for this viewset. + """ + + def make_user(self, group=None, perm=None, **kwargs): + """ DRY helper to create users with specific groups and/or permissions. """ + # pylint: disable=no-member + user = factories.UserFactory(**kwargs) + if group: + user.groups.add(Group.objects.get(name=group)) + if perm: + user.user_permissions.add(Permission.objects.get(codename='{}_usercredential'.format(perm))) + return user + + @ddt.data( + ({'group': Role.ADMINS}, 200), + ({'perm': 'view'}, 200), + ({'perm': 'add'}, 404), + ({'perm': 'change'}, 404), + ({'username': 'test-user'}, 200), + ({'username': 'TeSt-uSeR'}, 200), + ({'username': 'other'}, 404), + ) + @ddt.unpack + def test_list(self, user_kwargs, expected_status): + """ + The list method (GET) requires either 'view' permission, or for the + 'username' query parameter to match that of the requesting user. + """ + list_path = reverse("api:v1:usercredential-list") + + self.client.force_authenticate(self.make_user(**user_kwargs)) # pylint: disable=no-member + response = self.client.get(list_path, {'username': 'test-user'}) + self.assertEqual(response.status_code, expected_status) + + @ddt.data( + ({'group': Role.ADMINS}, 201), + ({'perm': 'add'}, 201), + ({'perm': 'view'}, 403), + ({'perm': 'change'}, 403), + ({}, 403), + ({'username': 'test-user'}, 403), + ) + @ddt.unpack + def test_create(self, user_kwargs, expected_status): + """ + The creation (POST) method requires the 'add' permission. + """ + list_path = reverse('api:v1:usercredential-list') + program_certificate = factories.ProgramCertificateFactory() + post_data = { + 'username': 'test-user', + 'credential': { + 'program_id': program_certificate.program_id + }, + 'attributes': [], + } + + self.client.force_authenticate(self.make_user(**user_kwargs)) # pylint: disable=no-member + response = self.client.post(list_path, data=json.dumps(post_data), content_type=JSON_CONTENT_TYPE) + self.assertEqual(response.status_code, expected_status) + + @ddt.data( + ({'group': Role.ADMINS}, 200), + ({'perm': 'view'}, 200), + ({'perm': 'add'}, 404), + ({'perm': 'change'}, 404), + ({'username': 'test-user'}, 200), + ({'username': 'TeSt-uSeR'}, 200), + ({'username': 'other-user'}, 404), + ) + @ddt.unpack + def test_retrieve(self, user_kwargs, expected_status): + """ + The retrieve (GET) method requires the 'view' permission, or for the + requested object to be associated with the username of the requesting + user. + """ + program_cert = factories.ProgramCertificateFactory() + user_credential = factories.UserCredentialFactory.create(credential=program_cert, username='test-user') + detail_path = reverse("api:v1:usercredential-detail", args=[user_credential.id]) + + self.client.force_authenticate(self.make_user(**user_kwargs)) # pylint: disable=no-member + response = self.client.get(detail_path) + self.assertEqual(response.status_code, expected_status) + + @ddt.data( + ({'group': Role.ADMINS}, 200), + ({'perm': 'view'}, 403), + ({'perm': 'add'}, 403), + ({'perm': 'change'}, 200), + ({'username': 'test-user'}, 403), + ({}, 403), + ) + @ddt.unpack + def test_partial_update(self, user_kwargs, expected_status): + """ + The partial update (PATCH) method requires the 'change' permission. + """ + program_cert = factories.ProgramCertificateFactory() + user_credential = factories.UserCredentialFactory.create(credential=program_cert, username='test-user') + detail_path = reverse("api:v1:usercredential-detail", args=[user_credential.id]) + post_data = { + 'username': 'test-user', + 'credential': { + 'program_id': program_cert.program_id + }, + 'attributes': [{'name': 'dummy-attr-name', 'value': 'dummy-attr-value'}], + } + self.client.force_authenticate(self.make_user(**user_kwargs)) # pylint: disable=no-member + response = self.client.patch(path=detail_path, data=json.dumps(post_data), content_type=JSON_CONTENT_TYPE) + self.assertEqual(response.status_code, expected_status) + + class CredentialViewSetTests(APITestCase): """ Base Class for ProgramCredentialViewSetTests and CourseCredentialViewSetTests. """ @@ -450,10 +574,21 @@ class CredentialViewSetTests(APITestCase): def setUp(self): super(CredentialViewSetTests, self).setUp() + # pylint: disable=no-member self.user = factories.UserFactory() + self.user.groups.add(Group.objects.get(name=Role.ADMINS)) self.client.force_authenticate(self.user) # pylint: disable=no-member self.request = APIRequestFactory().get('/') + def assert_permission_required(self, data): + """ + Ensure access to these APIs is restricted to those with explicit model + permissions. + """ + self.client.force_authenticate(user=factories.UserFactory()) # pylint: disable=no-member + response = self.client.get(self.list_path, data) + self.assertEqual(response.status_code, 403) + def assert_list_without_id_filter(self, path, expected): """Helper method used for making request and assertions. """ response = self.client.get(path) @@ -508,6 +643,10 @@ def test_list_with_status_filter(self): factories.UserCredentialFactory.create_batch(2, status="revoked", username=self.user_credential.username) self.assert_list_with_status_filter(data={'program_id': self.program_id, 'status': UserCredential.AWARDED}, ) + def test_permission_required(self): + """ Verify that requests require explicit model permissions. """ + self.assert_permission_required({'program_id': self.program_id, 'status': UserCredential.AWARDED}) + class CourseCredentialViewSetTests(CredentialViewSetTests): """ Tests for CourseCredentialViewSetTests. """ @@ -555,3 +694,7 @@ def test_list_with_certificate_type(self): json.loads(response.content), {'count': 1, 'next': None, 'previous': None, 'results': [expected]} ) + + def test_permission_required(self): + """ Verify that requests require explicit model permissions. """ + self.assert_permission_required({'course_id': self.course_id, 'status': UserCredential.AWARDED}) diff --git a/credentials/apps/api/v1/views.py b/credentials/apps/api/v1/views.py index 94d82a23a..9afec2117 100644 --- a/credentials/apps/api/v1/views.py +++ b/credentials/apps/api/v1/views.py @@ -3,11 +3,12 @@ """ import logging -from rest_framework import filters, mixins, viewsets +from django.http import Http404 +from rest_framework import mixins, viewsets from rest_framework.exceptions import ValidationError -from rest_framework.permissions import DjangoModelPermissions from credentials.apps.api.filters import ProgramFilter, CourseFilter +from credentials.apps.api.permissions import UserCredentialViewSetPermissions from credentials.apps.api.serializers import UserCredentialCreationSerializer, UserCredentialSerializer from credentials.apps.credentials.models import UserCredential @@ -19,16 +20,22 @@ class UserCredentialViewSet(viewsets.ModelViewSet): """ UserCredentials endpoints. """ queryset = UserCredential.objects.all() - filter_backends = (filters.DjangoFilterBackend,) filter_fields = ('username', 'status') serializer_class = UserCredentialSerializer - permission_classes = (DjangoModelPermissions,) + permission_classes = (UserCredentialViewSetPermissions,) def list(self, request, *args, **kwargs): - if not self.request.query_params.get('username'): + if not request.query_params.get('username'): raise ValidationError( {'error': 'A username query string parameter is required for filtering user credentials.'}) + # provide an additional permission check related to the username + # query string parameter. See also `UserCredentialViewSetPermissions` + if not request.user.has_perm('credentials.view_usercredential') and ( + request.user.username.lower() != request.query_params['username'].lower() + ): + raise Http404 + return super(UserCredentialViewSet, self).list(request, *args, **kwargs) # pylint: disable=maybe-no-member def create(self, request, *args, **kwargs): @@ -39,7 +46,6 @@ def create(self, request, *args, **kwargs): class ProgramsCredentialsViewSet(mixins.ListModelMixin, viewsets.GenericViewSet): """It will return the all credentials for programs.""" queryset = UserCredential.objects.all() - filter_backends = (filters.DjangoFilterBackend,) filter_class = ProgramFilter serializer_class = UserCredentialSerializer @@ -55,7 +61,6 @@ def list(self, request, *args, **kwargs): class CourseCredentialsViewSet(mixins.ListModelMixin, viewsets.GenericViewSet): """It will return the all credentials for courses.""" queryset = UserCredential.objects.all() - filter_backends = (filters.DjangoFilterBackend,) filter_class = CourseFilter serializer_class = UserCredentialSerializer diff --git a/credentials/apps/core/migrations/0003_auto_20160331_0218.py b/credentials/apps/core/migrations/0003_auto_20160331_0218.py new file mode 100644 index 000000000..77b254a18 --- /dev/null +++ b/credentials/apps/core/migrations/0003_auto_20160331_0218.py @@ -0,0 +1,48 @@ +# -*- coding: utf-8 -*- +from __future__ import unicode_literals + +from django.contrib.auth.models import Group, Permission +from django.contrib.contenttypes.models import ContentType +from django.core.exceptions import ObjectDoesNotExist +from django.db import migrations + +from credentials.apps.core.constants import Role + + +def create_view_permission(apps, schema_editor): + """ + Add an explicit view permission for UserCredential, and associate it with + the ADMIN role. + """ + content_type = ContentType.objects.get(app_label="credentials", model="usercredential") + permission, created = Permission.objects.get_or_create( + content_type=content_type, + codename='view_usercredential', + name='Can view any user credential', + ) + if created: + Group.objects.get(name=Role.ADMINS).permissions.add(permission) + + +def destroy_view_permission(apps, schema_editor): + """ + Remove the view permission, if it exists. Note that the permission will + automatically be removed from any roles to which it had been linked. + """ + try: + content_type = ContentType.objects.get(app_label='credentials', model='usercredential') + permission = Permission.objects.get(content_type=content_type, codename='view_usercredential') + permission.delete() + except ObjectDoesNotExist: + pass + + +class Migration(migrations.Migration): + + dependencies = [ + ('core', '0002_auto_20160111_1251'), + ] + + operations = [ + migrations.RunPython(code=create_view_permission, reverse_code=destroy_view_permission), + ] diff --git a/credentials/settings/base.py b/credentials/settings/base.py index 324275076..6a401dcde 100644 --- a/credentials/settings/base.py +++ b/credentials/settings/base.py @@ -292,6 +292,7 @@ ), 'DEFAULT_FILTER_BACKENDS': ('rest_framework.filters.DjangoFilterBackend',), 'DEFAULT_PAGINATION_CLASS': 'rest_framework.pagination.LimitOffsetPagination', + 'DEFAULT_PERMISSION_CLASSES': ('rest_framework.permissions.DjangoModelPermissions',), 'PAGE_SIZE': 20, 'DATETIME_FORMAT': '%Y-%m-%dT%H:%M:%SZ' } diff --git a/requirements/test.txt b/requirements/test.txt index f5a967564..889d5176c 100644 --- a/requirements/test.txt +++ b/requirements/test.txt @@ -3,6 +3,7 @@ bok-choy==0.4.10 coverage==4.0.3 +ddt==1.0.1 django-dynamic-fixture==1.8.5 django-nose==1.4.2 edx-lint==0.4.0