Skip to content

Switch to in-toto statements #18

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ jobs:
- "3.11"
- "3.12"
runs-on: ubuntu-latest
permissions:
id-token: write # unit tests use the ambient OIDC credential
steps:
- uses: actions/checkout@v4

Expand Down
20 changes: 12 additions & 8 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,18 @@ dynamic = ["version"]
description = "A library to convert between Sigstore Bundles and PEP-740 Attestation objects"
readme = "README.md"
license = { file = "LICENSE" }
authors = [
{ name = "Trail of Bits", email = "opensource@trailofbits.com" },
]
authors = [{ name = "Trail of Bits", email = "opensource@trailofbits.com" }]
classifiers = [
"Programming Language :: Python :: 3",
"License :: OSI Approved :: Apache Software License",
]
dependencies = ["cryptography", "pydantic", "sigstore~=3.0.0"]
dependencies = [
"cryptography",
"packaging",
"pydantic",
"sigstore~=3.0.0",
"sigstore-protobuf-specs",
]
requires-python = ">=3.9"

[project.optional-dependencies]
Expand All @@ -34,7 +38,6 @@ lint = [
dev = ["pypi-attestation-models[doc,test,lint]", "twine", "wheel", "build"]



[project.urls]
Homepage = "https://pypi.org/project/pypi-attestation-models"
Documentation = "https://trailofbits.github.io/pypi-attestation-models/"
Expand Down Expand Up @@ -72,7 +75,7 @@ line-length = 100
target-version = "py39"

[tool.ruff.lint]
select = ["ALL"]
select = ["E", "F", "I", "W", "UP", "ANN", "D", "COM", "ISC", "TCH", "SLF"]
# ANN101 and ANN102 are deprecated
# D203 and D213 are incompatible with D211 and D212 respectively.
# COM812 and ISC001 can cause conflicts when using ruff as a formatter.
Expand All @@ -82,8 +85,9 @@ ignore = ["ANN101", "ANN102", "D203", "D213", "COM812", "ISC001"]
[tool.ruff.lint.per-file-ignores]

"test/**/*.py" = [
"D", # no docstrings in tests
"S101", # asserts are expected in tests
"D", # no docstrings in tests
"S101", # asserts are expected in tests
"SLF001", # private APIs are expected in tests
]

[tool.interrogate]
Expand Down
8 changes: 4 additions & 4 deletions src/pypi_attestation_models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

from ._impl import (
Attestation,
AttestationPayload,
AttestationError,
ConversionError,
InvalidAttestationError,
Envelope,
TransparencyLogEntry,
VerificationError,
VerificationMaterial,
Expand All @@ -16,9 +16,9 @@

__all__ = [
"Attestation",
"AttestationPayload",
"AttestationError",
"Envelope",
"ConversionError",
"InvalidAttestationError",
"TransparencyLogEntry",
"VerificationError",
"VerificationMaterial",
Expand Down
229 changes: 165 additions & 64 deletions src/pypi_attestation_models/_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,22 @@

from __future__ import annotations

import binascii
from base64 import b64decode, b64encode
import base64
from typing import TYPE_CHECKING, Annotated, Any, Literal, NewType

import rfc8785
import sigstore.errors
from annotated_types import MinLen # noqa: TCH002
from cryptography import x509
from cryptography.hazmat.primitives import serialization
from pydantic import BaseModel
from packaging.utils import parse_sdist_filename, parse_wheel_filename
from pydantic import Base64Bytes, BaseModel
from pydantic_core import ValidationError
from sigstore._utils import _sha256_streaming
from sigstore.dsse import Envelope as DsseEnvelope
from sigstore.dsse import _DigestSet, _Statement, _StatementBuilder, _Subject
from sigstore.models import Bundle, LogEntry
from sigstore_protobuf_specs.io.intoto import Envelope as _Envelope
from sigstore_protobuf_specs.io.intoto import Signature as _Signature

if TYPE_CHECKING:
from pathlib import Path # pragma: no cover
Expand All @@ -27,19 +30,15 @@
from sigstore.verify.policy import VerificationPolicy # pragma: no cover


class ConversionError(ValueError):
"""The base error for all errors during conversion."""

class AttestationError(ValueError):
"""Base error for all APIs."""

class InvalidAttestationError(ConversionError):
"""The PyPI Attestation given as input is not valid."""

def __init__(self: InvalidAttestationError, msg: str) -> None:
"""Initialize an `InvalidAttestationError`."""
super().__init__(f"Could not convert input Attestation: {msg}")
class ConversionError(AttestationError):
"""The base error for all errors during conversion."""


class VerificationError(ValueError):
class VerificationError(AttestationError):
"""The PyPI Attestation failed verification."""

def __init__(self: VerificationError, msg: str) -> None:
Expand All @@ -53,7 +52,7 @@ def __init__(self: VerificationError, msg: str) -> None:
class VerificationMaterial(BaseModel):
"""Cryptographic materials used to verify attestation objects."""

certificate: str
certificate: Base64Bytes
"""
The signing certificate, as `base64(DER(cert))`.
"""
Expand All @@ -78,62 +77,99 @@ class Attestation(BaseModel):
Cryptographic materials used to verify `message_signature`.
"""

message_signature: str
envelope: Envelope
"""
The attestation's signature, as `base64(raw-sig)`, where `raw-sig`
is the raw bytes of the signing operation over the attestation payload.
The enveloped attestation statement and signature.
"""

@classmethod
def sign(cls, signer: Signer, dist: Path) -> Attestation:
"""Create an envelope, with signature, from a distribution file."""
with dist.open(mode="rb", buffering=0) as io:
# Replace this with `hashlib.file_digest()` once
# our minimum supported Python is >=3.11
digest = _sha256_streaming(io).hex()

stmt = (
_StatementBuilder()
.subjects(
[
_Subject(
name=_ultranormalize_dist_filename(dist.name),
digest=_DigestSet(root={"sha256": digest}),
)
]
)
.predicate_type("https://docs.pypi.org/attestations/publish/v1")
.build()
)
bundle = signer.sign_dsse(stmt)

return sigstore_to_pypi(bundle)

def verify(self, verifier: Verifier, policy: VerificationPolicy, dist: Path) -> None:
"""Verify against an existing Python artifact.

On failure, raises:
- `InvalidAttestationError` if the attestation could not be converted to
a Sigstore Bundle.
- `VerificationError` if the attestation could not be verified.
On failure, raises an appropriate subclass of `AttestationError`.
"""
payload_to_verify = AttestationPayload.from_dist(dist)
with dist.open(mode="rb", buffering=0) as io:
# Replace this with `hashlib.file_digest()` once
# our minimum supported Python is >=3.11
expected_digest = _sha256_streaming(io).hex()

bundle = pypi_to_sigstore(self)
try:
verifier.verify_artifact(bytes(payload_to_verify), bundle, policy)
type_, payload = verifier.verify_dsse(bundle, policy)
except sigstore.errors.VerificationError as err:
raise VerificationError(str(err)) from err

if type_ != DsseEnvelope._TYPE: # noqa: SLF001
raise VerificationError(f"expected JSON envelope, got {type_}")

class AttestationPayload(BaseModel):
"""Attestation Payload object as defined in PEP 740."""
try:
statement = _Statement.model_validate_json(payload)
except ValidationError as e:
raise VerificationError(f"invalid statement: {str(e)}")

distribution: str
"""
The file name of the Python package distribution.
"""
if len(statement.subjects) != 1:
raise VerificationError("too many subjects in statement (must be exactly one)")
subject = statement.subjects[0]

digest: str
"""
The SHA-256 digest of the distribution's contents, as a hexadecimal string.
"""
if not subject.name:
raise VerificationError("invalid subject: missing name")

@classmethod
def from_dist(cls, dist: Path) -> AttestationPayload:
"""Create an `AttestationPayload` from a distribution file."""
with dist.open(mode="rb", buffering=0) as io:
# Replace this with `hashlib.file_digest()` once
# our minimum supported Python is >=3.11
digest = _sha256_streaming(io).hex()
try:
# We always ultranormalize when signing, but other signers may not.
subject_name = _ultranormalize_dist_filename(subject.name)
except ValueError as e:
raise VerificationError(f"invalid subject: {str(e)}")

normalized = _ultranormalize_dist_filename(dist.name)
if subject_name != normalized:
raise VerificationError(
f"subject does not match distribution name: {subject_name} != {normalized}"
)

digest = subject.digest.root.get("sha256")
if digest is None or digest != expected_digest:
raise VerificationError("subject does not match distribution digest")

return AttestationPayload(
distribution=dist.name,
digest=digest,
)

def sign(self, signer: Signer) -> Attestation:
"""Create a PEP 740 attestation by signing this payload."""
sigstore_bundle = signer.sign_artifact(bytes(self))
return sigstore_to_pypi(sigstore_bundle)
class Envelope(BaseModel):
"""The attestation envelope, containing the attested-for payload and its signature."""

def __bytes__(self: AttestationPayload) -> bytes:
"""Convert to bytes using a canonicalized JSON representation (from RFC8785)."""
return rfc8785.dumps(self.model_dump())
statement: Base64Bytes
"""
The attestation statement.

This is represented as opaque bytes on the wire (encoded as base64),
but it MUST be an JSON in-toto v1 Statement.
"""

signature: Base64Bytes
"""
A signature for the above statement, encoded as base64.
"""


def sigstore_to_pypi(sigstore_bundle: Bundle) -> Attestation:
Expand All @@ -142,38 +178,103 @@ def sigstore_to_pypi(sigstore_bundle: Bundle) -> Attestation:
encoding=serialization.Encoding.DER
)

signature = sigstore_bundle._inner.message_signature.signature # noqa: SLF001
envelope = sigstore_bundle._inner.dsse_envelope # noqa: SLF001

if len(envelope.signatures) != 1:
raise ConversionError(f"expected exactly one signature, got {len(envelope.signatures)}")

return Attestation(
version=1,
verification_material=VerificationMaterial(
certificate=b64encode(certificate).decode("ascii"),
certificate=base64.b64encode(certificate),
transparency_entries=[TransparencyLogEntry(sigstore_bundle.log_entry._to_dict_rekor())], # noqa: SLF001
),
message_signature=b64encode(signature).decode("ascii"),
envelope=Envelope(
statement=base64.b64encode(envelope.payload),
signature=base64.b64encode(envelope.signatures[0].sig),
),
)


def pypi_to_sigstore(pypi_attestation: Attestation) -> Bundle:
"""Convert a PyPI attestation object as defined in PEP 740 into a Sigstore Bundle."""
try:
certificate_bytes = b64decode(pypi_attestation.verification_material.certificate)
signature_bytes = b64decode(pypi_attestation.message_signature)
except binascii.Error as err:
raise InvalidAttestationError(str(err)) from err
cert_bytes = pypi_attestation.verification_material.certificate
statement = pypi_attestation.envelope.statement
signature = pypi_attestation.envelope.signature

evp = DsseEnvelope(
_Envelope(
payload=statement,
payload_type=DsseEnvelope._TYPE, # noqa: SLF001
signatures=[_Signature(sig=signature)],
)
)

tlog_entry = pypi_attestation.verification_material.transparency_entries[0]
try:
certificate = x509.load_der_x509_certificate(certificate_bytes)
certificate = x509.load_der_x509_certificate(cert_bytes)
except ValueError as err:
raise InvalidAttestationError(str(err)) from err
raise ConversionError("invalid X.509 certificate") from err

try:
log_entry = LogEntry._from_dict_rekor(tlog_entry) # noqa: SLF001
except (ValidationError, sigstore.errors.Error) as err:
raise InvalidAttestationError(str(err)) from err
raise ConversionError("invalid transparency log entry") from err

return Bundle.from_parts(
return Bundle._from_parts( # noqa: SLF001
cert=certificate,
sig=signature_bytes,
content=evp,
log_entry=log_entry,
)


def _ultranormalize_dist_filename(dist: str) -> str:
"""Return an "ultranormalized" form of the given distribution filename.

This form is equivalent to the normalized form for sdist and wheel
filenames, with the additional stipulation that compressed tag sets,
if present, are also sorted alphanumerically.

Raises `ValueError` on any invalid distribution filename.
"""
# NOTE: .whl and .tar.gz are assumed lowercase, since `packaging`
# already rejects non-lowercase variants.
if dist.endswith(".whl"):
# `parse_wheel_filename` raises a supertype of ValueError on failure.
name, ver, build, tags = parse_wheel_filename(dist)

# The name has been normalized to replace runs of `[.-_]+` with `-`,
# which then needs to be replaced with `_` for the wheel.
name = name.replace("-", "_")

# `parse_wheel_filename` normalizes the name and version for us,
# so all we need to do is re-compress the tag set in a canonical
# order.
# NOTE(ww): This is written in a not very efficient manner, since
# I wasn't feeling smart.
impls, abis, platforms = set(), set(), set()
for tag in tags:
impls.add(tag.interpreter)
abis.add(tag.abi)
platforms.add(tag.platform)

impl_tag = ".".join(sorted(impls))
abi_tag = ".".join(sorted(abis))
platform_tag = ".".join(sorted(platforms))

if build:
parts = "-".join(
[name, str(ver), f"{build[0]}{build[1]}", impl_tag, abi_tag, platform_tag]
)
else:
parts = "-".join([name, str(ver), impl_tag, abi_tag, platform_tag])

return f"{parts}.whl"

elif dist.endswith(".tar.gz"):
# `parse_sdist_filename` raises a supertype of ValueError on failure.
name, ver = parse_sdist_filename(dist)
name = name.replace("-", "_")
return f"{name}-{ver}.tar.gz"
else:
raise ValueError(f"unknown distribution format: {dist}")
Binary file not shown.
Loading