Skip to content

Commit

Permalink
refactor: concentrate keys on trusted root
Browse files Browse the repository at this point in the history
refactors and adding trusted_root to Verifier and SigningContext

move purpose from rekor client to trusted_root

Signed-off-by: Javan lacerda <javanlacerda@google.com>
  • Loading branch information
javanlacerda committed Mar 18, 2024
1 parent f793ef0 commit 21eb980
Show file tree
Hide file tree
Showing 7 changed files with 162 additions and 107 deletions.
34 changes: 9 additions & 25 deletions sigstore/_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
from textwrap import dedent
from typing import NoReturn, Optional, TextIO, Union, cast

from cryptography.x509 import load_pem_x509_certificates
from rich.logging import RichHandler
from sigstore_protobuf_specs.dev.sigstore.bundle.v1 import Bundle

Expand All @@ -38,9 +37,8 @@
from sigstore._internal.rekor.client import (
DEFAULT_REKOR_URL,
RekorClient,
RekorKeyring,
)
from sigstore._internal.trustroot import TrustedRoot
from sigstore._internal.trustroot import KeyringPurpose, TrustedRoot
from sigstore._utils import PEMCert, cert_der_to_pem, sha256_digest
from sigstore.errors import Error
from sigstore.oidc import (
Expand Down Expand Up @@ -651,22 +649,20 @@ def _sign(args: argparse.Namespace) -> None:
signing_ctx = SigningContext.production()
else:
# Assume "production" trust root if no keys are given as arguments
trusted_root = TrustedRoot.production()
trusted_root = TrustedRoot.production(args=args, purpose=KeyringPurpose.SIGN)
if args.ctfe_pem is not None:
ctfe_keys = [args.ctfe_pem.read()]
else:
ctfe_keys = trusted_root.get_ctfe_keys()
if args.rekor_root_pubkey is not None:
rekor_keys = [args.rekor_root_pubkey.read()]
else:
rekor_keys = trusted_root.get_rekor_keys()

rekor_keyring = trusted_root.rekor_keyring()

ct_keyring = CTKeyring(Keyring(ctfe_keys))
rekor_keyring = RekorKeyring(Keyring(rekor_keys))

signing_ctx = SigningContext(
fulcio=FulcioClient(args.fulcio_url),
rekor=RekorClient(args.rekor_url, rekor_keyring, ct_keyring),
trusted_root=trusted_root,
)

# The order of precedence for identities is as follows:
Expand Down Expand Up @@ -814,7 +810,6 @@ def _collect_verification_state(
args,
f"Missing verification materials for {(file)}: {', '.join(missing)}",
)

if args.staging:
logger.debug("verify: staging instances requested")
verifier = Verifier.staging()
Expand All @@ -824,27 +819,16 @@ def _collect_verification_state(
if not args.certificate_chain:
_die(args, "Custom Rekor URL used without specifying --certificate-chain")

try:
certificate_chain = load_pem_x509_certificates(
args.certificate_chain.read()
)
except ValueError as error:
_die(args, f"Invalid certificate chain: {error}")

if args.rekor_root_pubkey is not None:
rekor_keys = [args.rekor_root_pubkey.read()]
else:
trusted_root = TrustedRoot.production()
rekor_keys = trusted_root.get_rekor_keys()
ct_keys = trusted_root.get_ctfe_keys()
trusted_root = TrustedRoot.production(args=args, purpose=KeyringPurpose.VERIFY)
ct_keys = trusted_root.get_ctfe_keys()

verifier = Verifier(
rekor=RekorClient(
url=args.rekor_url,
rekor_keyring=RekorKeyring(Keyring(rekor_keys)),
rekor_keyring=trusted_root.rekor_keyring(),
ct_keyring=CTKeyring(Keyring(ct_keys)),
),
fulcio_certificate_chain=certificate_chain,
trusted_root=trusted_root,
)

all_materials = []
Expand Down
15 changes: 6 additions & 9 deletions sigstore/_internal/rekor/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@
import logging
from abc import ABC
from dataclasses import dataclass
from typing import Any, Dict, NewType, Optional
from typing import Any, Dict, Optional
from urllib.parse import urljoin

import rekor_types
import requests

from sigstore._internal.ctfe import CTKeyring
from sigstore._internal.keyring import Keyring
from sigstore._internal.trustroot import TrustedRoot
from sigstore._internal.trustroot import RekorKeyring, TrustedRoot
from sigstore.transparency import LogEntry

logger = logging.getLogger(__name__)
Expand All @@ -39,9 +39,6 @@
STAGING_REKOR_URL = "https://rekor.sigstage.dev"


RekorKeyring = NewType("RekorKeyring", Keyring)


@dataclass(frozen=True)
class RekorLogInfo:
"""
Expand Down Expand Up @@ -255,12 +252,12 @@ def production(cls, trust_root: TrustedRoot) -> RekorClient:
trust_root must be a `TrustedRoot` for the production TUF repository.
"""
rekor_keys = trust_root.get_rekor_keys()
rekor_keyring = trust_root.rekor_keyring()
ctfe_keys = trust_root.get_ctfe_keys()

return cls(
DEFAULT_REKOR_URL,
RekorKeyring(Keyring(rekor_keys)),
rekor_keyring,
CTKeyring(Keyring(ctfe_keys)),
)

Expand All @@ -271,12 +268,12 @@ def staging(cls, trust_root: TrustedRoot) -> RekorClient:
trust_root must be a `TrustedRoot` for the staging TUF repository.
"""
rekor_keys = trust_root.get_rekor_keys()
rekor_keyring = trust_root.rekor_keyring()
ctfe_keys = trust_root.get_ctfe_keys()

return cls(
STAGING_REKOR_URL,
RekorKeyring(Keyring(rekor_keys)),
rekor_keyring,
CTKeyring(Keyring(ctfe_keys)),
)

Expand Down
125 changes: 103 additions & 22 deletions sigstore/_internal/trustroot.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,17 @@

from __future__ import annotations

from argparse import Namespace
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Iterable
from typing import Iterable, NewType, Optional

from cryptography.x509 import Certificate, load_der_x509_certificate
from cryptography.x509 import (
Certificate,
load_der_x509_certificate,
load_pem_x509_certificates,
)
from sigstore_protobuf_specs.dev.sigstore.common.v1 import TimeRange
from sigstore_protobuf_specs.dev.sigstore.trustroot.v1 import (
CertificateAuthority,
Expand All @@ -32,9 +38,12 @@
TrustedRoot as _TrustedRoot,
)

from sigstore._internal.keyring import Keyring
from sigstore._internal.tuf import DEFAULT_TUF_URL, STAGING_TUF_URL, TrustUpdater
from sigstore.errors import MetadataError

RekorKeyring = NewType("RekorKeyring", Keyring)


def _is_timerange_valid(period: TimeRange | None, *, allow_expired: bool) -> bool:
"""
Expand All @@ -58,48 +67,101 @@ def _is_timerange_valid(period: TimeRange | None, *, allow_expired: bool) -> boo
return allow_expired or (period.end is None or now <= period.end)


class KeyringPurpose(str, Enum):
"""
Keyring purpose typing
"""

SIGN = "sign"
VERIFY = "verify"

def __str__(self) -> str:
"""Returns the purpose string value."""
return self.value


class TrustedRoot(_TrustedRoot):
"""Complete set of trusted entities for a Sigstore client"""

args: Optional[Namespace] = None
purpose: KeyringPurpose

@classmethod
def from_file(cls, path: str) -> "TrustedRoot":
def from_file(
cls,
path: str,
args: Optional[Namespace] = None,
purpose: KeyringPurpose = KeyringPurpose.VERIFY,
) -> "TrustedRoot":
"""Create a new trust root from file"""
tr: TrustedRoot = cls().from_json(Path(path).read_bytes())
return tr
trusted_root: TrustedRoot = cls().from_json(Path(path).read_bytes())
trusted_root.args = args
trusted_root.purpose = purpose
return trusted_root

@classmethod
def from_tuf(cls, url: str, offline: bool = False) -> "TrustedRoot":
def from_tuf(
cls,
url: str,
offline: bool = False,
args: Optional[Namespace] = None,
purpose: KeyringPurpose = KeyringPurpose.VERIFY,
) -> "TrustedRoot":
"""Create a new trust root from a TUF repository.
If `offline`, will use trust root in local TUF cache. Otherwise will
update the trust root from remote TUF repository.
"""
path = TrustUpdater(url, offline).get_trusted_root_path()
return cls.from_file(path)
trusted_root = cls.from_file(path)
trusted_root.args = args
trusted_root.purpose = purpose
return trusted_root

@classmethod
def production(cls, offline: bool = False) -> "TrustedRoot":
def production(
cls,
offline: bool = False,
args: Optional[Namespace] = None,
purpose: KeyringPurpose = KeyringPurpose.VERIFY,
) -> "TrustedRoot":
"""Create new trust root from Sigstore production TUF repository.
If `offline`, will use trust root in local TUF cache. Otherwise will
update the trust root from remote TUF repository.
"""
return cls.from_tuf(DEFAULT_TUF_URL, offline)
trusted_root = cls.from_tuf(DEFAULT_TUF_URL, offline)
trusted_root.args = args
trusted_root.purpose = purpose
return trusted_root

@classmethod
def staging(cls, offline: bool = False) -> "TrustedRoot":
def staging(
cls,
offline: bool = False,
args: Optional[Namespace] = None,
purpose: KeyringPurpose = KeyringPurpose.VERIFY,
) -> "TrustedRoot":
"""Create new trust root from Sigstore staging TUF repository.
If `offline`, will use trust root in local TUF cache. Otherwise will
update the trust root from remote TUF repository.
"""
return cls.from_tuf(STAGING_TUF_URL, offline)
trusted_root = cls.from_tuf(STAGING_TUF_URL, offline)
trusted_root.args = args
trusted_root.purpose = purpose
return trusted_root

@staticmethod
def _get_tlog_keys(tlogs: list[TransparencyLogInstance]) -> Iterable[bytes]:
def _get_tlog_keys(
tlogs: list[TransparencyLogInstance], purpose: KeyringPurpose
) -> Iterable[bytes]:
"""Return public key contents given transparency log instances."""
allow_expired = purpose is KeyringPurpose.VERIFY
for key in tlogs:
if not _is_timerange_valid(key.public_key.valid_for, allow_expired=True):
if not _is_timerange_valid(
key.public_key.valid_for, allow_expired=allow_expired
):
continue
key_bytes = key.public_key.raw_bytes
if key_bytes:
Expand All @@ -117,20 +179,30 @@ def _get_ca_keys(
for cert in ca.cert_chain.certificates:
yield cert.raw_bytes

def rekor_keyring(self) -> RekorKeyring:
"""Return public key contents given certificate authorities."""

return RekorKeyring(self._get_rekor_keys())

def get_ctfe_keys(self) -> list[bytes]:
"""Return the CTFE public keys contents."""
ctfes: list[bytes] = list(self._get_tlog_keys(self.ctlogs))
# TODO: get purpose as argument
purpose = KeyringPurpose.VERIFY
ctfes: list[bytes] = list(self._get_tlog_keys(self.ctlogs, purpose))
if not ctfes:
raise MetadataError("CTFE keys not found in trusted root")
return ctfes

def get_rekor_keys(self) -> list[bytes]:
def _get_rekor_keys(self) -> Keyring:
"""Return the rekor public key content."""
keys: list[bytes] = list(self._get_tlog_keys(self.tlogs))

keys: list[bytes]
if self.args and self.args.rekor_root_pubkey:
keys = self.args.rekor_root_pubkey.read()
else:
keys = list(self._get_tlog_keys(self.tlogs, self.purpose))
if len(keys) != 1:
raise MetadataError("Did not find one Rekor key in trusted root")
return keys
return Keyring(keys)

def get_fulcio_certs(self) -> list[Certificate]:
"""Return the Fulcio certificates."""
Expand All @@ -139,11 +211,20 @@ def get_fulcio_certs(self) -> list[Certificate]:

# Return expired certificates too: they are expired now but may have
# been active when the certificate was used to sign.
certs = [
load_der_x509_certificate(c)
for c in self._get_ca_keys(self.certificate_authorities, allow_expired=True)
]

if self.args:
try:
certs = load_pem_x509_certificates(self.args.certificate_chain.read())
except ValueError as error:
self.args._parser.error(f"Invalid certificate chain: {error}")
raise ValueError("unreachable")
else:
certs = [
load_der_x509_certificate(c)
for c in self._get_ca_keys(
self.certificate_authorities, allow_expired=True
)
]
if not certs:
raise MetadataError("Fulcio certificates not found in trusted root")
return certs
Loading

0 comments on commit 21eb980

Please sign in to comment.