66# See https://github.com/nexB/federatedcode for support or download.
77# See https://aboutcode.org for more information about AboutCode.org OSS projects.
88#
9+ from unittest .mock import PropertyMock
10+ from unittest .mock import call
11+ from unittest .mock import patch
12+
913import pytest
1014from django .contrib .auth .models import User
1115from fedcode_test_utils import mute_post_save_signal # NOQA
1216
17+ from aboutcode .hashid import get_core_purl
1318from fedcode .models import Note
1419from fedcode .models import Package
1520from fedcode .models import Repository
1621from fedcode .models import Service
1722from fedcode .models import Vulnerability
1823from fedcode .pipelines .sync_vulnerablecode import SyncVulnerableCode
24+ from fedcode .pipelines .sync_vulnerablecode import note_handler
25+ from fedcode .pipelines .sync_vulnerablecode import pkg_handler
26+ from fedcode .pipelines .sync_vulnerablecode import vul_handler
27+
28+ TEST_REPO_1_PATH = "/home/ziad-hany/PycharmProjects/vul-sample"
1929
2030
2131@pytest .fixture
@@ -30,80 +40,246 @@ def service(db):
3040 )
3141
3242
43+ @pytest .fixture
44+ def mock_latest_commit_hexsha (monkeypatch ):
45+ """
46+ Fixture to override only repo.head.commit.hexsha while keeping
47+ the rest of the git repo behavior intact.
48+ """
49+
50+ def _mock (repo_instance , hexsha : str ):
51+ """
52+ repo_instance: a Repository model instance
53+ hexsha: the fake hexsha to return for latest commit
54+ """
55+ real_repo = repo_instance .git_repo_obj # use real GitPython repo
56+
57+ # Patch only the hexsha property
58+ type(real_repo .head .commit ).hexsha = PropertyMock (return_value = hexsha )
59+
60+ return real_repo
61+
62+ return _mock
63+
64+
3365@pytest .fixture
3466def repo (db , service , mute_post_save_signal ):
3567 """Simple Git Repository"""
3668 return Repository .objects .create (
37- url = "https://github.com/nexB/fake-repo " ,
38- path = "./review/tests/test_data/test_git_repo_v1" ,
69+ url = "https://github.com/ziadhany/vul-sample " ,
70+ path = TEST_REPO_1_PATH ,
3971 admin = service ,
4072 )
4173
4274
43- @pytest .mark .skip (reason = "Need a real git repo to test the importer" )
44- @pytest .mark .django_db
45- def test_simple_importer (service , repo , mute_post_save_signal ):
46- # just add all packages and vulnerabilities
47- repo .path = "/home/ziad/vul-sample/repo1"
48- importer = SyncVulnerableCode ()
49- importer .execute ()
75+ @pytest .fixture
76+ def vul_changes ():
77+ return {"create" : [], "update" : [], "delete" : set ()}
5078
51- assert Note .objects .count () > 1
52- assert Vulnerability .objects .count () > 1
53- assert Package .objects .count () > 1
54- assert repo .last_imported_commit
5579
56- note_n = Note .objects .count ()
57- vul_n = Vulnerability .objects .count ()
58- purl_n = Package .objects .count ()
59- last_imported_commit = repo .last_imported_commit
80+ @pytest .fixture
81+ def pkg_changes ():
82+ return {"create" : [], "update" : [], "delete" : set ()}
6083
61- # Run importer again without add any new data
62- importer = SyncVulnerableCode ()
63- importer .execute ()
6484
65- assert note_n == Note .objects .count ()
66- assert vul_n == Vulnerability .objects .count ()
67- assert purl_n == Package .objects .count ()
68- assert last_imported_commit == repo .last_imported_commit
85+ @pytest .fixture
86+ def example_notes ():
87+ return [
88+ {
89+ "purl" : "pkg:alpine/ansible@2.10.1-r0?arch=aarch64&distroversion=edge&reponame=community" ,
90+ "affected_by_vulnerabilities" : [],
91+ "fixing_vulnerabilities" : ["VCID-r7zs-rzfz-aaap" ],
92+ },
93+ {
94+ "purl" : "pkg:alpine/ansible@2.10.1-r0?arch=armhf&distroversion=edge&reponame=community" ,
95+ "affected_by_vulnerabilities" : ["VCID-r7zs-rzfz-aaap" ],
96+ "fixing_vulnerabilities" : [],
97+ },
98+ ]
6999
70- # Edit last_imported_commit
71- repo .last_imported_commit = "c8de84af0a7c11bf151e96142ce711824648ec41"
72- repo .save ()
73- importer = SyncVulnerableCode ()
74- importer .execute ()
100+ def test_added_vulnerability (repo , vul_changes ):
101+ vul_handler ("A" , repo , None , {"vulnerability_id" : "VCID-1234" }, None , vul_changes )
102+ assert len (vul_changes ["create" ]) == 1
103+ vuln = vul_changes ["create" ][0 ]
104+ assert isinstance (vuln , Vulnerability )
105+ assert vuln .id == "VCID-1234"
106+ assert vuln .repo == repo
75107
76108
77- @pytest .mark .skip (reason = "Need a real git repo to test the importer" )
78- @pytest .mark .django_db
79- def test_complex_importer (service , repo , mute_post_save_signal ):
80- # repo with 1 commit
81- repo .path = "/home/ziad/vul-sample/repo1"
82- importer = SyncVulnerableCode ()
83- importer .execute ()
109+ def test_modified_vulnerability_changed_id (repo , vul_changes ):
110+ vul_handler (
111+ "M" ,
112+ repo ,
113+ {"vulnerability_id" : "VCID-1111" },
114+ {"vulnerability_id" : "VCID-2222" },
115+ None ,
116+ vul_changes ,
117+ )
84118
85- assert Note .objects .count () > 1
86- assert Vulnerability .objects .count () > 1
87- assert Package .objects .count () > 1
88- assert repo .last_imported_commit
119+ assert ("VCID-1111" , repo ) in vul_changes ["delete" ]
120+ assert ("VCID-2222" , repo ) in vul_changes ["update" ]
89121
90- note_n = Note .objects .count ()
91- vul_n = Vulnerability .objects .count ()
92- purl_n = Package .objects .count ()
93- last_imported_commit = repo .last_imported_commit
94122
95- # Run importer again without add any new data
96- # the same repo with 2 commit ( after pull )
97- repo .path = "/home/ziad/vul-sample/repo2"
98- importer = SyncVulnerableCode ()
99- importer .execute ()
123+ def test_modified_vulnerability_same_id (repo , vul_changes ):
124+ vul_handler (
125+ "M" ,
126+ repo ,
127+ {"vulnerability_id" : "VCID-1111" },
128+ {"vulnerability_id" : "VCID-1111" },
129+ None ,
130+ vul_changes ,
131+ )
132+
133+ assert not vul_changes ["delete" ]
134+ assert ("VCID-1111" , repo ) in vul_changes ["update" ]
135+
136+
137+ def test_deleted_vulnerability (repo , vul_changes ):
138+ vul_handler ("D" , repo , {"vulnerability_id" : "VCID-3333" }, None , None , vul_changes )
139+ assert ("VCID-3333" , repo ) in vul_changes ["delete" ]
140+
100141
101- assert note_n > Note .objects .count ()
102- assert vul_n > Vulnerability .objects .count ()
103- assert purl_n > Package .objects .count ()
142+ def test_added_packages (service , pkg_changes ):
143+ yaml_data_b_blob = ["pkg:pypi/django@3.2.5" , "pkg:pypi/requests@2.28.1" ]
104144
105- # Edit last_imported_commit
106- repo .last_imported_commit = "9c3ccee39baef6017d9152367402de9909eadd72"
107- repo .save ()
145+ pkg_handler ("A" , service , None , yaml_data_b_blob , None , pkg_changes )
146+
147+ assert len (pkg_changes ["create" ]) == 2
148+ created = [pkg .purl for pkg in pkg_changes ["create" ]]
149+ assert get_core_purl ("pkg:pypi/django@3.2.5" ) in created
150+ assert get_core_purl ("pkg:pypi/requests@2.28.1" ) in created
151+
152+
153+ def test_modified_packages (service , pkg_changes ):
154+ yaml_data_a_blob = ["pkg:pypi/django@3.2.5" , "pkg:pypi/requests@2.27.0" ]
155+ yaml_data_b_blob = ["pkg:pypi/django@4.0.0" , "pkg:pypi/requests@2.28.1" ]
156+
157+ pkg_handler ("M" , service , yaml_data_a_blob , yaml_data_b_blob , None , pkg_changes )
158+
159+ assert len (pkg_changes ["update" ]) == 2
160+ updates = [(a , b ) for (a , b , _ ) in pkg_changes ["update" ]]
161+ assert (
162+ get_core_purl ("pkg:pypi/django@3.2.5" ),
163+ get_core_purl ("pkg:pypi/django@4.0.0" ),
164+ ) in updates
165+ assert (
166+ get_core_purl ("pkg:pypi/requests@2.27.0" ),
167+ get_core_purl ("pkg:pypi/requests@2.28.1" ),
168+ ) in updates
169+
170+
171+ def test_deleted_packages (service , pkg_changes ):
172+ yaml_data_a_blob = ["pkg:pypi/flask@2.0.0" , "pkg:pypi/urllib3@1.26.0" ]
173+
174+ pkg_handler ("D" , service , yaml_data_a_blob , None , None , pkg_changes )
175+
176+ assert len (pkg_changes ["delete" ]) == 2
177+ deletes = [(p , svc ) for (p , svc ) in pkg_changes ["delete" ]]
178+ assert (get_core_purl ("pkg:pypi/flask@2.0.0" ), service ) in deletes
179+ assert (get_core_purl ("pkg:pypi/urllib3@1.26.0" ), service ) in deletes
180+
181+
182+ def test_note_handler_add (service , example_notes ):
183+ with patch ("fedcode.pipelines.sync_vulnerablecode.bulk_create_notes" ) as mock_create :
184+ note_handler ("A" , service , None , example_notes , None )
185+ assert mock_create .called
186+ args , _ = mock_create .call_args
187+ notes_to_create = args [0 ]
188+ assert len (notes_to_create ) == len (example_notes )
189+ pkg = Package .objects .get (purl = "pkg:alpine/ansible" )
190+ expected = [
191+ (
192+ pkg ,
193+ {
194+ "affected_by_vulnerabilities" : [],
195+ "fixing_vulnerabilities" : ["VCID-r7zs-rzfz-aaap" ],
196+ "purl" : "pkg:alpine/ansible@2.10.1-r0?arch=aarch64&distroversion=edge&reponame=community" ,
197+ },
198+ ),
199+ (
200+ pkg ,
201+ {
202+ "affected_by_vulnerabilities" : ["VCID-r7zs-rzfz-aaap" ],
203+ "fixing_vulnerabilities" : [],
204+ "purl" : "pkg:alpine/ansible@2.10.1-r0?arch=armhf&distroversion=edge&reponame=community" ,
205+ },
206+ ),
207+ ]
208+ assert notes_to_create == expected
209+
210+
211+ def test_note_handler_modify (service , example_notes ):
212+ old_notes = example_notes [:1 ] # first note only
213+ new_notes = example_notes [1 :] # second note only
214+
215+ with patch ("fedcode.pipelines.sync_vulnerablecode.bulk_update_notes" ) as mock_update , patch (
216+ "fedcode.pipelines.sync_vulnerablecode.bulk_create_notes"
217+ ) as mock_create , patch (
218+ "fedcode.pipelines.sync_vulnerablecode.bulk_delete_notes"
219+ ) as mock_delete :
220+
221+ note_handler ("M" , service , old_notes , new_notes , None )
222+ pkg = Package .objects .get (purl = "pkg:alpine/ansible" )
223+ assert mock_update .call_args_list == [
224+ call (
225+ [
226+ (
227+ pkg ,
228+ {
229+ "purl" : "pkg:alpine/ansible@2.10.1-r0?arch=aarch64&distroversion=edge&reponame=community" ,
230+ "affected_by_vulnerabilities" : [],
231+ "fixing_vulnerabilities" : ["VCID-r7zs-rzfz-aaap" ],
232+ },
233+ {
234+ "purl" : "pkg:alpine/ansible@2.10.1-r0?arch=armhf&distroversion=edge&reponame=community" ,
235+ "affected_by_vulnerabilities" : ["VCID-r7zs-rzfz-aaap" ],
236+ "fixing_vulnerabilities" : [],
237+ },
238+ )
239+ ]
240+ )
241+ ]
242+
243+ assert mock_delete .called == False
244+ assert mock_create .called == False
245+ assert mock_update .called
246+
247+
248+ def test_note_handler_delete (service , example_notes ):
249+ with patch ("fedcode.pipelines.sync_vulnerablecode.bulk_delete_notes" ) as mock_delete :
250+ note_handler ("D" , service , example_notes , None , None )
251+ assert mock_delete .called
252+ args , _ = mock_delete .call_args
253+ notes_to_delete = args [0 ]
254+ assert len (notes_to_delete ) == len (example_notes )
255+
256+
257+ @pytest .mark .skip (reason = "A real Git repository is needed to test the pipelines." )
258+ @pytest .mark .django_db
259+ def test_simple_importer (service , repo , mock_latest_commit_hexsha , mute_post_save_signal ):
260+ repo .path = TEST_REPO_1_PATH
108261 importer = SyncVulnerableCode ()
109- importer .execute ()
262+ commits = [
263+ # (commit, last_imported_commit, note_count, vuln_count, pkg_count)
264+ (
265+ "f7cd453ff1ef29a539723c44f82bcc582dac13b1" ,
266+ None ,
267+ 28 ,
268+ 6 ,
269+ 3 ,
270+ ), # vuln_count is 7, but one of them is duplicated.
271+ ("d2115ebdc64341f5b9169e42c9edde9002898b3b" , "f7cd453ff1ef29a539723c44f82bcc582dac13b1" , 45 , 6 , 3 ),
272+ ("d2115ebdc64341f5b9169e42c9edde9002898b3b" , None , 0 , 0 , 0 ),
273+ ("275987c1d758155e782b7fe0539d7089d4e618ea" , None , 0 , 0 , 0 ),
274+ ]
275+
276+ for commit , last_imported_commit , note_count , vuln_count , pkg_count in commits :
277+ repo .last_imported_commit = last_imported_commit
278+ repo .save ()
279+
280+ mock_latest_commit_hexsha (repo , hexsha = commit )
281+ importer .execute ()
282+
283+ assert Note .objects .count () == note_count
284+ assert Vulnerability .objects .count () == vuln_count
285+ assert Package .objects .count () == pkg_count
0 commit comments