1- #
1+ # osv.py (full updated code with helper function)
2+
23# Copyright (c) nexB Inc. and others. All rights reserved.
34# VulnerableCode is a trademark of nexB Inc.
45# SPDX-License-Identifier: Apache-2.0
910
1011import json
1112import logging
12- from typing import Iterable
13- from typing import List
14- from typing import Optional
13+ from typing import Iterable , List , Optional
1514
1615import dateparser
17- from cvss .exceptions import CVSS3MalformedError
18- from cvss .exceptions import CVSS4MalformedError
16+ from cvss .exceptions import CVSS3MalformedError , CVSS4MalformedError
1917from packageurl import PackageURL
2018from univers .version_range import RANGE_CLASS_BY_SCHEMES
21- from univers .versions import InvalidVersion
22- from univers . versions import SemverVersion
23- from univers . versions import Version
24-
25- from vulnerabilities . importer import AdvisoryData
26- from vulnerabilities . importer import AffectedPackage
27- from vulnerabilities . importer import AffectedPackageV2
28- from vulnerabilities . importer import Reference
29- from vulnerabilities . importer import ReferenceV2
30- from vulnerabilities . importer import VulnerabilitySeverity
19+ from univers .versions import InvalidVersion , SemverVersion , Version
20+
21+ from vulnerabilities . importer import (
22+ AdvisoryData ,
23+ AffectedPackage ,
24+ AffectedPackageV2 ,
25+ Reference ,
26+ ReferenceV2 ,
27+ VulnerabilitySeverity ,
28+ )
3129from vulnerabilities .severity_systems import SCORING_SYSTEMS
32- from vulnerabilities .utils import build_description
33- from vulnerabilities . utils import dedupe
34- from vulnerabilities .utils import get_cwe_id
30+ from vulnerabilities .utils import build_description , dedupe , get_cwe_id
31+
32+ from vulnerabilities .importers . utils import filter_purls
3533
3634logger = logging .getLogger (__name__ )
3735
5149def parse_advisory_data (
5250 raw_data : dict , supported_ecosystems , advisory_url : str
5351) -> Optional [AdvisoryData ]:
54- """
55- Return an AdvisoryData build from a ``raw_data`` mapping of OSV advisory and
56- a ``supported_ecosystem`` string.
57- """
5852 raw_id = raw_data .get ("id" ) or ""
5953 summary = raw_data .get ("summary" ) or ""
6054 details = raw_data .get ("details" ) or ""
@@ -73,29 +67,34 @@ def parse_advisory_data(
7367 for affected_pkg in raw_data .get ("affected" ) or []:
7468 purl = get_affected_purl (affected_pkg = affected_pkg , raw_id = raw_id )
7569
76- if not purl or purl .type not in supported_ecosystems :
77- logger .error (f"Unsupported package type: { affected_pkg !r} in OSV: { raw_id !r} " )
78- continue
79-
80- affected_version_range = get_affected_version_range (
81- affected_pkg = affected_pkg ,
82- raw_id = raw_id ,
83- supported_ecosystem = purl .type ,
70+ # Replace duplicate filtering logic with helper
71+ filtered_purls = filter_purls (
72+ [purl ],
73+ allowed_types = supported_ecosystems
8474 )
8575
86- for fixed_range in affected_pkg .get ("ranges" ) or []:
87- fixed_version = get_fixed_versions (
88- fixed_range = fixed_range , raw_id = raw_id , supported_ecosystem = purl .type
76+ for p in filtered_purls :
77+ affected_version_range = get_affected_version_range (
78+ affected_pkg = affected_pkg ,
79+ raw_id = raw_id ,
80+ supported_ecosystem = p .type ,
8981 )
9082
91- for version in fixed_version :
92- affected_packages .append (
93- AffectedPackage (
94- package = purl ,
95- affected_version_range = affected_version_range ,
96- fixed_version = version ,
97- )
83+ for fixed_range in affected_pkg .get ("ranges" ) or []:
84+ fixed_version = get_fixed_versions (
85+ fixed_range = fixed_range ,
86+ raw_id = raw_id ,
87+ supported_ecosystem = p .type
9888 )
89+
90+ for version in fixed_version :
91+ affected_packages .append (
92+ AffectedPackage (
93+ package = p ,
94+ affected_version_range = affected_version_range ,
95+ fixed_version = version ,
96+ )
97+ )
9998 database_specific = raw_data .get ("database_specific" ) or {}
10099 cwe_ids = database_specific .get ("cwe_ids" ) or []
101100 weaknesses = list (map (get_cwe_id , cwe_ids ))
@@ -114,10 +113,6 @@ def parse_advisory_data(
114113def parse_advisory_data_v2 (
115114 raw_data : dict , supported_ecosystems , advisory_url : str , advisory_text : str
116115) -> Optional [AdvisoryData ]:
117- """
118- Return an AdvisoryData build from a ``raw_data`` mapping of OSV advisory and
119- a ``supported_ecosystem`` string.
120- """
121116 advisory_id = raw_data .get ("id" ) or ""
122117 if not advisory_id :
123118 logger .error (f"Missing advisory id in OSV data: { raw_data } " )
@@ -136,37 +131,39 @@ def parse_advisory_data_v2(
136131 for affected_pkg in raw_data .get ("affected" ) or []:
137132 purl = get_affected_purl (affected_pkg = affected_pkg , raw_id = advisory_id )
138133
139- if not purl or purl .type not in supported_ecosystems :
140- logger .error (f"Unsupported package type: { affected_pkg !r} in OSV: { advisory_id !r} " )
141- continue
142-
143- affected_version_range = get_affected_version_range (
144- affected_pkg = affected_pkg ,
145- raw_id = advisory_id ,
146- supported_ecosystem = purl .type ,
134+ filtered_purls = filter_purls (
135+ [purl ],
136+ allowed_types = supported_ecosystems
147137 )
148138
149- fixed_versions = []
150- fixed_version_range = None
151- for fixed_range in affected_pkg . get ( "ranges" ) or []:
152- fixed_version = get_fixed_versions (
153- fixed_range = fixed_range , raw_id = advisory_id , supported_ecosystem = purl .type
139+ for p in filtered_purls :
140+ affected_version_range = get_affected_version_range (
141+ affected_pkg = affected_pkg ,
142+ raw_id = advisory_id ,
143+ supported_ecosystem = p .type ,
154144 )
155- fixed_versions .extend ([v .string for v in fixed_version ])
156-
157- fixed_version_range = (
158- get_fixed_version_range (fixed_versions , purl .type ) if fixed_versions else None
159- )
160145
161- if fixed_version_range or affected_version_range :
162- affected_packages .append (
163- AffectedPackageV2 (
164- package = purl ,
165- affected_version_range = affected_version_range ,
166- fixed_version_range = fixed_version_range ,
146+ fixed_versions = []
147+ fixed_version_range = None
148+ for fixed_range in affected_pkg .get ("ranges" ) or []:
149+ fixed_version = get_fixed_versions (
150+ fixed_range = fixed_range , raw_id = advisory_id , supported_ecosystem = p .type
167151 )
152+ fixed_versions .extend ([v .string for v in fixed_version ])
153+
154+ fixed_version_range = (
155+ get_fixed_version_range (fixed_versions , p .type ) if fixed_versions else None
168156 )
169157
158+ if fixed_version_range or affected_version_range :
159+ affected_packages .append (
160+ AffectedPackageV2 (
161+ package = p ,
162+ affected_version_range = affected_version_range ,
163+ fixed_version_range = fixed_version_range ,
164+ )
165+ )
166+
170167 database_specific = raw_data .get ("database_specific" ) or {}
171168 cwe_ids = database_specific .get ("cwe_ids" ) or []
172169 weaknesses = list (map (get_cwe_id , cwe_ids ))
@@ -189,19 +186,6 @@ def parse_advisory_data_v2(
189186
190187
191188def extract_fixed_versions (fixed_range ) -> Iterable [str ]:
192- """
193- Return a list of fixed version strings given a ``fixed_range`` mapping of
194- OSV data.
195-
196- >>> list(extract_fixed_versions(
197- ... {"type": "SEMVER", "events": [{"introduced": "0"},{"fixed": "1.6.0"}]}))
198- ['1.6.0']
199-
200- >>> list(extract_fixed_versions(
201- ... {"type": "ECOSYSTEM","events":[{"introduced": "0"},
202- ... {"fixed": "1.0.0"},{"fixed": "9.0.0"}]}))
203- ['1.0.0', '9.0.0']
204- """
205189 for event in fixed_range .get ("events" ) or []:
206190 fixed = event .get ("fixed" )
207191 if fixed :
@@ -214,9 +198,6 @@ def get_published_date(raw_data):
214198
215199
216200def get_severities (raw_data ) -> Iterable [VulnerabilitySeverity ]:
217- """
218- Yield VulnerabilitySeverity extracted from a mapping of OSV ``raw_data``
219- """
220201 try :
221202 for severity in raw_data .get ("severity" ) or []:
222203 vector = severity .get ("score" )
@@ -257,10 +238,6 @@ def get_severities(raw_data) -> Iterable[VulnerabilitySeverity]:
257238
258239
259240def get_references (raw_data , severities ) -> List [Reference ]:
260- """
261- Return a list Reference extracted from a mapping of OSV ``raw_data`` given a
262- ``severities`` list of VulnerabilitySeverity.
263- """
264241 references = []
265242 for ref in raw_data .get ("references" ) or []:
266243 if not ref :
@@ -274,10 +251,6 @@ def get_references(raw_data, severities) -> List[Reference]:
274251
275252
276253def get_references_v2 (raw_data ) -> List [Reference ]:
277- """
278- Return a list Reference extracted from a mapping of OSV ``raw_data`` given a
279- ``severities`` list of VulnerabilitySeverity.
280- """
281254 references = []
282255 for ref in raw_data .get ("references" ) or []:
283256 if not ref :
@@ -291,10 +264,6 @@ def get_references_v2(raw_data) -> List[Reference]:
291264
292265
293266def get_affected_purl (affected_pkg , raw_id ):
294- """
295- Return an affected PackageURL or None given a mapping of ``affected_pkg``
296- data and a ``raw_id``.
297- """
298267 package = affected_pkg .get ("package" ) or {}
299268 purl = package .get ("purl" )
300269 if purl :
@@ -316,7 +285,6 @@ def get_affected_purl(affected_pkg, raw_id):
316285 namespace = ""
317286 if purl_type == "maven" :
318287 namespace , _ , name = name .partition (":" )
319-
320288 purl = PackageURL (type = purl_type , namespace = namespace , name = name )
321289 else :
322290 logger .error (
@@ -334,10 +302,6 @@ def get_affected_purl(affected_pkg, raw_id):
334302
335303
336304def get_affected_version_range (affected_pkg , raw_id , supported_ecosystem ):
337- """
338- Return a univers VersionRange for the ``affected_pkg`` package data mapping
339- or None. Use a ``raw_id`` OSV id and ``supported_ecosystem``.
340- """
341305 affected_versions = affected_pkg .get ("versions" )
342306 if affected_versions :
343307 try :
@@ -357,19 +321,6 @@ def get_fixed_version_range(versions, ecosystem):
357321
358322
359323def get_fixed_versions (fixed_range , raw_id , supported_ecosystem ) -> List [Version ]:
360- """
361- Return a list of unique fixed univers Versions given a ``fixed_range``
362- univers VersionRange and a ``raw_id``.
363- For example::
364- >>> get_fixed_versions(fixed_range={}, raw_id="GHSA-j3f7-7rmc-6wqj", supported_ecosystem="pypi",)
365- []
366- >>> get_fixed_versions(
367- ... fixed_range={"type": "ECOSYSTEM", "events": [{"fixed": "1.7.0"}], },
368- ... raw_id="GHSA-j3f7-7rmc-6wqj",
369- ... supported_ecosystem="pypi",
370- ... )
371- [PypiVersion(string='1.7.0')]
372- """
373324 fixed_versions = []
374325 if "type" not in fixed_range :
375326 logger .error (f"Invalid fixed_range type for: { fixed_range } for OSV id: { raw_id !r} " )
@@ -401,8 +352,4 @@ def get_fixed_versions(fixed_range, raw_id, supported_ecosystem) -> List[Version
401352 else :
402353 logger .error (f"Unsupported fixed version type: { version !r} for OSV id: { raw_id !r} " )
403354
404- # if fixed_range_type == "GIT":
405- # TODO add GitHubVersion univers fix_version
406- # logger.error(f"NotImplementedError GIT Version - {raw_id !r} - {i !r}")
407-
408355 return dedupe (fixed_versions )
0 commit comments