Skip to content

Commit 336b2aa

Browse files
n-thumanngreenbonebot
authored andcommitted
Add: Unit tests for SourceApi
1 parent 5834b86 commit 336b2aa

File tree

3 files changed

+215
-0
lines changed

3 files changed

+215
-0
lines changed

tests/nvd/__init__.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,3 +127,30 @@ def get_cve_change_data(
127127
if data:
128128
cve_changes.update(data)
129129
return cve_changes
130+
131+
132+
def get_source_data(
133+
data: Optional[Dict[str, Any]] = None,
134+
) -> Dict[str, Any]:
135+
sources = {
136+
"name": "MITRE",
137+
"contact_email": "cve@mitre.org",
138+
"source_identifiers": [
139+
"cve@mitre.org",
140+
"8254265b-2729-46b6-b9e3-3dfca2d5bfca",
141+
],
142+
"last_modified": "2019-09-09T16:18:45.930",
143+
"created": "2019-09-09T16:18:45.930",
144+
"v3_acceptance_level": {
145+
"description": "Contributor",
146+
"last_modified": "2025-01-30T00:00:20.107",
147+
},
148+
"cwe_acceptance_level": {
149+
"description": "Reference",
150+
"last_modified": "2025-01-24T00:00:00.043",
151+
},
152+
}
153+
154+
if data:
155+
sources.update(data)
156+
return sources

tests/nvd/source/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# SPDX-FileCopyrightText: 2023 Greenbone AG
2+
#
3+
# SPDX-License-Identifier: GPL-3.0-or-later

tests/nvd/source/test_api.py

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
# SPDX-FileCopyrightText: 2023 Greenbone AG
2+
#
3+
# SPDX-License-Identifier: GPL-3.0-or-later
4+
5+
6+
from datetime import datetime, timezone
7+
from typing import Any, Optional
8+
from unittest.mock import MagicMock, patch
9+
10+
from httpx import AsyncClient, Response
11+
12+
from pontos.nvd.models.source import AcceptanceLevel
13+
from pontos.nvd.source.api import MAX_SOURCES_PER_PAGE, SourceApi
14+
from tests import AsyncMock, IsolatedAsyncioTestCase, aiter, anext
15+
from tests.nvd import get_source_data
16+
17+
18+
def create_source_response(
19+
name: str, update: Optional[dict[str, Any]] = None
20+
) -> MagicMock:
21+
data = {
22+
"sources": [get_source_data({"name": name})],
23+
"results_per_page": 1,
24+
}
25+
if update:
26+
data.update(update)
27+
28+
response = MagicMock(spec=Response)
29+
response.json.return_value = data
30+
return response
31+
32+
33+
def create_source_responses(count: int = 2) -> list[MagicMock]:
34+
return [
35+
create_source_response(f"MITRE-{i}", {"total_results": count})
36+
for i in range(1, count + 1)
37+
]
38+
39+
40+
class SourceAPITestCase(IsolatedAsyncioTestCase):
41+
@patch("pontos.nvd.api.AsyncClient", spec=AsyncClient)
42+
def setUp(self, async_client: MagicMock) -> None:
43+
self.http_client = AsyncMock()
44+
async_client.return_value = self.http_client
45+
self.api = SourceApi(token="token")
46+
47+
async def test_sources(self):
48+
self.http_client.get.side_effect = create_source_responses()
49+
50+
it = aiter(self.api.sources())
51+
source = await anext(it)
52+
53+
self.assertEqual(source.name, "MITRE-1")
54+
self.assertEqual(
55+
source.contact_email,
56+
"cve@mitre.org",
57+
)
58+
self.assertEqual(
59+
source.source_identifiers,
60+
[
61+
"cve@mitre.org",
62+
"8254265b-2729-46b6-b9e3-3dfca2d5bfca",
63+
],
64+
)
65+
self.assertEqual(
66+
source.last_modified,
67+
datetime(2019, 9, 9, 16, 18, 45, 930000, tzinfo=timezone.utc),
68+
)
69+
self.assertEqual(
70+
source.created,
71+
datetime(2019, 9, 9, 16, 18, 45, 930000, tzinfo=timezone.utc),
72+
)
73+
self.assertIsNone(source.v2_acceptance_level)
74+
self.assertEqual(
75+
source.v3_acceptance_level,
76+
AcceptanceLevel(
77+
"Contributor",
78+
datetime(2025, 1, 30, 0, 0, 20, 107000, tzinfo=timezone.utc),
79+
),
80+
)
81+
self.assertEqual(
82+
source.cwe_acceptance_level,
83+
AcceptanceLevel(
84+
"Reference",
85+
datetime(2025, 1, 24, 0, 0, 0, 43000, tzinfo=timezone.utc),
86+
),
87+
)
88+
89+
self.http_client.get.assert_awaited_once_with(
90+
"https://services.nvd.nist.gov/rest/json/source/2.0",
91+
headers={"apiKey": "token"},
92+
params={
93+
"startIndex": 0,
94+
"resultsPerPage": MAX_SOURCES_PER_PAGE,
95+
},
96+
)
97+
98+
self.http_client.get.reset_mock()
99+
100+
source = await anext(it)
101+
self.assertEqual(source.name, "MITRE-2")
102+
self.http_client.get.assert_awaited_once_with(
103+
"https://services.nvd.nist.gov/rest/json/source/2.0",
104+
headers={"apiKey": "token"},
105+
params={"startIndex": 1, "resultsPerPage": 1},
106+
)
107+
108+
with self.assertRaises(StopAsyncIteration):
109+
await anext(it)
110+
111+
async def test_cve_changes_change_dates(self):
112+
self.http_client.get.side_effect = create_source_responses()
113+
114+
it = aiter(
115+
self.api.sources(
116+
last_modified_start_date=datetime(2025, 1, 1),
117+
last_modified_end_date=datetime(2025, 1, 31),
118+
)
119+
)
120+
source = await anext(it)
121+
122+
self.assertEqual(source.name, "MITRE-1")
123+
self.http_client.get.assert_awaited_once_with(
124+
"https://services.nvd.nist.gov/rest/json/source/2.0",
125+
headers={"apiKey": "token"},
126+
params={
127+
"startIndex": 0,
128+
"lastModStartDate": "2025-01-01T00:00:00",
129+
"lastModEndDate": "2025-01-31T00:00:00",
130+
"resultsPerPage": MAX_SOURCES_PER_PAGE,
131+
},
132+
)
133+
134+
self.http_client.get.reset_mock()
135+
136+
source = await anext(it)
137+
138+
self.assertEqual(source.name, "MITRE-2")
139+
self.http_client.get.assert_awaited_once_with(
140+
"https://services.nvd.nist.gov/rest/json/source/2.0",
141+
headers={"apiKey": "token"},
142+
params={
143+
"startIndex": 1,
144+
"lastModStartDate": "2025-01-01T00:00:00",
145+
"lastModEndDate": "2025-01-31T00:00:00",
146+
"resultsPerPage": 1,
147+
},
148+
)
149+
150+
with self.assertRaises(StopAsyncIteration):
151+
source = await anext(it)
152+
153+
async def test_sources_source_identifier(self):
154+
self.http_client.get.side_effect = create_source_responses()
155+
156+
it = aiter(self.api.sources(source_identifier="cve@mitre.org"))
157+
source = await anext(it)
158+
self.assertEqual(source.name, "MITRE-1")
159+
160+
self.http_client.get.assert_awaited_once_with(
161+
"https://services.nvd.nist.gov/rest/json/source/2.0",
162+
headers={"apiKey": "token"},
163+
params={
164+
"startIndex": 0,
165+
"resultsPerPage": MAX_SOURCES_PER_PAGE,
166+
"sourceIdentifier": "cve@mitre.org",
167+
},
168+
)
169+
170+
self.http_client.get.reset_mock()
171+
172+
source = await anext(it)
173+
self.assertEqual(source.name, "MITRE-2")
174+
self.http_client.get.assert_awaited_once_with(
175+
"https://services.nvd.nist.gov/rest/json/source/2.0",
176+
headers={"apiKey": "token"},
177+
params={
178+
"startIndex": 1,
179+
"resultsPerPage": 1,
180+
"sourceIdentifier": "cve@mitre.org",
181+
},
182+
)
183+
184+
with self.assertRaises(StopAsyncIteration):
185+
await anext(it)

0 commit comments

Comments
 (0)