Skip to content

Commit

Permalink
refactor: concentrate keys on trusted root
Browse files Browse the repository at this point in the history
  • Loading branch information
javanlacerda committed Mar 13, 2024
1 parent 3875230 commit 261457c
Show file tree
Hide file tree
Showing 9 changed files with 116 additions and 75 deletions.
25 changes: 8 additions & 17 deletions sigstore/_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,8 @@
from sigstore._internal.rekor.client import (
DEFAULT_REKOR_URL,
RekorClient,
RekorKeyring,
)
from sigstore._internal.trustroot import TrustedRoot
from sigstore._internal.trustroot import KeyringPurpose, TrustedRoot
from sigstore._utils import PEMCert, cert_der_to_pem, sha256_digest
from sigstore.errors import Error
from sigstore.oidc import (
Expand Down Expand Up @@ -656,13 +655,9 @@ def _sign(args: argparse.Namespace) -> None:
ctfe_keys = [args.ctfe_pem.read()]
else:
ctfe_keys = trusted_root.get_ctfe_keys()
if args.rekor_root_pubkey is not None:
rekor_keys = [args.rekor_root_pubkey.read()]
else:
rekor_keys = trusted_root.get_rekor_keys()

ct_keyring = CTKeyring(Keyring(ctfe_keys))
rekor_keyring = RekorKeyring(Keyring(rekor_keys))
rekor_keyring = trusted_root.rekor_keyring(KeyringPurpose.SIGN)

signing_ctx = SigningContext(
fulcio=FulcioClient(args.fulcio_url),
Expand Down Expand Up @@ -814,12 +809,12 @@ def _collect_verification_state(
args,
f"Missing verification materials for {(file)}: {', '.join(missing)}",
)

purpose = KeyringPurpose.VERIFY
if args.staging:
logger.debug("verify: staging instances requested")
verifier = Verifier.staging()
verifier = Verifier.staging(purpose)
elif args.rekor_url == DEFAULT_REKOR_URL:
verifier = Verifier.production()
verifier = Verifier.production(purpose)
else:
if not args.certificate_chain:
_die(args, "Custom Rekor URL used without specifying --certificate-chain")
Expand All @@ -831,17 +826,13 @@ def _collect_verification_state(
except ValueError as error:
_die(args, f"Invalid certificate chain: {error}")

if args.rekor_root_pubkey is not None:
rekor_keys = [args.rekor_root_pubkey.read()]
else:
trusted_root = TrustedRoot.production()
rekor_keys = trusted_root.get_rekor_keys()
ct_keys = trusted_root.get_ctfe_keys()
trusted_root = TrustedRoot.production()
ct_keys = trusted_root.get_ctfe_keys()

verifier = Verifier(
rekor=RekorClient(
url=args.rekor_url,
rekor_keyring=RekorKeyring(Keyring(rekor_keys)),
rekor_keyring=trusted_root.rekor_keyring(KeyringPurpose.VERIFY),
ct_keyring=CTKeyring(Keyring(ct_keys)),
),
fulcio_certificate_chain=certificate_chain,
Expand Down
21 changes: 10 additions & 11 deletions sigstore/_internal/rekor/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@
import logging
from abc import ABC
from dataclasses import dataclass
from typing import Any, Dict, NewType, Optional
from typing import Any, Dict, Optional
from urllib.parse import urljoin

import rekor_types
import requests

from sigstore._internal.ctfe import CTKeyring
from sigstore._internal.keyring import Keyring
from sigstore._internal.trustroot import TrustedRoot
from sigstore._internal.trustroot import KeyringPurpose, RekorKeyring, TrustedRoot
from sigstore.transparency import LogEntry

logger = logging.getLogger(__name__)
Expand All @@ -39,9 +39,6 @@
STAGING_REKOR_URL = "https://rekor.sigstage.dev"


RekorKeyring = NewType("RekorKeyring", Keyring)


@dataclass(frozen=True)
class RekorLogInfo:
"""
Expand Down Expand Up @@ -249,34 +246,36 @@ def __del__(self) -> None:
self.session.close()

@classmethod
def production(cls, trust_root: TrustedRoot) -> RekorClient:
def production(
cls, trust_root: TrustedRoot, purpose: KeyringPurpose
) -> RekorClient:
"""
Returns a `RekorClient` populated with the default Rekor production instance.
trust_root must be a `TrustedRoot` for the production TUF repository.
"""
rekor_keys = trust_root.get_rekor_keys()
rekor_keyring = trust_root.rekor_keyring(purpose)
ctfe_keys = trust_root.get_ctfe_keys()

return cls(
DEFAULT_REKOR_URL,
RekorKeyring(Keyring(rekor_keys)),
rekor_keyring,
CTKeyring(Keyring(ctfe_keys)),
)

@classmethod
def staging(cls, trust_root: TrustedRoot) -> RekorClient:
def staging(cls, trust_root: TrustedRoot, purpose: KeyringPurpose) -> RekorClient:
"""
Returns a `RekorClient` populated with the default Rekor staging instance.
trust_root must be a `TrustedRoot` for the staging TUF repository.
"""
rekor_keys = trust_root.get_rekor_keys()
rekor_keyring = trust_root.rekor_keyring(purpose)
ctfe_keys = trust_root.get_ctfe_keys()

return cls(
STAGING_REKOR_URL,
RekorKeyring(Keyring(rekor_keys)),
rekor_keyring,
CTKeyring(Keyring(ctfe_keys)),
)

Expand Down
46 changes: 40 additions & 6 deletions sigstore/_internal/trustroot.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@
from __future__ import annotations

from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Iterable
from typing import Iterable, NewType

from cryptography.x509 import Certificate, load_der_x509_certificate
from sigstore_protobuf_specs.dev.sigstore.common.v1 import TimeRange
Expand All @@ -32,9 +33,12 @@
TrustedRoot as _TrustedRoot,
)

from sigstore._internal.keyring import Keyring
from sigstore._internal.tuf import DEFAULT_TUF_URL, STAGING_TUF_URL, TrustUpdater
from sigstore.errors import MetadataError

RekorKeyring = NewType("RekorKeyring", Keyring)


def _is_timerange_valid(period: TimeRange | None, *, allow_expired: bool) -> bool:
"""
Expand All @@ -58,6 +62,19 @@ def _is_timerange_valid(period: TimeRange | None, *, allow_expired: bool) -> boo
return allow_expired or (period.end is None or now <= period.end)


class KeyringPurpose(str, Enum):
"""
Keyring purpose typing
"""

SIGN = "Signing"
VERIFY = "Verifying"

def __str__(self) -> str:
"""Returns the purpose string value."""
return self.value


class TrustedRoot(_TrustedRoot):
"""Complete set of trusted entities for a Sigstore client"""

Expand Down Expand Up @@ -96,10 +113,14 @@ def staging(cls, offline: bool = False) -> "TrustedRoot":
return cls.from_tuf(STAGING_TUF_URL, offline)

@staticmethod
def _get_tlog_keys(tlogs: list[TransparencyLogInstance]) -> Iterable[bytes]:
def _get_tlog_keys(
tlogs: list[TransparencyLogInstance], allow_expired: bool
) -> Iterable[bytes]:
"""Return public key contents given transparency log instances."""
for key in tlogs:
if not _is_timerange_valid(key.public_key.valid_for, allow_expired=True):
if not _is_timerange_valid(
key.public_key.valid_for, allow_expired=allow_expired
):
continue
key_bytes = key.public_key.raw_bytes
if key_bytes:
Expand All @@ -117,16 +138,29 @@ def _get_ca_keys(
for cert in ca.cert_chain.certificates:
yield cert.raw_bytes

def rekor_keyring(self, purpose: KeyringPurpose) -> RekorKeyring:
"""Return public key contents given certificate authorities."""

if purpose is KeyringPurpose.VERIFY:
allow_expired = True
elif purpose is KeyringPurpose.SIGN:
allow_expired = False
else:
# TODO(javan): Create a specific exception
raise Exception()

return RekorKeyring(Keyring(self._get_rekor_keys(allow_expired)))

def get_ctfe_keys(self) -> list[bytes]:
"""Return the CTFE public keys contents."""
ctfes: list[bytes] = list(self._get_tlog_keys(self.ctlogs))
ctfes: list[bytes] = list(self._get_tlog_keys(self.ctlogs, True))
if not ctfes:
raise MetadataError("CTFE keys not found in trusted root")
return ctfes

def get_rekor_keys(self) -> list[bytes]:
def _get_rekor_keys(self, allow_expired: bool) -> list[bytes]:
"""Return the rekor public key content."""
keys: list[bytes] = list(self._get_tlog_keys(self.tlogs))
keys: list[bytes] = list(self._get_tlog_keys(self.tlogs, allow_expired))

if len(keys) != 1:
raise MetadataError("Did not find one Rekor key in trusted root")
Expand Down
6 changes: 3 additions & 3 deletions sigstore/sign.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
)
from sigstore._internal.rekor.client import RekorClient
from sigstore._internal.sct import verify_sct
from sigstore._internal.trustroot import TrustedRoot
from sigstore._internal.trustroot import KeyringPurpose, TrustedRoot
from sigstore._utils import BundleType, PEMCert, sha256_digest
from sigstore.oidc import ExpiredIdentity, IdentityToken
from sigstore.transparency import LogEntry
Expand Down Expand Up @@ -304,7 +304,7 @@ def production(cls) -> SigningContext:
Return a `SigningContext` instance configured against Sigstore's production-level services.
"""
trust_root = TrustedRoot.production()
rekor = RekorClient.production(trust_root)
rekor = RekorClient.production(trust_root, purpose=KeyringPurpose.SIGN)
return cls(
fulcio=FulcioClient.production(),
rekor=rekor,
Expand All @@ -316,7 +316,7 @@ def staging(cls) -> SigningContext:
Return a `SignerContext` instance configured against Sigstore's staging-level services.
"""
trust_root = TrustedRoot.staging()
rekor = RekorClient.staging(trust_root)
rekor = RekorClient.staging(trust_root, purpose=KeyringPurpose.SIGN)
return cls(
fulcio=FulcioClient.staging(),
rekor=rekor,
Expand Down
10 changes: 5 additions & 5 deletions sigstore/verify/verifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
verify_sct,
)
from sigstore._internal.set import InvalidSETError, verify_set
from sigstore._internal.trustroot import TrustedRoot
from sigstore._internal.trustroot import KeyringPurpose, TrustedRoot
from sigstore._utils import B64Str, HexStr, sha256_digest
from sigstore.hashes import Hashed
from sigstore.verify.models import InvalidRekorEntry as InvalidRekorEntryError
Expand Down Expand Up @@ -129,24 +129,24 @@ def __init__(
]

@classmethod
def production(cls) -> Verifier:
def production(cls, purpose: KeyringPurpose) -> Verifier:
"""
Return a `Verifier` instance configured against Sigstore's production-level services.
"""
trust_root = TrustedRoot.production()
return cls(
rekor=RekorClient.production(trust_root),
rekor=RekorClient.production(trust_root, purpose),
fulcio_certificate_chain=trust_root.get_fulcio_certs(),
)

@classmethod
def staging(cls) -> Verifier:
def staging(cls, purpose: KeyringPurpose) -> Verifier:
"""
Return a `Verifier` instance configured against Sigstore's staging-level services.
"""
trust_root = TrustedRoot.staging()
return cls(
rekor=RekorClient.staging(trust_root),
rekor=RekorClient.staging(trust_root, purpose),
fulcio_certificate_chain=trust_root.get_fulcio_certs(),
)

Expand Down
Loading

0 comments on commit 261457c

Please sign in to comment.