Adding HTTP sources & sinks #977

Merged Aug 24, 2023 (168 commits)
Commits
12bc307
POC: using the fmt lib for text formatting, doesn't belong in TestTyp…
dagardner-nv May 23, 2023
f58e549
POC: Rough drafts
dagardner-nv May 23, 2023
313b211
wip
dagardner-nv May 23, 2023
0e4fc40
Add from-rest to cli
dagardner-nv May 24, 2023
d266015
wip
dagardner-nv May 24, 2023
03fe9a7
Add todo comments
dagardner-nv May 24, 2023
0abc244
wip
dagardner-nv May 24, 2023
cc370c9
WIP: Proof of concept C++ impl
dagardner-nv May 25, 2023
0b2609c
WIP: Proof of concept C++ impl
dagardner-nv May 25, 2023
de57f49
Support propper json arrays of objects as a config param
dagardner-nv May 25, 2023
ec3fe8e
wip
dagardner-nv May 25, 2023
dfe1213
Replace queue with boost fiber buffered_channel
dagardner-nv May 26, 2023
47e4067
Pass a parsing function into rest server, keeping queueing logic and …
dagardner-nv May 26, 2023
36262b0
Switch to using a a fiber buffered channel for queueing, pass a paylo…
dagardner-nv May 26, 2023
cdaf7aa
Add Python bindings for RestServer
dagardner-nv May 26, 2023
6087ce9
Remove python webserver
dagardner-nv May 30, 2023
af031f7
Put should raise full not empty when fulll, and will never receive an…
dagardner-nv May 30, 2023
ca38eac
Use the C++ rest server impl when in Python mode
dagardner-nv May 30, 2023
4964920
wip
dagardner-nv May 30, 2023
d62d956
Set a timeout for pushing to the queue
dagardner-nv May 30, 2023
e6f106b
WIP: async refactor
dagardner-nv Jun 1, 2023
d603048
WIP: async refactor
dagardner-nv Jun 1, 2023
7392822
Pass num_threads, cleanup includes
dagardner-nv Jun 1, 2023
9301315
Expose max payload and request timeouts to the stage
dagardner-nv Jun 1, 2023
722565f
Fix docstring
dagardner-nv Jun 2, 2023
e42646d
First pass at a polling rest client source, WIP: untested
dagardner-nv Jun 2, 2023
14c5bce
misc cleanups
dagardner-nv Jun 2, 2023
1eb222b
wip
dagardner-nv Jun 2, 2023
4099f26
Use urllib3's Retry class instead of home-grown logic, use a request …
dagardner-nv Jun 2, 2023
17df1dd
wip
dagardner-nv Jun 2, 2023
8261ac2
Add RestClientSourceStage to CLI, and fix typing for click
dagardner-nv Jun 2, 2023
7a6ddcf
Revert "Use urllib3's Retry class instead of home-grown logic, use a …
dagardner-nv Jun 2, 2023
86bab76
Parse the url, and check for a protocol scheme
dagardner-nv Jun 5, 2023
3afd417
Move retry logic to it's own method, cleaning up the source stage, an…
dagardner-nv Jun 5, 2023
1ade955
Logging and type hint cleanups
dagardner-nv Jun 6, 2023
54507b9
Add stop_after functionality borrowed from Kafka source
dagardner-nv Jun 6, 2023
899c6a5
Quick mock rest server to test the RestClientSourceStage against
dagardner-nv Jun 6, 2023
56f1537
Rename requests_wrapper to http_utils
dagardner-nv Jun 6, 2023
5900628
wip
dagardner-nv Jun 6, 2023
67977c9
Move url verify logic, cleanups
dagardner-nv Jun 6, 2023
55a0532
Expose lines arg
dagardner-nv Jun 6, 2023
8faa729
First pass at rest sink
dagardner-nv Jun 6, 2023
e90cb5e
WIP: chunking and function overrides
dagardner-nv Jun 7, 2023
3600d21
Fix bugs introduced by refactor
dagardner-nv Jun 7, 2023
b39b0df
Add missing docstrings for parameters
dagardner-nv Jun 7, 2023
f966c38
Fix old copy/paste docstring
dagardner-nv Jun 7, 2023
48a19f0
Make the is_running flag an instance variable instead of a global, al…
dagardner-nv Jun 7, 2023
034c11a
wip
dagardner-nv Jun 7, 2023
85b728e
WIP
dagardner-nv Jun 7, 2023
2305223
wip
dagardner-nv Jun 7, 2023
1dfb72e
add content type
dagardner-nv Jun 7, 2023
3fca9fc
Enforce supported methods in the stages not the server
dagardner-nv Jun 7, 2023
7efa82e
Remove temp test
dagardner-nv Jun 8, 2023
24e011d
Fix handling of queue exceptions, block shutdown until all queued mes…
dagardner-nv Jun 8, 2023
2151f07
Don't log the response body which could be large
dagardner-nv Jun 8, 2023
7c518a4
Use a python queue rather than a FiberQueue allowing us to check the …
dagardner-nv Jun 8, 2023
98c0625
Merge branch 'branch-23.07' of github.com:nv-morpheus/Morpheus into d…
dagardner-nv Jun 8, 2023
5e75757
Fix clang-tidy errors
dagardner-nv Jun 8, 2023
651512f
Cleanups
dagardner-nv Jun 8, 2023
b7a88b3
formatting
dagardner-nv Jun 8, 2023
01f438f
Revert "formatting"
dagardner-nv Jun 8, 2023
f2a364f
Revert formatting changes
dagardner-nv Jun 8, 2023
61608b1
Fix formatting:
dagardner-nv Jun 8, 2023
85cab37
Remove old POC
dagardner-nv Jun 8, 2023
6a83898
Remove old POC
dagardner-nv Jun 8, 2023
7c289d8
WIP
dagardner-nv Jun 12, 2023
507b38b
wip
dagardner-nv Jun 12, 2023
e9e1823
remove debug logging
dagardner-nv Jun 13, 2023
be2d398
Wrap the parsing function, use some namespace aliases to clean things up
dagardner-nv Jun 13, 2023
8f29a24
Cleanup includes
dagardner-nv Jun 13, 2023
13871d8
Remove TODO comment and sleep/hack
dagardner-nv Jun 13, 2023
b67fe76
Merge pull request #5 from dagardner-nv/david-rest-source-sink-callbacks
dagardner-nv Jun 13, 2023
2e37d06
Merge branch 'branch-23.07' into david-rest-source-sink
dagardner-nv Jun 20, 2023
dfd3259
Use getvalue instead of read, removing the need to call seek first [n…
dagardner-nv Jun 20, 2023
ae6a8c0
Rename helper methods to more accurate names per feedback [no ci]
dagardner-nv Jun 20, 2023
f1f65a2
Switch to using an enumto define supported HTTP methods [no ci]
dagardner-nv Jun 20, 2023
384091e
Use the HTTPStatus enum from the std-lib rather than hard-coded ints …
dagardner-nv Jun 20, 2023
fb22b09
Move server startup to on_start per feedback [no ci]
dagardner-nv Jun 20, 2023
6a96a26
Add a context manager to the python bindings for the RestServer [no ci]
dagardner-nv Jun 20, 2023
6bb3029
Add a context manager to FiberQueue's python bindings [no ci]
dagardner-nv Jun 20, 2023
3ed2a9e
formatting [no ci]
dagardner-nv Jun 20, 2023
fea32d4
Cleanup comments [no ci]
dagardner-nv Jun 21, 2023
5b9ee44
Merge branch 'branch-23.07' into david-rest-source-sink
dagardner-nv Jul 6, 2023
ca39c21
Remove out of date comment
dagardner-nv Jul 6, 2023
5169179
Restore case statement for channel_op_status::empty in put method to …
dagardner-nv Jul 6, 2023
37bc816
Misc IWYU fixes
dagardner-nv Jul 6, 2023
bc6f20b
Add mappings for boost asio & beast
dagardner-nv Jul 6, 2023
4363b0d
wip [no ci]
dagardner-nv Jul 6, 2023
ca69552
IWYU fixes [no ci]
dagardner-nv Jul 6, 2023
8df95b8
Add mapping for bits/this_thread_sleep.h
dagardner-nv Jul 6, 2023
175deb5
iwyu fixes [no ci]
dagardner-nv Jul 6, 2023
007ab95
Fix pylint warnings [no ci]
dagardner-nv Jul 6, 2023
087989a
flake8 fixes [no ci]
dagardner-nv Jul 6, 2023
7aa47d8
Adopt updated yapf, the old version was failing to parse code that us…
dagardner-nv Jul 6, 2023
7f91421
Document the on-complete callback function [no ci]
dagardner-nv Jul 6, 2023
674357d
Ensure the server is shutdown properly. Check that endpoint starts wi…
dagardner-nv Jul 7, 2023
8073207
First pass at a test for the rest server
dagardner-nv Jul 7, 2023
4a8a275
Fix how the io context was being copied
dagardner-nv Jul 7, 2023
8d44c7f
Move Listener to header allowing RestServer to own a pointer to it an…
dagardner-nv Jul 7, 2023
1b439d8
Don't check to see if the callback is called until we have stopped, s…
dagardner-nv Jul 7, 2023
2d6db79
Return to using a simpler bool for running status [no ci]
dagardner-nv Jul 7, 2023
74a21f4
Remove namespace aliases from header
dagardner-nv Jul 7, 2023
0f2bd14
IWYU fixes
dagardner-nv Jul 7, 2023
e7ab11a
Add test for constructor errors
dagardner-nv Jul 7, 2023
336a316
Merge branch 'branch-23.07' into david-rest-source-sink
dagardner-nv Jul 8, 2023
a0ccf04
Return parsed url
dagardner-nv Jul 10, 2023
08a9c2b
First pass at tests for http_utils
dagardner-nv Jul 10, 2023
5c8e810
new test
dagardner-nv Jul 10, 2023
b1acb64
Merge branch 'branch-23.11' of github.com:nv-morpheus/Morpheus into d…
dagardner-nv Jul 21, 2023
32c8966
Bump version of requests
dagardner-nv Jul 21, 2023
4855b30
Test on 8 threads
dagardner-nv Jul 21, 2023
79e2471
Track the acceptor in a unique pointer, and explicitly delete it, thi…
dagardner-nv Jul 24, 2023
6173685
Don't use more threads than cores
dagardner-nv Jul 24, 2023
0d3af58
Move test, and add more tests [no ci]
dagardner-nv Jul 25, 2023
22e8a7c
Add todo note [no ci]
dagardner-nv Jul 25, 2023
5f0eee8
tests wip [no ci]
dagardner-nv Jul 25, 2023
003a018
Merge branch 'branch-23.11' of github.com:nv-morpheus/Morpheus into d…
dagardner-nv Jul 25, 2023
d0bc614
Test for serializer methods
dagardner-nv Jul 25, 2023
f5908e6
to_parquet needs a binary file object
dagardner-nv Jul 25, 2023
5fcb8ef
Merge branch 'branch-23.11' of github.com:nv-morpheus/Morpheus into d…
dagardner-nv Jul 25, 2023
769a584
Expose is_closed method to python
dagardner-nv Jul 25, 2023
1c2d34c
Add tests for FiberQueue
dagardner-nv Jul 25, 2023
6842443
wip
dagardner-nv Jul 25, 2023
d5cc038
Merge branch 'branch-23.11' of github.com:nv-morpheus/Morpheus into d…
dagardner-nv Jul 26, 2023
0d827fe
Make the success status code configurable
dagardner-nv Jul 26, 2023
1603ed9
Unittests for source stage
dagardner-nv Jul 26, 2023
5486503
Don't need to paramaterize on the df type for this test [no ci]
dagardner-nv Jul 26, 2023
c7bcc59
Add timeout arg [no ci]
dagardner-nv Jul 26, 2023
e43a45e
Fix comment
dagardner-nv Jul 27, 2023
62ec889
Use a size_t for stop_after and records_emitted variables
dagardner-nv Jul 27, 2023
3b6aadf
Add stop_after argument, first pass at tests [no ci]
dagardner-nv Jul 27, 2023
59c0ea8
Call close on shutdown
dagardner-nv Jul 27, 2023
1b4a656
Extra sleep to avoid a first fail, or at least make it less likely [n…
dagardner-nv Jul 27, 2023
8ad1abc
Add fixtures for a mock camouflage rest server in addition to the exi…
dagardner-nv Jul 27, 2023
54b4f16
Add missing docstring
dagardner-nv Jul 27, 2023
2133a06
cleanup [no ci]
dagardner-nv Jul 27, 2023
c37bfbc
Remove unused import
dagardner-nv Jul 27, 2023
e0af9e6
WIP: tests for rest client source stage: TODO consolidate camougflage…
dagardner-nv Jul 27, 2023
9031f6b
Change the monitoring port to not conflict with the existing triton m…
dagardner-nv Jul 27, 2023
46b647a
Tests for WriteToRestStage [no ci]
dagardner-nv Jul 27, 2023
16202fc
wip [no ci]
dagardner-nv Jul 27, 2023
e2b9505
Tests for non-static endpoints [no ci]
dagardner-nv Jul 27, 2023
f718e6d
Add tests for RestServerSinkStage
dagardner-nv Jul 28, 2023
b2da558
Remove unused import
dagardner-nv Jul 28, 2023
3207381
Explicitly set server to none on complete, this prevents a file descr…
dagardner-nv Jul 28, 2023
c4c9253
Scale back params for rest stages that are already being tested in te…
dagardner-nv Jul 28, 2023
8452a72
Merge branch 'branch-23.11' of github.com:nv-morpheus/Morpheus into d…
dagardner-nv Jul 28, 2023
422610f
IWYU fixes [no ci]
dagardner-nv Jul 28, 2023
a51816b
Remove unused imports
dagardner-nv Jul 28, 2023
714137c
Revert unneeded change [no ci]
dagardner-nv Jul 28, 2023
05c3e16
Pylint fix
dagardner-nv Jul 28, 2023
c7231af
Sphinx fixes
dagardner-nv Jul 28, 2023
2931839
Install yapf via conda-forge
dagardner-nv Aug 1, 2023
1be38bd
Explicitly delete the server, prevents file descriptor leak
dagardner-nv Aug 1, 2023
7de3daa
Don't hold a reference to the pipeline outside of the build method, e…
dagardner-nv Aug 1, 2023
02a60f4
Revert "Don't hold a reference to the pipeline outside of the build m…
dagardner-nv Aug 1, 2023
723600e
Merge branch 'branch-23.11' into david-rest-source-sink
dagardner-nv Aug 4, 2023
480e6bb
pyi
dagardner-nv Aug 4, 2023
d976b4c
Merge branch 'branch-23.11' into david-rest-source-sink
dagardner-nv Aug 22, 2023
a30ad21
Merge branch 'branch-23.11' of github.com:nv-morpheus/Morpheus into d…
dagardner-nv Aug 23, 2023
b0b9ec4
Fix merge errors [no ci]
dagardner-nv Aug 23, 2023
c7a7e26
Rename Rest->Http per PR feedback
dagardner-nv Aug 23, 2023
c622539
Rename Rest->Http per PR feedback
dagardner-nv Aug 23, 2023
5ebe7bb
Fix tests [no ci]
dagardner-nv Aug 24, 2023
1edd4a8
Define a type alias and named tuple for http parse responses
dagardner-nv Aug 24, 2023
22c3645
isort fixes
dagardner-nv Aug 24, 2023
66eea1e
Add docstring
dagardner-nv Aug 24, 2023
731a152
Add docstring
dagardner-nv Aug 24, 2023
WIP
dagardner-nv committed Jun 7, 2023
commit 85b728ee77ab3308b17775306374530008b8e817
77 changes: 61 additions & 16 deletions morpheus/stages/output/rest_server_sink_stage.py
@@ -14,19 +14,23 @@

import logging
import os
import queue
import typing
from io import StringIO

import mrc
import pandas as pd
from mrc.core import operators as ops

import cudf

from morpheus.cli.register_stage import register_stage
from morpheus.config import Config
from morpheus.io import serializers
from morpheus.messages import MessageMeta
from morpheus.pipeline.single_port_stage import SinglePortStage
from morpheus.pipeline.stream_pair import StreamPair
from morpheus.utils import http_utils
from morpheus.utils.producer_consumer_queue import Closed
from morpheus.utils.type_aliases import DataFrameType

logger = logging.getLogger(__name__)
@@ -49,13 +53,23 @@ class RestServerSinkStage(SinglePortStage):
The endpoint to listen for requests on.
method : str, default "GET"
HTTP method to listen for.
queue_timeout : int, default 5
Maximum amount of time in seconds to block on a queue put or get request. Must be smaller than
`request_timeout_secs`.
max_queue_size : int, default None
Maximum number of requests to queue before rejecting requests. If `None` then `config.edge_buffer_size` will be
used. Once the queue is full, the incoming edge buffer will begin to fill up.
num_server_threads : int, default None
Number of threads to use for the REST server. If `None` then `os.cpu_count()` will be used.
max_rows_per_response : int, optional
Maximum number of rows to include in a single response, by default 10000.
overflow_pct : float, default 0.75
The stage stores incoming dataframes in a queue. While the number of rows collected for a response is below
`max_rows_per_response * overflow_pct`, additional dataframes are popped off of the queue.
Setting a higher value (0.9 or 1) can potentially improve performance by allowing as many dataframes as possible
to be concatenated into a single response, at the risk of returning a response containing more than
`max_rows_per_response` rows. Setting a lower value (0.5 or 0.75) reduces that risk, and a value of `0`
eliminates it entirely.
request_timeout_secs : int, default 30
The maximum amount of time in seconds for any given request.
lines : bool, default False
@@ -66,20 +80,25 @@ class RestServerSinkStage(SinglePortStage):
Optional custom dataframe serializer function.
"""

def __init__(self,
config: Config,
bind_address: str = "127.0.0.1",
port: int = 8080,
endpoint: str = "/message",
method: str = "GET",
max_queue_size: int = None,
num_server_threads: int = None,
max_rows_per_response: int = 10000,
request_timeout_secs: int = 30,
lines: bool = False,
df_serializer_fn: typing.Callable[[DataFrameType], str] = None):
def __init__(
self,
config: Config,
bind_address: str = "127.0.0.1",
port: int = 8080,
endpoint: str = "/message",
method: str = "GET",
queue_timeout: int = 5,
max_queue_size: int = None,
num_server_threads: int = None,
max_rows_per_response: int = 10000,
overflow_pct: float = 0.75, # TODO: find a better name for this
request_timeout_secs: int = 30,
lines: bool = False,
df_serializer_fn: typing.Callable[[DataFrameType], str] = None):
super().__init__(config)
self._queue_timeout = queue_timeout
self._max_rows_per_response = max_rows_per_response
self._overflow_pct = overflow_pct
self._request_timeout_secs = request_timeout_secs
self._lines = lines
self._df_serializer_fn = df_serializer_fn or self._default_df_serializer
@@ -88,7 +107,7 @@ def __init__(self,
from morpheus.common import RestServer

self._queue = FiberQueue(max_queue_size or config.edge_buffer_size)
self._server = RestServer(parse_fn=self._parse_payload,
self._server = RestServer(parse_fn=self._request_handler,
bind_address=bind_address,
port=port,
endpoint=endpoint,
@@ -124,6 +143,33 @@ def _default_df_serializer(self, df: DataFrameType) -> str:
str_buf.seek(0)
return str_buf.read()

def _request_handler(self, _: str) -> typing.Tuple[int, str]:
try:
num_rows = 0
data_frames = []
while (num_rows == 0 or num_rows < (self._max_rows_per_response * self._overflow_pct)):
df = self._queue.get(timeout=self._queue_timeout)
num_rows += len(df)
data_frames.append(df)

df = data_frames[0]
if len(data_frames) > 1:
cat_fn = pd.concat if isinstance(df, pd.DataFrame) else cudf.concat
df = cat_fn(data_frames)

return (200, self._df_serializer_fn(df))

except queue.Empty:
return (204, "No messages available")
except Closed:
err_msg = "DF queue is closed"
logger.error(err_msg)
return (503, err_msg)
except Exception as e:
err_msg = "Unknown error processing request"
logger.error("%s: %s", err_msg, e)
return (500, err_msg)
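
The batching loop in `_request_handler` can be looked at in isolation. What follows is a minimal sketch, not the code from this PR: it assumes a standard-library `queue.Queue` in place of `FiberQueue` and plain lists of rows in place of dataframes, and `collect_batch` is a hypothetical name. One deliberate difference: as written, the handler in this commit appears to return 204 whenever `get` times out, even if some dataframes were already popped, whereas the sketch returns the partial batch instead.

```python
import queue
from typing import Any, List, Tuple


def collect_batch(q: "queue.Queue[List[Any]]",
                  max_rows: int,
                  overflow_pct: float,
                  timeout: float = 0.1) -> Tuple[int, List[Any]]:
    """Hypothetical stand-in for the handler's batching loop.

    Lists of rows stand in for dataframes (an assumption made so the
    sketch only needs the standard library).
    """
    threshold = max_rows * overflow_pct
    chunks: List[List[Any]] = []
    num_rows = 0

    # Always take at least one chunk, then keep popping while the running
    # row count is still below the overflow threshold.
    while num_rows == 0 or num_rows < threshold:
        try:
            chunk = q.get(timeout=timeout)
        except queue.Empty:
            if chunks:
                # Unlike the handler in the diff, keep what was collected.
                break
            return (204, [])
        num_rows += len(chunk)
        chunks.append(chunk)

    # Flatten the chunks; the real stage uses pd.concat / cudf.concat here.
    rows = [row for chunk in chunks for row in chunk]
    return (200, rows)
```

With `max_rows=4` and `overflow_pct=0.75` the threshold is 3 rows, so a queued chunk of three rows is returned on its own, while a chunk of two rows is combined with the next chunk. With `overflow_pct=0` each response carries exactly one chunk.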

def _partition_df(self, df: DataFrameType) -> typing.Iterable[DataFrameType]:
"""
Partition a dataframe into slices no larger than `self._max_rows_per_response`.
@@ -143,11 +189,10 @@ def _partition_df(self, df: DataFrameType) -> typing.Iterable[DataFrameType]:
slice_start = slice_end

def _process_message(self, msg: MessageMeta) -> MessageMeta:

# In order to conform to the `self._max_rows_per_response` argument we need to slice up the dataframe here
# because our queue isn't a deque.
for df_slice in self._partition_df(msg.df):
self._queue.put(df_slice)
self._queue.put(df_slice, timeout=self._queue_timeout)

return msg
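
The producer side (slice, then `put` with a timeout) can be sketched the same way. The body of `_partition_df` is collapsed in this view, so the slicing below is an assumption rather than the PR's implementation; `partition_rows` and `enqueue_slices` are hypothetical names, and a bounded standard-library `queue.Queue` with lists of rows stands in for `FiberQueue` and dataframes.

```python
import queue
from typing import Any, Iterator, List


def partition_rows(rows: List[Any], max_rows: int) -> Iterator[List[Any]]:
    """Yield consecutive slices of `rows`, each at most `max_rows` long."""
    for start in range(0, len(rows), max_rows):
        yield rows[start:start + max_rows]


def enqueue_slices(q: "queue.Queue[List[Any]]",
                   rows: List[Any],
                   max_rows: int,
                   timeout: float) -> int:
    """Put each slice on the queue, blocking up to `timeout` seconds per put.

    Raises queue.Full if a put cannot complete in time; returns the number
    of slices queued otherwise.
    """
    queued = 0
    for piece in partition_rows(rows, max_rows):
        q.put(piece, timeout=timeout)
        queued += 1
    return queued
```

Slicing before the `put` means no single queue entry exceeds the per-response row limit, so the consumer never has to split an entry. With a bounded queue, the timeout turns a full queue into a `queue.Full` exception instead of an indefinite block.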
