Skip to content

Commit 33c6400

Browse files
amoskopprretanubun
authored andcommitted
Refactor imgtool.py to handle PKCS mcu-tools#11 URIs
ref: mcu-tools#599 which is still not mainlined ref: grandcentrix/mcuboot@d7c58b62 part 3 of a 3 commit series
1 parent 9f60bd3 commit 33c6400

File tree

6 files changed

+283
-61
lines changed

6 files changed

+283
-61
lines changed

scripts/imgtool/keys/__init__.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,22 +50,21 @@ class PasswordRequired(Exception):
5050

5151

5252
def load(path, passwd=None):
53-
"""Try loading a key from the given path.
54-
Returns None if the password wasn't specified."""
55-
if path == "pkcs11":
53+
if path.startswith("pkcs11:"):
5654
try:
57-
return PKCS11()
55+
return PKCS11(path) # assume a PKCS #11 URI according to RFC7512
5856
except pkcs11.exceptions.PinIncorrect:
5957
print('ERROR: WRONG PIN')
6058
sys.exit(1)
6159
except pkcs11.exceptions.PinLocked:
6260
print('ERROR: WRONG PIN, MAX ATTEMPTS REACHED. CONTACT YOUR SECURITY OFFICER.')
6361
sys.exit(1)
6462
except pkcs11.exceptions.DataLenRange:
65-
print('ERROR: \'DataLenRange\' (maybe the PIN is too short?)')
63+
print('ERROR: PIN IS TOO SHORT OR TOO LONG')
6664
sys.exit(1)
6765

68-
"""Try loading a key from the given path. Returns None if the password wasn't specified."""
66+
"""Try loading a key from the given path.
67+
Returns None if the password wasn't specified."""
6968
with open(path, 'rb') as f:
7069
raw_pem = f.read()
7170
try:

scripts/imgtool/keys/imgtool_keys_pkcs11.py

Lines changed: 99 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -8,23 +8,83 @@
88
import pkcs11
99
import pkcs11.util.ec
1010

11+
from pathlib import Path
12+
from urllib.parse import unquote, urlparse
13+
1114
from .general import KeyClass
1215

1316

17+
def find_library(filename):
18+
search_directory = '/usr'
19+
for root, directorios, filenames in os.walk(search_directory):
20+
if filename in filenames:
21+
return os.path.join(
22+
root,
23+
filename,
24+
)
25+
26+
def unquote_to_bytes(urlencoded_string):
27+
"""Replace %xx escapes by their single-character equivalent,
28+
using the “iso-8859-1” encoding to decode all 8-bit values.
29+
"""
30+
return bytes(
31+
unquote(urlencoded_string, encoding='iso-8859-1'),
32+
encoding='iso-8859-1'
33+
)
34+
35+
def get_pkcs11_uri_params(uri):
36+
uri_tokens = urlparse(uri)
37+
assert(uri_tokens.scheme == 'pkcs11')
38+
assert(uri_tokens.query == '')
39+
assert(uri_tokens.fragment == '')
40+
return {
41+
unquote_to_bytes(key): unquote_to_bytes(value)
42+
for key, value
43+
in [
44+
line.split('=')
45+
for line
46+
in uri_tokens.path.split(';')
47+
]
48+
}
49+
1450
class PKCS11UsageError(Exception):
1551
pass
1652

1753

1854
class PKCS11(KeyClass):
19-
def __init__(self, env=os.environ):
20-
lib = pkcs11.lib(env['PKCS11_MODULE'])
21-
self.token = lib.get_token(token_label=env['PKCS11_TOKEN_LABEL'])
22-
self.key_id = bytes.fromhex(env['PKCS11_KEY_ID'])
23-
self.session = self.token.open(user_pin=env['PKCS11_PIN'])
24-
25-
def __del__(self):
26-
if hasattr(self, 'session'):
27-
self.session.close()
55+
def __init__(self, uri, env=os.environ):
56+
if not 'PKCS11_PIN' in env.keys():
57+
raise RuntimeError("Environment variable PKCS11_PIN not set. Set it to the user PIN.")
58+
params = get_pkcs11_uri_params(uri)
59+
assert(b'serial' in params.keys())
60+
self.user_pin = env['PKCS11_PIN']
61+
62+
# Try to find PKCS #11 module path like this:
63+
# Use the environment variable $PKCS11_MODULE,
64+
# then fall back to OpenSC (for NitroKey HSM).
65+
pkcs11_module_path = ''
66+
try:
67+
pkcs11_module_path = env['PKCS11_MODULE']
68+
if not Path(pkcs11_module_path).is_file():
69+
raise RuntimeError('$PKCS11_MODULE is not a file: %s' % pkcs11_module_path)
70+
except KeyError:
71+
pkcs11_module_path = find_library('opensc-pkcs11.so')
72+
if pkcs11_module_path is None:
73+
raise RuntimeError('$PKCS11_MODULE not set and OpenSC not found.')
74+
75+
lib = ''
76+
try:
77+
lib = pkcs11.lib(pkcs11_module_path)
78+
except RuntimeError:
79+
pass # happens if lib does not exist or is corrupt
80+
if '' == lib:
81+
raise RuntimeError('PKCS #11 module not loaded.')
82+
83+
self.token = lib.get_token(token_serial=params[b'serial'])
84+
# try to open a session to see if the PIN is valid
85+
with self.token.open(user_pin=self.user_pin) as session:
86+
pass
87+
self.key_id = params[b'id']
2888

2989
def shortname(self):
3090
return "ecdsa"
@@ -33,11 +93,16 @@ def _unsupported(self, name):
3393
raise PKCS11UsageError("Operation {} requires private key".format(name))
3494

3595
def get_public_bytes(self):
36-
pub = self.session.get_key(
37-
id=self.key_id, key_type=pkcs11.KeyType.EC, object_class=pkcs11.ObjectClass.PUBLIC_KEY)
38-
key = oscrypto.asymmetric.load_public_key(
39-
pkcs11.util.ec.encode_ec_public_key(pub))
40-
key = pkcs11.util.ec.encode_ec_public_key(pub)
96+
with self.token.open(user_pin=self.user_pin) as session:
97+
pub = session.get_key(
98+
id=self.key_id,
99+
key_type=pkcs11.KeyType.EC,
100+
object_class=pkcs11.ObjectClass.PUBLIC_KEY
101+
)
102+
key = oscrypto.asymmetric.load_public_key(
103+
pkcs11.util.ec.encode_ec_public_key(pub)
104+
)
105+
key = pkcs11.util.ec.encode_ec_public_key(pub)
41106
return key
42107

43108
def get_private_bytes(self, minimal):
@@ -48,11 +113,16 @@ def export_private(self, path, passwd=None):
48113

49114
def export_public(self, path):
50115
"""Write the public key to the given file."""
51-
pub = self.session.get_key(
52-
id=self.key_id, key_type=pkcs11.KeyType.EC, object_class=pkcs11.ObjectClass.PUBLIC_KEY)
53-
key = oscrypto.asymmetric.load_public_key(
54-
pkcs11.util.ec.encode_ec_public_key(pub))
55-
pem = oscrypto.asymmetric.dump_public_key(key, encoding='pem')
116+
with self.token.open(user_pin=self.user_pin) as session:
117+
pub = session.get_key(
118+
id=self.key_id,
119+
key_type=pkcs11.KeyType.EC,
120+
object_class=pkcs11.ObjectClass.PUBLIC_KEY
121+
)
122+
key = oscrypto.asymmetric.load_public_key(
123+
pkcs11.util.ec.encode_ec_public_key(pub)
124+
)
125+
pem = oscrypto.asymmetric.dump_public_key(key, encoding='pem')
56126

57127
with open(path, 'wb') as f:
58128
f.write(pem)
@@ -74,10 +144,16 @@ def sig_len(self):
74144

75145
def raw_sign(self, payload):
76146
"""Return the actual signature"""
77-
priv = self.session.get_key(
78-
id=self.key_id, key_type=pkcs11.KeyType.EC, object_class=pkcs11.ObjectClass.PRIVATE_KEY)
79-
sig = priv.sign(hashlib.sha256(payload).digest(),
80-
mechanism=pkcs11.Mechanism.ECDSA)
147+
with self.token.open(user_pin=self.user_pin) as session:
148+
priv = session.get_key(
149+
id=self.key_id,
150+
key_type=pkcs11.KeyType.EC,
151+
object_class=pkcs11.ObjectClass.PRIVATE_KEY
152+
)
153+
sig = priv.sign(
154+
hashlib.sha256(payload).digest(),
155+
mechanism=pkcs11.Mechanism.ECDSA
156+
)
81157
return pkcs11.util.ec.encode_ecdsa_signature(sig)
82158

83159
def sign(self, payload):

0 commit comments

Comments
 (0)