Skip to content

Migrate OpenSSL importer to importer-improver model #690

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion AUTHORS.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ The following organizations or individuals have contributed to this repo:
- Navonil Das @NavonilDas
- Tushar Upadhyay @tushar912
- Hritik Vijay @hritik14
- Tushar Goel @TG1999
- Tushar Goel @TG1999
- Keshav Priyadarshi @keshav-space
2 changes: 2 additions & 0 deletions vulnerabilities/importers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@
from vulnerabilities.importers import github
from vulnerabilities.importers import nginx
from vulnerabilities.importers import nvd
from vulnerabilities.importers import openssl

IMPORTERS_REGISTRY = [
nginx.NginxImporter,
alpine_linux.AlpineImporter,
github.GitHubAPIImporter,
nvd.NVDImporter,
openssl.OpensslImporter,
]

IMPORTERS_REGISTRY = {x.qualified_name: x for x in IMPORTERS_REGISTRY}
219 changes: 135 additions & 84 deletions vulnerabilities/importers/openssl.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,101 +20,152 @@
# VulnerableCode is a free software code scanning tool from nexB Inc. and others.
# Visit https://github.com/nexB/vulnerablecode/ for support and download.

import dataclasses
import re
import xml.etree.ElementTree as ET
from typing import Set
import logging
from datetime import timezone
from typing import Iterable
from urllib.parse import urljoin

import defusedxml.ElementTree as DET
import requests
from dateutil import parser as dateparser
from packageurl import PackageURL
from univers.version_range import OpensslVersionRange
from univers.versions import OpensslVersion

from vulnerabilities.helpers import create_etag
from vulnerabilities.helpers import nearest_patched_package
from vulnerabilities.importer import Advisory
from vulnerabilities.importer import AdvisoryData
from vulnerabilities.importer import AffectedPackage
from vulnerabilities.importer import Importer
from vulnerabilities.importer import Reference
from vulnerabilities.importer import VulnerabilitySeverity
from vulnerabilities.severity_systems import SCORING_SYSTEMS

logger = logging.getLogger(__name__)

class OpenSSLImporter(Importer):

class OpensslImporter(Importer):
spdx_license_expression = "Apache-2.0"
license_url = "https://github.com/openssl/openssl/blob/master/LICENSE.txt"
url = "https://www.openssl.org/news/vulnerabilities.xml"

def updated_advisories(self) -> Set[Advisory]:
# Etags are like hashes of web responses. We maintain
# (url, etag) mappings in the DB. `create_etag` creates
# (url, etag) pair. If a (url, etag) already exists then the code
# skips processing the response further to avoid duplicate work
if create_etag(data_src=self, url=self.url, etag_key="ETag"):
raw_data = self.fetch()
advisories = self.to_advisories(raw_data)
return self.batch_advisories(advisories)

return []

def fetch(self):
return requests.get(self.url).content

@staticmethod
def to_advisories(xml_response: str) -> Set[Advisory]:
advisories = []
pkg_name = "openssl"
pkg_type = "generic"
root = ET.fromstring(xml_response)
for element in root:
if element.tag == "issue":
cve_id = ""
summary = ""
safe_pkg_versions = []
vuln_pkg_versions = []
ref_urls = []
for info in element:

if info.tag == "cve":
if info.attrib.get("name"):
cve_id = "CVE-" + info.attrib.get("name")

else:
continue

if cve_id == "CVE-2007-5502":
# This CVE has weird version "fips-1.1.2".This is
# probably a submodule. Skip this for now.
continue

if info.tag == "affects":
# Vulnerable package versions
vuln_pkg_versions.append(info.attrib.get("version"))

if info.tag == "fixed":
# Fixed package versions
safe_pkg_versions.append(info.attrib.get("version"))

if info:
commit_hash = info[0].attrib["hash"]
ref_urls.append(
Reference(
url="https://github.com/openssl/openssl/commit/" + commit_hash
)
)
if info.tag == "description":
# Description
summary = re.sub(r"\s+", " ", info.text).strip()

safe_purls = [
PackageURL(name=pkg_name, type=pkg_type, version=version)
for version in safe_pkg_versions
]
vuln_purls = [
PackageURL(name=pkg_name, type=pkg_type, version=version)
for version in vuln_pkg_versions
]

advisory = Advisory(
vulnerability_id=cve_id,
summary=summary,
affected_packages=nearest_patched_package(vuln_purls, safe_purls),
references=ref_urls,
response = requests.get(url=self.url)
if not response.status_code == 200:
logger.error(f"Error while fetching {self.url}: {response.status_code}")
return
return response.content

def advisory_data(self) -> Iterable[AdvisoryData]:
xml_response = self.fetch()
return parse_vulnerabilities(xml_response)


def parse_vulnerabilities(xml_response) -> Iterable[AdvisoryData]:
root = DET.fromstring(xml_response)
for xml_issue in root:
if xml_issue.tag == "issue":
advisory = to_advisory_data(xml_issue)
if advisory:
yield advisory


def to_advisory_data(xml_issue) -> AdvisoryData:
"""
Return AdvisoryData from given xml_issue
"""

purl = PackageURL(type="openssl", name="openssl")
cve = advisory_url = severity = summary = None
safe_pkg_versions = {}
vuln_pkg_versions_by_base_version = {}
aliases = []
references = []
affected_packages = []
date_published = xml_issue.attrib["public"].strip()

for info in xml_issue:
if info.tag == "impact":
severity = VulnerabilitySeverity(
system=SCORING_SYSTEMS["generic_textual"], value=info.attrib["severity"]
)

elif info.tag == "advisory":
advisory_url = info.attrib["url"]
if not advisory_url.startswith("https://web.archive.org"):
advisory_url = urljoin("https://www.openssl.org", advisory_url)

elif info.tag == "cve":
cve = info.attrib.get("name")
# use made up alias to compensate for case when advisory doesn't have CVE-ID
madeup_alias = f"VC-OPENSSL-{date_published}"
if cve:
cve = f"CVE-{cve}"
madeup_alias = f"{madeup_alias}-{cve}"
aliases.append(cve)
references.append(Reference(reference_id=cve))
aliases.append(madeup_alias)

elif info.tag == "affects":
affected_base = info.attrib["base"]
affected_version = info.attrib["version"]
if affected_base.startswith("fips"):
logger.error(
f"{affected_base!r} is a OpenSSL-FIPS Object Module and isn't supported by OpensslImporter. Use a different importer."
)
return
if affected_base in vuln_pkg_versions_by_base_version:
vuln_pkg_versions_by_base_version[affected_base].append(affected_version)
else:
vuln_pkg_versions_by_base_version[affected_base] = [affected_version]

elif info.tag == "fixed":
fixed_base = info.attrib["base"]
fixed_version = info.attrib["version"]
safe_pkg_versions[fixed_base] = fixed_version
for commit in info:
commit_hash = commit.attrib["hash"]
references.append(
Reference(
url=urljoin("https://github.com/openssl/openssl/commit/", commit_hash)
)
)
advisories.append(advisory)

return advisories
elif info.tag == "description":
summary = " ".join(info.text.split())

elif info.tag in ("reported", "problemtype", "title"):
# as of now, these info isn't useful for AdvisoryData
# for more see: https://github.com/nexB/vulnerablecode/issues/688
continue
else:
logger.error(
f"{info.tag!r} is a newly introduced tag. Modify the importer to make use of this new info."
)

for base_version, affected_versions in vuln_pkg_versions_by_base_version.items():
affected_version_range = OpensslVersionRange.from_versions(affected_versions)
fixed_version = None
if base_version in safe_pkg_versions:
fixed_version = OpensslVersion(safe_pkg_versions[base_version])
affected_package = AffectedPackage(
package=purl,
affected_version_range=affected_version_range,
fixed_version=fixed_version,
)
affected_packages.append(affected_package)

if severity and advisory_url:
references.append(Reference(url=advisory_url, severities=[severity]))
elif advisory_url:
references.append(Reference(url=advisory_url))

parsed_date_published = dateparser.parse(date_published, yearfirst=True).replace(
tzinfo=timezone.utc
)

return AdvisoryData(
aliases=aliases,
summary=summary,
affected_packages=affected_packages,
references=references,
date_published=parsed_date_published,
)
1 change: 0 additions & 1 deletion vulnerabilities/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ def no_rmtree(monkeypatch):
"test_npm.py",
"test_apache_kafka.py",
"test_apache_tomcat.py",
"test_openssl.py",
"test_api.py",
"test_package_managers.py",
"test_archlinux.py",
Expand Down
Loading