3434MAX_CPE_MATCHES_PER_PAGE = 500
3535
3636
37- def _result_iterator (data : JSON ) -> Iterator [CPEMatchString ]:
38- results : list [dict [str , Any ]] = data .get ("match_strings" , []) # type: ignore
39- return (
40- CPEMatchString .from_dict (result ["match_string" ]) for result in results
41- )
42-
43-
4437class CPEMatchApi (NVDApi ):
4538 """
4639 API for querying the NIST NVD CPE match information.
@@ -62,6 +55,7 @@ def __init__(
6255 token : Optional [str ] = None ,
6356 timeout : Optional [Timeout ] = DEFAULT_TIMEOUT_CONFIG ,
6457 rate_limit : bool = True ,
58+ cache_cpe_matches : bool = True ,
6559 ) -> None :
6660 """
6761 Create a new instance of the CPE API.
@@ -76,13 +70,22 @@ def __init__(
7670 rolling 30 second window.
7771 See https://nvd.nist.gov/developers/start-here#divRateLimits
7872 Default: True.
73+ cache_cpe_matches: If set to True (the default) the entries in the
74+ lists of matching CPEs for each match string are cached and reused
75+ to use less memory.
76+ If set to False, a separate CPEMatch object is kept for each entry
77+ to avoid possible side effects when modifying the data.
7978 """
8079 super ().__init__ (
8180 DEFAULT_NIST_NVD_CPE_MATCH_URL ,
8281 token = token ,
8382 timeout = timeout ,
8483 rate_limit = rate_limit ,
8584 )
85+ if cache_cpe_matches :
86+ self ._cpe_match_cache = {}
87+ else :
88+ self ._cpe_match_cache = None
8689
8790 def cpe_matches (
8891 self ,
@@ -157,12 +160,30 @@ def cpe_matches(
157160 return NVDResults (
158161 self ,
159162 params ,
160- _result_iterator ,
163+ self . _result_iterator ,
161164 request_results = request_results ,
162165 results_per_page = results_per_page ,
163166 start_index = start_index ,
164167 )
165168
169+ def _result_iterator (self , data : JSON ) -> Iterator [CPEMatchString ]:
170+ """
171+ Creates an iterator of all the CPEMatchStrings in given API response JSON
172+
173+ Args:
174+ data: The JSON response data to get the match strings from
175+
176+ Returns:
177+ An iterator over the CPEMatchStrings
178+ """
179+ results : list [dict [str , Any ]] = data .get ("match_strings" , []) # type: ignore
180+ return (
181+ CPEMatchString .from_dict_with_cache (
182+ result ["match_string" ], self ._cpe_match_cache
183+ )
184+ for result in results
185+ )
186+
166187 async def cpe_match (self , match_criteria_id : str ) -> CPEMatchString :
167188 """
168189 Returns a single CPE match for the given match criteria id.
@@ -201,7 +222,9 @@ async def cpe_match(self, match_criteria_id: str) -> CPEMatchString:
201222 )
202223
203224 match_string = match_strings [0 ]
204- return CPEMatchString .from_dict (match_string ["match_string" ])
225+ return CPEMatchString .from_dict_with_cache (
226+ match_string ["match_string" ], self ._cpe_match_cache
227+ )
205228
206229 async def __aenter__ (self ) -> "CPEMatchApi" :
207230 await super ().__aenter__ ()
0 commit comments