Skip to content

Commit e9df3cd

Browse files
committed
fix(jwe): limit DEF decompress size to 250k bytes
1 parent bf3eaf5 commit e9df3cd

File tree

3 files changed

+40
-2
lines changed

3 files changed

+40
-2
lines changed

src/joserfc/errors.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,12 @@ class BadSignatureError(JoseError):
7272
error = "bad_signature"
7373

7474

75+
class ExceededSizeError(JoseError):
76+
"""This error is designed for DEF zip algorithm. It raised when the
77+
compressed data exceeds the maximum allowed length."""
78+
error = "exceeded_size"
79+
80+
7581
class InvalidEncryptionAlgorithmError(JoseError):
7682
"""This error is designed for JWE. It is raised when "enc" value
7783
does not work together with "alg" value.

src/joserfc/rfc7518/jwe_zips.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
import zlib
22
from typing import List
33
from ..rfc7516.models import JWEZipModel
4+
from ..errors import ExceededSizeError
5+
6+
GZIP_HEAD = bytes([120, 156])
7+
MAX_SIZE = 250 * 1024
48

59

610
class DeflateZipModel(JWEZipModel):
@@ -10,12 +14,20 @@ class DeflateZipModel(JWEZipModel):
1014
def compress(self, s: bytes) -> bytes:
1115
"""Compress bytes data with DEFLATE algorithm."""
1216
data = zlib.compress(s)
13-
# drop gzip headers and tail
17+
# https://datatracker.ietf.org/doc/html/rfc1951
18+
# since DEF is always gzip, we can drop gzip headers and tail
1419
return data[2:-4]
1520

1621
def decompress(self, s: bytes) -> bytes:
1722
"""Decompress DEFLATE bytes data."""
18-
return zlib.decompress(s, -zlib.MAX_WBITS)
23+
if s.startswith(GZIP_HEAD):
24+
decompressor = zlib.decompressobj()
25+
else:
26+
decompressor = zlib.decompressobj(-zlib.MAX_WBITS)
27+
value = decompressor.decompress(s, MAX_SIZE)
28+
if decompressor.unconsumed_tail:
29+
raise ExceededSizeError(f"Decompressed string exceeds {MAX_SIZE} bytes")
30+
return value
1931

2032

2133
JWE_ZIP_MODELS: List[JWEZipModel] = [DeflateZipModel()]

tests/jwe/test_compact.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
MissingAlgorithmError,
88
MissingEncryptionError,
99
DecodeError,
10+
ExceededSizeError,
1011
)
1112
from joserfc.util import json_b64encode
1213
from tests.base import load_key
@@ -176,6 +177,25 @@ def test_with_zip_header(self):
176177
obj = decrypt_compact(result, private_key)
177178
self.assertEqual(obj.plaintext, plaintext)
178179

180+
def test_decompress_zip_with_gzip_head(self):
181+
key = OctKey.import_key({'k': 'pyL42ncDFSYnenl-GiZjRw', 'kty': 'oct'})
182+
s = (
183+
'eyJhbGciOiJkaXIiLCJlbmMiOiJBMTI4R0NNIiwiemlwIjoiREVGIn0..'
184+
'YbDfdYa6p-wAEFul.YK7j0MsH-Dko6ifsEg.wES6-QAOEbErZqXiS0JHRw'
185+
)
186+
result = decrypt_compact(s, key)
187+
self.assertEqual(result.plaintext, b'hello')
188+
self.assertEqual(result.headers().get('zip'), 'DEF')
189+
190+
def test_decompress_zip_exceeds_size(self):
191+
key = OctKey.import_key({'k': 'pyL42ncDFSYnenl-GiZjRw', 'kty': 'oct'})
192+
result = encrypt_compact(
193+
{"alg": "dir", "enc": "A128GCM", "zip": "DEF"},
194+
b'h' * 300000,
195+
key
196+
)
197+
self.assertRaises(ExceededSizeError, decrypt_compact, result, key)
198+
179199
def test_invalid_compact_data(self):
180200
private_key: RSAKey = load_key('rsa-openssl-private.pem')
181201
value = b"a.b.c.d.e.f.g"

0 commit comments

Comments
 (0)