|
1 |
| -# -*- coding:utf-8 -*- |
2 |
| -# Part of Odoo. See LICENSE file for full copyright and licensing details. |
| 1 | +""" |
| 2 | + Italian E-invoice signed files content extraction. |
3 | 3 |
|
4 |
| -import logging |
5 |
| -import warnings |
| 4 | + - PyOpenSSL doesn't support ``load_pkcs7_data`` anymore. |
| 5 | + https://github.com/pyca/pyopenssl/commit/0fe822dc8d6610b8ec9ebaff626d6bf23e0a7ad3 |
| 6 | + - Cryptography is migrating PKCS7_verify to Rust, and has removed PKCS7_NOVERIFY |
| 7 | + https://github.com/pyca/cryptography/commit/615967bfab5b49e470fe7d0df44649c69fb9a847 |
| 8 | + https://github.com/pyca/cryptography/pull/8332 |
| 9 | + - ``asn1`` library is pure Python and MIT licensed, but is slower than our homemade solution |
| 10 | + https://github.com/andrivet/python-asn1/blob/master/src/asn1.py |
6 | 11 |
|
7 |
| -_logger = logging.getLogger(__name__) |
| 12 | + This version is more optimized than what we had as a fallback. |
| 13 | +""" |
8 | 14 |
|
9 |
| -try: |
10 |
| - from OpenSSL import crypto as ssl_crypto |
11 |
| - import OpenSSL._util as ssl_util |
12 |
| -except ImportError: |
13 |
| - ssl_crypto = None |
14 |
| - _logger.warning("Cannot import library 'OpenSSL' for PKCS#7 envelope extraction.") |
15 | 15 |
|
| 16 | +from contextlib import suppress |
16 | 17 |
|
17 |
| -def remove_signature(content): |
18 |
| - """ Remove the PKCS#7 envelope from given content, making a '.xml.p7m' file content readable as it was '.xml'. |
19 |
| - As OpenSSL may not be installed, in that case a warning is issued and None is returned. """ |
20 | 18 |
|
21 |
| - # Prevent using the library if it had import errors |
22 |
| - if not ssl_crypto: |
23 |
| - _logger.warning("Error reading the content, check if the OpenSSL library is installed for for PKCS#7 envelope extraction.") |
24 |
| - return None |
# Object identifier of the PKCS#7 "data" content type (see RFC 2315)
PKCS7_DATA_OID = '1.2.840.113549.1.7.1'

# Names of the ASN1 universal tags, indexed by tag number.
# ``None`` marks the tag numbers that have no name in this table.
universal_tags = [
    # 0 - 7
    'Zero', 'Boolean', 'Integer', 'BitString',
    'OctetString', 'Null', 'ObjectIdentifier', 'ObjectDescriptor',
    # 8 - 15
    'External', 'Real', 'Enumerated', 'EmbeddedPDV',
    'UTF8String', 'RelativeOid', None, None,
    # 16 - 23
    'Sequence', 'Set', 'NumericString', 'PrintableString',
    'TeletexString', 'VideotexString', 'IA5String', 'UTCTime',
    # 24 - 30
    'GeneralizedTime', 'GraphicString', 'VisibleString', 'GeneralString',
    'UniversalString', 'CharacterString', 'BMPString',
]
25 | 29 |
|
26 |
| - # Load some tools from the library |
27 |
| - null = ssl_util.ffi.NULL |
28 |
| - verify = ssl_util.lib.PKCS7_verify |
29 | 30 |
|
30 |
| - # By default ignore the validity of the certificates, just validate the structure |
31 |
| - flags = ssl_util.lib.PKCS7_NOVERIFY | ssl_util.lib.PKCS7_NOSIGS |
def remove_signature(content, target=None):
    """ Extract the PKCS7-data payload from a bytestring that is supposedly PKCS7 signed.

    :param content: bytestring of the (possibly) signed file
    :param target: optional object; when truthy, the name of the extraction
                   method is stored on its ``remove_signature_method`` attribute
    :return: the extracted payload, or ``content`` itself if the extraction fails
    """
    if target:
        target.remove_signature_method = '_remove_signature'
    # Best effort: any failure while parsing falls through to the untouched content
    with suppress(Exception):
        return _remove_signature(content)
    return content
32 | 39 |
|
33 |
| - # Read the signed data fron the content |
34 |
| - out_buffer = ssl_crypto._new_mem_buf() |
35 | 40 |
|
36 |
| - # This method is deprecated, but there are actually no alternatives |
37 |
| - with warnings.catch_warnings(): |
38 |
| - warnings.filterwarnings("ignore", category=DeprecationWarning) |
39 |
| - try: |
40 |
| - loaded_data = ssl_crypto.load_pkcs7_data(ssl_crypto.FILETYPE_ASN1, content) |
41 |
| - except ssl_crypto.Error: |
42 |
| - _logger.debug("PKCS#7 signature missing or invalid. Content will be tentatively used as plain text.") |
43 |
| - return content |
def _remove_signature(content):
    """ The invoice content is inside an ASN1 node identified by PKCS7_DATA_OID (pkcs7-data).
    The node is defined as an OctetString, which can be composed of an arbitrary
    sequence of octets of string data.
    We visit in-order the ASN1 tree nodes until we find the pkcs7-data, then we look for content.
    Once we found it, we read all the OctetStrings that get yielded by the in-order visit.
    When there are no more OctetStrings, then another object will follow
    with its header and identifier, so we stop exploring and just return the content.

    See also:
    https://datatracker.ietf.org/doc/html/rfc2315
    https://www.oss.com/asn1/resources/asn1-made-simple/asn1-quick-reference/octetstring.html

    :param content: bytestring of the signed file
    :return: the concatenated bytes of the pkcs7-data OctetStrings
    :raises ValueError: if the pkcs7-data identifier or its content cannot be found
    """
    chunks = []
    header_found = False
    for node in Reader().build_from_stream(content):
        if node.kind == 'ObjectIdentifier' and node.content == PKCS7_DATA_OID:
            header_found = True
        if header_found and node.kind == 'OctetString':
            chunks.append(node.content)
        elif chunks:
            # First node after the run of OctetStrings: the data is complete
            break
    if not header_found:
        raise ValueError("ASN1 Header not found")
    if not chunks:
        raise ValueError("ASN1 Content not found")
    # Join once instead of growing a bytes object chunk by chunk
    return b''.join(chunks)
48 | 69 |
|
49 |
| - # Get the content as a byte-string |
50 |
| - decoded_content = ssl_crypto._bio_to_string(out_buffer) |
51 |
| - return decoded_content |
| 70 | + |
class Asn1Node:
    """ Base class for Asn1 nodes.

    A node is described by its ``kind``, the offset of its first byte in the
    stream (``start_offset``) and the length of its content (``length``),
    which is the string ``"?"`` when the length is indefinite.
    """
    _content = None
    is_primitive = False
    finalized = False
    # Offset right after the node; stays None until something assigns it
    end_offset = None
    # Number of bytes taken by the identifier and length octets.
    # Defaults to 2: one identifier byte followed by a single length byte.
    header_length = 2

    def __init__(self, kind, start_offset, node_len, header_length=None):
        """ Initialization of the Asn1 node

        :param kind: name of the node type
        :param start_offset: offset of the node's first byte in the stream
        :param node_len: length of the node's content, or ``"?"`` if indefinite
        :param header_length: optional size of the identifier and length octets,
                              when it differs from the 2 bytes default
        """
        self.kind = kind
        self.start_offset = start_offset
        self.length = node_len
        if header_length is not None:
            self.header_length = header_length

    def total_length(self):
        """ Get the total length of the node if defined. The definition and length bytes must be considered. """
        if self.length == "?":
            return "?"
        return self.length + self.header_length

    @property
    def content(self):
        return self._content

    @content.setter
    def content(self, content):
        # Kept as a property so that subclasses can override how content is stored
        self._content = content
| 94 | + |
| 95 | + |
class PrimitiveNode(Asn1Node):
    """ Asn1 node that holds raw data rather than child nodes """
    name = "Primitive"
    is_primitive = True
| 100 | + |
| 101 | + |
class ObjectIdentifierNode(PrimitiveNode):
    """ Asn1 Object Identifier, i.e. 1.3.6.1.5.5.7.48.1

    The raw bytes assigned to ``content`` are decoded, and the dotted string
    representation is what gets stored.
    """
    @Asn1Node.content.setter
    def content(self, content):
        """ Decode the encoded identifier and store it as a dotted string.

        :raises ValueError: if ``content`` holds no complete sub-identifier
        """
        # Every sub-identifier is a base-128 number: each octet carries 7 bits,
        # and its most significant bit is set on all octets but the last one.
        arcs = []
        value = 0
        for octet in content:
            value = (value << 7) | (octet & 0x7F)
            if not octet & 0x80:
                arcs.append(value)
                value = 0
        if not arcs:
            raise ValueError("Empty or truncated ASN1 Object Identifier")

        # The first sub-identifier packs the first two arcs as 40 * arc1 + arc2.
        # arc1 can only be 0, 1 or 2, so anything from 80 upwards belongs to arc1 == 2.
        packed = arcs[0]
        if packed < 80:
            first, second = divmod(packed, 40)
        else:
            first, second = 2, packed - 80
        self._content = '.'.join(str(arc) for arc in (first, second, *arcs[1:]))
| 117 | + |
| 118 | + |
class Reader:
    """ Sequential reader of an ASN1 encoded byte string.

    ``build_from_stream`` walks the bytes and yields every node it completes:
    primitive nodes as soon as their content is read, the other nodes once
    their declared length is consumed or their closing double \\x00 is met.
    """
    offset = 0
    root = None
    current_node = None
    last_open_node = None

    def __init__(self, *args, **kwargs):
        """ Create a reader with an empty stack of open (not yet finalized) nodes """
        self.open_nodes_stack = []

    def finalize_last_open_node(self):
        """ Whenever a node is complete, it is finalized, and the references are updated """
        node = self.open_nodes_stack.pop()
        node.content = None
        self.current_node = None
        node.end_offset = self.offset
        node.finalized = True
        self.last_open_node = self.open_nodes_stack[-1] if self.open_nodes_stack else None
        return node

    def _is_complete(self, node):
        """ Tell whether an open node of definite length has had all of its bytes consumed """
        if not node or node.finalized or node.length == '?':
            return False
        # The header is not always 2 bytes long: the long length form takes more
        header_length = getattr(node, 'header_length', 2)
        return node.start_offset + header_length + node.length <= self.offset

    def build_from_stream(self, stream):
        """ Build an Asn1 tree starting from a byte string from a p7m file

        Generator yielding the nodes as they get finalized.
        The root node is the return value of the generator.

        :param stream: bytestring to parse
        :raises ValueError: if the stream ends right after a definition byte
        """
        # Start from a clean state, so that the same reader can parse several streams
        self.offset = 0
        self.root = self.current_node = self.last_open_node = None
        self.open_nodes_stack = []

        len_stream = len(stream)
        while self.offset < len_stream:

            start_offset = self.offset
            self.last_open_node = self.open_nodes_stack[-1] if self.open_nodes_stack else None

            # Read the definition and length bytes
            definition_byte = stream[self.offset]
            node_len, self.offset = self.read_length(stream, self.offset + 1)

            # A double \x00 closes the innermost open node
            if definition_byte == 0 and node_len == 0 and self.open_nodes_stack:
                yield self.finalize_last_open_node()
                continue

            # Create the current Node, remembering how many bytes its header took
            node = self.current_node = self.create_node(definition_byte, node_len, start_offset)
            node.header_length = self.offset - start_offset
            if not self.root:
                self.root = node

            if not node.is_primitive:
                # Not primitive: its children follow, so keep it open on the stack
                self.open_nodes_stack.append(node)
                self.last_open_node = node
            else:
                # Primitive: the content follows the header directly
                new_offset = self.offset + node_len
                node.content = stream[self.offset:new_offset]
                self.offset = new_offset
                node.end_offset = new_offset
                node.finalized = True
                yield node

            # Clear the stack of all finished nodes
            while self._is_complete(self.last_open_node):
                yield self.finalize_last_open_node()

        return self.root

    def read_length(self, stream, offset):
        """ Read the length bytes of a node, starting at ``offset``.

        :return: tuple (length of the node, updated offset); the length is the
                 string ``'?'`` when it is indefinite
        :raises ValueError: if there is no byte to read at ``offset``
        """
        if offset >= len(stream):
            raise ValueError("Truncated ASN1 stream: missing length byte")
        first_byte = stream[offset]
        offset += 1

        if first_byte == 0x80:
            # If it's the only bit being set, the length is indefinite,
            # and the node will terminate with a double \x00
            return '?', offset
        if first_byte < 0x80:
            # Short form: the most significant bit is off, the byte is the length itself.
            # A zero here is probably the second byte of a closing tag (\x00 \x00 <--)
            return first_byte, offset

        # Long form: the low 7 bits tell how many big-endian bytes hold the length
        length_bytes_no = first_byte & 0x7F
        node_len = int.from_bytes(stream[offset:offset + length_bytes_no], 'big')
        return node_len, offset + length_bytes_no

    def create_node(self, definition_byte, node_len, start_offset):
        """ Method to create new Asn1 nodes, given the definition bytes and the offset

        :param definition_byte: integer value of the first byte of the node
        :param node_len: length of the node's content, or ``'?'`` if indefinite
        :param start_offset: offset of the node's first byte in the stream
        """
        # Two highest bits: class (both off means Universal); 0x20: constructed flag;
        # five lowest bits: tag number
        is_universal = not definition_byte & 0xC0
        is_constructed = bool(definition_byte & 0x20)

        # NOTE(review): only Universal tags without the constructed flag become
        # primitive nodes; any other definition byte is handled as a container.
        if is_universal and not is_constructed:
            kind = universal_tags[definition_byte & 0x1F]
            target_class = ObjectIdentifierNode if kind == 'ObjectIdentifier' else PrimitiveNode
        else:
            kind = "Indefinite" if node_len == "?" else "Container"
            target_class = Asn1Node
        return target_class(kind, start_offset, node_len)
| 233 | + |
| 234 | + |
if __name__ == '__main__':
    # Usage:
    #     python remove_signature.py /path/to/einvoice.xml.p7m [times]
    #
    # Without ``times`` the extracted XML is printed.
    # With ``times`` the extraction is run that many times under the profiler,
    # the call statistics are printed, then the extracted XML is printed.
    import sys
    from cProfile import Profile
    from pstats import SortKey, Stats

    from lxml import etree

    filename = sys.argv[1]
    times = len(sys.argv) > 2 and sys.argv[2]

    with open(filename, 'rb') as f:
        content = f.read().rstrip()

    if not times:
        result = remove_signature(content)
    else:
        with Profile() as profile:
            for _ in range(int(times)):
                result = remove_signature(content)
            Stats(profile).strip_dirs().sort_stats(SortKey.CALLS).print_stats()

    # Parse leniently and never resolve entities coming from the file
    parser = etree.XMLParser(recover=True, resolve_entities=False)
    tree = etree.fromstring(result, parser)
    print(etree.tostring(tree).decode())
0 commit comments