Skip to content

Commit

Permalink
Add abstract filesystem support to interface funcs
Browse files Browse the repository at this point in the history
Add storage_backend as an optional parameter to functions in the interface
module which read files from the disk. When no argument is provided and
the default value of None is used, instantiate a FilesystemBackend and use
that to access files on the local filesystem.

Signed-off-by: Joshua Lock <jlock@vmware.com>
  • Loading branch information
joshuagl committed Apr 20, 2020
1 parent 5eaaad4 commit 35d93c6
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 22 deletions.
68 changes: 54 additions & 14 deletions securesystemslib/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,8 @@ def generate_and_write_rsa_keypair(filepath=None, bits=DEFAULT_RSA_KEY_BITS,


def import_rsa_privatekey_from_file(filepath, password=None,
scheme='rsassa-pss-sha256', prompt=False):
scheme='rsassa-pss-sha256', prompt=False,
storage_backend=None):
"""
<Purpose>
Import the PEM file in 'filepath' containing the private key.
Expand Down Expand Up @@ -272,6 +273,11 @@ def import_rsa_privatekey_from_file(filepath, password=None,
If True the user is prompted for a passphrase to decrypt 'filepath'.
Default is False.
storage_backend:
An object which implements
securesystemslib.storage.StorageBackendInterface. When no object is
passed a FilesystemStorageBackend will be instantiated and used.
<Exceptions>
ValueError, if 'password' is passed and 'prompt' is True.
Expand Down Expand Up @@ -344,9 +350,13 @@ def import_rsa_privatekey_from_file(filepath, password=None,
logger.debug('No password was given. Attempting to import an'
' unencrypted file.')

if storage_backend is None:
storage_backend = securesystemslib.storage.FilesystemBackend()

# Read the contents of 'filepath' that should be a PEM formatted private key.
with open(filepath, 'rb') as file_object:
pem_key = file_object.read().decode('utf-8')
file_object = storage_backend.get(filepath)
pem_key = file_object.read().decode('utf-8')
file_object.close()

# Convert 'pem_key' to 'securesystemslib.formats.RSAKEY_SCHEMA' format.
# Raise 'securesystemslib.exceptions.CryptoError' if 'pem_key' is invalid.
Expand All @@ -360,7 +370,8 @@ def import_rsa_privatekey_from_file(filepath, password=None,



def import_rsa_publickey_from_file(filepath, scheme='rsassa-pss-sha256'):
def import_rsa_publickey_from_file(filepath, scheme='rsassa-pss-sha256',
storage_backend=None):
"""
<Purpose>
Import the RSA key stored in 'filepath'. The key object returned is in the
Expand All @@ -374,6 +385,11 @@ def import_rsa_publickey_from_file(filepath, scheme='rsassa-pss-sha256'):
scheme:
The signature scheme used by the imported key.
storage_backend:
An object which implements
securesystemslib.storage.StorageBackendInterface. When no object is
passed a FilesystemStorageBackend will be instantiated and used.
<Exceptions>
securesystemslib.exceptions.FormatError, if 'filepath' is improperly
formatted.
Expand All @@ -397,10 +413,14 @@ def import_rsa_publickey_from_file(filepath, scheme='rsassa-pss-sha256'):
# Is 'scheme' properly formatted?
securesystemslib.formats.RSA_SCHEME_SCHEMA.check_match(scheme)

if storage_backend is None:
storage_backend = securesystemslib.storage.FilesystemBackend()

# Read the contents of the key file that should be in PEM format and contains
# the public portion of the RSA key.
with open(filepath, 'rb') as file_object:
rsa_pubkey_pem = file_object.read().decode('utf-8')
file_object = storage_backend.get(filepath)
rsa_pubkey_pem = file_object.read().decode('utf-8')
file_object.close()

# Convert 'rsa_pubkey_pem' to 'securesystemslib.formats.RSAKEY_SCHEMA' format.
try:
Expand Down Expand Up @@ -587,7 +607,8 @@ def import_ed25519_publickey_from_file(filepath):



def import_ed25519_privatekey_from_file(filepath, password=None, prompt=False):
def import_ed25519_privatekey_from_file(filepath, password=None, prompt=False,
storage_backend=None):
"""
<Purpose>
Import the encrypted ed25519 key file in 'filepath', decrypt it, and return
Expand All @@ -610,6 +631,11 @@ def import_ed25519_privatekey_from_file(filepath, password=None, prompt=False):
If True the user is prompted for a passphrase to decrypt 'filepath'.
Default is False.
storage_backend:
An object which implements
securesystemslib.storage.StorageBackendInterface. When no object is
passed a FilesystemStorageBackend will be instantiated and used.
<Exceptions>
securesystemslib.exceptions.FormatError, if the arguments are improperly
formatted or the imported key object contains an invalid key type (i.e.,
Expand All @@ -634,6 +660,9 @@ def import_ed25519_privatekey_from_file(filepath, password=None, prompt=False):
if password and prompt:
raise ValueError("Passing 'password' and 'prompt' True is not allowed.")

if storage_backend is None:
storage_backend = securesystemslib.storage.FilesystemBackend()

# If 'password' was passed check format and that it is not empty.
if password is not None:
securesystemslib.formats.PASSWORD_SCHEMA.check_match(password)
Expand Down Expand Up @@ -664,10 +693,11 @@ def import_ed25519_privatekey_from_file(filepath, password=None, prompt=False):

# Finally, regardless of password, try decrypting the key, if necessary.
# Otherwise, load it straight from the disk.
with open(filepath, 'rb') as file_object:
json_str = file_object.read()
return securesystemslib.keys.\
import_ed25519key_from_private_json(json_str, password=password)
file_object = storage_backend.get(filepath)
json_str = file_object.read()
file_object.close()
return securesystemslib.keys.\
import_ed25519key_from_private_json(json_str, password=password)



Expand Down Expand Up @@ -832,7 +862,8 @@ def import_ecdsa_publickey_from_file(filepath):



def import_ecdsa_privatekey_from_file(filepath, password=None):
def import_ecdsa_privatekey_from_file(filepath, password=None,
storage_backend=None):
"""
<Purpose>
Import the encrypted ECDSA key file in 'filepath', decrypt it, and return
Expand All @@ -850,6 +881,11 @@ def import_ecdsa_privatekey_from_file(filepath, password=None):
encrypted key file 'filepath' must be decrypted before the ECDSA key
object can be returned.
storage_backend:
An object which implements
securesystemslib.storage.StorageBackendInterface. When no object is
passed a FilesystemStorageBackend will be instantiated and used.
<Exceptions>
securesystemslib.exceptions.FormatError, if the arguments are improperly
formatted or the imported key object contains an invalid key type (i.e.,
Expand Down Expand Up @@ -886,12 +922,16 @@ def import_ecdsa_privatekey_from_file(filepath, password=None):
# Does 'password' have the correct format?
securesystemslib.formats.PASSWORD_SCHEMA.check_match(password)

if storage_backend is None:
storage_backend = securesystemslib.storage.FilesystemBackend()

# Store the encrypted contents of 'filepath' prior to calling the decryption
# routine.
encrypted_key = None

with open(filepath, 'rb') as file_object:
encrypted_key = file_object.read()
file_object = storage_backend.get(filepath)
encrypted_key = file_object.read()
file_object.close()

# Decrypt the loaded key file, calling the 'cryptography' library to generate
# the derived encryption key from 'password'. Raise
Expand Down
17 changes: 9 additions & 8 deletions tests/test_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,8 @@ def test_import_rsa_privatekey_from_file(self):
# Non-existent key file.
nonexistent_keypath = os.path.join(temporary_directory,
'nonexistent_keypath')
self.assertRaises(IOError, interface.import_rsa_privatekey_from_file,
nonexistent_keypath, 'pw')
self.assertRaises(securesystemslib.exceptions.StorageError,
interface.import_rsa_privatekey_from_file, nonexistent_keypath, 'pw')

# Invalid key file argument.
invalid_keyfile = os.path.join(temporary_directory, 'invalid_keyfile')
Expand Down Expand Up @@ -252,8 +252,8 @@ def test_import_rsa_publickey_from_file(self):
# Non-existent key file.
nonexistent_keypath = os.path.join(temporary_directory,
'nonexistent_keypath')
self.assertRaises(IOError, interface.import_rsa_publickey_from_file,
nonexistent_keypath)
self.assertRaises(securesystemslib.exceptions.StorageError,
interface.import_rsa_publickey_from_file, nonexistent_keypath)

# Invalid key file argument.
invalid_keyfile = os.path.join(temporary_directory, 'invalid_keyfile')
Expand Down Expand Up @@ -426,8 +426,9 @@ def test_import_ed25519_privatekey_from_file(self):
# Non-existent key file.
nonexistent_keypath = os.path.join(temporary_directory,
'nonexistent_keypath')
self.assertRaises(IOError, interface.import_ed25519_privatekey_from_file,
nonexistent_keypath, 'pw')
self.assertRaises(securesystemslib.exceptions.StorageError,
interface.import_ed25519_privatekey_from_file, nonexistent_keypath,
'pw')

# Invalid key file argument.
invalid_keyfile = os.path.join(temporary_directory, 'invalid_keyfile')
Expand Down Expand Up @@ -576,8 +577,8 @@ def test_import_ecdsa_privatekey_from_file(self):
# Test invalid argument.
# Non-existent key file.
nonexistent_keypath = os.path.join(temporary_directory, 'nonexistent_keypath')
self.assertRaises(IOError, interface.import_ecdsa_privatekey_from_file,
nonexistent_keypath, 'pw')
self.assertRaises(securesystemslib.exceptions.StorageError,
interface.import_ecdsa_privatekey_from_file, nonexistent_keypath, 'pw')

# Invalid key file argument.
invalid_keyfile = os.path.join(temporary_directory, 'invalid_keyfile')
Expand Down

0 comments on commit 35d93c6

Please sign in to comment.