Skip to content

Commit

Permalink
Merge pull request #488 from lukpueh/add-GPGKey
Browse files Browse the repository at this point in the history
Add GPGKey and use with GPGSigner
  • Loading branch information
lukpueh authored Mar 9, 2023
2 parents be32a9f + cc14e51 commit 7252521
Show file tree
Hide file tree
Showing 6 changed files with 323 additions and 61 deletions.
2 changes: 1 addition & 1 deletion securesystemslib/gpg/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,7 @@ def get_pubkey_bundle(data, keyid):
):
if public_key and public_key["keyid"].endswith(keyid.lower()):
if idx > 1:
log.warning(
log.debug(
"Exporting master key '{}' including subkeys '{}' for" # pylint: disable=logging-format-interpolation,consider-using-f-string
" passed keyid '{}'.".format(
master_public_key["keyid"],
Expand Down
6 changes: 5 additions & 1 deletion securesystemslib/signer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
Some implementations are provided by default but more can be added by users.
"""
from securesystemslib.signer._gcp_signer import GCPSigner
from securesystemslib.signer._gpg_signer import GPGSigner
from securesystemslib.signer._gpg_signer import GPGKey, GPGSigner
from securesystemslib.signer._hsm_signer import HSMSigner
from securesystemslib.signer._key import KEY_FOR_TYPE_AND_SCHEME, Key, SSlibKey
from securesystemslib.signer._signature import Signature
Expand All @@ -23,6 +23,7 @@
SSlibSigner.FILE_URI_SCHEME: SSlibSigner,
GCPSigner.SCHEME: GCPSigner,
HSMSigner.SCHEME: HSMSigner,
GPGSigner.SCHEME: GPGSigner,
}
)

Expand All @@ -47,5 +48,8 @@
("rsa", "rsa-pkcs1v15-sha384"): SSlibKey,
("rsa", "rsa-pkcs1v15-sha512"): SSlibKey,
("sphincs", "sphincs-shake-128s"): SSlibKey,
("rsa", "pgp+rsa-pkcsv1.5"): GPGKey,
("dsa", "pgp+dsa-fips-180-2"): GPGKey,
("eddsa", "pgp+eddsa-ed25519"): GPGKey,
}
)
203 changes: 180 additions & 23 deletions securesystemslib/signer/_gpg_signer.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,107 @@
"""Signer implementation for OpenPGP """
from typing import Dict, Optional

import securesystemslib.gpg.functions as gpg
import logging
from typing import Any, Dict, Optional, Tuple
from urllib import parse

from securesystemslib import exceptions, formats
from securesystemslib.gpg import exceptions as gpg_exceptions
from securesystemslib.gpg import functions as gpg
from securesystemslib.signer._key import Key
from securesystemslib.signer._signer import SecretsHandler, Signature, Signer

logger = logging.getLogger(__name__)


class GPGKey(Key):
"""OpenPGP Key.
*All parameters named below are not just constructor arguments but also
instance attributes.*
Attributes:
keyid: Key identifier that is unique within the metadata it is used in.
It is also used to identify the GnuPG local user signing key.
ketytype: Key type, e.g. "rsa", "dsa" or "eddsa".
scheme: Signing schemes, e.g. "pgp+rsa-pkcsv1.5", "pgp+dsa-fips-180-2",
"pgp+eddsa-ed25519".
keyval: Opaque key content.
unrecognized_fields: Dictionary of all attributes that are not managed
by Securesystemslib
"""

@classmethod
def from_dict(cls, keyid: str, key_dict: Dict[str, Any]) -> "GPGKey":
keytype = key_dict.pop("keytype")
scheme = key_dict.pop("scheme")
keyval = key_dict.pop("keyval")

return cls(keyid, keytype, scheme, keyval, key_dict)

def to_dict(self) -> Dict:
return {
"keytype": self.keytype,
"scheme": self.scheme,
"keyval": self.keyval,
**self.unrecognized_fields,
}

def verify_signature(self, signature: Signature, data: bytes) -> None:
try:
if not gpg.verify_signature(
GPGSigner._sig_to_legacy_dict( # pylint: disable=protected-access
signature
),
GPGSigner._key_to_legacy_dict( # pylint: disable=protected-access
self
),
data,
):
raise exceptions.UnverifiedSignatureError(
f"Failed to verify signature by {self.keyid}"
)
except (
exceptions.FormatError,
exceptions.UnsupportedLibraryError,
) as e:
logger.info("Key %s failed to verify sig: %s", self.keyid, str(e))
raise exceptions.VerificationError(
f"Unknown failure to verify signature by {self.keyid}"
) from e


class GPGSigner(Signer):
"""OpenPGP Signer
Runs command in ``GNUPG`` environment variable to sign, fallback commands are
Runs command in ``GNUPG`` environment variable to sign. Fallback commands are
``gpg2`` and ``gpg``.
Supported signing schemes are: "pgp+rsa-pkcsv1.5", "pgp+dsa-fips-180-2" and
"pgp+eddsa-ed25519", with SHA-256 hashing.
GPGSigner can be instantiated with Signer.from_priv_key_uri(). These private key URI
schemes are supported:
* "gnupg:[<GnuPG homedir>]":
Signs with GnuPG key in keyring in home dir. The signing key is
identified with the keyid of the passed public key. If homedir is not
passed, the default homedir is used.
Arguments:
keyid: GnuPG local user signing key id. If not passed, the default key is used.
public_key: The related public key instance.
homedir: GnuPG home directory path. If not passed, the default homedir is used.
"""

SCHEME = "gnupg"

def __init__(
self, keyid: Optional[str] = None, homedir: Optional[str] = None
self,
public_key: Key,
homedir: Optional[str] = None,
):
self.keyid = keyid
self.homedir = homedir
self.public_key = public_key

@classmethod
def from_priv_key_uri(
Expand All @@ -35,41 +110,123 @@ def from_priv_key_uri(
public_key: Key,
secrets_handler: Optional[SecretsHandler] = None,
) -> "GPGSigner":
raise NotImplementedError("Incompatible with private key URIs")
if not isinstance(public_key, GPGKey):
raise ValueError(f"expected GPGKey for {priv_key_uri}")

uri = parse.urlparse(priv_key_uri)

if uri.scheme != cls.SCHEME:
raise ValueError(f"GPGSigner does not support {priv_key_uri}")

homedir = uri.path or None

return cls(public_key, homedir)

@staticmethod
def _to_gpg_sig(sig: Signature) -> Dict:
"""Helper to convert Signature -> internal gpg signature format."""
def _sig_to_legacy_dict(sig: Signature) -> Dict:
"""Helper to convert Signature to internal gpg signature dict format."""
sig_dict = sig.to_dict()
sig_dict["signature"] = sig_dict.pop("sig")
return sig_dict

@staticmethod
def _from_gpg_sig(sig_dict: Dict) -> Signature:
"""Helper to convert internal gpg signature format -> Signature."""
def _sig_from_legacy_dict(sig_dict: Dict) -> Signature:
"""Helper to convert internal gpg signature format to Signature."""
sig_dict["sig"] = sig_dict.pop("signature")
return Signature.from_dict(sig_dict)

@staticmethod
def _key_to_legacy_dict(key: GPGKey) -> Dict[str, Any]:
"""Returns legacy dictionary representation of self."""
return {
"keyid": key.keyid,
"type": key.keytype,
"method": key.scheme,
"hashes": [formats.GPG_HASH_ALGORITHM_STRING],
"keyval": key.keyval,
}

@staticmethod
def _key_from_legacy_dict(key_dict: Dict[str, Any]) -> GPGKey:
"""Create GPGKey from legacy dictionary representation."""
keyid = key_dict["keyid"]
keytype = key_dict["type"]
scheme = key_dict["method"]
keyval = key_dict["keyval"]

return GPGKey(keyid, keytype, scheme, keyval)

@classmethod
def import_(
cls, keyid: str, homedir: Optional[str] = None
) -> Tuple[str, Key]:
"""Load key and signer details from GnuPG keyring.
NOTE: Information about the key validity (expiration, revocation, etc.)
is discarded at import and not considered when verifying a signature.
Args:
keyid: GnuPG local user signing key id.
homedir: GnuPG home directory path. If not passed, the default homedir is
used.
Raises:
UnsupportedLibraryError: The gpg command or pyca/cryptography are
not available.
KeyNotFoundError: No key was found for the passed keyid.
Returns:
Tuple of private key uri and the public key.
"""
uri = f"{cls.SCHEME}:{homedir or ''}"

raw_key = gpg.export_pubkey(keyid, homedir)
raw_keys = [raw_key] + list(raw_key.pop("subkeys", {}).values())
keyids = []

for key in raw_keys:
if key["keyid"] == keyid:
# TODO: Raise here if key is expired, revoked, incapable, ...
public_key = cls._key_from_legacy_dict(key)
break
keyids.append(key["keyid"])

else:
raise gpg_exceptions.KeyNotFoundError(
f"No exact match found for passed keyid"
f" {keyid}, found: {keyids}."
)

return (uri, public_key)

def sign(self, payload: bytes) -> Signature:
"""Signs payload with ``gpg``.
"""Signs payload with GnuPG.
Arguments:
payload: bytes to be signed.
Raises:
ValueError: The gpg command failed to create a valid signature.
OSError: the gpg command is not present or non-executable.
securesystemslib.exceptions.UnsupportedLibraryError: The gpg
command is not available, or the cryptography library is
not installed.
securesystemslib.gpg.exceptions.CommandError: The gpg command
returned a non-zero exit code.
securesystemslib.gpg.exceptions.KeyNotFoundError: The used gpg
version is not fully supported.
ValueError: gpg command failed to create a valid signature.
OSError: gpg command is not present or non-executable.
securesystemslib.exceptions.UnsupportedLibraryError: gpg command is not
available, or the cryptography library is not installed.
securesystemslib.gpg.exceptions.CommandError: gpg command returned a
non-zero exit code.
securesystemslib.gpg.exceptions.KeyNotFoundError: gpg version is not fully
supported.
Returns:
Signature.
"""
return self._from_gpg_sig(
gpg.create_signature(payload, self.keyid, self.homedir)
raw_sig = gpg.create_signature(
payload, self.public_key.keyid, self.homedir
)
if raw_sig["keyid"] != self.public_key.keyid:
raise ValueError(
f"The signing key {raw_sig['keyid']} does not"
f" match the attached public key {self.public_key.keyid}."
)

return self._sig_from_legacy_dict(raw_sig)
13 changes: 13 additions & 0 deletions tests/check_public_interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@
import securesystemslib.gpg.util # pylint: disable=wrong-import-position
import securesystemslib.interface # pylint: disable=wrong-import-position
import securesystemslib.keys # pylint: disable=wrong-import-position
from securesystemslib.exceptions import (
UnsupportedLibraryError,
VerificationError,
)
from securesystemslib.signer import GPGKey, Signature


class TestPublicInterfaces(
Expand Down Expand Up @@ -314,6 +319,14 @@ def test_gpg_functions(self):
securesystemslib.gpg.functions.export_pubkey("f00")
self.assertEqual(expected_error_msg, str(ctx.exception))

def test_signer(self):
"""Assert generic VerificationError from UnsupportedLibraryError."""
key = GPGKey("aa", "rsa", "pgp+rsa-pkcsv1.5", {"public": "val"})
sig = Signature("aa", "aaaaaaa", {"other_headers": "aaaaaa"})
with self.assertRaises(VerificationError) as ctx:
key.verify_signature(sig, b"data")
self.assertIsInstance(ctx.exception.__cause__, UnsupportedLibraryError)


if __name__ == "__main__":
unittest.main(verbosity=1, buffer=True)
36 changes: 26 additions & 10 deletions tests/check_public_interfaces_gpg.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
export_pubkeys,
verify_signature,
)
from securesystemslib.signer import GPGKey, GPGSigner, Signer


class TestPublicInterfacesGPG(
Expand All @@ -47,17 +48,28 @@ def setUpClass(cls):

def test_gpg_functions(self):
"""Signing, key export and util functions must raise on missing gpg."""
with self.assertRaises(UnsupportedLibraryError) as ctx:
create_signature("bar")
self.assertEqual(NO_GPG_MSG, str(ctx.exception))

with self.assertRaises(UnsupportedLibraryError) as ctx:
export_pubkey("f00")
self.assertEqual(NO_GPG_MSG, str(ctx.exception))

with self.assertRaises(UnsupportedLibraryError) as ctx:
export_pubkeys(["f00"])
self.assertEqual(NO_GPG_MSG, str(ctx.exception))
# Hand-crafting a GPG public key and loading a signer works w/o gpg, but
# signing fails (see below).
mock_public_key = GPGKey(
"aa",
"rsa",
"pgp+rsa-pkcsv1.5",
{"public": {"key": "value"}},
)
signer = Signer.from_priv_key_uri("gnupg:?id=abcd", mock_public_key)

# Run commands that require gpg and assert error plus message
for fn, args in (
(create_signature, ("bar",)),
(export_pubkey, ("f00",)),
(export_pubkeys, (["f00"],)),
(GPGSigner.import_, ("keyid",)),
(signer.sign, (b"data",)),
):
with self.assertRaises(UnsupportedLibraryError) as ctx:
fn(*args)
self.assertEqual(NO_GPG_MSG, str(ctx.exception))

def test_gpg_verify(self):
"""Signature verification does not require gpg to be installed on the host.
Expand Down Expand Up @@ -139,6 +151,10 @@ def test_gpg_verify(self):

for key, sig in key_signature_pairs:
self.assertTrue(verify_signature(sig, key, data))
# pylint: disable=protected-access
GPGSigner._key_from_legacy_dict(key).verify_signature(
GPGSigner._sig_from_legacy_dict(sig), data
)


if __name__ == "__main__":
Expand Down
Loading

0 comments on commit 7252521

Please sign in to comment.