From 88189ad539a04102ec5791c3c122d7fe03d78775 Mon Sep 17 00:00:00 2001 From: Felix Fontein Date: Thu, 30 Sep 2021 22:23:16 +0200 Subject: [PATCH] Allow to serialize extension values as DER bytes string. --- CHANGELOG.rst | 2 + docs/x509/reference.rst | 8 +++ .../hazmat/bindings/_rust/x509.pyi | 1 + src/cryptography/x509/extensions.py | 4 ++ src/rust/src/x509/common.rs | 51 ++++++++++++++++++ src/rust/src/x509/crl.rs | 4 +- src/rust/src/x509/ocsp_req.rs | 2 +- src/rust/src/x509/ocsp_resp.rs | 2 +- tests/x509/test_x509_ext.py | 52 ++++++++++++++++++- 9 files changed, 121 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index ea5ab1647173f..b3aed6eb8b138 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -13,6 +13,8 @@ Changelog :func:`~cryptography.hazmat.primitives.serialization.pkcs12.load_pkcs12`, which will return an object of type :class:`~cryptography.hazmat.primitives.serialization.pkcs12.PKCS12KeyAndCertificates`. +* Extension values can now be serialized to a DER byte string by calling + :func:`~cryptography.x509.ExtensionType.public_bytes`. .. _v35-0-0: diff --git a/docs/x509/reference.rst b/docs/x509/reference.rst index e646520f00aa1..94a856c31a059 100644 --- a/docs/x509/reference.rst +++ b/docs/x509/reference.rst @@ -1589,6 +1589,14 @@ X.509 Extensions Returns the OID associated with the given extension type. + .. method:: public_bytes + + .. versionadded:: 36.0 + + :return bytes: + + A bytes string representing the extension's DER encoded value. + .. class:: KeyUsage(digital_signature, content_commitment, key_encipherment, data_encipherment, key_agreement, key_cert_sign, crl_sign, encipher_only, decipher_only) .. versionadded:: 0.9 diff --git a/src/cryptography/hazmat/bindings/_rust/x509.pyi b/src/cryptography/hazmat/bindings/_rust/x509.pyi index c5905dc514b02..79aeb63c40cf4 100644 --- a/src/cryptography/hazmat/bindings/_rust/x509.pyi +++ b/src/cryptography/hazmat/bindings/_rust/x509.pyi @@ -12,6 +12,7 @@ def load_der_x509_crl(data: bytes) -> x509.CertificateRevocationList: ... def load_pem_x509_csr(data: bytes) -> x509.CertificateSigningRequest: ... def load_der_x509_csr(data: bytes) -> x509.CertificateSigningRequest: ... def encode_name_bytes(name: x509.Name) -> bytes: ... +def encode_extension_value(extension: x509.ExtensionType) -> bytes: ... def create_x509_certificate( builder: x509.CertificateBuilder, private_key: PRIVATE_KEY_TYPES, diff --git a/src/cryptography/x509/extensions.py b/src/cryptography/x509/extensions.py index bbcb128723cd9..e5d4203aff8fa 100644 --- a/src/cryptography/x509/extensions.py +++ b/src/cryptography/x509/extensions.py @@ -11,6 +11,7 @@ from cryptography import utils from cryptography.hazmat.bindings._rust import asn1 +from cryptography.hazmat.bindings._rust import x509 as rust_x509 from cryptography.hazmat.primitives import constant_time, serialization from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePublicKey from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey @@ -90,6 +91,9 @@ def __init__(self, msg: str, oid: ObjectIdentifier) -> None: class ExtensionType(metaclass=abc.ABCMeta): oid: typing.ClassVar[ObjectIdentifier] + def public_bytes(self) -> bytes: + return rust_x509.encode_extension_value(self) + class Extensions(object): def __init__( diff --git a/src/rust/src/x509/common.rs b/src/rust/src/x509/common.rs index ef1fd84a8885b..2bfa11ec8d4ef 100644 --- a/src/rust/src/x509/common.rs +++ b/src/rust/src/x509/common.rs @@ -3,6 +3,7 @@ // for complete details. use crate::asn1::PyAsn1Error; +use crate::x509; use chrono::{Datelike, TimeZone, Timelike}; use pyo3::ToPyObject; use std::collections::HashSet; @@ -646,6 +647,55 @@ pub(crate) fn encode_extensions< ))) } +#[pyo3::prelude::pyfunction] +fn encode_extension_value<'p>( + py: pyo3::Python<'p>, + py_ext: &'p pyo3::PyAny, +) -> pyo3::PyResult<&'p pyo3::types::PyBytes> { + let unrecognized_extension_type: &pyo3::types::PyType = py + .import("cryptography.x509")? + .getattr("UnrecognizedExtension")? + .extract()?; + if unrecognized_extension_type.is_instance(py_ext)? { + return Ok(pyo3::types::PyBytes::new( + py, + py_ext.getattr("value")?.extract::<&[u8]>()?, + )); + } + + let oid = asn1::ObjectIdentifier::from_string( + py_ext + .getattr("oid")? + .getattr("dotted_string")? + .extract::<&str>()?, + ) + .unwrap(); + + for encode_ext in [ + x509::certificate::encode_certificate_extension, + x509::crl::encode_crl_entry_extension, + x509::crl::encode_crl_extension, + x509::ocsp_req::encode_ocsp_request_extension, + x509::ocsp_resp::encode_ocsp_basic_response_extension, + ] + .iter() + { + match encode_ext(&oid, py_ext)? { + Some(data) => { + // TODO: extra copy + let py_data = pyo3::types::PyBytes::new(py, &data); + return Ok(py_data); + } + None => {} + } + } + + return Err(pyo3::exceptions::PyNotImplementedError::new_err(format!( + "Extension not supported: {}", + oid + ))); +} + pub(crate) fn chrono_to_py<'p>( py: pyo3::Python<'p>, dt: &chrono::DateTime, @@ -743,6 +793,7 @@ mod tests { } pub(crate) fn add_to_module(module: &pyo3::prelude::PyModule) -> pyo3::PyResult<()> { + module.add_wrapped(pyo3::wrap_pyfunction!(encode_extension_value))?; module.add_wrapped(pyo3::wrap_pyfunction!(encode_name_bytes))?; Ok(()) diff --git a/src/rust/src/x509/crl.rs b/src/rust/src/x509/crl.rs index cbcd127b25815..bcc940f7444b5 100644 --- a/src/rust/src/x509/crl.rs +++ b/src/rust/src/x509/crl.rs @@ -644,7 +644,7 @@ pub fn parse_crl_entry_ext<'p>( } } -fn encode_crl_extension( +pub(crate) fn encode_crl_extension( oid: &asn1::ObjectIdentifier<'_>, ext: &pyo3::PyAny, ) -> pyo3::PyResult>> { @@ -709,7 +709,7 @@ fn encode_crl_extension( } } -fn encode_crl_entry_extension( +pub(crate) fn encode_crl_entry_extension( oid: &asn1::ObjectIdentifier<'_>, ext: &pyo3::PyAny, ) -> pyo3::PyResult>> { diff --git a/src/rust/src/x509/ocsp_req.rs b/src/rust/src/x509/ocsp_req.rs index a1552e2fd4aba..c83b0d6bb04b0 100644 --- a/src/rust/src/x509/ocsp_req.rs +++ b/src/rust/src/x509/ocsp_req.rs @@ -206,7 +206,7 @@ fn create_ocsp_request(py: pyo3::Python<'_>, builder: &pyo3::PyAny) -> PyAsn1Res load_der_ocsp_request(py, &data) } -fn encode_ocsp_request_extension( +pub(crate) fn encode_ocsp_request_extension( oid: &asn1::ObjectIdentifier<'_>, ext: &pyo3::PyAny, ) -> pyo3::PyResult>> { diff --git a/src/rust/src/x509/ocsp_resp.rs b/src/rust/src/x509/ocsp_resp.rs index 0b7431ed5f5dc..a60e0f292fb33 100644 --- a/src/rust/src/x509/ocsp_resp.rs +++ b/src/rust/src/x509/ocsp_resp.rs @@ -694,7 +694,7 @@ fn create_ocsp_response( load_der_ocsp_response(py, &data) } -fn encode_ocsp_basic_response_extension( +pub(crate) fn encode_ocsp_basic_response_extension( oid: &asn1::ObjectIdentifier<'_>, ext: &pyo3::PyAny, ) -> pyo3::PyResult>> { diff --git a/tests/x509/test_x509_ext.py b/tests/x509/test_x509_ext.py index a8ba114580767..a477a0d604f85 100644 --- a/tests/x509/test_x509_ext.py +++ b/tests/x509/test_x509_ext.py @@ -18,7 +18,10 @@ from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.asymmetric import ec from cryptography.x509 import DNSName, NameConstraints, SubjectAlternativeName -from cryptography.x509.extensions import _key_identifier_from_public_key +from cryptography.x509.extensions import ( + ExtensionType, + _key_identifier_from_public_key, +) from cryptography.x509.oid import ( AuthorityInformationAccessOID, ExtendedKeyUsageOID, @@ -198,6 +201,12 @@ def test_indexing(self): assert ext[-1] == ext[1] assert ext[0] == x509.TLSFeatureType.status_request + def test_der_string(self): + ext1 = x509.TLSFeature([x509.TLSFeatureType.status_request]) + assert ext1.public_bytes() == b"\x30\x03\x02\x01\x05" + ext2 = x509.TLSFeature([x509.TLSFeatureType.status_request_v2]) + assert ext2.public_bytes() == b"\x30\x03\x02\x01\x11" + class TestUnrecognizedExtension(object): def test_invalid_oid(self): @@ -251,6 +260,20 @@ def test_hash(self): assert hash(ext1) == hash(ext2) assert hash(ext1) != hash(ext3) + def test_der_string(self): + ext1 = x509.UnrecognizedExtension( + x509.ObjectIdentifier("1.2.3.5"), b"\x03\x02\x01" + ) + assert ext1.public_bytes() == b"\x03\x02\x01" + + # The following creates a BasicConstraints extension with an invalid + # value. The serialization code should still handle it correctly by + # special-casing UnrecognizedExtension. + ext2 = x509.UnrecognizedExtension( + x509.oid.ExtensionOID.BASIC_CONSTRAINTS, b"\x03\x02\x01" + ) + assert ext2.public_bytes() == b"\x03\x02\x01" + class TestCertificateIssuer(object): def test_iter_names(self): @@ -308,6 +331,10 @@ def test_hash(self): assert hash(ci1) == hash(ci2) assert hash(ci1) != hash(ci3) + def test_der_string(self): + ext = x509.CertificateIssuer([x509.DNSName("cryptography.io")]) + assert ext.public_bytes() == b"0\x11\x82\x0fcryptography.io" + class TestCRLReason(object): def test_invalid_reason_flags(self): @@ -2193,6 +2220,10 @@ def test_hash(self): assert hash(c1) == hash(c2) assert hash(c1) != hash(c3) + def test_der_string(self): + ext = x509.CRLNumber(15) + assert ext.public_bytes() == b"\x02\x01\x0f" + class TestSubjectAlternativeName(object): def test_get_values_for_type(self): @@ -2735,6 +2766,13 @@ def test_require_explicit_policy(self, backend): inhibit_policy_mapping=None, ) + def test_der_string(self): + ext = x509.PolicyConstraints( + require_explicit_policy=None, + inhibit_policy_mapping=0, + ) + assert ext.public_bytes() == b"\x30\x03\x81\x01\x00" + class TestAuthorityInformationAccess(object): def test_invalid_descriptions(self): @@ -5674,9 +5712,21 @@ def test_hash(self): assert hash(nonce1) == hash(nonce2) assert hash(nonce1) != hash(nonce3) + def test_der_string(self): + ext = x509.OCSPNonce(b"0" * 5) + assert ext.public_bytes() == b"00000" + def test_all_extension_oid_members_have_names_defined(): for oid in dir(ExtensionOID): if oid.startswith("__"): continue assert getattr(ExtensionOID, oid) in _OID_NAMES + + +def test_unknown_extension(): + class MyExtension(ExtensionType): + oid = x509.ObjectIdentifier("1.2.3.4") + + with pytest.raises(NotImplementedError): + MyExtension().public_bytes()