|
| 1 | +# |
| 2 | +# Copyright (c) nexB Inc. and others. All rights reserved. |
| 3 | +# VulnerableCode is a trademark of nexB Inc. |
| 4 | +# SPDX-License-Identifier: Apache-2.0 |
| 5 | +# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. |
| 6 | +# See https://github.com/aboutcode-org/vulnerablecode for support or download. |
| 7 | +# See https://aboutcode.org for more information about nexB OSS projects. |
| 8 | +# |
| 9 | + |
| 10 | +import json |
| 11 | +from pathlib import Path |
| 12 | + |
| 13 | +import pytest |
| 14 | + |
| 15 | +from vulnerabilities.importer import AdvisoryData |
| 16 | +from vulnerabilities.pipelines.v2_importers.openssf_malicious_importer import ( |
| 17 | + OpenSSFMaliciousImporterPipeline, |
| 18 | +) |
| 19 | + |
| 20 | + |
| 21 | +@pytest.fixture |
| 22 | +def sample_malicious_advisory(tmp_path: Path): |
| 23 | + """Create a sample malicious package advisory in OSV format.""" |
| 24 | + advisory_data = { |
| 25 | + "modified": "2025-03-28T13:05:11Z", |
| 26 | + "published": "2025-03-28T13:05:11Z", |
| 27 | + "schema_version": "1.5.0", |
| 28 | + "id": "MAL-2025-1234", |
| 29 | + "summary": "Malicious code in malicious-test-package (PyPI)", |
| 30 | + "details": "This package contains malicious code that exfiltrates data.", |
| 31 | + "affected": [ |
| 32 | + { |
| 33 | + "package": { |
| 34 | + "ecosystem": "PyPI", |
| 35 | + "name": "malicious-test-package", |
| 36 | + "purl": "pkg:pypi/malicious-test-package", |
| 37 | + }, |
| 38 | + "versions": ["0.0.1", "0.0.2"], |
| 39 | + } |
| 40 | + ], |
| 41 | + "credits": [ |
| 42 | + { |
| 43 | + "name": "Security Researcher", |
| 44 | + "type": "FINDER", |
| 45 | + "contact": ["https://example.com"], |
| 46 | + } |
| 47 | + ], |
| 48 | + "database_specific": { |
| 49 | + "malicious-packages-origins": [ |
| 50 | + { |
| 51 | + "id": "TEST-2025-01234", |
| 52 | + "import_time": "2025-03-31T07:07:04.129197674Z", |
| 53 | + "modified_time": "2025-03-28T13:05:11Z", |
| 54 | + "sha256": "abc123def456", |
| 55 | + "source": "test-source", |
| 56 | + "versions": ["0.0.1", "0.0.2"], |
| 57 | + } |
| 58 | + ] |
| 59 | + }, |
| 60 | + } |
| 61 | + |
| 62 | + advisory_dir = tmp_path / "osv" / "malicious" / "pypi" / "malicious-test-package" |
| 63 | + advisory_dir.mkdir(parents=True) |
| 64 | + |
| 65 | + advisory_file = advisory_dir / "MAL-2025-1234.json" |
| 66 | + advisory_file.write_text(json.dumps(advisory_data, indent=2)) |
| 67 | + |
| 68 | + return tmp_path, advisory_file.read_text(), advisory_data |
| 69 | + |
| 70 | + |
| 71 | +@pytest.fixture |
| 72 | +def sample_npm_malicious_advisory(tmp_path: Path): |
| 73 | + """Create a sample npm malicious package advisory.""" |
| 74 | + advisory_data = { |
| 75 | + "modified": "2025-01-15T10:00:00Z", |
| 76 | + "published": "2025-01-15T10:00:00Z", |
| 77 | + "schema_version": "1.5.0", |
| 78 | + "id": "MAL-2025-5678", |
| 79 | + "summary": "Malicious code in typosquat-package (npm)", |
| 80 | + "details": "Typosquatting attack targeting popular package.", |
| 81 | + "affected": [ |
| 82 | + { |
| 83 | + "package": { |
| 84 | + "ecosystem": "npm", |
| 85 | + "name": "typosquat-package", |
| 86 | + }, |
| 87 | + "versions": ["1.0.0"], |
| 88 | + } |
| 89 | + ], |
| 90 | + } |
| 91 | + |
| 92 | + advisory_dir = tmp_path / "osv" / "malicious" / "npm" / "typosquat-package" |
| 93 | + advisory_dir.mkdir(parents=True) |
| 94 | + |
| 95 | + advisory_file = advisory_dir / "MAL-2025-5678.json" |
| 96 | + advisory_file.write_text(json.dumps(advisory_data, indent=2)) |
| 97 | + |
| 98 | + return tmp_path, advisory_file.read_text(), advisory_data |
| 99 | + |
| 100 | + |
| 101 | +class DummyVCSResponse: |
| 102 | + """Mock VCS response for testing.""" |
| 103 | + |
| 104 | + def __init__(self, dest_dir): |
| 105 | + self.dest_dir = dest_dir |
| 106 | + |
| 107 | + def delete(self): |
| 108 | + pass |
| 109 | + |
| 110 | + |
| 111 | +def test_collect_advisories_from_openssf_malicious(sample_malicious_advisory): |
| 112 | + """Test collecting advisories from OpenSSF malicious packages repo.""" |
| 113 | + tmp_path, advisory_text, advisory_json = sample_malicious_advisory |
| 114 | + |
| 115 | + importer = OpenSSFMaliciousImporterPipeline() |
| 116 | + importer.vcs_response = DummyVCSResponse(str(tmp_path)) |
| 117 | + |
| 118 | + advisories = list(importer.collect_advisories()) |
| 119 | + assert len(advisories) == 1 |
| 120 | + |
| 121 | + advisory = advisories[0] |
| 122 | + assert isinstance(advisory, AdvisoryData) |
| 123 | + assert advisory.advisory_id == "MAL-2025-1234" |
| 124 | + assert "Malicious code" in advisory.summary |
| 125 | + assert advisory.original_advisory_text.strip().startswith("{") |
| 126 | + assert advisory.affected_packages |
| 127 | + assert advisory.affected_packages[0].package.type == "pypi" |
| 128 | + assert advisory.affected_packages[0].package.name == "malicious-test-package" |
| 129 | + |
| 130 | + |
| 131 | +def test_collect_npm_advisories(sample_npm_malicious_advisory): |
| 132 | + """Test collecting npm malicious package advisories.""" |
| 133 | + tmp_path, advisory_text, advisory_json = sample_npm_malicious_advisory |
| 134 | + |
| 135 | + importer = OpenSSFMaliciousImporterPipeline() |
| 136 | + importer.vcs_response = DummyVCSResponse(str(tmp_path)) |
| 137 | + |
| 138 | + advisories = list(importer.collect_advisories()) |
| 139 | + assert len(advisories) == 1 |
| 140 | + |
| 141 | + advisory = advisories[0] |
| 142 | + assert advisory.advisory_id == "MAL-2025-5678" |
| 143 | + assert advisory.affected_packages[0].package.type == "npm" |
| 144 | + assert advisory.affected_packages[0].package.name == "typosquat-package" |
| 145 | + |
| 146 | + |
| 147 | +def test_advisories_count(sample_malicious_advisory): |
| 148 | + """Test counting advisories.""" |
| 149 | + tmp_path, _, _ = sample_malicious_advisory |
| 150 | + |
| 151 | + importer = OpenSSFMaliciousImporterPipeline() |
| 152 | + importer.vcs_response = DummyVCSResponse(str(tmp_path)) |
| 153 | + |
| 154 | + count = importer.advisories_count() |
| 155 | + assert count == 1 |
| 156 | + |
| 157 | + |
| 158 | +def test_multiple_advisories(tmp_path: Path): |
| 159 | + """Test collecting multiple advisories from different ecosystems.""" |
| 160 | + # Create PyPI advisory |
| 161 | + pypi_dir = tmp_path / "osv" / "malicious" / "pypi" / "bad-pkg" |
| 162 | + pypi_dir.mkdir(parents=True) |
| 163 | + (pypi_dir / "MAL-2025-0001.json").write_text( |
| 164 | + json.dumps( |
| 165 | + { |
| 166 | + "id": "MAL-2025-0001", |
| 167 | + "summary": "Bad PyPI package", |
| 168 | + "affected": [{"package": {"ecosystem": "PyPI", "name": "bad-pkg"}, "versions": ["1.0"]}], |
| 169 | + } |
| 170 | + ) |
| 171 | + ) |
| 172 | + |
| 173 | + # Create npm advisory |
| 174 | + npm_dir = tmp_path / "osv" / "malicious" / "npm" / "bad-js" |
| 175 | + npm_dir.mkdir(parents=True) |
| 176 | + (npm_dir / "MAL-2025-0002.json").write_text( |
| 177 | + json.dumps( |
| 178 | + { |
| 179 | + "id": "MAL-2025-0002", |
| 180 | + "summary": "Bad npm package", |
| 181 | + "affected": [{"package": {"ecosystem": "npm", "name": "bad-js"}, "versions": ["2.0"]}], |
| 182 | + } |
| 183 | + ) |
| 184 | + ) |
| 185 | + |
| 186 | + importer = OpenSSFMaliciousImporterPipeline() |
| 187 | + importer.vcs_response = DummyVCSResponse(str(tmp_path)) |
| 188 | + |
| 189 | + advisories = list(importer.collect_advisories()) |
| 190 | + assert len(advisories) == 2 |
| 191 | + assert importer.advisories_count() == 2 |
| 192 | + |
| 193 | + advisory_ids = {a.advisory_id for a in advisories} |
| 194 | + assert advisory_ids == {"MAL-2025-0001", "MAL-2025-0002"} |
| 195 | + |
| 196 | + |
| 197 | +def test_pipeline_metadata(): |
| 198 | + """Test pipeline metadata is correctly set.""" |
| 199 | + assert OpenSSFMaliciousImporterPipeline.pipeline_id == "openssf_malicious_importer" |
| 200 | + assert OpenSSFMaliciousImporterPipeline.spdx_license_expression == "Apache-2.0" |
| 201 | + assert "ossf/malicious-packages" in OpenSSFMaliciousImporterPipeline.repo_url |
| 202 | + |
| 203 | + |
| 204 | +def test_unsupported_ecosystem_skipped(tmp_path: Path): |
| 205 | + """Test that unsupported ecosystems are skipped gracefully.""" |
| 206 | + # Create advisory with unsupported ecosystem |
| 207 | + advisory_dir = tmp_path / "osv" / "malicious" / "unsupported" / "pkg" |
| 208 | + advisory_dir.mkdir(parents=True) |
| 209 | + (advisory_dir / "MAL-2025-9999.json").write_text( |
| 210 | + json.dumps( |
| 211 | + { |
| 212 | + "id": "MAL-2025-9999", |
| 213 | + "summary": "Package in unsupported ecosystem", |
| 214 | + "affected": [ |
| 215 | + {"package": {"ecosystem": "UnsupportedEcosystem", "name": "pkg"}, "versions": ["1.0"]} |
| 216 | + ], |
| 217 | + } |
| 218 | + ) |
| 219 | + ) |
| 220 | + |
| 221 | + importer = OpenSSFMaliciousImporterPipeline() |
| 222 | + importer.vcs_response = DummyVCSResponse(str(tmp_path)) |
| 223 | + |
| 224 | + advisories = list(importer.collect_advisories()) |
| 225 | + # Advisory should be yielded but with no affected packages due to unsupported ecosystem |
| 226 | + assert len(advisories) == 1 |
| 227 | + assert advisories[0].affected_packages == [] |
0 commit comments