Skip to content

Commit

Permalink
OJVG Ratings/Affected Versions fixes (#33)
Browse files Browse the repository at this point in the history
* capture ojvg url

* affected version fix for earlier reports, ojvg score

* float on - modest mouse

* more floats?

* actually use float

* convert to float

* better fetching
  • Loading branch information
Scanteianu authored Jun 24, 2024
1 parent 4c1af72 commit d2a92ea
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 21 deletions.
42 changes: 32 additions & 10 deletions cvereporter/fetch_vulnerabilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
from datetime import datetime
from cyclonedx.model.vulnerability import (
Vulnerability,
VulnerabilityScoreSource,
VulnerabilitySource,
VulnerabilityRating,
BomTarget,
)

Expand All @@ -20,8 +22,8 @@ def fetch_cves(date: str) -> list[Vulnerability]:


def fetch_dicts(date: str):
    """Fetch the OJVG advisory page for *date* and parse it into dicts.

    Parameters
    ----------
    date : str
        Advisory date in ``YYYY-MM-DD`` form, forwarded to the fetcher.

    Returns
    -------
    list[dict] | None
        Parsed vulnerability records, or None when the page could not be
        retrieved or parsed (both helpers return None on failure).
    """
    # retrieve_cves_from_internet returns a (html_text, url) pair; the URL is
    # threaded through so each parsed record can carry its source page.
    cve_text, url = retrieve_cves_from_internet(date)
    dicts = parse_to_dict(cve_text, date, url)
    return dicts


Expand All @@ -42,18 +44,18 @@ def retrieve_cves_from_internet(date: str) -> str:
},
)
except requests.exceptions.ReadTimeout:
return None
return None, None
if r.status_code == 404:
return None
return None, None
resp_text = r.text
# todo: make this configurable
with open("data/open_jvg_dump_" + date + ".html", "w") as dump:
dump.write(resp_text)
return resp_text
return resp_text, url


def parse_to_cyclone(resp_text: str, date: str, ojvg_url: str) -> list[Vulnerability]:
    """Parse raw OJVG advisory HTML into CycloneDX ``Vulnerability`` objects.

    Parameters
    ----------
    resp_text : str
        The advisory page HTML (or a file-like object accepted by the parser).
    date : str
        Advisory date string, stored on each parsed record.
    ojvg_url : str
        URL the advisory was fetched from; recorded as the rating source.

    Returns
    -------
    list[Vulnerability]
        One CycloneDX vulnerability per CVE row found in the page.
    """
    dicts = parse_to_dict(resp_text, date, ojvg_url)
    return dict_to_vulns(dicts)


Expand Down Expand Up @@ -91,7 +93,7 @@ def intersect_major_versions_with_extracted_affected(
return affected_versions


def parse_to_dict(resp_text: str, date: str) -> list[dict]:
def parse_to_dict(resp_text: str, date: str, ojvg_url: str) -> list[dict]:
if resp_text is None:
return None
soup = BeautifulSoup(resp_text, "html.parser")
Expand All @@ -103,7 +105,7 @@ def parse_to_dict(resp_text: str, date: str) -> list[dict]:
# find the table with the CVEs
table = soup.find("table", attrs={"class": "risk-matrix"})
if table is None:
print("unable to find risk matrix for "+date)
print("unable to find risk matrix for " + date)
return None
# find all the rows in the table
rows = table.find_all("tr")
Expand All @@ -129,7 +131,7 @@ def parse_to_dict(resp_text: str, date: str) -> list[dict]:
affected_major_versions = []
index = 0
for column in row.find_all("td"):
if column.text == "•":
if "•" in column.text:
affected_major_versions.append(int(column_headers[index]))
index += 1
if cve is not None:
Expand All @@ -139,6 +141,12 @@ def parse_to_dict(resp_text: str, date: str) -> list[dict]:
link = cve.find("a")["href"]
componentsTD = cve.find_next_sibling("td")
component = componentsTD.text.replace("\n", "")
score_td = componentsTD.find_next_sibling()
score_text = score_td.text
if score_text is not None:
score_text = score_text.split()[
0
] # in 2024, we start seeing 2 line things with "NHNNUHHHN" which is not a number
affected_versions = intersect_major_versions_with_extracted_affected(
extracted_affected, affected_major_versions
)
Expand All @@ -148,6 +156,12 @@ def parse_to_dict(resp_text: str, date: str) -> list[dict]:
parsed_data["date"] = date
parsed_data["component"] = component
parsed_data["affected"] = affected_versions
parsed_data["ojvg_url"] = ojvg_url
try:
parsed_data["ojvg_score"] = float(score_text)
except ValueError:
print(score_text + " is not a valid score float")
parsed_data["ojvg_score"] = float("nan")
print(json.dumps(parsed_data))
dicts.append(parsed_data)

Expand Down Expand Up @@ -175,6 +189,12 @@ def dict_to_vulns(dicts: list[dict]) -> list[Vulnerability]:
recommendation="",
)
vuln.affects.add(affects)
vr = VulnerabilityRating(
source=parsed_data["ojvg_url"],
score=parsed_data["ojvg_score"],
method=VulnerabilityScoreSource.CVSS_V3_1,
)
vuln.ratings.add(vr)
vulnerabilities.append(vuln)
return vulnerabilities

Expand All @@ -194,6 +214,8 @@ def extract_affected(header_string: str) -> list[str]:
affected = []
start_vulns = "The affected versions are "
end_vulns = "Please note that defense-in-depth issues"
if end_vulns not in header_string:
end_vulns = "We recommend that you upgrade" # there is some inconsistency in earlier (2019) formulaic text
if start_vulns not in header_string or end_vulns not in header_string:
return []
vulns_sub = header_string[
Expand Down
22 changes: 15 additions & 7 deletions cvereporter/nist_enhance.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
def fetch_nist(url: str, id: str) -> dict:
data = None
nist_resp = None
if "NIST_NVD_TOKEN" in os.environ and os.environ["NIST_NVD_TOKEN"]: # check not empty
print("making call to NIST using api key! "+url, flush=True)
time.sleep(1) # stay well within 50 requests/30 seconds
nist_resp = requests.get(url, headers= {"apiKey": os.environ["NIST_NVD_TOKEN"]})
if (
"NIST_NVD_TOKEN" in os.environ and os.environ["NIST_NVD_TOKEN"]
): # check not empty
print("making call to NIST using api key! " + url, flush=True)
time.sleep(1) # stay well within 50 requests/30 seconds
nist_resp = requests.get(url, headers={"apiKey": os.environ["NIST_NVD_TOKEN"]})
else:
print("making call to NIST without using api key! "+url, flush=True)
time.sleep(10) # stay well within 5 requests/30 seconds
print("making call to NIST without using api key! " + url, flush=True)
time.sleep(10) # stay well within 5 requests/30 seconds
nist_resp = requests.get(url)
if nist_resp.status_code != 200:
print(
Expand Down Expand Up @@ -99,10 +101,16 @@ def enhance(vulns: list[Vulnerability]):
print("\n\n\n\n\n\nvuln: {} index {} ".format(id, count))
# print(json.dumps(relevant, indent=True))
for rating in relevant["ratings"]:

score_float = float("nan")
try:
score_float = float(rating["score"])
except ValueError:
print(str(rating["score"]) + " is not a float")
# todo: convert the ratings into the cyclonedx enums?
vr = VulnerabilityRating(
source=VulnerabilitySource(url=rating["source"]),
score=rating["score"],
score=score_float,
vector=rating["vector"],
method=VulnerabilityScoreSource.CVSS_V3_1,
)
Expand Down
32 changes: 30 additions & 2 deletions ojvg_download.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,49 @@
from cvereporter import fetch_vulnerabilities
from datetime import date, timedelta
import json
import time

"""
a brute force ojvg downloader which iterates through all dates from 1 jan 2019 (month reports start) to something close to the present day (end_date).
It downloads all the vulnerability reports as html files to the `data` directory and saves the relevant data in `data/ojvg_summary.json`
"""
# Only probe day-by-day AFTER the last known advisory; earlier report dates
# are hard-coded below to avoid excessive API calls.
start_date = date(2024, 4, 17)
end_date = date.today()
current_date = start_date
responses = []
# hard code this, to avoid excessive api calls. Assume no backdated advisories will be published, only fetch every day for dates after last report.
list_of_dates = [
    "2024-04-16",
    "2024-01-16",
    "2023-10-17",
    "2023-07-18",
    "2023-04-18",
    "2023-01-17",
    "2022-10-18",
    "2022-07-19",
    "2022-04-19",
    "2022-01-18",
    "2021-10-19",
    "2021-07-20",
    "2021-04-20",
    "2021-01-19",
    "2020-10-20",
    "2020-07-14",
    "2020-04-14",
    "2020-01-14",
    "2019-10-15",
    "2019-07-16",
    "2019-04-16",
]
# Extend the known dates with every day from start_date up to today, since a
# new advisory could have appeared on any of them.
while current_date < end_date:
    date_str = current_date.strftime("%Y-%m-%d")
    current_date += timedelta(days=1)
    list_of_dates.append(date_str)
for date_str in list_of_dates:
    print(date_str)
    resp = fetch_vulnerabilities.fetch_dicts(date_str)
    print(resp, flush=True)
    time.sleep(0.5)  # avoid too many requests per second
    # fetch_dicts returns None for dates with no advisory (404/timeout)
    if resp is not None:
        responses.append(resp)

Expand Down
9 changes: 7 additions & 2 deletions tests/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
# To run a single test: python3 -m pytest -v -k test_fetch -s (in this case, runs "test_fetch")
def test_fetch():
with open("tests/data/open_jvg_dump_2023-01-17.html", "r") as data:
vulns = fetch_vulnerabilities.parse_to_cyclone(data, "2023-01-17")
vulns = fetch_vulnerabilities.parse_to_cyclone(
data, "2023-01-17", "www.fakeurl.com"
)

print(vulns)
assert len(vulns) == 3
Expand All @@ -21,11 +23,14 @@ def test_fetch():

def test_parse_to_dict():
    """parse_to_dict extracts affected versions and records the source URL."""
    with open("tests/data/open_jvg_dump_2023-01-17.html", "r") as data:
        # Third argument is the OJVG page URL, propagated into every record.
        vulns = fetch_vulnerabilities.parse_to_dict(
            data, "2023-01-17", "www.fakeurl.com"
        )
    print(vulns)
    for cve in vulns:
        if cve["id"] == "CVE-2023-21830":
            assert len(cve["affected"]) == 2
            assert cve["ojvg_url"] == "www.fakeurl.com"


def test_nist_parse():
Expand Down

0 comments on commit d2a92ea

Please sign in to comment.