Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RSS Source Stage for Reading RSS Feeds #1149

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions morpheus/cli/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,7 @@ def post_pipeline(ctx: click.Context, *args, **kwargs):
add_command("from-duo", "morpheus.stages.input.duo_source_stage.DuoSourceStage", modes=AE_ONLY)
add_command("from-file", "morpheus.stages.input.file_source_stage.FileSourceStage", modes=NOT_AE)
add_command("from-kafka", "morpheus.stages.input.kafka_source_stage.KafkaSourceStage", modes=NOT_AE)
add_command("from-rss", "morpheus.stages.input.rss_source_stage.RSSSourceStage", modes=ALL)
add_command("gen-viz", "morpheus.stages.postprocess.generate_viz_frames_stage.GenerateVizFramesStage", modes=NLP_ONLY)
add_command("inf-identity", "morpheus.stages.inference.identity_inference_stage.IdentityInferenceStage", modes=NOT_AE)
add_command("inf-pytorch",
Expand Down
119 changes: 119 additions & 0 deletions morpheus/stages/input/rss_source_stage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
# Copyright (c) 2022-2023, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import feedparser
import cudf
from morpheus.utils.controllers.rss_controller import RSSController
import mrc
from morpheus.cli import register_stage
import time
import feedparser
import pandas as pd
from datetime import datetime, timedelta
from morpheus.config import Config
from morpheus.config import PipelineModes
from morpheus.messages import MessageMeta
from morpheus.pipeline.preallocator_mixin import PreallocatorMixin
from morpheus.pipeline.single_output_source import SingleOutputSource
from morpheus.pipeline.stream_pair import StreamPair


logger = logging.getLogger(__name__)

@register_stage("from-rss", modes=[PipelineModes.AE, PipelineModes.FIL, PipelineModes.NLP, PipelineModes.OTHER])
class RSSSourceStage(PreallocatorMixin, SingleOutputSource):
"""
Load RSS feed items into a pandas DataFrame.

Parameters
----------
c : morpheus.config.Config
Pipeline configuration instance.
feed_input : str
The URL of the RSS feed.
interval_secs : float, optional, default = 600
Interval in seconds between fetching new feed items.
stop_after: int, default = 0
Stops ingesting after emitting `stop_after` records (rows in the dataframe). Useful for testing. Disabled if `0`
max_retries : int, optional, default = 3
Maximum number of retries for fetching entries on exception.
"""
def __init__(self, c: Config, feed_input: str, interval_secs: float = 600, stop_after: int = 0, max_retries: int = 5):
super().__init__(c)
self._stop_requested = False
self._records_emitted = 0
self._stop_after = stop_after
self._interval_secs = interval_secs
self._max_retries = max_retries
self._controller = RSSController(feed_input=feed_input, batch_size=c.pipeline_batch_size)

@property
def name(self) -> str:
return "from-rss"

def stop(self):
"""
Stop the RSS source stage.
"""
self._stop_requested = True
return super().stop()

def supports_cpp_node(self):
return False

def _fetch_feeds(self):
"""
Fetch RSS feed entries and yield as MessageMeta messages.
"""
retries = 0

while (not self._stop_requested) and (retries < self._max_retries):
try:
for entry_accumulator in self._controller.fetch_entries():
if entry_accumulator is not None and entry_accumulator:
logger.debug("Processing %d new entries...", len(entry_accumulator))
df = self._controller.create_dataframe(entry_accumulator)
bsuryadevara marked this conversation as resolved.
Show resolved Hide resolved
self._records_emitted += len(df)
logger.debug("Emitted %d records so far.", self._records_emitted)
yield MessageMeta(df=df)
continue
logger.debug("New entries not found.")

if not self._controller.run_indefinitely:
self._stop_requested = True
continue

if (self._stop_after > 0 and self._records_emitted >= self._stop_after):
self._stop_requested = True
logger.debug("Stop limit reached...preparing to halt the source.")
continue

logger.debug("Waiting for %d seconds before fetching again...", self._interval_secs)
time.sleep(self._interval_secs)

except Exception as exc:
retries += 1
logger.warning("Error fetching feed entries. Retrying (%d/%d)...", retries, self._max_retries)
logger.debug("Waiting for 5 secs before retrying...")
time.sleep(5) # Wait before retrying

if retries == self._max_retries: # Check if retries exceeded the limit
logger.error("Max retries reached. Unable to fetch feed entries.")
raise Exception("Failed to fetch feed entries after max retries: %s", exc)


def _build_source(self, builder: mrc.Builder) -> StreamPair:
source = builder.make_source(self.unique_name, self._fetch_feeds)
return source, MessageMeta
13 changes: 13 additions & 0 deletions morpheus/utils/controllers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright (c) 2022-2023, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
144 changes: 144 additions & 0 deletions morpheus/utils/controllers/rss_controller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
# Copyright (c) 2022-2023, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import feedparser
import cudf
import typing
from urllib.parse import urlparse

logger = logging.getLogger(__name__)

class RSSController:
def __init__(self, feed_input: str, batch_size: int = 128):
"""
RSSController handles fetching and processing of RSS feed entries.

Parameters
----------
feed_input : str
The URL or file path of the RSS feed.
batch_size : int, optional, default = 128
Number of feed items to accumulate before creating a DataFrame.
"""
self._feed_input = feed_input
self._batch_size = batch_size
self._previous_entires = set() # Stores the IDs of previous entries to prevent the processing of duplicates.
# If feed_input is URL. Runs indefinitely
self._run_indefinitely = True if RSSController.is_url(feed_input) else False

@property
def run_indefinitely(self):
return self._run_indefinitely

def parse_feed(self):
"""
Parse the RSS feed using the feedparser library.

Returns
-------
feedparser.FeedParserDict
The parsed feed content.

Raises
------
Exception
If the feed input is invalid or does not contain any entries.
"""
feed = feedparser.parse(self._feed_input)

if feed.entries:
return feed
else:
raise Exception(f"Invalid feed input: {self._feed_input}. No entries found.")


def fetch_entries(self) -> typing.Union[typing.List[typing.Tuple], typing.List]:
"""
Fetch and process RSS feed entries.

Returns
-------
typing.Union[typing.List[typing.Tuple], typing.List]
List of feed entries or None if no new entries are available.

Raises
------
Exception
If there is error fetching or processing feed entries.
"""
entry_accumulator = []
current_entries = set()

try:

feed = self.parse_feed()

for entry in feed.entries:
entry_id = entry.get('id')
current_entries.add(entry_id)

if entry_id not in self._previous_entires:
entry_accumulator.append(entry)

if len(entry_accumulator) >= self._batch_size:
yield entry_accumulator
entry_accumulator.clear()

self._previous_entires = current_entries

yield entry_accumulator

except Exception as exc:
raise Exception("Error fetching or processing feed entries: %s", exc)

def create_dataframe(self, entries: typing.List[typing.Tuple]) -> cudf.DataFrame:
"""
Create a DataFrame from accumulated entry data.

Parameters
----------
entries : typing.List[typing.Tuple]
List of accumulated feed entries.

Returns
-------
cudf.DataFrame
A DataFrame containing feed entry data.
"""
try:
return cudf.DataFrame(entries)
except Exception as exc:
logger.error("Error creating DataFrame: %s", exc)

@classmethod
def is_url(cls, feed_input: str) -> bool:
"""
Check if the provided input is a valid URL.

Parameters
----------
feed_input : str
The input string to be checked.

Returns
-------
bool
True if the input is a valid URL, False otherwise.
"""
try:
parsed_url = urlparse(feed_input)
return parsed_url.scheme != '' and parsed_url.netloc != ''
except Exception:
return False
3 changes: 3 additions & 0 deletions tests/tests_data/rss_feed_atom.xml
Git LFS file not shown
84 changes: 84 additions & 0 deletions tests/utils/controllers/test_rss_controller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# SPDX-FileCopyrightText: Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest
import os
from utils import TEST_DIRS
from morpheus.utils.controllers.rss_controller import RSSController

test_urls = [
"https://realpython.com/atom.xml",
"https://rss.nytimes.com/services/xml/rss/nyt/HomePage.xml"
]

test_invalid_urls = [
"invalid_url",
"example.com/rss-feed-url",
"ftp://",
]

test_file_paths = [
os.path.join(TEST_DIRS.tests_data_dir, "rss_feed_atom.xml")
]

test_invalid_file_paths = [
"/path/to/nonexistent_file.xml",
"/path/to/directory/",
]

@pytest.mark.parametrize("feed_input, expected_output", [(url, True) for url in test_urls])
def test_run_indefinitely_true(feed_input, expected_output):
controller = RSSController(feed_input=feed_input)
assert controller.run_indefinitely == expected_output

@pytest.mark.parametrize("feed_input", test_invalid_urls + test_invalid_file_paths + test_file_paths)
def test_run_indefinitely_false(feed_input):
controller = RSSController(feed_input=feed_input)
assert controller.run_indefinitely == False

@pytest.mark.parametrize("feed_input", test_urls)
def test_parse_feed_valid_url(feed_input):
controller = RSSController(feed_input=feed_input)
feed = controller.parse_feed()
assert feed.entries

@pytest.mark.parametrize("feed_input", test_invalid_urls + test_invalid_file_paths)
def test_parse_feed_invalid_input(feed_input):
controller = RSSController(feed_input=feed_input)
with pytest.raises(Exception):
controller.parse_feed()

@pytest.mark.parametrize("feed_input", test_urls + test_file_paths)
def test_fetch_entries(feed_input):
controller = RSSController(feed_input=feed_input)
entries_generator = controller.fetch_entries()
entries = next(entries_generator, None)
assert isinstance(entries, list)
assert len(entries) > 0

@pytest.mark.parametrize("feed_input", test_file_paths)
def test_create_dataframe(feed_input):
controller = RSSController(feed_input=feed_input)
entries = [{'id': '1', 'title': 'Entry 1'}, {'id': '2', 'title': 'Entry 2'}]
df = controller.create_dataframe(entries)
assert len(df) == len(entries)

@pytest.mark.parametrize("feed_input", test_urls)
def test_is_url_true(feed_input):
assert RSSController.is_url(feed_input)

@pytest.mark.parametrize("feed_input", test_invalid_urls + test_invalid_file_paths + test_file_paths)
def test_is_url_false(feed_input):
assert not RSSController.is_url(feed_input)
Loading