diff --git a/BACKERS.md b/BACKERS.md
index 05e80cb1..fdc24744 100644
--- a/BACKERS.md
+++ b/BACKERS.md
@@ -103,5 +103,11 @@ Jeff Heaton
Birk Jernström
+
+
+
+
+Yaal Coop
+ |
diff --git a/README.md b/README.md
index 3d402a65..f0cb6db4 100644
--- a/README.md
+++ b/README.md
@@ -52,6 +52,7 @@ Generic, spec-compliant implementation to build clients and providers:
- [RFC7662: OAuth 2.0 Token Introspection](https://docs.authlib.org/en/latest/specs/rfc7662.html)
- [RFC8414: OAuth 2.0 Authorization Server Metadata](https://docs.authlib.org/en/latest/specs/rfc8414.html)
- [RFC8628: OAuth 2.0 Device Authorization Grant](https://docs.authlib.org/en/latest/specs/rfc8628.html)
+ - [RFC9068: JSON Web Token (JWT) Profile for OAuth 2.0 Access Tokens](https://docs.authlib.org/en/latest/specs/rfc9068.html)
- [Javascript Object Signing and Encryption](https://docs.authlib.org/en/latest/jose/index.html)
- [RFC7515: JSON Web Signature](https://docs.authlib.org/en/latest/jose/jws.html)
- [RFC7516: JSON Web Encryption](https://docs.authlib.org/en/latest/jose/jwe.html)
diff --git a/authlib/integrations/django_oauth2/resource_protector.py b/authlib/integrations/django_oauth2/resource_protector.py
index 5e797e6f..b89257ba 100644
--- a/authlib/integrations/django_oauth2/resource_protector.py
+++ b/authlib/integrations/django_oauth2/resource_protector.py
@@ -15,7 +15,7 @@
class ResourceProtector(_ResourceProtector):
- def acquire_token(self, request, scopes=None):
+ def acquire_token(self, request, scopes=None, **kwargs):
"""A method to acquire current valid token with the given scope.
:param request: Django HTTP request instance
@@ -23,18 +23,24 @@ def acquire_token(self, request, scopes=None):
:return: token object
"""
req = DjangoJsonRequest(request)
- if isinstance(scopes, str):
- scopes = [scopes]
- token = self.validate_request(scopes, req)
+ # backward compatibility
+ kwargs['scopes'] = scopes
+ for claim in kwargs:
+ if isinstance(kwargs[claim], str):
+ kwargs[claim] = [kwargs[claim]]
+ token = self.validate_request(request=req, **kwargs)
token_authenticated.send(sender=self.__class__, token=token)
return token
- def __call__(self, scopes=None, optional=False):
+ def __call__(self, scopes=None, optional=False, **kwargs):
+ claims = kwargs
+ # backward compatibility
+ claims['scopes'] = scopes
def wrapper(f):
@functools.wraps(f)
def decorated(request, *args, **kwargs):
try:
- token = self.acquire_token(request, scopes)
+ token = self.acquire_token(request, **claims)
request.oauth_token = token
except MissingAuthorizationError as error:
if optional:
diff --git a/authlib/integrations/flask_oauth2/resource_protector.py b/authlib/integrations/flask_oauth2/resource_protector.py
index 152555bb..be2b3fa2 100644
--- a/authlib/integrations/flask_oauth2/resource_protector.py
+++ b/authlib/integrations/flask_oauth2/resource_protector.py
@@ -54,17 +54,19 @@ def raise_error_response(self, error):
headers = error.get_headers()
raise_http_exception(status, body, headers)
- def acquire_token(self, scopes=None):
+ def acquire_token(self, scopes=None, **kwargs):
"""A method to acquire current valid token with the given scope.
:param scopes: a list of scope values
:return: token object
"""
request = FlaskJsonRequest(_req)
- # backward compatible
- if isinstance(scopes, str):
- scopes = [scopes]
- token = self.validate_request(scopes, request)
+ # backward compatibility
+ kwargs['scopes'] = scopes
+ for claim in kwargs:
+ if isinstance(kwargs[claim], str):
+ kwargs[claim] = [kwargs[claim]]
+ token = self.validate_request(request=request, **kwargs)
token_authenticated.send(self, token=token)
g.authlib_server_oauth2_token = token
return token
@@ -85,12 +87,15 @@ def user_api():
except OAuth2Error as error:
self.raise_error_response(error)
- def __call__(self, scopes=None, optional=False):
+ def __call__(self, scopes=None, optional=False, **kwargs):
+ claims = kwargs
+ # backward compatibility
+ claims['scopes'] = scopes
def wrapper(f):
@functools.wraps(f)
def decorated(*args, **kwargs):
try:
- self.acquire_token(scopes)
+ self.acquire_token(**claims)
except MissingAuthorizationError as error:
if optional:
return f(*args, **kwargs)
diff --git a/authlib/jose/errors.py b/authlib/jose/errors.py
index abdaeeb9..fb02eb4e 100644
--- a/authlib/jose/errors.py
+++ b/authlib/jose/errors.py
@@ -82,6 +82,7 @@ class InvalidClaimError(JoseError):
error = 'invalid_claim'
def __init__(self, claim):
+ self.claim_name = claim
description = f'Invalid claim "{claim}"'
super().__init__(description=description)
diff --git a/authlib/jose/rfc7519/jwt.py b/authlib/jose/rfc7519/jwt.py
index 3737d303..3e85f120 100644
--- a/authlib/jose/rfc7519/jwt.py
+++ b/authlib/jose/rfc7519/jwt.py
@@ -50,7 +50,7 @@ def encode(self, header, payload, key, check=True):
:param check: check if sensitive data in payload
:return: bytes
"""
- header['typ'] = 'JWT'
+ header.setdefault('typ', 'JWT')
for k in ['exp', 'iat', 'nbf']:
# convert datetime into timestamp
diff --git a/authlib/oauth2/rfc6749/authorization_server.py b/authlib/oauth2/rfc6749/authorization_server.py
index 8b886a04..3190540e 100644
--- a/authlib/oauth2/rfc6749/authorization_server.py
+++ b/authlib/oauth2/rfc6749/authorization_server.py
@@ -179,16 +179,21 @@ def authenticate_user(self, credential):
if hasattr(grant_cls, 'check_token_endpoint'):
self._token_grants.append((grant_cls, extensions))
- def register_endpoint(self, endpoint_cls):
+ def register_endpoint(self, endpoint):
"""Add extra endpoint to authorization server. e.g.
RevocationEndpoint::
authorization_server.register_endpoint(RevocationEndpoint)
- :param endpoint_cls: A endpoint class
+ :param endpoint_cls: A endpoint class or instance.
"""
- endpoints = self._endpoints.setdefault(endpoint_cls.ENDPOINT_NAME, [])
- endpoints.append(endpoint_cls(self))
+ if isinstance(endpoint, type):
+ endpoint = endpoint(self)
+ else:
+ endpoint.server = self
+
+ endpoints = self._endpoints.setdefault(endpoint.ENDPOINT_NAME, [])
+ endpoints.append(endpoint)
def get_authorization_grant(self, request):
"""Find the authorization grant for current request.
diff --git a/authlib/oauth2/rfc6749/resource_protector.py b/authlib/oauth2/rfc6749/resource_protector.py
index 1964bc3d..60a85d80 100644
--- a/authlib/oauth2/rfc6749/resource_protector.py
+++ b/authlib/oauth2/rfc6749/resource_protector.py
@@ -131,10 +131,10 @@ def parse_request_authorization(self, request):
validator = self.get_token_validator(token_type)
return validator, token_string
- def validate_request(self, scopes, request):
+ def validate_request(self, scopes, request, **kwargs):
"""Validate the request and return a token."""
validator, token_string = self.parse_request_authorization(request)
validator.validate_request(request)
token = validator.authenticate_token(token_string)
- validator.validate_token(token, scopes, request)
+ validator.validate_token(token, scopes, request, **kwargs)
return token
diff --git a/authlib/oauth2/rfc7009/revocation.py b/authlib/oauth2/rfc7009/revocation.py
index b130827d..f0984789 100644
--- a/authlib/oauth2/rfc7009/revocation.py
+++ b/authlib/oauth2/rfc7009/revocation.py
@@ -27,6 +27,12 @@ def authenticate_token(self, request, client):
OPTIONAL. A hint about the type of the token submitted for
revocation.
"""
+ self.check_params(request, client)
+ token = self.query_token(request.form['token'], request.form.get('token_type_hint'))
+ if token and token.check_client(client):
+ return token
+
+ def check_params(self, request, client):
if 'token' not in request.form:
raise InvalidRequestError()
@@ -34,10 +40,6 @@ def authenticate_token(self, request, client):
if hint and hint not in self.SUPPORTED_TOKEN_TYPES:
raise UnsupportedTokenTypeError()
- token = self.query_token(request.form['token'], hint)
- if token and token.check_client(client):
- return token
-
def create_endpoint_response(self, request):
"""Validate revocation request and create the response for revocation.
For example, a client may request the revocation of a refresh token
diff --git a/authlib/oauth2/rfc7662/introspection.py b/authlib/oauth2/rfc7662/introspection.py
index cca15b83..515d6ca6 100644
--- a/authlib/oauth2/rfc7662/introspection.py
+++ b/authlib/oauth2/rfc7662/introspection.py
@@ -34,6 +34,13 @@ def authenticate_token(self, request, client):
**OPTIONAL** A hint about the type of the token submitted for
introspection.
"""
+
+ self.check_params(request, client)
+ token = self.query_token(request.form['token'], request.form.get('token_type_hint'))
+ if token and self.check_permission(token, client, request):
+ return token
+
+ def check_params(self, request, client):
params = request.form
if 'token' not in params:
raise InvalidRequestError()
@@ -42,10 +49,6 @@ def authenticate_token(self, request, client):
if hint and hint not in self.SUPPORTED_TOKEN_TYPES:
raise UnsupportedTokenTypeError()
- token = self.query_token(params['token'], hint)
- if token and self.check_permission(token, client, request):
- return token
-
def create_endpoint_response(self, request):
"""Validate introspection request and create the response.
diff --git a/authlib/oauth2/rfc9068/__init__.py b/authlib/oauth2/rfc9068/__init__.py
new file mode 100644
index 00000000..b914509a
--- /dev/null
+++ b/authlib/oauth2/rfc9068/__init__.py
@@ -0,0 +1,11 @@
+from .introspection import JWTIntrospectionEndpoint
+from .revocation import JWTRevocationEndpoint
+from .token import JWTBearerTokenGenerator
+from .token_validator import JWTBearerTokenValidator
+
+__all__ = [
+ 'JWTBearerTokenGenerator',
+ 'JWTBearerTokenValidator',
+ 'JWTIntrospectionEndpoint',
+ 'JWTRevocationEndpoint',
+]
diff --git a/authlib/oauth2/rfc9068/claims.py b/authlib/oauth2/rfc9068/claims.py
new file mode 100644
index 00000000..4dcfea8e
--- /dev/null
+++ b/authlib/oauth2/rfc9068/claims.py
@@ -0,0 +1,62 @@
+from authlib.jose.errors import InvalidClaimError
+from authlib.jose.rfc7519 import JWTClaims
+
+
+class JWTAccessTokenClaims(JWTClaims):
+ REGISTERED_CLAIMS = JWTClaims.REGISTERED_CLAIMS + [
+ 'client_id',
+ 'auth_time',
+ 'acr',
+ 'amr',
+ 'scope',
+ 'groups',
+ 'roles',
+ 'entitlements',
+ ]
+
+ def validate(self, **kwargs):
+ self.validate_typ()
+
+ super().validate(**kwargs)
+ self.validate_client_id()
+ self.validate_auth_time()
+ self.validate_acr()
+ self.validate_amr()
+ self.validate_scope()
+ self.validate_groups()
+ self.validate_roles()
+ self.validate_entitlements()
+
+ def validate_typ(self):
+ # The resource server MUST verify that the 'typ' header value is 'at+jwt'
+ # or 'application/at+jwt' and reject tokens carrying any other value.
+ if self.header['typ'].lower() not in ('at+jwt', 'application/at+jwt'):
+ raise InvalidClaimError('typ')
+
+ def validate_client_id(self):
+ return self._validate_claim_value('client_id')
+
+ def validate_auth_time(self):
+ auth_time = self.get('auth_time')
+ if auth_time and not isinstance(auth_time, (int, float)):
+ raise InvalidClaimError('auth_time')
+
+ def validate_acr(self):
+ return self._validate_claim_value('acr')
+
+ def validate_amr(self):
+ amr = self.get('amr')
+ if amr and not isinstance(self['amr'], list):
+ raise InvalidClaimError('amr')
+
+ def validate_scope(self):
+ return self._validate_claim_value('scope')
+
+ def validate_groups(self):
+ return self._validate_claim_value('groups')
+
+ def validate_roles(self):
+ return self._validate_claim_value('roles')
+
+ def validate_entitlements(self):
+ return self._validate_claim_value('entitlements')
diff --git a/authlib/oauth2/rfc9068/introspection.py b/authlib/oauth2/rfc9068/introspection.py
new file mode 100644
index 00000000..17b5eb5a
--- /dev/null
+++ b/authlib/oauth2/rfc9068/introspection.py
@@ -0,0 +1,126 @@
+from ..rfc7662 import IntrospectionEndpoint
+from authlib.common.errors import ContinueIteration
+from authlib.consts import default_json_headers
+from authlib.jose.errors import ExpiredTokenError
+from authlib.jose.errors import InvalidClaimError
+from authlib.oauth2.rfc6750.errors import InvalidTokenError
+from authlib.oauth2.rfc9068.token_validator import JWTBearerTokenValidator
+
+
+class JWTIntrospectionEndpoint(IntrospectionEndpoint):
+ '''
+ JWTIntrospectionEndpoint inherits from :ref:`specs/rfc7662`
+ :class:`~authlib.oauth2.rfc7662.IntrospectionEndpoint` and implements the machinery
+ to automatically process the JWT access tokens.
+
+ :param issuer: The issuer identifier for which tokens will be introspected.
+
+ :param \\*\\*kwargs: Other parameters are inherited from
+ :class:`~authlib.oauth2.rfc7662.introspection.IntrospectionEndpoint`.
+
+ ::
+
+ class MyJWTAccessTokenIntrospectionEndpoint(JWTRevocationEndpoint):
+ def get_jwks(self):
+ ...
+
+ def get_username(self, user_id):
+ ...
+
+ authorization_server.register_endpoint(
+ MyJWTAccessTokenIntrospectionEndpoint(
+ issuer="https://authorization-server.example.org",
+ )
+ )
+ authorization_server.register_endpoint(MyRefreshTokenIntrospectionEndpoint)
+
+ '''
+
+ #: Endpoint name to be registered
+ ENDPOINT_NAME = 'introspection'
+
+ def __init__(self, issuer, server=None, *args, **kwargs):
+ super().__init__(*args, server=server, **kwargs)
+ self.issuer = issuer
+
+ def create_endpoint_response(self, request):
+ ''''''
+ # The authorization server first validates the client credentials
+ client = self.authenticate_endpoint_client(request)
+
+ # then verifies whether the token was issued to the client making
+ # the revocation request
+ token = self.authenticate_token(request, client)
+
+ # the authorization server invalidates the token
+ body = self.create_introspection_payload(token)
+ return 200, body, default_json_headers
+
+ def authenticate_token(self, request, client):
+ ''''''
+ self.check_params(request, client)
+
+ # do not attempt to decode refresh_tokens
+ if request.form.get('token_type_hint') not in ('access_token', None):
+ raise ContinueIteration()
+
+ validator = JWTBearerTokenValidator(issuer=self.issuer, resource_server=None)
+ validator.get_jwks = self.get_jwks
+ try:
+ token = validator.authenticate_token(request.form['token'])
+
+ # if the token is not a JWT, fall back to the regular flow
+ except InvalidTokenError:
+ raise ContinueIteration()
+
+ if token and self.check_permission(token, client, request):
+ return token
+
+ def create_introspection_payload(self, token):
+ if not token:
+ return {'active': False}
+
+ try:
+ token.validate()
+ except ExpiredTokenError:
+ return {'active': False}
+ except InvalidClaimError as exc:
+ if exc.claim_name == 'iss':
+ raise ContinueIteration()
+ raise InvalidTokenError()
+
+
+ payload = {
+ 'active': True,
+ 'token_type': 'Bearer',
+ 'client_id': token['client_id'],
+ 'scope': token['scope'],
+ 'sub': token['sub'],
+ 'aud': token['aud'],
+ 'iss': token['iss'],
+ 'exp': token['exp'],
+ 'iat': token['iat'],
+ }
+
+ if username := self.get_username(token['sub']):
+ payload['username'] = username
+
+ return payload
+
+ def get_jwks(self):
+ '''Return the JWKs that will be used to check the JWT access token signature.
+ Developers MUST re-implement this method::
+
+ def get_jwks(self):
+ return load_jwks("jwks.json")
+ '''
+ raise NotImplementedError()
+
+ def get_username(self, user_id: str) -> str:
+ '''Returns an username from a user ID.
+ Developers MAY re-implement this method::
+
+ def get_username(self, user_id):
+ return User.get(id=user_id).username
+ '''
+ return None
diff --git a/authlib/oauth2/rfc9068/revocation.py b/authlib/oauth2/rfc9068/revocation.py
new file mode 100644
index 00000000..9453c79a
--- /dev/null
+++ b/authlib/oauth2/rfc9068/revocation.py
@@ -0,0 +1,70 @@
+from ..rfc6749 import UnsupportedTokenTypeError
+from ..rfc7009 import RevocationEndpoint
+from authlib.common.errors import ContinueIteration
+from authlib.oauth2.rfc6750.errors import InvalidTokenError
+from authlib.oauth2.rfc9068.token_validator import JWTBearerTokenValidator
+
+
+class JWTRevocationEndpoint(RevocationEndpoint):
+ '''JWTRevocationEndpoint inherits from `RFC7009`_
+ :class:`~authlib.oauth2.rfc7009.RevocationEndpoint`.
+
+ The JWT access tokens cannot be revoked.
+ If the submitted token is a JWT access token, then revocation returns
+ a `invalid_token_error`.
+
+ :param issuer: The issuer identifier.
+
+ :param \\*\\*kwargs: Other parameters are inherited from
+ :class:`~authlib.oauth2.rfc7009.RevocationEndpoint`.
+
+ Plain text access tokens and other kind of tokens such as refresh_tokens
+ will be ignored by this endpoint and passed to the next revocation endpoint::
+
+ class MyJWTAccessTokenRevocationEndpoint(JWTRevocationEndpoint):
+ def get_jwks(self):
+ ...
+
+ authorization_server.register_endpoint(
+ MyJWTAccessTokenRevocationEndpoint(
+ issuer="https://authorization-server.example.org",
+ )
+ )
+ authorization_server.register_endpoint(MyRefreshTokenRevocationEndpoint)
+
+ .. _RFC7009: https://tools.ietf.org/html/rfc7009
+ '''
+
+ def __init__(self, issuer, server=None, *args, **kwargs):
+ super().__init__(*args, server=server, **kwargs)
+ self.issuer = issuer
+
+ def authenticate_token(self, request, client):
+ ''''''
+ self.check_params(request, client)
+
+ # do not attempt to revoke refresh_tokens
+ if request.form.get('token_type_hint') not in ('access_token', None):
+ raise ContinueIteration()
+
+ validator = JWTBearerTokenValidator(issuer=self.issuer, resource_server=None)
+ validator.get_jwks = self.get_jwks
+
+ try:
+ validator.authenticate_token(request.form['token'])
+
+ # if the token is not a JWT, fall back to the regular flow
+ except InvalidTokenError:
+ raise ContinueIteration()
+
+ # JWT access token cannot be revoked
+ raise UnsupportedTokenTypeError()
+
+ def get_jwks(self):
+ '''Return the JWKs that will be used to check the JWT access token signature.
+ Developers MUST re-implement this method::
+
+ def get_jwks(self):
+ return load_jwks("jwks.json")
+ '''
+ raise NotImplementedError()
diff --git a/authlib/oauth2/rfc9068/token.py b/authlib/oauth2/rfc9068/token.py
new file mode 100644
index 00000000..6751b88e
--- /dev/null
+++ b/authlib/oauth2/rfc9068/token.py
@@ -0,0 +1,218 @@
+import time
+from typing import List
+from typing import Optional
+from typing import Union
+
+from authlib.common.security import generate_token
+from authlib.jose import jwt
+from authlib.oauth2.rfc6750.token import BearerTokenGenerator
+
+
+class JWTBearerTokenGenerator(BearerTokenGenerator):
+ '''A JWT formatted access token generator.
+
+ :param issuer: The issuer identifier. Will appear in the JWT ``iss`` claim.
+
+ :param \\*\\*kwargs: Other parameters are inherited from
+ :class:`~authlib.oauth2.rfc6750.token.BearerTokenGenerator`.
+
+ This token generator can be registered into the authorization server::
+
+ class MyJWTBearerTokenGenerator(JWTBearerTokenGenerator):
+ def get_jwks(self):
+ ...
+
+ def get_extra_claims(self, client, grant_type, user, scope):
+ ...
+
+ authorization_server.register_token_generator(
+ 'default',
+ MyJWTBearerTokenGenerator(issuer='https://authorization-server.example.org'),
+ )
+ '''
+
+ def __init__(
+ self,
+ issuer,
+ alg='RS256',
+ refresh_token_generator=None,
+ expires_generator=None,
+ ):
+ super().__init__(
+ self.access_token_generator, refresh_token_generator, expires_generator
+ )
+ self.issuer = issuer
+ self.alg = alg
+
+ def get_jwks(self):
+ '''Return the JWKs that will be used to sign the JWT access token.
+ Developers MUST re-implement this method::
+
+ def get_jwks(self):
+ return load_jwks("jwks.json")
+ '''
+ raise NotImplementedError()
+
+ def get_extra_claims(self, client, grant_type, user, scope):
+ '''Return extra claims to add in the JWT access token. Developers MAY
+ re-implement this method to add identity claims like the ones in
+ :ref:`specs/oidc` ID Token, or any other arbitrary claims::
+
+ def get_extra_claims(self, client, grant_type, user, scope):
+ return generate_user_info(user, scope)
+ '''
+ return {}
+
+ def get_audiences(self, client, user, scope) -> Union[str, List[str]]:
+ '''Return the audience for the token. By default this simply returns
+ the client ID. Developpers MAY re-implement this method to add extra
+ audiences::
+
+ def get_audiences(self, client, user, scope):
+ return [
+ client.get_client_id(),
+ resource_server.get_id(),
+ ]
+ '''
+ return client.get_client_id()
+
+ def get_acr(self, user) -> Optional[str]:
+ '''Authentication Context Class Reference.
+ Returns a user-defined case sensitive string indicating the class of
+ authentication the used performed. Token audience may refuse to give access to
+ some resources if some ACR criterias are not met.
+ :ref:`specs/oidc` defines one special value: ``0`` means that the user
+ authentication did not respect `ISO29115`_ level 1, and will be refused monetary
+ operations. Developers MAY re-implement this method::
+
+ def get_acr(self, user):
+ if user.insecure_session():
+ return '0'
+ return 'urn:mace:incommon:iap:silver'
+
+ .. _ISO29115: https://www.iso.org/standard/45138.html
+ '''
+ return None
+
+ def get_auth_time(self, user) -> Optional[int]:
+ '''User authentication time.
+ Time when the End-User authentication occurred. Its value is a JSON number
+ representing the number of seconds from 1970-01-01T0:0:0Z as measured in UTC
+ until the date/time. Developers MAY re-implement this method::
+
+ def get_auth_time(self, user):
+ return datetime.timestamp(user.get_auth_time())
+ '''
+ return None
+
+ def get_amr(self, user) -> Optional[List[str]]:
+ '''Authentication Methods References.
+ Defined by :ref:`specs/oidc` as an option list of user-defined case-sensitive
+ strings indication which authentication methods have been used to authenticate
+ the user. Developers MAY re-implement this method::
+
+ def get_amr(self, user):
+ return ['2FA'] if user.has_2fa_enabled() else []
+ '''
+ return None
+
+ def get_jti(self, client, grant_type, user, scope) -> str:
+ '''JWT ID.
+ Create an unique identifier for the token. Developers MAY re-implement
+ this method::
+
+ def get_jti(self, client, grant_type, user scope):
+ return generate_random_string(16)
+ '''
+ return generate_token(16)
+
+ def access_token_generator(self, client, grant_type, user, scope):
+ now = int(time.time())
+ expires_in = now + self._get_expires_in(client, grant_type)
+
+ token_data = {
+ 'iss': self.issuer,
+ 'exp': expires_in,
+ 'client_id': client.get_client_id(),
+ 'iat': now,
+ 'jti': self.get_jti(client, grant_type, user, scope),
+ 'scope': scope,
+ }
+
+ # In cases of access tokens obtained through grants where a resource owner is
+ # involved, such as the authorization code grant, the value of 'sub' SHOULD
+ # correspond to the subject identifier of the resource owner.
+
+ if user:
+ token_data['sub'] = user.get_user_id()
+
+ # In cases of access tokens obtained through grants where no resource owner is
+ # involved, such as the client credentials grant, the value of 'sub' SHOULD
+ # correspond to an identifier the authorization server uses to indicate the
+ # client application.
+
+ else:
+ token_data['sub'] = client.get_client_id()
+
+ # If the request includes a 'resource' parameter (as defined in [RFC8707]), the
+ # resulting JWT access token 'aud' claim SHOULD have the same value as the
+ # 'resource' parameter in the request.
+
+ # TODO: Implement this with RFC8707
+ if False: # pragma: no cover
+ ...
+
+ # If the request does not include a 'resource' parameter, the authorization
+ # server MUST use a default resource indicator in the 'aud' claim. If a 'scope'
+ # parameter is present in the request, the authorization server SHOULD use it to
+ # infer the value of the default resource indicator to be used in the 'aud'
+ # claim. The mechanism through which scopes are associated with default resource
+ # indicator values is outside the scope of this specification.
+
+ else:
+ token_data['aud'] = self.get_audiences(client, user, scope)
+
+ # If the values in the 'scope' parameter refer to different default resource
+ # indicator values, the authorization server SHOULD reject the request with
+ # 'invalid_scope' as described in Section 4.1.2.1 of [RFC6749].
+ # TODO: Implement this with RFC8707
+
+ if auth_time := self.get_auth_time(user):
+ token_data['auth_time'] = auth_time
+
+ # The meaning and processing of acr Claim Values is out of scope for this
+ # specification.
+
+ if acr := self.get_acr(user):
+ token_data['acr'] = acr
+
+ # The definition of particular values to be used in the amr Claim is beyond the
+ # scope of this specification.
+
+ if amr := self.get_amr(user):
+ token_data['amr'] = amr
+
+ # Authorization servers MAY return arbitrary attributes not defined in any
+ # existing specification, as long as the corresponding claim names are collision
+ # resistant or the access tokens are meant to be used only within a private
+ # subsystem. Please refer to Sections 4.2 and 4.3 of [RFC7519] for details.
+
+ token_data.update(self.get_extra_claims(client, grant_type, user, scope))
+
+ # This specification registers the 'application/at+jwt' media type, which can
+ # be used to indicate that the content is a JWT access token. JWT access tokens
+ # MUST include this media type in the 'typ' header parameter to explicitly
+ # declare that the JWT represents an access token complying with this profile.
+ # Per the definition of 'typ' in Section 4.1.9 of [RFC7515], it is RECOMMENDED
+ # that the 'application/' prefix be omitted. Therefore, the 'typ' value used
+ # SHOULD be 'at+jwt'.
+
+ header = {'alg': self.alg, 'typ': 'at+jwt'}
+
+ access_token = jwt.encode(
+ header,
+ token_data,
+ key=self.get_jwks(),
+ check=False,
+ )
+ return access_token.decode()
diff --git a/authlib/oauth2/rfc9068/token_validator.py b/authlib/oauth2/rfc9068/token_validator.py
new file mode 100644
index 00000000..b11ff80b
--- /dev/null
+++ b/authlib/oauth2/rfc9068/token_validator.py
@@ -0,0 +1,163 @@
+'''
+ authlib.oauth2.rfc9068.token_validator
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ Implementation of Validating JWT Access Tokens per `Section 4`_.
+
+ .. _`Section 7`: https://www.rfc-editor.org/rfc/rfc9068.html#name-validating-jwt-access-token
+'''
+from authlib.jose import jwt
+from authlib.jose.errors import DecodeError
+from authlib.jose.errors import JoseError
+from authlib.oauth2.rfc6750.errors import InsufficientScopeError
+from authlib.oauth2.rfc6750.errors import InvalidTokenError
+from authlib.oauth2.rfc6750.validator import BearerTokenValidator
+from .claims import JWTAccessTokenClaims
+
+
+class JWTBearerTokenValidator(BearerTokenValidator):
+ '''JWTBearerTokenValidator can protect your resource server endpoints.
+
+ :param issuer: The issuer from which tokens will be accepted.
+ :param resource_server: An identifier for the current resource server,
+ which must appear in the JWT ``aud`` claim.
+
+ Developers needs to implement the missing methods::
+
+ class MyJWTBearerTokenValidator(JWTBearerTokenValidator):
+ def get_jwks(self):
+ ...
+
+ require_oauth = ResourceProtector()
+ require_oauth.register_token_validator(
+ MyJWTBearerTokenValidator(
+ issuer='https://authorization-server.example.org',
+ resource_server='https://resource-server.example.org',
+ )
+ )
+
+ You can then protect resources depending on the JWT `scope`, `groups`,
+ `roles` or `entitlements` claims::
+
+ @require_oauth(
+ scope='profile',
+ groups='admins',
+ roles='student',
+ entitlements='captain',
+ )
+ def resource_endpoint():
+ ...
+ '''
+
+ def __init__(self, issuer, resource_server, *args, **kwargs):
+ self.issuer = issuer
+ self.resource_server = resource_server
+ super().__init__(*args, **kwargs)
+
+ def get_jwks(self):
+ '''Return the JWKs that will be used to check the JWT access token signature.
+ Developers MUST re-implement this method. Typically the JWKs are statically
+ stored in the resource server configuration, or dynamically downloaded and
+ cached using :ref:`specs/rfc8414`::
+
+ def get_jwks(self):
+ if 'jwks' in cache:
+ return cache.get('jwks')
+
+ server_metadata = get_server_metadata(self.issuer)
+ jwks_uri = server_metadata.get('jwks_uri')
+ cache['jwks'] = requests.get(jwks_uri).json()
+ return cache['jwks']
+ '''
+ raise NotImplementedError()
+
+ def validate_iss(self, claims, iss: 'str') -> bool:
+ # The issuer identifier for the authorization server (which is typically
+ # obtained during discovery) MUST exactly match the value of the 'iss'
+ # claim.
+ return iss == self.issuer
+
+ def authenticate_token(self, token_string):
+ ''''''
+ # empty docstring avoids to display the irrelevant parent docstring
+
+ claims_options = {
+ 'iss': {'essential': True, 'validate': self.validate_iss},
+ 'exp': {'essential': True},
+ 'aud': {'essential': True, 'value': self.resource_server},
+ 'sub': {'essential': True},
+ 'client_id': {'essential': True},
+ 'iat': {'essential': True},
+ 'jti': {'essential': True},
+ 'auth_time': {'essential': False},
+ 'acr': {'essential': False},
+ 'amr': {'essential': False},
+ 'scope': {'essential': False},
+ 'groups': {'essential': False},
+ 'roles': {'essential': False},
+ 'entitlements': {'essential': False},
+ }
+ jwks = self.get_jwks()
+
+ # If the JWT access token is encrypted, decrypt it using the keys and algorithms
+ # that the resource server specified during registration. If encryption was
+ # negotiated with the authorization server at registration time and the incoming
+ # JWT access token is not encrypted, the resource server SHOULD reject it.
+
+ # The resource server MUST validate the signature of all incoming JWT access
+ # tokens according to [RFC7515] using the algorithm specified in the JWT 'alg'
+ # Header Parameter. The resource server MUST reject any JWT in which the value
+ # of 'alg' is 'none'. The resource server MUST use the keys provided by the
+ # authorization server.
+ try:
+ return jwt.decode(
+ token_string,
+ key=jwks,
+ claims_cls=JWTAccessTokenClaims,
+ claims_options=claims_options,
+ )
+ except DecodeError:
+ raise InvalidTokenError(
+ realm=self.realm, extra_attributes=self.extra_attributes
+ )
+
+ def validate_token(
+ self, token, scopes, request, groups=None, roles=None, entitlements=None
+ ):
+ ''''''
+ # empty docstring avoids to display the irrelevant parent docstring
+ try:
+ token.validate()
+ except JoseError as exc:
+ raise InvalidTokenError(
+ realm=self.realm, extra_attributes=self.extra_attributes
+ ) from exc
+
+ # If an authorization request includes a scope parameter, the corresponding
+ # issued JWT access token SHOULD include a 'scope' claim as defined in Section
+ # 4.2 of [RFC8693]. All the individual scope strings in the 'scope' claim MUST
+ # have meaning for the resources indicated in the 'aud' claim. See Section 5 for
+ # more considerations about the relationship between scope strings and resources
+ # indicated by the 'aud' claim.
+
+ if self.scope_insufficient(token['scope'], scopes):
+ raise InsufficientScopeError()
+
+ # Many authorization servers embed authorization attributes that go beyond the
+ # delegated scenarios described by [RFC7519] in the access tokens they issue.
+ # Typical examples include resource owner memberships in roles and groups that
+ # are relevant to the resource being accessed, entitlements assigned to the
+ # resource owner for the targeted resource that the authorization server knows
+ # about, and so on. An authorization server wanting to include such attributes
+ # in a JWT access token SHOULD use the 'groups', 'roles', and 'entitlements'
+ # attributes of the 'User' resource schema defined by Section 4.1.2 of
+ # [RFC7643]) as claim types.
+
+ if self.scope_insufficient(token.get('groups'), groups):
+ raise InvalidTokenError()
+
+ if self.scope_insufficient(token.get('roles'), roles):
+ raise InvalidTokenError()
+
+ if self.scope_insufficient(token.get('entitlements'), entitlements):
+ raise InvalidTokenError()
diff --git a/docs/community/authors.rst b/docs/community/authors.rst
index 34c91140..f97d3fcf 100644
--- a/docs/community/authors.rst
+++ b/docs/community/authors.rst
@@ -16,6 +16,7 @@ Here is the list of the main contributors:
- Mario Jimenez Carrasco
- Bastian Venthur
- Nuno Santos
+- Éloi Rivard
And more on https://github.com/lepture/authlib/graphs/contributors
@@ -42,6 +43,7 @@ Here is a full list of our backers:
* `Aveline `_
* `Callam `_
* `Krishna Kumar `_
+* `Yaal Coop `_
.. _`GitHub Sponsors`: https://github.com/sponsors/lepture
.. _Patreon: https://www.patreon.com/lepture
diff --git a/docs/specs/index.rst b/docs/specs/index.rst
index 52820df3..3fef7537 100644
--- a/docs/specs/index.rst
+++ b/docs/specs/index.rst
@@ -26,4 +26,5 @@ works.
rfc8037
rfc8414
rfc8628
+ rfc9068
oidc
diff --git a/docs/specs/rfc9068.rst b/docs/specs/rfc9068.rst
new file mode 100644
index 00000000..1bc68df0
--- /dev/null
+++ b/docs/specs/rfc9068.rst
@@ -0,0 +1,66 @@
+.. _specs/rfc9068:
+
+RFC9068: JSON Web Token (JWT) Profile for OAuth 2.0 Access Tokens
+=================================================================
+
+This section contains the generic implementation of RFC9068_.
+JSON Web Token (JWT) Profile for OAuth 2.0 Access Tokens allows
+developpers to generate JWT access tokens.
+
+Using JWT instead of plain text for access tokens result in different
+possibilities:
+
+- User information can be filled in the JWT claims, similar to the
+ :ref:`specs/oidc` ``id_token``, possibly making the economy of
+ requests to the ``userinfo_endpoint``.
+- Resource servers do not *need* to reach the authorization server
+ :ref:`specs/rfc7662` endpoint to verify each incoming tokens, as
+ the JWT signature is a proof of its validity. This brings the economy
+ of one network request at each resource access.
+- Consequently, the authorization server do not need to store access
+ tokens in a database. If a resource server does not implement this
+ spec and still need to reach the authorization server introspection
+ endpoint to check the token validation, then the authorization server
+ can simply validate the JWT without requesting its database.
+- If the authorization server do not store access tokens in a database,
+ it won't have the possibility to revoke the tokens. The produced access
+ tokens will be valid until the timestamp defined in its ``exp`` claim
+ is reached.
+
+This specification is just about **access** tokens. Other kinds of tokens
+like refresh tokens are not covered.
+
+RFC9068_ define a few optional JWT claims inspired from RFC7643_ that can
+can be used to determine if the token bearer is authorized to access a
+resource: ``groups``, ``roles`` and ``entitlements``.
+
+This module brings tools to:
+
+- generate JWT access tokens with :class:`~authlib.oauth2.rfc9068.JWTBearerTokenGenerator`
+- protected resources endpoints and validate JWT access tokens with :class:`~authlib.oauth2.rfc9068.JWTBearerTokenValidator`
+- introspect JWT access tokens with :class:`~authlib.oauth2.rfc9068.JWTIntrospectionEndpoint`
+- deny JWT access tokens revokation attempts with :class:`~authlib.oauth2.rfc9068.JWTRevocationEndpoint`
+
+.. _RFC9068: https://www.rfc-editor.org/rfc/rfc9068.html
+.. _RFC7643: https://tools.ietf.org/html/rfc7643
+
+API Reference
+-------------
+
+.. module:: authlib.oauth2.rfc9068
+
+.. autoclass:: JWTBearerTokenGenerator
+ :member-order: bysource
+ :members:
+
+.. autoclass:: JWTBearerTokenValidator
+ :member-order: bysource
+ :members:
+
+.. autoclass:: JWTIntrospectionEndpoint
+ :member-order: bysource
+ :members:
+
+.. autoclass:: JWTRevocationEndpoint
+ :member-order: bysource
+ :members:
diff --git a/tests/flask/test_oauth2/test_jwt_access_token.py b/tests/flask/test_oauth2/test_jwt_access_token.py
new file mode 100644
index 00000000..f4b8cf99
--- /dev/null
+++ b/tests/flask/test_oauth2/test_jwt_access_token.py
@@ -0,0 +1,834 @@
+import time
+
+import pytest
+from flask import json
+from flask import jsonify
+
+from .models import Client
+from .models import CodeGrantMixin
+from .models import db
+from .models import save_authorization_code
+from .models import Token
+from .models import User
+from .oauth2_server import create_authorization_server
+from .oauth2_server import TestCase
+from authlib.common.security import generate_token
+from authlib.common.urls import url_decode
+from authlib.common.urls import urlparse
+from authlib.integrations.flask_oauth2 import current_token
+from authlib.integrations.flask_oauth2 import ResourceProtector
+from authlib.jose import jwt
+from authlib.oauth2.rfc6749.grants import (
+ AuthorizationCodeGrant as _AuthorizationCodeGrant,
+)
+from authlib.oauth2.rfc7009 import RevocationEndpoint
+from authlib.oauth2.rfc7662 import IntrospectionEndpoint
+from authlib.oauth2.rfc9068 import JWTBearerTokenGenerator
+from authlib.oauth2.rfc9068 import JWTBearerTokenValidator
+from authlib.oauth2.rfc9068 import JWTIntrospectionEndpoint
+from authlib.oauth2.rfc9068 import JWTRevocationEndpoint
+from tests.util import read_file_path
+
+
+def create_token_validator(issuer, resource_server, jwks):
+ class MyJWTBearerTokenValidator(JWTBearerTokenValidator):
+ def get_jwks(self):
+ return jwks
+
+ validator = MyJWTBearerTokenValidator(
+ issuer=issuer, resource_server=resource_server
+ )
+ return validator
+
+
+def create_resource_protector(app, validator):
+ require_oauth = ResourceProtector()
+ require_oauth.register_token_validator(validator)
+
+ @app.route('/protected')
+ @require_oauth()
+ def protected():
+ user = db.session.get(User, current_token['sub'])
+ return jsonify(id=user.id, username=user.username, token=current_token)
+
+ @app.route('/protected-by-scope')
+ @require_oauth('profile')
+ def protected_by_scope():
+ user = db.session.get(User, current_token['sub'])
+ return jsonify(id=user.id, username=user.username, token=current_token)
+
+ @app.route('/protected-by-groups')
+ @require_oauth(groups=['admins'])
+ def protected_by_groups():
+ user = db.session.get(User, current_token['sub'])
+ return jsonify(id=user.id, username=user.username, token=current_token)
+
+ @app.route('/protected-by-roles')
+ @require_oauth(roles=['student'])
+ def protected_by_roles():
+ user = db.session.get(User, current_token['sub'])
+ return jsonify(id=user.id, username=user.username, token=current_token)
+
+ @app.route('/protected-by-entitlements')
+ @require_oauth(entitlements=['captain'])
+ def protected_by_entitlements():
+ user = db.session.get(User, current_token['sub'])
+ return jsonify(id=user.id, username=user.username, token=current_token)
+
+ return require_oauth
+
+
+def create_token_generator(authorization_server, issuer, jwks):
+ class MyJWTBearerTokenGenerator(JWTBearerTokenGenerator):
+ def get_jwks(self):
+ return jwks
+
+ token_generator = MyJWTBearerTokenGenerator(issuer=issuer)
+ authorization_server.register_token_generator('default', token_generator)
+ return token_generator
+
+
+def create_introspection_endpoint(app, authorization_server, issuer, jwks):
+ class MyJWTIntrospectionEndpoint(JWTIntrospectionEndpoint):
+ def get_jwks(self):
+ return jwks
+
+ def check_permission(self, token, client, request):
+ return client.client_id == 'client-id'
+
+ endpoint = MyJWTIntrospectionEndpoint(issuer=issuer)
+ authorization_server.register_endpoint(endpoint)
+
+ @app.route('/oauth/introspect', methods=['POST'])
+ def introspect_token():
+ return authorization_server.create_endpoint_response(
+ MyJWTIntrospectionEndpoint.ENDPOINT_NAME
+ )
+
+ return endpoint
+
+
+def create_revocation_endpoint(app, authorization_server, issuer, jwks):
+ class MyJWTRevocationEndpoint(JWTRevocationEndpoint):
+ def get_jwks(self):
+ return jwks
+
+ endpoint = MyJWTRevocationEndpoint(issuer=issuer)
+ authorization_server.register_endpoint(endpoint)
+
+ @app.route('/oauth/revoke', methods=['POST'])
+ def revoke_token():
+ return authorization_server.create_endpoint_response(
+ MyJWTRevocationEndpoint.ENDPOINT_NAME
+ )
+
+ return endpoint
+
+
+def create_user():
+ user = User(username='foo')
+ db.session.add(user)
+ db.session.commit()
+ return user
+
+
+def create_oauth_client(client_id, user):
+ oauth_client = Client(
+ user_id=user.id,
+ client_id=client_id,
+ client_secret=client_id,
+ )
+ oauth_client.set_client_metadata(
+ {
+ 'scope': 'profile',
+ 'redirect_uris': ['http://localhost/authorized'],
+ 'response_types': ['code'],
+ 'token_endpoint_auth_method': 'client_secret_post',
+ 'grant_types': ['authorization_code'],
+ }
+ )
+ db.session.add(oauth_client)
+ db.session.commit()
+ return oauth_client
+
+
+def create_access_token_claims(client, user, issuer, **kwargs):
+ now = int(time.time())
+ expires_in = now + 3600
+ auth_time = now - 60
+
+ return {
+ 'iss': kwargs.get('issuer', issuer),
+ 'exp': kwargs.get('exp', expires_in),
+ 'aud': kwargs.get('aud', client.client_id),
+ 'sub': kwargs.get('sub', user.get_user_id()),
+ 'client_id': kwargs.get('client_id', client.client_id),
+ 'iat': kwargs.get('iat', now),
+ 'jti': kwargs.get('jti', generate_token(16)),
+ 'auth_time': kwargs.get('auth_time', auth_time),
+ 'scope': kwargs.get('scope', client.scope),
+ 'groups': kwargs.get('groups', ['admins']),
+ 'roles': kwargs.get('groups', ['student']),
+ 'entitlements': kwargs.get('groups', ['captain']),
+ }
+
+
+def create_access_token(claims, jwks, alg='RS256', typ='at+jwt'):
+ header = {'alg': alg, 'typ': typ}
+ access_token = jwt.encode(
+ header,
+ claims,
+ key=jwks,
+ check=False,
+ )
+ return access_token.decode()
+
+
+def create_token(access_token):
+ token = Token(
+ user_id=1,
+ client_id='resource-server',
+ token_type='bearer',
+ access_token=access_token,
+ scope='profile',
+ expires_in=3600,
+ )
+ db.session.add(token)
+ db.session.commit()
+ return token
+
+
+class AuthorizationCodeGrant(CodeGrantMixin, _AuthorizationCodeGrant):
+ TOKEN_ENDPOINT_AUTH_METHODS = ['client_secret_basic', 'client_secret_post', 'none']
+
+ def save_authorization_code(self, code, request):
+ return save_authorization_code(code, request)
+
+
+class JWTAccessTokenGenerationTest(TestCase):
+ def setUp(self):
+ super().setUp()
+ self.issuer = 'https://authlib.org/'
+ self.jwks = read_file_path('jwks_private.json')
+ self.authorization_server = create_authorization_server(self.app)
+ self.authorization_server.register_grant(AuthorizationCodeGrant)
+ self.token_generator = create_token_generator(
+ self.authorization_server, self.issuer, self.jwks
+ )
+ self.user = create_user()
+ self.oauth_client = create_oauth_client('client-id', self.user)
+
+ def test_generate_jwt_access_token(self):
+ res = self.client.post(
+ '/oauth/authorize',
+ data={
+ 'response_type': self.oauth_client.response_types[0],
+ 'client_id': self.oauth_client.client_id,
+ 'redirect_uri': self.oauth_client.redirect_uris[0],
+ 'scope': self.oauth_client.scope,
+ 'user_id': self.user.id,
+ },
+ )
+
+ params = dict(url_decode(urlparse.urlparse(res.location).query))
+ code = params['code']
+ res = self.client.post(
+ '/oauth/token',
+ data={
+ 'grant_type': 'authorization_code',
+ 'code': code,
+ 'client_id': self.oauth_client.client_id,
+ 'client_secret': self.oauth_client.client_secret,
+ 'scope': ' '.join(self.oauth_client.scope),
+ 'redirect_uri': self.oauth_client.redirect_uris[0],
+ },
+ )
+
+ access_token = res.json['access_token']
+ claims = jwt.decode(access_token, self.jwks)
+
+ assert claims['iss'] == self.issuer
+ assert claims['sub'] == self.user.id
+ assert claims['scope'] == self.oauth_client.scope
+ assert claims['client_id'] == self.oauth_client.client_id
+
+ # This specification registers the 'application/at+jwt' media type, which can
+ # be used to indicate that the content is a JWT access token. JWT access tokens
+ # MUST include this media type in the 'typ' header parameter to explicitly
+ # declare that the JWT represents an access token complying with this profile.
+ # Per the definition of 'typ' in Section 4.1.9 of [RFC7515], it is RECOMMENDED
+ # that the 'application/' prefix be omitted. Therefore, the 'typ' value used
+ # SHOULD be 'at+jwt'.
+
+ assert claims.header['typ'] == 'at+jwt'
+
+ def test_generate_jwt_access_token_extra_claims(self):
+ '''
+ Authorization servers MAY return arbitrary attributes not defined in any
+ existing specification, as long as the corresponding claim names are collision
+ resistant or the access tokens are meant to be used only within a private
+ subsystem. Please refer to Sections 4.2 and 4.3 of [RFC7519] for details.
+ '''
+
+ def get_extra_claims(client, grant_type, user, scope):
+ return {'username': user.username}
+
+ self.token_generator.get_extra_claims = get_extra_claims
+
+ res = self.client.post(
+ '/oauth/authorize',
+ data={
+ 'response_type': self.oauth_client.response_types[0],
+ 'client_id': self.oauth_client.client_id,
+ 'redirect_uri': self.oauth_client.redirect_uris[0],
+ 'scope': self.oauth_client.scope,
+ 'user_id': self.user.id,
+ },
+ )
+
+ params = dict(url_decode(urlparse.urlparse(res.location).query))
+ code = params['code']
+ res = self.client.post(
+ '/oauth/token',
+ data={
+ 'grant_type': 'authorization_code',
+ 'code': code,
+ 'client_id': self.oauth_client.client_id,
+ 'client_secret': self.oauth_client.client_secret,
+ 'scope': ' '.join(self.oauth_client.scope),
+ 'redirect_uri': self.oauth_client.redirect_uris[0],
+ },
+ )
+
+ access_token = res.json['access_token']
+ claims = jwt.decode(access_token, self.jwks)
+ assert claims['username'] == self.user.username
+
+ @pytest.mark.skip
+ def test_generate_jwt_access_token_no_user(self):
+ res = self.client.post(
+ '/oauth/authorize',
+ data={
+ 'response_type': self.oauth_client.response_types[0],
+ 'client_id': self.oauth_client.client_id,
+ 'redirect_uri': self.oauth_client.redirect_uris[0],
+ 'scope': self.oauth_client.scope,
+ #'user_id': self.user.id,
+ },
+ )
+
+ params = dict(url_decode(urlparse.urlparse(res.location).query))
+ code = params['code']
+ res = self.client.post(
+ '/oauth/token',
+ data={
+ 'grant_type': 'authorization_code',
+ 'code': code,
+ 'client_id': self.oauth_client.client_id,
+ 'client_secret': self.oauth_client.client_secret,
+ 'scope': ' '.join(self.oauth_client.scope),
+ 'redirect_uri': self.oauth_client.redirect_uris[0],
+ },
+ )
+
+ access_token = res.json['access_token']
+ claims = jwt.decode(access_token, self.jwks)
+
+ assert claims['sub'] == self.oauth_client.client_id
+
+ def test_optional_fields(self):
+ self.token_generator.get_auth_time = lambda *args: 1234
+ self.token_generator.get_amr = lambda *args: 'amr'
+ self.token_generator.get_acr = lambda *args: 'acr'
+
+ res = self.client.post(
+ '/oauth/authorize',
+ data={
+ 'response_type': self.oauth_client.response_types[0],
+ 'client_id': self.oauth_client.client_id,
+ 'redirect_uri': self.oauth_client.redirect_uris[0],
+ 'scope': self.oauth_client.scope,
+ 'user_id': self.user.id,
+ },
+ )
+
+ params = dict(url_decode(urlparse.urlparse(res.location).query))
+ code = params['code']
+ res = self.client.post(
+ '/oauth/token',
+ data={
+ 'grant_type': 'authorization_code',
+ 'code': code,
+ 'client_id': self.oauth_client.client_id,
+ 'client_secret': self.oauth_client.client_secret,
+ 'scope': ' '.join(self.oauth_client.scope),
+ 'redirect_uri': self.oauth_client.redirect_uris[0],
+ },
+ )
+
+ access_token = res.json['access_token']
+ claims = jwt.decode(access_token, self.jwks)
+
+ assert claims['auth_time'] == 1234
+ assert claims['amr'] == 'amr'
+ assert claims['acr'] == 'acr'
+
+
+class JWTAccessTokenResourceServerTest(TestCase):
+ def setUp(self):
+ super().setUp()
+ self.issuer = 'https://authorization-server.example.org/'
+ self.resource_server = 'resource-server-id'
+ self.jwks = read_file_path('jwks_private.json')
+ self.token_validator = create_token_validator(
+ self.issuer, self.resource_server, self.jwks
+ )
+ self.resource_protector = create_resource_protector(
+ self.app, self.token_validator
+ )
+ self.user = create_user()
+ self.oauth_client = create_oauth_client(self.resource_server, self.user)
+ self.claims = create_access_token_claims(
+ self.oauth_client, self.user, self.issuer
+ )
+ self.access_token = create_access_token(self.claims, self.jwks)
+ self.token = create_token(self.access_token)
+
+ def test_access_resource(self):
+ headers = {'Authorization': f'Bearer {self.access_token}'}
+
+ rv = self.client.get('/protected', headers=headers)
+ resp = json.loads(rv.data)
+ self.assertEqual(resp['username'], 'foo')
+
+ def test_missing_authorization(self):
+ rv = self.client.get('/protected')
+ self.assertEqual(rv.status_code, 401)
+ resp = json.loads(rv.data)
+ self.assertEqual(resp['error'], 'missing_authorization')
+
+ def test_unsupported_token_type(self):
+ headers = {'Authorization': 'invalid token'}
+ rv = self.client.get('/protected', headers=headers)
+ self.assertEqual(rv.status_code, 401)
+ resp = json.loads(rv.data)
+ self.assertEqual(resp['error'], 'unsupported_token_type')
+
+ def test_invalid_token(self):
+ headers = {'Authorization': 'Bearer invalid'}
+ rv = self.client.get('/protected', headers=headers)
+ self.assertEqual(rv.status_code, 401)
+ resp = json.loads(rv.data)
+ self.assertEqual(resp['error'], 'invalid_token')
+
+ def test_typ(self):
+ '''
+ The resource server MUST verify that the 'typ' header value is 'at+jwt' or
+ 'application/at+jwt' and reject tokens carrying any other value.
+ '''
+ access_token = create_access_token(self.claims, self.jwks, typ='at+jwt')
+
+ headers = {'Authorization': f'Bearer {access_token}'}
+ rv = self.client.get('/protected', headers=headers)
+ resp = json.loads(rv.data)
+ self.assertEqual(resp['username'], 'foo')
+
+ access_token = create_access_token(
+ self.claims, self.jwks, typ='application/at+jwt'
+ )
+
+ headers = {'Authorization': f'Bearer {access_token}'}
+ rv = self.client.get('/protected', headers=headers)
+ resp = json.loads(rv.data)
+ self.assertEqual(resp['username'], 'foo')
+
+ access_token = create_access_token(self.claims, self.jwks, typ='invalid')
+
+ headers = {'Authorization': f'Bearer {access_token}'}
+ rv = self.client.get('/protected', headers=headers)
+ resp = json.loads(rv.data)
+ self.assertEqual(resp['error'], 'invalid_token')
+
+ def test_missing_required_claims(self):
+ required_claims = ['iss', 'exp', 'aud', 'sub', 'client_id', 'iat', 'jti']
+ for claim in required_claims:
+ claims = create_access_token_claims(
+ self.oauth_client, self.user, self.issuer
+ )
+ del claims[claim]
+ access_token = create_access_token(claims, self.jwks)
+
+ headers = {'Authorization': f'Bearer {access_token}'}
+ rv = self.client.get('/protected', headers=headers)
+ resp = json.loads(rv.data)
+ self.assertEqual(resp['error'], 'invalid_token')
+
+ def test_invalid_iss(self):
+ '''
+ The issuer identifier for the authorization server (which is typically obtained
+ during discovery) MUST exactly match the value of the 'iss' claim.
+ '''
+ self.claims['iss'] = 'invalid-issuer'
+ access_token = create_access_token(self.claims, self.jwks)
+
+ headers = {'Authorization': f'Bearer {access_token}'}
+ rv = self.client.get('/protected', headers=headers)
+ resp = json.loads(rv.data)
+ self.assertEqual(resp['error'], 'invalid_token')
+
+ def test_invalid_aud(self):
+ '''
+ The resource server MUST validate that the 'aud' claim contains a resource
+ indicator value corresponding to an identifier the resource server expects for
+ itself. The JWT access token MUST be rejected if 'aud' does not contain a
+ resource indicator of the current resource server as a valid audience.
+ '''
+ self.claims['aud'] = 'invalid-resource-indicator'
+ access_token = create_access_token(self.claims, self.jwks)
+
+ headers = {'Authorization': f'Bearer {access_token}'}
+ rv = self.client.get('/protected', headers=headers)
+ resp = json.loads(rv.data)
+ self.assertEqual(resp['error'], 'invalid_token')
+
+ def test_invalid_exp(self):
+ '''
+ The current time MUST be before the time represented by the 'exp' claim.
+ Implementers MAY provide for some small leeway, usually no more than a few
+ minutes, to account for clock skew.
+ '''
+ self.claims['exp'] = time.time() - 1
+ access_token = create_access_token(self.claims, self.jwks)
+
+ headers = {'Authorization': f'Bearer {access_token}'}
+ rv = self.client.get('/protected', headers=headers)
+ resp = json.loads(rv.data)
+ self.assertEqual(resp['error'], 'invalid_token')
+
+ def test_scope_restriction(self):
+ '''
+ If an authorization request includes a scope parameter, the corresponding
+ issued JWT access token SHOULD include a 'scope' claim as defined in Section
+ 4.2 of [RFC8693]. All the individual scope strings in the 'scope' claim MUST
+ have meaning for the resources indicated in the 'aud' claim. See Section 5 for
+ more considerations about the relationship between scope strings and resources
+ indicated by the 'aud' claim.
+ '''
+
+ self.claims['scope'] = ['invalid-scope']
+ access_token = create_access_token(self.claims, self.jwks)
+
+ headers = {'Authorization': f'Bearer {access_token}'}
+ rv = self.client.get('/protected', headers=headers)
+ resp = json.loads(rv.data)
+ self.assertEqual(resp['username'], 'foo')
+
+ rv = self.client.get('/protected-by-scope', headers=headers)
+ resp = json.loads(rv.data)
+ self.assertEqual(resp['error'], 'insufficient_scope')
+
+ def test_entitlements_restriction(self):
+ '''
+ Many authorization servers embed authorization attributes that go beyond the
+ delegated scenarios described by [RFC7519] in the access tokens they issue.
+ Typical examples include resource owner memberships in roles and groups that
+ are relevant to the resource being accessed, entitlements assigned to the
+ resource owner for the targeted resource that the authorization server knows
+ about, and so on. An authorization server wanting to include such attributes
+ in a JWT access token SHOULD use the 'groups', 'roles', and 'entitlements'
+ attributes of the 'User' resource schema defined by Section 4.1.2 of
+ [RFC7643]) as claim types.
+ '''
+
+ for claim in ['groups', 'roles', 'entitlements']:
+ claims = create_access_token_claims(
+ self.oauth_client, self.user, self.issuer
+ )
+ claims[claim] = ['invalid']
+ access_token = create_access_token(claims, self.jwks)
+
+ headers = {'Authorization': f'Bearer {access_token}'}
+ rv = self.client.get('/protected', headers=headers)
+ resp = json.loads(rv.data)
+ self.assertEqual(resp['username'], 'foo')
+
+ rv = self.client.get(f'/protected-by-{claim}', headers=headers)
+ resp = json.loads(rv.data)
+ self.assertEqual(resp['error'], 'invalid_token')
+
+ def test_extra_attributes(self):
+ '''
+ Authorization servers MAY return arbitrary attributes not defined in any
+ existing specification, as long as the corresponding claim names are collision
+ resistant or the access tokens are meant to be used only within a private
+ subsystem. Please refer to Sections 4.2 and 4.3 of [RFC7519] for details.
+ '''
+
+ self.claims['email'] = 'user@example.org'
+ access_token = create_access_token(self.claims, self.jwks)
+
+ headers = {'Authorization': f'Bearer {access_token}'}
+ rv = self.client.get('/protected', headers=headers)
+ resp = json.loads(rv.data)
+ self.assertEqual(resp['token']['email'], 'user@example.org')
+
+ def test_invalid_auth_time(self):
+ self.claims['auth_time'] = 'invalid-auth-time'
+ access_token = create_access_token(self.claims, self.jwks)
+
+ headers = {'Authorization': f'Bearer {access_token}'}
+ rv = self.client.get('/protected', headers=headers)
+ resp = json.loads(rv.data)
+ self.assertEqual(resp['error'], 'invalid_token')
+
+ def test_invalid_amr(self):
+ self.claims['amr'] = 'invalid-amr'
+ access_token = create_access_token(self.claims, self.jwks)
+
+ headers = {'Authorization': f'Bearer {access_token}'}
+ rv = self.client.get('/protected', headers=headers)
+ resp = json.loads(rv.data)
+ self.assertEqual(resp['error'], 'invalid_token')
+
+
+class JWTAccessTokenIntrospectionTest(TestCase):
+ def setUp(self):
+ super().setUp()
+ self.issuer = 'https://authlib.org/'
+ self.resource_server = 'resource-server-id'
+ self.jwks = read_file_path('jwks_private.json')
+ self.authorization_server = create_authorization_server(self.app)
+ self.authorization_server.register_grant(AuthorizationCodeGrant)
+ self.introspection_endpoint = create_introspection_endpoint(
+ self.app, self.authorization_server, self.issuer, self.jwks
+ )
+ self.user = create_user()
+ self.oauth_client = create_oauth_client('client-id', self.user)
+ self.claims = create_access_token_claims(
+ self.oauth_client,
+ self.user,
+ self.issuer,
+ aud=[self.resource_server],
+ )
+ self.access_token = create_access_token(self.claims, self.jwks)
+
+ def test_introspection(self):
+ headers = self.create_basic_header(
+ self.oauth_client.client_id, self.oauth_client.client_secret
+ )
+ rv = self.client.post(
+ '/oauth/introspect', data={'token': self.access_token}, headers=headers
+ )
+ self.assertEqual(rv.status_code, 200)
+ resp = json.loads(rv.data)
+ self.assertTrue(resp['active'])
+ self.assertEqual(resp['client_id'], self.oauth_client.client_id)
+ self.assertEqual(resp['token_type'], 'Bearer')
+ self.assertEqual(resp['scope'], self.oauth_client.scope)
+ self.assertEqual(resp['sub'], self.user.id)
+ self.assertEqual(resp['aud'], [self.resource_server])
+ self.assertEqual(resp['iss'], self.issuer)
+
+ def test_introspection_username(self):
+ self.introspection_endpoint.get_username = lambda user_id: db.session.get(
+ User, user_id
+ ).username
+
+ headers = self.create_basic_header(
+ self.oauth_client.client_id, self.oauth_client.client_secret
+ )
+ rv = self.client.post(
+ '/oauth/introspect', data={'token': self.access_token}, headers=headers
+ )
+ self.assertEqual(rv.status_code, 200)
+ resp = json.loads(rv.data)
+ self.assertTrue(resp['active'])
+ self.assertEqual(resp['username'], self.user.username)
+
+ def test_non_access_token_skipped(self):
+ class MyIntrospectionEndpoint(IntrospectionEndpoint):
+ def query_token(self, token, token_type_hint):
+ return None
+
+ self.authorization_server.register_endpoint(MyIntrospectionEndpoint)
+ headers = self.create_basic_header(
+ self.oauth_client.client_id, self.oauth_client.client_secret
+ )
+ rv = self.client.post(
+ '/oauth/introspect',
+ data={
+ 'token': 'refresh-token',
+ 'token_type_hint': 'refresh_token',
+ },
+ headers=headers,
+ )
+ self.assertEqual(rv.status_code, 200)
+ resp = json.loads(rv.data)
+ self.assertFalse(resp['active'])
+
+ def test_access_token_non_jwt_skipped(self):
+ class MyIntrospectionEndpoint(IntrospectionEndpoint):
+ def query_token(self, token, token_type_hint):
+ return None
+
+ self.authorization_server.register_endpoint(MyIntrospectionEndpoint)
+ headers = self.create_basic_header(
+ self.oauth_client.client_id, self.oauth_client.client_secret
+ )
+ rv = self.client.post(
+ '/oauth/introspect',
+ data={
+ 'token': 'non-jwt-access-token',
+ },
+ headers=headers,
+ )
+ self.assertEqual(rv.status_code, 200)
+ resp = json.loads(rv.data)
+ self.assertFalse(resp['active'])
+
+ def test_permission_denied(self):
+ self.introspection_endpoint.check_permission = lambda *args: False
+
+ headers = self.create_basic_header(
+ self.oauth_client.client_id, self.oauth_client.client_secret
+ )
+ rv = self.client.post(
+ '/oauth/introspect', data={'token': self.access_token}, headers=headers
+ )
+ self.assertEqual(rv.status_code, 200)
+ resp = json.loads(rv.data)
+ self.assertFalse(resp['active'])
+
+ def test_token_expired(self):
+ self.claims['exp'] = time.time() - 3600
+ access_token = create_access_token(self.claims, self.jwks)
+ headers = self.create_basic_header(
+ self.oauth_client.client_id, self.oauth_client.client_secret
+ )
+ rv = self.client.post(
+ '/oauth/introspect', data={'token': access_token}, headers=headers
+ )
+ self.assertEqual(rv.status_code, 200)
+ resp = json.loads(rv.data)
+ self.assertFalse(resp['active'])
+
+ def test_introspection_different_issuer(self):
+ class MyIntrospectionEndpoint(IntrospectionEndpoint):
+ def query_token(self, token, token_type_hint):
+ return None
+
+ self.authorization_server.register_endpoint(MyIntrospectionEndpoint)
+
+ self.claims['iss'] = 'different-issuer'
+ access_token = create_access_token(self.claims, self.jwks)
+ headers = self.create_basic_header(
+ self.oauth_client.client_id, self.oauth_client.client_secret
+ )
+ rv = self.client.post(
+ '/oauth/introspect', data={'token': access_token}, headers=headers
+ )
+ self.assertEqual(rv.status_code, 200)
+ resp = json.loads(rv.data)
+ self.assertFalse(resp['active'])
+
+ def test_introspection_invalid_claim(self):
+ self.claims['exp'] = "invalid"
+ access_token = create_access_token(self.claims, self.jwks)
+ headers = self.create_basic_header(
+ self.oauth_client.client_id, self.oauth_client.client_secret
+ )
+ rv = self.client.post(
+ '/oauth/introspect', data={'token': access_token}, headers=headers
+ )
+ self.assertEqual(rv.status_code, 401)
+ resp = json.loads(rv.data)
+ self.assertEqual(resp['error'], 'invalid_token')
+
+
+class JWTAccessTokenRevocationTest(TestCase):
+ def setUp(self):
+ super().setUp()
+ self.issuer = 'https://authlib.org/'
+ self.resource_server = 'resource-server-id'
+ self.jwks = read_file_path('jwks_private.json')
+ self.authorization_server = create_authorization_server(self.app)
+ self.authorization_server.register_grant(AuthorizationCodeGrant)
+ self.revocation_endpoint = create_revocation_endpoint(
+ self.app, self.authorization_server, self.issuer, self.jwks
+ )
+ self.user = create_user()
+ self.oauth_client = create_oauth_client('client-id', self.user)
+ self.claims = create_access_token_claims(
+ self.oauth_client,
+ self.user,
+ self.issuer,
+ aud=[self.resource_server],
+ )
+ self.access_token = create_access_token(self.claims, self.jwks)
+
+ def test_revocation(self):
+ headers = self.create_basic_header(
+ self.oauth_client.client_id, self.oauth_client.client_secret
+ )
+ rv = self.client.post(
+ '/oauth/revoke', data={'token': self.access_token}, headers=headers
+ )
+ self.assertEqual(rv.status_code, 401)
+ resp = json.loads(rv.data)
+ self.assertEqual(resp['error'], 'unsupported_token_type')
+
+ def test_non_access_token_skipped(self):
+ class MyRevocationEndpoint(RevocationEndpoint):
+ def query_token(self, token, token_type_hint):
+ return None
+
+ self.authorization_server.register_endpoint(MyRevocationEndpoint)
+ headers = self.create_basic_header(
+ self.oauth_client.client_id, self.oauth_client.client_secret
+ )
+ rv = self.client.post(
+ '/oauth/revoke',
+ data={
+ 'token': 'refresh-token',
+ 'token_type_hint': 'refresh_token',
+ },
+ headers=headers,
+ )
+ self.assertEqual(rv.status_code, 200)
+ resp = json.loads(rv.data)
+ self.assertEqual(resp, {})
+
+ def test_access_token_non_jwt_skipped(self):
+ class MyRevocationEndpoint(RevocationEndpoint):
+ def query_token(self, token, token_type_hint):
+ return None
+
+ self.authorization_server.register_endpoint(MyRevocationEndpoint)
+ headers = self.create_basic_header(
+ self.oauth_client.client_id, self.oauth_client.client_secret
+ )
+ rv = self.client.post(
+ '/oauth/revoke',
+ data={
+ 'token': 'non-jwt-access-token',
+ },
+ headers=headers,
+ )
+ self.assertEqual(rv.status_code, 200)
+ resp = json.loads(rv.data)
+ self.assertEqual(resp, {})
+
+ def test_revocation_different_issuer(self):
+ self.claims['iss'] = 'different-issuer'
+ access_token = create_access_token(self.claims, self.jwks)
+
+ headers = self.create_basic_header(
+ self.oauth_client.client_id, self.oauth_client.client_secret
+ )
+ rv = self.client.post(
+ '/oauth/revoke', data={'token': access_token}, headers=headers
+ )
+ self.assertEqual(rv.status_code, 401)
+ resp = json.loads(rv.data)
+ self.assertEqual(resp['error'], 'unsupported_token_type')
+