Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions vulnerabilities/improvers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from vulnerabilities.improvers import valid_versions
from vulnerabilities.improvers import vulnerability_status
from vulnerabilities.pipelines import VulnerableCodePipeline
from vulnerabilities.pipelines import add_cvss31_to_CVEs
from vulnerabilities.pipelines import collect_commits
from vulnerabilities.pipelines import compute_package_risk
from vulnerabilities.pipelines import compute_package_version_rank
Expand Down Expand Up @@ -43,6 +44,7 @@
compute_package_risk.ComputePackageRiskPipeline,
compute_package_version_rank.ComputeVersionRankPipeline,
collect_commits.CollectFixCommitsPipeline,
add_cvss31_to_CVEs.CVEAdvisoryMappingPipeline,
]

IMPROVERS_REGISTRY = {
Expand Down
2 changes: 2 additions & 0 deletions vulnerabilities/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ class VulnerabilitySeverity(models.Model):
blank=True, null=True, help_text="UTC Date of publication of the vulnerability severity"
)

objects = BaseQuerySet.as_manager()

class Meta:
ordering = ["url", "scoring_system", "value"]

Expand Down
105 changes: 105 additions & 0 deletions vulnerabilities/pipelines/add_cvss31_to_CVEs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#
import re

from aboutcode.pipeline import LoopProgress
from django.db import transaction

from vulnerabilities import severity_systems
from vulnerabilities.models import Advisory
from vulnerabilities.models import VulnerabilitySeverity
from vulnerabilities.pipelines import VulnerableCodePipeline


class CVEAdvisoryMappingPipeline(VulnerableCodePipeline):
"""
Pipeline to map CVEs from VulnerabilitySeverity to corresponding Advisories with CVSS3.1 scores.
"""

pipeline_id = "add_cvssv3.1_to_CVEs"

@classmethod
def steps(cls):
return (cls.process_cve_advisory_mapping,)

def process_cve_advisory_mapping(self):
nvd_severities = (
VulnerabilitySeverity.objects.filter(
url__startswith="https://nvd.nist.gov/vuln/detail/CVE-", scoring_system="cvssv3"
)
.prefetch_related("vulnerabilities")
.distinct()
)

self.log(f"Processing {nvd_severities.count():,d} CVE severity records")

progress = LoopProgress(
total_iterations=nvd_severities.count(),
logger=self.log,
progress_step=5,
)

batch_size = 1000
results = []

for severity in progress.iter(nvd_severities.paginated(per_page=batch_size)):
print(severity.url)
cve_pattern = re.compile(r"(CVE-\d{4}-\d{4,7})").search
cve_match = cve_pattern(severity.url)
if cve_match:
cve_id = cve_match.group()
else:
self.log(f"Could not find CVE ID in URL: {severity.url}")
continue

matching_advisories = Advisory.objects.filter(
aliases=[cve_id],
created_by="nvd_importer",
)

for advisory in matching_advisories:
for reference in advisory.references:
for sev in reference.get("severities", []):
if sev.get("system") == "cvssv3.1":
results.append(
{
"cve_id": cve_id,
"cvss31_score": sev.get("value"),
"cvss31_vector": sev.get("scoring_elements"),
"vulnerabilities": severity.vulnerabilities.all(),
}
)

if results:
print(results)
self._process_batch(results)

self.log(f"Completed processing CVE to Advisory mappings")

def _process_batch(self, results):
"""
Process a batch of results. Transactions are used to ensure data consistency.
"""
self.log(f"Processing batch of {len(results)} mappings")

with transaction.atomic():
for result in results:
self.log(
f"CVE: {result['cve_id']}, "
f"CVSS3.1: {result['cvss31_score']}, "
f"Vector: {result['cvss31_vector']}"
)

for vulnerability in result["vulnerabilities"]:
vuln_severity, _ = VulnerabilitySeverity.objects.update_or_create(
scoring_system=severity_systems.CVSSV31.identifier,
url=f"https://nvd.nist.gov/vuln/detail/{result['cve_id']}",
value=result["cvss31_score"],
scoring_elements=result["cvss31_vector"],
)
vulnerability.severities.add(vuln_severity)
56 changes: 56 additions & 0 deletions vulnerabilities/tests/test_add_cvsssv31.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import unittest
from unittest.mock import Mock
from unittest.mock import patch

from django.test import TestCase

from vulnerabilities.models import Advisory
from vulnerabilities.models import Alias
from vulnerabilities.models import Vulnerability
from vulnerabilities.models import VulnerabilitySeverity
from vulnerabilities.pipelines.add_cvss31_to_CVEs import CVEAdvisoryMappingPipeline
from vulnerabilities.severity_systems import CVSSV3
from vulnerabilities.severity_systems import CVSSV31


class TestCVEAdvisoryMappingPipeline(TestCase):
def setUp(self):
self.pipeline = CVEAdvisoryMappingPipeline()
Advisory.objects.create(
created_by="nvd_importer",
aliases=["CVE-2024-1234"],
references=[
{
"severities": [
{
"system": "cvssv3.1",
"value": "7.5",
"scoring_elements": "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:N/A:N",
}
],
"url": "https://nvd.nist.gov/vuln/detail/CVE-2024-1234",
}
],
date_collected="2024-09-27T19:38:00Z",
)
vuln = Vulnerability.objects.create(vulnerability_id="CVE-2024-1234")
sev = VulnerabilitySeverity.objects.create(
scoring_system=CVSSV3.identifier,
url="https://nvd.nist.gov/vuln/detail/CVE-2024-1234",
value="7.5",
scoring_elements="CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:N/A:N",
)
vuln.severities.add(sev)

def test_process_cve_advisory_mapping_single_record(self):
self.pipeline.process_cve_advisory_mapping()
self.assertEqual(VulnerabilitySeverity.objects.count(), 2)
# check if severity with cvssv3.1 is created
sev = VulnerabilitySeverity.objects.get(scoring_system=CVSSV31.identifier)
self.assertEqual(sev.url, "https://nvd.nist.gov/vuln/detail/CVE-2024-1234")
self.assertEqual(sev.value, "7.5")
self.assertEqual(sev.scoring_elements, "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:N/A:N")
# check if severity is added to existing vulnerability
vuln = Vulnerability.objects.get(vulnerability_id="CVE-2024-1234")
self.assertEqual(vuln.severities.count(), 2)
self.assertIn(sev, vuln.severities.all())