Skip to content
Draft
17 changes: 12 additions & 5 deletions at_client/atclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import traceback

from at_client.connections.notification.atevents import AtEvent, AtEventType
from at_client.util.iv_nonce import IVNonce


from .common.atsign import AtSign
Expand Down Expand Up @@ -187,7 +188,10 @@ def _put_self_key(self, key: SelfKey, value: str):
key.metadata.data_signature = EncryptionUtil.sign_sha256_rsa(value, self.keys[KeysUtil.encryption_private_key_name])

try:
cipher_text = EncryptionUtil.aes_encrypt_from_base64(value, self.keys[KeysUtil.self_encryption_key_name])
if key.metadata.iv_nonce is None:
raise AtMissingIVException("Missing IV for SelfKey")

cipher_text = EncryptionUtil.aes_encrypt_from_base64(value, self.keys[KeysUtil.self_encryption_key_name], IVNonce.get_bytes_from_b64(key.metadata.iv_nonce))
except Exception as e:
raise AtEncryptionException(f"Failed to encrypt value with self encryption key - {e}")

Expand All @@ -214,11 +218,14 @@ def _put_shared_key(self, key: SharedKey, value: str):
what = ""
cipher_text = None
try:
if key.metadata.iv_nonce is None:
raise AtMissingIVException("Missing IV for SharedKey")

what = "fetch/create shared encryption key"
share_to_encryption_key = self.get_encryption_key_shared_by_me(key)

what = "encrypt value with shared encryption key"
cipher_text = EncryptionUtil.aes_encrypt_from_base64(value, share_to_encryption_key)
cipher_text = EncryptionUtil.aes_encrypt_from_base64(value, share_to_encryption_key, IVNonce.get_bytes_from_b64(key.metadata.iv_nonce))
except Exception as e:
raise AtEncryptionException(f"Failed to {what} - {e}")

Expand Down Expand Up @@ -264,7 +271,7 @@ def _get_self_key(self, key: SelfKey):
encrypted_value = fetched["data"]
self_encryption_key = self.keys[KeysUtil.self_encryption_key_name]
try:
decrypted_value = EncryptionUtil.aes_decrypt_from_base64(encrypted_value, self_encryption_key)
decrypted_value = EncryptionUtil.aes_decrypt_from_base64(encrypted_value, self_encryption_key, base64.b64decode(key.metadata.iv_nonce))
except Exception as e:
raise AtDecryptionException(f"Failed to {command} - {e}")

Expand Down Expand Up @@ -305,7 +312,7 @@ def _get_shared_by_me_with_other(self, shared_key: SharedKey):
raise AtSecondaryConnectException(f"Failed to execute {command} - {e}")

try:
return EncryptionUtil.aes_decrypt_from_base64(raw_response.get_raw_data_response(), share_encryption_key)
return EncryptionUtil.aes_decrypt_from_base64(raw_response.get_raw_data_response(), share_encryption_key, IVNonce.get_bytes_from_b64(shared_key.metadata.iv_nonce))
except Exception as e:
raise AtDecryptionException(f"Failed to decrypt value with shared encryption key - {e}")

Expand All @@ -325,7 +332,7 @@ def _get_shared_by_other_with_me(self, shared_key:SharedKey):

what = "decrypt value with shared encryption key"
try:
return EncryptionUtil.aes_decrypt_from_base64(raw_response.get_raw_data_response(), share_encryption_key)
return EncryptionUtil.aes_decrypt_from_base64(raw_response.get_raw_data_response(), share_encryption_key, IVNonce.get_bytes_from_b64(shared_key.metadata.iv_nonce))
except Exception as e:
raise AtDecryptionException(f"Failed to {what} - {e}")

Expand Down
4 changes: 4 additions & 0 deletions at_client/common/keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ def set_time_to_live(self, ttl: int):
def set_time_to_birth(self, ttb: int):
self.metadata.ttb = ttb
return self

def set_iv_nonce(self, iv_nonce: str):
self.metadata.iv_nonce = iv_nonce
return self


class PublicKey(AtKey):
Expand Down
2 changes: 1 addition & 1 deletion at_client/common/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def __str__(self):
if self.encoding:
s += f":encoding:{self.encoding}"
if self.iv_nonce:
s += f":ivNonce:{binascii.b2a_base64(self.iv_nonce).decode('utf-8')[:-1]}"
s += f":ivNonce:{self.iv_nonce}"
# TO?DO: Add new parameters

return s
Expand Down
4 changes: 4 additions & 0 deletions at_client/exception/atexception.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,5 +159,9 @@ def __init__(self, message):
super().__init__(message)

class AtRegistrarException(AtException):
def __init__(self, message):
super().__init__(message)

class AtMissingIVException(AtException):
def __init__(self, message):
super().__init__(message)
3 changes: 1 addition & 2 deletions at_client/util/encryptionutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,5 +93,4 @@ def public_key_from_base64(s):

@staticmethod
def generate_iv_nonce():
return secrets.token_bytes(16)

return secrets.token_bytes(16)
34 changes: 34 additions & 0 deletions at_client/util/iv_nonce.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import base64
from at_client.util.encryptionutil import EncryptionUtil


class IVNonce:
def __init__(self, iv_nonce_bytes=EncryptionUtil.generate_iv_nonce()):
self.iv_nonce_bytes = iv_nonce_bytes

def __repr__(self):
return str(self)

def __str__(self):
return self.as_b64()

def as_b64(self):
b64_str = ""
if self.iv_nonce_bytes:
b64_str = base64.b64encode(self.iv_nonce_bytes).rstrip().decode('utf-8')
return b64_str

def as_bytes(self):
return self.iv_nonce_bytes

@classmethod
def from_b64(cls, b64_str: str):
return cls(base64.b64decode(b64_str))

def set_iv_nonce_bytes(self, iv_nonce_bytes):
self.iv_nonce_bytes = iv_nonce_bytes
return self

@staticmethod
def get_bytes_from_b64(b64_str: str):
return base64.b64decode(b64_str)
2 changes: 1 addition & 1 deletion examples/repl.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ def main():
raise Exception("Could not evaluate the key type of: " + fullKeyName)
elif verb == "put":
fullKeyName = parts[1]
value = command[verb.length() + fullKeyName.length() + 2:].strip()
value = command[len(verb) + len(fullKeyName) + 2:].strip()
keyStringUtil = KeyStringUtil(full_key_name=fullKeyName)
keyType = keyStringUtil.get_key_type()
if keyType == KeyType.PUBLIC_KEY:
Expand Down
25 changes: 15 additions & 10 deletions test/atclient_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from at_client.common import AtSign
from at_client.common.keys import PublicKey, SelfKey, SharedKey
from at_client.exception import *
from at_client.util.iv_nonce import IVNonce
from test_wrapper import skip_if_dependabot_pr

class AtClientTest(unittest.TestCase):
Expand Down Expand Up @@ -58,7 +59,8 @@ def test_put_self_key(self):
"""Test Put Function with Self Key"""
atsign = AtSign(self.atsign1)
atclient = AtClient(atsign, verbose=self.verbose)
sk = SelfKey("test_self_key", atsign)
iv = IVNonce().as_b64()
sk = SelfKey("test_self_key", atsign).set_iv_nonce(iv)
response = atclient.put(sk, "test1")
self.assertIsNotNone(response)

Expand All @@ -68,14 +70,14 @@ def test_put_shared_key(self):
shared_by = AtSign(self.atsign1)
shared_with = AtSign(self.atsign2)
atclient = AtClient(shared_by, verbose=self.verbose)
sk = SharedKey("test_shared_key", shared_by, shared_with)
sk = SharedKey("test_shared_key", shared_by, shared_with).set_iv_nonce(IVNonce().as_b64())
response = atclient.put(sk, "test1")
self.assertIsNotNone(response)

shared_with = AtSign(self.atsign1)
shared_by = AtSign(self.atsign2)
atclient = AtClient(shared_by, verbose=self.verbose)
sk = SharedKey("test_shared_key2", shared_by, shared_with)
sk = SharedKey("test_shared_key2", shared_by, shared_with).set_iv_nonce(IVNonce().as_b64())
response = atclient.put(sk, "test2")
self.assertIsNotNone(response)

Expand Down Expand Up @@ -169,7 +171,8 @@ def test_get_self_key(self):
"""Test Get Function with Self Key"""
atsign = AtSign(self.atsign1)
atclient = AtClient(atsign, verbose=self.verbose)
sk = SelfKey("test_self_key", atsign)
iv = IVNonce().as_b64()
sk = SelfKey("test_self_key", atsign).set_iv_nonce(iv)
response = atclient.put(sk, "test1")
response = atclient.get(sk)
self.assertEqual("test1", response)
Expand Down Expand Up @@ -206,15 +209,17 @@ def test_get_shared_key(self):
shared_by = AtSign(self.atsign1)
shared_with = AtSign(self.atsign2)
atclient = AtClient(shared_by, verbose=self.verbose)
sk = SharedKey("test_shared_key1445", shared_by, shared_with)
sk = SharedKey("test_shared_key1445", shared_by, shared_with).set_iv_nonce(IVNonce().as_b64())
atclient.put(sk, "test")
response = atclient.get(sk)
self.assertEqual("test", response)

# Shared by other with me
sk = SharedKey("test_shared_key2", shared_with, shared_by)
atclient.put(SharedKey("test_shared_key2", shared_by, shared_with), "test2")
response = atclient.get(sk)
iv = IVNonce().as_b64()
sk1 = SharedKey("test_shared_key2", shared_with, shared_by).set_iv_nonce(iv)
sk2 = SharedKey("test_shared_key2", shared_by, shared_with).set_iv_nonce(iv)
atclient.put(sk2, "test2")
response = atclient.get(sk1)
self.assertEqual("test2", response)

# Shared Key not found test
Expand All @@ -240,7 +245,7 @@ def test_delete_self_key(self):
atsign = AtSign(self.atsign1)
atclient = AtClient(atsign, verbose=self.verbose)
random_key_name = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(6))
sk = SelfKey(random_key_name, atsign)
sk = SelfKey(random_key_name, atsign).set_iv_nonce(IVNonce().as_b64())
response = atclient.put(sk, "test1")

response = atclient.delete(sk)
Expand All @@ -253,7 +258,7 @@ def test_delete_shared_key(self):
shared_with = AtSign(self.atsign2)
atclient = AtClient(shared_by, verbose=self.verbose)
random_key_name = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(6))
sk = SharedKey(random_key_name, shared_by, shared_with)
sk = SharedKey(random_key_name, shared_by, shared_with).set_iv_nonce(IVNonce().as_b64())
response = atclient.put(sk, "test1")

response = atclient.delete(sk)
Expand Down
5 changes: 3 additions & 2 deletions test/encryptionutil_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ def test_aes_encryption(self):
"""Test generating an AES key and encryption/decryption."""
secret_key = EncryptionUtil.generate_aes_key_base64()
plain_text = "AES"
encrypted_text = EncryptionUtil.aes_encrypt_from_base64(plain_text, secret_key)
decrypted_text = EncryptionUtil.aes_decrypt_from_base64(encrypted_text, secret_key)
iv_nonce = EncryptionUtil.generate_iv_nonce()
encrypted_text = EncryptionUtil.aes_encrypt_from_base64(plain_text, secret_key, iv_nonce)
decrypted_text = EncryptionUtil.aes_decrypt_from_base64(encrypted_text, secret_key, iv_nonce)
self.assertEqual(plain_text, decrypted_text)

def test_rsa_encryption(self):
Expand Down
3 changes: 2 additions & 1 deletion test/test_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

def skip_if_dependabot_pr(func):
"""Decorator for skipping a test method if it's a Dependabot PR."""
if int(os.getenv('DEPENDABOT_PR')):
dependabot_pr = os.getenv('DEPENDABOT_PR')
if dependabot_pr is not None and int(dependabot_pr):
return unittest.skip("Dependabot PR")(func)
else:
return func
Expand Down