diff --git a/src/backends/custom/custom_backend.cc b/src/backends/custom/custom_backend.cc
index 18e6171a65..2b7df8d53b 100644
--- a/src/backends/custom/custom_backend.cc
+++ b/src/backends/custom/custom_backend.cc
@@ -465,7 +465,7 @@ CustomBackend::Context::GetNextInput(
 
   // If the memory type is on GPU, implicitly copying it to CPU memory
   // to ensure backward capability
-  TRTSERVER_Memory_Type src_memory_type;
+  auto src_memory_type = TRTSERVER_MEMORY_CPU;
   Status status = payload->request_provider_->GetNextInputContent(
       name, content, content_byte_size, &src_memory_type, false);
 #ifdef TRTIS_ENABLE_GPU
diff --git a/src/backends/tensorflow/base_backend.cc b/src/backends/tensorflow/base_backend.cc
index 8b0252b738..35db8a7a22 100644
--- a/src/backends/tensorflow/base_backend.cc
+++ b/src/backends/tensorflow/base_backend.cc
@@ -299,7 +299,7 @@ SetStringInputTensor(
 
   // For string data type, we always need to copy the data to CPU so that
   // we can read string length and construct the string properly.
-  TRTSERVER_Memory_Type src_memory_type;
+  auto src_memory_type = TRTSERVER_MEMORY_CPU;
   const void* vcontent;
   size_t content_byte_size = expected_element_cnt * sizeof(uint32_t);
   payload.status_ = payload.request_provider_->GetNextInputContent(
diff --git a/src/core/backend_context.cc b/src/core/backend_context.cc
index a5bd011e6b..855c1500d1 100644
--- a/src/core/backend_context.cc
+++ b/src/core/backend_context.cc
@@ -92,7 +92,7 @@ BackendContext::SetInputBuffer(
 
   size_t copied_byte_size = 0;
   while (payload.status_.IsOk()) {
-    TRTSERVER_Memory_Type src_memory_type;
+    auto src_memory_type = dst_memory_type;
     const void* content;
     size_t content_byte_size = expected_byte_size - copied_byte_size;
     payload.status_ = payload.request_provider_->GetNextInputContent(
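Note (not part of the patch): the three backend changes above all replace an uninitialized TRTSERVER_Memory_Type with a seeded value, because GetNextInputContent() now treats 'memory_type' as an in/out parameter (see the provider.h comment at the end of this diff). Below is a minimal sketch of the calling convention they converge on; the helper name ReadNextChunk is hypothetical, and the GetNextInputContent() parameter list is assumed from the call sites above since the full signature is not shown in this diff.

  Status
  ReadNextChunk(
      InferRequestProvider* provider, const std::string& name,
      TRTSERVER_Memory_Type preferred_type, const void** content,
      size_t* content_byte_size)
  {
    // Seed with the caller's preferred memory type; on return the provider
    // has overwritten it with the actual location of the chunk.
    auto src_memory_type = preferred_type;
    Status status = provider->GetNextInputContent(
        name, content, content_byte_size, &src_memory_type,
        false /* force_contiguous */);
    if (status.IsOk() && (*content != nullptr) &&
        (src_memory_type == TRTSERVER_MEMORY_GPU)) {
      // Chunk lives in GPU memory; copy it to the host (or keep a CUDA-aware
      // path) before treating it as CPU memory, as custom_backend.cc does.
    }
    return status;
  }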
diff --git a/src/core/ensemble_scheduler.cc b/src/core/ensemble_scheduler.cc
index afca46e7e9..6fbb751b38 100644
--- a/src/core/ensemble_scheduler.cc
+++ b/src/core/ensemble_scheduler.cc
@@ -34,10 +34,6 @@
 #include "src/core/server.h"
 #include "src/core/server_status.h"
 
-#ifdef TRTIS_ENABLE_GPU
-#include <cuda_runtime_api.h>
-#endif  // TRTIS_ENABLE_GPU
-
 namespace nvidia { namespace inferenceserver {
 
 namespace {
@@ -73,7 +69,7 @@ class EnsembleContext {
       const std::shared_ptr<ModelInferStats>& stats,
       const std::shared_ptr<InferRequestProvider>& request_provider,
       const std::shared_ptr<InferResponseProvider>& response_provider,
-      std::function<void(const Status&)> OnComplete);
+      std::function<void(const Status&)> OnComplete, cudaStream_t stream);
 
   // Perform transition on 'context' state given the information of
   // 'completed_step'
@@ -144,6 +140,10 @@ class EnsembleContext {
 
   EnsembleInfo* info_;
 
+  // All EnsembleContext will use the same CUDA stream managed by
+  // the ensemble scheduler
+  cudaStream_t stream_;
+
   // Mutex to avoid concurrent call on 'PrepareSteps' where ensemble state
   // are being modified
   std::mutex mutex_;
@@ -183,9 +183,9 @@ EnsembleContext::EnsembleContext(
     const std::shared_ptr<ModelInferStats>& stats,
     const std::shared_ptr<InferRequestProvider>& request_provider,
     const std::shared_ptr<InferResponseProvider>& response_provider,
-    std::function<void(const Status&)> OnComplete)
-    : is_(is), info_(info), inflight_step_counter_(0), stats_(stats),
-      request_provider_(request_provider),
+    std::function<void(const Status&)> OnComplete, cudaStream_t stream)
+    : is_(is), info_(info), stream_(stream), inflight_step_counter_(0),
+      stats_(stats), request_provider_(request_provider),
       response_provider_(response_provider), OnComplete_(OnComplete),
       allocator_(nullptr, TRTSERVER_ResponseAllocatorDelete)
 {
@@ -567,6 +567,7 @@ EnsembleContext::FinishEnsemble()
 Status
 EnsembleContext::CheckAndSetEnsembleOutput()
 {
+  bool cuda_async_copy = false;
   for (const auto& output_pair : info_->ensemble_output_shape_) {
     if (!response_provider_->RequiresOutput(output_pair.first)) {
       continue;
@@ -646,9 +647,20 @@ EnsembleContext::CheckAndSetEnsembleOutput()
         } else {
           copy_type = cudaMemcpyDeviceToHost;
         }
-        // [TODO] create stream for EnsembleContext for using async call
-        cudaError_t err = cudaMemcpy(
-            ((char*)buffer) + content_offset, content, content_size, copy_type);
+
+        cudaError_t err;
+        // use default stream if no CUDA stream is created for the ensemble
+        if (stream_ == nullptr) {
+          err = cudaMemcpy(
+              ((char*)buffer) + content_offset, content, content_size,
+              copy_type);
+        } else {
+          err = cudaMemcpyAsync(
+              ((char*)buffer) + content_offset, content, content_size,
+              copy_type, stream_);
+          cuda_async_copy |= (err == cudaSuccess);
+        }
+
         if (err != cudaSuccess) {
           return Status(
               RequestStatusCode::INTERNAL,
@@ -657,9 +669,9 @@ EnsembleContext::CheckAndSetEnsembleOutput()
         }
 #else
         return Status(
-            RequestStatusCode::INTERNAL,
-            "try to use CUDA copy for output '" + output_pair.first +
-                "' while GPU is not supported"));
+            RequestStatusCode::INTERNAL, "try to use CUDA copy for output '" +
+                                             output_pair.first +
+                                             "' while GPU is not supported");
 #endif  // TRTIS_ENABLE_GPU
       }
 
@@ -669,6 +681,17 @@ EnsembleContext::CheckAndSetEnsembleOutput()
           memory_block->BufferAt(content_idx, &content_size, &src_memory_type);
     }
   }
+
+  if (cuda_async_copy) {
+#ifdef TRTIS_ENABLE_GPU
+    cudaStreamSynchronize(stream_);
+#else
+    return Status(
+        RequestStatusCode::INTERNAL,
+        "unexpected CUDA copy flag set while GPU is not supported");
+#endif  // TRTIS_ENABLE_GPU
+  }
+
   return Status::Success;
 }
 
@@ -729,15 +752,25 @@ EnsembleScheduler::Enqueue(
     std::function<void(const Status&)> OnComplete)
 {
   std::shared_ptr<EnsembleContext> context(new EnsembleContext(
-      is_, info_.get(), stats, request_provider, response_provider,
-      OnComplete));
+      is_, info_.get(), stats, request_provider, response_provider, OnComplete,
+      stream_));
   EnsembleContext::Proceed(context);
 }
 
 EnsembleScheduler::EnsembleScheduler(
     InferenceServer* const server, const ModelConfig& config)
-    : is_(server)
+    : is_(server), stream_(nullptr)
 {
+#ifdef TRTIS_ENABLE_GPU
+  // create CUDA stream
+  auto cuerr = cudaStreamCreate(&stream_);
+  if (cuerr != cudaSuccess) {
+    stream_ = nullptr;
+    LOG_ERROR << "unable to create stream for " << config.name() << ": "
+              << cudaGetErrorString(cuerr);
+  }
+#endif  // TRTIS_ENABLE_GPU
+
   // Set 'info_' based on 'config'
   info_.reset(new EnsembleInfo());
 
@@ -783,4 +816,16 @@ EnsembleScheduler::EnsembleScheduler(
   }
 }
 
+EnsembleScheduler::~EnsembleScheduler()
+{
+#ifdef TRTIS_ENABLE_GPU
+  if (stream_ != nullptr) {
+    cudaError_t err = cudaStreamDestroy(stream_);
+    if (err != cudaSuccess) {
+      LOG_ERROR << "Failed to destroy cuda stream: " << cudaGetErrorString(err);
+    }
+  }
+#endif  // TRTIS_ENABLE_GPU
+}
+
 }}  // namespace nvidia::inferenceserver
diff --git a/src/core/ensemble_scheduler.h b/src/core/ensemble_scheduler.h
index 3d5d9f3931..832f64e81f 100644
--- a/src/core/ensemble_scheduler.h
+++ b/src/core/ensemble_scheduler.h
@@ -32,8 +32,16 @@
 #include "src/core/scheduler.h"
 #include "src/core/status.h"
 
+#ifdef TRTIS_ENABLE_GPU
+#include <cuda_runtime_api.h>
+#endif  // TRTIS_ENABLE_GPU
+
 namespace nvidia { namespace inferenceserver {
 
+#ifndef TRTIS_ENABLE_GPU
+using cudaStream_t = void*;
+#endif  // TRTIS_ENABLE_GPU
+
 class InferenceServer;
 
 struct EnsembleInfo {
@@ -71,6 +79,8 @@ class EnsembleScheduler : public Scheduler {
       InferenceServer* const server, const ModelConfig& config,
       std::unique_ptr<Scheduler>* scheduler);
 
+  ~EnsembleScheduler();
+
   // \see Scheduler::Enqueue()
   void Enqueue(
       const std::shared_ptr<ModelInferStats>& stats,
@@ -85,6 +95,9 @@ class EnsembleScheduler : public Scheduler {
 
   // Ensemble information that is built from model config
   std::unique_ptr<EnsembleInfo> info_;
+
+  // The stream used for data transfer.
+  cudaStream_t stream_;
 };
 
 }}  // namespace nvidia::inferenceserver
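Note (not part of the patch): the scheduler changes above follow a standard CUDA pattern: create one stream per scheduler and fall back to the synchronous path if creation fails, issue cudaMemcpyAsync on that stream for every output copy, pay for a single cudaStreamSynchronize before the response is completed, and destroy the stream with the scheduler. Below is a standalone sketch of that pattern using only CUDA runtime calls; the struct and member names are illustrative and not taken from this codebase.

  #include <cuda_runtime_api.h>

  struct StreamedCopier {
    cudaStream_t stream = nullptr;
    bool pending = false;  // set once at least one async copy is issued

    StreamedCopier()
    {
      if (cudaStreamCreate(&stream) != cudaSuccess) {
        stream = nullptr;  // fall back to synchronous cudaMemcpy
      }
    }

    ~StreamedCopier()
    {
      if (stream != nullptr) {
        cudaStreamDestroy(stream);
      }
    }

    // Stage one copy; synchronous when no stream is available.
    bool Copy(void* dst, const void* src, size_t byte_size, cudaMemcpyKind kind)
    {
      cudaError_t err =
          (stream == nullptr)
              ? cudaMemcpy(dst, src, byte_size, kind)
              : cudaMemcpyAsync(dst, src, byte_size, kind, stream);
      pending |= ((stream != nullptr) && (err == cudaSuccess));
      return err == cudaSuccess;
    }

    // Call once after all copies are staged and before the data is consumed.
    void Finish()
    {
      if (pending) {
        cudaStreamSynchronize(stream);
        pending = false;
      }
    }
  };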
diff --git a/src/core/provider.cc b/src/core/provider.cc
index 055b9acf8b..a32485cbf6 100644
--- a/src/core/provider.cc
+++ b/src/core/provider.cc
@@ -218,7 +218,9 @@ InferRequestProvider::GetNextInputContent(
     return Status::Success;
   }
 
-  if (!GetInputOverrideContent(name, content, content_byte_size)) {
+  if (GetInputOverrideContent(name, content, content_byte_size)) {
+    *memory_type = TRTSERVER_MEMORY_CPU;
+  } else {
     const auto& pr = input_buffer_.find(name);
     if (pr == input_buffer_.end()) {
       return Status(
diff --git a/src/core/provider.h b/src/core/provider.h
index 6bc8fdbe6a..08d38a10b8 100644
--- a/src/core/provider.h
+++ b/src/core/provider.h
@@ -131,13 +131,19 @@ class InferRequestProvider {
 
   // Get the next contiguous chunk of bytes for the 'name'd
   // input. Return a pointer to the chunk in 'content'.
+  // If there are no more bytes for the input return 'content' == nullptr.
   // 'content_byte_size' acts as both input and output. On input
   // 'content_byte_size' is a hint of the maximum chunk size that
   // should be returned in 'content' and must be non-zero unless no
   // additional input is expected. On return 'content_byte_size' gives
-  // the actual size of the chunk pointed to by 'content'. If there
-  // are no more bytes for the input return 'content' == nullptr. If
-  // 'force_contiguous' is true then the entire (remaining) input will
+  // the actual size of the chunk pointed to by 'content'.
+  // 'memory_type' acts as both input and output. On input 'memory_type'
+  // is the buffer memory type preferred by the function caller, it will
+  // not affect the function behavior, but it will be propagated to the
+  // buffer and the buffer owner may collect such information for other use.
+  // On return 'memory_type' gives the actual memory type of the chunk
+  // pointed to by 'content'.
+  // If 'force_contiguous' is true then the entire (remaining) input will
   // be returned as a single chunk. In some cases this will require
   // copying the data.
   virtual Status GetNextInputContent(
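Note (not part of the patch): putting the documented contract together, a caller drains an input by looping until 'content' comes back as nullptr, seeding 'memory_type' with its preference on each call and reading back the actual location of every chunk. Sketch only; 'provider', 'name', and 'expected_byte_size' stand in for whatever the caller already has, and the pointer-style out-parameters are assumed from the call sites earlier in the diff.

  size_t copied_byte_size = 0;
  while (copied_byte_size < expected_byte_size) {
    const void* content = nullptr;
    size_t content_byte_size = expected_byte_size - copied_byte_size;  // hint
    auto memory_type = TRTSERVER_MEMORY_CPU;  // caller's preference
    Status status = provider->GetNextInputContent(
        name, &content, &content_byte_size, &memory_type, false);
    if (!status.IsOk() || (content == nullptr)) {
      break;  // error, or no more bytes for this input
    }
    // 'memory_type' now reports where 'content' actually resides
    // (TRTSERVER_MEMORY_CPU or TRTSERVER_MEMORY_GPU).
    copied_byte_size += content_byte_size;
  }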