From 9e0ab5826add363b1e72de16c3e235157f49ada8 Mon Sep 17 00:00:00 2001 From: Shubham Patil Date: Tue, 12 Nov 2024 16:14:02 +0530 Subject: [PATCH] da_revocation: align the revocation set generation algorithm with spec changes (#36225) * da_revocation: align the revocation set generation algorithm with spec changes * Add types to few methods * address review comments * add the vid/pid checks for PAI delegated crl signer * get_paa_cert_for_crl_issuer is actually fetching the issuer cert rather than the PAA --- credentials/generate-revocation-set.py | 309 +++++++++++++++++-------- 1 file changed, 208 insertions(+), 101 deletions(-) diff --git a/credentials/generate-revocation-set.py b/credentials/generate-revocation-set.py index e0d0cb611fa072..4cdcfdbad1ae26 100644 --- a/credentials/generate-revocation-set.py +++ b/credentials/generate-revocation-set.py @@ -26,6 +26,7 @@ import subprocess import sys from enum import Enum +from typing import Optional import click import requests @@ -91,6 +92,112 @@ def parse_vid_pid_from_distinguished_name(distinguished_name): return vid, pid +def get_akid(cert: x509.Certificate) -> Optional[bytes]: + try: + return cert.extensions.get_extension_for_oid(x509.OID_AUTHORITY_KEY_IDENTIFIER).value.key_identifier + except Exception: + logging.warning("AKID not found in certificate") + return None + + +def get_skid(cert: x509.Certificate) -> Optional[bytes]: + try: + return cert.extensions.get_extension_for_oid(x509.OID_SUBJECT_KEY_IDENTIFIER).value.key_identifier + except Exception: + logging.warning("SKID not found in certificate") + return None + + +def get_subject_b64(cert: x509.Certificate) -> str: + return base64.b64encode(cert.subject.public_bytes()).decode('utf-8') + + +def get_issuer_b64(cert: x509.Certificate) -> str: + return base64.b64encode(cert.issuer.public_bytes()).decode('utf-8') + + +def verify_cert(cert: x509.Certificate, root: x509.Certificate) -> bool: + ''' + Verifies if the cert is signed by root. + ''' + + cert_akid = get_akid(cert) + root_skid = get_skid(root) + if cert_akid is None or root_skid is None or cert_akid != root_skid: + return False + + if cert.issuer != root.subject: + return False + + # public_key().verify() do not return anything if signature is valid, + # will raise an exception if signature is invalid + try: + root.public_key().verify(cert.signature, cert.tbs_certificate_bytes, ec.ECDSA(cert.signature_hash_algorithm)) + except Exception: + logging.warning(f"Signature verification failed for cert subject: {get_subject_b64(cert)}, issuer: {get_issuer_b64(cert)}") + return False + + return True + + +def is_self_signed_certificate(cert: x509.Certificate) -> bool: + return verify_cert(cert, cert) + + +# delegator is optional so can be None, but crl_signer and paa has to be present +def validate_cert_chain(crl_signer: x509.Certificate, crl_signer_delegator: x509.Certificate, paa: x509.Certificate): + ''' + There could be four scenarios: + 1. CRL Signer is PAA itself, hence its self-signed certificate + 2. CRL Signer is PAI certificate, and we can validate (crl_signer -> paa) chain + 3. CRL Signer delegator is PAA, and we can validate (crl_signer -> crl_signer_delegator(paa) -> paa) chain + 4. CRL Signer delegator is PAI, and we can validate (crl_signer -> crl_signer_delegator -> paa) chain + ''' + + if crl_signer_delegator: + return verify_cert(crl_signer, crl_signer_delegator) and verify_cert(crl_signer_delegator, paa) + else: + return verify_cert(crl_signer, paa) + + +def validate_vid_pid(revocation_point: dict, crl_signer_certificate: x509.Certificate, crl_signer_delegator_certificate: x509.Certificate) -> bool: + crl_signer_vid, crl_signer_pid = parse_vid_pid_from_distinguished_name(crl_signer_certificate.subject) + + if revocation_point["isPAA"]: + if crl_signer_vid is not None: + if revocation_point["vid"] != crl_signer_vid: + logging.warning("VID in CRL Signer Certificate does not match with VID in revocation point, continue...") + return False + else: + vid_to_match = crl_signer_vid + pid_to_match = crl_signer_pid + + # if the CRL Signer is delegated then match the VID and PID of the CRL Signer Delegator + if crl_signer_delegator_certificate: + vid_to_match, pid_to_match = parse_vid_pid_from_distinguished_name(crl_signer_delegator_certificate.subject) + + if vid_to_match is None or revocation_point["vid"] != vid_to_match: + logging.warning("VID in CRL Signer Certificate does not match with VID in revocation point, continue...") + return False + + if pid_to_match is not None: + if revocation_point["pid"] != pid_to_match: + logging.warning("PID in CRL Signer Certificate does not match with PID in revocation point, continue...") + return False + + return True + + +def fetch_crl_from_url(url: str, timeout: int) -> x509.CertificateRevocationList: + logging.debug(f"Fetching CRL from {url}") + + try: + r = requests.get(url, timeout=timeout) + return x509.load_der_x509_crl(r.content) + except Exception: + logging.error('Failed to fetch a valid CRL') + + class DCLDClient: ''' A client for interacting with DCLD using either the REST API or command line interface (CLI). @@ -172,30 +279,50 @@ def get_revocation_points(self) -> list[dict]: return response["PkiRevocationDistributionPoint"] - def get_paa_cert_for_crl_issuer(self, crl_signer_issuer_name_b64, crl_signer_authority_key_id) -> str: + def get_issuer_cert(self, cert: x509.Certificate) -> Optional[x509.Certificate]: ''' - Get PAA certificate for CRL issuer + Get the issuer certificate for Parameters ---------- - crl_signer_issuer_name_b64: str - The issuer name of the CRL signer. - crl_signer_authority_key_id: str - The authority key ID of the CRL signer. + cert: x509.Certificate + Certificate Returns ------- str - PAA certificate in PEM format + Issuer certificate in PEM format ''' + issuer_name_b64 = get_issuer_b64(cert) + akid = get_akid(cert) + if akid is None: + return + + # Convert CRL Signer AKID to colon separated hex + akid_hex = akid.hex().upper() + akid_hex = ':'.join([akid_hex[i:i+2] for i in range(0, len(akid_hex), 2)]) + + logging.debug( + f"Fetching issuer from:{self.rest_node_url}/dcl/pki/certificates/{issuer_name_b64}/{akid_hex}") + if self.use_rest: response = requests.get( - f"{self.rest_node_url}/dcl/pki/certificates/{crl_signer_issuer_name_b64}/{crl_signer_authority_key_id}").json() + f"{self.rest_node_url}/dcl/pki/certificates/{issuer_name_b64}/{akid_hex}").json() else: response = self.get_dcld_cmd_output_json( - ['query', 'pki', 'x509-cert', '-u', crl_signer_issuer_name_b64, '-k', crl_signer_authority_key_id]) + ['query', 'pki', 'x509-cert', '-u', issuer_name_b64, '-k', akid_hex]) + + issuer_certificate = response["approvedCertificates"]["certs"][0]["pemCert"] + + logging.debug(f"issuer: {issuer_certificate}") - return response["approvedCertificates"]["certs"][0]["pemCert"] + try: + issuer_certificate_object = x509.load_pem_x509_certificate(bytes(issuer_certificate, 'utf-8')) + except Exception: + logging.error('Failed to parse PAA certificate') + return + + return issuer_certificate_object def get_revocations_points_by_skid(self, issuer_subject_key_id) -> list[dict]: ''' @@ -211,6 +338,7 @@ def get_revocations_points_by_skid(self, issuer_subject_key_id) -> list[dict]: list[dict] List of revocation points ''' + if self.use_rest: response = requests.get(f"{self.rest_node_url}/dcl/pki/revocation-points/{issuer_subject_key_id}").json() else: @@ -268,97 +396,55 @@ def main(use_main_net_dcld: str, use_test_net_dcld: str, use_main_net_http: bool continue # 2. Parse the certificate - crl_signer_certificate = x509.load_pem_x509_certificate(bytes(revocation_point["crlSignerCertificate"], 'utf-8')) - - vid = revocation_point["vid"] - pid = revocation_point["pid"] - is_paa = revocation_point["isPAA"] - - # 3. && 4. Validate VID/PID - crl_vid, crl_pid = parse_vid_pid_from_distinguished_name(crl_signer_certificate.subject) - - if is_paa: - if crl_vid is not None: - if vid != crl_vid: - logging.warning("VID is not CRL VID, continue...") - continue - else: - if crl_vid is None or vid != crl_vid: - logging.warning("VID is not CRL VID, continue...") - continue - if crl_pid is not None: - if pid != crl_pid: - logging.warning("PID is not CRL PID, continue...") - continue - - # 5. Validate the certification path containing CRLSignerCertificate. - crl_signer_issuer_name = base64.b64encode(crl_signer_certificate.issuer.public_bytes()).decode('utf-8') - - crl_signer_authority_key_id = crl_signer_certificate.extensions.get_extension_for_oid( - x509.OID_AUTHORITY_KEY_IDENTIFIER).value.key_identifier - - # Convert CRL Signer AKID to colon separated hex - crl_signer_authority_key_id = crl_signer_authority_key_id.hex().upper() - crl_signer_authority_key_id = ':'.join([crl_signer_authority_key_id[i:i+2] - for i in range(0, len(crl_signer_authority_key_id), 2)]) - - paa_certificate = dcld_client.get_paa_cert_for_crl_issuer(crl_signer_issuer_name, crl_signer_authority_key_id) - - if paa_certificate is None: - logging.warning("PAA Certificate not found, continue...") + try: + crl_signer_certificate = x509.load_pem_x509_certificate(bytes(revocation_point["crlSignerCertificate"], 'utf-8')) + except Exception: + logging.warning("CRL Signer Certificate is not valid, continue...") continue - paa_certificate_object = x509.load_pem_x509_certificate(bytes(paa_certificate, 'utf-8')) + # Parse the crl signer delegator + crl_signer_delegator_cert = None + if "crlSignerDelegator" in revocation_point: + crl_signer_delegator_cert_pem = revocation_point["crlSignerDelegator"] + logging.debug(f"CRLSignerDelegator: {crl_signer_delegator_cert_pem}") + try: + crl_signer_delegator_cert = x509.load_pem_x509_certificate(bytes(crl_signer_delegator_cert_pem, 'utf-8')) + except Exception: + logging.warning("CRL Signer Delegator Certificate not found...") - # TODO: use verify_directly_issued_by() method when we upgrade cryptography to v40.0.0 - # Verify issuer matches with subject - if crl_signer_certificate.issuer != paa_certificate_object.subject: - logging.warning("CRL Signer Certificate issuer does not match with PAA Certificate subject, continue...") + # 3. and 4. Validate VID/PID + if not validate_vid_pid(revocation_point, crl_signer_certificate, crl_signer_delegator_cert): + logging.warning("Failed to validate VID/PID, continue...") continue - # Check crl signers AKID matches with SKID of paa_certificate_object's AKID - paa_skid = paa_certificate_object.extensions.get_extension_for_oid(x509.OID_SUBJECT_KEY_IDENTIFIER).value.key_identifier - crl_akid = crl_signer_certificate.extensions.get_extension_for_oid(x509.OID_AUTHORITY_KEY_IDENTIFIER).value.key_identifier - if paa_skid != crl_akid: - logging.warning("CRL Signer's AKID does not match with PAA Certificate SKID, continue...") + # 5. Validate the certification path containing CRLSignerCertificate. + paa_certificate_object = dcld_client.get_issuer_cert(crl_signer_certificate) + if paa_certificate_object is None: + logging.warning("PAA Certificate not found, continue...") continue - # verify if PAA singed the crl signer certificate - try: - paa_certificate_object.public_key().verify(crl_signer_certificate.signature, - crl_signer_certificate.tbs_certificate_bytes, - ec.ECDSA(crl_signer_certificate.signature_hash_algorithm)) - except Exception: - logging.warning("CRL Signer Certificate is not signed by PAA Certificate, continue...") + if validate_cert_chain(crl_signer_certificate, crl_signer_delegator_cert, paa_certificate_object) is False: + logging.warning("Failed to validate CRL Signer Certificate chain, continue...") continue # 6. Obtain the CRL - logging.debug(f"Fetching CRL from {revocation_point['dataURL']}") - try: - r = requests.get(revocation_point["dataURL"], timeout=5) - except Exception: - logging.error('Failed to fetch CRL') - continue - - try: - crl_file = x509.load_der_x509_crl(r.content) - except Exception: - logging.error('Failed to load CRL') + crl_file = fetch_crl_from_url(revocation_point["dataURL"], 5) # timeout in seconds + if crl_file is None: continue # 7. Perform CRL File Validation - crl_authority_key_id = crl_file.extensions.get_extension_for_oid(x509.OID_AUTHORITY_KEY_IDENTIFIER).value.key_identifier - crl_signer_subject_key_id = crl_signer_certificate.extensions.get_extension_for_oid( - x509.OID_SUBJECT_KEY_IDENTIFIER).value.key_identifier - if crl_authority_key_id != crl_signer_subject_key_id: - logging.warning("CRL Authority Key ID is not CRL Signer Subject Key ID, continue...") + # a. + crl_signer_skid = get_skid(crl_signer_certificate) + crl_akid = get_akid(crl_file) + if crl_akid != crl_signer_skid: + logging.warning("CRL AKID is not CRL Signer SKID, continue...") continue - issuer_subject_key_id = ''.join('{:02X}'.format(x) for x in crl_authority_key_id) + crl_akid_hex = ''.join('{:02X}'.format(x) for x in crl_akid) # b. - same_issuer_points = dcld_client.get_revocations_points_by_skid(issuer_subject_key_id) - count_with_matching_vid_issuer_skid = sum(item.get('vid') == vid for item in same_issuer_points) + same_issuer_points = dcld_client.get_revocations_points_by_skid(crl_akid_hex) + count_with_matching_vid_issuer_skid = sum(item.get('vid') == revocation_point["vid"] for item in same_issuer_points) if count_with_matching_vid_issuer_skid > 1: try: @@ -377,40 +463,61 @@ def main(use_main_net_dcld: str, use_test_net_dcld: str, use_main_net_http: bool logging.warning("CRL Issuing Distribution Point URI is not CRL URL, continue...") continue - # 9. Assign CRL File Issuer - certificate_authority_name = base64.b64encode(crl_file.issuer.public_bytes()).decode('utf-8') - logging.debug(f"CRL File Issuer: {certificate_authority_name}") + # TODO: 8. Validate CRL as per Section 6.3 of RFC 5280 + + # 9. decide on certificate authority name and AKID + if revocation_point["isPAA"] and not is_self_signed_certificate(crl_signer_certificate): + certificate_authority_name_b64 = get_subject_b64(paa_certificate_object) + certificate_akid = get_skid(paa_certificate_object) + elif crl_signer_delegator_cert: + certificate_authority_name_b64 = get_subject_b64(crl_signer_delegator_cert) + certificate_akid = get_skid(crl_signer_delegator_cert) + else: + certificate_authority_name_b64 = get_subject_b64(crl_signer_certificate) + certificate_akid = get_skid(crl_signer_certificate) + + # validate issuer skid matchces with the one in revocation points + certificate_akid_hex = ''.join('{:02X}'.format(x) for x in certificate_akid) + + logging.debug(f"Certificate Authority Name: {certificate_authority_name_b64}") + logging.debug(f"Certificate AKID: {certificate_akid_hex}") + logging.debug(f"revocation_point['issuerSubjectKeyID']: {revocation_point['issuerSubjectKeyID']}") + + if revocation_point["issuerSubjectKeyID"] != certificate_akid_hex: + logging.warning("CRL Issuer Subject Key ID is not CRL Signer Subject Key ID, continue...") + continue serialnumber_list = [] # 10. Iterate through the Revoked Certificates List for revoked_cert in crl_file: - # a. try: revoked_cert_issuer = revoked_cert.extensions.get_extension_for_oid( x509.CRLEntryExtensionOID.CERTIFICATE_ISSUER).value.get_values_for_type(x509.DirectoryName).value if revoked_cert_issuer is not None: - if revoked_cert_issuer != certificate_authority_name: + # check if this really are the same thing + if revoked_cert_issuer != certificate_authority_name_b64: logging.warning("CRL Issuer is not CRL File Issuer, continue...") continue except Exception: + logging.warning("certificateIssuer entry extension not found in CRL") pass - # b. - # TODO: Verify that the certificate chain of the entry is linking to the same PAA - # that issued the CRLSignerCertificate for this entry, including path through - # CRLSignerDelegator if present. If the PAAs under which were issued the certificate - # and the CRLSignerCertificate are different, ignore the entry. - - # c. and d. serialnumber_list.append(bytes(str('{:02X}'.format(revoked_cert.serial_number)), 'utf-8').decode('utf-8')) - issuer_name = base64.b64encode(crl_file.issuer.public_bytes()).decode('utf-8') + entry = { + "type": "revocation_set", + "issuer_subject_key_id": certificate_akid_hex, + "issuer_name": certificate_authority_name_b64, + "revoked_serial_numbers": serialnumber_list, + "crl_signer_cert": revocation_point["crlSignerCertificate"], + } + + if "crlSignerDelegator" in revocation_point: + entry["crl_signer_delegator"] = revocation_point["crlSignerDelegator"] - revocation_set.append({"type": "revocation_set", - "issuer_subject_key_id": issuer_subject_key_id, - "issuer_name": issuer_name, - "revoked_serial_numbers": serialnumber_list}) + logging.debug(f"Entry to append: {entry}") + revocation_set.append(entry) with open(output, 'w+') as outfile: json.dump(revocation_set, outfile, indent=4)