Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce multi-endpoint servers and health check endpoints to HttpServerSourceStage #1734

Merged
Prev Previous commit
Next Next commit
Add break statement, remove forward declarations, and empty tests
  • Loading branch information
jadu-nv committed Jun 26, 2024
commit 47f71bfbd56226e23be2c8cdf905742920e0549e
94 changes: 45 additions & 49 deletions morpheus/_lib/include/morpheus/utilities/http_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,6 @@ namespace morpheus {
* @file
*/

class MORPHEUS_EXPORT Listener;

struct MORPHEUS_EXPORT HttpEndpoint;

using on_complete_cb_fn_t = std::function<void(const boost::system::error_code& /* error message */)>;

/**
Expand All @@ -75,51 +71,6 @@ using payload_parse_fn_t = std::function<parse_status_t(const std::string& /* po

constexpr std::size_t DefaultMaxPayloadSize{1024 * 1024 * 10}; // 10MB

/**
* @brief A simple HTTP server that listens for POST or PUT requests on a given endpoint.
*
* @details The server is started on a separate thread(s) and will call the provided payload_parse_fn_t
* function when an incoming request is received. The payload_parse_fn_t function is expected to
* return a tuple conforming to `parse_status_t` (ex: `std::make_tuple(200, "text/plain"s, "OK"s, nullptr)`).
*
* @param payload_parse_fn The function that will be called when a POST request is received.
* @param bind_address The address to bind the server to.
* @param port The port to bind the server to.
* @param endpoint The endpoint to listen for POST requests on.
* @param method The HTTP method to listen for.
* @param num_threads The number of threads to use for the server.
* @param max_payload_size The maximum size in bytes of the payload that the server will accept in a single request.
* @param request_timeout The timeout for a request.
*/
class MORPHEUS_EXPORT HttpServer
{
public:
HttpServer(std::vector<HttpEndpoint> endpoints,
std::string bind_address = "127.0.0.1",
unsigned short port = 8080,
unsigned short num_threads = 1,
std::size_t max_payload_size = DefaultMaxPayloadSize,
std::chrono::seconds request_timeout = std::chrono::seconds(30));
~HttpServer();
void start();
void stop();
bool is_running() const;

private:
void start_listener(std::binary_semaphore& listener_semaphore, std::binary_semaphore& started_semaphore);

std::string m_bind_address;
unsigned short m_port;
std::vector<HttpEndpoint> m_endpoints;
unsigned short m_num_threads;
std::chrono::seconds m_request_timeout;
std::size_t m_max_payload_size;
std::vector<std::thread> m_listener_threads;
boost::asio::io_context m_io_context;
std::shared_ptr<Listener> m_listener;
std::atomic<bool> m_is_running;
};

/**
* @brief A struct that encapsulates the http endpoint attributes
*
Expand Down Expand Up @@ -169,6 +120,51 @@ class MORPHEUS_EXPORT Listener : public std::enable_shared_from_this<Listener>
std::atomic<bool> m_is_running;
};

/**
* @brief A simple HTTP server that listens for POST or PUT requests on a given endpoint.
*
* @details The server is started on a separate thread(s) and will call the provided payload_parse_fn_t
* function when an incoming request is received. The payload_parse_fn_t function is expected to
* return a tuple conforming to `parse_status_t` (ex: `std::make_tuple(200, "text/plain"s, "OK"s, nullptr)`).
*
* @param payload_parse_fn The function that will be called when a POST request is received.
* @param bind_address The address to bind the server to.
* @param port The port to bind the server to.
* @param endpoint The endpoint to listen for POST requests on.
* @param method The HTTP method to listen for.
* @param num_threads The number of threads to use for the server.
* @param max_payload_size The maximum size in bytes of the payload that the server will accept in a single request.
* @param request_timeout The timeout for a request.
*/
class MORPHEUS_EXPORT HttpServer
{
public:
HttpServer(std::vector<HttpEndpoint> endpoints,
std::string bind_address = "127.0.0.1",
unsigned short port = 8080,
unsigned short num_threads = 1,
std::size_t max_payload_size = DefaultMaxPayloadSize,
std::chrono::seconds request_timeout = std::chrono::seconds(30));
~HttpServer();
void start();
void stop();
bool is_running() const;

private:
void start_listener(std::binary_semaphore& listener_semaphore, std::binary_semaphore& started_semaphore);

std::string m_bind_address;
unsigned short m_port;
std::vector<HttpEndpoint> m_endpoints;
unsigned short m_num_threads;
std::chrono::seconds m_request_timeout;
std::size_t m_max_payload_size;
std::vector<std::thread> m_listener_threads;
boost::asio::io_context m_io_context;
std::shared_ptr<Listener> m_listener;
std::atomic<bool> m_is_running;
};

/****** HttpEndpointInterfaceProxy ************************/
/**
* @brief Interface proxy, used to insulate python bindings.
Expand Down
1 change: 1 addition & 0 deletions morpheus/_lib/src/utilities/http_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ class Session : public std::enable_shared_from_this<Session>
m_response->set(http::field::content_type, std::get<1>(parse_status));
m_response->body() = std::get<2>(parse_status);
m_on_complete_cb = std::get<3>(parse_status);
jadu-nv marked this conversation as resolved.
Show resolved Hide resolved
break;
}
}

Expand Down
4 changes: 0 additions & 4 deletions tests/common/test_http_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,6 @@ def check_server(url) -> None:
assert not server.is_running()


def test_simple_multi_endpoint():
pass


@pytest.mark.parametrize("endpoint", ["/test"])
def test_constructor_errors(endpoint: str):
with pytest.raises(RuntimeError):
Expand Down