diff --git a/conda/environments/all_cuda-121_arch-x86_64.yaml b/conda/environments/all_cuda-121_arch-x86_64.yaml
index b440991aa3..3b310995fb 100644
--- a/conda/environments/all_cuda-121_arch-x86_64.yaml
+++ b/conda/environments/all_cuda-121_arch-x86_64.yaml
@@ -13,6 +13,7 @@ dependencies:
 - appdirs
 - arxiv=1.4
 - automake
+- beautifulsoup4
 - benchmark=1.8.3
 - boost-cpp=1.84
 - boto3
diff --git a/conda/environments/dev_cuda-121_arch-x86_64.yaml b/conda/environments/dev_cuda-121_arch-x86_64.yaml
index 2ee99333a0..23ff2c707e 100644
--- a/conda/environments/dev_cuda-121_arch-x86_64.yaml
+++ b/conda/environments/dev_cuda-121_arch-x86_64.yaml
@@ -11,6 +11,7 @@ channels:
 dependencies:
 - appdirs
 - automake
+- beautifulsoup4
 - benchmark=1.8.3
 - boost-cpp=1.84
 - breathe=4.35.0
diff --git a/conda/environments/examples_cuda-121_arch-x86_64.yaml b/conda/environments/examples_cuda-121_arch-x86_64.yaml
index 857b73aa85..11d5e535ce 100644
--- a/conda/environments/examples_cuda-121_arch-x86_64.yaml
+++ b/conda/environments/examples_cuda-121_arch-x86_64.yaml
@@ -12,6 +12,7 @@ dependencies:
 - anyio>=3.7
 - appdirs
 - arxiv=1.4
+- beautifulsoup4
 - boto3
 - click >=8
 - cuml=24.02.*
diff --git a/conda/environments/runtime_cuda-121_arch-x86_64.yaml b/conda/environments/runtime_cuda-121_arch-x86_64.yaml
index 3f9543d426..80f6f995d2 100644
--- a/conda/environments/runtime_cuda-121_arch-x86_64.yaml
+++ b/conda/environments/runtime_cuda-121_arch-x86_64.yaml
@@ -10,6 +10,7 @@ channels:
 - pytorch
 dependencies:
 - appdirs
+- beautifulsoup4
 - click >=8
 - datacompy=0.10
 - dill=0.3.7
diff --git a/dependencies.yaml b/dependencies.yaml
index 616c1db3de..7f1f9145ef 100644
--- a/dependencies.yaml
+++ b/dependencies.yaml
@@ -249,6 +249,7 @@ dependencies:
       - &dill dill=0.3.7
       - &scikit-learn scikit-learn=1.3.2
       - appdirs
+      - beautifulsoup4
       - datacompy=0.10
       - elasticsearch==8.9.0
       - feedparser=6.0.10
diff --git a/examples/llm/vdb_upload/module/rss_source_pipe.py b/examples/llm/vdb_upload/module/rss_source_pipe.py
index ff61940b8c..55b309e032 100644
--- a/examples/llm/vdb_upload/module/rss_source_pipe.py
+++ b/examples/llm/vdb_upload/module/rss_source_pipe.py
@@ -49,6 +49,7 @@ class RSSSourcePipeSchema(BaseModel):
     request_timeout_sec: float = 2.0
     run_indefinitely: bool = True
     stop_after_rec: int = 0
+    strip_markup: bool = True
     vdb_resource_name: str
     web_scraper_config: Optional[Dict[Any, Any]] = None
 
@@ -98,6 +99,7 @@ def _rss_source_pipe(builder: mrc.Builder):
         - **request_timeout_sec**: Timeout in seconds for RSS feed requests.
         - **run_indefinitely**: Boolean to indicate continuous running.
         - **stop_after**: Number of records to process before stopping (0 for indefinite).
+        - **strip_markup**: When True, strip HTML & XML markup from feed content.
         - **web_scraper_config**: Configuration for the web scraper module.
           - **chunk_overlap**: Overlap size for chunks in web scraping.
           - **chunk_size**: Size of content chunks for processing.
@@ -131,6 +133,7 @@ def _rss_source_pipe(builder: mrc.Builder):
         "request_timeout_sec": validated_config.request_timeout_sec,
         "interval_sec": validated_config.interval_sec,
         "stop_after_rec": validated_config.stop_after_rec,
+        "strip_markup": validated_config.strip_markup,
     }
 
     rss_source_loader = RSSSourceLoaderFactory.get_instance("rss_source", {"rss_source": rss_source_config})
diff --git a/examples/llm/vdb_upload/vdb_config.yaml b/examples/llm/vdb_upload/vdb_config.yaml
index ac93a47615..5698cc2e83 100644
--- a/examples/llm/vdb_upload/vdb_config.yaml
+++ b/examples/llm/vdb_upload/vdb_config.yaml
@@ -76,6 +76,7 @@ vdb_pipeline:
         request_timeout_sec: 2.0
         run_indefinitely: true
         stop_after_rec: 0
+        strip_markup: true
         web_scraper_config:
           chunk_overlap: 51
           chunk_size: 512
diff --git a/examples/llm/vdb_upload/vdb_utils.py b/examples/llm/vdb_upload/vdb_utils.py
index 7740acbc7c..d9e39b2553 100644
--- a/examples/llm/vdb_upload/vdb_utils.py
+++ b/examples/llm/vdb_upload/vdb_utils.py
@@ -142,6 +142,7 @@ def _build_default_rss_source(enable_cache,
             "interval_sec": interval_secs,
             "request_timeout_sec": rss_request_timeout_sec,
             "run_indefinitely": run_indefinitely,
+            "strip_markup": True,
             "vdb_resource_name": vector_db_resource_name,
             "web_scraper_config": {
                 "chunk_size": content_chunking_size,
diff --git a/morpheus/controllers/rss_controller.py b/morpheus/controllers/rss_controller.py
index 5b9c36f369..a1972c406f 100644
--- a/morpheus/controllers/rss_controller.py
+++ b/morpheus/controllers/rss_controller.py
@@ -70,8 +70,17 @@ class RSSController:
         Cooldown interval in seconds if there is a failure in fetching or parsing the feed.
     request_timeout : float, optional, default = 2.0
         Request timeout in secs to fetch the feed.
+    strip_markup : bool, optional, default = False
+        When true, strip HTML & XML markup from the content, summary and title fields.
     """
 
+    # Fields which may contain HTML or XML content
+    MARKUP_FIELDS = (
+        "content",
+        "summary",
+        "title",
+    )
+
     def __init__(self,
                  feed_input: str | list[str],
                  batch_size: int = 128,
@@ -79,7 +88,8 @@ def __init__(self,
                  enable_cache: bool = False,
                  cache_dir: str = "./.cache/http",
                  cooldown_interval: int = 600,
-                 request_timeout: float = 2.0):
+                 request_timeout: float = 2.0,
+                 strip_markup: bool = False):
 
         if IMPORT_EXCEPTION is not None:
             raise ImportError(IMPORT_ERROR_MESSAGE) from IMPORT_EXCEPTION
@@ -92,6 +102,7 @@ def __init__(self,
         self._previous_entries = set()  # Stores the IDs of previous entries to prevent the processing of duplicates.
         self._cooldown_interval = cooldown_interval
         self._request_timeout = request_timeout
+        self._strip_markup = strip_markup
 
         # Validate feed_input
         for f in self._feed_input:
@@ -236,6 +247,44 @@ def _try_parse_feed(self, url: str) -> "feedparser.FeedParserDict":
 
         return feed
 
+    @staticmethod
+    def _strip_markup_from_field(field: str, mime_type: str) -> str:
+        if mime_type.endswith("xml"):
+            parser = "xml"
+        else:
+            parser = "html.parser"
+
+        try:
+            soup = BeautifulSoup(field, features=parser)
+            return soup.get_text()
+        except Exception as ex:
+            logger.error("Failed to strip tags from field: %s: %s", field, ex)
+            return field
+
+    def _strip_markup_from_fields(self, entry: "feedparser.FeedParserDict"):
+        """
+        Strip HTML & XML tags from the content, summary and title fields.
+
+        Per a note in the feedparser documentation, even if a field is advertised as plain text it may still
+        contain HTML: https://feedparser.readthedocs.io/en/latest/html-sanitization.html
+        """
+        for field in self.MARKUP_FIELDS:
+            field_value = entry.get(field)
+            if field_value is not None:
+                if isinstance(field_value, list):
+                    for field_item in field_value:
+                        mime_type = field_item.get("type", "text/plain")
+                        field_item["value"] = self._strip_markup_from_field(field_item["value"], mime_type)
+                        field_item["type"] = "text/plain"
+                else:
+                    detail_field_name = f"{field}_detail"
+                    detail_field: dict = entry.get(detail_field_name, {})
+                    mime_type = detail_field.get("type", "text/plain")
+
+                    entry[field] = self._strip_markup_from_field(field_value, mime_type)
+                    detail_field["type"] = "text/plain"
+                    entry[detail_field_name] = detail_field
+
     def parse_feeds(self):
         """
         Parse the RSS feed using the feedparser library.
@@ -291,6 +340,9 @@ def fetch_dataframes(self):
                     entry_id = entry.get('id')
                     current_entries.add(entry_id)
                     if entry_id not in self._previous_entries:
+                        if self._strip_markup:
+                            self._strip_markup_from_fields(entry)
+
                         entry_accumulator.append(entry)
 
                         if self._batch_size > 0 and len(entry_accumulator) >= self._batch_size:
diff --git a/morpheus/modules/input/rss_source.py b/morpheus/modules/input/rss_source.py
index 9f5dd6c316..1454a67b05 100644
--- a/morpheus/modules/input/rss_source.py
+++ b/morpheus/modules/input/rss_source.py
@@ -32,30 +32,26 @@
 @register_module("rss_source", "morpheus")
 def _rss_source(builder: mrc.Builder):
     """
-    A module for applying simple DataFrame schema transform policies.
-
-    This module reads the configuration to determine how to set data types for columns, select, or rename them in the
-    dataframe.
+    A module for loading RSS feed items into a DataFrame.
 
     Parameters
     ----------
     builder : mrc.Builder
         The Morpheus pipeline builder object.
 
-    Notes
-    -------------
-    The configuration should be passed to the module through the `module_config` attribute of the builder. It should
-    contain a dictionary where each key is a column name, and the value is another dictionary with keys 'dtype' for
-    data type, 'op_type' for operation type ('select' or 'rename'), and optionally 'from' for the original column
-    name (if the column is to be renamed).
-
     Example Configuration
     ---------------------
     {
-        "summary": {"dtype": "str", "op_type": "select"},
-        "title": {"dtype": "str", "op_type": "select"},
-        "content": {"from": "page_content", "dtype": "str", "op_type": "rename"},
-        "source": {"from": "link", "dtype": "str", "op_type": "rename"}
+        "batch_size": 32,
+        "cache_dir": "./.cache/http",
+        "cooldown_interval_sec": 600,
+        "enable_cache": True,
+        "feed_input": ["https://nvidianews.nvidia.com/releases.xml"],
+        "interval_sec": 600,
+        "request_timeout_sec": 2.0,
+        "run_indefinitely": True,
+        "stop_after_rec": 0,
+        "strip_markup": True,
     }
     """
 
@@ -77,7 +73,8 @@ def _rss_source(builder: mrc.Builder):
                                enable_cache=validated_config.enable_cache,
                                cache_dir=validated_config.cache_dir,
                                cooldown_interval=validated_config.cooldown_interval_sec,
-                               request_timeout=validated_config.request_timeout_sec)
+                               request_timeout=validated_config.request_timeout_sec,
+                               strip_markup=validated_config.strip_markup)
 
     stop_requested = False
 
@@ -108,9 +105,9 @@ def fetch_feeds() -> MessageMeta:
 
         except Exception as exc:
             if not controller.run_indefinitely:
-                logger.error("Failed either in the process of fetching or processing entries: %d.", exc)
+                logger.error("Failed either in the process of fetching or processing entries: %s.", exc)
                 raise
-            logger.error("Failed either in the process of fetching or processing entries: %d.", exc)
+            logger.error("Failed either in the process of fetching or processing entries: %s.", exc)
 
         if not controller.run_indefinitely:
             stop_requested = True
diff --git a/morpheus/modules/schemas/rss_source_schema.py b/morpheus/modules/schemas/rss_source_schema.py
index 53c0928391..38facfed0e 100644
--- a/morpheus/modules/schemas/rss_source_schema.py
+++ b/morpheus/modules/schemas/rss_source_schema.py
@@ -31,6 +31,7 @@ class RSSSourceSchema(BaseModel):
     request_timeout_sec: float = 2.0
     interval_sec: int = 600
     stop_after_rec: int = 0
+    strip_markup: bool = True
 
     class Config:
         extra = "forbid"
diff --git a/morpheus/stages/input/rss_source_stage.py b/morpheus/stages/input/rss_source_stage.py
index d56a443542..a67d7997cb 100644
--- a/morpheus/stages/input/rss_source_stage.py
+++ b/morpheus/stages/input/rss_source_stage.py
@@ -52,6 +52,8 @@ class RSSSourceStage(PreallocatorMixin, SingleOutputSource):
         Cooldown interval in seconds if there is a failure in fetching or parsing the feed.
     request_timeout : float, optional, default = 2.0
         Request timeout in secs to fetch the feed.
+    strip_markup : bool, optional, default = False
+        When true, strip HTML & XML markup from the content, summary and title fields.
""" def __init__(self, @@ -64,7 +66,8 @@ def __init__(self, enable_cache: bool = False, cache_dir: str = "./.cache/http", cooldown_interval: int = 600, - request_timeout: float = 2.0): + request_timeout: float = 2.0, + strip_markup: bool = False): super().__init__(c) self._stop_requested = False @@ -87,7 +90,8 @@ def __init__(self, "enable_cache": enable_cache, "cache_dir": cache_dir, "cooldown_interval_sec": cooldown_interval, - "request_timeout_sec": request_timeout + "request_timeout_sec": request_timeout, + "strip_markup": strip_markup } } diff --git a/tests/controllers/test_rss_controller.py b/tests/controllers/test_rss_controller.py index dad981ad07..9cb42ca815 100644 --- a/tests/controllers/test_rss_controller.py +++ b/tests/controllers/test_rss_controller.py @@ -17,15 +17,18 @@ from os import path from unittest.mock import Mock from unittest.mock import patch +from xml.etree import ElementTree import feedparser import pytest +from bs4 import BeautifulSoup import cudf from _utils import TEST_DIRS from morpheus.controllers.rss_controller import FeedStats from morpheus.controllers.rss_controller import RSSController +from morpheus.utils.type_aliases import SeriesType test_urls = ["https://fake.nvidia.com/rss/HomePage.xml"] @@ -66,6 +69,11 @@ def mock_get_response_fixture() -> Mock: return mock_response +@pytest.fixture(scope="module", name="cisa_rss_feed") +def cisa_rss_feed_fixture() -> str: + return [path.join(TEST_DIRS.tests_data_dir, 'service/cisa_rss_feed.xml')] + + @pytest.mark.parametrize("feed_input, expected_output", [(url, True) for url in test_urls]) def test_run_indefinitely_true(feed_input: str, expected_output: bool): controller = RSSController(feed_input=feed_input) @@ -95,9 +103,11 @@ def test_parse_feed_invalid_input(feed_input: list[str]): RSSController(feed_input=feed_input) +@pytest.mark.parametrize("strip_markup", [False, True]) @pytest.mark.parametrize("feed_input, expected_count", [(test_file_paths[0], 30)]) -def test_skip_duplicates_feed_inputs(feed_input: str, expected_count: int): - controller = RSSController(feed_input=[feed_input, feed_input]) # Pass duplicate feed inputs +def test_skip_duplicates_feed_inputs(feed_input: str, expected_count: int, strip_markup: bool): + controller = RSSController(feed_input=[feed_input, feed_input], + strip_markup=strip_markup) # Pass duplicate feed inputs dataframes_generator = controller.fetch_dataframes() dataframe = next(dataframes_generator, None) assert isinstance(dataframe, cudf.DataFrame) @@ -130,9 +140,10 @@ def test_fetch_dataframes_url(feed_input: str | list[str], assert len(dataframe) > 0 +@pytest.mark.parametrize("strip_markup", [False, True]) @pytest.mark.parametrize("feed_input", [test_file_paths, test_file_paths[0]]) -def test_fetch_dataframes_filepath(feed_input: str | list[str]): - controller = RSSController(feed_input=feed_input) +def test_fetch_dataframes_filepath(feed_input: str | list[str], strip_markup: bool): + controller = RSSController(feed_input=feed_input, strip_markup=strip_markup) dataframes_generator = controller.fetch_dataframes() dataframe = next(dataframes_generator, None) assert isinstance(dataframe, cudf.DataFrame) @@ -140,18 +151,23 @@ def test_fetch_dataframes_filepath(feed_input: str | list[str]): assert len(dataframe) > 0 +@pytest.mark.parametrize("strip_markup", [False, True]) @pytest.mark.parametrize("feed_input, batch_size", [(test_file_paths, 5)]) -def test_batch_size(feed_input: list[str], batch_size: int): - controller = RSSController(feed_input=feed_input, batch_size=batch_size) 
+def test_batch_size(feed_input: list[str], batch_size: int, strip_markup: bool):
+    controller = RSSController(feed_input=feed_input, batch_size=batch_size, strip_markup=strip_markup)
     for df in controller.fetch_dataframes():
         assert isinstance(df, cudf.DataFrame)
         assert len(df) <= batch_size
 
 
+@pytest.mark.parametrize("strip_markup", [False, True])
 @pytest.mark.parametrize("feed_input, enable_cache", [(test_file_paths[0], False), (test_urls[0], True),
                                                       (test_urls[0], False)])
-def test_try_parse_feed_with_beautiful_soup(feed_input: str, enable_cache: bool, mock_get_response: Mock):
-    controller = RSSController(feed_input=feed_input, enable_cache=enable_cache)
+def test_try_parse_feed_with_beautiful_soup(feed_input: str,
+                                            enable_cache: bool,
+                                            mock_get_response: Mock,
+                                            strip_markup: bool):
+    controller = RSSController(feed_input=feed_input, enable_cache=enable_cache, strip_markup=strip_markup)
 
     # When enable_cache is set to 'True', the feed content is provided as input.
     feed_data = controller._try_parse_feed_with_beautiful_soup(mock_get_response.text)
@@ -226,13 +242,44 @@ def test_parse_feeds(mock_feed: feedparser.FeedParserDict):
         controller.get_feed_stats("http://testfeed.com")
 
 
+@pytest.mark.parametrize("strip_markup", [False, True])
 @pytest.mark.parametrize("feed_input", [test_urls[0]])
-def test_redundant_fetch(feed_input: str, mock_feed: feedparser.FeedParserDict, mock_get_response: Mock):
+def test_redundant_fetch(feed_input: str,
+                         mock_feed: feedparser.FeedParserDict,
+                         mock_get_response: Mock,
+                         strip_markup: bool):
 
-    controller = RSSController(feed_input=feed_input)
+    controller = RSSController(feed_input=feed_input, strip_markup=strip_markup)
     mock_feedparser_parse = patch("morpheus.controllers.rss_controller.feedparser.parse")
     with mock_feedparser_parse, patch("requests.Session.get", return_value=mock_get_response) as mocked_session_get:
         mock_feedparser_parse.return_value = mock_feed
         dataframes_generator = controller.fetch_dataframes()
         next(dataframes_generator, None)
         assert mocked_session_get.call_count == 1
+
+
+@pytest.mark.parametrize("strip_markup", [False, True])
+def test_strip_markup(cisa_rss_feed: list[str], strip_markup: bool):
+    # Construct expected data
+    tree = ElementTree.parse(cisa_rss_feed[0])
+
+    # feedparser will map the description field to the summary field
+    description_tags = tree.findall('./channel/item/description')
+    expected_summary_col = [(tag.text or "").strip() for tag in description_tags]
+
+    if strip_markup:
+        expected_summary_col = [
+            BeautifulSoup(summary, features="html.parser").get_text() for summary in expected_summary_col
+        ]
+
+    controller = RSSController(feed_input=cisa_rss_feed, strip_markup=strip_markup)
+    dataframes = list(controller.fetch_dataframes())
+
+    # The number of dataframes and rows should be the same regardless of whether strip_markup is True or False
+    assert len(dataframes) == 1
+    dataframe = dataframes[0]
+    assert isinstance(dataframe, cudf.DataFrame)
+    assert len(dataframe) == 10
+
+    series: SeriesType = dataframe["summary"]
+    assert (series.to_pandas().values == expected_summary_col).all()
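A minimal usage sketch of the new option, assuming the placeholder feed URL from the tests above (any
reachable RSS/Atom URL or local XML file path works); RSSController, strip_markup and fetch_dataframes()
come from this change, the batch_size value is arbitrary:

    from morpheus.controllers.rss_controller import RSSController

    # strip_markup defaults to False on RSSController; opt in explicitly.
    # The feed URL below is the fake one used by the tests; substitute a real feed.
    controller = RSSController(feed_input=["https://fake.nvidia.com/rss/HomePage.xml"],
                               batch_size=32,
                               strip_markup=True)

    # fetch_dataframes() yields cudf DataFrames; with strip_markup=True the
    # "title", "summary" and "content" fields hold plain text instead of the
    # raw HTML/XML that may be embedded in the feed.
    for df in controller.fetch_dataframes():
        print(df["summary"].to_pandas().head())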