Skip to content

Modify Ruby importer to support package-first mode #1914

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 117 additions & 0 deletions vulnerabilities/importers/ruby.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,16 @@
import logging
from pathlib import Path
from typing import Iterable
from typing import List
from typing import Optional

import requests
import saneyaml
from dateutil.parser import parse
from packageurl import PackageURL
from pytz import UTC
from univers.version_range import GemVersionRange
from univers.versions import RubygemsVersion

from vulnerabilities.importer import AdvisoryData
from vulnerabilities.importer import AffectedPackage
Expand Down Expand Up @@ -52,7 +57,69 @@ class RubyImporter(Importer):
SOFTWARE.
"""

def __init__(self, purl=None, *args, **kwargs):
    """
    Set up the importer, optionally restricted to a single package.

    ``purl`` is an optional package URL object. It is stored on the instance
    as ``self.purl``; only its ``type`` attribute is inspected here. All other
    positional and keyword arguments are forwarded unchanged to the parent
    initializer.
    """
    super().__init__(*args, **kwargs)
    self.purl = purl
    if self.purl and self.purl.type not in ("gem", "ruby"):
        # Report through the module logger, like the rest of this class,
        # instead of writing to stdout. Construction still succeeds so that
        # callers decide what to do with an unsupported type.
        logger.warning(
            f"PURL type {self.purl.type} is not 'gem' or 'ruby', may not match any advisories"
        )

def advisory_data(self) -> Iterable[AdvisoryData]:
    """
    Return the advisories produced by this importer.

    With a package URL set on the instance, the package-first lookup is
    used; otherwise the batch lookup is used.
    """
    if self.purl:
        return self._package_first_advisory_data()

    return self._batch_advisory_data()

def _package_first_advisory_data(self) -> Iterable[AdvisoryData]:
    """
    Yield the advisories that affect ``self.purl``, fetched file by file
    through the GitHub helpers of this class.

    For a "gem" PURL the files under ``gems/<name>`` are inspected. For a
    "ruby" PURL the files under ``rubies`` are inspected and only records
    whose "engine" field equals the PURL name are kept. Any other PURL type
    yields nothing. Files not ending in ".yml", and files whose name starts
    with "OSVDB-", are skipped.

    Failures are logged and never raised: a failed directory listing ends
    the iteration, a failure on a single file only skips that file.
    """
    purl_type = self.purl.type
    if purl_type == "gem":
        directory, schema_type = f"gems/{self.purl.name}", "gems"
    elif purl_type == "ruby":
        directory, schema_type = "rubies", "rubies"
    else:
        return

    try:
        listed_paths = self._fetch_github_directory_content(directory)
    except Exception as e:
        logger.error(f"Error fetching advisories for {self.purl}: {str(e)}")
        return

    for file_path in listed_paths:
        # Listed entries are repository-relative paths ("gems/<name>/<file>"),
        # so the OSVDB- prefix must be tested against the file name alone;
        # testing the whole path would never match.
        file_name = file_path.rsplit("/", 1)[-1]
        if not file_name.endswith(".yml") or file_name.startswith("OSVDB-"):
            continue

        try:
            content = self._fetch_github_file_content(file_path)
            if not content:
                continue

            raw_data = saneyaml.load(content)

            if schema_type == "rubies" and raw_data.get("engine") != self.purl.name:
                continue

            advisory_url = (
                f"https://github.com/rubysec/ruby-advisory-db/blob/master/{file_path}"
            )
            advisory = parse_ruby_advisory(raw_data, schema_type, advisory_url)

            if not advisory or not self._advisory_affects_purl(advisory):
                continue
        except Exception as e:
            # One unreadable or malformed advisory must not hide the others.
            logger.error(f"Error fetching advisories for {self.purl}: {str(e)}")
            continue

        # Yield outside the try block so that exceptions raised by the
        # consumer of this generator are not swallowed by the handler above.
        yield advisory

def _batch_advisory_data(self) -> Iterable[AdvisoryData]:
try:
self.clone(self.repo_url)
base_path = Path(self.vcs_response.dest_dir)
Expand All @@ -72,6 +139,56 @@ def advisory_data(self) -> Iterable[AdvisoryData]:
if self.vcs_response:
self.vcs_response.delete()

def _advisory_affects_purl(self, advisory: AdvisoryData) -> bool:
    """
    Tell whether ``advisory`` applies to the package URL of this importer.

    Always True when no package URL is set. Otherwise an affected package
    must have the same type and name as the package URL. When the package
    URL carries a version and the affected package has a version range,
    the version must also fall inside that range.
    """
    target = self.purl
    if not target:
        return True

    for candidate in advisory.affected_packages:
        same_package = (
            candidate.package.type == target.type and candidate.package.name == target.name
        )
        if not same_package:
            continue

        if not (target.version and candidate.affected_version_range):
            # Nothing to compare versions against: type and name suffice.
            return True

        if RubygemsVersion(target.version) in candidate.affected_version_range:
            return True

    return False

def _fetch_github_directory_content(self, path: str) -> List[str]:
    """
    Return the paths of all files found under ``path`` in the
    rubysec/ruby-advisory-db repository, descending into sub-directories.

    The listing comes from the GitHub repository contents API. When the
    response status is not 200, an error is logged and an empty list is
    returned. Entries that are neither files nor directories are ignored.
    """
    url = f"https://api.github.com/repos/rubysec/ruby-advisory-db/contents/{path}"
    # Without a timeout the request may block forever on a stalled connection.
    response = requests.get(url, timeout=30)

    if response.status_code != 200:
        logger.error(f"Failed to fetch directory contents from GitHub: {response.status_code}")
        return []

    contents = response.json()
    if isinstance(contents, dict):
        # The contents API answers with a single object instead of a list
        # when ``path`` designates a file; handle it like a one-entry listing.
        contents = [contents]

    file_paths = []

    for item in contents:
        item_type = item.get("type")
        if item_type == "file":
            file_paths.append(item["path"])
        elif item_type == "dir":
            file_paths.extend(self._fetch_github_directory_content(item["path"]))

    return file_paths

def _fetch_github_file_content(self, path: str) -> Optional[str]:
    """
    Return the raw text of the file at ``path`` in the
    rubysec/ruby-advisory-db repository, or None when the response status
    is not 200 (an error is logged in that case).
    """
    url = f"https://api.github.com/repos/rubysec/ruby-advisory-db/contents/{path}"
    response = requests.get(
        url,
        # Ask the contents API for the raw file body.
        headers={"Accept": "application/vnd.github.v3.raw"},
        # Without a timeout the request may block forever on a stalled connection.
        timeout=30,
    )

    if response.status_code != 200:
        logger.error(f"Failed to fetch file content from GitHub: {response.status_code}")
        return None

    return response.text


def parse_ruby_advisory(record, schema_type, advisory_url):
"""
Expand Down
82 changes: 82 additions & 0 deletions vulnerabilities/tests/test_ruby.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
import pytest
from packageurl import PackageURL
from univers.version_range import GemVersionRange
from univers.versions import RubygemsVersion

from vulnerabilities.importer import AdvisoryData
from vulnerabilities.importer import AffectedPackage
from vulnerabilities.importers.ruby import RubyImporter
from vulnerabilities.importers.ruby import get_affected_packages
from vulnerabilities.importers.ruby import parse_ruby_advisory
from vulnerabilities.improvers.default import DefaultImprover
Expand Down Expand Up @@ -94,3 +96,83 @@ def test_ruby_improver(mock_response):
)
def test_get_affected_packages(record, purl, result):
assert get_affected_packages(record, purl) == result


@pytest.fixture
def mock_github_api(monkeypatch):
    """
    Replace the two GitHub fetch helpers of RubyImporter with in-memory
    lookups backed by the advisory files stored in TEST_DATA.
    """

    def read_test_file(name):
        # Close the file deterministically instead of leaking the handle.
        with open(os.path.join(TEST_DATA, name)) as test_file:
            return test_file.read()

    dir_listing = {
        "gems/sinatra": [
            "gems/sinatra/CVE-2018-7212.yml",
            "gems/sinatra/CVE-2018-11627.yml",
        ],
        "rubies": [
            "rubies/CVE-2010-1330.yml",
            "rubies/CVE-2007-5770.yml",
        ],
    }

    # Every listed path is served from the TEST_DATA file of the same name,
    # so the content table is derived from the listing rather than repeated.
    test_files = {
        path: read_test_file(os.path.basename(path))
        for paths in dir_listing.values()
        for path in paths
    }

    def mock_fetch_github_directory_content(self, path):
        return dir_listing.get(path, [])

    def mock_fetch_github_file_content(self, path):
        return test_files.get(path, "")

    monkeypatch.setattr(
        RubyImporter, "_fetch_github_directory_content", mock_fetch_github_directory_content
    )
    monkeypatch.setattr(RubyImporter, "_fetch_github_file_content", mock_fetch_github_file_content)


def test_package_first_mode_gem_affecting(mock_github_api):
    """A gem PURL without a version gets both advisories listed for sinatra."""
    sinatra = PackageURL(type="gem", name="sinatra")
    results = list(RubyImporter(purl=sinatra).advisory_data())

    assert len(results) == 2
    for advisory in results:
        assert advisory.affected_packages[0].package.name == "sinatra"


def test_package_first_mode_gem_version(mock_github_api):
    """A versioned gem PURL gets only advisories whose range holds that version."""
    sinatra = PackageURL(type="gem", name="sinatra", version="1.2.7")
    results = list(RubyImporter(purl=sinatra).advisory_data())

    def covers_purl(affected_package):
        # Same chain of conditions, in the same order, for one affected package.
        return (
            sinatra.version
            and affected_package.package.name == sinatra.name
            and affected_package.affected_version_range
            and affected_package.affected_version_range.contains(
                RubygemsVersion(sinatra.version)
            )
        )

    assert len(results) == 2
    for advisory in results:
        assert any(covers_purl(candidate) for candidate in advisory.affected_packages)


def test_package_first_mode_gem_not_affecting(mock_github_api):
    """A gem with no listed advisory files produces no advisories."""
    unknown_gem = PackageURL(type="gem", name="nonexistent", version="9.9.9")
    results = list(RubyImporter(purl=unknown_gem).advisory_data())

    assert results == []


def test_package_first_mode_ruby_engine(mock_github_api):
    """A ruby PURL keeps only the advisory recorded for the named engine."""
    jruby = PackageURL(type="ruby", name="jruby")
    results = list(RubyImporter(purl=jruby).advisory_data())

    assert len(results) == 1
    only_advisory = results[0]
    assert only_advisory.affected_packages[0].package.name == "jruby"


def test_package_first_mode_ruby_engine_not_affecting(mock_github_api):
    """A ruby PURL naming an engine absent from the test data yields nothing."""
    unknown_engine = PackageURL(type="ruby", name="nonexistent")
    results = list(RubyImporter(purl=unknown_engine).advisory_data())

    assert results == []