Skip to content

Commit

Permalink
sigstore: use our own Statement type (#930)
Browse files Browse the repository at this point in the history
* sigstore: use our own Statement type

Signed-off-by: William Woodruff <william@trailofbits.com>

* sigstore, test: make some APIs private

Signed-off-by: William Woodruff <william@trailofbits.com>

* dsse: typing accommodations for 3.8

Signed-off-by: William Woodruff <william@trailofbits.com>

* dsse: more 3.8 accommodations

Signed-off-by: William Woodruff <william@trailofbits.com>

* dsse: more 3.8 accommodations

Signed-off-by: William Woodruff <william@trailofbits.com>

* dsse: please stop

Signed-off-by: William Woodruff <william@trailofbits.com>

* CHANGELOG: record changes

Signed-off-by: William Woodruff <william@trailofbits.com>

* sigstore, test: fix logger visibility

Logger objects are not part of the public API.

Signed-off-by: William Woodruff <william@trailofbits.com>

* sigstore: improve dsse APIs

Signed-off-by: William Woodruff <william@trailofbits.com>

* CHANGELOG: tweak

Signed-off-by: William Woodruff <william@trailofbits.com>

---------

Signed-off-by: William Woodruff <william@trailofbits.com>
  • Loading branch information
woodruffw authored Mar 13, 2024
1 parent ebffff2 commit afc14ba
Show file tree
Hide file tree
Showing 11 changed files with 268 additions and 80 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ All versions prior to 0.9.0 are untracked.
signature verification on a pre-computed hash value
([#904](https://github.com/sigstore/sigstore-python/pull/904))

* API: The `sigstore.dsse` module has been been added, including APIs
for representing in-toto statements and DSSE envelopes
([#930](https://github.com/sigstore/sigstore-python/pull/930))

### Removed

* **BREAKING API CHANGE**: `SigningResult.input_digest` has been removed;
Expand Down
2 changes: 0 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ dependencies = [
"cryptography >= 42",
"id >= 1.1.0",
"importlib_resources ~= 5.7; python_version < '3.11'",
"in-toto-attestation == 0.9.3",
"pydantic >= 2,< 3",
"pyjwt >= 2.1",
"pyOpenSSL >= 23.0.0",
Expand Down Expand Up @@ -65,7 +64,6 @@ lint = [
# and let Dependabot periodically perform this update.
"ruff < 0.3.3",
"types-requests",
"types-protobuf",
"types-pyOpenSSL",
]
doc = ["pdoc"]
Expand Down
49 changes: 0 additions & 49 deletions sigstore/_internal/dsse.py

This file was deleted.

2 changes: 1 addition & 1 deletion sigstore/_internal/rekor/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def __init__(self, http_error: requests.HTTPError):
"""
Create a new `RekorClientError` from the given `requests.HTTPError`.
"""
if http_error.response:
if http_error.response is not None:
try:
error = rekor_types.Error.model_validate_json(http_error.response.text)
super().__init__(f"{error.code}: {error.message}")
Expand Down
211 changes: 211 additions & 0 deletions sigstore/dsse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
# Copyright 2022 The Sigstore Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Functionality for building and manipulating in-toto Statements and DSSE envelopes.
"""

from __future__ import annotations

import logging
from typing import Any, Dict, List, Literal, Optional, Union

from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec
from pydantic import BaseModel, ConfigDict, Field, RootModel, StrictStr, ValidationError
from sigstore_protobuf_specs.io.intoto import Envelope as _Envelope
from sigstore_protobuf_specs.io.intoto import Signature

_logger = logging.getLogger(__name__)

_Digest = Union[
Literal["sha256"],
Literal["sha384"],
Literal["sha512"],
Literal["sha3_256"],
Literal["sha3_384"],
Literal["sha3_512"],
]
"""
NOTE: in-toto's DigestSet contains all kinds of hash algorithms that
we intentionally do not support. This model is limited to common members of the
SHA-2 and SHA-3 family that are at least as strong as SHA-256.
See: <https://github.com/in-toto/attestation/blob/main/spec/v1/digest_set.md>
"""

_DigestSet = RootModel[Dict[_Digest, str]]
"""
An internal validation model for in-toto subject digest sets.
"""


class _Subject(BaseModel):
"""
A single in-toto statement subject.
"""

name: Optional[StrictStr]
digest: _DigestSet = Field(...)


class _Statement(BaseModel):
"""
An internal validation model for in-toto statements.
"""

model_config = ConfigDict(populate_by_name=True)

type_: Literal["https://in-toto.io/Statement/v1"] = Field(..., alias="_type")
subjects: List[_Subject] = Field(..., min_length=1, alias="subject")
predicate_type: StrictStr = Field(..., alias="predicateType")
predicate: Optional[Dict[str, Any]] = Field(None, alias="predicate")


class Statement:
"""
Represents an in-toto statement.
This type deals with opaque bytes to ensure that the encoding does not
change, but Statements are internally checked for conformance against
the JSON object layout defined in the in-toto attesation spec.
See: <https://github.com/in-toto/attestation/blob/main/spec/v1/statement.md>
"""

def __init__(self, contents: bytes) -> None:
"""
Construct a new Statement.
This takes an opaque `bytes` containing the statement; use
`StatementBuilder` to manually construct an in-toto statement
from constituent pieces.
"""
self._contents = contents
try:
self._statement = _Statement.model_validate_json(contents)
except ValidationError:
raise ValueError("malformed in-toto statement")

def _pae(self) -> bytes:
"""
Construct the PAE encoding for this statement.
"""

# See:
# https://github.com/secure-systems-lab/dsse/blob/v1.0.0/envelope.md
# https://github.com/in-toto/attestation/blob/v1.0/spec/v1.0/envelope.md
pae = f"DSSEv1 {len(Envelope._TYPE)} {Envelope._TYPE} ".encode()
pae += b" ".join([str(len(self._contents)).encode(), self._contents])
return pae


class _StatementBuilder:
"""
A builder-style API for constructing in-toto Statements.
"""

def __init__(
self,
subjects: Optional[List[_Subject]] = None,
predicate_type: Optional[str] = None,
predicate: Optional[Dict[str, Any]] = None,
):
"""
Create a new `_StatementBuilder`.
"""
self._subjects = subjects or []
self._predicate_type = predicate_type
self._predicate = predicate

def subjects(self, subjects: list[_Subject]) -> _StatementBuilder:
"""
Configure the subjects for this builder.
"""
self._subjects = subjects
return self

def predicate_type(self, predicate_type: str) -> _StatementBuilder:
"""
Configure the predicate type for this builder.
"""
self._predicate_type = predicate_type
return self

def predicate(self, predicate: dict[str, Any]) -> _StatementBuilder:
"""
Configure the predicate for this builder.
"""
self._predicate = predicate
return self

def build(self) -> Statement:
"""
Build a `Statement` from the builder's state.
"""
try:
stmt = _Statement(
type_="https://in-toto.io/Statement/v1",
subjects=self._subjects,
predicate_type=self._predicate_type,
predicate=self._predicate,
)
except ValidationError as e:
raise ValueError(f"invalid statement: {e}")

return Statement(stmt.model_dump_json(by_alias=True).encode())


class Envelope:
"""
Represents a DSSE envelope.
This class cannot be constructed directly; you must use `sign`.
See: <https://github.com/secure-systems-lab/dsse/blob/v1.0.0/envelope.md>
"""

_TYPE = "application/vnd.in-toto+json"

def __init__(self, inner: _Envelope) -> None:
"""
@private
"""

self._inner = inner

def to_json(self) -> str:
"""
Return a JSON string with this DSSE envelope's contents.
"""
# TODO: Unclear why mypy thinks this is returning `Any`.
return self._inner.to_json() # type: ignore[no-any-return]


def _sign(key: ec.EllipticCurvePrivateKey, stmt: Statement) -> Envelope:
"""
Sign for the given in-toto `Statement`, and encapsulate the resulting
signature in a DSSE `Envelope`.
"""
pae = stmt._pae()
_logger.debug(f"DSSE PAE: {pae!r}")

signature = key.sign(pae, ec.ECDSA(hashes.SHA256()))
return Envelope(
_Envelope(
payload=stmt._contents,
payload_type=Envelope._TYPE,
signatures=[Signature(sig=signature, keyid=None)],
)
)
23 changes: 11 additions & 12 deletions sigstore/sign.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.x509.oid import NameOID
from in_toto_attestation.v1.statement import Statement
from sigstore_protobuf_specs.dev.sigstore.bundle.v1 import (
Bundle,
VerificationMaterial,
Expand All @@ -71,8 +70,8 @@
)
from sigstore_protobuf_specs.io.intoto import Envelope

from sigstore import dsse
from sigstore import hashes as sigstore_hashes
from sigstore._internal import dsse
from sigstore._internal.fulcio import (
ExpiredCertificate,
FulcioCertificateSigningResponse,
Expand All @@ -85,7 +84,7 @@
from sigstore.oidc import ExpiredIdentity, IdentityToken
from sigstore.transparency import LogEntry

logger = logging.getLogger(__name__)
_logger = logging.getLogger(__name__)


class Signer:
Expand Down Expand Up @@ -119,16 +118,16 @@ def __init__(
FulcioCertificateSigningResponse
] = None
if cache:
logger.debug("Generating ephemeral keys...")
_logger.debug("Generating ephemeral keys...")
self.__cached_private_key = ec.generate_private_key(ec.SECP256R1())
logger.debug("Requesting ephemeral certificate...")
_logger.debug("Requesting ephemeral certificate...")
self.__cached_signing_certificate = self._signing_cert(self._private_key)

@property
def _private_key(self) -> ec.EllipticCurvePrivateKey:
"""Get or generate a signing key."""
if self.__cached_private_key is None:
logger.debug("no cached key; generating ephemeral key")
_logger.debug("no cached key; generating ephemeral key")
return ec.generate_private_key(ec.SECP256R1())
return self.__cached_private_key

Expand All @@ -145,7 +144,7 @@ def _signing_cert(
return self.__cached_signing_certificate

else:
logger.debug("Retrieving signed certificate...")
_logger.debug("Retrieving signed certificate...")

# Build an X.509 Certificiate Signing Request
builder = (
Expand Down Expand Up @@ -174,7 +173,7 @@ def _signing_cert(

def sign(
self,
input_: bytes | Statement | sigstore_hashes.Hashed,
input_: bytes | dsse.Statement | sigstore_hashes.Hashed,
) -> Bundle:
"""
Sign an input, and return a `Bundle` corresponding to the signed result.
Expand Down Expand Up @@ -207,7 +206,7 @@ def sign(

verify_sct(sct, cert, chain, self._signing_ctx._rekor._ct_keyring)

logger.debug("Successfully verified SCT...")
_logger.debug("Successfully verified SCT...")

# Prepare inputs
b64_cert = base64.b64encode(
Expand All @@ -217,8 +216,8 @@ def sign(
# Sign artifact
content: MessageSignature | Envelope
proposed_entry: rekor_types.Hashedrekord | rekor_types.Dsse
if isinstance(input_, Statement):
content = dsse.sign_intoto(private_key, input_)
if isinstance(input_, dsse.Statement):
content = dsse._sign(private_key, input_)

# Create the proposed DSSE entry
proposed_entry = rekor_types.Dsse(
Expand Down Expand Up @@ -265,7 +264,7 @@ def sign(
# Submit the proposed entry to the transparency log
entry = self._signing_ctx._rekor.log.entries.post(proposed_entry)

logger.debug(f"Transparency log entry created with index: {entry.log_index}")
_logger.debug(f"Transparency log entry created with index: {entry.log_index}")

return _make_bundle(
content=content,
Expand Down
Loading

0 comments on commit afc14ba

Please sign in to comment.