Skip to content

CISA Known Exploited Vulnerabilities catalog support #89

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions doc/vulnix.1.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,17 @@ should not be reported.
Fetches NIST NVD updates from <URL>. Defaults to
_https://nvd.nist.gov/feeds/json/cve/1.1/_.

* `--kev`, `--no-kev`:
Enable support for the CISA Known Exploited Vulnerabilities catalog.
Vulnerabilities known to be exploited in the wild are indicated with an
exclamation mark (!). A second exclamation mark (!!) indicates a
known-exploited vulnerability past CISA's prescribed remediation date.
Defaults to `--kev`.

* `-k`, `--kev-mirror`=<URL>:
Fetches CISA KEV updates from <URL>. Defaults to
_https://www.cisa.gov/sites/default/files/csv/known_exploited_vulnerabilities.csv_.

* `-j`, `--json`:
Outputs affected package versions as JSON document. See [JSON output] below.

Expand Down
121 changes: 121 additions & 0 deletions src/vulnix/kev.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
from abc import ABC, abstractmethod
import csv
from datetime import datetime, timedelta
import fcntl
import logging
import os
import os.path as p
import requests
import tempfile

from .nvd import DEFAULT_CACHE_DIR

DEFAULT_KEV_MIRROR = 'https://www.cisa.gov/sites/default/files/csv/known_exploited_vulnerabilities.csv'
KEV_FILENAME = 'known_exploited_vulnerabilities.csv'

_log = logging.getLogger(__name__)


class KEVInterface(ABC):

def is_past_due(self, cve_id):
"""Is due_date() in the past?"""
return self.is_known_exploited(cve_id) and datetime.strptime(
self.due_date(cve_id), "%Y-%m-%d") < datetime.now()

@abstractmethod
def is_known_exploited(self, cve_id):
"""Is this cve_id known to be under active exploitation?"""
...

@abstractmethod
def due_date(self, cve_id):
"""By what date does the KVE Catalog say this vulnerability must be mitigated?"""
...


class KEV(KEVInterface):
"""Access to the Known Exploited Vulnerabilities Catalog.

https://www.cisa.gov/known-exploited-vulnerabilities-catalog
"""

def __init__(self, mirror=DEFAULT_KEV_MIRROR, cache_dir=DEFAULT_CACHE_DIR):
self.mirror = mirror
self.cache_dir = p.expanduser(cache_dir)
self.cache_filename = p.join(self.cache_dir, KEV_FILENAME)

self._catalog = None

def is_fresh(self):
"""Is our local cache of the KEV Catalog recent enough?."""
if not p.exists(self.cache_filename):
return False
last_update = datetime.fromtimestamp(
os.stat(self.cache_filename).st_mtime)
return last_update > datetime.now() - timedelta(hours=2)

def update(self):
"""Fetch a fresh copy of the KEV Catalog, if needed."""

if self.is_fresh():
return

_log.info('Loading %s', self.mirror)

etag = None
if p.exists(self.cache_filename):
try:
etag = os.getxattr(self.cache_filename, "user.ETag")
except BaseException:
_log.debug(
"Couldn't get ETag xattr. Oh well. This is not essential.")

headers = {'If-None-Match': etag} if etag else {}
r = requests.get(self.mirror, headers=headers)
r.raise_for_status()
if r.status_code == 200:
with tempfile.NamedTemporaryFile(mode="wb", buffering=0, dir=self.cache_dir, prefix=f"{KEV_FILENAME}.", delete=False) as f:
f.write(r.content)
if 'ETag' in r.headers:
try:
os.setxattr(
f.name, "user.ETag", r.headers['ETag'].encode('utf-8'))
except BaseException:
_log.debug(
"Couldn't set ETag xattr. Oh well. This is not essential.")
os.replace(f.name, self.cache_filename)

def load(self):
"""Read the Known Exploited Vulnerabilities Catalog from local cache."""
self._catalog = {}
with open(self.cache_filename) as f:
reader = csv.DictReader(f)
for row in reader:
self._catalog[row["cveID"]] = row["dueDate"]

def is_known_exploited(self, cve_id):
if self._catalog is None:
self.load()
return cve_id in self._catalog

def due_date(self, cve_id):
if self._catalog is None:
self.load()
return self._catalog[cve_id]


class FakeKEV(KEVInterface):
"""An in-memory test double for KEV"""

def __init__(self, data):
self.data = data

def is_known_exploited(self, cve_id):
return cve_id in self.data

def due_date(self, cve_id):
return self.data[cve_id]

def update(self):
pass
17 changes: 15 additions & 2 deletions src/vulnix/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

from .nix import Store
from .nvd import NVD, DEFAULT_MIRROR, DEFAULT_CACHE_DIR
from .kev import KEV, DEFAULT_KEV_MIRROR, FakeKEV
from .resource import open_resources
from .utils import Timer
from .whitelist import Whitelist
Expand Down Expand Up @@ -100,6 +101,13 @@ def run(nvd, store):
help='Mirror to fetch NVD archives from. Default: {}.'.format(
DEFAULT_MIRROR),
default=DEFAULT_MIRROR)
@click.option('--kev/--no-kev', default=True,
help='CISA Known Exploited Vulnerabilities support '
'(default: yes)')
@click.option('-k', '--kev-mirror',
help='Mirror to fetch KEV archives from. Default: {}.'.format(
DEFAULT_KEV_MIRROR),
default=DEFAULT_KEV_MIRROR)
# output control
@click.option('-j', '--json/--no-json', help='JSON vs. human readable output.')
@click.option('-s', '--show-whitelisted', is_flag=True,
Expand All @@ -115,8 +123,9 @@ def run(nvd, store):
@click.option('-F', '--notfixed', is_flag=True,
help='(obsolete; kept for compatibility reasons)')
def main(verbose, gc_roots, system, from_file, profile, path, mirror,
cache_dir, requisites, whitelist, write_whitelist, version, json,
show_whitelisted, show_description, default_whitelist, notfixed):
kev, kev_mirror, cache_dir, requisites, whitelist, write_whitelist,
version, json, show_whitelisted, show_description, default_whitelist,
notfixed):
if version:
print('vulnix ' + pkg_resources.get_distribution('vulnix').version)
sys.exit(0)
Expand Down Expand Up @@ -151,11 +160,15 @@ def main(verbose, gc_roots, system, from_file, profile, path, mirror,
with NVD(mirror, cache_dir) as nvd:
with Timer('Update NVD data'):
nvd.update()
kev = KEV(kev_mirror, cache_dir) if kev else FakeKEV({})
with Timer('Update KEV data'):
kev.update()
with Timer('Scan vulnerabilities'):
filtered_items = whitelist.filter(run(nvd, store))

rc = output(
filtered_items,
kev,
json,
show_whitelisted,
show_description,
Expand Down
40 changes: 28 additions & 12 deletions src/vulnix/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@
import json


def fmt_vuln(v, show_description=False):
def fmt_vuln(v, kev, show_description=False):
cvssv3 = str(v.cvssv3 or "")
cvssv3 += '!' if kev.is_known_exploited(v.cve_id) else ''
cvssv3 += '!' if kev.is_past_due(v.cve_id) else ''

out = 'https://nvd.nist.gov/vuln/detail/{:17}'.format(v.cve_id)
out += ' {:<8} '.format(v.cvssv3 or "")
out += ' {:<10} '.format(cvssv3)
if show_description:
# Show the description in a different color as they can run over the
# line length, and this makes distinguishing them from the next entry
Expand Down Expand Up @@ -57,7 +61,7 @@ def add(self, wl_rule):
self.masked |= self.report
self.report = set()

def print(self, show_masked=False, show_description=False):
def print(self, kev, show_masked=False, show_description=False):
if not self.report and not show_masked:
return
d = self.derivation
Expand All @@ -69,19 +73,20 @@ def print(self, show_masked=False, show_description=False):
click.secho(d.store_path, fg='magenta', dim=wl)

click.secho(
'{:50} {:<8} {}'.format(
'{:50} {:<10} {}'.format(
'CVE',
'CVSSv3',
'Description' if show_description else ''
).rstrip(),
dim=wl
)
for v in sorted(self.report, key=vuln_sort_key):
click.echo(fmt_vuln(v, show_description))
click.echo(fmt_vuln(v, kev, show_description))
if show_masked:
for v in sorted(self.masked, key=vuln_sort_key):
click.secho("{} [whitelisted]".format(fmt_vuln(
v,
kev,
show_description
)), dim=True)

Expand All @@ -98,7 +103,7 @@ def print(self, show_masked=False, show_description=False):
click.secho('* ' + comment, fg='blue', dim=wl)


def output_text(vulns, show_whitelisted=False, show_description=False):
def output_text(vulns, kev, show_whitelisted=False, show_description=False):
report = [v for v in vulns if v.report]
wl = [v for v in vulns if not v.report]

Expand All @@ -117,16 +122,16 @@ def output_text(vulns, show_whitelisted=False, show_description=False):
len(wl)), fg='blue')

for i in sorted(report, key=attrgetter('derivation')):
i.print(show_whitelisted, show_description)
i.print(kev, show_whitelisted, show_description)
if show_whitelisted:
for i in sorted(wl, key=attrgetter('derivation')):
i.print(show_whitelisted, show_description)
i.print(kev, show_whitelisted, show_description)
if wl and not show_whitelisted:
click.secho('\nuse --show-whitelisted to see derivations with only '
'whitelisted CVEs', fg='blue')


def output_json(items, show_whitelisted=False):
def output_json(items, kev, show_whitelisted=False):
out = []
for i in sorted(items, key=attrgetter('derivation')):
if not i.report and not show_whitelisted:
Expand All @@ -139,6 +144,12 @@ def output_json(items, show_whitelisted=False):
'derivation': d.store_path,
'affected_by': sorted(v.cve_id for v in i.report),
'whitelisted': sorted(v.cve_id for v in i.masked),
'known_exploited': sorted(v.cve_id for v in i.report
if kev.is_known_exploited(v.cve_id)),
'known_exploited_due_date': {
v.cve_id: kev.due_date(v.cve_id) for v in i.report
if kev.is_known_exploited(v.cve_id)
},
'cvssv3_basescore': {
v.cve_id: v.cvssv3
for v in (i.report | i.masked)
Expand All @@ -153,11 +164,16 @@ def output_json(items, show_whitelisted=False):
print(json.dumps(out, indent=1))


def output(items, json=False, show_whitelisted=False, show_description=False):
def output(
items,
kev,
json=False,
show_whitelisted=False,
show_description=False):
if json:
output_json(items, show_whitelisted)
output_json(items, kev, show_whitelisted)
else:
output_text(items, show_whitelisted, show_description)
output_text(items, kev, show_whitelisted, show_description)
if any(i.report for i in items):
return 2
if show_whitelisted and any(i.masked for i in items):
Expand Down
8 changes: 8 additions & 0 deletions src/vulnix/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from http import HTTPStatus
from vulnix.kev import KEV
from vulnix.nvd import NVD
from vulnix.whitelist import Whitelist
import hashlib
Expand Down Expand Up @@ -72,3 +73,10 @@ def nvd(tmpdir, http_server):
nvd.available_archives = ['modified']
with nvd:
yield nvd


@pytest.fixture
def kev(tmpdir, http_server):
mirror = f"{http_server}known_exploited_vulnerabilities.csv"
kev = KEV(mirror=mirror, cache_dir=str(tmpdir))
return kev
3 changes: 3 additions & 0 deletions src/vulnix/tests/fixtures/known_exploited_vulnerabilities.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
"cveID","vendorProject","product","vulnerabilityName","dateAdded","shortDescription","requiredAction","dueDate","notes"
"CVE-1988-1234","Allman","sendmail","sendmail Debug Mode Shell Escape","1988-11-02","A remote attacker can cause sendmail to execute arbitrary shell commands by requesting DEBUG mode and then supplying a destination mail address containing a shell escape","Apply updates per vendor instructions.","1988-12-02",""
"CVE-1988-5678","BSD","fingerd","fingerd Stack Buffer Overflow","1988-11-02","An unauthenticated stack-based buffer overflow vulnerability exists in fingerd which allows an attacker to perform remote code execution.","Apply updates per vendor instructions.","1988-12-02",""
11 changes: 11 additions & 0 deletions src/vulnix/tests/kev_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from vulnix.vulnerability import Vulnerability, Node


def test_kev(kev):
kev.update()
assert kev.is_known_exploited("CVE-1988-5678")
assert kev.is_known_exploited("CVE-1988-1234")
assert not kev.is_known_exploited("CVE-1988-7777")

assert kev.due_date("CVE-1988-1234") == "1988-12-02"
assert kev.is_past_due("CVE-1988-1234")
Loading