Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix shutdown on Ctrl+C for Python source stages #1839

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
4c34d39
Implement stop method, and use a non-blocking call to queue.get, this…
dagardner-nv Aug 8, 2024
1955523
Move checking of stop_requested attribute to occur at the same time a…
dagardner-nv Aug 8, 2024
561b16b
Stop the http server on shutdown
dagardner-nv Aug 8, 2024
9417048
Move the stop method implemented in the kafka source stage to the bas…
dagardner-nv Aug 9, 2024
d4e7201
Remove, setting _stop_requested as this is now in the parent class
dagardner-nv Aug 9, 2024
ac86f05
wip
dagardner-nv Aug 9, 2024
bbb920d
Use non-blocking call to fetch from queue
dagardner-nv Aug 9, 2024
f27e8b0
Fix
dagardner-nv Aug 9, 2024
38fc546
Add a should_stop_fn callback to Watcher class
dagardner-nv Aug 9, 2024
7a12d43
Since we cannot pass a callback function to a module, as module confi…
dagardner-nv Aug 9, 2024
0ca515a
Since the interval time is often high (default is 10 minutes), rather…
dagardner-nv Aug 9, 2024
8fa5602
Remove unused import
dagardner-nv Aug 9, 2024
096aefd
Remove stop impl, this has been moved to the base class
dagardner-nv Aug 9, 2024
9e10924
Check is_stop_requested
dagardner-nv Aug 9, 2024
5412076
Use self.is_stop_requested() method, set type hint
dagardner-nv Aug 9, 2024
0a3fb0a
Update developer docs to reflect code changes
dagardner-nv Aug 9, 2024
8586673
Merge branch 'branch-24.10' into david-polling-source-stage-ctrl-c-18337
dagardner-nv Aug 13, 2024
9805dc8
Formatting fix
dagardner-nv Aug 14, 2024
e8be2a7
Merge branch 'branch-24.10' into david-polling-source-stage-ctrl-c-18337
dagardner-nv Aug 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
wip
  • Loading branch information
dagardner-nv committed Aug 9, 2024
commit ac86f052d19ce2a6ea9ea66bd9bcdb866c0065c2
19 changes: 18 additions & 1 deletion morpheus/pipeline/single_output_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,23 @@ def stop(self):
"""

# Indicate we need to stop
self._stop_requested = True
self.request_stop()

return super().stop()

def request_stop(self):
"""
Request the source to stop processing data.
"""
self._stop_requested = True

def is_stop_requested(self) -> bool:
"""
Returns `True` if a stop has been requested.

Returns
-------
bool:
True if a stop has been requested, False otherwise.
"""
return self._stop_requested
5 changes: 2 additions & 3 deletions morpheus/stages/input/arxiv_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ def __init__(self,
self._total_pdfs = 0
self._total_pages = 0
self._total_chunks = 0
self._stop_requested = False
self._cache_dir = cache_dir

@property
Expand Down Expand Up @@ -142,7 +141,7 @@ def _generate_frames(self):
)

for x in search_results.results():
if self._stop_requested:
if self.is_stop_requested():
break

full_path = os.path.join(self._cache_dir, x._get_default_filename())
Expand Down Expand Up @@ -175,7 +174,7 @@ def _process_pages(self, pdf_path: str):

logger.debug("Processing %s/%s: %s", len(documents), self._total_pages, pdf_path)
if self._total_pages > self._max_pages:
self._stop_requested = True
self.request_stop()

return documents
except PdfStreamError:
Expand Down
6 changes: 3 additions & 3 deletions morpheus/stages/input/control_message_kafka_source_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def _process_msg(self, consumer, msg):
consumer.commit(message=msg, asynchronous=self._async_commits)

if self._stop_after > 0 and self._records_emitted >= self._stop_after:
self._stop_requested = True
self.request_stop()

return control_messages

Expand All @@ -159,7 +159,7 @@ def _source_generator(self):

do_sleep = False

while not self._stop_requested:
while not self.is_stop_requested():

msg = consumer.poll(timeout=1.0)
if msg is None:
Expand All @@ -177,7 +177,7 @@ def _source_generator(self):
else:
raise ck.KafkaException(msg_error)

if do_sleep and not self._stop_requested:
if do_sleep and not self.is_stop_requested():
time.sleep(self._poll_interval)

finally:
Expand Down
2 changes: 1 addition & 1 deletion morpheus/stages/input/http_client_source_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ def _generate_frames(self) -> typing.Iterator[MessageMeta]:

request_args.update(self._requst_kwargs)

while (not self._stop_requested and (self._stop_after == 0 or num_records_emitted < self._stop_after)):
while (not self.is_stop_requested() and (self._stop_after == 0 or num_records_emitted < self._stop_after)):
if self._query_params_fn is not None:
request_args['params'] = self._query_params_fn()

Expand Down
3 changes: 1 addition & 2 deletions morpheus/stages/input/http_server_source_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ def __init__(self,
self._request_timeout_secs = request_timeout_secs
self._lines = lines
self._stop_after = stop_after
self._stop_requested = False
self._payload_to_df_fn = payload_to_df_fn
self._http_server = None

Expand Down Expand Up @@ -253,7 +252,7 @@ def _generate_frames(self) -> typing.Iterator[MessageMeta]:
df = self._queue.get()
self._queue_size -= 1
except queue.Empty:
if (not self._http_server.is_running() or self._stop_requested):
if (not self._http_server.is_running() or self.is_stop_requested()):
self._processing = False
else:
logger.debug("Queue empty, sleeping ...")
Expand Down
9 changes: 3 additions & 6 deletions morpheus/stages/input/kafka_source_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,6 @@ def __init__(self,
self._async_commits = async_commits
self._client = None

# Flag to indicate whether or not we should stop
self._stop_requested = False

self._poll_interval = pd.Timedelta(poll_interval).total_seconds()
self._started = False

Expand Down Expand Up @@ -173,7 +170,7 @@ def _process_batch(self, consumer, batch):
self._num_messages += 1

if self._stop_after > 0 and self._records_emitted >= self._stop_after:
self._stop_requested = True
self.request_stop()

batch.clear()

Expand All @@ -187,7 +184,7 @@ def _source_generator(self):

batch = []

while not self._stop_requested:
while self.is_stop_requested():
do_process_batch = False
do_sleep = False

Expand All @@ -214,7 +211,7 @@ def _source_generator(self):
if message_meta is not None:
yield message_meta

if do_sleep and not self._stop_requested:
if do_sleep and not self.is_stop_requested():
time.sleep(self._poll_interval)

message_meta = self._process_batch(consumer, batch)
Expand Down
2 changes: 0 additions & 2 deletions morpheus/stages/input/rss_source_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@ def __init__(self,
request_timeout: float = 2.0,
strip_markup: bool = False):
super().__init__(c)
self._stop_requested = False

if (batch_size is None):
batch_size = c.pipeline_batch_size

Expand Down