Skip to content

Migrate nvd importer to importer-improver model #664

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion vulnerabilities/importers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,13 @@
from vulnerabilities.importers import alpine_linux
from vulnerabilities.importers import github
from vulnerabilities.importers import nginx
from vulnerabilities.importers import nvd

IMPORTERS_REGISTRY = [nginx.NginxImporter, alpine_linux.AlpineImporter, github.GitHubAPIImporter]
IMPORTERS_REGISTRY = [
nginx.NginxImporter,
alpine_linux.AlpineImporter,
github.GitHubAPIImporter,
nvd.NVDImporter,
]

IMPORTERS_REGISTRY = {x.qualified_name: x for x in IMPORTERS_REGISTRY}
289 changes: 164 additions & 125 deletions vulnerabilities/importers/nvd.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,150 +20,189 @@
# VulnerableCode is a free software code scanning tool from nexB Inc. and others.
# Visit https://github.com/nexB/vulnerablecode/ for support and download.

import dataclasses
import gzip
import json
from datetime import date
from typing import Iterable

import requests
from dateutil import parser as dateparser
from django.db.models.query import QuerySet

from vulnerabilities.helpers import create_etag
from vulnerabilities.importer import Advisory
from vulnerabilities.helpers import get_item
from vulnerabilities.importer import AdvisoryData
from vulnerabilities.importer import Importer
from vulnerabilities.importer import Reference
from vulnerabilities.importer import VulnerabilitySeverity
from vulnerabilities.severity_systems import scoring_systems

BASE_URL = "https://nvd.nist.gov/feeds/json/cve/1.1/nvdcve-1.1-{}.json.gz"
from vulnerabilities.improver import Improver
from vulnerabilities.improver import Inference
from vulnerabilities.models import Advisory
from vulnerabilities.severity_systems import SCORING_SYSTEMS


class NVDImporter(Importer):
def updated_advisories(self):
# See https://github.com/nexB/vulnerablecode/issues/665 for follow up
spdx_license_expression = "LicenseRef-scancode-unknown"

def advisory_data(self):
advisory_data = []
current_year = date.today().year
# NVD json feeds start from 2002.
for year in range(2002, current_year + 1):
download_url = BASE_URL.format(year)
# Etags are like hashes of web responses. We maintain
# (url, etag) mappings in the DB. `create_etag` creates
# (url, etag) pair. If a (url, etag) already exists then the code
# skips processing the response further to avoid duplicate work
if create_etag(data_src=self, url=download_url, etag_key="etag"):
data = self.fetch(download_url)
yield self.to_advisories(data)

@staticmethod
def fetch(url):
gz_file = requests.get(url)
data = gzip.decompress(gz_file.content)
return json.loads(data)

def to_advisories(self, nvd_data):
for cve_item in nvd_data["CVE_Items"]:
if self.is_outdated(cve_item):
continue

if self.related_to_hardware(cve_item):
continue

cve_id = cve_item["cve"]["CVE_data_meta"]["ID"]
ref_urls = self.extract_reference_urls(cve_item)
references = [Reference(url=url) for url in ref_urls]
severity_scores = self.extract_severity_scores(cve_item)
download_url = f"https://nvd.nist.gov/feeds/json/cve/1.1/nvdcve-1.1-{year}.json.gz"
data = fetch(download_url)
advisory_data.extend(to_advisories(data))
return advisory_data


# Isolating network calls for simplicity of testing
def fetch(url):
gz_file = requests.get(url)
data = gzip.decompress(gz_file.content)
return json.loads(data)


def extract_summary(cve_item):
"""
Return a summary for a given CVE item.
"""
# In 99% of cases len(cve_item['cve']['description']['description_data']) == 1 , so
# this usually returns cve_item['cve']['description']['description_data'][0]['value']
# In the remaining 1% cases this returns the longest summary.
summaries = []
for desc in get_item(cve_item, "cve", "description", "description_data") or []:
if desc.get("value"):
summaries.append(desc["value"])
return max(summaries, key=len) if summaries else None


def to_advisories(nvd_data):
"""
Yield AdvisoryData objects from a NVD json feed.
"""
for cve_item in nvd_data.get("CVE_Items") or []:
cpes = extract_cpes(cve_item)
if related_to_hardware(cpes):
continue

aliases = []
cve_id = get_item(cve_item, "cve", "CVE_data_meta", "ID")
ref_urls = extract_reference_urls(cve_item)
references = []
severity_scores = list(extract_severity_scores(cve_item))
for cpe in cpes:
references.append(
Reference(
url=f"https://nvd.nist.gov/vuln/detail/{cve_id}",
reference_id=cve_id,
severities=severity_scores,
)
)
summary = self.extract_summary(cve_item)
yield Advisory(
vulnerability_id=cve_id,
summary=summary,
references=references,
)

@staticmethod
def extract_summary(cve_item):
# In 99% of cases len(cve_item['cve']['description']['description_data']) == 1 , so
# this usually returns cve_item['cve']['description']['description_data'][0]['value']
# In the remaining 1% cases this returns the longest summary.
summaries = [desc["value"] for desc in cve_item["cve"]["description"]["description_data"]]
return max(summaries, key=len)

@staticmethod
def extract_severity_scores(cve_item):
severity_scores = []

if cve_item["impact"].get("baseMetricV3"):
severity_scores.append(
VulnerabilitySeverity(
system=scoring_systems["cvssv3"],
value=str(cve_item["impact"]["baseMetricV3"]["cvssV3"]["baseScore"]),
reference_id=cpe,
)
)
severity_scores.append(
VulnerabilitySeverity(
system=scoring_systems["cvssv3_vector"],
value=str(cve_item["impact"]["baseMetricV3"]["cvssV3"]["vectorString"]),
)
)

if cve_item["impact"].get("baseMetricV2"):
severity_scores.append(
VulnerabilitySeverity(
system=scoring_systems["cvssv2"],
value=str(cve_item["impact"]["baseMetricV2"]["cvssV2"]["baseScore"]),
)
references.append(
Reference(
url=f"https://nvd.nist.gov/vuln/detail/{cve_id}",
reference_id=cve_id,
severities=severity_scores,
)
severity_scores.append(
VulnerabilitySeverity(
system=scoring_systems["cvssv2_vector"],
value=str(cve_item["impact"]["baseMetricV2"]["cvssV2"]["vectorString"]),
)
)
if "https://nvd.nist.gov/vuln/detail/{cve_id}" in ref_urls:
ref_urls.remove(f"https://nvd.nist.gov/vuln/detail/{cve_id}")
references.extend([Reference(url=url) for url in ref_urls])
if cve_id:
aliases.append(cve_id)
summary = extract_summary(cve_item)
yield AdvisoryData(
aliases=aliases,
summary=summary,
references=sorted(references),
date_published=dateparser.parse(cve_item.get("publishedDate")),
)


def extract_reference_urls(cve_item):
"""
Return a list of reference URLs for a given CVE item.
"""
urls = set()
for reference in get_item(cve_item, "cve", "references", "reference_data") or []:
ref_url = reference.get("url")

if not ref_url:
continue

if ref_url.startswith(
(
"http",
"ftp",
)

return severity_scores

def extract_reference_urls(self, cve_item):
urls = set()
for reference in cve_item["cve"]["references"]["reference_data"]:
ref_url = reference["url"]

if not ref_url:
continue

if ref_url.startswith("http") or ref_url.startswith("ftp"):
urls.add(ref_url)

return urls

def is_outdated(self, cve_item):
cve_last_modified_date = cve_item["lastModifiedDate"]
cve_last_modified_date_obj = dateparser.parse(cve_last_modified_date)

if self.config.cutoff_date:
return cve_last_modified_date_obj < self.config.cutoff_date

if self.config.last_run_date:
return cve_last_modified_date_obj < self.config.last_run_date

return False

def related_to_hardware(self, cve_item):
for cpe in self.extract_cpes(cve_item):
cpe_comps = cpe.split(":")
# CPE follow the format cpe:cpe_version:product_type:vendor:product
if cpe_comps[2] == "h":
return True

return False

@staticmethod
def extract_cpes(cve_item):
cpes = set()
for node in cve_item["configurations"]["nodes"]:
for cpe_data in node.get("cpe_match", []):
cpes.add(cpe_data["cpe23Uri"])
return cpes
):
urls.add(ref_url)

return urls


def related_to_hardware(cpes):
"""
Return True if the CVE item is related to hardware.
"""
for cpe in cpes:
cpe_comps = cpe.split(":")
# CPE follow the format cpe:cpe_version:product_type:vendor:product
if len(cpe_comps) > 2 and cpe_comps[2] == "h":
return True

return False


def extract_cpes(cve_item):
"""
Return a list of CPEs for a given CVE item.
"""
cpes = set()
for node in get_item(cve_item, "configurations", "nodes") or []:
for cpe_data in node.get("cpe_match") or []:
cpe23_uri = cpe_data.get("cpe23Uri")
if cpe23_uri:
cpes.add(cpe23_uri)
return cpes


def extract_severity_scores(cve_item):
"""
Yield a vulnerability severity for each `cve_item`.
"""
if not isinstance(cve_item, dict):
return None
impact = cve_item.get("impact") or {}
base_metric_v3 = impact.get("baseMetricV3") or {}
if base_metric_v3:
cvss_v3 = get_item(base_metric_v3, "cvssV3")
yield VulnerabilitySeverity(
system=SCORING_SYSTEMS["cvssv3"],
value=str(cvss_v3.get("baseScore") or ""),
)
yield VulnerabilitySeverity(
system=SCORING_SYSTEMS["cvssv3_vector"],
value=str(cvss_v3.get("vectorString") or ""),
)

base_metric_v2 = impact.get("baseMetricV2") or {}
if base_metric_v2:
cvss_v2 = base_metric_v2.get("cvssV2") or {}
yield VulnerabilitySeverity(
system=SCORING_SYSTEMS["cvssv2"],
value=str(cvss_v2.get("baseScore") or ""),
)
yield VulnerabilitySeverity(
system=SCORING_SYSTEMS["cvssv2_vector"],
value=str(cvss_v2.get("vectorString") or ""),
)


class NVDBasicImprover(Improver):
@property
def interesting_advisories(self) -> QuerySet:
return Advisory.objects.filter(created_by=NVDImporter.qualified_name)

def get_inferences(self, advisory_data: AdvisoryData) -> Iterable[Inference]:
yield Inference.from_advisory_data(
advisory_data=advisory_data, confidence=100, fixed_purl=None
)
1 change: 1 addition & 0 deletions vulnerabilities/improvers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
importers.nginx.NginxBasicImprover,
importers.alpine_linux.AlpineBasicImprover,
importers.github.GitHubBasicImprover,
importers.nvd.NVDBasicImprover,
]

IMPROVERS_REGISTRY = {x.qualified_name: x for x in IMPROVERS_REGISTRY}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Generated by Django 4.0.2 on 2022-04-08 18:53

from django.db import migrations
from django.db import models


class Migration(migrations.Migration):

dependencies = [
("vulnerabilities", "0006_alter_advisory_unique_together"),
]

operations = [
migrations.AlterField(
model_name="vulnerabilityreference",
name="reference_id",
field=models.CharField(
blank=True,
help_text="An optional reference ID, such as DSA-4465-1 when available",
max_length=200,
null=True,
),
),
]
2 changes: 1 addition & 1 deletion vulnerabilities/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class VulnerabilityReference(models.Model):
max_length=1024, help_text="URL to the vulnerability reference", blank=True
)
reference_id = models.CharField(
max_length=50,
max_length=200,
help_text="An optional reference ID, such as DSA-4465-1 when available",
blank=True,
null=True,
Expand Down
1 change: 0 additions & 1 deletion vulnerabilities/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ def no_rmtree(monkeypatch):
"test_apache_httpd.py",
"test_npm.py",
"test_apache_kafka.py",
"test_nvd.py",
"test_apache_tomcat.py",
"test_openssl.py",
"test_api.py",
Expand Down
Loading