Skip to content

Commit

Permalink
move purpose from rekor client to trusted_root
Browse files Browse the repository at this point in the history
Signed-off-by: Javan lacerda <javanlacerda@google.com>
  • Loading branch information
javanlacerda committed Mar 18, 2024
1 parent 397e166 commit 0b1cb47
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 67 deletions.
10 changes: 4 additions & 6 deletions sigstore/_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -649,14 +649,13 @@ def _sign(args: argparse.Namespace) -> None:
signing_ctx = SigningContext.production()
else:
# Assume "production" trust root if no keys are given as arguments
trusted_root = TrustedRoot.production()
trusted_root.set_args(args)
trusted_root = TrustedRoot.production(args=args, purpose=KeyringPurpose.SIGN)
if args.ctfe_pem is not None:
ctfe_keys = [args.ctfe_pem.read()]
else:
ctfe_keys = trusted_root.get_ctfe_keys()

rekor_keyring = trusted_root.rekor_keyring(KeyringPurpose.SIGN)
rekor_keyring = trusted_root.rekor_keyring()

ct_keyring = CTKeyring(Keyring(ctfe_keys))

Expand Down Expand Up @@ -820,14 +819,13 @@ def _collect_verification_state(
if not args.certificate_chain:
_die(args, "Custom Rekor URL used without specifying --certificate-chain")

trusted_root = TrustedRoot.production()
trusted_root.set_args(args=args)
trusted_root = TrustedRoot.production(args=args, purpose=KeyringPurpose.VERIFY)
ct_keys = trusted_root.get_ctfe_keys()

verifier = Verifier(
rekor=RekorClient(
url=args.rekor_url,
rekor_keyring=trusted_root.rekor_keyring(KeyringPurpose.VERIFY),
rekor_keyring=trusted_root.rekor_keyring(),
ct_keyring=CTKeyring(Keyring(ct_keys)),
),
trusted_root=trusted_root,
Expand Down
12 changes: 5 additions & 7 deletions sigstore/_internal/rekor/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

from sigstore._internal.ctfe import CTKeyring
from sigstore._internal.keyring import Keyring
from sigstore._internal.trustroot import KeyringPurpose, RekorKeyring, TrustedRoot
from sigstore._internal.trustroot import RekorKeyring, TrustedRoot
from sigstore.transparency import LogEntry

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -246,15 +246,13 @@ def __del__(self) -> None:
self.session.close()

@classmethod
def production(
cls, trust_root: TrustedRoot, purpose: KeyringPurpose
) -> RekorClient:
def production(cls, trust_root: TrustedRoot) -> RekorClient:
"""
Returns a `RekorClient` populated with the default Rekor production instance.
trust_root must be a `TrustedRoot` for the production TUF repository.
"""
rekor_keyring = trust_root.rekor_keyring(purpose)
rekor_keyring = trust_root.rekor_keyring()
ctfe_keys = trust_root.get_ctfe_keys()

return cls(
Expand All @@ -264,13 +262,13 @@ def production(
)

@classmethod
def staging(cls, trust_root: TrustedRoot, purpose: KeyringPurpose) -> RekorClient:
def staging(cls, trust_root: TrustedRoot) -> RekorClient:
"""
Returns a `RekorClient` populated with the default Rekor staging instance.
trust_root must be a `TrustedRoot` for the staging TUF repository.
"""
rekor_keyring = trust_root.rekor_keyring(purpose)
rekor_keyring = trust_root.rekor_keyring()
ctfe_keys = trust_root.get_ctfe_keys()

return cls(
Expand Down
65 changes: 47 additions & 18 deletions sigstore/_internal/trustroot.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,42 +83,74 @@ def __str__(self) -> str:
class TrustedRoot(_TrustedRoot):
"""Complete set of trusted entities for a Sigstore client"""

def __init__(self, args: Optional[Namespace] = None):
self.args = args
args: Optional[Namespace] = None
purpose: KeyringPurpose

@classmethod
def from_file(cls, path: str) -> "TrustedRoot":
def from_file(
cls,
path: str,
args: Optional[Namespace] = None,
purpose: KeyringPurpose = KeyringPurpose.VERIFY,
) -> "TrustedRoot":
"""Create a new trust root from file"""
tr: TrustedRoot = cls().from_json(Path(path).read_bytes())
return tr
trusted_root: TrustedRoot = cls().from_json(Path(path).read_bytes())
trusted_root.args = args
trusted_root.purpose = purpose
return trusted_root

@classmethod
def from_tuf(cls, url: str, offline: bool = False) -> "TrustedRoot":
def from_tuf(
cls,
url: str,
offline: bool = False,
args: Optional[Namespace] = None,
purpose: KeyringPurpose = KeyringPurpose.VERIFY,
) -> "TrustedRoot":
"""Create a new trust root from a TUF repository.
If `offline`, will use trust root in local TUF cache. Otherwise will
update the trust root from remote TUF repository.
"""
path = TrustUpdater(url, offline).get_trusted_root_path()
return cls.from_file(path)
trusted_root = cls.from_file(path)
trusted_root.args = args
trusted_root.purpose = purpose
return trusted_root

@classmethod
def production(cls, offline: bool = False) -> "TrustedRoot":
def production(
cls,
offline: bool = False,
args: Optional[Namespace] = None,
purpose: KeyringPurpose = KeyringPurpose.VERIFY,
) -> "TrustedRoot":
"""Create new trust root from Sigstore production TUF repository.
If `offline`, will use trust root in local TUF cache. Otherwise will
update the trust root from remote TUF repository.
"""
return cls.from_tuf(DEFAULT_TUF_URL, offline)
trusted_root = cls.from_tuf(DEFAULT_TUF_URL, offline)
trusted_root.args = args
trusted_root.purpose = purpose
return trusted_root

@classmethod
def staging(cls, offline: bool = False) -> "TrustedRoot":
def staging(
cls,
offline: bool = False,
args: Optional[Namespace] = None,
purpose: KeyringPurpose = KeyringPurpose.VERIFY,
) -> "TrustedRoot":
"""Create new trust root from Sigstore staging TUF repository.
If `offline`, will use trust root in local TUF cache. Otherwise will
update the trust root from remote TUF repository.
"""
return cls.from_tuf(STAGING_TUF_URL, offline)
trusted_root = cls.from_tuf(STAGING_TUF_URL, offline)
trusted_root.args = args
trusted_root.purpose = purpose
return trusted_root

@staticmethod
def _get_tlog_keys(
Expand Down Expand Up @@ -147,13 +179,10 @@ def _get_ca_keys(
for cert in ca.cert_chain.certificates:
yield cert.raw_bytes

def set_args(self, args: Namespace) -> None:
self.args = args

def rekor_keyring(self, purpose: KeyringPurpose) -> RekorKeyring:
def rekor_keyring(self) -> RekorKeyring:
"""Return public key contents given certificate authorities."""

return RekorKeyring(self._get_rekor_keys(purpose))
return RekorKeyring(self._get_rekor_keys())

def get_ctfe_keys(self) -> list[bytes]:
"""Return the CTFE public keys contents."""
Expand All @@ -164,13 +193,13 @@ def get_ctfe_keys(self) -> list[bytes]:
raise MetadataError("CTFE keys not found in trusted root")
return ctfes

def _get_rekor_keys(self, purpose: KeyringPurpose) -> Keyring:
def _get_rekor_keys(self) -> Keyring:
"""Return the rekor public key content."""
keys: list[bytes]
if self.args and self.args.rekor_root_pubkey:
keys = self.args.rekor_root_pubkey.read()
else:
keys = list(self._get_tlog_keys(self.tlogs, purpose))
keys = list(self._get_tlog_keys(self.tlogs, self.purpose))
if len(keys) != 1:
raise MetadataError("Did not find one Rekor key in trusted root")
return Keyring(keys)
Expand Down
8 changes: 4 additions & 4 deletions sigstore/sign.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,8 @@ def production(cls) -> SigningContext:
"""
Return a `SigningContext` instance configured against Sigstore's production-level services.
"""
trusted_root = TrustedRoot.production()
rekor = RekorClient.production(trusted_root, purpose=KeyringPurpose.SIGN)
trusted_root = TrustedRoot.production(purpose=KeyringPurpose.SIGN)
rekor = RekorClient.production(trusted_root)
return cls(
fulcio=FulcioClient.production(), rekor=rekor, trusted_root=trusted_root
)
Expand All @@ -311,8 +311,8 @@ def staging(cls) -> SigningContext:
"""
Return a `SignerContext` instance configured against Sigstore's staging-level services.
"""
trusted_root = TrustedRoot.staging()
rekor = RekorClient.staging(trusted_root, purpose=KeyringPurpose.SIGN)
trusted_root = TrustedRoot.staging(purpose=KeyringPurpose.SIGN)
rekor = RekorClient.staging(trusted_root)
return cls(
fulcio=FulcioClient.staging(), rekor=rekor, trusted_root=trusted_root
)
Expand Down
8 changes: 4 additions & 4 deletions sigstore/verify/verifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,9 @@ def production(cls) -> Verifier:
"""
Return a `Verifier` instance configured against Sigstore's production-level services.
"""
trusted_root = TrustedRoot.production()
trusted_root = TrustedRoot.production(purpose=KeyringPurpose.VERIFY)
return cls(
rekor=RekorClient.production(trusted_root, KeyringPurpose.VERIFY),
rekor=RekorClient.production(trusted_root),
trusted_root=trusted_root,
)

Expand All @@ -141,9 +141,9 @@ def staging(cls) -> Verifier:
"""
Return a `Verifier` instance configured against Sigstore's staging-level services.
"""
trusted_root = TrustedRoot.staging()
trusted_root = TrustedRoot.staging(purpose=KeyringPurpose.VERIFY)
return cls(
rekor=RekorClient.staging(trusted_root, KeyringPurpose.VERIFY),
rekor=RekorClient.staging(trusted_root),
trusted_root=trusted_root,
)

Expand Down
37 changes: 11 additions & 26 deletions test/unit/internal/test_trust_root.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def test_trust_root_tuf_caches_and_requests(mock_staging_tuf, tuf_dirs):
# keep track of requests the TrustUpdater invoked by TrustedRoot makes
reqs, fail_reqs = mock_staging_tuf

trust_root = TrustedRoot.staging()
trust_root = TrustedRoot.staging(purpose=KeyringPurpose.VERIFY)
# metadata was "downloaded" from staging
expected = ["root.json", "snapshot.json", "targets.json", "timestamp.json"]
assert sorted(os.listdir(data_dir)) == expected
Expand All @@ -54,14 +54,14 @@ def test_trust_root_tuf_caches_and_requests(mock_staging_tuf, tuf_dirs):
assert fail_reqs == expected_fail_reqs

trust_root.get_ctfe_keys()
trust_root.rekor_keyring(KeyringPurpose.VERIFY)
trust_root.rekor_keyring()

# no new requests
assert reqs == expected_requests
assert fail_reqs == expected_fail_reqs

# New trust root (and TrustUpdater instance), same cache dirs
trust_root = TrustedRoot.staging()
trust_root = TrustedRoot.staging(purpose=KeyringPurpose.VERIFY)

# Expect new timestamp and root requests
expected_requests["timestamp.json"] += 1
Expand All @@ -70,7 +70,7 @@ def test_trust_root_tuf_caches_and_requests(mock_staging_tuf, tuf_dirs):
assert fail_reqs == expected_fail_reqs

trust_root.get_ctfe_keys()
trust_root.rekor_keyring(KeyringPurpose.VERIFY)
trust_root.rekor_keyring()
# Expect no requests
assert reqs == expected_requests
assert fail_reqs == expected_fail_reqs
Expand All @@ -83,7 +83,7 @@ def test_trust_root_tuf_offline(mock_staging_tuf, tuf_dirs):
# keep track of requests the TrustUpdater invoked by TrustedRoot makes
reqs, fail_reqs = mock_staging_tuf

trust_root = TrustedRoot.staging(offline=True)
trust_root = TrustedRoot.staging(offline=True, purpose=KeyringPurpose.VERIFY)

# Only the embedded root is in local TUF metadata, nothing is downloaded
expected = ["root.json"]
Expand All @@ -92,7 +92,7 @@ def test_trust_root_tuf_offline(mock_staging_tuf, tuf_dirs):
assert fail_reqs == {}

trust_root.get_ctfe_keys()
trust_root.rekor_keyring(KeyringPurpose.VERIFY)
trust_root.rekor_keyring()

# Still no requests
assert reqs == {}
Expand Down Expand Up @@ -158,37 +158,22 @@ def _pem_keys(keys):
]

# Assert that trust root from TUF contains the expected keys/certs
trust_root = TrustedRoot.staging()
trust_root = TrustedRoot.staging(purpose=KeyringPurpose.VERIFY)
assert ctfe_keys[0] in _der_keys(trust_root.get_ctfe_keys())
assert (
get_public_bytes(
trust_root.rekor_keyring(KeyringPurpose.VERIFY)._keyring.values()
)
== rekor_keys
)
assert get_public_bytes(trust_root.rekor_keyring()._keyring.values()) == rekor_keys
assert trust_root.get_fulcio_certs() == fulcio_certs

# Assert that trust root from offline TUF contains the expected keys/certs
trust_root = TrustedRoot.staging(offline=True)
trust_root = TrustedRoot.staging(offline=True, purpose=KeyringPurpose.VERIFY)
assert ctfe_keys[0] in _der_keys(trust_root.get_ctfe_keys())
assert (
get_public_bytes(
trust_root.rekor_keyring(KeyringPurpose.VERIFY)._keyring.values()
)
== rekor_keys
)
assert get_public_bytes(trust_root.rekor_keyring()._keyring.values()) == rekor_keys
assert trust_root.get_fulcio_certs() == fulcio_certs

# Assert that trust root from file contains the expected keys/certs
path = tuf_asset.target_path("trusted_root.json")
trust_root = TrustedRoot.from_file(path)
assert ctfe_keys[0] in _der_keys(trust_root.get_ctfe_keys())
assert (
get_public_bytes(
trust_root.rekor_keyring(KeyringPurpose.VERIFY)._keyring.values()
)
== rekor_keys
)
assert get_public_bytes(trust_root.rekor_keyring()._keyring.values()) == rekor_keys
assert trust_root.get_fulcio_certs() == fulcio_certs


Expand Down
4 changes: 2 additions & 2 deletions test/unit/verify/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ def test_verification_materials_retrieves_rekor_entry(self, signing_materials):
file, materials = signing_materials("a.txt")
assert materials._rekor_entry is None

trust_root = TrustedRoot.staging()
client = RekorClient.staging(trust_root, KeyringPurpose.VERIFY)
trust_root = TrustedRoot.staging(purpose=KeyringPurpose.VERIFY)
client = RekorClient.staging(trust_root)

with file.open(mode="rb", buffering=0) as input_:
digest = _sha256_streaming(input_)
Expand Down

0 comments on commit 0b1cb47

Please sign in to comment.