4
4
to different formats (CSV and markdown).
5
5
"""
6
6
7
+ import enum
7
8
import logging
8
9
import urllib .parse
9
10
from dataclasses import dataclass
@@ -22,6 +23,10 @@ class CvssSourceProvider:
22
23
AMAZON_INSPECTOR = "AMAZON_INSPECTOR"
23
24
DEFAULT_PROVIDER = NVD
24
25
26
+
27
+ empty_rating = {"score" : 0.0 , "source" : {"name" : "in triage" }, "severity" : "other" , "method" : "other" }
28
+
29
+
25
30
def get_rating_providers ():
26
31
"""
27
32
get_rating_providers returns a list of vulnerability
@@ -42,8 +47,8 @@ def get_rating_providers():
42
47
]
43
48
return providers
44
49
50
+
45
51
class CvssSeverity :
46
- UNTRIAGED = "untriaged"
47
52
UNKNOWN = "unknown"
48
53
49
54
@@ -74,7 +79,7 @@ class Vulnerability:
74
79
75
80
@dataclass
76
81
class CvssRating :
77
- severity : str = CvssSeverity .UNTRIAGED
82
+ severity : str = CvssSeverity .UNKNOWN
78
83
provider : str = CvssSourceProvider .DEFAULT_PROVIDER
79
84
cvss_score : str = NULL_STR
80
85
@@ -109,7 +114,12 @@ def parse_inspector_scan_result(inspector_scan_json) -> List[Vulnerability]:
109
114
pkg_vulns = get_pkg_vulns (vulns )
110
115
111
116
for v in pkg_vulns :
112
- vuln_obj = convert_package_vuln_to_vuln_obj (v , components )
117
+ vuln_obj = None
118
+ try :
119
+ vuln_obj = convert_package_vuln_to_vuln_obj (v , components )
120
+ except Exception as e :
121
+ logging .error (f"error encountered while parsing a vulnerability: { e } " )
122
+ continue
113
123
vuln_list .append (vuln_obj )
114
124
115
125
return vuln_list
@@ -143,7 +153,7 @@ def convert_package_vuln_to_vuln_obj(v, components) -> Vulnerability:
143
153
vuln_obj .published = v .get ("created" , NULL_STR )
144
154
vuln_obj .modified = v .get ("updated" , NULL_STR )
145
155
146
- ratings = v .get ("ratings" )
156
+ ratings = v .get ("ratings" , [ empty_rating ] )
147
157
add_ratings (ratings , vuln_obj )
148
158
149
159
description = v .get ("description" )
@@ -168,10 +178,7 @@ def convert_package_vuln_to_vuln_obj(v, components) -> Vulnerability:
168
178
169
179
170
180
def add_ratings (ratings , vulnerability ):
171
- if ratings is None :
172
- return
173
-
174
- rating = get_cvss_rating (ratings , vulnerability )
181
+ rating = get_highest_severity_rating (ratings )
175
182
vulnerability .severity = rating .severity
176
183
vulnerability .severity_provider = rating .provider
177
184
vulnerability .cvss_score = rating .cvss_score
@@ -276,22 +283,6 @@ def get_cwes(v) -> str:
276
283
return cwe_str
277
284
278
285
279
- def get_cvss_rating (ratings , vulnerability ) -> CvssRating :
280
- rating_provider_priority = get_rating_providers ()
281
- for provider in rating_provider_priority :
282
- for rating in ratings :
283
- if rating ["source" ]["name" ] != provider :
284
- continue
285
-
286
- severity = CvssSeverity .UNTRIAGED if rating ["severity" ] == CvssSeverity .UNKNOWN else rating ["severity" ]
287
- cvss_score = str (rating ["score" ]) if rating ["method" ] == "CVSSv31" else "null"
288
- if severity and cvss_score :
289
- return CvssRating (severity = severity , provider = provider , cvss_score = cvss_score )
290
-
291
- logging .info (f"No CVSS rating is provided for { vulnerability .vuln_id } " )
292
- return CvssRating ()
293
-
294
-
295
286
def get_epss_score (ratings ):
296
287
for rating in ratings :
297
288
source = rating .get ("source" )
@@ -320,3 +311,126 @@ def combine_str_list_into_one_str(str_list: list[str]) -> str:
320
311
if str_element == "" :
321
312
str_element = NULL_STR
322
313
return str_element
314
+
315
+
316
+ def get_highest_severity_rating (ratings ) -> CvssRating :
317
+ method = get_preferred_vuln_rating_method (ratings )
318
+ most_severe_rating = get_highest_rating_by_method (method , ratings )
319
+ cvss = CvssRating ()
320
+ cvss .provider = most_severe_rating ["source" ]["name" ]
321
+ cvss .severity = most_severe_rating ["severity" ]
322
+ if "unknown" in cvss .severity :
323
+ cvss .severity = "other"
324
+ cvss .cvss_score = str (most_severe_rating .get ("score" , 0.0 ))
325
+ return cvss
326
+
327
+
328
+ class VulnRatingMethod (enum .Enum ):
329
+ CVSSv2 = "CVSSv2"
330
+ CVSSv3 = "CVSSv3"
331
+ CVSSv31 = "CVSSv31"
332
+ CVSSv4 = "CVSSv4"
333
+ OWASP = "OWASP"
334
+ SSVC = "SSVC"
335
+ OTHER = "other"
336
+
337
+
338
+ def get_preferred_vuln_rating_method (ratings ):
339
+ if ratings is None :
340
+ return VulnRatingMethod .OTHER
341
+
342
+ found_methods = []
343
+
344
+ for rating in ratings :
345
+ if not rating :
346
+ continue
347
+
348
+ method = rating .get ("method" , "" )
349
+ if not method :
350
+ continue
351
+
352
+ # we keep a list of each rating method in the
353
+ # vulnerability so we can present the highest rating
354
+ # to the end user
355
+ if method == VulnRatingMethod .CVSSv4 .value :
356
+ found_methods .append (VulnRatingMethod .CVSSv4 )
357
+ elif method == VulnRatingMethod .CVSSv31 .value :
358
+ found_methods .append (VulnRatingMethod .CVSSv31 )
359
+ elif method == VulnRatingMethod .CVSSv3 .value :
360
+ found_methods .append (VulnRatingMethod .CVSSv3 )
361
+ elif method == VulnRatingMethod .CVSSv2 .value :
362
+ found_methods .append (VulnRatingMethod .CVSSv2 )
363
+ elif method == VulnRatingMethod .OWASP .value :
364
+ found_methods .append (VulnRatingMethod .OWASP )
365
+ elif method == VulnRatingMethod .SSVC .value :
366
+ found_methods .append (VulnRatingMethod .SSVC )
367
+ elif method == VulnRatingMethod .OTHER .value :
368
+ found_methods .append (VulnRatingMethod .OTHER )
369
+ else :
370
+ logging .error (f"Expected a spec-conforming CycloneDX vulnerability rating method, but received '{ method } '" )
371
+ continue
372
+
373
+ # select method to display to user in priority order
374
+ rating_method_priority = [VulnRatingMethod .CVSSv4 , VulnRatingMethod .CVSSv31 , VulnRatingMethod .CVSSv3 ,
375
+ VulnRatingMethod .CVSSv2 , VulnRatingMethod .OWASP , VulnRatingMethod .SSVC ]
376
+ for rating_method in rating_method_priority :
377
+ if rating_method in found_methods :
378
+ return rating_method
379
+ return VulnRatingMethod .OTHER
380
+
381
+
382
+ def get_highest_rating_by_method (method : VulnRatingMethod , ratings ):
383
+ if not ratings :
384
+ return empty_rating
385
+
386
+ ratings_with_same_method = []
387
+ for rating in ratings :
388
+ if not rating :
389
+ continue
390
+
391
+ a_method = rating .get ("method" , "" )
392
+ if not method :
393
+ continue
394
+
395
+ if is_epss (a_method , rating ):
396
+ continue
397
+
398
+ if a_method == method .value :
399
+ ratings_with_same_method .append (rating )
400
+
401
+ highest_rating = empty_rating
402
+ for rating in ratings_with_same_method :
403
+
404
+ score = rating .get ("score" , 0.0 )
405
+ try :
406
+ score = float (score )
407
+ except Exception as e :
408
+ logging .error (f"threw exception while trying to convert severity score, '{ score } ' to type float: { e } " )
409
+ continue
410
+
411
+ if score >= highest_rating ["score" ]:
412
+ highest_rating = rating
413
+
414
+
415
+ return highest_rating
416
+
417
+
418
+ def is_epss (method , rating ):
419
+ if not rating :
420
+ return False
421
+
422
+ if method != VulnRatingMethod .OTHER .value :
423
+ return False
424
+
425
+ source = rating .get ("source" , "" )
426
+ if not source :
427
+ return False
428
+
429
+ name = source .get ("name" , "" )
430
+ if not name :
431
+ return False
432
+
433
+ if name .lower () == "EPSS" .lower ():
434
+ return True
435
+
436
+ return False
0 commit comments