11import hashlib
22import json
33import random
4- from base64 import decodebytes , encodebytes
4+ import logging
55
6- from pubnub .crypto_core import PubNubCrypto
6+
7+ from base64 import decodebytes , encodebytes , b64decode , b64encode
78from Cryptodome .Cipher import AES
89from Cryptodome .Util .Padding import pad , unpad
10+ from pubnub .crypto_core import PubNubCrypto , PubNubCryptor , PubNubLegacyCryptor , PubNubAesCbcCryptor , CryptoHeader , \
11+ CryptorPayload
12+ from pubnub .exceptions import PubNubException
13+ from typing import Union , Dict
914
1015
1116Initial16bytes = '0123456789012345'
@@ -80,9 +85,10 @@ def get_secret(self, key):
8085
8186
8287class PubNubFileCrypto (PubNubCryptodome ):
83- def encrypt (self , key , file ):
88+ def encrypt (self , key , file , use_random_iv = True ):
89+
8490 secret = self .get_secret (key )
85- initialization_vector = self .get_initialization_vector (use_random_iv = True )
91+ initialization_vector = self .get_initialization_vector (use_random_iv )
8692 cipher = AES .new (bytes (secret [0 :32 ], "utf-8" ), self .mode , bytes (initialization_vector , 'utf-8' ))
8793 initialization_vector = bytes (initialization_vector , 'utf-8' )
8894
@@ -92,9 +98,9 @@ def encrypt(self, key, file):
9298 initialization_vector = initialization_vector
9399 )
94100
95- def decrypt (self , key , file ):
101+ def decrypt (self , key , file , use_random_iv = True ):
96102 secret = self .get_secret (key )
97- initialization_vector , extracted_file = self .extract_random_iv (file , use_random_iv = True )
103+ initialization_vector , extracted_file = self .extract_random_iv (file , use_random_iv )
98104 try :
99105 cipher = AES .new (bytes (secret [0 :32 ], "utf-8" ), self .mode , initialization_vector )
100106 result = unpad (cipher .decrypt (extracted_file ), 16 )
@@ -103,3 +109,163 @@ def decrypt(self, key, file):
103109 result = unpad (cipher .decrypt (extracted_file ), 16 )
104110
105111 return result
112+
113+
114+ class PubNubCryptoModule (PubNubCrypto ):
115+ FALLBACK_CRYPTOR_ID : str = '0000'
116+ cryptor_map = {}
117+ default_cryptor_id : str
118+
119+ def __init__ (self , cryptor_map : Dict [str , PubNubCryptor ], default_cryptor : PubNubCryptor ):
120+ self .cryptor_map = cryptor_map
121+ self .default_cryptor_id = default_cryptor .CRYPTOR_ID
122+
123+ def _validate_cryptor_id (self , cryptor_id : str ) -> str :
124+ cryptor_id = cryptor_id or self .default_cryptor_id
125+
126+ if len (cryptor_id ) != 4 :
127+ logging .error (f'Malformed cryptor id: { cryptor_id } ' )
128+ raise PubNubException ('Malformed cryptor id' )
129+
130+ if cryptor_id not in self .cryptor_map .keys ():
131+ logging .error (f'Unsupported cryptor: { cryptor_id } ' )
132+ raise PubNubException ('unknown cryptor error' )
133+ return cryptor_id
134+
135+ def _get_cryptor (self , cryptor_id ):
136+ if not cryptor_id or cryptor_id not in self .cryptor_map :
137+ raise PubNubException ('unknown cryptor error' )
138+ return self .cryptor_map [cryptor_id ]
139+
140+ # encrypt string
141+ def encrypt (self , message : str , cryptor_id : str = None ) -> str :
142+ if not len (message ):
143+ raise PubNubException ('encryption error' )
144+ cryptor_id = self ._validate_cryptor_id (cryptor_id )
145+ data = message .encode ('utf-8' )
146+ crypto_payload = self .cryptor_map [cryptor_id ].encrypt (data )
147+ header = self .encode_header (cryptor_id = cryptor_id , cryptor_data = crypto_payload ['cryptor_data' ])
148+ return b64encode (header + crypto_payload ['data' ]).decode ()
149+
150+ def decrypt (self , message ):
151+ data = b64decode (message )
152+ header = self .decode_header (data )
153+ if header :
154+ cryptor_id = header ['cryptor_id' ]
155+ payload = CryptorPayload (data = data [header ['length' ]:], cryptor_data = header ['cryptor_data' ])
156+ if not header :
157+ cryptor_id = self .FALLBACK_CRYPTOR_ID
158+ payload = CryptorPayload (data = data )
159+
160+ if not len (payload ['data' ]):
161+ raise PubNubException ('decryption error' )
162+
163+ if cryptor_id not in self .cryptor_map .keys ():
164+ raise PubNubException ('unknown cryptor error' )
165+
166+ message = self ._get_cryptor (cryptor_id ).decrypt (payload )
167+ try :
168+ return json .loads (message )
169+ except Exception :
170+ return message
171+
172+ def encrypt_file (self , file_data , cryptor_id : str = None ):
173+ if not len (file_data ):
174+ raise PubNubException ('encryption error' )
175+ cryptor_id = self ._validate_cryptor_id (cryptor_id )
176+ crypto_payload = self .cryptor_map [cryptor_id ].encrypt (file_data )
177+ header = self .encode_header (cryptor_id = cryptor_id , cryptor_data = crypto_payload ['cryptor_data' ])
178+ return header + crypto_payload ['data' ]
179+
180+ def decrypt_file (self , file_data ):
181+ header = self .decode_header (file_data )
182+ if header :
183+ cryptor_id = header ['cryptor_id' ]
184+ payload = CryptorPayload (data = file_data [header ['length' ]:], cryptor_data = header ['cryptor_data' ])
185+ else :
186+ cryptor_id = self .FALLBACK_CRYPTOR_ID
187+ payload = CryptorPayload (data = file_data )
188+
189+ if not len (payload ['data' ]):
190+ raise PubNubException ('decryption error' )
191+
192+ if cryptor_id not in self .cryptor_map .keys ():
193+ raise PubNubException ('unknown cryptor error' )
194+
195+ return self ._get_cryptor (cryptor_id ).decrypt (payload , binary_mode = True )
196+
197+ def encode_header (self , cryptor_id : str = None , cryptor_data : any = None ) -> str :
198+ if cryptor_id == self .FALLBACK_CRYPTOR_ID :
199+ return b''
200+ if cryptor_data and len (cryptor_data ) > 65535 :
201+ raise PubNubException ('Cryptor data is too long' )
202+ cryptor_id = self ._validate_cryptor_id (cryptor_id )
203+
204+ sentinel = b'PNED'
205+ version = CryptoHeader .header_ver .to_bytes (1 , byteorder = 'big' )
206+ crid = bytes (cryptor_id , 'utf-8' )
207+
208+ if cryptor_data :
209+ crd = cryptor_data
210+ cryptor_data_len = len (cryptor_data )
211+ else :
212+ crd = b''
213+ cryptor_data_len = 0
214+
215+ if cryptor_data_len < 255 :
216+ crlen = cryptor_data_len .to_bytes (1 , byteorder = 'big' )
217+ else :
218+ crlen = b'\xff ' + cryptor_data_len .to_bytes (2 , byteorder = 'big' )
219+ return sentinel + version + crid + crlen + crd
220+
221+ def decode_header (self , header : bytes ) -> Union [None , CryptoHeader ]:
222+ try :
223+ sentinel = header [:4 ]
224+ if sentinel != b'PNED' :
225+ return False
226+ except ValueError :
227+ return False
228+
229+ try :
230+ header_version = header [4 ]
231+ if header_version > CryptoHeader .header_ver :
232+ raise PubNubException ('unknown cryptor error' )
233+
234+ cryptor_id = header [5 :9 ].decode ()
235+ crlen = header [9 ]
236+ if crlen < 255 :
237+ cryptor_data = header [10 : 10 + crlen ]
238+ hlen = 10 + crlen
239+ else :
240+ crlen = int (header [10 :12 ].hex (), 16 )
241+ cryptor_data = header [12 :12 + crlen ]
242+ hlen = 12 + crlen
243+
244+ return CryptoHeader (sentinel = sentinel , header_ver = header_version , cryptor_id = cryptor_id ,
245+ cryptor_data = cryptor_data , length = hlen )
246+ except IndexError :
247+ raise PubNubException ('decryption error' )
248+
249+
250+ class LegacyCryptoModule (PubNubCryptoModule ):
251+ def __init__ (self , config ) -> None :
252+ cryptor_map = {
253+ PubNubLegacyCryptor .CRYPTOR_ID : PubNubLegacyCryptor (config .cipher_key ,
254+ config .use_random_initialization_vector ,
255+ config .cipher_mode ,
256+ config .fallback_cipher_mode ),
257+ PubNubAesCbcCryptor .CRYPTOR_ID : PubNubAesCbcCryptor (config .cipher_key ),
258+ }
259+ super ().__init__ (cryptor_map , PubNubLegacyCryptor )
260+
261+
262+ class AesCbcCryptoModule (PubNubCryptoModule ):
263+ def __init__ (self , config ) -> None :
264+ cryptor_map = {
265+ PubNubLegacyCryptor .CRYPTOR_ID : PubNubLegacyCryptor (config .cipher_key ,
266+ config .use_random_initialization_vector ,
267+ config .cipher_mode ,
268+ config .fallback_cipher_mode ),
269+ PubNubAesCbcCryptor .CRYPTOR_ID : PubNubAesCbcCryptor (config .cipher_key ),
270+ }
271+ super ().__init__ (cryptor_map , PubNubAesCbcCryptor )
0 commit comments