Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 15 additions & 9 deletions cwt/algs/asymmetric.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from typing import Any, Dict, List

from certvalidator import CertificateValidator, ValidationContext
from cryptography.x509 import Certificate, DNSName, load_der_x509_certificate
from cryptography.x509.oid import NameOID
from cryptography.x509.verification import PolicyBuilder, Store

from ..cose_key_interface import COSEKeyInterface
from ..exceptions import VerifyError
Expand All @@ -11,28 +13,32 @@ def __init__(self, params: Dict[int, Any]):
super().__init__(params)

self._key: Any = b""
self._cert = b""
self._intermediates = []
self._cert: Certificate = None
self._intermediates: List[Certificate] = []

if 33 in params:
if not isinstance(params[33], (bytes, list)):
raise ValueError("x5c(33) should be bytes(bstr) or list.")
certs = [params[33]] if isinstance(params[33], bytes) else params[33]
self._cert = certs[0]
self._cert = load_der_x509_certificate(certs[0])
if len(certs) > 1:
self._intermediates = certs[1:]
for c in certs[1:]:
self._intermediates.append(load_der_x509_certificate(c))
return

def validate_certificate(self, ca_certs: List[bytes]) -> bool:
def validate_certificate(self, ca_certs: List[Certificate]) -> bool:
if not ca_certs:
raise ValueError("ca_certs should be set.")
if not self._cert:
return False

ctx = ValidationContext(trust_roots=ca_certs)
store = Store(ca_certs)
builder = PolicyBuilder().store(store)
verifier = builder.build_server_verifier(
DNSName(self._cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value)
)
try:
validator = CertificateValidator(self._cert, self._intermediates, validation_context=ctx)
validator.validate_usage(set(["digital_signature"]), extended_optional=True)
verifier.verify(self._cert, self._intermediates)
except Exception as err:
raise VerifyError("Failed to validate the certificate bound to the key.") from err
return True
7 changes: 2 additions & 5 deletions cwt/cose.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import Any, Dict, List, Optional, Tuple, Union

from asn1crypto import pem
from cbor2 import CBORTag
from cryptography.x509 import load_pem_x509_certificates

from .cbor_processor import CBORProcessor
from .const import (
Expand Down Expand Up @@ -54,11 +54,8 @@ def __init__(
if ca_certs:
if not isinstance(ca_certs, str):
raise ValueError("ca_certs should be str.")
self._trust_roots: List[bytes] = []
with open(ca_certs, "rb") as f:
for _, _, der_bytes in pem.unarmor(f.read(), multiple=True):
self._ca_certs.append(der_bytes)

self._ca_certs = load_pem_x509_certificates(f.read())
if not isinstance(deterministic_header, bool):
raise ValueError("deterministic_header should be bool.")
self._deterministic_header = deterministic_header
Expand Down
Loading
Loading