forked from jazzband/djangorestframework-simplejwt
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserializers.py
144 lines (99 loc) · 4.21 KB
/
serializers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
from django.contrib.auth import authenticate
from django.utils.translation import gettext_lazy as _
from rest_framework import exceptions, serializers
from .settings import api_settings
from .state import User
from .tokens import RefreshToken, SlidingToken, UntypedToken
class PasswordField(serializers.CharField):
def __init__(self, *args, **kwargs):
kwargs.setdefault('style', {})
kwargs['style']['input_type'] = 'password'
kwargs['write_only'] = True
super().__init__(*args, **kwargs)
class TokenObtainSerializer(serializers.Serializer):
username_field = User.USERNAME_FIELD
default_error_messages = {
'no_active_account': _('No active account found with the given credentials')
}
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.fields[self.username_field] = serializers.CharField()
self.fields['password'] = PasswordField()
def validate(self, attrs):
authenticate_kwargs = {
self.username_field: attrs[self.username_field],
'password': attrs['password'],
}
try:
authenticate_kwargs['request'] = self.context['request']
except KeyError:
pass
self.user = authenticate(**authenticate_kwargs)
# Prior to Django 1.10, inactive users could be authenticated with the
# default `ModelBackend`. As of Django 1.10, the `ModelBackend`
# prevents inactive users from authenticating. App designers can still
# allow inactive users to authenticate by opting for the new
# `AllowAllUsersModelBackend`. However, we explicitly prevent inactive
# users from authenticating to enforce a reasonable policy and provide
# sensible backwards compatibility with older Django versions.
if self.user is None or not self.user.is_active:
raise exceptions.AuthenticationFailed(
self.error_messages['no_active_account'],
'no_active_account',
)
return {}
@classmethod
def get_token(cls, user):
raise NotImplementedError('Must implement `get_token` method for `TokenObtainSerializer` subclasses')
class TokenObtainPairSerializer(TokenObtainSerializer):
@classmethod
def get_token(cls, user):
return RefreshToken.for_user(user)
def validate(self, attrs):
data = super().validate(attrs)
refresh = self.get_token(self.user)
data['refresh'] = str(refresh)
data['access'] = str(refresh.access_token)
return data
class TokenObtainSlidingSerializer(TokenObtainSerializer):
@classmethod
def get_token(cls, user):
return SlidingToken.for_user(user)
def validate(self, attrs):
data = super().validate(attrs)
token = self.get_token(self.user)
data['token'] = str(token)
return data
class TokenRefreshSerializer(serializers.Serializer):
refresh = serializers.CharField()
def validate(self, attrs):
refresh = RefreshToken(attrs['refresh'])
data = {'access': str(refresh.access_token)}
if api_settings.ROTATE_REFRESH_TOKENS:
if api_settings.BLACKLIST_AFTER_ROTATION:
try:
# Attempt to blacklist the given refresh token
refresh.blacklist()
except AttributeError:
# If blacklist app not installed, `blacklist` method will
# not be present
pass
refresh.set_jti()
refresh.set_exp()
data['refresh'] = str(refresh)
return data
class TokenRefreshSlidingSerializer(serializers.Serializer):
token = serializers.CharField()
def validate(self, attrs):
token = SlidingToken(attrs['token'])
# Check that the timestamp in the "refresh_exp" claim has not
# passed
token.check_exp(api_settings.SLIDING_TOKEN_REFRESH_EXP_CLAIM)
# Update the "exp" claim
token.set_exp()
return {'token': str(token)}
class TokenVerifySerializer(serializers.Serializer):
token = serializers.CharField()
def validate(self, attrs):
UntypedToken(attrs['token'])
return {}