Skip to content

Commit

Permalink
changed location of delay
Browse files Browse the repository at this point in the history
  • Loading branch information
jbkyang-nvi committed Nov 14, 2023
1 parent 587885d commit 3f05051
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 48 deletions.
4 changes: 3 additions & 1 deletion qa/L0_client_timeout/client_timeout_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,9 @@ def test_http_async_infer(self):
# response. Expect an exception for small timeout values.
with self.assertRaises(socket.timeout) as cm:
triton_client = httpclient.InferenceServerClient(
url="localhost:8000", verbose=True, network_timeout=2.0
url="localhost:8000",
verbose=True,
network_timeout=self.INFER_SMALL_INTERVAL,
)
async_request = triton_client.async_infer(
model_name=self.model_name_, inputs=self.inputs_, outputs=self.outputs_
Expand Down
16 changes: 7 additions & 9 deletions qa/L0_client_timeout/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ fi

export CUDA_VISIBLE_DEVICES=0
TIMEOUT_VALUE=100000000
SHORT_TIMEOUT_VALUE=1000
SHORT_TIMEOUT_VALUE=1
RET=0

CLIENT_TIMEOUT_TEST=client_timeout_test.py
Expand All @@ -60,7 +60,7 @@ source ../common/util.sh
mkdir -p $DATADIR/custom_identity_int32/1

# Test all APIs apart from Infer.
export TRITONSERVER_SERVER_DELAY_GRPC_RESPONSE_SEC=1
export TRITONSERVER_SERVER_DELAY_GRPC_RESPONSE_SEC=2
run_server
if [ "$SERVER_PID" == "0" ]; then
echo -e "\n***\n*** Failed to start $SERVER\n***"
Expand All @@ -70,14 +70,14 @@ fi

set +e
# Expect timeout for everything
sed -i 's#value: { string_value: "0" }#value: { string_value: "1" }#' $DATADIR/custom_identity_int32/config.pbtxt
$CLIENT_TIMEOUT_TEST_CPP -t $SHORT_TIMEOUT_VALUE -v -i grpc -p >> ${CLIENT_LOG}.c++.grpc_non_infer_apis 2>&1
if [ $? -eq 0 ]; then
echo -e "\n***\n*** Test expected to fail with short timeout but did not\n***"
RET=1
fi
if [ `grep -c "Deadline Exceeded" ${CLIENT_LOG}.c++.grpc_non_infer_apis` != "1" ]; then
if [ `grep -c "Deadline Exceeded" ${CLIENT_LOG}.c++.grpc_non_infer_apis` != "18" ]; then
cat ${CLIENT_LOG}.c++.grpc_non_infer_apis
echo -e "\n***\n*** Test Failed\n***"
echo -e "\n***\n*** Test Failed. Expected 18 failed\n***"
RET=1
fi
# Test all APIs with long timeout
Expand All @@ -92,9 +92,8 @@ kill $SERVER_PID
wait $SERVER_PID

# Test infer APIs
export TRITONSERVER_SERVER_DELAY_GRPC_RESPONSE_SEC=
unset TRITONSERVER_SERVER_DELAY_GRPC_RESPONSE_SEC
SERVER_ARGS="--model-repository=$DATADIR"
sed -i 's#value: { string_value: "1" }#value: { string_value: "0" }#' $DATADIR/custom_identity_int32/config.pbtxt
run_server
if [ "$SERVER_PID" == "0" ]; then
echo -e "\n***\n*** Failed to start $SERVER\n***"
Expand Down Expand Up @@ -240,9 +239,8 @@ kill $SERVER_PID
wait $SERVER_PID

# Test all APIs other than infer
export TRITONSERVER_SERVER_DELAY_GRPC_RESPONSE_SEC=1
export TRITONSERVER_SERVER_DELAY_GRPC_RESPONSE_SEC=2
SERVER_ARGS="${SERVER_ARGS} --model-control-mode=explicit --load-model=custom_identity_int32 --log-verbose 2"
sed -i 's#value: { string_value: "0" }#value: { string_value: "1" }#' $DATADIR/custom_identity_int32/config.pbtxt
run_server
if [ "$SERVER_PID" == "0" ]; then
echo -e "\n***\n*** Failed to start $SERVER\n***"
Expand Down
77 changes: 51 additions & 26 deletions src/grpc/grpc_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,11 @@ class CommonCallData : public ICallData {
const StandardRegisterFunc OnRegister,
const StandardCallbackFunc OnExecute, const bool async,
::grpc::ServerCompletionQueue* cq,
const std::pair<std::string, std::string>& restricted_kv)
const std::pair<std::string, std::string>& restricted_kv,
const uint64_t& response_delay = 0)
: name_(name), id_(id), OnRegister_(OnRegister), OnExecute_(OnExecute),
async_(async), cq_(cq), responder_(&ctx_), step_(Steps::START),
restricted_kv_(restricted_kv)
restricted_kv_(restricted_kv), response_delay_(response_delay)
{
OnRegister_(&ctx_, &request_, &responder_, this);
LOG_VERBOSE(1) << "Ready for RPC '" << name_ << "', " << id_;
Expand Down Expand Up @@ -140,6 +141,8 @@ class CommonCallData : public ICallData {
Steps step_;

std::pair<std::string, std::string> restricted_kv_{"", ""};

const uint64_t response_delay_;
};

template <typename ResponderType, typename RequestType, typename ResponseType>
Expand All @@ -165,7 +168,8 @@ CommonCallData<ResponderType, RequestType, ResponseType>::Process(bool rpc_ok)
// Start a new request to replace this one...
if (!shutdown) {
new CommonCallData<ResponderType, RequestType, ResponseType>(
name_, id_ + 1, OnRegister_, OnExecute_, async_, cq_, restricted_kv_);
name_, id_ + 1, OnRegister_, OnExecute_, async_, cq_, restricted_kv_,
response_delay_);
}

if (!async_) {
Expand Down Expand Up @@ -234,6 +238,14 @@ template <typename ResponderType, typename RequestType, typename ResponseType>
void
CommonCallData<ResponderType, RequestType, ResponseType>::WriteResponse()
{
if (response_delay_ != 0) {
// Will delay the write of the response by the specified time.
// This can be used to test the flow where there are other
// responses available to be written.
LOG_VERBOSE(1) << "Delaying the write of the response by "
<< response_delay_ << " seconds";
std::this_thread::sleep_for(std::chrono::seconds(response_delay_));
}
step_ = Steps::COMPLETE;
responder_.Finish(response_, status_, this);
}
Expand All @@ -253,7 +265,7 @@ class CommonHandler : public HandlerBase {
inference::GRPCInferenceService::AsyncService* service,
::grpc::health::v1::Health::AsyncService* health_service,
::grpc::ServerCompletionQueue* cq,
const RestrictedFeatures& restricted_keys);
const RestrictedFeatures& restricted_keys, const uint64_t response_delay);

// Descriptive name of of the handler.
const std::string& Name() const { return name_; }
Expand Down Expand Up @@ -299,6 +311,7 @@ class CommonHandler : public HandlerBase {
::grpc::ServerCompletionQueue* cq_;
std::unique_ptr<std::thread> thread_;
const RestrictedFeatures& restricted_keys_;
const uint64_t response_delay_ = 0;
};

CommonHandler::CommonHandler(
Expand All @@ -309,11 +322,12 @@ CommonHandler::CommonHandler(
inference::GRPCInferenceService::AsyncService* service,
::grpc::health::v1::Health::AsyncService* health_service,
::grpc::ServerCompletionQueue* cq,
const RestrictedFeatures& restricted_keys)
const RestrictedFeatures& restricted_keys,
const uint64_t response_delay = 0)
: name_(name), tritonserver_(tritonserver), shm_manager_(shm_manager),
trace_manager_(trace_manager), service_(service),
health_service_(health_service), cq_(cq),
restricted_keys_(restricted_keys)
restricted_keys_(restricted_keys), response_delay_(response_delay)
{
}

Expand Down Expand Up @@ -440,7 +454,7 @@ CommonHandler::RegisterServerLive()
::grpc::ServerAsyncResponseWriter<inference::ServerLiveResponse>,
inference::ServerLiveRequest, inference::ServerLiveResponse>(
"ServerLive", 0, OnRegisterServerLive, OnExecuteServerLive,
false /* async */, cq_, restricted_kv);
false /* async */, cq_, restricted_kv, response_delay_);
}

void
Expand Down Expand Up @@ -476,7 +490,7 @@ CommonHandler::RegisterServerReady()
::grpc::ServerAsyncResponseWriter<inference::ServerReadyResponse>,
inference::ServerReadyRequest, inference::ServerReadyResponse>(
"ServerReady", 0, OnRegisterServerReady, OnExecuteServerReady,
false /* async */, cq_, restricted_kv);
false /* async */, cq_, restricted_kv, response_delay_);
}

void
Expand Down Expand Up @@ -525,7 +539,7 @@ CommonHandler::RegisterHealthCheck()
::grpc::health::v1::HealthCheckRequest,
::grpc::health::v1::HealthCheckResponse>(
"Check", 0, OnRegisterHealthCheck, OnExecuteHealthCheck,
false /* async */, cq_, restricted_kv);
false /* async */, cq_, restricted_kv, response_delay_);
}

void
Expand Down Expand Up @@ -567,7 +581,7 @@ CommonHandler::RegisterModelReady()
::grpc::ServerAsyncResponseWriter<inference::ModelReadyResponse>,
inference::ModelReadyRequest, inference::ModelReadyResponse>(
"ModelReady", 0, OnRegisterModelReady, OnExecuteModelReady,
false /* async */, cq_, restricted_kv);
false /* async */, cq_, restricted_kv, response_delay_);
}

void
Expand Down Expand Up @@ -645,7 +659,7 @@ CommonHandler::RegisterServerMetadata()
::grpc::ServerAsyncResponseWriter<inference::ServerMetadataResponse>,
inference::ServerMetadataRequest, inference::ServerMetadataResponse>(
"ServerMetadata", 0, OnRegisterServerMetadata, OnExecuteServerMetadata,
false /* async */, cq_, restricted_kv);
false /* async */, cq_, restricted_kv, response_delay_);
}

void
Expand Down Expand Up @@ -813,7 +827,7 @@ CommonHandler::RegisterModelMetadata()
::grpc::ServerAsyncResponseWriter<inference::ModelMetadataResponse>,
inference::ModelMetadataRequest, inference::ModelMetadataResponse>(
"ModelMetadata", 0, OnRegisterModelMetadata, OnExecuteModelMetadata,
false /* async */, cq_, restricted_kv);
false /* async */, cq_, restricted_kv, response_delay_);
}

void
Expand Down Expand Up @@ -866,7 +880,7 @@ CommonHandler::RegisterModelConfig()
::grpc::ServerAsyncResponseWriter<inference::ModelConfigResponse>,
inference::ModelConfigRequest, inference::ModelConfigResponse>(
"ModelConfig", 0, OnRegisterModelConfig, OnExecuteModelConfig,
false /* async */, cq_, restricted_kv);
false /* async */, cq_, restricted_kv, response_delay_);
}

void
Expand Down Expand Up @@ -1196,7 +1210,7 @@ CommonHandler::RegisterModelStatistics()
::grpc::ServerAsyncResponseWriter<inference::ModelStatisticsResponse>,
inference::ModelStatisticsRequest, inference::ModelStatisticsResponse>(
"ModelStatistics", 0, OnRegisterModelStatistics, OnExecuteModelStatistics,
false /* async */, cq_, restricted_kv);
false /* async */, cq_, restricted_kv, response_delay_);
}

void
Expand Down Expand Up @@ -1471,7 +1485,7 @@ CommonHandler::RegisterTrace()
::grpc::ServerAsyncResponseWriter<inference::TraceSettingResponse>,
inference::TraceSettingRequest, inference::TraceSettingResponse>(
"Trace", 0, OnRegisterTrace, OnExecuteTrace, false /* async */, cq_,
restricted_kv);
restricted_kv, response_delay_);
}

void
Expand Down Expand Up @@ -1680,7 +1694,7 @@ CommonHandler::RegisterLogging()
::grpc::ServerAsyncResponseWriter<inference::LogSettingsResponse>,
inference::LogSettingsRequest, inference::LogSettingsResponse>(
"Logging", 0, OnRegisterLogging, OnExecuteLogging, false /* async */, cq_,
restricted_kv);
restricted_kv, response_delay_);
}

void
Expand Down Expand Up @@ -1754,7 +1768,8 @@ CommonHandler::RegisterSystemSharedMemoryStatus()
inference::SystemSharedMemoryStatusRequest,
inference::SystemSharedMemoryStatusResponse>(
"SystemSharedMemoryStatus", 0, OnRegisterSystemSharedMemoryStatus,
OnExecuteSystemSharedMemoryStatus, false /* async */, cq_, restricted_kv);
OnExecuteSystemSharedMemoryStatus, false /* async */, cq_, restricted_kv,
response_delay_);
}

void
Expand Down Expand Up @@ -1793,7 +1808,7 @@ CommonHandler::RegisterSystemSharedMemoryRegister()
inference::SystemSharedMemoryRegisterResponse>(
"SystemSharedMemoryRegister", 0, OnRegisterSystemSharedMemoryRegister,
OnExecuteSystemSharedMemoryRegister, false /* async */, cq_,
restricted_kv);
restricted_kv, response_delay_);
}

void
Expand Down Expand Up @@ -1836,7 +1851,7 @@ CommonHandler::RegisterSystemSharedMemoryUnregister()
inference::SystemSharedMemoryUnregisterResponse>(
"SystemSharedMemoryUnregister", 0, OnRegisterSystemSharedMemoryUnregister,
OnExecuteSystemSharedMemoryUnregister, false /* async */, cq_,
restricted_kv);
restricted_kv, response_delay_);
}

void
Expand Down Expand Up @@ -1902,7 +1917,8 @@ CommonHandler::RegisterCudaSharedMemoryStatus()
inference::CudaSharedMemoryStatusRequest,
inference::CudaSharedMemoryStatusResponse>(
"CudaSharedMemoryStatus", 0, OnRegisterCudaSharedMemoryStatus,
OnExecuteCudaSharedMemoryStatus, false /* async */, cq_, restricted_kv);
OnExecuteCudaSharedMemoryStatus, false /* async */, cq_, restricted_kv,
response_delay_);
}

void
Expand Down Expand Up @@ -1952,7 +1968,8 @@ CommonHandler::RegisterCudaSharedMemoryRegister()
inference::CudaSharedMemoryRegisterRequest,
inference::CudaSharedMemoryRegisterResponse>(
"CudaSharedMemoryRegister", 0, OnRegisterCudaSharedMemoryRegister,
OnExecuteCudaSharedMemoryRegister, false /* async */, cq_, restricted_kv);
OnExecuteCudaSharedMemoryRegister, false /* async */, cq_, restricted_kv,
response_delay_);
}

void
Expand Down Expand Up @@ -1995,7 +2012,7 @@ CommonHandler::RegisterCudaSharedMemoryUnregister()
inference::CudaSharedMemoryUnregisterResponse>(
"CudaSharedMemoryUnregister", 0, OnRegisterCudaSharedMemoryUnregister,
OnExecuteCudaSharedMemoryUnregister, false /* async */, cq_,
restricted_kv);
restricted_kv, response_delay_);
}

void
Expand Down Expand Up @@ -2097,7 +2114,7 @@ CommonHandler::RegisterRepositoryIndex()
::grpc::ServerAsyncResponseWriter<inference::RepositoryIndexResponse>,
inference::RepositoryIndexRequest, inference::RepositoryIndexResponse>(
"RepositoryIndex", 0, OnRegisterRepositoryIndex, OnExecuteRepositoryIndex,
false /* async */, cq_, restricted_kv);
false /* async */, cq_, restricted_kv, response_delay_);
}

void
Expand Down Expand Up @@ -2209,7 +2226,8 @@ CommonHandler::RegisterRepositoryModelLoad()
inference::RepositoryModelLoadRequest,
inference::RepositoryModelLoadResponse>(
"RepositoryModelLoad", 0, OnRegisterRepositoryModelLoad,
OnExecuteRepositoryModelLoad, true /* async */, cq_, restricted_kv);
OnExecuteRepositoryModelLoad, true /* async */, cq_, restricted_kv,
response_delay_);
}

void
Expand Down Expand Up @@ -2278,7 +2296,8 @@ CommonHandler::RegisterRepositoryModelUnload()
inference::RepositoryModelUnloadRequest,
inference::RepositoryModelUnloadResponse>(
"RepositoryModelUnload", 0, OnRegisterRepositoryModelUnload,
OnExecuteRepositoryModelUnload, true /* async */, cq_, restricted_kv);
OnExecuteRepositoryModelUnload, true /* async */, cq_, restricted_kv,
response_delay_);
}

} // namespace
Expand Down Expand Up @@ -2388,9 +2407,15 @@ Server::Server(
model_stream_infer_cq_ = builder_.AddCompletionQueue();

// A common Handler for other non-inference requests
const char* dstr = getenv("TRITONSERVER_SERVER_DELAY_GRPC_RESPONSE_SEC");
uint64_t response_delay = 0;
if (dstr != nullptr) {
response_delay = atoi(dstr);
}
common_handler_.reset(new CommonHandler(
"CommonHandler", tritonserver_, shm_manager_, trace_manager_, &service_,
&health_service_, common_cq_.get(), options.restricted_protocols_));
&health_service_, common_cq_.get(), options.restricted_protocols_,
response_delay));

// [FIXME] "register" logic is different for infer
// Handler for model inference requests.
Expand Down
12 changes: 0 additions & 12 deletions src/grpc/grpc_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,6 @@ operator<<(std::ostream& out, const Steps& step)
void
GrpcStatusUtil::Create(::grpc::Status* status, TRITONSERVER_Error* err)
{
const char* dstr = getenv("TRITONSERVER_SERVER_DELAY_GRPC_RESPONSE_SEC");
uint64_t delay_response = 0;
if (dstr != nullptr) {
delay_response = atoi(dstr);
// Will delay the write of the response by the specified time.
// This can be used to test the flow where there are other
// responses available to be written.
LOG_VERBOSE(1) << "Delaying the write of the response by " << delay_response
<< " seconds";
std::this_thread::sleep_for(std::chrono::seconds(delay_response));
}

if (err == nullptr) {
*status = ::grpc::Status::OK;
} else {
Expand Down

0 comments on commit 3f05051

Please sign in to comment.