40
40
41
41
_ENCRYPTION_PROTOCOL_V1 = '1.0'
42
42
_ENCRYPTION_PROTOCOL_V2 = '2.0'
43
+ _ENCRYPTION_PROTOCOL_V2_1 = '2.1'
44
+ _VALID_ENCRYPTION_PROTOCOLS = [_ENCRYPTION_PROTOCOL_V1 , _ENCRYPTION_PROTOCOL_V2 , _ENCRYPTION_PROTOCOL_V2_1 ]
45
+ _ENCRYPTION_V2_PROTOCOLS = [_ENCRYPTION_PROTOCOL_V2 , _ENCRYPTION_PROTOCOL_V2_1 ]
43
46
_GCM_REGION_DATA_LENGTH = 4 * 1024 * 1024
44
47
_GCM_NONCE_LENGTH = 12
45
48
_GCM_TAG_LENGTH = 16
46
49
47
50
_ERROR_OBJECT_INVALID = \
48
51
'{0} does not define a complete interface. Value of {1} is either missing or invalid.'
49
52
53
+ _ERROR_UNSUPPORTED_METHOD_FOR_ENCRYPTION = (
54
+ 'The require_encryption flag is set, but encryption is not supported'
55
+ ' for this method.' )
56
+
50
57
51
58
class KeyEncryptionKey (Protocol ):
52
59
@@ -289,14 +296,14 @@ def encrypt_data_v2(data: bytes, nonce: int, key: bytes) -> bytes:
289
296
290
297
def is_encryption_v2 (encryption_data : Optional [_EncryptionData ]) -> bool :
291
298
"""
292
- Determine whether the given encryption data signifies version 2.0.
299
+ Determine whether the given encryption data signifies version 2.0 or 2.1 .
293
300
294
301
:param Optional[_EncryptionData] encryption_data: The encryption data. Will return False if this is None.
295
302
:return: True, if the encryption data indicates encryption V2, false otherwise.
296
303
:rtype: bool
297
304
"""
298
305
# If encryption_data is None, assume no encryption
299
- return bool (encryption_data and (encryption_data .encryption_agent .protocol == _ENCRYPTION_PROTOCOL_V2 ))
306
+ return bool (encryption_data and (encryption_data .encryption_agent .protocol in _ENCRYPTION_V2_PROTOCOLS ))
300
307
301
308
302
309
def modify_user_agent_for_encryption (
@@ -357,7 +364,7 @@ def get_adjusted_upload_size(length: int, encryption_version: str) -> int:
357
364
def get_adjusted_download_range_and_offset (
358
365
start : int ,
359
366
end : int ,
360
- length : int ,
367
+ length : Optional [ int ] ,
361
368
encryption_data : Optional [_EncryptionData ]) -> Tuple [Tuple [int , int ], Tuple [int , int ]]:
362
369
"""
363
370
Gets the new download range and offsets into the decrypted data for
@@ -374,7 +381,7 @@ def get_adjusted_download_range_and_offset(
374
381
375
382
:param int start: The user-requested start index.
376
383
:param int end: The user-requested end index.
377
- :param int length: The user-requested length. Only used for V1.
384
+ :param Optional[ int] length: The user-requested length. Only used for V1.
378
385
:param Optional[_EncryptionData] encryption_data: The encryption data to determine version and sizes.
379
386
:return: (new start, new end), (start offset, end offset)
380
387
:rtype: Tuple[Tuple[int, int], Tuple[int, int]]
@@ -401,7 +408,7 @@ def get_adjusted_download_range_and_offset(
401
408
end_offset = 15 - (end % 16 )
402
409
end += end_offset
403
410
404
- elif encryption_data .encryption_agent .protocol == _ENCRYPTION_PROTOCOL_V2 :
411
+ elif encryption_data .encryption_agent .protocol in _ENCRYPTION_V2_PROTOCOLS :
405
412
start_offset , end_offset = 0 , end
406
413
407
414
if encryption_data .encrypted_region_info is None :
@@ -451,17 +458,20 @@ def parse_encryption_data(metadata: Dict[str, Any]) -> Optional[_EncryptionData]
451
458
return None
452
459
453
460
454
- def adjust_blob_size_for_encryption (size : int , encryption_data : _EncryptionData ) -> int :
461
+ def adjust_blob_size_for_encryption (size : int , encryption_data : Optional [ _EncryptionData ] ) -> int :
455
462
"""
456
463
Adjusts the given blob size for encryption by subtracting the size of
457
464
the encryption data (nonce + tag). This only has an affect for encryption V2.
458
465
459
466
:param int size: The original blob size.
460
- :param _EncryptionData encryption_data: The encryption data to determine version and sizes.
467
+ :param Optional[ _EncryptionData] encryption_data: The encryption data to determine version and sizes.
461
468
:return: The new blob size.
462
469
:rtype: int
463
470
"""
464
- if is_encryption_v2 (encryption_data ) and encryption_data .encrypted_region_info is not None :
471
+ if (encryption_data is not None and
472
+ encryption_data .encrypted_region_info is not None and
473
+ is_encryption_v2 (encryption_data )):
474
+
465
475
nonce_length = encryption_data .encrypted_region_info .nonce_length
466
476
data_length = encryption_data .encrypted_region_info .data_length
467
477
tag_length = encryption_data .encrypted_region_info .tag_length
@@ -543,7 +553,7 @@ def _dict_to_encryption_data(encryption_data_dict: Dict[str, Any]) -> _Encryptio
543
553
"""
544
554
try :
545
555
protocol = encryption_data_dict ['EncryptionAgent' ]['Protocol' ]
546
- if protocol not in [ _ENCRYPTION_PROTOCOL_V1 , _ENCRYPTION_PROTOCOL_V2 ] :
556
+ if protocol not in _VALID_ENCRYPTION_PROTOCOLS :
547
557
raise ValueError ("Unsupported encryption version." )
548
558
except KeyError as exc :
549
559
raise ValueError ("Unsupported encryption version." ) from exc
@@ -629,7 +639,7 @@ def _validate_and_unwrap_cek(
629
639
# Validate we have the right info for the specified version
630
640
if encryption_data .encryption_agent .protocol == _ENCRYPTION_PROTOCOL_V1 :
631
641
_validate_not_none ('content_encryption_IV' , encryption_data .content_encryption_IV )
632
- elif encryption_data .encryption_agent .protocol == _ENCRYPTION_PROTOCOL_V2 :
642
+ elif encryption_data .encryption_agent .protocol in _ENCRYPTION_V2_PROTOCOLS :
633
643
_validate_not_none ('encrypted_region_info' , encryption_data .encrypted_region_info )
634
644
else :
635
645
raise ValueError ('Specified encryption version is not supported.' )
@@ -655,8 +665,8 @@ def _validate_and_unwrap_cek(
655
665
656
666
# For V2, the version is included with the cek. We need to validate it
657
667
# and remove it from the actual cek.
658
- if encryption_data .encryption_agent .protocol == _ENCRYPTION_PROTOCOL_V2 :
659
- version_2_bytes = _ENCRYPTION_PROTOCOL_V2 .encode ().ljust (8 , b'\0 ' )
668
+ if encryption_data .encryption_agent .protocol in _ENCRYPTION_V2_PROTOCOLS :
669
+ version_2_bytes = encryption_data . encryption_agent . protocol .encode ().ljust (8 , b'\0 ' )
660
670
cek_version_bytes = content_encryption_key [:len (version_2_bytes )]
661
671
if cek_version_bytes != version_2_bytes :
662
672
raise ValueError ('The encryption metadata is not valid and may have been modified.' )
@@ -715,7 +725,7 @@ def _decrypt_message(
715
725
unpadder = PKCS7 (128 ).unpadder ()
716
726
decrypted_data = (unpadder .update (decrypted_data ) + unpadder .finalize ())
717
727
718
- elif encryption_data .encryption_agent .protocol == _ENCRYPTION_PROTOCOL_V2 :
728
+ elif encryption_data .encryption_agent .protocol in _ENCRYPTION_V2_PROTOCOLS :
719
729
block_info = encryption_data .encrypted_region_info
720
730
if not block_info or not block_info .nonce_length :
721
731
raise ValueError ("Missing required metadata for decryption." )
@@ -887,7 +897,7 @@ def decrypt_blob( # pylint: disable=too-many-locals,too-many-statements
887
897
raise ValueError ('Specified encryption algorithm is not supported.' )
888
898
889
899
version = encryption_data .encryption_agent .protocol
890
- if version not in ( _ENCRYPTION_PROTOCOL_V1 , _ENCRYPTION_PROTOCOL_V2 ) :
900
+ if version not in _VALID_ENCRYPTION_PROTOCOLS :
891
901
raise ValueError ('Specified encryption version is not supported.' )
892
902
893
903
content_encryption_key = _validate_and_unwrap_cek (encryption_data , key_encryption_key , key_resolver )
@@ -938,7 +948,7 @@ def decrypt_blob( # pylint: disable=too-many-locals,too-many-statements
938
948
939
949
return content [start_offset : len (content ) - end_offset ]
940
950
941
- if version == _ENCRYPTION_PROTOCOL_V2 :
951
+ if version in _ENCRYPTION_V2_PROTOCOLS :
942
952
# We assume the content contains only full encryption regions
943
953
total_size = len (content )
944
954
offset = 0
0 commit comments