Skip to content

feat: use provenance to find commits for supported PURL types. #653

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 25 commits into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
35c8a53
chore: update provenance_payload in __main__.
benmss Mar 5, 2024
321c217
chore: ensure SLSA v0.1 list index is within the bounds of the associ…
benmss Mar 5, 2024
6c95043
chore: keep code related to with statement in the statement block; ad…
benmss Mar 5, 2024
fa1bbc7
chore: replace overload with TypeVar
benmss Mar 6, 2024
e1ffe97
chore: remove duplicate if statement; replace x.__class__ with str(ty…
benmss Mar 8, 2024
ce120a7
chore: use separate exception for json extract issues; remove redunda…
benmss Mar 12, 2024
1d1a085
chore: refactor stateful provenance finder.
benmss Mar 13, 2024
bded451
chore: use GitLab URL in GitLab provenance test.
benmss Mar 13, 2024
50c0607
chore: further refactor analysis target callsite and functionality; r…
benmss Mar 13, 2024
bafebfc
chore: add type for npm latest version response to help mypy.
benmss Mar 13, 2024
e5e80d9
chore: remove duplicate test; update test for analysis target changes.
benmss Mar 13, 2024
77a88bb
chore: minor fix.
benmss Mar 14, 2024
cfd9a3e
chore: update comment to reflect immediate proceedings only; refactor…
benmss Mar 14, 2024
488f64a
chore: restrict in-toto digest set algorithms; refactor provenance ex…
benmss Mar 18, 2024
7d995cf
chore: improve digest set debug information.
benmss Mar 18, 2024
9a82638
chore: separate SLSA extraction digest set algorithms from in-toto ac…
benmss Mar 18, 2024
673b562
chore: address PR feedback.
benmss Mar 18, 2024
c6481f6
chore: minor fix.
benmss Mar 18, 2024
c48c650
chore: unify digest set validation across in-toto versions.
benmss Mar 19, 2024
2267b18
chore: specify Git in SLSA digest set algorithm list.
benmss Mar 20, 2024
b545b67
chore: remove algorithm validation in digest set.
benmss Mar 21, 2024
26de806
chore: Move JSON utility function; Move errors to error script.
benmss Mar 21, 2024
f8badc0
chore: move InvalidAnalysisTargetError to errors.
benmss Mar 21, 2024
64fb176
chore: add integration test for provenance extractor; add json_tools …
benmss Mar 21, 2024
2e46789
chore: add integration test expected result.
benmss Mar 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions scripts/dev_scripts/integration_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,15 @@ if [[ -z "$NO_NPM_TEST" ]]; then
$RUN_MACARON analyze -purl pkg:npm/@sigstore/mock@0.1.0 -rp https://github.com/sigstore/sigstore-js -b main -d ebdcfdfbdfeb9c9aeee6df53674ef230613629f5 --skip-deps || log_fail

check_or_update_expected_output $COMPARE_JSON_OUT $JSON_RESULT $JSON_EXPECTED || log_fail

echo -e "\n----------------------------------------------------------------------------------"
echo "semver@7.6.0: Extracting repository URL and commit from provenance while Repo Finder is disabled."
echo -e "----------------------------------------------------------------------------------\n"
JSON_EXPECTED=$WORKSPACE/tests/e2e/expected_results/purl/npm/semver/semver.json
JSON_RESULT=$WORKSPACE/output/reports/npm/semver/semver.json
$RUN_MACARON -dp tests/e2e/defaults/disable_repo_finder.ini analyze -purl pkg:npm/semver@7.6.0 || log_fail

check_or_update_expected_output $COMPARE_JSON_OUT $JSON_RESULT $JSON_EXPECTED || log_fail
fi

echo -e "\n----------------------------------------------------------------------------------"
Expand Down
2 changes: 1 addition & 1 deletion src/macaron/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ def analyze_slsa_levels_single(analyzer_single_args: argparse.Namespace) -> None
run_config,
analyzer_single_args.sbom_path,
analyzer_single_args.skip_deps,
prov_payload=prov_payload,
provenance_payload=prov_payload,
)
sys.exit(status_code)

Expand Down
12 changes: 12 additions & 0 deletions src/macaron/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,15 @@ class InvalidHTTPResponseError(MacaronError):

class CheckRegistryError(MacaronError):
    """When there is an error related to the check registry."""


class ProvenanceError(MacaronError):
    """When there is an error while extracting from provenance (e.g. missing or malformed fields)."""


class JsonError(MacaronError):
    """When there is an error while extracting from JSON (e.g. a missing key or a value of the wrong type)."""


class InvalidAnalysisTargetError(MacaronError):
    """When a valid Analysis Target cannot be constructed from the provided input."""
50 changes: 50 additions & 0 deletions src/macaron/json_tools.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Copyright (c) 2024 - 2024, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""This module provides utility functions for JSON data."""

from typing import TypeVar

from macaron.errors import JsonError
from macaron.util import JsonType

T = TypeVar("T", bound=JsonType)


def json_extract(entry: JsonType, keys: list[str], type_: type[T]) -> T:
    """Return the value found by following the list of depth-sequential keys inside the passed JSON dictionary.

    The value must be of the passed type.

    Parameters
    ----------
    entry: JsonType
        An entry point into a JSON structure.
    keys: list[str]
        The list of depth-sequential keys within the JSON.
    type_: type[T]
        The type to check the value against and return it as.

    Returns
    -------
    T:
        The found value as the type of the type parameter.

    Raises
    ------
    JsonError
        Raised if an error occurs while searching for or validating the value.
    """
    target = entry

    # Walk down the structure one key at a time; on failure, report the partial
    # dotted path (e.g. ".recipe.definedInMaterial") reached so far to aid debugging.
    for index, key in enumerate(keys):
        if not isinstance(target, dict):
            raise JsonError(f"Expect the value .{'.'.join(keys[:index])} to be a dict.")
        if key not in target:
            raise JsonError(f"JSON key '{key}' not found in .{'.'.join(keys[:index])}.")
        target = target[key]

    if isinstance(target, type_):
        return target

    raise JsonError(f"Expect the value .{'.'.join(keys)} to be of type '{type_}'.")
224 changes: 224 additions & 0 deletions src/macaron/repo_finder/provenance_extractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
# Copyright (c) 2024 - 2024, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""This module contains methods for extracting repository and commit metadata from provenance files."""
import logging

from macaron.errors import JsonError, ProvenanceError
from macaron.json_tools import json_extract
from macaron.slsa_analyzer.provenance.intoto import InTotoPayload, InTotoV1Payload, InTotoV01Payload
from macaron.util import JsonType

logger: logging.Logger = logging.getLogger(__name__)


SLSA_V01_DIGEST_SET_GIT_ALGORITHMS = ["sha1"]
SLSA_V02_DIGEST_SET_GIT_ALGORITHMS = ["sha1"]
SLSA_V1_DIGEST_SET_GIT_ALGORITHMS = ["sha1", "gitCommit"]


def extract_repo_and_commit_from_provenance(payload: InTotoPayload) -> tuple[str, str]:
    """Extract the repository and commit metadata from the passed provenance payload.

    Dispatches on the in-toto payload version and the statement's predicateType to the
    matching format-specific extractor.

    Parameters
    ----------
    payload: InTotoPayload
        The payload to extract from.

    Returns
    -------
    tuple[str, str]
        The repository URL and commit hash if found, a pair of empty strings otherwise.

    Raises
    ------
    ProvenanceError
        If the extraction process fails for any reason.
    """
    repo, commit = "", ""
    predicate_type = payload.statement.get("predicateType")
    try:
        if isinstance(payload, InTotoV1Payload) and predicate_type == "https://slsa.dev/provenance/v1":
            repo, commit = _extract_from_slsa_v1(payload)
        elif isinstance(payload, InTotoV01Payload):
            if predicate_type == "https://slsa.dev/provenance/v0.2":
                repo, commit = _extract_from_slsa_v02(payload)
            elif predicate_type == "https://slsa.dev/provenance/v0.1":
                repo, commit = _extract_from_slsa_v01(payload)
            elif predicate_type == "https://witness.testifysec.com/attestation-collection/v0.1":
                repo, commit = _extract_from_witness_provenance(payload)
    except JsonError as error:
        logger.debug(error)
        raise ProvenanceError("JSON exception while extracting from provenance.") from error

    if repo and commit:
        logger.debug("Extracted repo and commit from provenance: %s, %s", repo, commit)
        return repo, commit

    # No extractor matched, or one matched but returned an empty value.
    msg = (
        f"Extraction from provenance not supported for versions: "
        f"predicate_type {predicate_type}, in-toto {str(type(payload))}."
    )
    logger.debug(msg)
    raise ProvenanceError(msg)


def _extract_from_slsa_v01(payload: InTotoV01Payload) -> tuple[str, str]:
    """Extract the repository and commit metadata from the slsa v01 provenance payload.

    Raises
    ------
    ProvenanceError
        If the payload has no predicate, the material index is out of bounds, the indexed
        material is invalid, or no commit hash can be found.
    """
    predicate: dict[str, JsonType] | None = payload.statement.get("predicate")
    if not predicate:
        raise ProvenanceError("No predicate in payload statement.")

    # The repository URL and commit are stored inside an entry in the list of predicate -> materials.
    # In predicate -> recipe -> definedInMaterial we find the list index that points to the correct entry.
    list_index = json_extract(predicate, ["recipe", "definedInMaterial"], int)
    material_list = json_extract(predicate, ["materials"], list)
    # Also reject negative indices: Python would otherwise silently index from the end of the list.
    if not 0 <= list_index < len(material_list):
        raise ProvenanceError("Material list index outside of material list bounds.")
    material = material_list[list_index]
    if not material or not isinstance(material, dict):
        raise ProvenanceError("Indexed material list entry is invalid.")

    uri = json_extract(material, ["uri"], str)

    repo = _clean_spdx(uri)

    digest_set = json_extract(material, ["digest"], dict)
    commit = _extract_commit_from_digest_set(digest_set, SLSA_V01_DIGEST_SET_GIT_ALGORITHMS)

    if not commit:
        raise ProvenanceError("Failed to extract commit hash from provenance.")

    return repo, commit


def _extract_from_slsa_v02(payload: InTotoV01Payload) -> tuple[str, str]:
    """Extract the repository and commit metadata from the slsa v02 provenance payload.

    The repository URL and commit are stored within the predicate -> invocation -> configSource object.
    See https://slsa.dev/spec/v0.2/provenance
    """
    predicate: dict[str, JsonType] | None = payload.statement.get("predicate")
    if not predicate:
        raise ProvenanceError("No predicate in payload statement.")

    config_source = ["invocation", "configSource"]

    # The SPDX URI under configSource holds the repository URL.
    spdx_uri = json_extract(predicate, config_source + ["uri"], str)
    if not spdx_uri:
        raise ProvenanceError("Failed to extract repository URL from provenance.")
    repo_url = _clean_spdx(spdx_uri)

    # The digest set under configSource holds the commit hash.
    digests = json_extract(predicate, config_source + ["digest"], dict)
    commit_hash = _extract_commit_from_digest_set(digests, SLSA_V02_DIGEST_SET_GIT_ALGORITHMS)
    if not commit_hash:
        raise ProvenanceError("Failed to extract commit hash from provenance.")

    return repo_url, commit_hash


def _extract_from_slsa_v1(payload: InTotoV1Payload) -> tuple[str, str]:
    """Extract the repository and commit metadata from the slsa v1 provenance payload.

    Raises
    ------
    ProvenanceError
        If the payload has no predicate, or the repository URL or commit hash cannot be found.
    """
    predicate: dict[str, JsonType] | None = payload.statement.get("predicate")
    if not predicate:
        raise ProvenanceError("No predicate in payload statement.")

    build_def = json_extract(predicate, ["buildDefinition"], dict)
    build_type = json_extract(build_def, ["buildType"], str)

    # Extract the repository URL.
    repo = ""
    if build_type == "https://slsa-framework.github.io/gcb-buildtypes/triggered-build/v1":
        try:
            repo = json_extract(build_def, ["externalParameters", "sourceToBuild", "repository"], str)
        except JsonError:
            repo = json_extract(build_def, ["externalParameters", "configSource", "repository"], str)
    if build_type == "https://slsa-framework.github.io/github-actions-buildtypes/workflow/v1":
        repo = json_extract(build_def, ["externalParameters", "workflow", "repository"], str)

    if not repo:
        raise ProvenanceError("Failed to extract repository URL from provenance.")

    # Extract the commit hash from the resolved dependency whose URI matches the repository.
    commit = ""
    deps = json_extract(build_def, ["resolvedDependencies"], list)
    for dep in deps:
        if not isinstance(dep, dict):
            continue
        # A dependency without a "uri" must not abort the search over the remaining dependencies.
        try:
            uri = json_extract(dep, ["uri"], str)
        except JsonError:
            continue
        url = _clean_spdx(uri)
        if url != repo:
            continue
        digest_set = json_extract(dep, ["digest"], dict)
        commit = _extract_commit_from_digest_set(digest_set, SLSA_V1_DIGEST_SET_GIT_ALGORITHMS)
        if commit:
            # The matching dependency has been found; stop searching.
            break

    if not commit:
        raise ProvenanceError("Failed to extract commit hash from provenance.")

    return repo, commit


def _extract_from_witness_provenance(payload: InTotoV01Payload) -> tuple[str, str]:
    """Extract the repository and commit metadata from the witness provenance file found at the passed path.

    To successfully return the commit and repository URL, the payload must respectively contain a Git attestation, and
    either a GitHub or GitLab attestation.

    Parameters
    ----------
    payload: InTotoPayload
        The payload to extract from.

    Returns
    -------
    tuple[str, str]
        The repository URL and commit hash if found, a pair of empty strings otherwise.

    Raises
    ------
    ProvenanceError
        If the payload has no predicate, or the repository URL or commit hash cannot be found.
    """
    predicate: dict[str, JsonType] | None = payload.statement.get("predicate")
    if not predicate:
        raise ProvenanceError("No predicate in payload statement.")

    attestations = json_extract(predicate, ["attestations"], list)
    commit = ""
    repo = ""
    for entry in attestations:
        if not isinstance(entry, dict):
            continue
        entry_type = entry.get("type")
        # The type must be a non-empty string before matching; a non-string value here
        # would otherwise crash on the .startswith call below.
        if not isinstance(entry_type, str):
            continue
        if entry_type.startswith("https://witness.dev/attestations/git/"):
            commit = json_extract(entry, ["attestation", "commithash"], str)
        elif entry_type.startswith("https://witness.dev/attestations/gitlab/") or entry_type.startswith(
            "https://witness.dev/attestations/github/"
        ):
            repo = json_extract(entry, ["attestation", "projecturl"], str)

    if not commit or not repo:
        raise ProvenanceError("Could not extract repo and commit from provenance.")

    return repo, commit


def _extract_commit_from_digest_set(digest_set: dict[str, JsonType], valid_algorithms: list[str]) -> str:
    """Extract the commit from the passed DigestSet.

    The DigestSet is an in-toto object that maps algorithm types to commit hashes (digests).
    The first entry whose algorithm is valid and whose digest is a string is returned.
    """
    if len(digest_set) > 1:
        logger.debug("DigestSet contains multiple algorithms: %s", digest_set.keys())

    for algorithm, digest in digest_set.items():
        if algorithm in valid_algorithms and isinstance(digest, str):
            return digest
    raise ProvenanceError(f"No valid digest in digest set: {digest_set.keys()} not in {valid_algorithms}")


def _clean_spdx(uri: str) -> str:
"""Clean the passed SPDX URI and return the normalised URL it represents.

A SPDX URI has the form: git+https://example.com@refs/heads/main
"""
url, _, _ = uri.lstrip("git+").rpartition("@")
return url
Loading