Skip to content

Commit

Permalink
Introduce multi-endpoint servers and health check endpoints to HttpSe…
Browse files Browse the repository at this point in the history
…rverSourceStage (#1734)

Addresses: #1732

Currently, the `HttpServer` implementation does not support the initialization of a single server with multiple endpoints. This PR seeks to address this issue by refactoring the `HttpServer` by taking a list of `HttpEndpoint` objects (encapsulates the attributes of an http endpoint, ie. url, method, endpoint handler).

Additionally, this PR also introduces 2 endpoints to `HttpServerSourceStage` that allows users to conduct health checks on their pipelines.
- GET /live -> Immediately returns a 200 response if the server is alive
- GET /ready -> Immediately returns a 200 response if the server is ready to accept new messages

These endpoints are configurable by the user.

## By Submitting this PR I confirm:
- I am familiar with the [Contributing Guidelines](https://github.com/nv-morpheus/Morpheus/blob/main/docs/source/developer_guide/contributing.md).
- When the PR is ready for review, new or existing tests cover these changes.
- When the PR is ready for review, the documentation is up to date with these changes.

Authors:
  - Jason Du (https://github.com/jadu-nv)
  - David Gardner (https://github.com/dagardner-nv)

Approvers:
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: #1734
  • Loading branch information
jadu-nv authored Jun 26, 2024
1 parent a09261c commit 18eca56
Show file tree
Hide file tree
Showing 13 changed files with 308 additions and 162 deletions.
6 changes: 5 additions & 1 deletion morpheus/_lib/common/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ __all__ = [
"FiberQueue",
"FileTypes",
"FilterSource",
"HttpEndpoint",
"HttpServer",
"Tensor",
"TypeId",
Expand Down Expand Up @@ -108,10 +109,13 @@ class FilterSource():
TENSOR: morpheus._lib.common.FilterSource # value = <FilterSource.TENSOR: 1>
__members__: dict # value = {'Auto': <FilterSource.Auto: 0>, 'TENSOR': <FilterSource.TENSOR: 1>, 'DATAFRAME': <FilterSource.DATAFRAME: 2>}
pass
class HttpEndpoint():
def __init__(self, py_parse_fn: function, url: str, method: str) -> None: ...
pass
class HttpServer():
def __enter__(self) -> HttpServer: ...
def __exit__(self, arg0: object, arg1: object, arg2: object) -> None: ...
def __init__(self, parse_fn: function, bind_address: str = '127.0.0.1', port: int = 8080, endpoint: str = '/message', method: str = 'POST', num_threads: int = 1, max_payload_size: int = 10485760, request_timeout: int = 30) -> None: ...
def __init__(self, endpoints: typing.List[HttpEndpoint], bind_address: str = '127.0.0.1', port: int = 8080, num_threads: int = 1, max_payload_size: int = 10485760, request_timeout: int = 30) -> None: ...
def is_running(self) -> bool: ...
def start(self) -> None: ...
def stop(self) -> None: ...
Expand Down
8 changes: 5 additions & 3 deletions morpheus/_lib/common/module.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include <pybind11/pybind11.h>
#include <pybind11/pytypes.h> // for return_value_policy::reference
// for pathlib.Path -> std::filesystem::path conversions
#include <pybind11/stl.h> // IWYU pragma: keep
#include <pybind11/stl/filesystem.h> // IWYU pragma: keep

#include <filesystem> // for std::filesystem::path
Expand Down Expand Up @@ -151,13 +152,14 @@ PYBIND11_MODULE(common, _module)
.value("TENSOR", FilterSource::TENSOR)
.value("DATAFRAME", FilterSource::DATAFRAME);

py::class_<HttpEndpoint, std::shared_ptr<HttpEndpoint>>(_module, "HttpEndpoint")
.def(py::init<>(&HttpEndpointInterfaceProxy::init), py::arg("py_parse_fn"), py::arg("url"), py::arg("method"));

py::class_<HttpServer, std::shared_ptr<HttpServer>>(_module, "HttpServer")
.def(py::init<>(&HttpServerInterfaceProxy::init),
py::arg("parse_fn"),
py::arg("endpoints"),
py::arg("bind_address") = "127.0.0.1",
py::arg("port") = 8080,
py::arg("endpoint") = "/message",
py::arg("method") = "POST",
py::arg("num_threads") = 1,
py::arg("max_payload_size") = DefaultMaxPayloadSize,
py::arg("request_timeout") = 30)
Expand Down
14 changes: 12 additions & 2 deletions morpheus/_lib/include/morpheus/stages/http_server_source_stage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@
#include <pymrc/node.hpp> // for PythonSource
#include <rxcpp/rx.hpp> // for subscriber

#include <atomic> // for atomic
#include <chrono> // for duration
#include <cstddef> // for size_t
#include <cstdint> // for int64_t
#include <memory> // for shared_ptr & unique_ptr
#include <ratio> // for std::milli
#include <string> // for string & to_string
// IWYU thinks we're using thread::operator<<
// IWYU pragma: no_include <thread>
Expand Down Expand Up @@ -63,7 +63,11 @@ class MORPHEUS_EXPORT HttpServerSourceStage : public mrc::pymrc::PythonSource<st
HttpServerSourceStage(std::string bind_address = "127.0.0.1",
unsigned short port = 8080,
std::string endpoint = "/message",
std::string live_endpoint = "/live",
std::string ready_endpoint = "/ready",
std::string method = "POST",
std::string live_method = "GET",
std::string ready_method = "GET",
unsigned accept_status = 201,
float sleep_time = 0.1f,
long queue_timeout = 5,
Expand All @@ -81,10 +85,12 @@ class MORPHEUS_EXPORT HttpServerSourceStage : public mrc::pymrc::PythonSource<st
subscriber_fn_t build();
void source_generator(rxcpp::subscriber<source_type_t> subscriber);

std::chrono::duration<float, std::milli> m_sleep_time;
std::atomic<int> m_queue_cnt = 0;
std::chrono::steady_clock::duration m_sleep_time;
std::chrono::duration<long> m_queue_timeout;
std::unique_ptr<HttpServer> m_server;
request_queue_t m_queue;
std::size_t m_max_queue_size;
std::size_t m_stop_after;
std::size_t m_records_emitted;
};
Expand All @@ -100,7 +106,11 @@ struct MORPHEUS_EXPORT HttpServerSourceStageInterfaceProxy
std::string bind_address,
unsigned short port,
std::string endpoint,
std::string live_endpoint,
std::string ready_endpoint,
std::string method,
std::string live_method,
std::string ready_method,
unsigned accept_status,
float sleep_time,
long queue_timeout,
Expand Down
101 changes: 56 additions & 45 deletions morpheus/_lib/include/morpheus/utilities/http_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ namespace morpheus {
* @file
*/

class MORPHEUS_EXPORT Listener;

using on_complete_cb_fn_t = std::function<void(const boost::system::error_code& /* error message */)>;

/**
Expand All @@ -73,6 +71,55 @@ using payload_parse_fn_t = std::function<parse_status_t(const std::string& /* po

constexpr std::size_t DefaultMaxPayloadSize{1024 * 1024 * 10}; // 10MB

/**
* @brief A struct that encapsulates the http endpoint attributes
*
* @details Constructed to be used in the HttpServer class as http endpoint configurations
*/
struct MORPHEUS_EXPORT HttpEndpoint
{
HttpEndpoint(payload_parse_fn_t payload_parse_fn, std::string url, std::string method);

std::shared_ptr<payload_parse_fn_t> m_parser;
std::string m_url;
boost::beast::http::verb m_method;
};

/**
* @brief A class that listens for incoming HTTP requests.
*
* @details Constructed by the HttpServer class and should not be used directly.
*/
class MORPHEUS_EXPORT Listener : public std::enable_shared_from_this<Listener>
{
public:
Listener(boost::asio::io_context& io_context,
const std::string& bind_address,
unsigned short port,
std::vector<HttpEndpoint> endpoints,
std::size_t max_payload_size,
std::chrono::seconds request_timeout);

~Listener() = default;

void run();
void stop();
bool is_running() const;

private:
void do_accept();
void on_accept(boost::beast::error_code ec, boost::asio::ip::tcp::socket socket);

boost::asio::io_context& m_io_context;
boost::asio::ip::tcp::endpoint m_tcp_endpoint;
std::unique_ptr<boost::asio::ip::tcp::acceptor> m_acceptor;

std::vector<HttpEndpoint> m_endpoints;
std::size_t m_max_payload_size;
std::chrono::seconds m_request_timeout;
std::atomic<bool> m_is_running;
};

/**
* @brief A simple HTTP server that listens for POST or PUT requests on a given endpoint.
*
Expand All @@ -92,11 +139,9 @@ constexpr std::size_t DefaultMaxPayloadSize{1024 * 1024 * 10}; // 10MB
class MORPHEUS_EXPORT HttpServer
{
public:
HttpServer(payload_parse_fn_t payload_parse_fn,
HttpServer(std::vector<HttpEndpoint> endpoints,
std::string bind_address = "127.0.0.1",
unsigned short port = 8080,
std::string endpoint = "/message",
std::string method = "POST",
unsigned short num_threads = 1,
std::size_t max_payload_size = DefaultMaxPayloadSize,
std::chrono::seconds request_timeout = std::chrono::seconds(30));
Expand All @@ -110,55 +155,23 @@ class MORPHEUS_EXPORT HttpServer

std::string m_bind_address;
unsigned short m_port;
std::string m_endpoint;
boost::beast::http::verb m_method;
std::vector<HttpEndpoint> m_endpoints;
unsigned short m_num_threads;
std::chrono::seconds m_request_timeout;
std::size_t m_max_payload_size;
std::vector<std::thread> m_listener_threads;
boost::asio::io_context m_io_context;
std::shared_ptr<Listener> m_listener;
std::shared_ptr<payload_parse_fn_t> m_payload_parse_fn;
std::atomic<bool> m_is_running;
};

/****** HttpEndpointInterfaceProxy ************************/
/**
* @brief A class that listens for incoming HTTP requests.
*
* @details Constructed by the HttpServer class and should not be used directly.
* @brief Interface proxy, used to insulate python bindings.
*/
class MORPHEUS_EXPORT Listener : public std::enable_shared_from_this<Listener>
struct MORPHEUS_EXPORT HttpEndpointInterfaceProxy
{
public:
Listener(boost::asio::io_context& io_context,
std::shared_ptr<morpheus::payload_parse_fn_t> payload_parse_fn,
const std::string& bind_address,
unsigned short port,
const std::string& endpoint,
boost::beast::http::verb method,
std::size_t max_payload_size,
std::chrono::seconds request_timeout);

~Listener() = default;

void run();
void stop();
bool is_running() const;

private:
void do_accept();
void on_accept(boost::beast::error_code ec, boost::asio::ip::tcp::socket socket);

boost::asio::io_context& m_io_context;
boost::asio::ip::tcp::endpoint m_tcp_endpoint;
std::unique_ptr<boost::asio::ip::tcp::acceptor> m_acceptor;

std::shared_ptr<morpheus::payload_parse_fn_t> m_payload_parse_fn;
const std::string& m_url_endpoint;
boost::beast::http::verb m_method;
std::size_t m_max_payload_size;
std::chrono::seconds m_request_timeout;
std::atomic<bool> m_is_running;
static std::shared_ptr<HttpEndpoint> init(pybind11::function py_parse_fn, std::string m_url, std::string m_method);
};

/****** HttpServerInterfaceProxy *************************/
Expand All @@ -167,11 +180,9 @@ class MORPHEUS_EXPORT Listener : public std::enable_shared_from_this<Listener>
*/
struct MORPHEUS_EXPORT HttpServerInterfaceProxy
{
static std::shared_ptr<HttpServer> init(pybind11::function py_parse_fn,
static std::shared_ptr<HttpServer> init(std::vector<HttpEndpoint> endpoints,
std::string bind_address,
unsigned short port,
std::string endpoint,
std::string method,
unsigned short num_threads,
std::size_t max_payload_size,
int64_t request_timeout);
Expand Down
63 changes: 53 additions & 10 deletions morpheus/_lib/src/stages/http_server_source_stage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <boost/beast/http/status.hpp> // for int_to_status, status
#include <boost/fiber/channel_op_status.hpp> // for channel_op_status
#include <boost/fiber/operations.hpp> // for sleep_for
#include <cudf/io/json.hpp> // for json_reader_options & read_json
#include <glog/logging.h> // for CHECK & LOG

Expand All @@ -28,6 +29,7 @@
#include <thread> // for std::this_thread::sleep_for
#include <tuple> // for make_tuple
#include <utility> // for std::move
#include <vector> // for vector
// IWYU thinks we need more boost headers than we need as int_to_status is defined in status.hpp
// IWYU pragma: no_include <boost/beast/http.hpp>

Expand All @@ -41,7 +43,11 @@ class SourceStageStopAfter : public std::exception
HttpServerSourceStage::HttpServerSourceStage(std::string bind_address,
unsigned short port,
std::string endpoint,
std::string live_endpoint,
std::string ready_endpoint,
std::string method,
std::string live_method,
std::string ready_method,
unsigned accept_status,
float sleep_time,
long queue_timeout,
Expand All @@ -52,7 +58,8 @@ HttpServerSourceStage::HttpServerSourceStage(std::string bind_address,
bool lines,
std::size_t stop_after) :
PythonSource(build()),
m_sleep_time{sleep_time},
m_max_queue_size{max_queue_size},
m_sleep_time{std::chrono::milliseconds(static_cast<long int>(sleep_time))},
m_queue_timeout{queue_timeout},
m_queue{max_queue_size},
m_stop_after{stop_after},
Expand Down Expand Up @@ -83,6 +90,7 @@ HttpServerSourceStage::HttpServerSourceStage(std::string bind_address,

if (queue_status == boost::fibers::channel_op_status::success)
{
m_queue_cnt++;
return std::make_tuple(accept_status, "text/plain", std::string(), nullptr);
}

Expand Down Expand Up @@ -113,14 +121,40 @@ HttpServerSourceStage::HttpServerSourceStage(std::string bind_address,
return std::make_tuple(500u, "text/plain", error_msg, nullptr);
}
};
m_server = std::make_unique<HttpServer>(std::move(parser),
std::move(bind_address),
port,
std::move(endpoint),
std::move(method),
num_server_threads,
max_payload_size,
request_timeout);

payload_parse_fn_t live_parser = [this, accept_status, lines](const std::string& payload) {
if (!m_server->is_running())
{
std::string error_msg = "Source server is not running";
return std::make_tuple(500u, "text/plain", error_msg, nullptr);
}

return std::make_tuple(accept_status, "text/plain", std::string(), nullptr);
};

payload_parse_fn_t ready_parser = [this, accept_status, lines](const std::string& payload) {
if (!m_server->is_running())
{
std::string error_msg = "Source server is not running";
return std::make_tuple(500u, "text/plain", error_msg, nullptr);
}

if (m_queue_cnt < m_max_queue_size)
{
return std::make_tuple(accept_status, "text/plain", std::string(), nullptr);
}

std::string error_msg = "HTTP payload queue is full or unavailable to accept new values";
return std::make_tuple(503u, "text/plain", std::move(error_msg), nullptr);
};

std::vector<HttpEndpoint> endpoints;
endpoints.emplace_back(parser, endpoint, method);
endpoints.emplace_back(live_parser, live_endpoint, live_method);
endpoints.emplace_back(ready_parser, ready_endpoint, ready_method);

m_server = std::make_unique<HttpServer>(
std::move(endpoints), std::move(bind_address), port, num_server_threads, max_payload_size, request_timeout);
}

HttpServerSourceStage::subscriber_fn_t HttpServerSourceStage::build()
Expand Down Expand Up @@ -157,6 +191,7 @@ void HttpServerSourceStage::source_generator(rxcpp::subscriber<HttpServerSourceS
if (queue_status == boost::fibers::channel_op_status::success)
{
// NOLINTNEXTLINE(clang-diagnostic-unused-value)
m_queue_cnt--;
DCHECK_NOTNULL(table_ptr);
try
{
Expand All @@ -182,7 +217,7 @@ void HttpServerSourceStage::source_generator(rxcpp::subscriber<HttpServerSourceS
if (server_running)
{
// Sleep when there are no messages
std::this_thread::sleep_for(m_sleep_time);
boost::this_fiber::sleep_for(m_sleep_time);
}
}
else if (queue_status == boost::fibers::channel_op_status::closed)
Expand Down Expand Up @@ -220,7 +255,11 @@ std::shared_ptr<mrc::segment::Object<HttpServerSourceStage>> HttpServerSourceSta
std::string bind_address,
unsigned short port,
std::string endpoint,
std::string live_endpoint,
std::string ready_endpoint,
std::string method,
std::string live_method,
std::string ready_method,
unsigned accept_status,
float sleep_time,
long queue_timeout,
Expand All @@ -237,7 +276,11 @@ std::shared_ptr<mrc::segment::Object<HttpServerSourceStage>> HttpServerSourceSta
std::move(bind_address),
port,
std::move(endpoint),
std::move(live_endpoint),
std::move(ready_endpoint),
std::move(method),
std::move(live_method),
std::move(ready_method),
accept_status,
sleep_time,
queue_timeout,
Expand Down
Loading

0 comments on commit 18eca56

Please sign in to comment.