Skip to content

Commit

Permalink
HTTP live connections on server shutdown (triton-inference-server#6986)
Browse files Browse the repository at this point in the history
* Wait for HTTP connection when shutting down

* Add test for shutdown with live HTTP connection

* Use TRITONSERVER_ServerSetExitTimeout() API

* Variable name update

* Stop HTTP service immediately after all connections close

* Remove unused include

* Remove accept new connection check

* Adjust test for 'Remove accept new connection check' and add testing for countdown restart

* Fix non exit timeout supported endpoints

* Improve existing shutdown test reliability given the extra HTTP shutdown delay

* Fix gap between deciding to close the socket and actually closing it

* Start rejecting new connections once shutdown polling starts

* Group checking logic
  • Loading branch information
kthui authored Apr 9, 2024
1 parent 726b764 commit 157ec72
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 8 deletions.
45 changes: 45 additions & 0 deletions qa/L0_lifecycle/lifecycle_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2662,6 +2662,7 @@ def callback(user_data, result, error):

# Send signal to shutdown the server
os.kill(int(os.environ["SERVER_PID"]), signal.SIGINT)
time.sleep(0.5)

# Send more requests and should be rejected
try:
Expand Down Expand Up @@ -2721,6 +2722,7 @@ def callback(user_data, result, error):

# Send signal to shutdown the server
os.kill(int(os.environ["SERVER_PID"]), signal.SIGINT)
time.sleep(0.5)

# Send requests with different characteristic
# 1: New sequence with new sequence ID
Expand Down Expand Up @@ -2808,6 +2810,7 @@ def callback(user_data, result, error):

# Send signal to shutdown the server
os.kill(int(os.environ["SERVER_PID"]), signal.SIGINT)
time.sleep(0.5)

# Send more requests and should be rejected
try:
Expand Down Expand Up @@ -3360,6 +3363,48 @@ def test_shutdown_while_loading(self):
# The server will shutdown after this sub-test exits. The server must shutdown
# without any hang or runtime error.

def test_shutdown_with_live_connection(self):
    """Check that a live HTTP connection delays server exit.

    Sends SIGINT to the server while an HTTP connection is open,
    verifies the connection keeps working during shutdown, then closes
    it and checks the server log to confirm the exit-timeout countdown
    ran exactly once (i.e. it did not restart when the connection
    closed).
    """
    model_name = "add_sub"
    model_shape = (16,)

    input_data = np.ones(shape=model_shape, dtype=np.float32)
    inputs = [
        httpclient.InferInput("INPUT0", model_shape, "FP32"),
        httpclient.InferInput("INPUT1", model_shape, "FP32"),
    ]
    inputs[0].set_data_from_numpy(input_data)
    inputs[1].set_data_from_numpy(input_data)

    # Start a connection and issue one inference so the HTTP connection
    # is established before shutdown begins.
    conn = httpclient.InferenceServerClient("localhost:8000", verbose=True)
    conn.infer(model_name, inputs)

    # Ask the server to shut down and give it time to enter the
    # shutdown/exit-timeout polling loop.
    os.kill(int(os.environ["SERVER_PID"]), signal.SIGINT)
    time.sleep(2)

    # The already-open connection should still be serviced while the
    # server is draining.
    conn.infer(model_name, inputs)

    # Close the connection and give the server time to react.
    conn.close()
    time.sleep(2)

    # Check that the exit-timeout countdown did not restart after the
    # last connection closed.
    with open(os.environ["SERVER_LOG"]) as f:
        server_log = f.read()
    self.assertIn(
        "Waiting for in-flight requests to complete.",
        server_log,
        "precondition not met - core shutdown did not begin",
    )
    self.assertEqual(
        server_log.count("Timeout 30: "),
        1,
        "exit timeout countdown restart detected",
    )


# Allow running this module directly; test.sh invokes it with a specific
# test name (e.g. LifeCycleTest.test_shutdown_with_live_connection).
if __name__ == "__main__":
    unittest.main()
29 changes: 29 additions & 0 deletions qa/L0_lifecycle/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2102,6 +2102,35 @@ if [ "$ACTUAL_LOAD_UNLOAD_ORDER" != "$EXPECTED_LOAD_UNLOAD_ORDER" ]; then
RET=1
fi

LOG_IDX=$((LOG_IDX+1))

# LifeCycleTest.test_shutdown_with_live_connection
# Build a fresh model repository containing only the add_sub Python model.
rm -rf models
mkdir models
cp -r ../python_models/add_sub models/ && (cd models/add_sub && \
    mkdir 1 && mv model.py 1)

SERVER_ARGS="--model-repository=`pwd`/models"
SERVER_LOG="./inference_server_$LOG_IDX.log"
run_server
if [ "$SERVER_PID" == "0" ]; then
    echo -e "\n***\n*** Failed to start $SERVER\n***"
    cat $SERVER_LOG
    exit 1
fi

# The test itself sends SIGINT to the server and inspects the server log,
# so both SERVER_PID and SERVER_LOG are passed through the environment.
set +e
SERVER_PID=$SERVER_PID SERVER_LOG=$SERVER_LOG python $LC_TEST LifeCycleTest.test_shutdown_with_live_connection >>$CLIENT_LOG 2>&1
if [ $? -ne 0 ]; then
    cat $CLIENT_LOG
    echo -e "\n***\n*** Test Failed\n***"
    RET=1
fi
set -e

# Reap the server process (the test already initiated shutdown via SIGINT).
kill $SERVER_PID
wait $SERVER_PID
if [ $RET -eq 0 ]; then
echo -e "\n***\n*** Test Passed\n***"
else
Expand Down
46 changes: 44 additions & 2 deletions src/http_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ HTTPServer::Start()
evhtp_enable_flag(htp_, EVHTP_FLAG_ENABLE_REUSEPORT);
}
evhtp_set_gencb(htp_, HTTPServer::Dispatch, this);
evhtp_set_pre_accept_cb(htp_, HTTPServer::NewConnection, this);
evhtp_use_threads_wexit(htp_, NULL, NULL, thread_cnt_, NULL);
if (evhtp_bind_socket(htp_, address_.c_str(), port_, 1024) != 0) {
return TRITONSERVER_ErrorNew(
Expand All @@ -235,8 +236,22 @@ HTTPServer::Start()
}

TRITONSERVER_Error*
HTTPServer::Stop()
HTTPServer::Stop(uint32_t* exit_timeout_secs, const std::string& service_name)
{
{
std::lock_guard<std::mutex> lock(conn_mu_);
accepting_new_conn_ = false;
}
if (exit_timeout_secs != nullptr) {
// Note: conn_cnt_ can only decrease
while (*exit_timeout_secs > 0 && conn_cnt_ > 0) {
LOG_INFO << "Timeout " << *exit_timeout_secs << ": Found " << conn_cnt_
<< " " << service_name << " service connections";
std::this_thread::sleep_for(std::chrono::seconds(1));
(*exit_timeout_secs)--;
}
}

if (worker_.joinable()) {
// Notify event loop to break via fd write
send(fds_[1], (const char*)&evbase_, sizeof(event_base*), 0);
Expand All @@ -249,7 +264,6 @@ HTTPServer::Stop()
event_base_free(evbase_);
return nullptr;
}

return TRITONSERVER_ErrorNew(
TRITONSERVER_ERROR_UNAVAILABLE, "HTTP server is not running.");
}
Expand All @@ -267,6 +281,34 @@ HTTPServer::Dispatch(evhtp_request_t* req, void* arg)
(static_cast<HTTPServer*>(arg))->Handle(req);
}

// Pre-accept callback registered in Start(). Counts live connections and,
// once Stop() has flipped 'accepting_new_conn_' off, rejects any new ones.
evhtp_res
HTTPServer::NewConnection(evhtp_connection_t* conn, void* arg)
{
  HTTPServer* server = static_cast<HTTPServer*>(arg);

  std::unique_lock<std::mutex> lk(server->conn_mu_);
  if (!server->accepting_new_conn_) {
    // Shutdown has begun - refuse the connection (reset connection).
    return EVHTP_RES_SERVUNAVAIL;
  }
  ++server->conn_cnt_;
  lk.unlock();

  // Arrange for EndConnection() to run when this connection finishes, so
  // the live-connection count is decremented again.
  evhtp_connection_set_hook(
      conn, evhtp_hook_on_connection_fini,
      (evhtp_hook)(void*)HTTPServer::EndConnection, arg);
  return EVHTP_RES_OK;
}

// Connection-fini hook installed by NewConnection(); drops the live
// connection count that Stop() polls while waiting out the exit timeout.
// NOTE(review): Stop() reads 'conn_cnt_' without holding 'conn_mu_';
// consider making the counter std::atomic - verify against http_server.h.
evhtp_res
HTTPServer::EndConnection(evhtp_connection_t* conn, void* arg)
{
  HTTPServer* server = static_cast<HTTPServer*>(arg);
  std::lock_guard<std::mutex> lk(server->conn_mu_);
  --server->conn_cnt_;
  return EVHTP_RES_OK;
}

#ifdef TRITON_ENABLE_METRICS

void
Expand Down
15 changes: 13 additions & 2 deletions src/http_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
Expand Down Expand Up @@ -80,15 +81,18 @@ class HTTPServer {
virtual ~HTTPServer() { IGNORE_ERR(Stop()); }

TRITONSERVER_Error* Start();
TRITONSERVER_Error* Stop();
TRITONSERVER_Error* Stop(
uint32_t* exit_timeout_secs = nullptr,
const std::string& service_name = "HTTP");

protected:
explicit HTTPServer(
const int32_t port, const bool reuse_port, const std::string& address,
const std::string& header_forward_pattern, const int thread_cnt)
: port_(port), reuse_port_(reuse_port), address_(address),
header_forward_pattern_(header_forward_pattern),
thread_cnt_(thread_cnt), header_forward_regex_(header_forward_pattern_)
thread_cnt_(thread_cnt), header_forward_regex_(header_forward_pattern_),
conn_cnt_(0), accepting_new_conn_(true)
{
}

Expand All @@ -100,6 +104,9 @@ class HTTPServer {

static void StopCallback(evutil_socket_t sock, short events, void* arg);

static evhtp_res NewConnection(evhtp_connection_t* conn, void* arg);
static evhtp_res EndConnection(evhtp_connection_t* conn, void* arg);

int32_t port_;
bool reuse_port_;
std::string address_;
Expand All @@ -112,6 +119,10 @@ class HTTPServer {
std::thread worker_;
evutil_socket_t fds_[2];
event* break_ev_;

std::mutex conn_mu_;
uint32_t conn_cnt_;
bool accepting_new_conn_;
};

#ifdef TRITON_ENABLE_METRICS
Expand Down
26 changes: 22 additions & 4 deletions src/main.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2018-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// Copyright 2018-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions
Expand Down Expand Up @@ -300,13 +300,13 @@ StartEndpoints(
}

bool
StopEndpoints()
StopEndpoints(uint32_t* exit_timeout_secs)
{
bool ret = true;

#ifdef TRITON_ENABLE_HTTP
if (g_http_service) {
TRITONSERVER_Error* err = g_http_service->Stop();
TRITONSERVER_Error* err = g_http_service->Stop(exit_timeout_secs);
if (err != nullptr) {
LOG_TRITONSERVER_ERROR(err, "failed to stop HTTP service");
ret = false;
Expand All @@ -316,6 +316,17 @@ StopEndpoints()
}
#endif // TRITON_ENABLE_HTTP

return ret;
}

bool
StopEndpoints()
{
bool ret = true;

// TODO: Add support for 'exit_timeout_secs' to the endpoints below and move
// them to the 'StopEndpoints(uint32_t* exit_timeout_secs)' function above.

#ifdef TRITON_ENABLE_GRPC
if (g_grpc_service) {
TRITONSERVER_Error* err = g_grpc_service->Stop();
Expand Down Expand Up @@ -509,6 +520,11 @@ main(int argc, char** argv)
triton::server::signal_exit_cv_.wait_for(lock, wait_timeout);
}

// Stop the HTTP[, gRPC, and metrics] endpoints, and update exit timeout.
uint32_t exit_timeout_secs = g_triton_params.exit_timeout_secs_;
StopEndpoints(&exit_timeout_secs);
TRITONSERVER_ServerSetExitTimeout(server_ptr, exit_timeout_secs);

TRITONSERVER_Error* stop_err = TRITONSERVER_ServerStop(server_ptr);

// If unable to gracefully stop the server then Triton threads and
Expand All @@ -519,8 +535,10 @@ main(int argc, char** argv)
exit(1);
}

// Stop tracing and the HTTP, GRPC, and metrics endpoints.
// Stop gRPC and metrics endpoints that do not yet support exit timeout.
StopEndpoints();

// Stop tracing.
StopTracing(&trace_manager);

#ifdef TRITON_ENABLE_ASAN
Expand Down

0 comments on commit 157ec72

Please sign in to comment.