feat: use provenance to find commits for supported PURL types. #653
Merged
Commits (25)
benmss 35c8a53 chore: update provenance_payload in __main__.
benmss 321c217 chore: ensure SLSA v0.1 list index is within the bounds of the associ…
benmss 6c95043 chore: keep code related to with statement in the statement block; ad…
benmss fa1bbc7 chore: replace overload with TypeVar
benmss e1ffe97 chore: remove duplicate if statement; replace x.__class__ with str(ty…
benmss ce120a7 chore: use separate exception for json extract issues; remove redunda…
benmss 1d1a085 chore: refactor stateful provenance finder.
benmss bded451 chore: use GitLab URL in GitLab provenance test.
benmss 50c0607 chore: further refactor analysis target callsite and functionality; r…
benmss bafebfc chore: add type for npm latest version response to help mypy.
benmss e5e80d9 chore: remove duplicate test; update test for analysis target changes.
benmss 77a88bb chore: minor fix.
benmss cfd9a3e chore: update comment to reflect immediate proceedings only; refactor…
benmss 488f64a chore: restrict in-toto digest set algorithms; refactor provenance ex…
benmss 7d995cf chore: improve digest set debug information.
benmss 9a82638 chore: separate SLSA extraction digest set algorithms from in-toto ac…
benmss 673b562 chore: address PR feedback.
benmss c6481f6 chore: minor fix.
benmss c48c650 chore: unify digest set validation across in-toto versions.
benmss 2267b18 chore: specify Git in SLSA digest set algorithm list.
benmss b545b67 chore: remove algorithm validation in digest set.
benmss 26de806 chore: Move JSON utility function; Move errors to error script.
benmss f8badc0 chore: move InvalidAnalysisTargetError to errors.
benmss 64fb176 chore: add integration test for provenance extractor; add json_tools …
benmss 2e46789 chore: add integration test expected result.
benmss File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@@ -0,0 +1,50 @@
# Copyright (c) 2024 - 2024, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""This module provides utility functions for JSON data."""

from typing import TypeVar

from macaron.errors import JsonError
from macaron.util import JsonType

T = TypeVar("T", bound=JsonType)


def json_extract(entry: JsonType, keys: list[str], type_: type[T]) -> T:
    """Return the value found by following the list of depth-sequential keys inside the passed JSON dictionary.

    The value must be of the passed type.

    Parameters
    ----------
    entry: JsonType
        An entry point into a JSON structure.
    keys: list[str]
        The list of depth-sequential keys within the JSON.
    type_: type[T]
        The type to check the value against and return it as.

    Returns
    -------
    T:
        The found value as the type of the type parameter.

    Raises
    ------
    JsonError
        Raised if an error occurs while searching for or validating the value.
    """
    target = entry

    for index, key in enumerate(keys):
        if not isinstance(target, dict):
            raise JsonError(f"Expect the value .{'.'.join(keys[:index])} to be a dict.")
        if key not in target:
            raise JsonError(f"JSON key '{key}' not found in .{'.'.join(keys[:index])}.")
        target = target[key]

    if isinstance(target, type_):
        return target

    raise JsonError(f"Expect the value .{'.'.join(keys)} to be of type '{type_}'.")
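As a quick illustration of the new helper, the sketch below walks a hand-written dictionary with json_extract; the payload contents are invented for this example, and only json_extract and JsonError come from the changes above.

from macaron.errors import JsonError
from macaron.json_tools import json_extract

# Invented JSON structure, loosely shaped like a provenance predicate.
predicate = {"invocation": {"configSource": {"uri": "git+https://example.com/org/repo@refs/heads/main"}}}

# Follow the keys depth-sequentially and require the final value to be a str.
uri = json_extract(predicate, ["invocation", "configSource", "uri"], str)
print(uri)  # git+https://example.com/org/repo@refs/heads/main

# A missing key or a type mismatch raises JsonError instead of returning None.
try:
    json_extract(predicate, ["invocation", "digest"], dict)
except JsonError as error:
    print(error)  # JSON key 'digest' not found in .invocation.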
@@ -0,0 +1,224 @@
# Copyright (c) 2024 - 2024, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""This module contains methods for extracting repository and commit metadata from provenance files."""
import logging

from macaron.errors import JsonError, ProvenanceError
from macaron.json_tools import json_extract
from macaron.slsa_analyzer.provenance.intoto import InTotoPayload, InTotoV1Payload, InTotoV01Payload
from macaron.util import JsonType

logger: logging.Logger = logging.getLogger(__name__)


SLSA_V01_DIGEST_SET_GIT_ALGORITHMS = ["sha1"]
SLSA_V02_DIGEST_SET_GIT_ALGORITHMS = ["sha1"]
SLSA_V1_DIGEST_SET_GIT_ALGORITHMS = ["sha1", "gitCommit"]


def extract_repo_and_commit_from_provenance(payload: InTotoPayload) -> tuple[str, str]:
    """Extract the repository and commit metadata from the passed provenance payload.

    Parameters
    ----------
    payload: InTotoPayload
        The payload to extract from.

    Returns
    -------
    tuple[str, str]
        The repository URL and commit hash if found, a pair of empty strings otherwise.

    Raises
    ------
    ProvenanceError
        If the extraction process fails for any reason.
    """
    repo = ""
    commit = ""
    predicate_type = payload.statement.get("predicateType")
    try:
        if isinstance(payload, InTotoV1Payload):
            if predicate_type == "https://slsa.dev/provenance/v1":
                repo, commit = _extract_from_slsa_v1(payload)
        elif isinstance(payload, InTotoV01Payload):
            if predicate_type == "https://slsa.dev/provenance/v0.2":
                repo, commit = _extract_from_slsa_v02(payload)
            if predicate_type == "https://slsa.dev/provenance/v0.1":
                repo, commit = _extract_from_slsa_v01(payload)
            if predicate_type == "https://witness.testifysec.com/attestation-collection/v0.1":
                repo, commit = _extract_from_witness_provenance(payload)
    except JsonError as error:
        logger.debug(error)
        raise ProvenanceError("JSON exception while extracting from provenance.") from error

    if not repo or not commit:
        msg = (
            f"Extraction from provenance not supported for versions: "
            f"predicate_type {predicate_type}, in-toto {str(type(payload))}."
        )
        logger.debug(msg)
        raise ProvenanceError(msg)

    logger.debug("Extracted repo and commit from provenance: %s, %s", repo, commit)
    return repo, commit


def _extract_from_slsa_v01(payload: InTotoV01Payload) -> tuple[str, str]:
    """Extract the repository and commit metadata from the SLSA v0.1 provenance payload."""
    predicate: dict[str, JsonType] | None = payload.statement.get("predicate")
    if not predicate:
        raise ProvenanceError("No predicate in payload statement.")

    # The repository URL and commit are stored inside an entry in the list of predicate -> materials.
    # In predicate -> recipe -> definedInMaterial we find the list index that points to the correct entry.
    list_index = json_extract(predicate, ["recipe", "definedInMaterial"], int)
    material_list = json_extract(predicate, ["materials"], list)
    if list_index >= len(material_list):
        raise ProvenanceError("Material list index outside of material list bounds.")
    material = material_list[list_index]
    if not material or not isinstance(material, dict):
        raise ProvenanceError("Indexed material list entry is invalid.")

    uri = json_extract(material, ["uri"], str)

    repo = _clean_spdx(uri)

    digest_set = json_extract(material, ["digest"], dict)
    commit = _extract_commit_from_digest_set(digest_set, SLSA_V01_DIGEST_SET_GIT_ALGORITHMS)

    if not commit:
        raise ProvenanceError("Failed to extract commit hash from provenance.")

    return repo, commit


def _extract_from_slsa_v02(payload: InTotoV01Payload) -> tuple[str, str]:
    """Extract the repository and commit metadata from the SLSA v0.2 provenance payload."""
    predicate: dict[str, JsonType] | None = payload.statement.get("predicate")
    if not predicate:
        raise ProvenanceError("No predicate in payload statement.")

    # The repository URL and commit are stored within the predicate -> invocation -> configSource object.
    # See https://slsa.dev/spec/v0.2/provenance
    uri = json_extract(predicate, ["invocation", "configSource", "uri"], str)
    if not uri:
        raise ProvenanceError("Failed to extract repository URL from provenance.")
    repo = _clean_spdx(uri)

    digest_set = json_extract(predicate, ["invocation", "configSource", "digest"], dict)
    commit = _extract_commit_from_digest_set(digest_set, SLSA_V02_DIGEST_SET_GIT_ALGORITHMS)

    if not commit:
        raise ProvenanceError("Failed to extract commit hash from provenance.")

    return repo, commit


def _extract_from_slsa_v1(payload: InTotoV1Payload) -> tuple[str, str]:
    """Extract the repository and commit metadata from the SLSA v1 provenance payload."""
    predicate: dict[str, JsonType] | None = payload.statement.get("predicate")
    if not predicate:
        raise ProvenanceError("No predicate in payload statement.")

    build_def = json_extract(predicate, ["buildDefinition"], dict)
    build_type = json_extract(build_def, ["buildType"], str)

    # Extract the repository URL.
    repo = ""
    if build_type == "https://slsa-framework.github.io/gcb-buildtypes/triggered-build/v1":
        try:
            repo = json_extract(build_def, ["externalParameters", "sourceToBuild", "repository"], str)
        except JsonError:
            repo = json_extract(build_def, ["externalParameters", "configSource", "repository"], str)
    if build_type == "https://slsa-framework.github.io/github-actions-buildtypes/workflow/v1":
        repo = json_extract(build_def, ["externalParameters", "workflow", "repository"], str)

    if not repo:
        raise ProvenanceError("Failed to extract repository URL from provenance.")

    # Extract the commit hash.
    commit = ""
    deps = json_extract(build_def, ["resolvedDependencies"], list)
    for dep in deps:
        if not isinstance(dep, dict):
            continue
        uri = json_extract(dep, ["uri"], str)
        url = _clean_spdx(uri)
        if url != repo:
            continue
        digest_set = json_extract(dep, ["digest"], dict)
        commit = _extract_commit_from_digest_set(digest_set, SLSA_V1_DIGEST_SET_GIT_ALGORITHMS)

    if not commit:
        raise ProvenanceError("Failed to extract commit hash from provenance.")

    return repo, commit


def _extract_from_witness_provenance(payload: InTotoV01Payload) -> tuple[str, str]:
    """Extract the repository and commit metadata from the passed witness provenance payload.

    To successfully return the commit and repository URL, the payload must respectively contain a Git attestation, and
    either a GitHub or GitLab attestation.

    Parameters
    ----------
    payload: InTotoPayload
        The payload to extract from.

    Returns
    -------
    tuple[str, str]
        The repository URL and commit hash if found, a pair of empty strings otherwise.
    """
    predicate: dict[str, JsonType] | None = payload.statement.get("predicate")
    if not predicate:
        raise ProvenanceError("No predicate in payload statement.")

    attestations = json_extract(predicate, ["attestations"], list)
    commit = ""
    repo = ""
    for entry in attestations:
        if not isinstance(entry, dict):
            continue
        entry_type = entry.get("type")
        if not entry_type:
            continue
        if entry_type.startswith("https://witness.dev/attestations/git/"):
            commit = json_extract(entry, ["attestation", "commithash"], str)
        elif entry_type.startswith("https://witness.dev/attestations/gitlab/") or entry_type.startswith(
            "https://witness.dev/attestations/github/"
        ):
            repo = json_extract(entry, ["attestation", "projecturl"], str)

    if not commit or not repo:
        raise ProvenanceError("Could not extract repo and commit from provenance.")

    return repo, commit


def _extract_commit_from_digest_set(digest_set: dict[str, JsonType], valid_algorithms: list[str]) -> str:
    """Extract the commit from the passed DigestSet.

    The DigestSet is an in-toto object that maps algorithm types to commit hashes (digests).
    """
    if len(digest_set.keys()) > 1:
        logger.debug("DigestSet contains multiple algorithms: %s", digest_set.keys())

    for key in digest_set:
        if key in valid_algorithms:
            value = digest_set.get(key)
            if isinstance(value, str):
                return value
    raise ProvenanceError(f"No valid digest in digest set: {digest_set.keys()} not in {valid_algorithms}")


def _clean_spdx(uri: str) -> str:
    """Clean the passed SPDX URI and return the normalised URL it represents.

    A SPDX URI has the form: git+https://example.com@refs/heads/main
    """
    url, _, _ = uri.lstrip("git+").rpartition("@")
    return url
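To make the SLSA v0.2 path concrete, here is a small sketch that walks an invented v0.2 predicate the same way _extract_from_slsa_v02 does, using only the json_extract helper added in this PR; the URI-cleaning line simply mirrors what _clean_spdx does, and the predicate content, repository URL, and digest are made up for illustration.

from macaron.json_tools import json_extract

# Invented SLSA v0.2 predicate, shaped like https://slsa.dev/spec/v0.2/provenance.
predicate = {
    "invocation": {
        "configSource": {
            "uri": "git+https://github.com/example-org/example-repo@refs/heads/main",
            "digest": {"sha1": "0123456789abcdef0123456789abcdef01234567"},
        }
    }
}

uri = json_extract(predicate, ["invocation", "configSource", "uri"], str)
repo = uri.lstrip("git+").rpartition("@")[0]  # mirrors _clean_spdx
digest_set = json_extract(predicate, ["invocation", "configSource", "digest"], dict)
commit = digest_set.get("sha1")  # "sha1" is the accepted v0.2 digest algorithm here

print(repo)    # https://github.com/example-org/example-repo
print(commit)  # 0123456789abcdef0123456789abcdef01234567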