Skip to content

Commit 1877043

Browse files
committed
chore: modify vsa generation to populate each subject with a sha256 digest if it exists
Signed-off-by: Nathan Nguyen <nathan.nguyen@oracle.com>
1 parent 9aeca7b commit 1877043

File tree

2 files changed

+151
-124
lines changed

2 files changed

+151
-124
lines changed

src/macaron/vsa/vsa.py

Lines changed: 116 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,19 @@
99
import datetime
1010
import json
1111
import logging
12+
from collections.abc import Iterable
1213
from enum import StrEnum
1314
from importlib import metadata as importlib_metadata
1415
from typing import TypedDict
1516

17+
import sqlalchemy
18+
from packageurl import PackageURL
19+
from sqlalchemy.orm import Session
20+
21+
from macaron.database.database_manager import get_db_manager
22+
from macaron.database.table_definitions import ProvenanceSubject
23+
from macaron.util import JsonType
24+
1625
logger: logging.Logger = logging.getLogger(__name__)
1726

1827
# Note: The lint error "N815:mixedCase variable in class scope" is disabled for
@@ -135,11 +144,42 @@ class VerificationResult(StrEnum):
135144
PASSED = "PASSED"
136145

137146

147+
def get_common_purl_from_artifact_purls(purl_strs: Iterable[str]) -> str | None:
148+
"""Get a single common PackageURL given some artifact PackageURLs.
149+
150+
Assumption: A package may have more than one artifact. If each artifact is identified
151+
by a PackageURL, these PackageURLs still share the type, namespace, name, and
152+
version values. The common PackageURL contains these values.
153+
"""
154+
try:
155+
purls = [PackageURL.from_string(_) for _ in purl_strs]
156+
except ValueError:
157+
return None
158+
159+
purl_type = purls[0].type
160+
namespace = purls[0].namespace
161+
name = purls[0].name
162+
version = purls[0].version
163+
164+
for purl in purls:
165+
if any(
166+
[
167+
purl_type != purl.type,
168+
namespace != purl.namespace,
169+
name != purl.name,
170+
version != purl.version,
171+
]
172+
):
173+
return None
174+
175+
common_purl = PackageURL(type=purl_type, namespace=namespace, name=name, version=version)
176+
return str(common_purl)
177+
178+
138179
def create_vsa_statement(
139-
subject_purl: str,
180+
passed_components: dict[str, int],
140181
policy_content: str,
141-
verification_result: VerificationResult,
142-
) -> VsaStatement:
182+
) -> VsaStatement | None:
143183
"""Construct the Statement layer of the VSA.
144184
145185
Parameters
@@ -157,13 +197,49 @@ def create_vsa_statement(
157197
VsaStatement
158198
A Statement layer of the VSA.
159199
"""
200+
subjects = []
201+
202+
try:
203+
with Session(get_db_manager().engine) as session, session.begin():
204+
for purl, component_id in passed_components.items():
205+
try:
206+
provenance_subject = (
207+
session.execute(
208+
sqlalchemy.select(ProvenanceSubject).where(ProvenanceSubject.component_id == component_id)
209+
)
210+
.scalars()
211+
.one()
212+
)
213+
sha256 = provenance_subject.sha256
214+
except sqlalchemy.orm.exc.NoResultFound:
215+
sha256 = None
216+
logger.debug("No digest stored for software component '%s'.", purl)
217+
except sqlalchemy.orm.exc.MultipleResultsFound as e:
218+
logger.debug(
219+
"Unexpected database query result. "
220+
"Expected no more than one result when retrieving SHA256 of a provenance subject. "
221+
"Error: %s",
222+
e,
223+
)
224+
continue
225+
226+
subject: dict[str, JsonType] = {
227+
"uri": purl,
228+
}
229+
if sha256:
230+
subject["digest"] = {
231+
"sha256": sha256,
232+
}
233+
234+
subjects.append(subject)
235+
236+
except sqlalchemy.exc.SQLAlchemyError as error:
237+
logger.debug("Cannot retrieve hash digest of software components: %s.", error)
238+
return None
239+
160240
return VsaStatement(
161241
_type="https://in-toto.io/Statement/v1",
162-
subject=[
163-
{
164-
"uri": subject_purl,
165-
}
166-
],
242+
subject=subjects,
167243
predicateType="https://slsa.dev/verification_summary/v1",
168244
predicate=VsaPredicate(
169245
verifier=Verifier(
@@ -173,34 +249,33 @@ def create_vsa_statement(
173249
},
174250
),
175251
timeVerified=datetime.datetime.now(tz=datetime.UTC).isoformat(),
176-
resourceUri=subject_purl,
252+
resourceUri=get_common_purl_from_artifact_purls(passed_components.keys()) or "",
177253
policy={
178254
"content": policy_content,
179255
},
180-
verificationResult=verification_result,
256+
verificationResult=VerificationResult.PASSED,
181257
verifiedLevels=[],
182258
),
183259
)
184260

185261

186-
def get_subject_verification_result(policy_result: dict) -> tuple[str, VerificationResult] | None:
187-
"""Get the PURL (string) and verification result of the single software component the policy applies to.
262+
def get_components_passing_policy(policy_result: dict) -> dict[str, int] | None:
263+
"""Get the verification result in the form of PURLs and component ids of software artifacts passing the policy.
188264
189265
This is currently done by reading the facts of two relations:
190266
``component_violates_policy``, and ``component_satisfies_policy``
191267
from the result of the policy engine.
192268
193-
We define two PURLs to be different if the two PURL strings are different.
269+
The result of this function depends on the policy engine result.
270+
271+
If there exist any software component failing the policy, this function returns ``None``.
194272
195-
The result of this function depends on the policy engine result:
273+
When all software components in the result pass the policy, if there exist multiple occurrences
274+
of the same PURL, this function returns the latest occurrence, which is the one with the highest
275+
component id, taking advantage of component ids being auto-incremented.
196276
197-
- If there exist multiple different PURLs, this function returns ``None``.
198-
- If there exist multiple occurrences of the same PURL and it is the only unique
199-
PURL in the policy engine result, this function returns the latest occurrence,
200-
which is the PURL that goes with the highest component ID, taking advantage of
201-
component IDs being auto-incremented.
202-
- If there is no PURL in the result, i.e. the policy applies to no software component
203-
in the database, this function also returns ``None``.
277+
If there is no PURL in the result, i.e. the policy applies to no software component in the database,
278+
this function also returns ``None``.
204279
205280
Parameters
206281
----------
@@ -210,53 +285,39 @@ def get_subject_verification_result(policy_result: dict) -> tuple[str, Verificat
210285
211286
Returns
212287
-------
213-
tuple[str, VerificationResult] | None
214-
A pair of PURL and verification result of the only software component that
215-
the policy applies to, or ``None`` according to the aforementioned conditions.
288+
dict[str, int] | None
289+
A dictionary of software components passing the policy, or ``None`` if there is any
290+
component failing the policy or if there is no software component in the policy engine result.
291+
Each key is a PackageURL of the software component, and each value is the corresponding
292+
component id of that component.
216293
"""
217294
component_violates_policy_facts = policy_result.get("component_violates_policy", [])
218295
component_satisfies_policy_facts = policy_result.get("component_satisfies_policy", [])
219296

297+
if len(component_violates_policy_facts) > 0:
298+
logger.info("Encountered software component failing the policy. No VSA is generated.")
299+
return None
300+
220301
# key: PURL; value: result with the highest component id
221-
component_results: dict[str, tuple[int, VerificationResult]] = {}
302+
passed_components: dict[str, int] = {}
222303

223-
for component_id_string, purl, _ in component_violates_policy_facts:
224-
try:
225-
component_id = int(component_id_string)
226-
except ValueError:
227-
logger.error("Expected component id %s to be an integer.", component_id_string)
228-
return None
229-
if purl not in component_results:
230-
component_results[purl] = (component_id, VerificationResult.FAILED)
231-
else:
232-
current_component_id, _ = component_results[purl]
233-
if component_id > current_component_id:
234-
component_results[purl] = (component_id, VerificationResult.FAILED)
235304
for component_id_string, purl, _ in component_satisfies_policy_facts:
236305
try:
237306
component_id = int(component_id_string)
238307
except ValueError:
239308
logger.error("Expected component id %s to be an integer.", component_id_string)
240309
return None
241-
if purl not in component_results:
242-
component_results[purl] = (component_id, VerificationResult.PASSED)
310+
if purl not in passed_components:
311+
passed_components[purl] = component_id
243312
else:
244-
current_component_id, _ = component_results[purl]
313+
current_component_id = passed_components[purl]
245314
if component_id > current_component_id:
246-
component_results[purl] = (component_id, VerificationResult.PASSED)
247-
248-
if len(component_results) != 1:
249-
if len(component_results) == 0:
250-
logger.info("The policy applies to no software components.")
251-
if len(component_results) > 1:
252-
logger.info("The policy applies to more than one software components.")
253-
logger.info("No VSA will be generated.")
254-
return None
315+
passed_components[purl] = component_id
255316

256-
subject_purl = next(iter(component_results.keys()))
257-
_, verification_result = component_results[subject_purl]
317+
if len(passed_components) == 0:
318+
return None
258319

259-
return subject_purl, verification_result
320+
return passed_components
260321

261322

262323
def generate_vsa(policy_content: str, policy_result: dict) -> Vsa | None:
@@ -275,17 +336,14 @@ def generate_vsa(policy_content: str, policy_result: dict) -> Vsa | None:
275336
The VSA, or ``None`` if generating a VSA is not appropriate according
276337
to the policy engine result.
277338
"""
278-
subject_verification_result = get_subject_verification_result(policy_result)
339+
passed_components = get_components_passing_policy(policy_result)
279340

280-
if subject_verification_result is None:
341+
if passed_components is None:
281342
return None
282343

283-
subject_purl, verification_result = subject_verification_result
284-
285344
unencoded_payload = create_vsa_statement(
286-
subject_purl=subject_purl,
345+
passed_components,
287346
policy_content=policy_content,
288-
verification_result=verification_result,
289347
)
290348

291349
try:

0 commit comments

Comments
 (0)