diff --git a/sigstore/_cli.py b/sigstore/_cli.py index 9a2ca759..5751698c 100644 --- a/sigstore/_cli.py +++ b/sigstore/_cli.py @@ -649,14 +649,13 @@ def _sign(args: argparse.Namespace) -> None: signing_ctx = SigningContext.production() else: # Assume "production" trust root if no keys are given as arguments - trusted_root = TrustedRoot.production() - trusted_root.set_args(args) + trusted_root = TrustedRoot.production(args=args, purpose=KeyringPurpose.SIGN) if args.ctfe_pem is not None: ctfe_keys = [args.ctfe_pem.read()] else: ctfe_keys = trusted_root.get_ctfe_keys() - rekor_keyring = trusted_root.rekor_keyring(KeyringPurpose.SIGN) + rekor_keyring = trusted_root.rekor_keyring() ct_keyring = CTKeyring(Keyring(ctfe_keys)) @@ -820,14 +819,13 @@ def _collect_verification_state( if not args.certificate_chain: _die(args, "Custom Rekor URL used without specifying --certificate-chain") - trusted_root = TrustedRoot.production() - trusted_root.set_args(args=args) + trusted_root = TrustedRoot.production(args=args, purpose=KeyringPurpose.VERIFY) ct_keys = trusted_root.get_ctfe_keys() verifier = Verifier( rekor=RekorClient( url=args.rekor_url, - rekor_keyring=trusted_root.rekor_keyring(KeyringPurpose.VERIFY), + rekor_keyring=trusted_root.rekor_keyring(), ct_keyring=CTKeyring(Keyring(ct_keys)), ), trusted_root=trusted_root, diff --git a/sigstore/_internal/rekor/client.py b/sigstore/_internal/rekor/client.py index 6060cdbb..b5ade3db 100644 --- a/sigstore/_internal/rekor/client.py +++ b/sigstore/_internal/rekor/client.py @@ -30,7 +30,7 @@ from sigstore._internal.ctfe import CTKeyring from sigstore._internal.keyring import Keyring -from sigstore._internal.trustroot import KeyringPurpose, RekorKeyring, TrustedRoot +from sigstore._internal.trustroot import RekorKeyring, TrustedRoot from sigstore.transparency import LogEntry logger = logging.getLogger(__name__) @@ -246,15 +246,13 @@ def __del__(self) -> None: self.session.close() @classmethod - def production( - cls, trust_root: TrustedRoot, purpose: KeyringPurpose - ) -> RekorClient: + def production(cls, trust_root: TrustedRoot) -> RekorClient: """ Returns a `RekorClient` populated with the default Rekor production instance. trust_root must be a `TrustedRoot` for the production TUF repository. """ - rekor_keyring = trust_root.rekor_keyring(purpose) + rekor_keyring = trust_root.rekor_keyring() ctfe_keys = trust_root.get_ctfe_keys() return cls( @@ -264,13 +262,13 @@ def production( ) @classmethod - def staging(cls, trust_root: TrustedRoot, purpose: KeyringPurpose) -> RekorClient: + def staging(cls, trust_root: TrustedRoot) -> RekorClient: """ Returns a `RekorClient` populated with the default Rekor staging instance. trust_root must be a `TrustedRoot` for the staging TUF repository. """ - rekor_keyring = trust_root.rekor_keyring(purpose) + rekor_keyring = trust_root.rekor_keyring() ctfe_keys = trust_root.get_ctfe_keys() return cls( diff --git a/sigstore/_internal/trustroot.py b/sigstore/_internal/trustroot.py index 280bd024..02a3d6b3 100644 --- a/sigstore/_internal/trustroot.py +++ b/sigstore/_internal/trustroot.py @@ -83,42 +83,74 @@ def __str__(self) -> str: class TrustedRoot(_TrustedRoot): """Complete set of trusted entities for a Sigstore client""" - def __init__(self, args: Optional[Namespace] = None): - self.args = args + args: Optional[Namespace] = None + purpose: KeyringPurpose @classmethod - def from_file(cls, path: str) -> "TrustedRoot": + def from_file( + cls, + path: str, + args: Optional[Namespace] = None, + purpose: KeyringPurpose = KeyringPurpose.VERIFY, + ) -> "TrustedRoot": """Create a new trust root from file""" - tr: TrustedRoot = cls().from_json(Path(path).read_bytes()) - return tr + trusted_root: TrustedRoot = cls().from_json(Path(path).read_bytes()) + trusted_root.args = args + trusted_root.purpose = purpose + return trusted_root @classmethod - def from_tuf(cls, url: str, offline: bool = False) -> "TrustedRoot": + def from_tuf( + cls, + url: str, + offline: bool = False, + args: Optional[Namespace] = None, + purpose: KeyringPurpose = KeyringPurpose.VERIFY, + ) -> "TrustedRoot": """Create a new trust root from a TUF repository. If `offline`, will use trust root in local TUF cache. Otherwise will update the trust root from remote TUF repository. """ path = TrustUpdater(url, offline).get_trusted_root_path() - return cls.from_file(path) + trusted_root = cls.from_file(path) + trusted_root.args = args + trusted_root.purpose = purpose + return trusted_root @classmethod - def production(cls, offline: bool = False) -> "TrustedRoot": + def production( + cls, + offline: bool = False, + args: Optional[Namespace] = None, + purpose: KeyringPurpose = KeyringPurpose.VERIFY, + ) -> "TrustedRoot": """Create new trust root from Sigstore production TUF repository. If `offline`, will use trust root in local TUF cache. Otherwise will update the trust root from remote TUF repository. """ - return cls.from_tuf(DEFAULT_TUF_URL, offline) + trusted_root = cls.from_tuf(DEFAULT_TUF_URL, offline) + trusted_root.args = args + trusted_root.purpose = purpose + return trusted_root @classmethod - def staging(cls, offline: bool = False) -> "TrustedRoot": + def staging( + cls, + offline: bool = False, + args: Optional[Namespace] = None, + purpose: KeyringPurpose = KeyringPurpose.VERIFY, + ) -> "TrustedRoot": """Create new trust root from Sigstore staging TUF repository. If `offline`, will use trust root in local TUF cache. Otherwise will update the trust root from remote TUF repository. """ - return cls.from_tuf(STAGING_TUF_URL, offline) + trusted_root = cls.from_tuf(STAGING_TUF_URL, offline) + trusted_root.args = args + trusted_root.purpose = purpose + return trusted_root @staticmethod def _get_tlog_keys( @@ -147,13 +179,10 @@ def _get_ca_keys( for cert in ca.cert_chain.certificates: yield cert.raw_bytes - def set_args(self, args: Namespace) -> None: - self.args = args - - def rekor_keyring(self, purpose: KeyringPurpose) -> RekorKeyring: + def rekor_keyring(self) -> RekorKeyring: """Return public key contents given certificate authorities.""" - return RekorKeyring(self._get_rekor_keys(purpose)) + return RekorKeyring(self._get_rekor_keys()) def get_ctfe_keys(self) -> list[bytes]: """Return the CTFE public keys contents.""" @@ -164,13 +193,13 @@ def get_ctfe_keys(self) -> list[bytes]: raise MetadataError("CTFE keys not found in trusted root") return ctfes - def _get_rekor_keys(self, purpose: KeyringPurpose) -> Keyring: + def _get_rekor_keys(self) -> Keyring: """Return the rekor public key content.""" keys: list[bytes] if self.args and self.args.rekor_root_pubkey: keys = self.args.rekor_root_pubkey.read() else: - keys = list(self._get_tlog_keys(self.tlogs, purpose)) + keys = list(self._get_tlog_keys(self.tlogs, self.purpose)) if len(keys) != 1: raise MetadataError("Did not find one Rekor key in trusted root") return Keyring(keys) diff --git a/sigstore/sign.py b/sigstore/sign.py index 84419b2d..7126a084 100644 --- a/sigstore/sign.py +++ b/sigstore/sign.py @@ -300,8 +300,8 @@ def production(cls) -> SigningContext: """ Return a `SigningContext` instance configured against Sigstore's production-level services. """ - trusted_root = TrustedRoot.production() - rekor = RekorClient.production(trusted_root, purpose=KeyringPurpose.SIGN) + trusted_root = TrustedRoot.production(purpose=KeyringPurpose.SIGN) + rekor = RekorClient.production(trusted_root) return cls( fulcio=FulcioClient.production(), rekor=rekor, trusted_root=trusted_root ) @@ -311,8 +311,8 @@ def staging(cls) -> SigningContext: """ Return a `SignerContext` instance configured against Sigstore's staging-level services. """ - trusted_root = TrustedRoot.staging() - rekor = RekorClient.staging(trusted_root, purpose=KeyringPurpose.SIGN) + trusted_root = TrustedRoot.staging(purpose=KeyringPurpose.SIGN) + rekor = RekorClient.staging(trusted_root) return cls( fulcio=FulcioClient.staging(), rekor=rekor, trusted_root=trusted_root ) diff --git a/sigstore/verify/verifier.py b/sigstore/verify/verifier.py index 907cc05f..27d694a4 100644 --- a/sigstore/verify/verifier.py +++ b/sigstore/verify/verifier.py @@ -130,9 +130,9 @@ def production(cls) -> Verifier: """ Return a `Verifier` instance configured against Sigstore's production-level services. """ - trusted_root = TrustedRoot.production() + trusted_root = TrustedRoot.production(purpose=KeyringPurpose.VERIFY) return cls( - rekor=RekorClient.production(trusted_root, KeyringPurpose.VERIFY), + rekor=RekorClient.production(trusted_root), trusted_root=trusted_root, ) @@ -141,9 +141,9 @@ def staging(cls) -> Verifier: """ Return a `Verifier` instance configured against Sigstore's staging-level services. """ - trusted_root = TrustedRoot.staging() + trusted_root = TrustedRoot.staging(purpose=KeyringPurpose.VERIFY) return cls( - rekor=RekorClient.staging(trusted_root, KeyringPurpose.VERIFY), + rekor=RekorClient.staging(trusted_root), trusted_root=trusted_root, ) diff --git a/test/unit/internal/test_trust_root.py b/test/unit/internal/test_trust_root.py index 53753eb0..24848aef 100644 --- a/test/unit/internal/test_trust_root.py +++ b/test/unit/internal/test_trust_root.py @@ -37,7 +37,7 @@ def test_trust_root_tuf_caches_and_requests(mock_staging_tuf, tuf_dirs): # keep track of requests the TrustUpdater invoked by TrustedRoot makes reqs, fail_reqs = mock_staging_tuf - trust_root = TrustedRoot.staging() + trust_root = TrustedRoot.staging(purpose=KeyringPurpose.VERIFY) # metadata was "downloaded" from staging expected = ["root.json", "snapshot.json", "targets.json", "timestamp.json"] assert sorted(os.listdir(data_dir)) == expected @@ -54,14 +54,14 @@ def test_trust_root_tuf_caches_and_requests(mock_staging_tuf, tuf_dirs): assert fail_reqs == expected_fail_reqs trust_root.get_ctfe_keys() - trust_root.rekor_keyring(KeyringPurpose.VERIFY) + trust_root.rekor_keyring() # no new requests assert reqs == expected_requests assert fail_reqs == expected_fail_reqs # New trust root (and TrustUpdater instance), same cache dirs - trust_root = TrustedRoot.staging() + trust_root = TrustedRoot.staging(purpose=KeyringPurpose.VERIFY) # Expect new timestamp and root requests expected_requests["timestamp.json"] += 1 @@ -70,7 +70,7 @@ def test_trust_root_tuf_caches_and_requests(mock_staging_tuf, tuf_dirs): assert fail_reqs == expected_fail_reqs trust_root.get_ctfe_keys() - trust_root.rekor_keyring(KeyringPurpose.VERIFY) + trust_root.rekor_keyring() # Expect no requests assert reqs == expected_requests assert fail_reqs == expected_fail_reqs @@ -83,7 +83,7 @@ def test_trust_root_tuf_offline(mock_staging_tuf, tuf_dirs): # keep track of requests the TrustUpdater invoked by TrustedRoot makes reqs, fail_reqs = mock_staging_tuf - trust_root = TrustedRoot.staging(offline=True) + trust_root = TrustedRoot.staging(offline=True, purpose=KeyringPurpose.VERIFY) # Only the embedded root is in local TUF metadata, nothing is downloaded expected = ["root.json"] @@ -92,7 +92,7 @@ def test_trust_root_tuf_offline(mock_staging_tuf, tuf_dirs): assert fail_reqs == {} trust_root.get_ctfe_keys() - trust_root.rekor_keyring(KeyringPurpose.VERIFY) + trust_root.rekor_keyring() # Still no requests assert reqs == {} @@ -158,37 +158,22 @@ def _pem_keys(keys): ] # Assert that trust root from TUF contains the expected keys/certs - trust_root = TrustedRoot.staging() + trust_root = TrustedRoot.staging(purpose=KeyringPurpose.VERIFY) assert ctfe_keys[0] in _der_keys(trust_root.get_ctfe_keys()) - assert ( - get_public_bytes( - trust_root.rekor_keyring(KeyringPurpose.VERIFY)._keyring.values() - ) - == rekor_keys - ) + assert get_public_bytes(trust_root.rekor_keyring()._keyring.values()) == rekor_keys assert trust_root.get_fulcio_certs() == fulcio_certs # Assert that trust root from offline TUF contains the expected keys/certs - trust_root = TrustedRoot.staging(offline=True) + trust_root = TrustedRoot.staging(offline=True, purpose=KeyringPurpose.VERIFY) assert ctfe_keys[0] in _der_keys(trust_root.get_ctfe_keys()) - assert ( - get_public_bytes( - trust_root.rekor_keyring(KeyringPurpose.VERIFY)._keyring.values() - ) - == rekor_keys - ) + assert get_public_bytes(trust_root.rekor_keyring()._keyring.values()) == rekor_keys assert trust_root.get_fulcio_certs() == fulcio_certs # Assert that trust root from file contains the expected keys/certs path = tuf_asset.target_path("trusted_root.json") trust_root = TrustedRoot.from_file(path) assert ctfe_keys[0] in _der_keys(trust_root.get_ctfe_keys()) - assert ( - get_public_bytes( - trust_root.rekor_keyring(KeyringPurpose.VERIFY)._keyring.values() - ) - == rekor_keys - ) + assert get_public_bytes(trust_root.rekor_keyring()._keyring.values()) == rekor_keys assert trust_root.get_fulcio_certs() == fulcio_certs diff --git a/test/unit/verify/test_models.py b/test/unit/verify/test_models.py index fd20442a..9fa6d689 100644 --- a/test/unit/verify/test_models.py +++ b/test/unit/verify/test_models.py @@ -52,8 +52,8 @@ def test_verification_materials_retrieves_rekor_entry(self, signing_materials): file, materials = signing_materials("a.txt") assert materials._rekor_entry is None - trust_root = TrustedRoot.staging() - client = RekorClient.staging(trust_root, KeyringPurpose.VERIFY) + trust_root = TrustedRoot.staging(purpose=KeyringPurpose.VERIFY) + client = RekorClient.staging(trust_root) with file.open(mode="rb", buffering=0) as input_: digest = _sha256_streaming(input_)