diff --git a/org_fedora_oscap/content_discovery.py b/org_fedora_oscap/content_discovery.py
index 23fdafd1..f158a828 100644
--- a/org_fedora_oscap/content_discovery.py
+++ b/org_fedora_oscap/content_discovery.py
@@ -39,31 +39,20 @@ def path_is_present_among_paths(path, paths):
 
 class ContentBringer:
     CONTENT_DOWNLOAD_LOCATION = pathlib.Path(common.INSTALLATION_CONTENT_DIR)
-    DEFAULT_SSG_DATA_STREAM_PATH = f"{common.SSG_DIR}/{common.SSG_CONTENT}"
 
-    def __init__(self, addon_data):
-        self.content_uri_scheme = ""
-        self.content_uri_path = ""
-        self.fetched_content = ""
+    def __init__(self, what_if_fail):
+        self._valid_content_uri = ""
+        self.dest_file_name = ""
 
         self.activity_lock = threading.Lock()
         self.now_fetching_or_processing = False
+        self.what_if_fail = what_if_fail
 
         self.CONTENT_DOWNLOAD_LOCATION.mkdir(parents=True, exist_ok=True)
 
-        self._addon_data = addon_data
-
-    def get_content_type(self, url):
-        if url.endswith(".rpm"):
-            return "rpm"
-        elif any(url.endswith(arch_type) for arch_type in common.SUPPORTED_ARCHIVES):
-            return "archive"
-        else:
-            return "file"
-
     @property
     def content_uri(self):
-        return self.content_uri_scheme + "://" + self.content_uri_path
+        return self._valid_content_uri
 
     @content_uri.setter
     def content_uri(self, uri):
@@ -73,127 +62,129 @@ def content_uri(self, uri):
                 f"Invalid supplied content URL '{uri}', "
                 "use the 'scheme://path' form.")
             raise KickstartValueError(msg)
-        self.content_uri_path = scheme_and_maybe_path[1]
-        self.content_uri_scheme = scheme_and_maybe_path[0]
+        path = scheme_and_maybe_path[1]
+        if "/" not in path:
+            msg = f"Missing the path component of the '{uri}' URL"
+            raise KickstartValueError(msg)
+        basename = path.rsplit("/", 1)[1]
+        if not basename:
+            msg = f"Unable to deduce basename from the '{uri}' URL"
+            raise KickstartValueError(msg)
+        self._valid_content_uri = uri
+        self.dest_file_name = self.CONTENT_DOWNLOAD_LOCATION / basename
 
-    def fetch_content(self, what_if_fail, ca_certs_path=""):
+    def fetch_content(self, content_uri, ca_certs_path=""):
         """
         Initiate fetch of the content into an appropriate directory
 
         Args:
-            what_if_fail: Callback accepting exception as an argument that
-                should handle them in the calling layer.
+            content_uri: URI location of the content to be fetched
             ca_certs_path: Path to the HTTPS certificate file
         """
         try:
-            self.content_uri = self._addon_data.content_url
+            self.content_uri = content_uri
         except Exception as exc:
-            what_if_fail(exc)
+            self.what_if_fail(exc)
 
         shutil.rmtree(self.CONTENT_DOWNLOAD_LOCATION, ignore_errors=True)
         self.CONTENT_DOWNLOAD_LOCATION.mkdir(parents=True, exist_ok=True)
 
-        fetching_thread_name = self._fetch_files(
-            self.content_uri_scheme, self.content_uri_path,
-            self.CONTENT_DOWNLOAD_LOCATION, ca_certs_path, what_if_fail)
+        fetching_thread_name = self._fetch_files(ca_certs_path)
         return fetching_thread_name
 
-    def _fetch_files(self, scheme, path, destdir, ca_certs_path, what_if_fail):
+    def _fetch_files(self, ca_certs_path):
         with self.activity_lock:
             if self.now_fetching_or_processing:
-                msg = "Strange, it seems that we are already fetching something."
+                msg = "OSCAP Addon: Strange, it seems that we are already " \
+                      "fetching something."
                 log.warn(msg)
                 return
             self.now_fetching_or_processing = True
 
         fetching_thread_name = None
         try:
-            fetching_thread_name = self._start_actual_fetch(scheme, path, destdir, ca_certs_path)
+            fetching_thread_name = self._start_actual_fetch(ca_certs_path)
         except Exception as exc:
             with self.activity_lock:
                 self.now_fetching_or_processing = False
-            what_if_fail(exc)
+            self.what_if_fail(exc)
 
         # We are not finished yet with the fetch
         return fetching_thread_name
 
-    def _start_actual_fetch(self, scheme, path, destdir, ca_certs_path):
+    def _start_actual_fetch(self, ca_certs_path):
         fetching_thread_name = None
-        url = scheme + "://" + path
-
-        if "/" not in path:
-            msg = f"Missing the path component of the '{url}' URL"
-            raise KickstartValueError(msg)
-        basename = path.rsplit("/", 1)[1]
-        if not basename:
-            msg = f"Unable to deduce basename from the '{url}' URL"
-            raise KickstartValueError(msg)
-
-        dest = destdir / basename
+        scheme = self.content_uri.split("://")[0]
 
         if is_network(scheme):
             fetching_thread_name = data_fetch.wait_and_fetch_net_data(
-                url,
-                dest,
+                self.content_uri,
+                self.dest_file_name,
                 ca_certs_path
             )
         else:  # invalid schemes are handled down the road
             fetching_thread_name = data_fetch.fetch_local_data(
-                url,
-                dest,
+                self.content_uri,
+                self.dest_file_name,
             )
         return fetching_thread_name
 
-    def finish_content_fetch(self, fetching_thread_name, fingerprint, report_callback, dest_filename,
-                             what_if_fail):
-        """
-        Finish any ongoing fetch and analyze what has been fetched.
-
-        After the fetch is completed, it analyzes verifies fetched content if applicable,
-        analyzes it and compiles into an instance of ObtainedContent.
-
-        Args:
-            fetching_thread_name: Name of the fetching thread
-                or None if we are only after the analysis
-            fingerprint: A checksum for downloaded file verification
-            report_callback: Means for the method to send user-relevant messages outside
-            dest_filename: The target of the fetch operation. Can be falsy -
-                in this case there is no content filename defined
-            what_if_fail: Callback accepting exception as an argument
-                that should handle them in the calling layer.
-
-        Returns:
-            Instance of ObtainedContent if everything went well, or None.
- """ + def finish_content_fetch(self, fetching_thread_name, fingerprint): try: - content = self._finish_actual_fetch(fetching_thread_name, fingerprint, report_callback, dest_filename) + self._finish_actual_fetch(fetching_thread_name) + if fingerprint: + self._verify_fingerprint(fingerprint) except Exception as exc: - what_if_fail(exc) - content = None + self.what_if_fail(exc) finally: with self.activity_lock: self.now_fetching_or_processing = False - return content + def _finish_actual_fetch(self, wait_for): + if wait_for: + log.info(f"OSCAP Addon: Waiting for thread {wait_for}") + threadMgr.wait(wait_for) + log.info(f"OSCAP Addon: Finished waiting for thread {wait_for}") - def _verify_fingerprint(self, dest_filename, fingerprint=""): + def _verify_fingerprint(self, fingerprint=""): if not fingerprint: return hash_obj = utils.get_hashing_algorithm(fingerprint) - digest = utils.get_file_fingerprint(dest_filename, + digest = utils.get_file_fingerprint(self.dest_file_name, hash_obj) if digest != fingerprint: log.error( - f"File {dest_filename} failed integrity check - assumed a " - f"{hash_obj.name} hash and '{fingerprint}', got '{digest}'" + "OSCAP Addon: " + f"File {self.dest_file_name} failed integrity check - assumed " + f"a {hash_obj.name} hash and '{fingerprint}', got '{digest}'" ) - msg = _(f"Integrity check of the content failed - {hash_obj.name} hash didn't match") + msg = _( + f"OSCAP Addon: Integrity check of the content failed - " + f"{hash_obj.name} hash didn't match") raise content_handling.ContentCheckError(msg) - def allow_one_expected_tailoring_or_no_tailoring(self, labelled_files): - expected_tailoring = self._addon_data.preinst_tailoring_path + +class ContentAnalyzer: + CONTENT_DOWNLOAD_LOCATION = pathlib.Path(common.INSTALLATION_CONTENT_DIR) + DEFAULT_SSG_DATA_STREAM_PATH = f"{common.SSG_DIR}/{common.SSG_CONTENT}" + + @staticmethod + def __get_content_type(url): + if url.endswith(".rpm"): + return "rpm" + elif any( + url.endswith(arch_type) + for arch_type in common.SUPPORTED_ARCHIVES): + return "archive" + else: + return "file" + + @staticmethod + def __allow_one_expected_tailoring_or_no_tailoring( + labelled_files, expected_tailoring): tailoring_label = CONTENT_TYPES["TAILORING"] if expected_tailoring: - labelled_files = self.reduce_files(labelled_files, expected_tailoring, [tailoring_label]) + labelled_files = ContentAnalyzer.reduce_files( + labelled_files, expected_tailoring, [tailoring_label]) else: labelled_files = { path: label for path, label in labelled_files.items() @@ -201,51 +192,77 @@ def allow_one_expected_tailoring_or_no_tailoring(self, labelled_files): } return labelled_files - def filter_discovered_content(self, labelled_files): - expected_path = self._addon_data.preinst_content_path - categories = (CONTENT_TYPES["DATASTREAM"], CONTENT_TYPES["XCCDF_CHECKLIST"]) + @staticmethod + def __filter_discovered_content( + labelled_files, expected_path, expected_tailoring, + expected_cpe_path): + categories = ( + CONTENT_TYPES["DATASTREAM"], + CONTENT_TYPES["XCCDF_CHECKLIST"]) if expected_path: - labelled_files = self.reduce_files(labelled_files, expected_path, categories) + labelled_files = ContentAnalyzer.reduce_files( + labelled_files, expected_path, categories) - labelled_files = self.allow_one_expected_tailoring_or_no_tailoring(labelled_files) + labelled_files = \ + ContentAnalyzer.__allow_one_expected_tailoring_or_no_tailoring( + labelled_files, expected_tailoring) - expected_path = self._addon_data.cpe_path categories = (CONTENT_TYPES["CPE_DICT"], ) - if 
expected_path: - labelled_files = self.reduce_files(labelled_files, expected_path, categories) + if expected_cpe_path: + labelled_files = ContentAnalyzer.reduce_files( + labelled_files, expected_cpe_path, categories) return labelled_files - def reduce_files(self, labelled_files, expected_path, categories): + @staticmethod + def reduce_files(labelled_files, expected_path, categories): reduced_files = dict() - if not path_is_present_among_paths(expected_path, labelled_files.keys()): + if not path_is_present_among_paths( + expected_path, labelled_files.keys()): msg = ( - f"Expected a file {expected_path} to be part of the supplied content, " - f"but it was not the case, got only {list(labelled_files.keys())}" + f"Expected a file {expected_path} to be part of the supplied " + f"content, but it was not the case, got only " + f"{list(labelled_files.keys())}" ) raise RuntimeError(msg) for path, label in labelled_files.items(): - if label in categories and not paths_are_equivalent(path, expected_path): + if label in categories and not paths_are_equivalent( + path, expected_path): continue reduced_files[path] = label return reduced_files - def _finish_actual_fetch(self, wait_for, fingerprint, report_callback, dest_filename): - threadMgr.wait(wait_for) - actually_fetched_content = wait_for is not None - - if fingerprint and dest_filename: - self._verify_fingerprint(dest_filename, fingerprint) + @staticmethod + def analyze( + fetching_thread_name, fingerprint, dest_filename, what_if_fail, + expected_path, expected_tailoring, expected_cpe_path): + try: + content = ContentAnalyzer.__analyze_fetched_content( + fetching_thread_name, fingerprint, dest_filename, + expected_path, expected_tailoring, expected_cpe_path) + except Exception as exc: + what_if_fail(exc) + content = None + return content - fpaths = self._gather_available_files(actually_fetched_content, dest_filename) + @staticmethod + def __analyze_fetched_content( + wait_for, fingerprint, dest_filename, expected_path, + expected_tailoring, expected_cpe_path): + actually_fetched_content = wait_for is not None + fpaths = ContentAnalyzer.__gather_available_files( + actually_fetched_content, dest_filename) - structured_content = ObtainedContent(self.CONTENT_DOWNLOAD_LOCATION) - content_type = self.get_content_type(str(dest_filename)) + structured_content = ObtainedContent( + ContentAnalyzer.CONTENT_DOWNLOAD_LOCATION) + content_type = ContentAnalyzer.__get_content_type(str(dest_filename)) if content_type in ("archive", "rpm"): structured_content.add_content_archive(dest_filename) labelled_filenames = content_handling.identify_files(fpaths) - labelled_filenames = self.filter_discovered_content(labelled_filenames) + labelled_filenames = ContentAnalyzer.__filter_discovered_content( + labelled_filenames, expected_path, expected_tailoring, + expected_cpe_path) for fname, label in labelled_filenames.items(): structured_content.add_file(str(fname), label) @@ -255,18 +272,21 @@ def _finish_actual_fetch(self, wait_for, fingerprint, report_callback, dest_file return structured_content - def _gather_available_files(self, actually_fetched_content, dest_filename): + @staticmethod + def __gather_available_files(actually_fetched_content, dest_filename): fpaths = [] if not actually_fetched_content: if not dest_filename: # using scap-security-guide - fpaths = [self.DEFAULT_SSG_DATA_STREAM_PATH] + fpaths = [ContentAnalyzer.DEFAULT_SSG_DATA_STREAM_PATH] else: # Using downloaded XCCDF/OVAL/DS/tailoring - fpaths = pathlib.Path(self.CONTENT_DOWNLOAD_LOCATION).rglob("*") + 
fpaths = pathlib.Path( + ContentAnalyzer.CONTENT_DOWNLOAD_LOCATION).rglob("*") fpaths = [str(p) for p in fpaths if p.is_file()] else: dest_filename = pathlib.Path(dest_filename) # RPM is an archive at this phase - content_type = self.get_content_type(str(dest_filename)) + content_type = ContentAnalyzer.__get_content_type( + str(dest_filename)) if content_type in ("archive", "rpm"): try: fpaths = common.extract_data( @@ -274,8 +294,10 @@ def _gather_available_files(self, actually_fetched_content, dest_filename): str(dest_filename.parent) ) except common.ExtractionError as err: - msg = f"Failed to extract the '{dest_filename}' archive: {str(err)}" - log.error(msg) + msg = ( + f"Failed to extract the '{dest_filename}' " + f"archive: {str(err)}") + log.error("OSCAP Addon: " + msg) raise err elif content_type == "file": @@ -284,45 +306,6 @@ def _gather_available_files(self, actually_fetched_content, dest_filename): raise common.OSCAPaddonError("Unsupported content type") return fpaths - def use_downloaded_content(self, content): - preferred_content = self.get_preferred_content(content) - - # We know that we have ended up with a datastream-like content, - # but if we can't convert an archive to a datastream. - # self._addon_data.content_type = "datastream" - content_type = self._addon_data.content_type - if content_type in ("archive", "rpm"): - self._addon_data.content_path = str(preferred_content.relative_to(content.root)) - else: - self._addon_data.content_path = str(preferred_content) - - preferred_tailoring = self.get_preferred_tailoring(content) - if content.tailoring: - if content_type in ("archive", "rpm"): - self._addon_data.tailoring_path = str(preferred_tailoring.relative_to(content.root)) - else: - self._addon_data.tailoring_path = str(preferred_tailoring) - - def use_system_content(self, content=None): - self._addon_data.clear_all() - self._addon_data.content_type = "scap-security-guide" - self._addon_data.content_path = common.get_ssg_path() - - def get_preferred_content(self, content): - if self._addon_data.content_path: - preferred_content = content.find_expected_usable_content(self._addon_data.content_path) - else: - preferred_content = content.select_main_usable_content() - return preferred_content - - def get_preferred_tailoring(self, content): - tailoring_path = self._addon_data.tailoring_path - if tailoring_path: - if tailoring_path != str(content.tailoring.relative_to(content.root)): - msg = f"Expected a tailoring {tailoring_path}, but it couldn't be found" - raise content_handling.ContentHandlingError(msg) - return content.tailoring - class ObtainedContent: """ @@ -419,3 +402,17 @@ def select_main_usable_content(self): "Couldn't find a valid datastream or a valid XCCDF-OVAL file tuple " "among the available content") raise content_handling.ContentHandlingError(msg) + + def get_preferred_tailoring(self, tailoring_path): + if tailoring_path: + if tailoring_path != str(self.tailoring.relative_to(self.root)): + msg = f"Expected a tailoring {tailoring_path}, but it couldn't be found" + raise content_handling.ContentHandlingError(msg) + return self.tailoring + + def get_preferred_content(self, content_path): + if content_path: + preferred_content = self.find_expected_usable_content(content_path) + else: + preferred_content = self.select_main_usable_content() + return preferred_content diff --git a/org_fedora_oscap/gui/spokes/oscap.py b/org_fedora_oscap/gui/spokes/oscap.py index 54eae1e6..bd01316d 100644 --- a/org_fedora_oscap/gui/spokes/oscap.py +++ 
b/org_fedora_oscap/gui/spokes/oscap.py @@ -32,7 +32,7 @@ from org_fedora_oscap import scap_content_handler from org_fedora_oscap import utils from org_fedora_oscap.common import dry_run_skip -from org_fedora_oscap.content_discovery import ContentBringer +from org_fedora_oscap.content_discovery import ContentBringer, ContentAnalyzer from pyanaconda.threading import threadMgr, AnacondaThread from pyanaconda.ui.gui.spokes import NormalSpoke from pyanaconda.ui.communication import hubQ @@ -251,7 +251,7 @@ def __init__(self, data, storage, payload): self._anaconda_spokes_initialized = threading.Event() self.initialization_controller.init_done.connect(self._all_anaconda_spokes_initialized) - self.content_bringer = ContentBringer(self._addon_data) + self.content_bringer = ContentBringer(self._handle_error) def _all_anaconda_spokes_initialized(self): log.debug("OSCAP addon: Anaconda init_done signal triggered") @@ -378,7 +378,8 @@ def _fetch_data_and_initialize(self): thread_name = None if self._addon_data.content_url and self._addon_data.content_type != "scap-security-guide": thread_name = self.content_bringer.fetch_content( - self._handle_error, self._addon_data.certificates) + self._addon_data.content_url, + self._addon_data.certificates) # pylint: disable-msg=E1101 hubQ.send_message(self.__class__.__name__, @@ -400,18 +401,22 @@ def _init_after_data_fetch(self, wait_for): :type wait_for: str or None """ - def update_progress_label(msg): - fire_gtk_action(self._progress_label.set_text(msg)) - content_path = None actually_fetched_content = wait_for is not None if actually_fetched_content: content_path = self._addon_data.raw_preinst_content_path + self.content_bringer.finish_content_fetch( + wait_for, self._addon_data.fingerprint) + + expected_path = self._addon_data.preinst_content_path + expected_tailoring = self._addon_data.preinst_tailoring_path + expected_cpe_path = self._addon_data.cpe_path - content = self.content_bringer.finish_content_fetch( - wait_for, self._addon_data.fingerprint, update_progress_label, - content_path, self._handle_error) + content = ContentAnalyzer.analyze( + wait_for, self._addon_data.fingerprint, content_path, + self._handle_error, expected_path, expected_tailoring, + expected_cpe_path) if not content: with self._fetch_flag_lock: self._fetching = False @@ -420,7 +425,7 @@ def update_progress_label(msg): try: if actually_fetched_content: - self.content_bringer.use_downloaded_content(content) + self._use_downloaded_content(content) msg = f"Opening SCAP content at {self._addon_data.preinst_content_path}" if self._addon_data.tailoring_path: @@ -1164,5 +1169,33 @@ def on_change_content_clicked(self, *args): self.refresh() def on_use_ssg_clicked(self, *args): - self.content_bringer.use_system_content() + self._use_system_content() self._fetch_data_and_initialize() + + def _use_system_content(self): + self._addon_data.clear_all() + self._addon_data.content_type = "scap-security-guide" + self._addon_data.content_path = common.get_ssg_path() + + def _use_downloaded_content(self, content): + preferred_content = content.get_preferred_content( + self._addon_data.content_path) + + # We know that we have ended up with a datastream-like content, + # but if we can't convert an archive to a datastream. 
+        # self._addon_data.content_type = "datastream"
+        content_type = self._addon_data.content_type
+        if content_type in ("archive", "rpm"):
+            self._addon_data.content_path = str(
+                preferred_content.relative_to(content.root))
+        else:
+            self._addon_data.content_path = str(preferred_content)
+
+        preferred_tailoring = content.get_preferred_tailoring(
+            self._addon_data.tailoring_path)
+        if content.tailoring:
+            if content_type in ("archive", "rpm"):
+                self._addon_data.tailoring_path = str(
+                    preferred_tailoring.relative_to(content.root))
+            else:
+                self._addon_data.tailoring_path = str(preferred_tailoring)
diff --git a/org_fedora_oscap/ks/oscap.py b/org_fedora_oscap/ks/oscap.py
index 7d4a1319..d4d311a8 100644
--- a/org_fedora_oscap/ks/oscap.py
+++ b/org_fedora_oscap/ks/oscap.py
@@ -105,7 +105,8 @@ def __init__(self, name, just_clear=False):
         self.rule_data = rule_handling.RuleData()
         self.dry_run = False
 
-        self.content_bringer = content_discovery.ContentBringer(self)
+        self.content_bringer = content_discovery.ContentBringer(
+            self._handle_error)
 
     def __str__(self):
         """
@@ -436,21 +437,31 @@ def setup(self, storage, ksdata, payload):
         thread_name = None
         if not os.path.exists(self.preinst_content_path) and not os.path.exists(self.raw_preinst_content_path):
             # content not available/fetched yet
-            thread_name = self.content_bringer.fetch_content(self._handle_error, self.certificates)
+            thread_name = self.content_bringer.fetch_content(
+                self.content_url, self.certificates)
 
         content_dest = None
+        fingerprint = None
         if self.content_type != "scap-security-guide":
             content_dest = self.raw_preinst_content_path
-
-        content = self.content_bringer.finish_content_fetch(
-            thread_name, self.fingerprint, lambda msg: log.info(msg), content_dest, self._handle_error)
+            fingerprint = self.fingerprint
+
+        expected_path = self.preinst_content_path
+        expected_tailoring = self.preinst_tailoring_path
+        expected_cpe_path = self.cpe_path
+        if thread_name is not None:
+            self.content_bringer.finish_content_fetch(
+                thread_name, fingerprint)
+        content = content_discovery.ContentAnalyzer.analyze(
+            thread_name, self.fingerprint, content_dest, self._handle_error,
+            expected_path, expected_tailoring, expected_cpe_path)
         if not content:
             return
 
         try:
             # just check that preferred content exists
-            self.content_bringer.get_preferred_content(content)
+            content.get_preferred_content(self.content_path)
         except Exception as exc:
             self._terminate(str(exc))
             return
diff --git a/tests/test_content_discovery.py b/tests/test_content_discovery.py
index d6e14d9f..09317ff9 100644
--- a/tests/test_content_discovery.py
+++ b/tests/test_content_discovery.py
@@ -21,7 +21,7 @@ def labelled_files():
 
 
 def test_reduce(labelled_files):
-    bringer = tested_module.ContentBringer(None)
+    analyzer = tested_module.ContentAnalyzer()
 
     d_count = 0
     x_count = 0
@@ -31,22 +31,22 @@ def test_reduce(labelled_files):
         elif l == "X":
             x_count += 1
 
-    reduced = bringer.reduce_files(labelled_files, "dir/datastream", ["D"])
+    reduced = analyzer.reduce_files(labelled_files, "dir/datastream", ["D"])
     assert len(reduced) == len(labelled_files) - d_count + 1
     assert "dir/datastream" in reduced
 
-    reduced = bringer.reduce_files(labelled_files, "dir/datastream", ["D", "X"])
+    reduced = analyzer.reduce_files(labelled_files, "dir/datastream", ["D", "X"])
     assert len(reduced) == len(labelled_files) - d_count - x_count + 1
     assert "dir/datastream" in reduced
 
-    reduced = bringer.reduce_files(labelled_files, "dir/XCCDF", ["D", "X"])
+    reduced = analyzer.reduce_files(labelled_files, "dir/XCCDF", ["D", "X"])
     assert len(reduced) == len(labelled_files) - d_count - x_count + 1
     assert "dir/XCCDF" in reduced
 
     with pytest.raises(RuntimeError, match="dir/datastream4"):
-        bringer.reduce_files(labelled_files, "dir/datastream4", ["D"])
+        analyzer.reduce_files(labelled_files, "dir/datastream4", ["D"])
 
-    reduced = bringer.reduce_files(labelled_files, "cpe", ["C"])
+    reduced = analyzer.reduce_files(labelled_files, "cpe", ["C"])
     assert reduced == labelled_files
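
The patch splits fetching (ContentBringer) from analysis (ContentAnalyzer). A minimal sketch of how a caller now drives the two pieces, based purely on the call sites in ks/oscap.py and the GUI spoke above; the addon_data namespace and its values are hypothetical stand-ins for the real addon data object, and the calls only work inside an Anaconda environment where threadMgr and the data_fetch helpers exist:

    from types import SimpleNamespace
    from org_fedora_oscap import content_discovery

    # Hypothetical stand-in for the kickstart addon data (field names follow ks/oscap.py).
    addon_data = SimpleNamespace(
        content_url="https://example.com/ssg-ds.xml",
        certificates="",
        fingerprint="",
        content_path="",
        raw_preinst_content_path="/root/openscap_data/ssg-ds.xml",
        preinst_content_path="/root/openscap_data/ssg-ds.xml",
        preinst_tailoring_path="",
        cpe_path="",
    )

    def handle_error(exc):
        # The error callback is now given to the bringer once, not per call.
        print(f"OSCAP Addon: content handling failed: {exc}")

    bringer = content_discovery.ContentBringer(handle_error)
    # fetch_content() takes the URI directly and returns the fetching thread name.
    thread_name = bringer.fetch_content(addon_data.content_url, addon_data.certificates)
    # Waits for the thread and verifies the checksum, if one was supplied.
    bringer.finish_content_fetch(thread_name, addon_data.fingerprint)

    # Analysis is a separate, static step that no longer touches the bringer's state.
    content = content_discovery.ContentAnalyzer.analyze(
        thread_name, addon_data.fingerprint, addon_data.raw_preinst_content_path,
        handle_error, addon_data.preinst_content_path,
        addon_data.preinst_tailoring_path, addon_data.cpe_path)
    if content:
        preferred = content.get_preferred_content(addon_data.content_path)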