Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -113,4 +113,4 @@ websocket-client==0.59.0
yarl==1.7.2
zipp==3.8.0
dateparser==1.1.1
fetchcode==0.1.0
fetchcode==0.2.0
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ install_requires =
# networking
GitPython>=3.1.17
requests>=2.25.1
fetchcode>=0.1.0
fetchcode>=0.2.0

[options.extras_require]
dev =
Expand Down
69 changes: 35 additions & 34 deletions vulnerabilities/importers/gitlab.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,60 +64,61 @@
GITLAB_SCHEME_BY_PURL_TYPE = {v: k for k, v in PURL_TYPE_BY_GITLAB_SCHEME.items()}


def fork_and_get_dir(url):
"""
Fetch a clone of the gitlab repository at url and return the directory destination
"""
return fetch_via_vcs(url).dest_dir


class GitLabAPIImporter(GitImporter):
spdx_license_expression = "MIT"
license_url = "https://gitlab.com/gitlab-org/advisories-community/-/blob/main/LICENSE"

def __init__(self):
super().__init__(repo_url="git+https://gitlab.com/gitlab-org/advisories-community/")

def advisory_data(self) -> Iterable[AdvisoryData]:
def advisory_data(self, _keep_clone=True) -> Iterable[AdvisoryData]:
try:
self.clone()
path = Path(self.vcs_response.dest_dir)

glob = "**/*.yml"
files = (p for p in path.glob(glob) if p.is_file())
for file in files:
# split a path according to gitlab conventions where package type and name are a part of path
# For example with this path:
# /tmp/tmpi1klhpmd/pypi/gradio/CVE-2021-43831.yml
# the package type is pypi and the package name is gradio
# to ('/', 'tmp', 'tmpi1klhpmd', 'pypi', 'gradio', 'CVE-2021-43831.yml')
purl_type = get_gitlab_package_type(path=file)
if not purl_type:
logger.error(f"Unknow gitlab directory structure {file!r}")
continue
base_path = Path(self.vcs_response.dest_dir)

for file_path in base_path.glob("**/*.yml"):
gitlab_type, package_slug, vuln_id = parse_advisory_path(
base_path=base_path,
file_path=file_path,
)

if purl_type in PURL_TYPE_BY_GITLAB_SCHEME:
yield parse_gitlab_advisory(file)
if gitlab_type in PURL_TYPE_BY_GITLAB_SCHEME:
yield parse_gitlab_advisory(file_path)

else:
logger.error(f"Unknow package type {purl_type!r}")
logger.error(f"Unknow package type {gitlab_type!r} in {file_path!r}")
continue
finally:
if self.vcs_response:
if self.vcs_response and not _keep_clone:
self.vcs_response.delete()


def get_gitlab_package_type(path: Path):
def parse_advisory_path(base_path: Path, file_path: Path) -> Optional[AdvisoryData]:
"""
Return a package type extracted from a gitlab advisory path or None
"""
parts = path.parts[-3:]
Parse a gitlab advisory file and return a 3-tuple of:
(gitlab_type, package_slug, vulnerability_id)

if len(parts) < 3:
return
For example::

>>> base_path = Path("/tmp/tmpi1klhpmd/checkout")
>>> file_path=Path("/tmp/tmpi1klhpmd/checkout/pypi/gradio/CVE-2021-43831.yml")
>>> parse_advisory_path(base_path=base_path, file_path=file_path)
('pypi', 'gradio', 'CVE-2021-43831')

>>> file_path=Path("/tmp/tmpi1klhpmd/checkout/nuget/github.com/beego/beego/v2/nuget/CVE-2021-43831.yml")
>>> parse_advisory_path(base_path=base_path, file_path=file_path)
('nuget', 'github.com/beego/beego/v2/nuget', 'CVE-2021-43831')

>>> file_path = Path("/tmp/tmpi1klhpmd/checkout/npm/@express/beego/beego/v2/CVE-2021-43831.yml")
>>> parse_advisory_path(base_path=base_path, file_path=file_path)
('npm', '@express/beego/beego/v2', 'CVE-2021-43831')
"""
relative_path_segments = str(file_path.relative_to(base_path)).strip("/").split("/")
gitlab_type = relative_path_segments[0]
vuln_id = relative_path_segments[-1].replace(".yml", "")
package_slug = "/".join(relative_path_segments[1:-1])

type, _name, _vid = parts
return type
return gitlab_type, package_slug, vuln_id


def get_purl(package_slug):
Expand Down