Commit 5dd7870

Improve ensemble to use cudaMemcpyAsync (triton-inference-server#582)
* Adding cuda stream in ensemble

* Add hook on collecting preferred input memory type

* Fix GetNextInputContent() to set 'memory_type' for override content
GuanLuo authored and deadeyegoodwin committed Aug 28, 2019
1 parent b1a0577 commit 5dd7870
Showing 7 changed files with 90 additions and 24 deletions.

src/backends/custom/custom_backend.cc (1 addition, 1 deletion)

@@ -465,7 +465,7 @@ CustomBackend::Context::GetNextInput(
 
   // If the memory type is on GPU, implicitly copying it to CPU memory
   // to ensure backward capability
-  TRTSERVER_Memory_Type src_memory_type;
+  auto src_memory_type = TRTSERVER_MEMORY_CPU;
   Status status = payload->request_provider_->GetNextInputContent(
       name, content, content_byte_size, &src_memory_type, false);
 #ifdef TRTIS_ENABLE_GPU

src/backends/tensorflow/base_backend.cc (1 addition, 1 deletion)

@@ -299,7 +299,7 @@ SetStringInputTensor(
 
   // For string data type, we always need to copy the data to CPU so that
   // we can read string length and construct the string properly.
-  TRTSERVER_Memory_Type src_memory_type;
+  auto src_memory_type = TRTSERVER_MEMORY_CPU;
   const void* vcontent;
   size_t content_byte_size = expected_element_cnt * sizeof(uint32_t);
   payload.status_ = payload.request_provider_->GetNextInputContent(

src/core/backend_context.cc (1 addition, 1 deletion)

@@ -92,7 +92,7 @@ BackendContext::SetInputBuffer(
 
   size_t copied_byte_size = 0;
   while (payload.status_.IsOk()) {
-    TRTSERVER_Memory_Type src_memory_type;
+    auto src_memory_type = dst_memory_type;
     const void* content;
     size_t content_byte_size = expected_byte_size - copied_byte_size;
     payload.status_ = payload.request_provider_->GetNextInputContent(

src/core/ensemble_scheduler.cc (62 additions, 17 deletions)

@@ -34,10 +34,6 @@
 #include "src/core/server.h"
 #include "src/core/server_status.h"
 
-#ifdef TRTIS_ENABLE_GPU
-#include <cuda_runtime_api.h>
-#endif  // TRTIS_ENABLE_GPU
-
 namespace nvidia { namespace inferenceserver {
 
 namespace {
@@ -73,7 +69,7 @@ class EnsembleContext {
       const std::shared_ptr<ModelInferStats>& stats,
       const std::shared_ptr<InferRequestProvider>& request_provider,
       const std::shared_ptr<InferResponseProvider>& response_provider,
-      std::function<void(const Status&)> OnComplete);
+      std::function<void(const Status&)> OnComplete, cudaStream_t stream);
 
   // Perform transition on 'context' state given the information of
   // 'completed_step'
@@ -144,6 +140,10 @@ class EnsembleContext {
 
   EnsembleInfo* info_;
 
+  // All EnsembleContext will use the same CUDA stream managed by
+  // the ensemble scheduler
+  cudaStream_t stream_;
+
   // Mutex to avoid concurrent call on 'PrepareSteps' where ensemble state
   // are being modified
   std::mutex mutex_;
@@ -183,9 +183,9 @@ EnsembleContext::EnsembleContext(
     const std::shared_ptr<ModelInferStats>& stats,
     const std::shared_ptr<InferRequestProvider>& request_provider,
     const std::shared_ptr<InferResponseProvider>& response_provider,
-    std::function<void(const Status&)> OnComplete)
-    : is_(is), info_(info), inflight_step_counter_(0), stats_(stats),
-      request_provider_(request_provider),
+    std::function<void(const Status&)> OnComplete, cudaStream_t stream)
+    : is_(is), info_(info), stream_(stream), inflight_step_counter_(0),
+      stats_(stats), request_provider_(request_provider),
       response_provider_(response_provider), OnComplete_(OnComplete),
       allocator_(nullptr, TRTSERVER_ResponseAllocatorDelete)
 {
@@ -567,6 +567,7 @@ EnsembleContext::FinishEnsemble()
 Status
 EnsembleContext::CheckAndSetEnsembleOutput()
 {
+  bool cuda_async_copy = false;
   for (const auto& output_pair : info_->ensemble_output_shape_) {
     if (!response_provider_->RequiresOutput(output_pair.first)) {
       continue;
@@ -646,9 +647,20 @@ EnsembleContext::CheckAndSetEnsembleOutput()
       } else {
         copy_type = cudaMemcpyDeviceToHost;
       }
-      // [TODO] create stream for EnsembleContext for using async call
-      cudaError_t err = cudaMemcpy(
-          ((char*)buffer) + content_offset, content, content_size, copy_type);
+
+      cudaError_t err;
+      // use default stream if no CUDA stream is created for the ensemble
+      if (stream_ == nullptr) {
+        err = cudaMemcpy(
+            ((char*)buffer) + content_offset, content, content_size,
+            copy_type);
+      } else {
+        err = cudaMemcpyAsync(
+            ((char*)buffer) + content_offset, content, content_size,
+            copy_type, stream_);
+        cuda_async_copy |= (err == cudaSuccess);
+      }
+
       if (err != cudaSuccess) {
         return Status(
             RequestStatusCode::INTERNAL,
@@ -657,9 +669,9 @@ EnsembleContext::CheckAndSetEnsembleOutput()
       }
 #else
       return Status(
-          RequestStatusCode::INTERNAL,
-          "try to use CUDA copy for output '" + output_pair.first +
-              "' while GPU is not supported"));
+          RequestStatusCode::INTERNAL, "try to use CUDA copy for output '" +
+                                           output_pair.first +
+                                           "' while GPU is not supported");
 #endif  // TRTIS_ENABLE_GPU
     }
 
@@ -669,6 +681,17 @@ EnsembleContext::CheckAndSetEnsembleOutput()
           memory_block->BufferAt(content_idx, &content_size, &src_memory_type);
     }
   }
+
+  if (cuda_async_copy) {
+#ifdef TRTIS_ENABLE_GPU
+    cudaStreamSynchronize(stream_);
+#else
+    return Status(
+        RequestStatusCode::INTERNAL,
+        "unexpected CUDA copy flag set while GPU is not supported");
+#endif  // TRTIS_ENABLE_GPU
+  }
+
   return Status::Success;
 }
 
@@ -729,15 +752,25 @@ EnsembleScheduler::Enqueue(
     std::function<void(const Status&)> OnComplete)
 {
   std::shared_ptr<EnsembleContext> context(new EnsembleContext(
-      is_, info_.get(), stats, request_provider, response_provider,
-      OnComplete));
+      is_, info_.get(), stats, request_provider, response_provider, OnComplete,
+      stream_));
   EnsembleContext::Proceed(context);
 }
 
 EnsembleScheduler::EnsembleScheduler(
     InferenceServer* const server, const ModelConfig& config)
-    : is_(server)
+    : is_(server), stream_(nullptr)
 {
+#ifdef TRTIS_ENABLE_GPU
+  // create CUDA stream
+  auto cuerr = cudaStreamCreate(&stream_);
+  if (cuerr != cudaSuccess) {
+    stream_ = nullptr;
+    LOG_ERROR << "unable to create stream for " << config.name() << ": "
+              << cudaGetErrorString(cuerr);
+  }
+#endif  // TRTIS_ENABLE_GPU
+
   // Set 'info_' based on 'config'
   info_.reset(new EnsembleInfo());
 
@@ -783,4 +816,16 @@ EnsembleScheduler::EnsembleScheduler(
   }
 }
 
+EnsembleScheduler::~EnsembleScheduler()
+{
+#ifdef TRTIS_ENABLE_GPU
+  if (stream_ != nullptr) {
+    cudaError_t err = cudaStreamDestroy(stream_);
+    if (err != cudaSuccess) {
+      LOG_ERROR << "Failed to destroy cuda stream: " << cudaGetErrorString(err);
+    }
+  }
+#endif  // TRTIS_ENABLE_GPU
+}
+
 }}  // namespace nvidia::inferenceserver
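
The pattern introduced above, shown as a standalone sketch: queue each device-to-host copy with cudaMemcpyAsync on the scheduler-owned stream when one exists, fall back to the synchronous cudaMemcpy otherwise, and synchronize the stream once after all chunks have been issued. This is illustrative only, not repository code; the function CopyOutputs and its parameters are hypothetical.

// Hypothetical sketch of the batched async-copy pattern (not repository code).
#include <cuda_runtime_api.h>
#include <cstdio>

bool CopyOutputs(
    void* dst, const void* const* srcs, const size_t* sizes, size_t count,
    cudaStream_t stream)
{
  bool cuda_async_copy = false;
  size_t offset = 0;
  for (size_t i = 0; i < count; ++i) {
    cudaError_t err;
    if (stream == nullptr) {
      // No dedicated stream: keep the old synchronous behavior.
      err = cudaMemcpy(
          static_cast<char*>(dst) + offset, srcs[i], sizes[i],
          cudaMemcpyDeviceToHost);
    } else {
      // Queue the copy on the dedicated stream and remember that we did.
      err = cudaMemcpyAsync(
          static_cast<char*>(dst) + offset, srcs[i], sizes[i],
          cudaMemcpyDeviceToHost, stream);
      cuda_async_copy |= (err == cudaSuccess);
    }
    if (err != cudaSuccess) {
      std::fprintf(stderr, "copy failed: %s\n", cudaGetErrorString(err));
      return false;
    }
    offset += sizes[i];
  }
  if (cuda_async_copy) {
    // One synchronization point for the whole response instead of an
    // implicit device synchronization per cudaMemcpy call.
    cudaStreamSynchronize(stream);
  }
  return true;
}

Synchronizing once per response is what makes the switch from cudaMemcpy worthwhile when an ensemble stages several output tensors.
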

src/core/ensemble_scheduler.h (13 additions, 0 deletions)

@@ -32,8 +32,16 @@
 #include "src/core/scheduler.h"
 #include "src/core/status.h"
 
+#ifdef TRTIS_ENABLE_GPU
+#include <cuda_runtime_api.h>
+#endif  // TRTIS_ENABLE_GPU
+
 namespace nvidia { namespace inferenceserver {
 
+#ifndef TRTIS_ENABLE_GPU
+using cudaStream_t = void*;
+#endif  // TRTIS_ENABLE_GPU
+
 class InferenceServer;
 
 struct EnsembleInfo {
@@ -71,6 +79,8 @@ class EnsembleScheduler : public Scheduler {
       InferenceServer* const server, const ModelConfig& config,
       std::unique_ptr<Scheduler>* scheduler);
 
+  ~EnsembleScheduler();
+
   // \see Scheduler::Enqueue()
   void Enqueue(
       const std::shared_ptr<ModelInferStats>& stats,
@@ -85,6 +95,9 @@
 
   // Ensemble information that is built from model config
   std::unique_ptr<EnsembleInfo> info_;
+
+  // The stream used for data transfer.
+  cudaStream_t stream_;
 };
 
 }}  // namespace nvidia::inferenceserver
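
The cudaStream_t alias above is what lets the header keep a stream member in CPU-only builds. A minimal sketch of the same pattern, assuming nothing beyond the CUDA runtime API (the class StreamHolder is invented for illustration): when TRTIS_ENABLE_GPU is undefined the member is a plain void* that stays nullptr, and the GPU-only calls compile out.

// Hypothetical illustration of the alias + RAII ownership pattern.
#ifdef TRTIS_ENABLE_GPU
#include <cuda_runtime_api.h>
#else
using cudaStream_t = void*;  // opaque placeholder for CPU-only builds
#endif

class StreamHolder {
 public:
  StreamHolder() : stream_(nullptr)
  {
#ifdef TRTIS_ENABLE_GPU
    if (cudaStreamCreate(&stream_) != cudaSuccess) {
      stream_ = nullptr;  // fall back to the default stream
    }
#endif
  }

  ~StreamHolder()
  {
#ifdef TRTIS_ENABLE_GPU
    if (stream_ != nullptr) {
      cudaStreamDestroy(stream_);
    }
#endif
  }

  cudaStream_t Stream() const { return stream_; }

 private:
  cudaStream_t stream_;  // void* when CUDA is not available
};
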

src/core/provider.cc (3 additions, 1 deletion)

@@ -218,7 +218,9 @@ InferRequestProvider::GetNextInputContent(
     return Status::Success;
   }
 
-  if (!GetInputOverrideContent(name, content, content_byte_size)) {
+  if (GetInputOverrideContent(name, content, content_byte_size)) {
+    *memory_type = TRTSERVER_MEMORY_CPU;
+  } else {
     const auto& pr = input_buffer_.find(name);
     if (pr == input_buffer_.end()) {
       return Status(

src/core/provider.h (9 additions, 3 deletions)

@@ -131,13 +131,19 @@ class InferRequestProvider {
 
   // Get the next contiguous chunk of bytes for the 'name'd
   // input. Return a pointer to the chunk in 'content'.
+  // If there are no more bytes for the input return 'content' == nullptr.
   // 'content_byte_size' acts as both input and output. On input
   // 'content_byte_size' is a hint of the maximum chunk size that
   // should be returned in 'content' and must be non-zero unless no
   // additional input is expected. On return 'content_byte_size' gives
-  // the actual size of the chunk pointed to by 'content'. If there
-  // are no more bytes for the input return 'content' == nullptr. If
-  // 'force_contiguous' is true then the entire (remaining) input will
+  // the actual size of the chunk pointed to by 'content'.
+  // 'memory_type' acts as both input and output. On input 'memory_type'
+  // is the buffer memory type preferred by the function caller, it will
+  // not affect the function behavior, but it will be propagated to the
+  // buffer and the buffer owner may collect such information for other use.
+  // On return 'memory_type' gives the actual memory type of the chunk
+  // pointed to by 'content'.
+  // If 'force_contiguous' is true then the entire (remaining) input will
   // be returned as a single chunk. In some cases this will require
   // copying the data.
   virtual Status GetNextInputContent(
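
To make the in/out role of 'memory_type' concrete, a hypothetical caller is sketched below. The argument order follows the call sites shown earlier in this diff (name, content, content_byte_size, memory_type, force_contiguous); the exact declaration is truncated above, and TRTSERVER_MEMORY_GPU is assumed to be the GPU counterpart of TRTSERVER_MEMORY_CPU. The helper name NeedsHostCopy is invented for illustration.

// Hypothetical helper (not in the repository): pull one chunk of input
// 'name' and report whether it must be copied to CPU before use.
Status NeedsHostCopy(
    const std::shared_ptr<InferRequestProvider>& request_provider,
    const std::string& name, size_t expected_byte_size, bool* needs_copy)
{
  const void* chunk = nullptr;
  size_t chunk_byte_size = expected_byte_size;  // hint: maximum bytes wanted
  auto memory_type = TRTSERVER_MEMORY_GPU;      // preferred type on input
  Status status = request_provider->GetNextInputContent(
      name, &chunk, &chunk_byte_size, &memory_type,
      false /* force_contiguous */);
  if (!status.IsOk()) {
    return status;
  }
  // On return 'memory_type' holds the actual location of 'chunk'; a backend
  // that cannot read GPU memory must copy when the chunk is on the GPU.
  *needs_copy = (chunk != nullptr) && (memory_type == TRTSERVER_MEMORY_GPU);
  return Status::Success;
}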
