Skip to content

Commit

Permalink
Strip HTML & XML tags from RSS feed input (#1670)
Browse files Browse the repository at this point in the history
* Optionally strip HTML & XML tags embedded in RSS feeds

Requires PR #1665 to be merged first
Closes #1666

## By Submitting this PR I confirm:
- I am familiar with the [Contributing Guidelines](https://github.com/nv-morpheus/Morpheus/blob/main/docs/source/developer_guide/contributing.md).
- When the PR is ready for review, new or existing tests cover these changes.
- When the PR is ready for review, the documentation is up to date with these changes.

Authors:
  - David Gardner (https://github.com/dagardner-nv)
  - Michael Demoret (https://github.com/mdemoret-nv)

Approvers:
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: #1670
  • Loading branch information
dagardner-nv committed May 1, 2024
1 parent 57d11a2 commit 9d3de8a
Show file tree
Hide file tree
Showing 13 changed files with 142 additions and 31 deletions.
1 change: 1 addition & 0 deletions conda/environments/all_cuda-121_arch-x86_64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ dependencies:
- appdirs
- arxiv=1.4
- automake
- beautifulsoup4
- benchmark=1.8.3
- boost-cpp=1.84
- boto3
Expand Down
1 change: 1 addition & 0 deletions conda/environments/dev_cuda-121_arch-x86_64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ channels:
dependencies:
- appdirs
- automake
- beautifulsoup4
- benchmark=1.8.3
- boost-cpp=1.84
- breathe=4.35.0
Expand Down
1 change: 1 addition & 0 deletions conda/environments/examples_cuda-121_arch-x86_64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ dependencies:
- anyio>=3.7
- appdirs
- arxiv=1.4
- beautifulsoup4
- boto3
- click >=8
- cuml=24.02.*
Expand Down
1 change: 1 addition & 0 deletions conda/environments/runtime_cuda-121_arch-x86_64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ channels:
- pytorch
dependencies:
- appdirs
- beautifulsoup4
- click >=8
- datacompy=0.10
- dill=0.3.7
Expand Down
1 change: 1 addition & 0 deletions dependencies.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ dependencies:
- &dill dill=0.3.7
- &scikit-learn scikit-learn=1.3.2
- appdirs
- beautifulsoup4
- datacompy=0.10
- elasticsearch==8.9.0
- feedparser=6.0.10
Expand Down
3 changes: 3 additions & 0 deletions examples/llm/vdb_upload/module/rss_source_pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class RSSSourcePipeSchema(BaseModel):
request_timeout_sec: float = 2.0
run_indefinitely: bool = True
stop_after_rec: int = 0
strip_markup: bool = True
vdb_resource_name: str
web_scraper_config: Optional[Dict[Any, Any]] = None

Expand Down Expand Up @@ -98,6 +99,7 @@ def _rss_source_pipe(builder: mrc.Builder):
- **request_timeout_sec**: Timeout in seconds for RSS feed requests.
- **run_indefinitely**: Boolean to indicate continuous running.
- **stop_after**: Number of records to process before stopping (0 for indefinite).
- **strip_markup**: When True, strip HTML & XML markup from feed content.
- **web_scraper_config**: Configuration for the web scraper module.
- **chunk_overlap**: Overlap size for chunks in web scraping.
- **chunk_size**: Size of content chunks for processing.
Expand Down Expand Up @@ -131,6 +133,7 @@ def _rss_source_pipe(builder: mrc.Builder):
"request_timeout_sec": validated_config.request_timeout_sec,
"interval_sec": validated_config.interval_sec,
"stop_after_rec": validated_config.stop_after_rec,
"strip_markup": validated_config.strip_markup,
}
rss_source_loader = RSSSourceLoaderFactory.get_instance("rss_source", {"rss_source": rss_source_config})

Expand Down
1 change: 1 addition & 0 deletions examples/llm/vdb_upload/vdb_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ vdb_pipeline:
request_timeout_sec: 2.0
run_indefinitely: true
stop_after_rec: 0
strip_markup: true
web_scraper_config:
chunk_overlap: 51
chunk_size: 512
Expand Down
1 change: 1 addition & 0 deletions examples/llm/vdb_upload/vdb_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ def _build_default_rss_source(enable_cache,
"interval_sec": interval_secs,
"request_timeout_sec": rss_request_timeout_sec,
"run_indefinitely": run_indefinitely,
"strip_markup": True,
"vdb_resource_name": vector_db_resource_name,
"web_scraper_config": {
"chunk_size": content_chunking_size,
Expand Down
54 changes: 53 additions & 1 deletion morpheus/controllers/rss_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,26 @@ class RSSController:
Cooldown interval in seconds if there is a failure in fetching or parsing the feed.
request_timeout : float, optional, default = 2.0
Request timeout in secs to fetch the feed.
strip_markup : bool, optional, default = False
        When true, strip HTML & XML markup from the content, summary and title fields.
"""

# Fields which may contain HTML or XML content
MARKUP_FIELDS = (
"content",
"summary",
"title",
)

def __init__(self,
feed_input: str | list[str],
batch_size: int = 128,
run_indefinitely: bool = None,
enable_cache: bool = False,
cache_dir: str = "./.cache/http",
cooldown_interval: int = 600,
request_timeout: float = 2.0):
request_timeout: float = 2.0,
strip_markup: bool = False):
if IMPORT_EXCEPTION is not None:
raise ImportError(IMPORT_ERROR_MESSAGE) from IMPORT_EXCEPTION

Expand All @@ -92,6 +102,7 @@ def __init__(self,
self._previous_entries = set() # Stores the IDs of previous entries to prevent the processing of duplicates.
self._cooldown_interval = cooldown_interval
self._request_timeout = request_timeout
self._strip_markup = strip_markup

# Validate feed_input
for f in self._feed_input:
Expand Down Expand Up @@ -236,6 +247,44 @@ def _try_parse_feed(self, url: str) -> "feedparser.FeedParserDict":

return feed

@staticmethod
def _strip_markup_from_field(field: str, mime_type: str) -> str:
    """
    Return `field` with any HTML/XML markup removed.

    The parser is picked from the MIME type: XML content types use the
    "xml" parser, anything else falls back to "html.parser". If parsing
    fails for any reason, the original text is returned unchanged.
    """
    parser = "xml" if mime_type.endswith("xml") else "html.parser"

    try:
        return BeautifulSoup(field, features=parser).get_text()
    except Exception as ex:
        logger.error("Failed to strip tags from field: %s: %s", field, ex)
        return field

def _strip_markup_from_fields(self, entry: "feedparser.FeedParserDict"):
    """
    Strip HTML & XML tags from the content, summary and title fields of `entry`, in place.

    Per the feedparser documentation, even a field advertised as plain text may still contain HTML:
    https://feedparser.readthedocs.io/en/latest/html-sanitization.html
    """
    for field_name in self.MARKUP_FIELDS:
        value = entry.get(field_name)
        if value is None:
            continue

        if isinstance(value, list):
            # List-valued fields (e.g. "content") hold dicts which each carry their own MIME type.
            for item in value:
                item_mime = item.get("type", "text/plain")
                item["value"] = self._strip_markup_from_field(item["value"], item_mime)
                item["type"] = "text/plain"
        else:
            # Scalar fields record their MIME type in a companion "<field>_detail" dict.
            detail_name = f"{field_name}_detail"
            detail: dict = entry.get(detail_name, {})
            source_mime = detail.get("type", "text/plain")

            entry[field_name] = self._strip_markup_from_field(value, source_mime)
            detail["type"] = "text/plain"
            entry[detail_name] = detail

def parse_feeds(self):
"""
Parse the RSS feed using the feedparser library.
Expand Down Expand Up @@ -291,6 +340,9 @@ def fetch_dataframes(self):
entry_id = entry.get('id')
current_entries.add(entry_id)
if entry_id not in self._previous_entries:
if self._strip_markup:
self._strip_markup_from_fields(entry)

entry_accumulator.append(entry)

if self._batch_size > 0 and len(entry_accumulator) >= self._batch_size:
Expand Down
33 changes: 15 additions & 18 deletions morpheus/modules/input/rss_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,30 +32,26 @@
@register_module("rss_source", "morpheus")
def _rss_source(builder: mrc.Builder):
"""
A module for applying simple DataFrame schema transform policies.
This module reads the configuration to determine how to set data types for columns, select, or rename them in the
dataframe.
A module for loading RSS feed items into a DataFrame.
Parameters
----------
builder : mrc.Builder
The Morpheus pipeline builder object.
Notes
-------------
The configuration should be passed to the module through the `module_config` attribute of the builder. It should
contain a dictionary where each key is a column name, and the value is another dictionary with keys 'dtype' for
data type, 'op_type' for operation type ('select' or 'rename'), and optionally 'from' for the original column
name (if the column is to be renamed).
Example Configuration
---------------------
{
"summary": {"dtype": "str", "op_type": "select"},
"title": {"dtype": "str", "op_type": "select"},
"content": {"from": "page_content", "dtype": "str", "op_type": "rename"},
"source": {"from": "link", "dtype": "str", "op_type": "rename"}
"batch_size": 32,
"cache_dir": "./.cache/http",
"cooldown_interval_sec": 600,
"enable_cache": True,
"feed_input": ["https://nvidianews.nvidia.com/releases.xml"],
"interval_sec": 600,
"request_timeout_sec": 2.0,
"run_indefinitely": True,
"stop_after_rec": 0,
"strip_markup": True,
}
"""

Expand All @@ -77,7 +73,8 @@ def _rss_source(builder: mrc.Builder):
enable_cache=validated_config.enable_cache,
cache_dir=validated_config.cache_dir,
cooldown_interval=validated_config.cooldown_interval_sec,
request_timeout=validated_config.request_timeout_sec)
request_timeout=validated_config.request_timeout_sec,
strip_markup=validated_config.strip_markup)

stop_requested = False

Expand Down Expand Up @@ -108,9 +105,9 @@ def fetch_feeds() -> MessageMeta:

except Exception as exc:
if not controller.run_indefinitely:
logger.error("Failed either in the process of fetching or processing entries: %d.", exc)
logger.error("Failed either in the process of fetching or processing entries: %s.", exc)
raise
logger.error("Failed either in the process of fetching or processing entries: %d.", exc)
logger.error("Failed either in the process of fetching or processing entries: %s.", exc)

if not controller.run_indefinitely:
stop_requested = True
Expand Down
1 change: 1 addition & 0 deletions morpheus/modules/schemas/rss_source_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class RSSSourceSchema(BaseModel):
request_timeout_sec: float = 2.0
interval_sec: int = 600
stop_after_rec: int = 0
strip_markup: bool = True

class Config:
extra = "forbid"
8 changes: 6 additions & 2 deletions morpheus/stages/input/rss_source_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ class RSSSourceStage(PreallocatorMixin, SingleOutputSource):
Cooldown interval in seconds if there is a failure in fetching or parsing the feed.
request_timeout : float, optional, default = 2.0
Request timeout in secs to fetch the feed.
strip_markup : bool, optional, default = False
        When true, strip HTML & XML markup from the content, summary and title fields.
"""

def __init__(self,
Expand All @@ -64,7 +66,8 @@ def __init__(self,
enable_cache: bool = False,
cache_dir: str = "./.cache/http",
cooldown_interval: int = 600,
request_timeout: float = 2.0):
request_timeout: float = 2.0,
strip_markup: bool = False):
super().__init__(c)
self._stop_requested = False

Expand All @@ -87,7 +90,8 @@ def __init__(self,
"enable_cache": enable_cache,
"cache_dir": cache_dir,
"cooldown_interval_sec": cooldown_interval,
"request_timeout_sec": request_timeout
"request_timeout_sec": request_timeout,
"strip_markup": strip_markup
}
}

Expand Down
Loading

0 comments on commit 9d3de8a

Please sign in to comment.