|  | 
|  | 1 | +""" | 
|  | 2 | +PKCS11 key management | 
|  | 3 | +""" | 
|  | 4 | +# SPDX-License-Identifier: Apache-2.0 | 
|  | 5 | + | 
|  | 6 | +import hashlib | 
|  | 7 | +import os | 
|  | 8 | +import pkcs11 | 
|  | 9 | +import pkcs11.util.ec | 
|  | 10 | + | 
|  | 11 | +from cryptography.exceptions import InvalidSignature | 
|  | 12 | +from cryptography.hazmat.primitives import hashes | 
|  | 13 | +from cryptography.hazmat.primitives.serialization import ( | 
|  | 14 | +    load_der_public_key, | 
|  | 15 | +    Encoding, | 
|  | 16 | +    PublicFormat | 
|  | 17 | +) | 
|  | 18 | +from cryptography.hazmat.primitives.asymmetric.ec import ( | 
|  | 19 | +    ECDSA, SECP256R1, SECP384R1, | 
|  | 20 | +    EllipticCurvePublicKey | 
|  | 21 | +) | 
|  | 22 | +from urllib.parse import unquote, urlparse | 
|  | 23 | + | 
|  | 24 | +from .general import KeyClass | 
|  | 25 | + | 
|  | 26 | + | 
|  | 27 | +def unquote_to_bytes(urlencoded_string): | 
|  | 28 | +    """Replace %xx escapes by their single-character equivalent, | 
|  | 29 | +    using the “iso-8859-1” encoding to decode all 8-bit values. | 
|  | 30 | +    """ | 
|  | 31 | +    return bytes( | 
|  | 32 | +        unquote(urlencoded_string, encoding='iso-8859-1'), | 
|  | 33 | +        encoding='iso-8859-1' | 
|  | 34 | +    ) | 
|  | 35 | + | 
|  | 36 | +def get_pkcs11_uri_params(uri): | 
|  | 37 | +    """Return a dict of decoded URI key=val pairs | 
|  | 38 | +    """ | 
|  | 39 | +    uri_tokens = urlparse(uri) | 
|  | 40 | +    assert uri_tokens.scheme == 'pkcs11' | 
|  | 41 | +    assert uri_tokens.query == '' | 
|  | 42 | +    assert uri_tokens.fragment == '' | 
|  | 43 | +    return { | 
|  | 44 | +        unquote_to_bytes(key): unquote_to_bytes(value) | 
|  | 45 | +        for key, value | 
|  | 46 | +        in [ | 
|  | 47 | +            line.split('=') | 
|  | 48 | +            for line | 
|  | 49 | +            in uri_tokens.path.split(';') | 
|  | 50 | +        ] | 
|  | 51 | +    } | 
|  | 52 | + | 
|  | 53 | +class PKCS11UsageError(Exception): | 
|  | 54 | +    pass | 
|  | 55 | + | 
|  | 56 | + | 
|  | 57 | +class PKCS11(KeyClass): | 
|  | 58 | +    """ | 
|  | 59 | +    Wrapper around an ECDSA P384 key accessed via PKCS#11 URIs | 
|  | 60 | +    """ | 
|  | 61 | +    def __init__(self, uri, env=None): | 
|  | 62 | +        if env is None: | 
|  | 63 | +            env = os.environ | 
|  | 64 | +        if 'PKCS11_PIN' not in env: | 
|  | 65 | +            raise RuntimeError("Environment variable PKCS11_PIN not set. Set it to the user PIN.") | 
|  | 66 | +        params = get_pkcs11_uri_params(uri) | 
|  | 67 | +        assert b'serial' in params | 
|  | 68 | +        assert b'id' in params or b'label' in params | 
|  | 69 | +        self.user_pin = env['PKCS11_PIN'] | 
|  | 70 | + | 
|  | 71 | +        # Fall back to OpenSC | 
|  | 72 | +        pkcs11_module_path = env.get('PKCS11_MODULE', 'opensc-pkcs11.so') | 
|  | 73 | + | 
|  | 74 | +        try: | 
|  | 75 | +            lib = pkcs11.lib(pkcs11_module_path) | 
|  | 76 | +        except RuntimeError: | 
|  | 77 | +            raise RuntimeError(f"PKCS11 module {pkcs11_module_path} not loaded.") | 
|  | 78 | + | 
|  | 79 | +        self.token = lib.get_token(token_serial=params[b'serial']) | 
|  | 80 | +        # try to open a session to see if the PIN is valid | 
|  | 81 | +        with self.token.open(user_pin=self.user_pin) as _: | 
|  | 82 | +            pass | 
|  | 83 | +        self.key_id = params.get(b'id', None) | 
|  | 84 | +        self.key_label = params.get(b'label', None) | 
|  | 85 | +        self.key_label = self.key_label.decode('utf-8') if self.key_label else None | 
|  | 86 | + | 
|  | 87 | +    def shortname(self): | 
|  | 88 | +        return "ecdsa" | 
|  | 89 | + | 
|  | 90 | +    def _unsupported(self, name): | 
|  | 91 | +        raise PKCS11UsageError(f"Operation {name} requires private key") | 
|  | 92 | + | 
|  | 93 | +    def get_public_bytes(self): | 
|  | 94 | +        with self.token.open(user_pin=self.user_pin) as session: | 
|  | 95 | +            pub = session.get_key( | 
|  | 96 | +                id=self.key_id, | 
|  | 97 | +                label=self.key_label, | 
|  | 98 | +                key_type=pkcs11.KeyType.EC, | 
|  | 99 | +                object_class=pkcs11.ObjectClass.PUBLIC_KEY | 
|  | 100 | +            ) | 
|  | 101 | +            key = pkcs11.util.ec.encode_ec_public_key(pub) | 
|  | 102 | +        return key | 
|  | 103 | + | 
|  | 104 | +    def get_private_bytes(self, minimal): | 
|  | 105 | +        self._unsupported('get_private_bytes') | 
|  | 106 | + | 
|  | 107 | +    def export_private(self, path, passwd=None): | 
|  | 108 | +        self._unsupported('export_private') | 
|  | 109 | + | 
|  | 110 | +    def export_public(self, path): | 
|  | 111 | +        """Write the public key to the given file.""" | 
|  | 112 | +        with self.token.open(user_pin=self.user_pin) as session: | 
|  | 113 | +            pub = session.get_key( | 
|  | 114 | +                id=self.key_id, | 
|  | 115 | +                label=self.key_label, | 
|  | 116 | +                key_type=pkcs11.KeyType.EC, | 
|  | 117 | +                object_class=pkcs11.ObjectClass.PUBLIC_KEY | 
|  | 118 | +            ) | 
|  | 119 | +            # Encode to DER | 
|  | 120 | +            der_bytes = pkcs11.util.ec.encode_ec_public_key(pub) | 
|  | 121 | + | 
|  | 122 | +            # Convert to PEM using cryptography | 
|  | 123 | +            public_key = load_der_public_key(der_bytes) | 
|  | 124 | +            pem = public_key.public_bytes( | 
|  | 125 | +                encoding=Encoding.PEM, | 
|  | 126 | +                format=PublicFormat.SubjectPublicKeyInfo | 
|  | 127 | +            ) | 
|  | 128 | + | 
|  | 129 | +        with open(path, 'wb') as f: | 
|  | 130 | +            f.write(pem) | 
|  | 131 | + | 
|  | 132 | +    def sig_type(self): | 
|  | 133 | +        return "ECDSA384_SHA384" | 
|  | 134 | + | 
|  | 135 | +    def sig_tlv(self): | 
|  | 136 | +        return "ECDSASIG" | 
|  | 137 | + | 
|  | 138 | +    def sig_len(self): | 
|  | 139 | +        # Early versions of MCUboot (< v1.5.0) required ECDSA | 
|  | 140 | +        # signatures to be padded to a fixed length. Because the DER | 
|  | 141 | +        # encoding is done with signed integers, the size of the | 
|  | 142 | +        # signature will vary depending on whether the high bit is set | 
|  | 143 | +        # in each value. This padding was done in a | 
|  | 144 | +        # not-easily-reversible way (by just adding zeros). | 
|  | 145 | +        # | 
|  | 146 | +        # The signing code no longer requires this padding, and newer | 
|  | 147 | +        # versions of MCUboot don't require it. But, continue to | 
|  | 148 | +        # return the total length so that the padding can be done if | 
|  | 149 | +        # requested. | 
|  | 150 | +        return 103 | 
|  | 151 | + | 
|  | 152 | +    def raw_sign(self, payload): | 
|  | 153 | +        """Return the actual signature""" | 
|  | 154 | +        with self.token.open(user_pin=self.user_pin) as session: | 
|  | 155 | +            priv = session.get_key( | 
|  | 156 | +                id=self.key_id, | 
|  | 157 | +                label=self.key_label, | 
|  | 158 | +                key_type=pkcs11.KeyType.EC, | 
|  | 159 | +                object_class=pkcs11.ObjectClass.PRIVATE_KEY | 
|  | 160 | +            ) | 
|  | 161 | +            sig = priv.sign( | 
|  | 162 | +                hashlib.sha384(payload).digest(), | 
|  | 163 | +                mechanism=pkcs11.Mechanism.ECDSA | 
|  | 164 | +            ) | 
|  | 165 | +        return pkcs11.util.ec.encode_ecdsa_signature(sig) | 
|  | 166 | + | 
|  | 167 | +    def sign(self, payload): | 
|  | 168 | +        """Return signature with legacy padding""" | 
|  | 169 | +        # To make fixed length, pad with one or two zeros. | 
|  | 170 | +        while True: | 
|  | 171 | +            sig = self.raw_sign(payload) | 
|  | 172 | +            if sig[-1] != 0x00: | 
|  | 173 | +                break | 
|  | 174 | + | 
|  | 175 | +        sig += b'\000' * (self.sig_len() - len(sig)) | 
|  | 176 | +        return sig | 
|  | 177 | + | 
|  | 178 | +    def verify(self, signature, payload): | 
|  | 179 | +        """Verify the signature of the payload""" | 
|  | 180 | +        # strip possible paddings added during sign | 
|  | 181 | +        signature = signature[:signature[1] + 2] | 
|  | 182 | + | 
|  | 183 | +        # Load public key from DER bytes | 
|  | 184 | +        public_key = load_der_public_key(self.get_public_bytes()) | 
|  | 185 | + | 
|  | 186 | +        if not isinstance(public_key, EllipticCurvePublicKey): | 
|  | 187 | +            raise TypeError(f"Unsupported key type: {type(public_key).__name__}") | 
|  | 188 | + | 
|  | 189 | +        # Determine correct hash algorithm based on curve | 
|  | 190 | +        if isinstance(public_key.curve, SECP256R1): | 
|  | 191 | +            hash_alg = hashes.SHA256() | 
|  | 192 | +        elif isinstance(public_key.curve, SECP384R1): | 
|  | 193 | +            hash_alg = hashes.SHA384() | 
|  | 194 | +        else: | 
|  | 195 | +            raise ValueError(f"Unsupported curve: {public_key.curve.name}") | 
|  | 196 | + | 
|  | 197 | +        try: | 
|  | 198 | +            # Attempt ECDSA verification | 
|  | 199 | +            public_key.verify(signature, payload, ECDSA(hash_alg)) | 
|  | 200 | +            return True | 
|  | 201 | +        except InvalidSignature: | 
|  | 202 | +            return False | 
0 commit comments