|
6 | 6 |
|
7 | 7 | import collections |
8 | 8 | import contextlib |
9 | | -import itertools |
10 | 9 | import typing |
11 | 10 |
|
12 | 11 | from cryptography import utils, x509 |
|
33 | 32 | ) |
34 | 33 | from cryptography.hazmat.primitives.ciphers.algorithms import ( |
35 | 34 | AES, |
36 | | - AES128, |
37 | | - AES256, |
38 | | - ARC4, |
39 | | - SM4, |
40 | | - Camellia, |
41 | | - ChaCha20, |
42 | | - TripleDES, |
43 | | - _BlowfishInternal, |
44 | | - _CAST5Internal, |
45 | | - _IDEAInternal, |
46 | | - _SEEDInternal, |
47 | 35 | ) |
48 | 36 | from cryptography.hazmat.primitives.ciphers.modes import ( |
49 | 37 | CBC, |
50 | | - CFB, |
51 | | - CFB8, |
52 | | - CTR, |
53 | | - ECB, |
54 | | - GCM, |
55 | | - OFB, |
56 | | - XTS, |
57 | 38 | Mode, |
58 | 39 | ) |
59 | 40 | from cryptography.hazmat.primitives.serialization.pkcs12 import ( |
|
69 | 50 |
|
70 | 51 | # Not actually supported, just used as a marker for some serialization tests. |
71 | 52 | class _RC2: |
72 | | - pass |
| 53 | + key_size = 128 |
73 | 54 |
|
74 | 55 |
|
75 | 56 | class Backend: |
@@ -128,12 +109,6 @@ def __init__(self) -> None: |
128 | 109 | self._lib = self._binding.lib |
129 | 110 | self._fips_enabled = rust_openssl.is_fips_enabled() |
130 | 111 |
|
131 | | - self._cipher_registry: dict[ |
132 | | - tuple[type[CipherAlgorithm], type[Mode]], |
133 | | - typing.Callable, |
134 | | - ] = {} |
135 | | - self._register_default_ciphers() |
136 | | - |
137 | 112 | def __repr__(self) -> str: |
138 | 113 | return "<OpenSSLBackend(version: {}, FIPS: {}, Legacy: {})>".format( |
139 | 114 | self.openssl_version_text(), |
@@ -217,93 +192,7 @@ def cipher_supported(self, cipher: CipherAlgorithm, mode: Mode) -> bool: |
217 | 192 | if not isinstance(cipher, self._fips_ciphers): |
218 | 193 | return False |
219 | 194 |
|
220 | | - try: |
221 | | - adapter = self._cipher_registry[type(cipher), type(mode)] |
222 | | - except KeyError: |
223 | | - return False |
224 | | - evp_cipher = adapter(self, cipher, mode) |
225 | | - return self._ffi.NULL != evp_cipher |
226 | | - |
227 | | - def register_cipher_adapter(self, cipher_cls, mode_cls, adapter) -> None: |
228 | | - if (cipher_cls, mode_cls) in self._cipher_registry: |
229 | | - raise ValueError( |
230 | | - f"Duplicate registration for: {cipher_cls} {mode_cls}." |
231 | | - ) |
232 | | - self._cipher_registry[cipher_cls, mode_cls] = adapter |
233 | | - |
234 | | - def _register_default_ciphers(self) -> None: |
235 | | - for cipher_cls in [AES, AES128, AES256]: |
236 | | - for mode_cls in [CBC, CTR, ECB, OFB, CFB, CFB8, GCM]: |
237 | | - self.register_cipher_adapter( |
238 | | - cipher_cls, |
239 | | - mode_cls, |
240 | | - GetCipherByName( |
241 | | - "{cipher.name}-{cipher.key_size}-{mode.name}" |
242 | | - ), |
243 | | - ) |
244 | | - for mode_cls in [CBC, CTR, ECB, OFB, CFB]: |
245 | | - self.register_cipher_adapter( |
246 | | - Camellia, |
247 | | - mode_cls, |
248 | | - GetCipherByName("{cipher.name}-{cipher.key_size}-{mode.name}"), |
249 | | - ) |
250 | | - for mode_cls in [CBC, CFB, CFB8, OFB]: |
251 | | - self.register_cipher_adapter( |
252 | | - TripleDES, mode_cls, GetCipherByName("des-ede3-{mode.name}") |
253 | | - ) |
254 | | - self.register_cipher_adapter( |
255 | | - TripleDES, ECB, GetCipherByName("des-ede3") |
256 | | - ) |
257 | | - # ChaCha20 uses the Long Name "chacha20" in OpenSSL, but in LibreSSL |
258 | | - # it uses "chacha" |
259 | | - self.register_cipher_adapter( |
260 | | - ChaCha20, |
261 | | - type(None), |
262 | | - GetCipherByName( |
263 | | - "chacha" if self._lib.CRYPTOGRAPHY_IS_LIBRESSL else "chacha20" |
264 | | - ), |
265 | | - ) |
266 | | - self.register_cipher_adapter(AES, XTS, _get_xts_cipher) |
267 | | - for mode_cls in [ECB, CBC, OFB, CFB, CTR, GCM]: |
268 | | - self.register_cipher_adapter( |
269 | | - SM4, mode_cls, GetCipherByName("sm4-{mode.name}") |
270 | | - ) |
271 | | - # Don't register legacy ciphers if they're unavailable. Hypothetically |
272 | | - # this wouldn't be necessary because we test availability by seeing if |
273 | | - # we get an EVP_CIPHER * in the _CipherContext __init__, but OpenSSL 3 |
274 | | - # will return a valid pointer even though the cipher is unavailable. |
275 | | - if ( |
276 | | - self._binding._legacy_provider_loaded |
277 | | - or not self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER |
278 | | - ): |
279 | | - for mode_cls in [CBC, CFB, OFB, ECB]: |
280 | | - self.register_cipher_adapter( |
281 | | - _BlowfishInternal, |
282 | | - mode_cls, |
283 | | - GetCipherByName("bf-{mode.name}"), |
284 | | - ) |
285 | | - for mode_cls in [CBC, CFB, OFB, ECB]: |
286 | | - self.register_cipher_adapter( |
287 | | - _SEEDInternal, |
288 | | - mode_cls, |
289 | | - GetCipherByName("seed-{mode.name}"), |
290 | | - ) |
291 | | - for cipher_cls, mode_cls in itertools.product( |
292 | | - [_CAST5Internal, _IDEAInternal], |
293 | | - [CBC, OFB, CFB, ECB], |
294 | | - ): |
295 | | - self.register_cipher_adapter( |
296 | | - cipher_cls, |
297 | | - mode_cls, |
298 | | - GetCipherByName("{cipher.name}-{mode.name}"), |
299 | | - ) |
300 | | - self.register_cipher_adapter( |
301 | | - ARC4, type(None), GetCipherByName("rc4") |
302 | | - ) |
303 | | - # We don't actually support RC2, this is just used by some tests. |
304 | | - self.register_cipher_adapter( |
305 | | - _RC2, type(None), GetCipherByName("rc2") |
306 | | - ) |
| 195 | + return rust_openssl.ciphers.cipher_supported(cipher, mode) |
307 | 196 |
|
308 | 197 | def pbkdf2_hmac_supported(self, algorithm: hashes.HashAlgorithm) -> bool: |
309 | 198 | return self.hmac_supported(algorithm) |
@@ -1119,34 +1008,4 @@ def _load_pkcs7_certificates(self, p7) -> list[x509.Certificate]: |
1119 | 1008 | return certs |
1120 | 1009 |
|
1121 | 1010 |
|
1122 | | -class GetCipherByName: |
1123 | | - def __init__(self, fmt: str): |
1124 | | - self._fmt = fmt |
1125 | | - |
1126 | | - def __call__(self, backend: Backend, cipher: CipherAlgorithm, mode: Mode): |
1127 | | - cipher_name = self._fmt.format(cipher=cipher, mode=mode).lower() |
1128 | | - evp_cipher = backend._lib.EVP_get_cipherbyname( |
1129 | | - cipher_name.encode("ascii") |
1130 | | - ) |
1131 | | - |
1132 | | - # try EVP_CIPHER_fetch if present |
1133 | | - if ( |
1134 | | - evp_cipher == backend._ffi.NULL |
1135 | | - and backend._lib.Cryptography_HAS_300_EVP_CIPHER |
1136 | | - ): |
1137 | | - evp_cipher = backend._lib.EVP_CIPHER_fetch( |
1138 | | - backend._ffi.NULL, |
1139 | | - cipher_name.encode("ascii"), |
1140 | | - backend._ffi.NULL, |
1141 | | - ) |
1142 | | - |
1143 | | - backend._consume_errors() |
1144 | | - return evp_cipher |
1145 | | - |
1146 | | - |
1147 | | -def _get_xts_cipher(backend: Backend, cipher: AES, mode): |
1148 | | - cipher_name = f"aes-{cipher.key_size // 2}-xts" |
1149 | | - return backend._lib.EVP_get_cipherbyname(cipher_name.encode("ascii")) |
1150 | | - |
1151 | | - |
1152 | 1011 | backend = Backend() |
0 commit comments