chore: allow independent extraction of repo and commit from provenance #708

Merged

4 changes: 0 additions & 4 deletions src/macaron/errors.py
@@ -64,10 +64,6 @@ class ProvenanceError(MacaronError):
"""When there is an error while extracting from provenance."""


class JsonError(MacaronError):
"""When there is an error while extracting from JSON."""


class InvalidAnalysisTargetError(MacaronError):
"""When a valid Analysis Target cannot be constructed."""

23 changes: 11 additions & 12 deletions src/macaron/json_tools.py
@@ -2,16 +2,17 @@
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""This module provides utility functions for JSON data."""

import logging
from typing import TypeVar

from macaron.errors import JsonError
from macaron.util import JsonType

T = TypeVar("T", bound=JsonType)

logger: logging.Logger = logging.getLogger(__name__)


def json_extract(entry: JsonType, keys: list[str], type_: type[T]) -> T:
def json_extract(entry: JsonType, keys: list[str], type_: type[T]) -> T | None:
"""Return the value found by following the list of depth-sequential keys inside the passed JSON dictionary.

The value must be of the passed type.
@@ -27,24 +28,22 @@ def json_extract(entry: JsonType, keys: list[str], type_: type[T]) -> T:

Returns
-------
T:
T | None:
The found value as the type of the type parameter.

Raises
------
JsonError
Raised if an error occurs while searching for or validating the value.
"""
target = entry

for index, key in enumerate(keys):
if not isinstance(target, dict):
raise JsonError(f"Expect the value .{'.'.join(keys[:index])} to be a dict.")
logger.debug("Expect the value .%s to be a dict.", ".".join(keys[:index]))
return None
if key not in target:
raise JsonError(f"JSON key '{key}' not found in .{'.'.join(keys[:index])}.")
logger.debug("JSON key '%s' not found in .%s", key, ".".join(keys[:index]))
return None
target = target[key]

if isinstance(target, type_):
return target

raise JsonError(f"Expect the value .{'.'.join(keys)} to be of type '{type_}'.")
logger.debug("Expect the value .%s to be of type %s", ".".join(keys), type_)
return None
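
With this change, json_extract logs the failure at debug level and returns None instead of raising JsonError, so call sites replace try/except with a plain None check. A minimal sketch of the new calling pattern (the payload dict and key path here are illustrative, not taken from the PR):

    from macaron.json_tools import json_extract

    payload = {"invocation": {"configSource": {"uri": "git+https://example.com/org/repo"}}}

    # A present, correctly typed value is returned as before.
    uri = json_extract(payload, ["invocation", "configSource", "uri"], str)

    # A missing key or a type mismatch now yields None (plus a debug log) instead of JsonError.
    digest = json_extract(payload, ["invocation", "configSource", "digest"], dict)
    if digest is None:
        ...  # handle the absent value; no exception handling required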
14 changes: 3 additions & 11 deletions src/macaron/parsers/actionparser.py
@@ -17,7 +17,7 @@

from macaron.config.defaults import defaults
from macaron.config.global_config import global_config
from macaron.errors import JsonError, ParseError
from macaron.errors import ParseError
from macaron.json_tools import json_extract

logger: logging.Logger = logging.getLogger(__name__)
@@ -90,11 +90,7 @@ def get_run_step(step: dict[str, Any]) -> str | None:
str | None
The inlined run script or None if the run step cannot be validated.
"""
try:
return json_extract(step, ["Exec", "Run", "Value"], str)
except JsonError as error:
logger.debug(error)
return None
return json_extract(step, ["Exec", "Run", "Value"], str)


def get_step_input(step: dict[str, Any], key: str) -> str | None:
@@ -115,8 +111,4 @@ def get_step_input(step: dict[str, Any], key: str) -> str | None:
str | None
The input value or None if it doesn't exist or the parsed object validation fails.
"""
try:
return json_extract(step, ["Exec", "Inputs", key, "Value", "Value"], str)
except JsonError as error:
logger.debug(error)
return None
return json_extract(step, ["Exec", "Inputs", key, "Value", "Value"], str)
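
Both helpers now simply propagate the None that json_extract returns on failure. The nested layout they expect can be read off the key paths above; a hypothetical step object for illustration (only the key structure is taken from the code, the values are made up):

    step = {
        "Exec": {
            "Run": {"Value": "./gradlew build"},
            "Inputs": {"arguments": {"Value": {"Value": "--no-daemon"}}},
        }
    }

    get_run_step(step)                 # "./gradlew build"
    get_step_input(step, "arguments")  # "--no-daemon"
    get_step_input(step, "missing")    # None, with a debug log from json_extract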
152 changes: 81 additions & 71 deletions src/macaron/repo_finder/provenance_extractor.py
@@ -4,7 +4,7 @@
"""This module contains methods for extracting repository and commit metadata from provenance files."""
import logging

from macaron.errors import JsonError, ProvenanceError
from macaron.errors import ProvenanceError
from macaron.json_tools import json_extract
from macaron.slsa_analyzer.provenance.intoto import InTotoPayload, InTotoV1Payload, InTotoV01Payload
from macaron.util import JsonType
@@ -17,7 +17,7 @@
SLSA_V1_DIGEST_SET_GIT_ALGORITHMS = ["sha1", "gitCommit"]


def extract_repo_and_commit_from_provenance(payload: InTotoPayload) -> tuple[str, str]:
def extract_repo_and_commit_from_provenance(payload: InTotoPayload) -> tuple[str | None, str | None]:
"""Extract the repository and commit metadata from the passed provenance payload.

Parameters
@@ -35,129 +35,137 @@ def extract_repo_and_commit_from_provenance(payload: InTotoPayload) -> tuple[str
ProvenanceError
If the extraction process fails for any reason.
"""
repo = ""
commit = ""
predicate_type = payload.statement.get("predicateType")
try:
if isinstance(payload, InTotoV1Payload):
if predicate_type == "https://slsa.dev/provenance/v1":
repo, commit = _extract_from_slsa_v1(payload)
elif isinstance(payload, InTotoV01Payload):
if predicate_type == "https://slsa.dev/provenance/v0.2":
repo, commit = _extract_from_slsa_v02(payload)
if predicate_type == "https://slsa.dev/provenance/v0.1":
repo, commit = _extract_from_slsa_v01(payload)
if predicate_type == "https://witness.testifysec.com/attestation-collection/v0.1":
repo, commit = _extract_from_witness_provenance(payload)
except JsonError as error:
logger.debug(error)
raise ProvenanceError("JSON exception while extracting from provenance.") from error

if not repo or not commit:
msg = (
f"Extraction from provenance not supported for versions: "
f"predicate_type {predicate_type}, in-toto {str(type(payload))}."
)
logger.debug(msg)
raise ProvenanceError(msg)

logger.debug("Extracted repo and commit from provenance: %s, %s", repo, commit)
return repo, commit


def _extract_from_slsa_v01(payload: InTotoV01Payload) -> tuple[str, str]:
if isinstance(payload, InTotoV1Payload):
if predicate_type == "https://slsa.dev/provenance/v1":
return _extract_from_slsa_v1(payload)
elif isinstance(payload, InTotoV01Payload):
if predicate_type == "https://slsa.dev/provenance/v0.2":
return _extract_from_slsa_v02(payload)
if predicate_type == "https://slsa.dev/provenance/v0.1":
return _extract_from_slsa_v01(payload)
if predicate_type == "https://witness.testifysec.com/attestation-collection/v0.1":
return _extract_from_witness_provenance(payload)

msg = (
f"Extraction from provenance not supported for versions: "
f"predicate_type {predicate_type}, in-toto {str(type(payload))}."
)
logger.debug(msg)
raise ProvenanceError(msg)


def _extract_from_slsa_v01(payload: InTotoV01Payload) -> tuple[str | None, str | None]:
"""Extract the repository and commit metadata from the slsa v01 provenance payload."""
predicate: dict[str, JsonType] | None = payload.statement.get("predicate")
if not predicate:
raise ProvenanceError("No predicate in payload statement.")
return None, None

# The repository URL and commit are stored inside an entry in the list of predicate -> materials.
# In predicate -> recipe -> definedInMaterial we find the list index that points to the correct entry.
list_index = json_extract(predicate, ["recipe", "definedInMaterial"], int)
if not list_index:
return None, None

material_list = json_extract(predicate, ["materials"], list)
if not material_list:
return None, None

if list_index >= len(material_list):
raise ProvenanceError("Material list index outside of material list bounds.")
logger.debug("Material list index outside of material list bounds.")
return None, None

material = material_list[list_index]
if not material or not isinstance(material, dict):
raise ProvenanceError("Indexed material list entry is invalid.")
logger.debug("Indexed material list entry is invalid.")
return None, None

repo = None
uri = json_extract(material, ["uri"], str)

repo = _clean_spdx(uri)
if uri:
repo = _clean_spdx(uri)

digest_set = json_extract(material, ["digest"], dict)
if not digest_set:
return repo, None
commit = _extract_commit_from_digest_set(digest_set, SLSA_V01_DIGEST_SET_GIT_ALGORITHMS)

if not commit:
raise ProvenanceError("Failed to extract commit hash from provenance.")

return repo, commit
return repo, commit or None
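
The v0.1 extractor now degrades gracefully: it can return (repo, None) when the material carries a URI but no usable digest, which is exactly the independent extraction this PR enables. A minimal, illustrative predicate shape inferred from the key paths used above (the values are invented, not taken from a real attestation):

    predicate = {
        "recipe": {"definedInMaterial": 1},
        "materials": [
            {"uri": "pkg:generic/some-build-dependency"},
            {
                "uri": "git+https://example.com/org/repo",
                "digest": {"sha1": "0123456789abcdef0123456789abcdef01234567"},
            },
        ],
    }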


def _extract_from_slsa_v02(payload: InTotoV01Payload) -> tuple[str, str]:
def _extract_from_slsa_v02(payload: InTotoV01Payload) -> tuple[str | None, str | None]:
"""Extract the repository and commit metadata from the slsa v02 provenance payload."""
predicate: dict[str, JsonType] | None = payload.statement.get("predicate")
if not predicate:
raise ProvenanceError("No predicate in payload statement.")
logger.debug("No predicate in payload statement.")
return None, None

# The repository URL and commit are stored within the predicate -> invocation -> configSource object.
# See https://slsa.dev/spec/v0.2/provenance
repo = None
uri = json_extract(predicate, ["invocation", "configSource", "uri"], str)
if not uri:
raise ProvenanceError("Failed to extract repository URL from provenance.")
repo = _clean_spdx(uri)
if uri:
repo = _clean_spdx(uri)

digest_set = json_extract(predicate, ["invocation", "configSource", "digest"], dict)
if not digest_set:
return repo, None
commit = _extract_commit_from_digest_set(digest_set, SLSA_V02_DIGEST_SET_GIT_ALGORITHMS)

if not commit:
raise ProvenanceError("Failed to extract commit hash from provenance.")

return repo, commit
return repo, commit or None


def _extract_from_slsa_v1(payload: InTotoV1Payload) -> tuple[str, str]:
def _extract_from_slsa_v1(payload: InTotoV1Payload) -> tuple[str | None, str | None]:
"""Extract the repository and commit metadata from the slsa v1 provenance payload."""
predicate: dict[str, JsonType] | None = payload.statement.get("predicate")
if not predicate:
raise ProvenanceError("No predicate in payload statement.")
logger.debug("No predicate in payload statement.")
return None, None

build_def = json_extract(predicate, ["buildDefinition"], dict)
if not build_def:
return None, None

build_type = json_extract(build_def, ["buildType"], str)
if not build_type:
return None, None

# Extract the repository URL.
repo = ""
repo = None
if build_type == "https://slsa-framework.github.io/gcb-buildtypes/triggered-build/v1":
try:
repo = json_extract(build_def, ["externalParameters", "sourceToBuild", "repository"], str)
except JsonError:
repo = json_extract(build_def, ["externalParameters", "sourceToBuild", "repository"], str)
if not repo:
repo = json_extract(build_def, ["externalParameters", "configSource", "repository"], str)
if build_type == "https://slsa-framework.github.io/github-actions-buildtypes/workflow/v1":
repo = json_extract(build_def, ["externalParameters", "workflow", "repository"], str)

if not repo:
raise ProvenanceError("Failed to extract repository URL from provenance.")
logger.debug("Repo required to extract commit from SLSA v1.")
return None, None

# Extract the commit hash.
commit = ""
commit = None
deps = json_extract(build_def, ["resolvedDependencies"], list)
if not deps:
return repo, None
for dep in deps:
if not isinstance(dep, dict):
continue
uri = json_extract(dep, ["uri"], str)
if not uri:
continue
url = _clean_spdx(uri)
if url != repo:
continue
digest_set = json_extract(dep, ["digest"], dict)
if not digest_set:
continue
commit = _extract_commit_from_digest_set(digest_set, SLSA_V1_DIGEST_SET_GIT_ALGORITHMS)

if not commit:
raise ProvenanceError("Failed to extract commit hash from provenance.")
return repo, commit or None

return repo, commit


def _extract_from_witness_provenance(payload: InTotoV01Payload) -> tuple[str, str]:
def _extract_from_witness_provenance(payload: InTotoV01Payload) -> tuple[str | None, str | None]:
"""Extract the repository and commit metadata from the witness provenance file found at the passed path.

To successfully return the commit and repository URL, the payload must respectively contain a Git attestation, and
@@ -175,11 +183,15 @@ def _extract_from_witness_provenance(payload: InTotoV01Payload) -> tuple[str, st
"""
predicate: dict[str, JsonType] | None = payload.statement.get("predicate")
if not predicate:
raise ProvenanceError("No predicate in payload statement.")
logger.debug("No predicate in payload statement.")
return None, None

attestations = json_extract(predicate, ["attestations"], list)
commit = ""
repo = ""
if not attestations:
return None, None

repo = None
commit = None
for entry in attestations:
if not isinstance(entry, dict):
continue
@@ -193,10 +205,7 @@ def _extract_from_witness_provenance(payload: InTotoV01Payload) -> tuple[str, st
):
repo = json_extract(entry, ["attestation", "projecturl"], str)

if not commit or not repo:
raise ProvenanceError("Could not extract repo and commit from provenance.")

return repo, commit
return repo or None, commit or None


def _extract_commit_from_digest_set(digest_set: dict[str, JsonType], valid_algorithms: list[str]) -> str:
@@ -212,7 +221,8 @@ def _extract_commit_from_digest_set(digest_set: dict[str, JsonType], valid_algor
value = digest_set.get(key)
if isinstance(value, str):
return value
raise ProvenanceError(f"No valid digest in digest set: {digest_set.keys()} not in {valid_algorithms}")
logger.debug("No valid digest in digest set: %s not in %s", digest_set.keys(), valid_algorithms)
return ""


def _clean_spdx(uri: str) -> str:
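
Taken together, the extractor now returns whatever it can find and reserves ProvenanceError for payloads whose predicate type or in-toto version it does not support at all. A sketch of what a call site might look like under the new contract (payload is assumed to be an InTotoPayload parsed elsewhere; the handling branches are placeholders):

    from macaron.errors import ProvenanceError
    from macaron.repo_finder.provenance_extractor import extract_repo_and_commit_from_provenance

    try:
        repo, commit = extract_repo_and_commit_from_provenance(payload)
    except ProvenanceError:
        ...  # unsupported predicate type / in-toto version combination
    else:
        if repo and not commit:
            ...  # repository recovered on its own (the case this PR enables)
        if repo and commit:
            ...  # both pieces of metadata available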
20 changes: 9 additions & 11 deletions src/macaron/repo_finder/repo_finder_deps_dev.py
@@ -9,7 +9,6 @@

from packageurl import PackageURL

from macaron.errors import JsonError
from macaron.repo_finder.provenance_extractor import json_extract
from macaron.repo_finder.repo_finder_base import BaseRepoFinder
from macaron.repo_finder.repo_validator import find_valid_repository_url
@@ -110,11 +109,11 @@ def _create_urls(self, purl: PackageURL) -> list[str]:
return []

versions_keys = ["package", "versions"] if "package" in metadata else ["version"]
try:
versions = json_extract(metadata, versions_keys, list)
latest_version = json_extract(versions[-1], ["versionKey", "version"], str)
except JsonError as error:
logger.debug("Could not extract 'version' from deps.dev response: %s", error)
versions = json_extract(metadata, versions_keys, list)
if not versions:
return []
latest_version = json_extract(versions[-1], ["versionKey", "version"], str)
if not latest_version:
return []

logger.debug("Found latest version: %s", latest_version)
@@ -161,11 +160,10 @@ def _read_json(self, json_data: str) -> list[str]:
logger.debug("Failed to parse response from deps.dev: %s", error)
return []

try:
links_keys = ["version", "links"] if "version" in parsed else ["links"]
links = json_extract(parsed, links_keys, list)
except JsonError as error:
logger.debug("Could not extract 'version' or 'links' from deps.dev response: %s", error)
links_keys = ["version", "links"] if "version" in parsed else ["links"]
links = json_extract(parsed, links_keys, list)
if not links:
logger.debug("Could not extract 'version' or 'links' from deps.dev response.")
return []

result = []
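
The deps.dev finder follows the same pattern: each json_extract result is checked for falsiness before use, which covers both a missing or mistyped field (None) and a present-but-empty value in a single check. A small sketch of that pattern, reusing names from the method above:

    versions = json_extract(metadata, versions_keys, list)
    if not versions:
        # Either the key path was absent (json_extract returned None) or the list was empty.
        return []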