88import  pkcs11 
99import  pkcs11 .util .ec 
1010
11+ from  pathlib  import  Path 
12+ from  urllib .parse  import  unquote , urlparse 
13+ 
1114from  .general  import  KeyClass 
1215
1316
17+ def  find_library (filename ):
18+     search_directory  =  '/usr' 
19+     for  root , directorios , filenames  in  os .walk (search_directory ):
20+         if  filename  in  filenames :
21+             return  os .path .join (
22+                 root ,
23+                 filename ,
24+             )
25+ 
26+ def  unquote_to_bytes (urlencoded_string ):
27+     """Replace %xx escapes by their single-character equivalent, 
28+ using the “iso-8859-1” encoding to decode all 8-bit values. 
29+     """ 
30+     return  bytes (
31+         unquote (urlencoded_string , encoding = 'iso-8859-1' ),
32+         encoding = 'iso-8859-1' 
33+     )
34+ 
35+ def  get_pkcs11_uri_params (uri ):
36+     uri_tokens  =  urlparse (uri )
37+     assert (uri_tokens .scheme  ==  'pkcs11' )
38+     assert (uri_tokens .query  ==  '' )
39+     assert (uri_tokens .fragment  ==  '' )
40+     return  {
41+         unquote_to_bytes (key ): unquote_to_bytes (value )
42+         for  key , value 
43+         in  [
44+             line .split ('=' )
45+             for  line 
46+             in  uri_tokens .path .split (';' )
47+         ]
48+     }
49+ 
1450class  PKCS11UsageError (Exception ):
1551    pass 
1652
1753
1854class  PKCS11 (KeyClass ):
19-     def  __init__ (self , env = os .environ ):
20-         lib  =  pkcs11 .lib (env ['PKCS11_MODULE' ])
21-         self .token  =  lib .get_token (token_label = env ['PKCS11_TOKEN_LABEL' ])
22-         self .key_id  =  bytes .fromhex (env ['PKCS11_KEY_ID' ])
23-         self .session  =  self .token .open (user_pin = env ['PKCS11_PIN' ])
24- 
25-     def  __del__ (self ):
26-         if  hasattr (self , 'session' ):
27-             self .session .close ()
55+     def  __init__ (self , uri , env = os .environ ):
56+         if  not  'PKCS11_PIN'  in  env .keys ():
57+             raise  RuntimeError ("Environment variable PKCS11_PIN not set. Set it to the user PIN." )
58+         params  =  get_pkcs11_uri_params (uri )
59+         assert (b'serial'  in  params .keys ())
60+         self .user_pin  =  env ['PKCS11_PIN' ]
61+ 
62+         # Try to find PKCS #11 module path like this: 
63+         # Use the environment variable $PKCS11_MODULE, 
64+         # then fall back to OpenSC (for NitroKey HSM). 
65+         pkcs11_module_path  =  '' 
66+         try :
67+             pkcs11_module_path  =  env ['PKCS11_MODULE' ]
68+             if  not  Path (pkcs11_module_path ).is_file ():
69+                 raise  RuntimeError ('$PKCS11_MODULE is not a file: %s'  %  pkcs11_module_path )
70+         except  KeyError :
71+             pkcs11_module_path  =  find_library ('opensc-pkcs11.so' )
72+             if  pkcs11_module_path  is  None :
73+                 raise  RuntimeError ('$PKCS11_MODULE not set and OpenSC not found.' )
74+ 
75+         lib  =  '' 
76+         try :
77+             lib  =  pkcs11 .lib (pkcs11_module_path )
78+         except  RuntimeError :
79+             pass   # happens if lib does not exist or is corrupt 
80+         if  ''  ==  lib :
81+             raise  RuntimeError ('PKCS #11 module not loaded.' )
82+ 
83+         self .token  =  lib .get_token (token_serial = params [b'serial' ])
84+         # try to open a session to see if the PIN is valid 
85+         with  self .token .open (user_pin = self .user_pin ) as  session :
86+             pass 
87+         self .key_id  =  params [b'id' ]
2888
2989    def  shortname (self ):
3090        return  "ecdsa" 
@@ -33,11 +93,16 @@ def _unsupported(self, name):
3393        raise  PKCS11UsageError ("Operation {} requires private key" .format (name ))
3494
3595    def  get_public_bytes (self ):
36-         pub  =  self .session .get_key (
37-             id = self .key_id , key_type = pkcs11 .KeyType .EC , object_class = pkcs11 .ObjectClass .PUBLIC_KEY )
38-         key  =  oscrypto .asymmetric .load_public_key (
39-             pkcs11 .util .ec .encode_ec_public_key (pub ))
40-         key  =  pkcs11 .util .ec .encode_ec_public_key (pub )
96+         with  self .token .open (user_pin = self .user_pin ) as  session :
97+             pub  =  session .get_key (
98+                 id = self .key_id ,
99+                 key_type = pkcs11 .KeyType .EC ,
100+                 object_class = pkcs11 .ObjectClass .PUBLIC_KEY 
101+             )
102+             key  =  oscrypto .asymmetric .load_public_key (
103+                 pkcs11 .util .ec .encode_ec_public_key (pub )
104+             )
105+             key  =  pkcs11 .util .ec .encode_ec_public_key (pub )
41106        return  key 
42107
43108    def  get_private_bytes (self , minimal ):
@@ -48,11 +113,16 @@ def export_private(self, path, passwd=None):
48113
49114    def  export_public (self , path ):
50115        """Write the public key to the given file.""" 
51-         pub  =  self .session .get_key (
52-             id = self .key_id , key_type = pkcs11 .KeyType .EC , object_class = pkcs11 .ObjectClass .PUBLIC_KEY )
53-         key  =  oscrypto .asymmetric .load_public_key (
54-             pkcs11 .util .ec .encode_ec_public_key (pub ))
55-         pem  =  oscrypto .asymmetric .dump_public_key (key , encoding = 'pem' )
116+         with  self .token .open (user_pin = self .user_pin ) as  session :
117+             pub  =  session .get_key (
118+                 id = self .key_id ,
119+                 key_type = pkcs11 .KeyType .EC ,
120+                 object_class = pkcs11 .ObjectClass .PUBLIC_KEY 
121+             )
122+             key  =  oscrypto .asymmetric .load_public_key (
123+                 pkcs11 .util .ec .encode_ec_public_key (pub )
124+             )
125+             pem  =  oscrypto .asymmetric .dump_public_key (key , encoding = 'pem' )
56126
57127        with  open (path , 'wb' ) as  f :
58128            f .write (pem )
@@ -74,10 +144,16 @@ def sig_len(self):
74144
75145    def  raw_sign (self , payload ):
76146        """Return the actual signature""" 
77-         priv  =  self .session .get_key (
78-             id = self .key_id , key_type = pkcs11 .KeyType .EC , object_class = pkcs11 .ObjectClass .PRIVATE_KEY )
79-         sig  =  priv .sign (hashlib .sha256 (payload ).digest (),
80-                         mechanism = pkcs11 .Mechanism .ECDSA )
147+         with  self .token .open (user_pin = self .user_pin ) as  session :
148+             priv  =  session .get_key (
149+                 id = self .key_id ,
150+                 key_type = pkcs11 .KeyType .EC ,
151+                 object_class = pkcs11 .ObjectClass .PRIVATE_KEY 
152+             )
153+             sig  =  priv .sign (
154+                 hashlib .sha256 (payload ).digest (),
155+                 mechanism = pkcs11 .Mechanism .ECDSA 
156+             )
81157        return  pkcs11 .util .ec .encode_ecdsa_signature (sig )
82158
83159    def  sign (self , payload ):
0 commit comments