Skip to content

Commit

Permalink
Merge branch 'master' into add_multi_sensor
Browse files Browse the repository at this point in the history
  • Loading branch information
erwinpan1 authored Nov 12, 2024
2 parents 007bb9a + 9e0ab58 commit 0067724
Showing 1 changed file with 208 additions and 101 deletions.
309 changes: 208 additions & 101 deletions credentials/generate-revocation-set.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import subprocess
import sys
from enum import Enum
from typing import Optional

import click
import requests
Expand Down Expand Up @@ -91,6 +92,112 @@ def parse_vid_pid_from_distinguished_name(distinguished_name):
return vid, pid


def get_akid(cert: x509.Certificate) -> Optional[bytes]:
try:
return cert.extensions.get_extension_for_oid(x509.OID_AUTHORITY_KEY_IDENTIFIER).value.key_identifier
except Exception:
logging.warning("AKID not found in certificate")
return None


def get_skid(cert: x509.Certificate) -> Optional[bytes]:
try:
return cert.extensions.get_extension_for_oid(x509.OID_SUBJECT_KEY_IDENTIFIER).value.key_identifier
except Exception:
logging.warning("SKID not found in certificate")
return None


def get_subject_b64(cert: x509.Certificate) -> str:
return base64.b64encode(cert.subject.public_bytes()).decode('utf-8')


def get_issuer_b64(cert: x509.Certificate) -> str:
return base64.b64encode(cert.issuer.public_bytes()).decode('utf-8')


def verify_cert(cert: x509.Certificate, root: x509.Certificate) -> bool:
'''
Verifies if the cert is signed by root.
'''

cert_akid = get_akid(cert)
root_skid = get_skid(root)
if cert_akid is None or root_skid is None or cert_akid != root_skid:
return False

if cert.issuer != root.subject:
return False

# public_key().verify() do not return anything if signature is valid,
# will raise an exception if signature is invalid
try:
root.public_key().verify(cert.signature, cert.tbs_certificate_bytes, ec.ECDSA(cert.signature_hash_algorithm))
except Exception:
logging.warning(f"Signature verification failed for cert subject: {get_subject_b64(cert)}, issuer: {get_issuer_b64(cert)}")
return False

return True


def is_self_signed_certificate(cert: x509.Certificate) -> bool:
return verify_cert(cert, cert)


# delegator is optional so can be None, but crl_signer and paa has to be present
def validate_cert_chain(crl_signer: x509.Certificate, crl_signer_delegator: x509.Certificate, paa: x509.Certificate):
'''
There could be four scenarios:
1. CRL Signer is PAA itself, hence its self-signed certificate
2. CRL Signer is PAI certificate, and we can validate (crl_signer -> paa) chain
3. CRL Signer delegator is PAA, and we can validate (crl_signer -> crl_signer_delegator(paa) -> paa) chain
4. CRL Signer delegator is PAI, and we can validate (crl_signer -> crl_signer_delegator -> paa) chain
'''

if crl_signer_delegator:
return verify_cert(crl_signer, crl_signer_delegator) and verify_cert(crl_signer_delegator, paa)
else:
return verify_cert(crl_signer, paa)


def validate_vid_pid(revocation_point: dict, crl_signer_certificate: x509.Certificate, crl_signer_delegator_certificate: x509.Certificate) -> bool:
crl_signer_vid, crl_signer_pid = parse_vid_pid_from_distinguished_name(crl_signer_certificate.subject)

if revocation_point["isPAA"]:
if crl_signer_vid is not None:
if revocation_point["vid"] != crl_signer_vid:
logging.warning("VID in CRL Signer Certificate does not match with VID in revocation point, continue...")
return False
else:
vid_to_match = crl_signer_vid
pid_to_match = crl_signer_pid

# if the CRL Signer is delegated then match the VID and PID of the CRL Signer Delegator
if crl_signer_delegator_certificate:
vid_to_match, pid_to_match = parse_vid_pid_from_distinguished_name(crl_signer_delegator_certificate.subject)

if vid_to_match is None or revocation_point["vid"] != vid_to_match:
logging.warning("VID in CRL Signer Certificate does not match with VID in revocation point, continue...")
return False

if pid_to_match is not None:
if revocation_point["pid"] != pid_to_match:
logging.warning("PID in CRL Signer Certificate does not match with PID in revocation point, continue...")
return False

return True


def fetch_crl_from_url(url: str, timeout: int) -> x509.CertificateRevocationList:
logging.debug(f"Fetching CRL from {url}")

try:
r = requests.get(url, timeout=timeout)
return x509.load_der_x509_crl(r.content)
except Exception:
logging.error('Failed to fetch a valid CRL')


class DCLDClient:
'''
A client for interacting with DCLD using either the REST API or command line interface (CLI).
Expand Down Expand Up @@ -172,30 +279,50 @@ def get_revocation_points(self) -> list[dict]:

return response["PkiRevocationDistributionPoint"]

def get_paa_cert_for_crl_issuer(self, crl_signer_issuer_name_b64, crl_signer_authority_key_id) -> str:
def get_issuer_cert(self, cert: x509.Certificate) -> Optional[x509.Certificate]:
'''
Get PAA certificate for CRL issuer
Get the issuer certificate for
Parameters
----------
crl_signer_issuer_name_b64: str
The issuer name of the CRL signer.
crl_signer_authority_key_id: str
The authority key ID of the CRL signer.
cert: x509.Certificate
Certificate
Returns
-------
str
PAA certificate in PEM format
Issuer certificate in PEM format
'''
issuer_name_b64 = get_issuer_b64(cert)
akid = get_akid(cert)
if akid is None:
return

# Convert CRL Signer AKID to colon separated hex
akid_hex = akid.hex().upper()
akid_hex = ':'.join([akid_hex[i:i+2] for i in range(0, len(akid_hex), 2)])

logging.debug(
f"Fetching issuer from:{self.rest_node_url}/dcl/pki/certificates/{issuer_name_b64}/{akid_hex}")

if self.use_rest:
response = requests.get(
f"{self.rest_node_url}/dcl/pki/certificates/{crl_signer_issuer_name_b64}/{crl_signer_authority_key_id}").json()
f"{self.rest_node_url}/dcl/pki/certificates/{issuer_name_b64}/{akid_hex}").json()
else:
response = self.get_dcld_cmd_output_json(
['query', 'pki', 'x509-cert', '-u', crl_signer_issuer_name_b64, '-k', crl_signer_authority_key_id])
['query', 'pki', 'x509-cert', '-u', issuer_name_b64, '-k', akid_hex])

issuer_certificate = response["approvedCertificates"]["certs"][0]["pemCert"]

logging.debug(f"issuer: {issuer_certificate}")

return response["approvedCertificates"]["certs"][0]["pemCert"]
try:
issuer_certificate_object = x509.load_pem_x509_certificate(bytes(issuer_certificate, 'utf-8'))
except Exception:
logging.error('Failed to parse PAA certificate')
return

return issuer_certificate_object

def get_revocations_points_by_skid(self, issuer_subject_key_id) -> list[dict]:
'''
Expand All @@ -211,6 +338,7 @@ def get_revocations_points_by_skid(self, issuer_subject_key_id) -> list[dict]:
list[dict]
List of revocation points
'''

if self.use_rest:
response = requests.get(f"{self.rest_node_url}/dcl/pki/revocation-points/{issuer_subject_key_id}").json()
else:
Expand Down Expand Up @@ -268,97 +396,55 @@ def main(use_main_net_dcld: str, use_test_net_dcld: str, use_main_net_http: bool
continue

# 2. Parse the certificate
crl_signer_certificate = x509.load_pem_x509_certificate(bytes(revocation_point["crlSignerCertificate"], 'utf-8'))

vid = revocation_point["vid"]
pid = revocation_point["pid"]
is_paa = revocation_point["isPAA"]

# 3. && 4. Validate VID/PID
crl_vid, crl_pid = parse_vid_pid_from_distinguished_name(crl_signer_certificate.subject)

if is_paa:
if crl_vid is not None:
if vid != crl_vid:
logging.warning("VID is not CRL VID, continue...")
continue
else:
if crl_vid is None or vid != crl_vid:
logging.warning("VID is not CRL VID, continue...")
continue
if crl_pid is not None:
if pid != crl_pid:
logging.warning("PID is not CRL PID, continue...")
continue

# 5. Validate the certification path containing CRLSignerCertificate.
crl_signer_issuer_name = base64.b64encode(crl_signer_certificate.issuer.public_bytes()).decode('utf-8')

crl_signer_authority_key_id = crl_signer_certificate.extensions.get_extension_for_oid(
x509.OID_AUTHORITY_KEY_IDENTIFIER).value.key_identifier

# Convert CRL Signer AKID to colon separated hex
crl_signer_authority_key_id = crl_signer_authority_key_id.hex().upper()
crl_signer_authority_key_id = ':'.join([crl_signer_authority_key_id[i:i+2]
for i in range(0, len(crl_signer_authority_key_id), 2)])

paa_certificate = dcld_client.get_paa_cert_for_crl_issuer(crl_signer_issuer_name, crl_signer_authority_key_id)

if paa_certificate is None:
logging.warning("PAA Certificate not found, continue...")
try:
crl_signer_certificate = x509.load_pem_x509_certificate(bytes(revocation_point["crlSignerCertificate"], 'utf-8'))
except Exception:
logging.warning("CRL Signer Certificate is not valid, continue...")
continue

paa_certificate_object = x509.load_pem_x509_certificate(bytes(paa_certificate, 'utf-8'))
# Parse the crl signer delegator
crl_signer_delegator_cert = None
if "crlSignerDelegator" in revocation_point:
crl_signer_delegator_cert_pem = revocation_point["crlSignerDelegator"]
logging.debug(f"CRLSignerDelegator: {crl_signer_delegator_cert_pem}")
try:
crl_signer_delegator_cert = x509.load_pem_x509_certificate(bytes(crl_signer_delegator_cert_pem, 'utf-8'))
except Exception:
logging.warning("CRL Signer Delegator Certificate not found...")

# TODO: use verify_directly_issued_by() method when we upgrade cryptography to v40.0.0
# Verify issuer matches with subject
if crl_signer_certificate.issuer != paa_certificate_object.subject:
logging.warning("CRL Signer Certificate issuer does not match with PAA Certificate subject, continue...")
# 3. and 4. Validate VID/PID
if not validate_vid_pid(revocation_point, crl_signer_certificate, crl_signer_delegator_cert):
logging.warning("Failed to validate VID/PID, continue...")
continue

# Check crl signers AKID matches with SKID of paa_certificate_object's AKID
paa_skid = paa_certificate_object.extensions.get_extension_for_oid(x509.OID_SUBJECT_KEY_IDENTIFIER).value.key_identifier
crl_akid = crl_signer_certificate.extensions.get_extension_for_oid(x509.OID_AUTHORITY_KEY_IDENTIFIER).value.key_identifier
if paa_skid != crl_akid:
logging.warning("CRL Signer's AKID does not match with PAA Certificate SKID, continue...")
# 5. Validate the certification path containing CRLSignerCertificate.
paa_certificate_object = dcld_client.get_issuer_cert(crl_signer_certificate)
if paa_certificate_object is None:
logging.warning("PAA Certificate not found, continue...")
continue

# verify if PAA singed the crl signer certificate
try:
paa_certificate_object.public_key().verify(crl_signer_certificate.signature,
crl_signer_certificate.tbs_certificate_bytes,
ec.ECDSA(crl_signer_certificate.signature_hash_algorithm))
except Exception:
logging.warning("CRL Signer Certificate is not signed by PAA Certificate, continue...")
if validate_cert_chain(crl_signer_certificate, crl_signer_delegator_cert, paa_certificate_object) is False:
logging.warning("Failed to validate CRL Signer Certificate chain, continue...")
continue

# 6. Obtain the CRL
logging.debug(f"Fetching CRL from {revocation_point['dataURL']}")
try:
r = requests.get(revocation_point["dataURL"], timeout=5)
except Exception:
logging.error('Failed to fetch CRL')
continue

try:
crl_file = x509.load_der_x509_crl(r.content)
except Exception:
logging.error('Failed to load CRL')
crl_file = fetch_crl_from_url(revocation_point["dataURL"], 5) # timeout in seconds
if crl_file is None:
continue

# 7. Perform CRL File Validation
crl_authority_key_id = crl_file.extensions.get_extension_for_oid(x509.OID_AUTHORITY_KEY_IDENTIFIER).value.key_identifier
crl_signer_subject_key_id = crl_signer_certificate.extensions.get_extension_for_oid(
x509.OID_SUBJECT_KEY_IDENTIFIER).value.key_identifier
if crl_authority_key_id != crl_signer_subject_key_id:
logging.warning("CRL Authority Key ID is not CRL Signer Subject Key ID, continue...")
# a.
crl_signer_skid = get_skid(crl_signer_certificate)
crl_akid = get_akid(crl_file)
if crl_akid != crl_signer_skid:
logging.warning("CRL AKID is not CRL Signer SKID, continue...")
continue

issuer_subject_key_id = ''.join('{:02X}'.format(x) for x in crl_authority_key_id)
crl_akid_hex = ''.join('{:02X}'.format(x) for x in crl_akid)

# b.
same_issuer_points = dcld_client.get_revocations_points_by_skid(issuer_subject_key_id)
count_with_matching_vid_issuer_skid = sum(item.get('vid') == vid for item in same_issuer_points)
same_issuer_points = dcld_client.get_revocations_points_by_skid(crl_akid_hex)
count_with_matching_vid_issuer_skid = sum(item.get('vid') == revocation_point["vid"] for item in same_issuer_points)

if count_with_matching_vid_issuer_skid > 1:
try:
Expand All @@ -377,40 +463,61 @@ def main(use_main_net_dcld: str, use_test_net_dcld: str, use_main_net_http: bool
logging.warning("CRL Issuing Distribution Point URI is not CRL URL, continue...")
continue

# 9. Assign CRL File Issuer
certificate_authority_name = base64.b64encode(crl_file.issuer.public_bytes()).decode('utf-8')
logging.debug(f"CRL File Issuer: {certificate_authority_name}")
# TODO: 8. Validate CRL as per Section 6.3 of RFC 5280

# 9. decide on certificate authority name and AKID
if revocation_point["isPAA"] and not is_self_signed_certificate(crl_signer_certificate):
certificate_authority_name_b64 = get_subject_b64(paa_certificate_object)
certificate_akid = get_skid(paa_certificate_object)
elif crl_signer_delegator_cert:
certificate_authority_name_b64 = get_subject_b64(crl_signer_delegator_cert)
certificate_akid = get_skid(crl_signer_delegator_cert)
else:
certificate_authority_name_b64 = get_subject_b64(crl_signer_certificate)
certificate_akid = get_skid(crl_signer_certificate)

# validate issuer skid matchces with the one in revocation points
certificate_akid_hex = ''.join('{:02X}'.format(x) for x in certificate_akid)

logging.debug(f"Certificate Authority Name: {certificate_authority_name_b64}")
logging.debug(f"Certificate AKID: {certificate_akid_hex}")
logging.debug(f"revocation_point['issuerSubjectKeyID']: {revocation_point['issuerSubjectKeyID']}")

if revocation_point["issuerSubjectKeyID"] != certificate_akid_hex:
logging.warning("CRL Issuer Subject Key ID is not CRL Signer Subject Key ID, continue...")
continue

serialnumber_list = []
# 10. Iterate through the Revoked Certificates List
for revoked_cert in crl_file:
# a.
try:
revoked_cert_issuer = revoked_cert.extensions.get_extension_for_oid(
x509.CRLEntryExtensionOID.CERTIFICATE_ISSUER).value.get_values_for_type(x509.DirectoryName).value

if revoked_cert_issuer is not None:
if revoked_cert_issuer != certificate_authority_name:
# check if this really are the same thing
if revoked_cert_issuer != certificate_authority_name_b64:
logging.warning("CRL Issuer is not CRL File Issuer, continue...")
continue
except Exception:
logging.warning("certificateIssuer entry extension not found in CRL")
pass

# b.
# TODO: Verify that the certificate chain of the entry is linking to the same PAA
# that issued the CRLSignerCertificate for this entry, including path through
# CRLSignerDelegator if present. If the PAAs under which were issued the certificate
# and the CRLSignerCertificate are different, ignore the entry.

# c. and d.
serialnumber_list.append(bytes(str('{:02X}'.format(revoked_cert.serial_number)), 'utf-8').decode('utf-8'))

issuer_name = base64.b64encode(crl_file.issuer.public_bytes()).decode('utf-8')
entry = {
"type": "revocation_set",
"issuer_subject_key_id": certificate_akid_hex,
"issuer_name": certificate_authority_name_b64,
"revoked_serial_numbers": serialnumber_list,
"crl_signer_cert": revocation_point["crlSignerCertificate"],
}

if "crlSignerDelegator" in revocation_point:
entry["crl_signer_delegator"] = revocation_point["crlSignerDelegator"]

revocation_set.append({"type": "revocation_set",
"issuer_subject_key_id": issuer_subject_key_id,
"issuer_name": issuer_name,
"revoked_serial_numbers": serialnumber_list})
logging.debug(f"Entry to append: {entry}")
revocation_set.append(entry)

with open(output, 'w+') as outfile:
json.dump(revocation_set, outfile, indent=4)
Expand Down

0 comments on commit 0067724

Please sign in to comment.