From de53b3e8e563bab818b510a2e773fbc81554164b Mon Sep 17 00:00:00 2001 From: Ivan Desiatov Date: Sun, 29 Sep 2024 15:30:20 +0200 Subject: [PATCH] Remove CustomPolicyBuilder in favor of extending PolicyBuilder. --- docs/x509/verification.rst | 61 ----- .../hazmat/bindings/_rust/x509.pyi | 11 - src/cryptography/x509/verification.py | 2 - src/rust/src/lib.rs | 4 +- src/rust/src/x509/verify.rs | 223 +++++------------- tests/x509/verification/test_limbo.py | 15 +- tests/x509/verification/test_verification.py | 78 ++---- 7 files changed, 97 insertions(+), 297 deletions(-) diff --git a/docs/x509/verification.rst b/docs/x509/verification.rst index 459c398a1479..70aafd48f94c 100644 --- a/docs/x509/verification.rst +++ b/docs/x509/verification.rst @@ -297,64 +297,3 @@ the root of trust: for server verification. :returns: An instance of :class:`ClientVerifier` - -.. class:: CustomPolicyBuilder - - .. versionadded:: 44.0.0 - - A CustomPolicyBuilder provides a builder-style interface for constructing a - Verifier, but provides additional control over the verification policy compared to :class:`PolicyBuilder`. - - .. method:: time(new_time) - - Sets the verifier's verification time. - - If not called explicitly, this is set to :meth:`datetime.datetime.now` - when :meth:`build_server_verifier` or :meth:`build_client_verifier` - is called. - - :param new_time: The :class:`datetime.datetime` to use in the verifier - - :returns: A new instance of :class:`PolicyBuilder` - - .. method:: store(new_store) - - Sets the verifier's trust store. - - :param new_store: The :class:`Store` to use in the verifier - - :returns: A new instance of :class:`PolicyBuilder` - - .. method:: max_chain_depth(new_max_chain_depth) - - Sets the verifier's maximum chain building depth. - - This depth behaves tracks the length of the intermediate CA - chain: a maximum depth of zero means that the leaf must be directly - issued by a member of the store, a depth of one means no more than - one intermediate CA, and so forth. Note that self-issued intermediates - don't count against the chain depth, per RFC 5280. - - :param new_max_chain_depth: The maximum depth to allow in the verifier - - :returns: A new instance of :class:`PolicyBuilder` - - .. method:: build_server_verifier(subject) - - Builds a verifier for verifying server certificates. - - :param subject: A :class:`Subject` to use in the verifier - - :returns: An instance of :class:`ServerVerifier` - - .. method:: build_client_verifier() - - Builds a verifier for verifying client certificates. - - .. warning:: - - This API is not suitable for website (i.e. server) certificate - verification. You **must** use :meth:`build_server_verifier` - for server verification. - - :returns: An instance of :class:`ClientVerifier` diff --git a/src/cryptography/hazmat/bindings/_rust/x509.pyi b/src/cryptography/hazmat/bindings/_rust/x509.pyi index 684c60db23b6..983200df5e45 100644 --- a/src/cryptography/hazmat/bindings/_rust/x509.pyi +++ b/src/cryptography/hazmat/bindings/_rust/x509.pyi @@ -67,17 +67,6 @@ class PolicyBuilder: self, subject: x509.verification.Subject ) -> ServerVerifier: ... -class CustomPolicyBuilder: - def time(self, new_time: datetime.datetime) -> CustomPolicyBuilder: ... - def store(self, new_store: Store) -> CustomPolicyBuilder: ... - def max_chain_depth( - self, new_max_chain_depth: int - ) -> CustomPolicyBuilder: ... - def build_client_verifier(self) -> ClientVerifier: ... - def build_server_verifier( - self, subject: x509.verification.Subject - ) -> ServerVerifier: ... - class VerifiedClient: @property def subjects(self) -> list[x509.GeneralName] | None: ... diff --git a/src/cryptography/x509/verification.py b/src/cryptography/x509/verification.py index fe0153581f85..b83650681237 100644 --- a/src/cryptography/x509/verification.py +++ b/src/cryptography/x509/verification.py @@ -12,7 +12,6 @@ __all__ = [ "ClientVerifier", "PolicyBuilder", - "CustomPolicyBuilder", "ServerVerifier", "Store", "Subject", @@ -26,5 +25,4 @@ ClientVerifier = rust_x509.ClientVerifier ServerVerifier = rust_x509.ServerVerifier PolicyBuilder = rust_x509.PolicyBuilder -CustomPolicyBuilder = rust_x509.CustomPolicyBuilder VerificationError = rust_x509.VerificationError diff --git a/src/rust/src/lib.rs b/src/rust/src/lib.rs index 1c4bc3cb24cd..cd7b99f1570a 100644 --- a/src/rust/src/lib.rs +++ b/src/rust/src/lib.rs @@ -132,8 +132,8 @@ mod _rust { use crate::x509::sct::Sct; #[pymodule_export] use crate::x509::verify::{ - CustomPolicyBuilder, PolicyBuilder, PyClientVerifier, PyServerVerifier, PyStore, - PyVerifiedClient, VerificationError, + PolicyBuilder, PyClientVerifier, PyServerVerifier, PyStore, PyVerifiedClient, + VerificationError, }; } diff --git a/src/rust/src/x509/verify.rs b/src/rust/src/x509/verify.rs index 055e816d05e6..23cdc566c0b0 100644 --- a/src/rust/src/x509/verify.rs +++ b/src/rust/src/x509/verify.rs @@ -74,88 +74,15 @@ pub(crate) struct PolicyBuilder { time: Option, store: Option>, max_chain_depth: Option, -} - -#[pyo3::pymethods] -impl PolicyBuilder { - #[new] - fn new() -> PolicyBuilder { - PolicyBuilder { - time: None, - store: None, - max_chain_depth: None, - } - } - - fn time( - &self, - py: pyo3::Python<'_>, - new_time: pyo3::Bound<'_, pyo3::PyAny>, - ) -> CryptographyResult { - policy_builder_set_once_check!(self, time, "validation time"); - - Ok(PolicyBuilder { - time: Some(py_to_datetime(py, new_time)?), - store: self.store.as_ref().map(|s| s.clone_ref(py)), - max_chain_depth: self.max_chain_depth, - }) - } - - fn store(&self, new_store: pyo3::Py) -> CryptographyResult { - policy_builder_set_once_check!(self, store, "trust store"); - - Ok(PolicyBuilder { - time: self.time.clone(), - store: Some(new_store), - max_chain_depth: self.max_chain_depth, - }) - } - - fn max_chain_depth( - &self, - py: pyo3::Python<'_>, - new_max_chain_depth: u8, - ) -> CryptographyResult { - policy_builder_set_once_check!(self, max_chain_depth, "maximum chain depth"); - - Ok(PolicyBuilder { - time: self.time.clone(), - store: self.store.as_ref().map(|s| s.clone_ref(py)), - max_chain_depth: Some(new_max_chain_depth), - }) - } - - fn build_client_verifier(&self, py: pyo3::Python<'_>) -> CryptographyResult { - build_client_verifier_impl(py, &self.store, &self.time, |time| { - Policy::client(PyCryptoOps {}, time, self.max_chain_depth) - }) - } - - fn build_server_verifier( - &self, - py: pyo3::Python<'_>, - subject: pyo3::PyObject, - ) -> CryptographyResult { - build_server_verifier_impl(py, &self.store, &self.time, subject, |subject, time| { - Policy::server(PyCryptoOps {}, subject, time, self.max_chain_depth) - }) - } -} - -#[pyo3::pyclass(frozen, module = "cryptography.x509.verification")] -pub(crate) struct CustomPolicyBuilder { - time: Option, - store: Option>, - max_chain_depth: Option, ca_ext_policy: Option>, ee_ext_policy: Option>, } -impl CustomPolicyBuilder { +impl PolicyBuilder { /// Clones the builder, requires the GIL token to increase /// reference count for `self.store`. - fn py_clone(&self, py: pyo3::Python<'_>) -> CustomPolicyBuilder { - CustomPolicyBuilder { + fn py_clone(&self, py: pyo3::Python<'_>) -> PolicyBuilder { + PolicyBuilder { time: self.time.clone(), store: self.store.as_ref().map(|s| s.clone_ref(py)), max_chain_depth: self.max_chain_depth, @@ -166,10 +93,10 @@ impl CustomPolicyBuilder { } #[pyo3::pymethods] -impl CustomPolicyBuilder { +impl PolicyBuilder { #[new] - fn new() -> CustomPolicyBuilder { - CustomPolicyBuilder { + fn new() -> PolicyBuilder { + PolicyBuilder { time: None, store: None, max_chain_depth: None, @@ -182,10 +109,10 @@ impl CustomPolicyBuilder { &self, py: pyo3::Python<'_>, new_time: pyo3::Bound<'_, pyo3::PyAny>, - ) -> CryptographyResult { + ) -> CryptographyResult { policy_builder_set_once_check!(self, time, "validation time"); - Ok(CustomPolicyBuilder { + Ok(PolicyBuilder { time: Some(py_to_datetime(py, new_time)?), ..self.py_clone(py) }) @@ -195,10 +122,10 @@ impl CustomPolicyBuilder { &self, py: pyo3::Python<'_>, new_store: pyo3::Py, - ) -> CryptographyResult { + ) -> CryptographyResult { policy_builder_set_once_check!(self, store, "trust store"); - Ok(CustomPolicyBuilder { + Ok(PolicyBuilder { store: Some(new_store), ..self.py_clone(py) }) @@ -208,20 +135,36 @@ impl CustomPolicyBuilder { &self, py: pyo3::Python<'_>, new_max_chain_depth: u8, - ) -> CryptographyResult { + ) -> CryptographyResult { policy_builder_set_once_check!(self, max_chain_depth, "maximum chain depth"); - Ok(CustomPolicyBuilder { + Ok(PolicyBuilder { max_chain_depth: Some(new_max_chain_depth), ..self.py_clone(py) }) } fn build_client_verifier(&self, py: pyo3::Python<'_>) -> CryptographyResult { - build_client_verifier_impl(py, &self.store, &self.time, |time| { - // TODO: Replace with a custom policy once it's implemented in cryptography-x509-verification - Policy::client(PyCryptoOps {}, time, self.max_chain_depth) - }) + let store = match self.store.as_ref() { + Some(s) => s.clone_ref(py), + None => { + return Err(CryptographyError::from( + pyo3::exceptions::PyValueError::new_err( + "A client verifier must have a trust store.", + ), + )); + } + }; + + let time = match self.time.as_ref() { + Some(t) => t.clone(), + None => datetime_now(py)?, + }; + + // TODO: Pass extension policies here once implemented in cryptography-x509-verification. + let policy = Policy::client(PyCryptoOps {}, time, self.max_chain_depth); + + Ok(PyClientVerifier { policy, store }) } fn build_server_verifier( @@ -229,79 +172,43 @@ impl CustomPolicyBuilder { py: pyo3::Python<'_>, subject: pyo3::PyObject, ) -> CryptographyResult { - build_server_verifier_impl(py, &self.store, &self.time, subject, |subject, time| { - // TODO: Replace with a custom policy once it's implemented in cryptography-x509-verification - Policy::server(PyCryptoOps {}, subject, time, self.max_chain_depth) + let store = match self.store.as_ref() { + Some(s) => s.clone_ref(py), + None => { + return Err(CryptographyError::from( + pyo3::exceptions::PyValueError::new_err( + "A server verifier must have a trust store.", + ), + )); + } + }; + + let time = match self.time.as_ref() { + Some(t) => t.clone(), + None => datetime_now(py)?, + }; + let subject_owner = build_subject_owner(py, &subject)?; + + let policy = OwnedPolicy::try_new(subject_owner, |subject_owner| { + let subject = build_subject(py, subject_owner)?; + + // TODO: Pass extension policies here once implemented in cryptography-x509-verification. + Ok::, pyo3::PyErr>(Policy::server( + PyCryptoOps {}, + subject, + time, + self.max_chain_depth, + )) + })?; + + Ok(PyServerVerifier { + py_subject: subject, + policy, + store, }) } } -/// This is a helper to avoid code duplication between PolicyBuilder and CustomPolicyBuilder. -fn build_server_verifier_impl( - py: pyo3::Python<'_>, - store: &Option>, - time: &Option, - subject: pyo3::PyObject, - make_policy: impl Fn(Subject<'_>, asn1::DateTime) -> PyCryptoPolicy<'_>, -) -> CryptographyResult { - let store = match store { - Some(s) => s.clone_ref(py), - None => { - return Err(CryptographyError::from( - pyo3::exceptions::PyValueError::new_err( - "A server verifier must have a trust store.", - ), - )); - } - }; - - let time = match time.as_ref() { - Some(t) => t.clone(), - None => datetime_now(py)?, - }; - let subject_owner = build_subject_owner(py, &subject)?; - - let policy = OwnedPolicy::try_new(subject_owner, |subject_owner| { - let subject = build_subject(py, subject_owner)?; - Ok::, pyo3::PyErr>(make_policy(subject, time)) - })?; - - Ok(PyServerVerifier { - py_subject: subject, - policy, - store, - }) -} - -/// This is a helper to avoid code duplication between PolicyBuilder and CustomPolicyBuilder. -fn build_client_verifier_impl( - py: pyo3::Python<'_>, - store: &Option>, - time: &Option, - make_policy: impl Fn(asn1::DateTime) -> PyCryptoPolicy<'static>, -) -> CryptographyResult { - let store = match store.as_ref() { - Some(s) => s.clone_ref(py), - None => { - return Err(CryptographyError::from( - pyo3::exceptions::PyValueError::new_err( - "A client verifier must have a trust store.", - ), - )); - } - }; - - let time = match time.as_ref() { - Some(t) => t.clone(), - None => datetime_now(py)?, - }; - - Ok(PyClientVerifier { - policy: make_policy(time), - store, - }) -} - type PyCryptoPolicy<'a> = Policy<'a, PyCryptoOps>; /// This enum exists solely to provide heterogeneously typed ownership for `OwnedPolicy`. diff --git a/tests/x509/verification/test_limbo.py b/tests/x509/verification/test_limbo.py index c3298dad4a2e..d0402c4ce30a 100644 --- a/tests/x509/verification/test_limbo.py +++ b/tests/x509/verification/test_limbo.py @@ -6,7 +6,6 @@ import ipaddress import json import os -from typing import Type, Union import pytest @@ -14,7 +13,6 @@ from cryptography.x509 import load_pem_x509_certificate from cryptography.x509.verification import ( ClientVerifier, - CustomPolicyBuilder, PolicyBuilder, ServerVerifier, Store, @@ -98,11 +96,7 @@ def _get_limbo_peer(expected_peer): return x509.RFC822Name(value) -def _limbo_testcase( - id_, - testcase, - builder_type: Union[Type[PolicyBuilder], Type[CustomPolicyBuilder]], -): +def _limbo_testcase(id_, testcase): if id_ in LIMBO_SKIP_TESTCASES: pytest.skip(f"explicitly skipped testcase: {id_}") @@ -133,7 +127,7 @@ def _limbo_testcase( max_chain_depth = testcase["max_chain_depth"] should_pass = testcase["expected_result"] == "SUCCESS" - builder = builder_type().store(Store(trusted_certs)) + builder = PolicyBuilder().store(Store(trusted_certs)) if validation_time is not None: builder = builder.time(validation_time) if max_chain_depth is not None: @@ -183,8 +177,7 @@ def _limbo_testcase( verifier.verify(peer_certificate, untrusted_intermediates) -@pytest.mark.parametrize("builder_type", [PolicyBuilder, CustomPolicyBuilder]) -def test_limbo(subtests, pytestconfig, builder_type): +def test_limbo(subtests, pytestconfig): limbo_root = pytestconfig.getoption("--x509-limbo-root", skip=True) limbo_path = os.path.join(limbo_root, "limbo.json") with open(limbo_path, mode="rb") as limbo_file: @@ -194,4 +187,4 @@ def test_limbo(subtests, pytestconfig, builder_type): with subtests.test(): # NOTE: Pass in the id separately to make pytest # error renderings slightly nicer. - _limbo_testcase(testcase["id"], testcase, builder_type) + _limbo_testcase(testcase["id"], testcase) diff --git a/tests/x509/verification/test_verification.py b/tests/x509/verification/test_verification.py index f8dc6049c166..1d2f9261c57d 100644 --- a/tests/x509/verification/test_verification.py +++ b/tests/x509/verification/test_verification.py @@ -6,14 +6,12 @@ import os from functools import lru_cache from ipaddress import IPv4Address -from typing import Type, Union import pytest from cryptography import x509 from cryptography.x509.general_name import DNSName, IPAddress from cryptography.x509.verification import ( - CustomPolicyBuilder, PolicyBuilder, Store, VerificationError, @@ -30,71 +28,60 @@ def dummy_store() -> Store: return Store([cert]) -AnyPolicyBuilder = Union[PolicyBuilder, CustomPolicyBuilder] - - -@pytest.mark.parametrize("builder_type", [PolicyBuilder, CustomPolicyBuilder]) -class TestPolicyBuilderCommon: - """ - Tests functionality that is identical between - PolicyBuilder and CustomPolicyBuilder. - """ - - def test_time_already_set(self, builder_type: Type[AnyPolicyBuilder]): +class TestPolicyBuilder: + def test_time_already_set(self): with pytest.raises(ValueError): - builder_type().time(datetime.datetime.now()).time( + PolicyBuilder().time(datetime.datetime.now()).time( datetime.datetime.now() ) - def test_store_already_set(self, builder_type: Type[AnyPolicyBuilder]): + def test_store_already_set(self): with pytest.raises(ValueError): - builder_type().store(dummy_store()).store(dummy_store()) + PolicyBuilder().store(dummy_store()).store(dummy_store()) - def test_max_chain_depth_already_set( - self, builder_type: Type[AnyPolicyBuilder] - ): + def test_max_chain_depth_already_set(self): with pytest.raises(ValueError): - builder_type().max_chain_depth(8).max_chain_depth(9) + PolicyBuilder().max_chain_depth(8).max_chain_depth(9) - def test_ipaddress_subject(self, builder_type: Type[AnyPolicyBuilder]): + def test_ipaddress_subject(self): policy = ( - builder_type() + PolicyBuilder() .store(dummy_store()) .build_server_verifier(IPAddress(IPv4Address("0.0.0.0"))) ) assert policy.subject == IPAddress(IPv4Address("0.0.0.0")) - def test_dnsname_subject(self, builder_type: Type[AnyPolicyBuilder]): + def test_dnsname_subject(self): policy = ( - builder_type() + PolicyBuilder() .store(dummy_store()) .build_server_verifier(DNSName("cryptography.io")) ) assert policy.subject == DNSName("cryptography.io") - def test_subject_bad_types(self, builder_type: Type[AnyPolicyBuilder]): + def test_subject_bad_types(self): # Subject must be a supported GeneralName type with pytest.raises(TypeError): - builder_type().store(dummy_store()).build_server_verifier( + PolicyBuilder().store(dummy_store()).build_server_verifier( "cryptography.io" # type: ignore[arg-type] ) with pytest.raises(TypeError): - builder_type().store(dummy_store()).build_server_verifier( + PolicyBuilder().store(dummy_store()).build_server_verifier( "0.0.0.0" # type: ignore[arg-type] ) with pytest.raises(TypeError): - builder_type().store(dummy_store()).build_server_verifier( + PolicyBuilder().store(dummy_store()).build_server_verifier( IPv4Address("0.0.0.0") # type: ignore[arg-type] ) with pytest.raises(TypeError): - builder_type().store(dummy_store()).build_server_verifier(None) # type: ignore[arg-type] + PolicyBuilder().store(dummy_store()).build_server_verifier(None) # type: ignore[arg-type] - def test_builder_pattern(self, builder_type: Type[AnyPolicyBuilder]): + def test_builder_pattern(self): now = datetime.datetime.now().replace(microsecond=0) store = dummy_store() max_chain_depth = 16 - builder = builder_type() + builder = PolicyBuilder() builder = builder.time(now) builder = builder.store(store) builder = builder.max_chain_depth(max_chain_depth) @@ -105,13 +92,11 @@ def test_builder_pattern(self, builder_type: Type[AnyPolicyBuilder]): assert verifier.store == store assert verifier.max_chain_depth == max_chain_depth - def test_build_server_verifier_missing_store( - self, builder_type: Type[AnyPolicyBuilder] - ): + def test_build_server_verifier_missing_store(self): with pytest.raises( ValueError, match="A server verifier must have a trust store" ): - builder_type().build_server_verifier(DNSName("cryptography.io")) + PolicyBuilder().build_server_verifier(DNSName("cryptography.io")) class TestStore: @@ -124,23 +109,14 @@ def test_store_rejects_non_certificates(self): Store(["not a cert"]) # type: ignore[list-item] -@pytest.mark.parametrize( - "builder_type", - [ - PolicyBuilder, - CustomPolicyBuilder, - ], -) class TestClientVerifier: - def test_build_client_verifier_missing_store( - self, builder_type: Type[AnyPolicyBuilder] - ): + def test_build_client_verifier_missing_store(self): with pytest.raises( ValueError, match="A client verifier must have a trust store" ): - builder_type().build_client_verifier() + PolicyBuilder().build_client_verifier() - def test_verify(self, builder_type: Type[AnyPolicyBuilder]): + def test_verify(self): # expires 2018-11-16 01:15:03 UTC leaf = _load_cert( os.path.join("x509", "cryptography.io.pem"), @@ -152,7 +128,7 @@ def test_verify(self, builder_type: Type[AnyPolicyBuilder]): validation_time = datetime.datetime.fromisoformat( "2018-11-16T00:00:00+00:00" ) - builder = builder_type().store(store) + builder = PolicyBuilder().store(store) builder = builder.time(validation_time).max_chain_depth(16) verifier = builder.build_client_verifier() @@ -168,9 +144,7 @@ def test_verify(self, builder_type: Type[AnyPolicyBuilder]): assert x509.DNSName("cryptography.io") in verified_client.subjects assert len(verified_client.subjects) == 2 - def test_verify_fails_renders_oid( - self, builder_type: Type[AnyPolicyBuilder] - ): + def test_verify_fails_renders_oid(self): leaf = _load_cert( os.path.join("x509", "custom", "ekucrit-testuser-cert.pem"), x509.load_pem_x509_certificate, @@ -182,7 +156,7 @@ def test_verify_fails_renders_oid( "2024-06-26T00:00:00+00:00" ) - builder = builder_type().store(store) + builder = PolicyBuilder().store(store) builder = builder.time(validation_time) verifier = builder.build_client_verifier()