From 655c423cc90bdb3dfe4e897a6624b2b8ef3c2794 Mon Sep 17 00:00:00 2001 From: Yurii Konovaliuk Date: Mon, 25 Apr 2016 09:39:58 +0300 Subject: [PATCH] Spec-compliant decryption This far JOSE library has supported spec draft version of encryption/decryption of JWE. In this commit spec-compliant decryption is introduced. Legacy decryption is deprecated and will be removed in further versions. --- CHANGES | 8 ++ jose.py | 272 ++++++++++++++++++++++++++++++++++++++++++----- setup.py | 4 +- tests.py | 313 ++++++++++++++++++++++++++++++++++++++++++++++++++++--- tox.ini | 4 +- 5 files changed, 559 insertions(+), 42 deletions(-) diff --git a/CHANGES b/CHANGES index 82bd801..a5e5b24 100644 --- a/CHANGES +++ b/CHANGES @@ -1,6 +1,14 @@ CHANGES ======= +1.1.0 (2016-03-03) +------------------ +- Allowed to decrypt JWE compliant tokens (patch contributed by + yuriikonovaliuk) + +Note: Tokens generated by `encrypt` are not JWE spec compliant. Prior to this +patch `decrypt` was not able to decrypt JWE spec compliant tokens as well. + 1.0.0 (2015-10-06) ------------------ - Fixed bug in authentication tag computation (patch contributed by jaimeperez) diff --git a/jose.py b/jose.py index c93089a..c0043b8 100644 --- a/jose.py +++ b/jose.py @@ -2,16 +2,16 @@ logger = logging.getLogger(__name__) try: - from cjson import encode as json_encode, decode as json_decode + from cjson import encode as _json_encode, decode as json_decode except ImportError: # pragma: nocover logger.warn('cjson not found, falling back to stdlib json') - from json import loads as json_decode, dumps as json_encode + from json import loads as json_decode, dumps as _json_encode import zlib import datetime from base64 import urlsafe_b64encode, urlsafe_b64decode -from collections import namedtuple +from collections import OrderedDict, namedtuple from copy import deepcopy from time import time from struct import pack @@ -45,7 +45,6 @@ 'header ' 'claims ') - CLAIM_ISSUER = 'iss' CLAIM_SUBJECT = 'sub' CLAIM_AUDIENCE = 'aud' @@ -54,11 +53,19 @@ CLAIM_ISSUED_AT = 'iat' CLAIM_JWT_ID = 'jti' +HEADER_ALG = 'alg' +HEADER_ENC = 'enc' +HEADER_ZIP = 'zip' +HEADER_CRIT = 'crit' + # these are temporary to allow graceful deprecation of legacy encrypted tokens. # these will be removed in v1.0 _TEMP_VER_KEY = '__v' _TEMP_VER = 2 +JWE_REQUIRED_HEADERS = set((HEADER_ALG, HEADER_ENC)) +JWE_UNDERSTOOD_HEADERS = set((HEADER_ALG, HEADER_ENC, HEADER_ZIP, HEADER_CRIT)) + class Error(Exception): """ The base error type raised by jose @@ -97,8 +104,6 @@ def deserialize_compact(jwt): """ parts = jwt.split('.') - # http://tools.ietf.org/html/ - # draft-ietf-jose-json-web-encryption-23#section-9 if len(parts) == 3: token_type = JWS elif len(parts) == 5: @@ -109,6 +114,71 @@ def deserialize_compact(jwt): return token_type(*parts) +def json_encode(data): + return _json_encode( + OrderedDict(sorted(data.items(), key=lambda item: item[0])) + ) + + +def _generate_encryption_keys(alg, rng): + (_, key_len), _ = JWA[alg] + num_bytes = key_len / 8 + mac_key = rng(num_bytes) + enc_key = rng(num_bytes) + return mac_key, enc_key + + +def _parse_encryption_keys(key, alg): + (_, key_len), _ = JWA[alg] + num_bytes = key_len / 8 + mac_key = key[:num_bytes] + enc_key = key[num_bytes:] + return mac_key, enc_key + + +def _encrypt_key(cek, jwk, alg): + (cipher, _), _ = JWA[alg] + return cipher(cek, jwk) + + +def _decrypt_key(encrypted_key, jwk, alg): + (_, decipher), _ = JWA[alg] + return decipher(encrypted_key, jwk) + + +def _generate_iv(enc, rng): + # TODO: This would work only for A128CBC, A192CBC and A256CBC algorithms + # which are only algorithms supported ATM. In case if new algorithms + # support added this function should be revised. + return rng(AES.block_size) + + +def _generate_authentication_tag(key, protected_header, ciphertext, iv, alg): + # http://tools.ietf.org/html/rfc7516#section-2 + # Additional Authenticated Data (aad) + aad = b64encode_url(protected_header) + + _, ((cipher, _), mod) = JWA[alg] + + # http://tools.ietf.org/html/rfc7518#section-5.2.2.1 + # Number of bits in AAD expressed as a 64-bit unsigned big-endian integer + al = pack("!Q", 8 * len(aad)) + chunks = (aad, iv, ciphertext, al) + + return cipher(chunks, key, mod)[:len(key)] + + +def _verify_header(header): + for key in JWE_REQUIRED_HEADERS: + if key not in header: + return False + if HEADER_CRIT in header: + for crit in header[HEADER_CRIT]: + if crit not in JWE_UNDERSTOOD_HEADERS: + return False + return True + + def encrypt(claims, jwk, adata='', add_header=None, alg='RSA-OAEP', enc='A128CBC-HS256', rng=get_random_bytes, compression=None): """ Encrypts the given claims and produces a :class:`~jose.JWE` @@ -140,7 +210,7 @@ def encrypt(claims, jwk, adata='', add_header=None, alg='RSA-OAEP', claims[_TEMP_VER_KEY] = _TEMP_VER header = dict((add_header or {}).items() + [ - ('enc', enc), ('alg', alg)]) + (HEADER_ENC, enc), (HEADER_ALG, alg)]) # promote the temp key to the header assert _TEMP_VER_KEY not in header @@ -150,7 +220,7 @@ def encrypt(claims, jwk, adata='', add_header=None, alg='RSA-OAEP', # compress (if required) if compression is not None: - header['zip'] = compression + header[HEADER_ZIP] = compression try: (compress, _) = COMPRESSION[compression] except KeyError: @@ -179,7 +249,71 @@ def encrypt(claims, jwk, adata='', add_header=None, alg='RSA-OAEP', auth_tag(hash)))) -def decrypt(jwe, jwk, adata='', validate_claims=True, expiry_seconds=None): +def spec_compliant_encrypt(claims, jwk, add_header=None, alg='RSA-OAEP', + enc='A128CBC-HS256', rng=get_random_bytes): + """ Encrypts the given claims and produces a :class:`~jose.JWE` + + :param claims: A `dict` representing the claims for this + :class:`~jose.JWE`. + :param jwk: A `dict` representing the JWK to be used for encryption of + the CEK. This parameter is algorithm-specific. + :param add_header: Additional items to be added to the header. Additional + headers *will* be authenticated. + :param alg: The algorithm to use for CEK encryption + :param enc: The algorithm to use for claims encryption + :param rng: Random number generator. A string of random bytes is expected + as output. + : param compression: The compression algorithm to use. Currently supports + `'DEF'`. + :rtype: :class:`~jose.JWE` + :raises: :class:`~jose.Error` if there is an error producing the JWE + """ + # We need 5 components for JWE token + # 1. Generate header + header = dict((add_header or {}).items() + [(HEADER_ENC, enc), + (HEADER_ALG, alg)]) + protected_header = json_encode(header) + + # 2. Generate CEK + mac_key, enc_key = _generate_encryption_keys(enc, rng) + encrypted_key = _encrypt_key(mac_key + enc_key, jwk, alg) + + # 3. Generate Initialization Vector + iv = _generate_iv(enc, rng) + + # 4. Generate payload + plaintext = json_encode(claims) + # Compress if needed + if HEADER_ZIP in header: + try: + (compression_func, _) = COMPRESSION[header[HEADER_ZIP]] + except KeyError: + raise Error( + 'Unsupported compression algorithm: {}'.format(header[HEADER_ZIP])) + M = compression_func(plaintext) + else: + M = plaintext + + # Encrypt payload + ((cipher, _), key_len), _ = JWA[enc] + ciphertext = cipher(M, enc_key, iv) + + # 5. Generate authentication tag + authentication_tag = _generate_authentication_tag( + mac_key, protected_header, ciphertext, iv, enc + ) + + return JWE( + *map( + b64encode_url, + (protected_header, encrypted_key, iv, ciphertext, + authentication_tag) + ) + ) + + +def legacy_decrypt(jwe, jwk, adata='', validate_claims=True, + expiry_seconds=None): """ Decrypts a deserialized :class:`~jose.JWE` :param jwe: An instance of :class:`~jose.JWE` @@ -198,20 +332,23 @@ def decrypt(jwe, jwk, adata='', validate_claims=True, expiry_seconds=None): :raises: :class:`~jose.NotYetValid` if the JWT is not yet valid :raises: :class:`~jose.Error` if there is an error decrypting the JWE """ - header, encryption_key_ciphertext, iv, ciphertext, tag = map( + protected_header, encrypted_key, iv, ciphertext, authentication_tag = map( b64decode_url, jwe) - header = json_decode(header) + header = json_decode(protected_header) + + alg = header[HEADER_ALG] + enc = header[HEADER_ENC] # decrypt cek - (_, decipher), _ = JWA[header['alg']] - encryption_key = decipher(encryption_key_ciphertext, jwk) + encryption_key = _decrypt_key(encrypted_key, jwk, alg) # decrypt body - ((_, decipher), _), ((hash_fn, _), mod) = JWA[header['enc']] + ((_, decipher), _), ((hash_fn, _), mod) = JWA[enc] version = header.get(_TEMP_VER_KEY) if version: - plaintext = decipher(ciphertext, encryption_key[-mod.digest_size/2:], iv) + plaintext = decipher(ciphertext, encryption_key[-mod.digest_size/2:], + iv) hash = hash_fn(_jwe_hash_str(ciphertext, iv, adata, version), encryption_key[:-mod.digest_size/2], mod=mod) else: @@ -219,15 +356,15 @@ def decrypt(jwe, jwk, adata='', validate_claims=True, expiry_seconds=None): hash = hash_fn(_jwe_hash_str(ciphertext, iv, adata, version), encryption_key[-mod.digest_size:], mod=mod) - if not const_compare(auth_tag(hash), tag): + if not const_compare(auth_tag(hash), authentication_tag): raise Error('Mismatched authentication tags') - if 'zip' in header: + if HEADER_ZIP in header: try: - (_, decompress) = COMPRESSION[header['zip']] + (_, decompress) = COMPRESSION[header[HEADER_ZIP]] except KeyError: raise Error('Unsupported compression algorithm: {}'.format( - header['zip'])) + header[HEADER_ZIP])) plaintext = decompress(plaintext) @@ -243,6 +380,79 @@ def decrypt(jwe, jwk, adata='', validate_claims=True, expiry_seconds=None): return JWT(header, claims) +def spec_compliant_decrypt(jwe, jwk, validate_claims=True, + expiry_seconds=None): + """ Decrypts a deserialized :class:`~jose.JWE` + + :param jwe: An instance of :class:`~jose.JWE` + :param jwk: A `dict` representing the JWK required to decrypt the content + of the :class:`~jose.JWE`. + :param validate_claims: A `bool` indicating whether or not the `exp`, `iat` + and `nbf` claims should be validated. Defaults to + `True`. + :param expiry_seconds: An `int` containing the JWT expiry in seconds, used + when evaluating the `iat` claim. Defaults to `None`, + which disables `iat` claim validation. + :rtype: :class:`~jose.JWT` + :raises: :class:`~jose.Expired` if the JWT has expired + :raises: :class:`~jose.NotYetValid` if the JWT is not yet valid + :raises: :class:`~jose.Error` if there is an error decrypting the JWE + """ + protected_header, encrypted_key, iv, ciphertext, authentication_tag = map( + b64decode_url, jwe + ) + header = json_decode(protected_header) + if not _verify_header(header): + raise Error('Header is invalid') + + alg = header[HEADER_ALG] + enc = header[HEADER_ENC] + + # decrypt cek + encryption_key = _decrypt_key(encrypted_key, jwk, alg) + mac_key, enc_key = _parse_encryption_keys(encryption_key, enc) + + # verify authentication tag + expected_tag = _generate_authentication_tag( + mac_key, json_encode(header), ciphertext, iv, enc + ) + if not const_compare(expected_tag, authentication_tag): + raise Error('Mismatched authentication tags') + + # decrypt body + ((_, decipher), _), _ = JWA[enc] + # http://tools.ietf.org/html/rfc7516#section-5.1 step 11 + M = decipher(ciphertext, enc_key, iv) + + if HEADER_ZIP in header: + try: + (_, decompress) = COMPRESSION[header[HEADER_ZIP]] + except KeyError: + raise Error('Unsupported compression algorithm: {}'.format( + header[HEADER_ZIP])) + + plaintext = decompress(M) + else: + plaintext = M + claims = json_decode(plaintext) + _validate(claims, validate_claims, expiry_seconds) + + return JWT(header, claims) + + +def decrypt(*args, **kwargs): + """ Decrypts legacy or spec-compliant JOSE token. + First attempts to decrypt the token in a legacy mode + (https://tools.ietf.org/html/draft-ietf-oauth-json-web-token-19). + If it is not a valid legacy token then attempts to decrypt it in a + spec-compliant way (http://tools.ietf.org/html/rfc7519) + """ + try: + return legacy_decrypt(*args, **kwargs) + except (Error, ValueError) as e: + return spec_compliant_decrypt(*args, **kwargs) + + def sign(claims, jwk, add_header=None, alg='HS256'): """ Signs the given claims and produces a :class:`~jose.JWS` @@ -257,7 +467,7 @@ def sign(claims, jwk, add_header=None, alg='HS256'): """ (hash_fn, _), mod = JWA[alg] - header = dict((add_header or {}).items() + [('alg', alg)]) + header = dict((add_header or {}).items() + [(HEADER_ALG, alg)]) header, payload = map(b64encode_url, map(json_encode, (header, claims))) sig = b64encode_url(hash_fn(_jws_hash_str(header, payload), jwk['k'], @@ -286,10 +496,10 @@ def verify(jws, jwk, alg, validate_claims=True, expiry_seconds=None): """ header, payload, sig = map(b64decode_url, jws) header = json_decode(header) - if alg != header['alg']: + if alg != header[HEADER_ALG]: raise Error('Invalid algorithm') - (_, verify_fn), mod = JWA[header['alg']] + (_, verify_fn), mod = JWA[header[HEADER_ALG]] if not verify_fn(_jws_hash_str(jws.header, jws.payload), jwk['k'], sig, mod=mod): @@ -356,13 +566,19 @@ def decrypt_oaep(ciphertext, jwk): def hmac_sign(s, key, mod=SHA256): hmac = HMAC.new(key, digestmod=mod) - hmac.update(s) + if not isinstance(s, (tuple, list)): + s = (s,) + for item in s: + hmac.update(item) return hmac.digest() def hmac_verify(s, key, sig, mod=SHA256): hmac = HMAC.new(key, digestmod=mod) - hmac.update(s) + if not isinstance(s, (tuple, list)): + s = (s,) + for item in s: + hmac.update(item) if not const_compare(hmac.digest(), sig): return False @@ -405,7 +621,7 @@ class _JWA(object): """ Represents the implemented algorithms A big TODO list can be found here: - http://tools.ietf.org/html/draft-ietf-jose-json-web-algorithms-24 + http://tools.ietf.org/html/rfc7518 """ _impl = { 'HS256': ((hmac_sign, hmac_verify), SHA256), @@ -423,6 +639,12 @@ class _JWA(object): def __getitem__(self, key): """ Derive implementation(s) from key + If key is compond (-|+) then it will return a tuple + of implementations. + Each implementation is a tuple in following format: + - for hash algorithms ((, ), ) + - for encryption algorithms ((, ), + ) """ if key in self._impl: return self._impl[key] diff --git a/setup.py b/setup.py index 1cf1b23..130d5ef 100644 --- a/setup.py +++ b/setup.py @@ -7,6 +7,7 @@ here = os.path.abspath(os.path.dirname(__file__)) REQUIRES = filter(lambda s: len(s) > 0, open(os.path.join(here, 'requirements.txt')).read().split('\n')) +TESTS_REQUIRE = ['mock'] pkg_name = 'jose' pyver = ''.join(('python', '.'.join(map(str, sys.version_info[:2])))) @@ -43,13 +44,14 @@ def finalize_package_data(self): pkg_name = '-'.join((pyver.replace('.', ''), pkg_name)) setup(name=pkg_name, - version='1.0.0', + version='1.1.0', author='Demian Brecht', author_email='dbrecht@demonware.net', py_modules=['jose'], url='https://github.com/Demonware/jose', description='An implementation of the JOSE draft', install_requires=REQUIRES, + tests_require=TESTS_REQUIRE, classifiers=[ 'Development Status :: 4 - Beta', 'Intended Audience :: Developers', diff --git a/tests.py b/tests.py index 04a4f72..9a7282d 100644 --- a/tests.py +++ b/tests.py @@ -1,4 +1,5 @@ import json +import mock import unittest from base64 import b64encode @@ -138,14 +139,14 @@ def test_jwe(self): token = jose.serialize_compact(jwe) - jwt = jose.decrypt(jose.deserialize_compact(token), rsa_priv_key) + jwt = jose.legacy_decrypt(jose.deserialize_compact(token), rsa_priv_key) self.assertNotIn(jose._TEMP_VER_KEY, claims) self.assertEqual(jwt.claims, claims) # invalid key try: - jose.decrypt(jose.deserialize_compact(token), bad_key) + jose.legacy_decrypt(jose.deserialize_compact(token), bad_key) self.fail() except jose.Error as e: self.assertEqual(e.message, 'Incorrect decryption.') @@ -156,7 +157,7 @@ def test_jwe_add_header(self): for (alg, jwk), enc in product(self.algs, self.encs): et = jose.serialize_compact(jose.encrypt(claims, rsa_pub_key, add_header=add_header)) - jwt = jose.decrypt(jose.deserialize_compact(et), rsa_priv_key) + jwt = jose.legacy_decrypt(jose.deserialize_compact(et), rsa_priv_key) self.assertEqual(jwt.header['foo'], add_header['foo']) @@ -165,12 +166,12 @@ def test_jwe_adata(self): for (alg, jwk), enc in product(self.algs, self.encs): et = jose.serialize_compact(jose.encrypt(claims, rsa_pub_key, adata=adata)) - jwt = jose.decrypt(jose.deserialize_compact(et), rsa_priv_key, + jwt = jose.legacy_decrypt(jose.deserialize_compact(et), rsa_priv_key, adata=adata) # make sure signaures don't match when adata isn't passed in try: - hdr, dt = jose.decrypt(jose.deserialize_compact(et), + hdr, dt = jose.legacy_decrypt(jose.deserialize_compact(et), rsa_priv_key) self.fail() except jose.Error as e: @@ -180,7 +181,7 @@ def test_jwe_adata(self): def test_jwe_invalid_base64(self): try: - jose.decrypt('aaa', rsa_priv_key) + jose.legacy_decrypt('aaa', rsa_priv_key) self.fail() # expecting error due to invalid base64 except jose.Error as e: pass @@ -193,14 +194,14 @@ def test_jwe_invalid_base64(self): def test_jwe_no_error_with_exp_claim(self): claims = {jose.CLAIM_EXPIRATION_TIME: int(time()) + 5} et = jose.serialize_compact(jose.encrypt(claims, rsa_pub_key)) - jose.decrypt(jose.deserialize_compact(et), rsa_priv_key) + jose.legacy_decrypt(jose.deserialize_compact(et), rsa_priv_key) def test_jwe_expired_error_with_exp_claim(self): claims = {jose.CLAIM_EXPIRATION_TIME: int(time()) - 5} et = jose.serialize_compact(jose.encrypt(claims, rsa_pub_key)) try: - jose.decrypt(jose.deserialize_compact(et), rsa_priv_key) + jose.legacy_decrypt(jose.deserialize_compact(et), rsa_priv_key) self.fail() # expecting expired token except jose.Expired as e: pass @@ -216,7 +217,7 @@ def test_jwe_no_error_with_iat_claim(self): claims = {jose.CLAIM_ISSUED_AT: int(time()) - 15} et = jose.serialize_compact(jose.encrypt(claims, rsa_pub_key)) - jose.decrypt(jose.deserialize_compact(et), rsa_priv_key, + jose.legacy_decrypt(jose.deserialize_compact(et), rsa_priv_key, expiry_seconds=20) def test_jwe_expired_error_with_iat_claim(self): @@ -225,7 +226,7 @@ def test_jwe_expired_error_with_iat_claim(self): et = jose.serialize_compact(jose.encrypt(claims, rsa_pub_key)) try: - jose.decrypt(jose.deserialize_compact(et), rsa_priv_key, + jose.legacy_decrypt(jose.deserialize_compact(et), rsa_priv_key, expiry_seconds=expiry_seconds) self.fail() # expecting expired token except jose.Expired as e: @@ -242,14 +243,14 @@ def test_jwe_expired_error_with_iat_claim(self): def test_jwe_no_error_with_nbf_claim(self): claims = {jose.CLAIM_NOT_BEFORE: int(time()) - 5} et = jose.serialize_compact(jose.encrypt(claims, rsa_pub_key)) - jose.decrypt(jose.deserialize_compact(et), rsa_priv_key) + jose.legacy_decrypt(jose.deserialize_compact(et), rsa_priv_key) def test_jwe_not_yet_valid_error_with_nbf_claim(self): claims = {jose.CLAIM_NOT_BEFORE: int(time()) + 5} et = jose.serialize_compact(jose.encrypt(claims, rsa_pub_key)) try: - jose.decrypt(jose.deserialize_compact(et), rsa_priv_key) + jose.legacy_decrypt(jose.deserialize_compact(et), rsa_priv_key) self.fail() # expecting not valid yet except jose.NotYetValid as e: pass @@ -264,7 +265,7 @@ def test_jwe_not_yet_valid_error_with_nbf_claim(self): def test_jwe_ignores_expired_token_if_validate_claims_is_false(self): claims = {jose.CLAIM_EXPIRATION_TIME: int(time()) - 5} et = jose.serialize_compact(jose.encrypt(claims, rsa_pub_key)) - jose.decrypt(jose.deserialize_compact(et), rsa_priv_key, + jose.legacy_decrypt(jose.deserialize_compact(et), rsa_priv_key, validate_claims=False) def test_format_timestamp(self): @@ -289,7 +290,7 @@ def test_jwe_compression(self): self.assertTrue(len(compressed_ciphertext) < len(uncompressed_ciphertext)) - jwt = jose.decrypt(jose.deserialize_compact(jwe), rsa_priv_key) + jwt = jose.legacy_decrypt(jose.deserialize_compact(jwe), rsa_priv_key) self.assertEqual(jwt.claims, local_claims) def test_encrypt_invalid_compression_error(self): @@ -309,7 +310,190 @@ def test_decrypt_invalid_compression_error(self): ) try: - jose.decrypt(jose.JWE(*((header,) + (jwe[1:]))), rsa_priv_key) + jose.legacy_decrypt(jose.JWE(*((header,) + (jwe[1:]))), + rsa_priv_key) + self.fail() + except jose.Error as e: + self.assertEqual( + e.message, 'Unsupported compression algorithm: BAD') + + +class TestSpecCompliantJWE(unittest.TestCase): + encs = ('A128CBC-HS256', 'A192CBC-HS384', 'A256CBC-HS512') + algs = (('RSA-OAEP', rsa_key),) + + def test_jwe(self): + bad_key = {'k': RSA.generate(2048).exportKey('PEM')} + + for (alg, jwk), enc in product(self.algs, self.encs): + jwe = jose.spec_compliant_encrypt(claims, rsa_pub_key, enc=enc, alg=alg) + + # make sure the body can't be loaded as json (should be encrypted) + try: + json.loads(jose.b64decode_url(jwe.ciphertext)) + self.fail() + except ValueError: + pass + + token = jose.serialize_compact(jwe) + + jwt = jose.spec_compliant_decrypt(jose.deserialize_compact(token), rsa_priv_key) + self.assertNotIn(jose._TEMP_VER_KEY, claims) + + self.assertEqual(jwt.claims, claims) + + # invalid key + try: + jose.spec_compliant_decrypt(jose.deserialize_compact(token), bad_key) + self.fail() + except jose.Error as e: + self.assertEqual(e.message, 'Incorrect decryption.') + + def test_jwe_add_header(self): + add_header = {'foo': 'bar'} + + for (alg, jwk), enc in product(self.algs, self.encs): + et = jose.serialize_compact(jose.spec_compliant_encrypt(claims, rsa_pub_key, + add_header=add_header)) + jwt = jose.spec_compliant_decrypt(jose.deserialize_compact(et), rsa_priv_key) + + self.assertEqual(jwt.header['foo'], add_header['foo']) + + def test_jwe_invalid_base64(self): + try: + jose.spec_compliant_decrypt('aaa', rsa_priv_key) + self.fail() # expecting error due to invalid base64 + except jose.Error as e: + pass + + self.assertEquals( + e.args[0], + 'Unable to decode base64: Incorrect padding' + ) + + def test_jwe_no_error_with_exp_claim(self): + claims = {jose.CLAIM_EXPIRATION_TIME: int(time()) + 5} + et = jose.serialize_compact(jose.spec_compliant_encrypt(claims, rsa_pub_key)) + jose.spec_compliant_decrypt(jose.deserialize_compact(et), rsa_priv_key) + + def test_jwe_expired_error_with_exp_claim(self): + claims = {jose.CLAIM_EXPIRATION_TIME: int(time()) - 5} + et = jose.serialize_compact(jose.spec_compliant_encrypt(claims, rsa_pub_key)) + + try: + jose.spec_compliant_decrypt(jose.deserialize_compact(et), rsa_priv_key) + self.fail() # expecting expired token + except jose.Expired as e: + pass + + self.assertEquals( + e.args[0], + 'Token expired at {}'.format( + jose._format_timestamp(claims[jose.CLAIM_EXPIRATION_TIME]) + ) + ) + + def test_jwe_no_error_with_iat_claim(self): + claims = {jose.CLAIM_ISSUED_AT: int(time()) - 15} + et = jose.serialize_compact(jose.spec_compliant_encrypt(claims, rsa_pub_key)) + + jose.spec_compliant_decrypt(jose.deserialize_compact(et), rsa_priv_key, + expiry_seconds=20) + + def test_jwe_expired_error_with_iat_claim(self): + expiry_seconds = 10 + claims = {jose.CLAIM_ISSUED_AT: int(time()) - 15} + et = jose.serialize_compact(jose.spec_compliant_encrypt(claims, rsa_pub_key)) + + try: + jose.spec_compliant_decrypt(jose.deserialize_compact(et), rsa_priv_key, + expiry_seconds=expiry_seconds) + self.fail() # expecting expired token + except jose.Expired as e: + pass + + expiration_time = claims[jose.CLAIM_ISSUED_AT] + expiry_seconds + self.assertEquals( + e.args[0], + 'Token expired at {}'.format( + jose._format_timestamp(expiration_time) + ) + ) + + def test_jwe_no_error_with_nbf_claim(self): + claims = {jose.CLAIM_NOT_BEFORE: int(time()) - 5} + et = jose.serialize_compact(jose.spec_compliant_encrypt(claims, rsa_pub_key)) + jose.spec_compliant_decrypt(jose.deserialize_compact(et), rsa_priv_key) + + def test_jwe_not_yet_valid_error_with_nbf_claim(self): + claims = {jose.CLAIM_NOT_BEFORE: int(time()) + 5} + et = jose.serialize_compact(jose.spec_compliant_encrypt(claims, rsa_pub_key)) + + try: + jose.spec_compliant_decrypt(jose.deserialize_compact(et), rsa_priv_key) + self.fail() # expecting not valid yet + except jose.NotYetValid as e: + pass + + self.assertEquals( + e.args[0], + 'Token not valid until {}'.format( + jose._format_timestamp(claims[jose.CLAIM_NOT_BEFORE]) + ) + ) + + def test_jwe_ignores_expired_token_if_validate_claims_is_false(self): + claims = {jose.CLAIM_EXPIRATION_TIME: int(time()) - 5} + et = jose.serialize_compact(jose.spec_compliant_encrypt(claims, rsa_pub_key)) + jose.spec_compliant_decrypt(jose.deserialize_compact(et), rsa_priv_key, + validate_claims=False) + + def test_format_timestamp(self): + self.assertEquals( + jose._format_timestamp(1403054056), + '2014-06-18T01:14:16Z' + ) + + def test_jwe_compression(self): + local_claims = copy(claims) + + for v in xrange(1000): + local_claims['dummy_' + str(v)] = '0' * 100 + + jwe = jose.serialize_compact( + jose.spec_compliant_encrypt(local_claims, rsa_pub_key) + ) + _, _, _, uncompressed_ciphertext, _ = jwe.split('.') + + jwe = jose.serialize_compact( + jose.spec_compliant_encrypt(local_claims, rsa_pub_key, + add_header={'zip': 'DEF'}) + ) + _, _, _, compressed_ciphertext, _ = jwe.split('.') + + self.assertTrue(len(compressed_ciphertext) < + len(uncompressed_ciphertext)) + + jwt = jose.spec_compliant_decrypt(jose.deserialize_compact(jwe), + rsa_priv_key) + self.assertEqual(jwt.claims, local_claims) + + def test_encrypt_invalid_compression_error(self): + try: + jose.spec_compliant_encrypt(claims, rsa_pub_key, + add_header={'zip':'BAD'}) + self.fail() + except jose.Error: + pass + + def test_decrypt_invalid_compression_error(self): + with mock.patch.dict(jose.COMPRESSION, + {'BAD': jose.COMPRESSION['DEF']}): + jwe = jose.spec_compliant_encrypt(claims, rsa_pub_key, + add_header={'zip': 'BAD'}) + + try: + jose.spec_compliant_decrypt(jwe, rsa_priv_key) self.fail() except jose.Error as e: self.assertEqual( @@ -358,6 +542,105 @@ def test_jws_invalid_algorithm_error(self): self.assertEqual(e.message, 'Invalid algorithm') +PRIVATE_KEY = """-----BEGIN RSA PRIVATE KEY----- +MIIEpAIBAAKCAQEApqgUUxKXW4gVCHffi+u2nWqlYK6WBCPCNyhJzsauFbsilw0G +dU6BKUMIzrsKvm8wUxJVVSVH42dCZhLiT3yC85Eb6rrrYpdXzMkim9oPR1pG1lyg +3pJGcg4eFxd8S7xGeBELcANTmvLT0D1ka9Rs4iWImQDyQieXdglQWesIOYNSymaz +SzWrj3LZ5ihns6uFzx4ykisRVfK7TgOGGBl/53b0J8DxvjbHDFKNt4DgTF8eKP59 +O4rKsEAv5LlsbN/MirvF6D1ZoKOXDCAvmC7MMXw8nYE9qwfgCXSH9VKnUtnVmDXL +za0loS4RIpkz1cVbHKOJlp7HH66rg5yOZek/mQIDAQABAoIBAAEQHWESQ0jgK1Is +gY6A6F9EqN1e/7HzEHANn7rj5YRZ9zSDbsEcyRIcTVgUNVNVnjdJbKXoYPcAV5oT +EMJ1BtjK2iS7IHk2gebaeZAI6gQIfV8spBIHWM+ta1+2VKKfBswJP8ttGgFo/xTa +72MIrdEbcC2ZpfHqErs7//ky2JCVVTLv4GuSr5U4dYCG/swkC92lOtf7aowlMwL7 +/kFPOIpJ84SjRvcB08kIyRSf6Kcw1Tw4TUTaL345d2btlpcZN1U+h67PY9+oeQzg +WeypOCYtOm2tu3yxbvsysPLlkt3Mo4FzJkxZyXXPqggCby0aH/KI8Cv8LXdX6eeR +RwZ1DekCgYEAveO0Fb4wkHxbikzLyPoxNuoKK3z+8S+s27aa14AFtFyaic7qR6Nv +Rv6rhhYK5YBBkyW30aQbV/ZRCPlkD5TLA71eBdCUPn1Yh9gUAfNjZ/cJTPr8p8V/ +jNC5bUkOHen+riflg1yeIJm+F643n3mbDK3Ruhkc4L+rHlD0JWPgMbcCgYEA4K2x +yMI8vgp9wiI+aVvevow3P+jq4fL7rAi1FyLPSzrxW0mnIzGRHsKVz6XBI8UUtT0h +AxN7fEY1Mu0tPJObQciM8/EIOlfANsxMm0NUFPsi8sEKm8KsC8qxNxj2ShmKvNHn +GPoxbp1ouLxdLcEIl4WMyMzpIDzfRAXWl7dE2S8CgYEAsC3E1uuH2XZX5EAOTuC6 +qq2IVAL78sB+C7gnf8v6/vVwOG3u9hqP0vnUIGrxHy/ZJ3I2U16ENB+H3eCtUruF +hGm9A34bHMNlUVxMa+bqkvoj+fVgVzSpe/foIppGa8C/l8vSaQeUesDKGuR8HQ1R +qvjWfmhwX6HVXSJU8x/wUY8CgYEA3n6jxG+1v2ycRResPqHf30rzm7KIh+EcIa0t +yA+MwK9KPGCfx1Zao9+Gg+9daJLOgvxaKLWuX88W96uwVIDIC0kTbK+QulYT3zBJ +3Ke8KFrarRNF8iHCRpsfC7UIkTDiF0K2XCHHugbfobHHhHvYilSSqndhla8yWiZ9 +8BhpcbkCgYAXEZ1ErSZv9m3na2/PAhk/u7sHEi/O5wyvuGe1Q1SW8ESdYhI3vlap +o87ipFLz5YPW5Cbqz7pBvowbx91vI7imrilvSEBwl8BY/u5Q4EWiL4QlAe+xCYhJ +B60eK9sJADyJFXjFUIryuAsrxPFvDHs3iU709Cs+EH9nxWLBqpRl5Q== +-----END RSA PRIVATE KEY-----""" + +SPEC_COMPLIANT_TOKEN = ( + "eyJhbGciOiAiUlNBLU9BRVAiLCAiZW5jIjogIkExMjhDQkMtSFMyNTYifQ." + "Ha3w8AO7Jq25p3KqLhXaV1N1vbwSNQ5hjeh0nxWRJTHW-E7c3paxN38eNSJ" + "4vXWO2vhNgkAS81I35GZXy0JTAA2oswx8yzaF_UHTQ7ajTBgmvwBxgpW-Pf" + "8YL2IhGycNDrtx-ZqYwl9WDEQlZnm0_eX3bg31LGqtnz-MFWCOa7-tPZ93Z" + "i4IhdKLygjrjQssUUQGJxFVkWLhVuI9sNcxdjDCR1jN2CopHCsnuSlGjKxd" + "YLeCC50IyVlWPY3Zf3TBmtvfrLEqipQsETbxZ-ihOVSToALZ7q8QZfHPM4R" + "d7_vBGt6dY3BIqqRl88p56j1MQ-ekTZvduiuMZYNZcmdmPg.4doDewLiO-q" + "nOqAweE-Zlw.3mWP86WP6P4cCALdV8yU1LwIPKQO9MUGQSUDl6jbYSY.Kb8" + "OqWxyLhr4R0-Kzz4nMQ" +) + +# This key is generated by jose 1.0.0 (from master) +LEGACY_V1_TOKEN = ( + "eyJhbGciOiAiUlNBLU9BRVAiLCAiZW5jIjogIkExMjhDQkMtSFMyNTYiLCA" + "iX192IjogMX0.FN3aVDp2EGD34zyD3HmEyeBld9eUKToiBknlegJ06ViFuS" + "Tt_aZ0VTh72ySm0Q5l9u-7xlw3amZiSUm9ZOyaT05bu4CVjPJunqepvXkAF" + "aIEcSvKjifaweEd6HBdvOgvgBEbzt4LOysunt2N_sbaDOkzSZEUbgp8_Rh7" + "7-r5A3x3VCSBcA-ThluWG6XuaUVY2NrjJQ4bc-wF0qfEaEt1C_zV2hxJeZ_" + "3nIzGNy3M-bHnMaBIvZqP-jmRPTnvgTibDntymMmE7-c71Q1e0-HK0YpAfK" + "4RzdKfvjyKoVmpPk4Ris3W2Lr9jdToTYwocKyF0mV2uxE19cNAWoQqyS_Pc" + "g.QHgwlB0dCHXx1c-dnn7c0g.F04cKdz-M1_VSb25_kPwiAGBbVGE-Mh4OE" + "vrOsilQGc.vsO_UAlFRGIWlkFis5Xnng" +) + + +class TestDecryptCompatibility(unittest.TestCase): + def test_jwe_decrypt_compliant(self): + jwk = {'k': PRIVATE_KEY} + legacy_patch = mock.patch.object( + jose, 'legacy_decrypt', wraps=jose.legacy_decrypt + ) + spec_patch = mock.patch.object( + jose, 'spec_compliant_decrypt', wraps=jose.spec_compliant_decrypt + ) + with legacy_patch as legacy_mock, spec_patch as spec_mock: + jwt = jose.decrypt( + jose.deserialize_compact(SPEC_COMPLIANT_TOKEN), jwk + ) + + self.assertEqual(legacy_mock.call_count, 1) + self.assertEqual(spec_mock.call_count, 1) + self.assertEqual(jwt.claims, claims) + expected_header = { + 'alg': 'RSA-OAEP', + 'enc': 'A128CBC-HS256' + } + self.assertEqual(jwt.header, expected_header) + + def test_jwe_decrypt_legacy_v1(self): + jwk = {'k': PRIVATE_KEY} + legacy_patch = mock.patch.object( + jose, 'legacy_decrypt', wraps=jose.legacy_decrypt + ) + spec_patch = mock.patch.object( + jose, 'spec_compliant_decrypt', wraps=jose.spec_compliant_decrypt + ) + with legacy_patch as legacy_mock, spec_patch as spec_mock: + jwt = jose.decrypt(jose.deserialize_compact(LEGACY_V1_TOKEN), jwk) + + self.assertEqual(legacy_mock.call_count, 1) + self.assertEqual(spec_mock.call_count, 0) + self.assertEqual(jwt.claims, claims) + expected_header = { + 'alg': 'RSA-OAEP', + 'enc': 'A128CBC-HS256', + '__v': 1 + } + self.assertEqual(jwt.header, expected_header) + + class TestUtils(unittest.TestCase): def test_b64encode_url_utf8(self): istr = 'eric idle'.encode('utf8') diff --git a/tox.ini b/tox.ini index 56a1d96..734584c 100644 --- a/tox.ini +++ b/tox.ini @@ -5,5 +5,7 @@ ignore = E128 envlist=py27 [testenv] -deps=nose +deps= + nose + mock==1.0 commands=nosetests