2323import bisect
2424import dataclasses
2525import json
26+ import logging
2627import re
28+ from functools import total_ordering
2729from typing import List
2830from typing import Optional
2931from typing import Tuple
3436import toml
3537import urllib3
3638from packageurl import PackageURL
39+ from univers .version_range import RANGE_CLASS_BY_SCHEMES
3740
38- # TODO add logging here
41+ LOGGER = logging . getLogger ( __name__ )
3942
4043cve_regex = re .compile (r"CVE-\d{4}-\d{4,7}" , re .IGNORECASE )
4144is_cve = cve_regex .match
@@ -133,32 +136,36 @@ def requests_with_5xx_retry(max_retries=5, backoff_factor=0.5):
133136 return session
134137
135138
136- def nearest_patched_package (
137- vulnerable_packages : List [PackageURL ], resolved_packages : List [PackageURL ]
138- ) -> List [AffectedPackage ]:
139- class PackageURLWithVersionComparator :
140- """
141- This class is used to get around bisect module's lack of supplying custom
142- compartor. Get rid of this once we use python 3.10 which supports this.
143- See https://github.com/python/cpython/pull/20556
144- """
139+ @total_ordering
140+ class VersionedPackage :
141+ """
142+ A PackageURL with a Version class.
143+ This class is used to get around bisect module's lack of supplying custom
144+ comparator. Get rid of this once we use python 3.10 which supports this.
145+ See https://github.com/python/cpython/pull/20556
146+ """
145147
146- def __init__ (self , package ):
147- self .package = package
148- self .version_object = version_class_by_package_type [package .type ](package .version )
148+ def __init__ (self , purl : PackageURL ):
149+ self .purl = purl
150+ vrc = RANGE_CLASS_BY_SCHEMES .get (purl .type )
151+ self .version = vrc .version_class (purl .version )
149152
150- def __eq__ (self , other ):
151- return self .version_object == other .version_object
153+ def __eq__ (self , other ):
154+ return self .version == other .version
152155
153- def __lt__ (self , other ):
154- return self .version_object < other .version_object
156+ def __lt__ (self , other ):
157+ return self .version < other .version
155158
156- vulnerable_packages = sorted (
157- [PackageURLWithVersionComparator (package ) for package in vulnerable_packages ]
158- )
159- resolved_packages = sorted (
160- [PackageURLWithVersionComparator (package ) for package in resolved_packages ]
161- )
159+
160+ def nearest_patched_package (
161+ vulnerable_packages : List [PackageURL ], resolved_packages : List [PackageURL ]
162+ ) -> List [AffectedPackage ]:
163+ """
164+ Return a list of Affected Packages for each Patched package.
165+ """
166+
167+ vulnerable_packages = sorted ([VersionedPackage (package ) for package in vulnerable_packages ])
168+ resolved_packages = sorted ([VersionedPackage (package ) for package in resolved_packages ])
162169
163170 resolved_package_count = len (resolved_packages )
164171 affected_package_with_patched_package_objects = []
@@ -167,11 +174,11 @@ def __lt__(self, other):
167174 patched_package_index = bisect .bisect_right (resolved_packages , vulnerable_package )
168175 patched_package = None
169176 if patched_package_index < resolved_package_count :
170- patched_package = resolved_packages [patched_package_index ]. package
177+ patched_package = resolved_packages [patched_package_index ]
171178
172179 affected_package_with_patched_package_objects .append (
173180 AffectedPackage (
174- vulnerable_package = vulnerable_package .package , patched_package = patched_package
181+ vulnerable_package = vulnerable_package .purl , patched_package = patched_package . purl
175182 )
176183 )
177184
@@ -211,3 +218,26 @@ def __init__(self, fget):
211218
212219 def __get__ (self , owner_self , owner_cls ):
213220 return self .fget (owner_cls )
221+
222+
223+ def get_item (object : dict , * attributes ):
224+ """
225+ Return `item` by going through all the `attributes` present in the `json_object`
226+
227+ Do a DFS for the `item` in the `json_object` by traversing the `attributes`
228+ and return None if can not traverse through the `attributes`
229+ For example:
230+ >>> get_item({'a': {'b': {'c': 'd'}}}, 'a', 'b', 'c')
231+ 'd'
232+ >>> assert(get_item({'a': {'b': {'c': 'd'}}}, 'a', 'b', 'e')) == None
233+ """
234+ if not object :
235+ LOGGER .error (f"Object is empty: { object } " )
236+ return
237+ item = object
238+ for attribute in attributes :
239+ if attribute not in item :
240+ LOGGER .error (f"Missing attribute { attribute } in { item } " )
241+ return None
242+ item = item [attribute ]
243+ return item
0 commit comments