Skip to content

Fix AES IV reuse - drop support for CTR and ECB - address CVE-2017-1000246 #519

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 8 additions & 48 deletions src/saml2/aes.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,36 +11,27 @@
POSTFIX_MODE = {
'cbc': modes.CBC,
'cfb': modes.CFB,
'ecb': modes.ECB,
}

AES_BLOCK_SIZE = int(algorithms.AES.block_size / 8)


class AESCipher(object):
def __init__(self, key, iv=None):
def __init__(self, key):
"""
:param key: The encryption key
:param iv: Init vector
:return: AESCipher instance
"""
self.key = key
self.iv = iv

def build_cipher(self, iv=None, alg='aes_128_cbc'):
def build_cipher(self, alg='aes_128_cbc'):
"""
:param iv: init vector
:param alg: cipher algorithm
:return: A Cipher instance
"""
typ, bits, cmode = alg.lower().split('_')
bits = int(bits)

if not iv:
if self.iv:
iv = self.iv
else:
iv = os.urandom(AES_BLOCK_SIZE)
iv = os.urandom(AES_BLOCK_SIZE)

if len(iv) != AES_BLOCK_SIZE:
raise Exception('Wrong iv size: {}'.format(len(iv)))
Expand All @@ -63,11 +54,10 @@ def build_cipher(self, iv=None, alg='aes_128_cbc'):

return cipher, iv

def encrypt(self, msg, iv=None, alg='aes_128_cbc', padding='PKCS#7',
b64enc=True, block_size=AES_BLOCK_SIZE):
def encrypt(self, msg, alg='aes_128_cbc', padding='PKCS#7', b64enc=True,
block_size=AES_BLOCK_SIZE):
"""
:param key: The encryption key
:param iv: init vector
:param msg: Message to be encrypted
:param padding: Which padding that should be used
:param b64enc: Whether the result should be base64encoded
Expand All @@ -87,7 +77,7 @@ def encrypt(self, msg, iv=None, alg='aes_128_cbc', padding='PKCS#7',
c = chr(plen).encode()
msg += c * plen

cipher, iv = self.build_cipher(iv, alg)
cipher, iv = self.build_cipher(alg)
encryptor = cipher.encryptor()
cmsg = iv + encryptor.update(msg) + encryptor.finalize()

Expand All @@ -98,48 +88,18 @@ def encrypt(self, msg, iv=None, alg='aes_128_cbc', padding='PKCS#7',

return enc_msg

def decrypt(self, msg, iv=None, alg='aes_128_cbc', padding='PKCS#7',
b64dec=True):
def decrypt(self, msg, alg='aes_128_cbc', padding='PKCS#7', b64dec=True):
"""
:param key: The encryption key
:param iv: init vector
:param msg: Base64 encoded message to be decrypted
:return: The decrypted message
"""
data = b64decode(msg) if b64dec else msg

_iv = data[:AES_BLOCK_SIZE]
if iv:
assert iv == _iv
cipher, iv = self.build_cipher(iv, alg=alg)
cipher, iv = self.build_cipher(alg=alg)
decryptor = cipher.decryptor()
res = decryptor.update(data)[AES_BLOCK_SIZE:] + decryptor.finalize()
if padding in ['PKCS#5', 'PKCS#7']:
idx = bytearray(res)[-1]
res = res[:-idx]
return res


def run_test():
key = b'1234523451234545' # 16 byte key
iv = os.urandom(AES_BLOCK_SIZE)
# Iff padded, the message doesn't have to be multiple of 16 in length
original_msg = b'ToBeOrNotTobe W.S.'
aes = AESCipher(key)

encrypted_msg = aes.encrypt(original_msg, iv)
decrypted_msg = aes.decrypt(encrypted_msg, iv)
assert decrypted_msg == original_msg

encrypted_msg = aes.encrypt(original_msg)
decrypted_msg = aes.decrypt(encrypted_msg)
assert decrypted_msg == original_msg

aes = AESCipher(key, iv)
encrypted_msg = aes.encrypt(original_msg)
decrypted_msg = aes.decrypt(encrypted_msg)
assert decrypted_msg == original_msg


if __name__ == '__main__':
run_test()
2 changes: 1 addition & 1 deletion src/saml2/authn.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def __init__(self, srv, mako_template, template_lookup, pwd, return_to):
self.return_to = return_to
self.active = {}
self.query_param = "upm_answer"
self.aes = AESCipher(self.srv.symkey.encode(), srv.iv)
self.aes = AESCipher(self.srv.symkey.encode())

def __call__(self, cookie=None, policy_url=None, logo_url=None,
query="", **kwargs):
Expand Down
3 changes: 0 additions & 3 deletions src/saml2/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,9 @@ def __init__(self, config_file="", config=None, cache=None, stype="idp",
self.init_config(stype)
self.cache = cache
self.ticket = {}
#
self.session_db = self.choose_session_storage()
# Needed for
self.symkey = symkey
self.seed = rndstr()
self.iv = os.urandom(16)
self.lock = threading.Lock()

def getvalid_certificate_str(self):
Expand Down
74 changes: 74 additions & 0 deletions tests/test_92_aes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import os

import saml2.aes


class TestAES():
def test_aes_defaults(self):
original_msg = b'ToBeOrNotTobe W.S.'
key = os.urandom(16)
aes = saml2.aes.AESCipher(key)

encrypted_msg = aes.encrypt(original_msg)
decrypted_msg = aes.decrypt(encrypted_msg)
assert decrypted_msg == original_msg

def test_aes_128_cbc(self):
original_msg = b'ToBeOrNotTobe W.S.'
key = os.urandom(16)
aes = saml2.aes.AESCipher(key)
alg = 'aes_128_cbc'

encrypted_msg = aes.encrypt(original_msg, alg=alg)
decrypted_msg = aes.decrypt(encrypted_msg, alg=alg)
assert decrypted_msg == original_msg

def test_aes_128_cfb(self):
original_msg = b'ToBeOrNotTobe W.S.'
key = os.urandom(16)
aes = saml2.aes.AESCipher(key)
alg = 'aes_128_cfb'

encrypted_msg = aes.encrypt(original_msg, alg=alg)
decrypted_msg = aes.decrypt(encrypted_msg, alg=alg)
assert decrypted_msg == original_msg

def test_aes_192_cbc(self):
original_msg = b'ToBeOrNotTobe W.S.'
key = os.urandom(24)
aes = saml2.aes.AESCipher(key)
alg = 'aes_192_cbc'

encrypted_msg = aes.encrypt(original_msg, alg=alg)
decrypted_msg = aes.decrypt(encrypted_msg, alg=alg)
assert decrypted_msg == original_msg

def test_aes_192_cfb(self):
original_msg = b'ToBeOrNotTobe W.S.'
key = os.urandom(24)
aes = saml2.aes.AESCipher(key)
alg = 'aes_192_cfb'

encrypted_msg = aes.encrypt(original_msg, alg=alg)
decrypted_msg = aes.decrypt(encrypted_msg, alg=alg)
assert decrypted_msg == original_msg

def test_aes_256_cbc(self):
original_msg = b'ToBeOrNotTobe W.S.'
key = os.urandom(32)
aes = saml2.aes.AESCipher(key)
alg = 'aes_256_cbc'

encrypted_msg = aes.encrypt(original_msg, alg=alg)
decrypted_msg = aes.decrypt(encrypted_msg, alg=alg)
assert decrypted_msg == original_msg

def test_aes_256_cfb(self):
original_msg = b'ToBeOrNotTobe W.S.'
key = os.urandom(32)
aes = saml2.aes.AESCipher(key)
alg = 'aes_256_cfb'

encrypted_msg = aes.encrypt(original_msg, alg=alg)
decrypted_msg = aes.decrypt(encrypted_msg, alg=alg)
assert decrypted_msg == original_msg