Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: make EPSS behave like other data sources #4125

Merged
merged 5 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
rebase: main
  • Loading branch information
terriko committed Jun 25, 2024
commit 9b858132a549842cf626515eedd7146ed8591d47
5 changes: 5 additions & 0 deletions cve_bin_tool/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
from cve_bin_tool.data_sources import (
DataSourceSupport,
curl_source,
epss_source,
gad_source,
nvd_source,
osv_source,
Expand Down Expand Up @@ -722,6 +723,10 @@ def main(argv=None):
source_purl2cpe = purl2cpe_source.PURL2CPE_Source()
enabled_sources.append(source_purl2cpe)

if "EPSS" not in disabled_sources:
source_epss = epss_source.Epss_Source()
enabled_sources.append(source_epss)

if "NVD" not in disabled_sources:
source_nvd = nvd_source.NVD_Source(
nvd_type=nvd_type,
Expand Down
50 changes: 24 additions & 26 deletions cve_bin_tool/cvedb.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class CVEDB:
LOGGER = LOGGER.getChild("CVEDB")
SOURCES = [
curl_source.Curl_Source,
epss_source.Epss_Source,
osv_source.OSV_Source,
gad_source.GAD_Source,
purl2cpe_source.PURL2CPE_Source,
Expand Down Expand Up @@ -520,11 +521,9 @@ def populate_db(self) -> None:
"""

self.populate_metrics()

# EPSS uses metrics table to get the EPSS metric id.
# It can't be run before creation of metrics table.
self.populate_epss()
self.store_epss_data()
self.populate_purl2cpe()

for idx, data in enumerate(self.data):
Expand All @@ -538,22 +537,29 @@ def populate_db(self) -> None:
# if source_name != "NVD" and cve_data[0] is not None:
# cve_data = self.update_vendors(cve_data)

severity_data, affected_data = cve_data
if source_name == "EPSS":
if cve_data is not None:
self.store_epss_data(cve_data)

cursor = self.db_open_and_get_cursor()
else:
severity_data, affected_data = cve_data

if severity_data is not None and len(severity_data) > 0:
self.populate_severity(severity_data, cursor, data_source=source_name)
self.populate_cve_metrics(severity_data, cursor)
if affected_data is not None:
self.populate_affected(
affected_data,
cursor,
data_source=source_name,
)
if self.connection is not None:
self.connection.commit()
self.db_close()
cursor = self.db_open_and_get_cursor()

if severity_data is not None and len(severity_data) > 0:
self.populate_severity(
severity_data, cursor, data_source=source_name
)
self.populate_cve_metrics(severity_data, cursor)
if affected_data is not None:
self.populate_affected(
affected_data,
cursor,
data_source=source_name,
)
if self.connection is not None:
self.connection.commit()
self.db_close()

def populate_severity(self, severity_data, cursor, data_source):
"""Populate the database with CVE severities."""
Expand Down Expand Up @@ -671,14 +677,6 @@ def populate_metrics(self):
self.connection.commit()
self.db_close()

def populate_epss(self):
"""Exploit Prediction Scoring System (EPSS) data to help users evaluate risks
Add EPSS data into the database"""
epss = epss_source.Epss_Source()
cursor = self.db_open_and_get_cursor()
self.epss_data = run_coroutine(epss.update_epss(cursor))
self.db_close()

def metric_finder(self, cursor, cve):
"""
SQL query to retrieve the metrics_name based on the metrics_id
Expand Down Expand Up @@ -908,11 +906,11 @@ def populate_exploit_db(self, exploits):
self.connection.commit()
self.db_close()

def store_epss_data(self):
def store_epss_data(self, epss_data):
"""Insert Exploit Prediction Scoring System (EPSS) data into database."""
insert_cve_metrics = self.INSERT_QUERIES["insert_cve_metrics"]
cursor = self.db_open_and_get_cursor()
cursor.executemany(insert_cve_metrics, self.epss_data)
cursor.executemany(insert_cve_metrics, epss_data)
self.connection.commit()
self.db_close()

Expand Down
26 changes: 20 additions & 6 deletions cve_bin_tool/data_sources/epss_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@


class Epss_Source:
SOURCE = "Epss"
SOURCE = "EPSS"
CACHEDIR = DISK_LOCATION_DEFAULT
BACKUPCACHEDIR = DISK_LOCATION_BACKUP
LOGGER = logging.getLogger().getChild("CVEDB")
Expand All @@ -43,14 +43,12 @@ async def update_epss(self, cursor):
- EPSS score
- EPSS percentile
"""
self.EPSS_id_finder(cursor)
await self.download_and_parse_epss()
return self.epss_data
self.LOGGER.debug("Fetching EPSS data...")

async def download_and_parse_epss(self):
"""Downloads and parses the EPSS data from the CSV file."""
self.EPSS_id_finder(cursor)
await self.download_epss_data()
self.epss_data = self.parse_epss_data()
return self.epss_data

async def download_epss_data(self):
"""Downloads the EPSS CSV file and saves it to the local filesystem.
Expand Down Expand Up @@ -138,3 +136,19 @@ def parse_epss_data(self, file_path=None):
(cve_id, self.epss_metric_id, epss_score, epss_percentile)
)
return parsed_data

async def get_cve_data(self):
"""Gets EPSS data.
This function is so that the epss source matches the others api-wise to make for
easier disabling/enabling.

returns (data, "EPSS") so that the source can be identified for storing data
"""

try:
await self.update_epss()
except Exception as e:
self.LOGGER.debug(f"Error while fetching EPSS data: {e}")
self.LOGGER.error("Unable to fetch EPSS, skipping EPSS.")

return self.epss_data, self.SOURCE
2 changes: 1 addition & 1 deletion cve_bin_tool/data_sources/nvd_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ def format_data_api2(self, all_cve_entries):
cvss_data = cve_cvss["cvssMetricV2"][0]["cvssData"]
cve["CVSS_version"] = 2
else:
LOGGER.info(f"Unknown CVSS metrics field {cve_item['id']}")
LOGGER.debug(f"Unknown CVSS metrics field {cve_item['id']}")
cvss_available = False
if cvss_available:
cve["severity"] = cvss_data.get("baseSeverity", "UNKNOWN")
Expand Down