# See https://aboutcode.org for more information about nexB OSS projects.
#

+ import json
import logging
+ from pathlib import Path
from typing import Any
from typing import Iterable
from typing import List
from typing import Mapping
- from urllib.parse import urljoin

- from bs4 import BeautifulSoup
+ from fetchcode.vcs import fetch_via_vcs
from packageurl import PackageURL
from univers.version_range import AlpineLinuxVersionRange
from univers.versions import InvalidVersion
from vulnerabilities.references import WireSharkReferenceV2
from vulnerabilities.references import XsaReferenceV2
from vulnerabilities.references import ZbxReferenceV2
- from vulnerabilities.utils import fetch_response
+ from vulnerabilities.utils import get_advisory_url


class AlpineLinuxImporterPipeline(VulnerableCodeBaseImporterPipelineV2):
@@ -35,90 +36,59 @@ class AlpineLinuxImporterPipeline(VulnerableCodeBaseImporterPipelineV2):
    pipeline_id = "alpine_linux_importer_v2"
    spdx_license_expression = "CC-BY-SA-4.0"
    license_url = "https://secdb.alpinelinux.org/license.txt"
-     url = "https://secdb.alpinelinux.org/"
+     repo_url = "git+https://github.com/aboutcode-org/aboutcode-mirror-alpine-secdb/"

    @classmethod
    def steps(cls):
-         return (cls.collect_and_store_advisories,)
+         return (
+             cls.clone,
+             cls.collect_and_store_advisories,
+         )

    def advisories_count(self) -> int:
-         return 0
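+         # The cloned mirror is expected to keep the Alpine secdb JSON files under secdb/.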
+         base_path = Path(self.vcs_response.dest_dir) / "secdb"
+         count = 0

-     def collect_advisories(self) -> Iterable[AdvisoryData]:
-         page_response_content = fetch_response(self.url).content
-         advisory_directory_links = fetch_advisory_directory_links(
-             page_response_content, self.url, self.log
-         )
-         advisory_links = set()
-         visited_directories = set()
-         for advisory_directory_link in advisory_directory_links:
-             if advisory_directory_link in visited_directories:
-                 continue
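+         # Walk every secdb JSON file and sum the per-package advisory entries.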
+         for json_file in base_path.rglob("*.json"):
+             data = json.loads(json_file.read_text(encoding="utf-8"))
+             for pkg in data.get("packages", []):
+                 count += len(pkg.get("advisories", []))
+
+         return count

-             advisory_directory_page = fetch_response(advisory_directory_link).content
-             advisory_links.update(
-                 fetch_advisory_links(advisory_directory_page, advisory_directory_link, self.log)
+     def clone(self):
+         self.log(f"Cloning `{self.repo_url}`")
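+         # Clone the secdb mirror; the checkout location is exposed as vcs_response.dest_dir.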
+         self.vcs_response = fetch_via_vcs(self.repo_url)
+
+     def collect_advisories(self) -> Iterable[AdvisoryData]:
+         base_path = Path(self.vcs_response.dest_dir) / "secdb"
+         for file_path in base_path.glob("**/*.json"):
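+             # Build a browsable URL pointing at this file in the mirror; passed along as the advisory URL.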
+             advisory_url = get_advisory_url(
+                 file=file_path,
+                 base_path=base_path,
+                 url="https://github.com/aboutcode-org/aboutcode-mirror-alpine-secdb/blob/main/",
            )

-         for link in advisory_links:
-             record = fetch_response(link).json()
-             if not record["packages"]:
+             with open(file_path) as f:
+                 record = json.load(f)
+
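+             # Skip empty files and files that list no packages.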
+             if not record or not record["packages"]:
                self.log(
-                     f'"packages" not found in {link!r}',
+                     f'"packages" not found in {advisory_url!r}',
                    level=logging.DEBUG,
                )
                continue
-             yield from process_record(record=record, url=link, logger=self.log)
-
-
- def fetch_advisory_directory_links(
-     page_response_content: str,
-     base_url: str,
-     logger: callable = None,
- ) -> List[str]:
-     """
-     Return a list of advisory directory links present in `page_response_content` html string
-     """
-     index_page = BeautifulSoup(page_response_content, features="lxml")
-     alpine_versions = [
-         link.text
-         for link in index_page.find_all("a")
-         if link.text.startswith("v") or link.text.startswith("edge")
-     ]
-
-     if not alpine_versions:
-         if logger:
-             logger(
-                 f"No versions found in {base_url!r}",
-                 level=logging.DEBUG,
-             )
-         return []
+             yield from process_record(record=record, url=advisory_url, logger=self.log)

-     advisory_directory_links = [urljoin(base_url, version) for version in alpine_versions]
+     def clean_downloads(self):
+         """Cleanup any temporary repository data."""
+         if self.vcs_response:
+             self.log(f"Removing cloned repository")
+             self.vcs_response.delete()

-     return advisory_directory_links
-
-
- def fetch_advisory_links(
-     advisory_directory_page: str,
-     advisory_directory_link: str,
-     logger: callable = None,
- ) -> Iterable[str]:
-     """
-     Yield json file urls present in `advisory_directory_page`
-     """
-     advisory_directory_page = BeautifulSoup(advisory_directory_page, features="lxml")
-     anchor_tags = advisory_directory_page.find_all("a")
-     if not anchor_tags:
-         if logger:
-             logger(
-                 f"No anchor tags found in {advisory_directory_link!r}",
-                 level=logging.DEBUG,
-             )
-         return iter([])
-     for anchor_tag in anchor_tags:
-         if anchor_tag.text.endswith("json"):
-             yield urljoin(advisory_directory_link, anchor_tag.text)
+     def on_failure(self):
+         """Ensure cleanup is always performed on failure."""
+         self.clean_downloads()


def check_for_attributes(record, logger) -> bool:
@@ -196,30 +166,14 @@ def load_advisories(
                    level=logging.DEBUG,
                )
            continue
+
        # fixed_vulns is a list of strings and each string is a space-separated
        # list of aliases and CVEs
-         aliases = set()
        for vuln_ids in fixed_vulns:
-             if not isinstance(vuln_ids, str):
-                 if logger:
-                     logger(
-                         f"{vuln_ids!r} is not of `str` instance",
-                         level=logging.DEBUG,
-                     )
-                 continue
-             vuln_ids = vuln_ids.strip().split()
-             if not vuln_ids:
-                 if logger:
-                     logger(
-                         f"{vuln_ids!r} is empty",
-                         level=logging.DEBUG,
-                     )
-                 continue
-             aliases.update(vuln_ids)
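+             # Each secfixes entry is a space-separated string of vulnerability ids; the first id
+             # is treated as the primary id and the full list is kept as aliases.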
+             aliases = vuln_ids.strip().split(" ")
+             vuln_id = aliases[0]

-         for vuln_id in aliases:
            references = []
-
            if vuln_id.startswith("XSA"):
                references.append(XsaReferenceV2.from_id(xsa_id=vuln_id))

@@ -295,7 +249,7 @@ def load_advisories(
            advisory_id = f"{pkg_infos['name']}/{qualifiers['distroversion']}/{version}/{vuln_id}"
            yield AdvisoryData(
                advisory_id=advisory_id,
-                 aliases=[vuln_id],
+                 aliases=aliases,
                references_v2=references,
                affected_packages=affected_packages,
                url=url,