diff --git a/examples/bls/async_model.py b/examples/bls/async_model.py index d070069d..ef287fdd 100644 --- a/examples/bls/async_model.py +++ b/examples/bls/async_model.py @@ -120,11 +120,11 @@ async def execute(self, requests): raise pb_utils.TritonModelException( infer_response.error().message()) - # Get the OUTPUT0 from the "pytorch" model inference resposne + # Get the OUTPUT0 from the "pytorch" model inference response pytorch_output0_tensor = pb_utils.get_output_tensor_by_name( inference_responses[0], "OUTPUT0") - # Get the OUTPUT1 from the "addsub" model inference resposne + # Get the OUTPUT1 from the "addsub" model inference response addsub_output1_tensor = pb_utils.get_output_tensor_by_name( inference_responses[1], "OUTPUT1") diff --git a/src/infer_response.cc b/src/infer_response.cc index facb2da0..f897307f 100644 --- a/src/infer_response.cc +++ b/src/infer_response.cc @@ -164,40 +164,68 @@ InferResponse::Error() } #ifndef TRITON_PB_STUB -TRITONSERVER_Error* +std::shared_ptr InferResponse::Send( TRITONBACKEND_ResponseFactory* response_factory, void* cuda_stream, - const uint32_t flags) + bool& requires_deferred_callback, const uint32_t flags, + std::unique_ptr& shm_pool, + std::vector, void*>>& output_buffers, + const std::set& requested_output_names, + TRITONBACKEND_Response* response) { - // [FIXME] Use this code to send responses in non-decoupled mode. - TRITONBACKEND_Response* response = nullptr; - TRITONSERVER_Error* response_error = nullptr; - ScopedDefer response_error_handling([&response, &response_error, flags, - response_factory] { - if (response != nullptr) { - LOG_IF_ERROR( - TRITONBACKEND_ResponseSend(response, flags, response_error), - "failed to send the response."); - if (flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL) { - std::unique_ptr< - TRITONBACKEND_ResponseFactory, backend::ResponseFactoryDeleter> - response_factory_ptr( - reinterpret_cast(response_factory)); - } - } - }); + std::shared_ptr response_error = + WrapTritonErrorInSharedPtr(nullptr); + std::unique_ptr response_error_handling; + requires_deferred_callback = false; + + // Should only destruct the response factory whenever a response factory is + // being created. + bool destruct_response_factor = (response == nullptr); + + if (response == nullptr) { + SET_ERROR_AND_RETURN( + response_error, + TRITONBACKEND_ResponseNewFromFactory(&response, response_factory)); + } - SET_ERROR_AND_RETURN( - response_error, - TRITONBACKEND_ResponseNewFromFactory(&response, response_factory)); + // This lambda expression will be called when this function exits, if the + // inference response doesn't have any GPU tensors. Otherwise, it will be + // called when the object is destructed or DeferredSendCallback is called. + response_error_handling = std::make_unique( + [response, response_error, flags, response_factory, + destruct_response_factor] { + if (response != nullptr) { + LOG_IF_ERROR( + TRITONBACKEND_ResponseSend(response, flags, *response_error), + "failed to send the response."); + if (flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL && + destruct_response_factor) { + std::unique_ptr< + TRITONBACKEND_ResponseFactory, backend::ResponseFactoryDeleter> + response_factory_ptr( + reinterpret_cast( + response_factory)); + } + } + }); + + // Moves the response sending callback so that it is not called until the stub + // process fills in the GPU buffers. 
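The two-step send introduced here hinges on who owns the response_error_handling callable: with no GPU tensors it fires when Send() returns; otherwise ownership moves into deferred_send_callback_ and it fires only when DeferredSendCallback() runs after the stub has filled the GPU buffers. A minimal, self-contained sketch of that ownership pattern, using a stand-in Defer type and hypothetical names rather than the repo's ScopedDefer:

```cpp
#include <functional>
#include <iostream>
#include <memory>
#include <utility>

// Stand-in for ScopedDefer: runs the captured callable when destroyed
// (or when explicitly reset through a unique_ptr).
class Defer {
 public:
  explicit Defer(std::function<void()> fn) : fn_(std::move(fn)) {}
  ~Defer()
  {
    if (fn_) fn_();
  }

 private:
  std::function<void()> fn_;
};

int
main()
{
  // Plays the role of 'response_error_handling': sending the response.
  auto send_response =
      std::make_unique<Defer>([] { std::cout << "response sent\n"; });

  bool requires_deferred_callback = true;  // the response carries GPU tensors

  // Plays the role of 'deferred_send_callback_' on InferResponse.
  std::unique_ptr<Defer> deferred_send_callback;
  if (requires_deferred_callback) {
    // Moving the unique_ptr keeps the callable alive past this scope,
    // so the send does not happen yet.
    deferred_send_callback = std::move(send_response);
  }

  std::cout << "stub fills the GPU output buffers...\n";

  // Equivalent to InferResponse::DeferredSendCallback(): dropping the
  // object finally runs the send.
  deferred_send_callback.reset();
  return 0;
}
```

The print order makes the deferral visible: the buffers are "filled" before the send runs.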
+ ScopedDefer deferred_task( + [this, &requires_deferred_callback, &response_error_handling] { + if (requires_deferred_callback) { + deferred_send_callback_ = std::move(response_error_handling); + } + }); if (HasError()) { - response_error = TRITONSERVER_ErrorNew( + *response_error = TRITONSERVER_ErrorNew( TRITONSERVER_ERROR_INTERNAL, Error()->Message().c_str()); return nullptr; } bool cuda_copy = false; + for (auto& output_tensor : OutputTensors()) { TRITONSERVER_MemoryType src_memory_type = output_tensor->MemoryType(); int64_t src_memory_type_id = output_tensor->MemoryTypeId(); @@ -205,12 +233,8 @@ InferResponse::Send( TRITONSERVER_MemoryType actual_memory_type = src_memory_type; int64_t actual_memory_type_id = src_memory_type_id; - // [FIXME] GPU tensors are not supported in the decoupled API mode. if (actual_memory_type == TRITONSERVER_MEMORY_GPU) { - response_error = TRITONSERVER_ErrorNew( - TRITONSERVER_ERROR_INTERNAL, - "GPU tensors are not supported in decoupled API."); - return response_error; + requires_deferred_callback = true; } TRITONBACKEND_Output* response_output; @@ -222,12 +246,61 @@ InferResponse::Send( output_tensor->Dims().data(), output_tensor->Dims().size())); void* buffer; - bool cuda_used = false; SET_ERROR_AND_RETURN( response_error, TRITONBACKEND_OutputBuffer( response_output, &buffer, output_tensor->ByteSize(), &actual_memory_type, &actual_memory_type_id)); + bool cuda_used = false; + TRITONSERVER_BufferAttributes* output_buffer_attributes; + SET_ERROR_AND_RETURN( + response_error, TRITONBACKEND_OutputBufferAttributes( + response_output, &output_buffer_attributes)); + + std::unique_ptr output_buffer; + if (src_memory_type == TRITONSERVER_MEMORY_GPU && + actual_memory_type == TRITONSERVER_MEMORY_GPU) { +#ifdef TRITON_ENABLE_GPU + cudaIpcMemHandle_t* cuda_ipc_mem_handle_p; + SET_ERROR_AND_RETURN( + response_error, + TRITONSERVER_BufferAttributesCudaIpcHandle( + output_buffer_attributes, + reinterpret_cast(&cuda_ipc_mem_handle_p))); + + if (cuda_ipc_mem_handle_p != nullptr) { + SET_ERROR_AND_RETURN_IF_EXCEPTION( + response_error, + output_buffer = PbMemory::Create( + shm_pool, actual_memory_type, actual_memory_type_id, + output_tensor->ByteSize(), reinterpret_cast(buffer), + false /* copy_gpu */)); + output_buffer->SetCudaIpcHandle(cuda_ipc_mem_handle_p); + } else { + SET_ERROR_AND_RETURN_IF_EXCEPTION( + response_error, + output_buffer = PbMemory::Create( + shm_pool, actual_memory_type, actual_memory_type_id, + output_tensor->ByteSize(), reinterpret_cast(buffer), + true /* copy_gpu */)); + } + output_buffers.push_back({std::move(output_buffer), buffer}); +#endif + } + + // When we requested a GPU buffer but received a CPU buffer. 
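For orientation, the GPU branch above and the CPU fallback that follows reduce to one decision per output tensor: how a Triton-allocated output buffer is exposed to the stub, based on the tensor's source memory type, the memory type Triton actually handed back, and whether a CUDA IPC handle is already attached to the buffer attributes. A hedged sketch with hypothetical names, not backend types:

```cpp
#include <iostream>

enum class MemKind { kCpu, kCpuPinned, kGpu };
enum class ShareMode {
  kReuseCudaIpcHandle,   // GPU buffer already has a handle: forward it
  kExportCudaIpcHandle,  // GPU buffer without a handle: export one (copy_gpu)
  kStageThroughShm,      // asked for GPU, got CPU: stub writes into shm first
  kDirectCopy            // plain CPU tensor: copy immediately, no deferral
};

// Mirrors the branch structure above in one place.
ShareMode
ChooseShareMode(MemKind src, MemKind actual, bool has_cuda_ipc_handle)
{
  if (src == MemKind::kGpu && actual == MemKind::kGpu) {
    return has_cuda_ipc_handle ? ShareMode::kReuseCudaIpcHandle
                               : ShareMode::kExportCudaIpcHandle;
  }
  if (src == MemKind::kGpu) {
    // The deferred send callback later copies the staged bytes into the
    // CPU / pinned buffer that Triton actually provided.
    return ShareMode::kStageThroughShm;
  }
  return ShareMode::kDirectCopy;
}

int
main()
{
  std::cout << static_cast<int>(
                   ChooseShareMode(MemKind::kGpu, MemKind::kCpuPinned, false))
            << "\n";  // 2 == kStageThroughShm
  return 0;
}
```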
+ if (src_memory_type == TRITONSERVER_MEMORY_GPU && + (actual_memory_type == TRITONSERVER_MEMORY_CPU || + actual_memory_type == TRITONSERVER_MEMORY_CPU_PINNED)) { + SET_ERROR_AND_RETURN_IF_EXCEPTION( + response_error, + output_buffer = PbMemory::Create( + shm_pool, actual_memory_type, actual_memory_type_id, + output_tensor->ByteSize(), nullptr /* data ptr */)); + + output_buffers.push_back({std::move(output_buffer), buffer}); + } + if (src_memory_type != TRITONSERVER_MEMORY_GPU) { SET_ERROR_AND_RETURN( response_error, @@ -251,4 +324,12 @@ InferResponse::Send( } #endif +#ifndef TRITON_PB_STUB +void +InferResponse::DeferredSendCallback() +{ + deferred_send_callback_.reset(); +} +#endif + }}} // namespace triton::backend::python diff --git a/src/infer_response.h b/src/infer_response.h index 69a22d04..5e994eaa 100644 --- a/src/infer_response.h +++ b/src/infer_response.h @@ -29,6 +29,7 @@ #include "pb_error.h" #include "pb_tensor.h" #include "pb_utils.h" +#include "scoped_defer.h" namespace triton { namespace backend { namespace python { @@ -44,11 +45,24 @@ struct ResponseShm { do { \ TRITONSERVER_Error* raasnie_err__ = (X); \ if (raasnie_err__ != nullptr) { \ - E = raasnie_err__; \ + *E = raasnie_err__; \ return E; \ } \ } while (false) +#define SET_ERROR_AND_RETURN_IF_EXCEPTION(E, X) \ + do { \ + try { \ + (X); \ + } \ + catch (const PythonBackendException& pb_exception) { \ + TRITONSERVER_Error* rarie_err__ = TRITONSERVER_ErrorNew( \ + TRITONSERVER_ERROR_INTERNAL, pb_exception.what()); \ + *E = rarie_err__; \ + return E; \ + } \ + } while (false) + class InferResponse { public: InferResponse( @@ -66,10 +80,19 @@ class InferResponse { bi::managed_external_buffer::handle_t ShmHandle(); #ifndef TRITON_PB_STUB - /// Send an inference response - TRITONSERVER_Error* Send( + /// Send an inference response. If the response has a GPU tensor, sending the + /// response needs to be done in two step. The boolean + /// 'requires_deferred_callback' indicates whether DeferredSendCallback method + /// should be called or not. + std::shared_ptr Send( TRITONBACKEND_ResponseFactory* response_factory, void* cuda_stream, - const uint32_t flags); + bool& requires_deferred_callback, const uint32_t flags, + std::unique_ptr& shm_pool, + std::vector, void*>>& output_buffers, + const std::set& requested_output_names = {}, + TRITONBACKEND_Response* response = nullptr); + + void DeferredSendCallback(); #endif // Disallow copying the inference response object. @@ -84,5 +107,7 @@ class InferResponse { std::shared_ptr error_; bi::managed_external_buffer::handle_t shm_handle_; AllocatedSharedMemory response_shm_; + std::vector, void*>> gpu_output_buffers_; + std::unique_ptr deferred_send_callback_; }; }}} // namespace triton::backend::python diff --git a/src/ipc_message.h b/src/ipc_message.h index eb89c57f..7cf9ede0 100644 --- a/src/ipc_message.h +++ b/src/ipc_message.h @@ -37,7 +37,7 @@ namespace bi = boost::interprocess; typedef enum PYTHONSTUB_commandtype_enum { PYTHONSTUB_ExecuteRequest, - PYTHONSTUB_ExecuteResposne, + PYTHONSTUB_ExecuteResponse, PYTHONSTUB_InitializeRequest, PYTHONSTUB_InitializeResponse, PYTHONSTUB_FinalizeRequest, diff --git a/src/pb_stub.cc b/src/pb_stub.cc index 37c7f10b..a57503b7 100644 --- a/src/pb_stub.cc +++ b/src/pb_stub.cc @@ -25,7 +25,6 @@ // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
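A note on the error plumbing in infer_response.h above: Send() now reports failures through a std::shared_ptr wrapping a raw TRITONSERVER_Error*, so the deferred send lambda (captured by value and possibly run much later) and the early-return macros all observe the same error slot, and the shared_ptr deleter frees it exactly once. The standalone sketch below reproduces the shape of SET_ERROR_AND_RETURN_IF_EXCEPTION with stand-in Error and BackendException types instead of the Triton ones:

```cpp
#include <iostream>
#include <memory>
#include <stdexcept>
#include <string>

// Stand-ins for TRITONSERVER_Error and PythonBackendException.
struct Error {
  std::string message;
};
struct BackendException : std::runtime_error {
  using std::runtime_error::runtime_error;
};

// Same shape as the macro above: E is a std::shared_ptr<Error*>; on failure
// the slot is filled and returned to the caller. The real code allocates with
// TRITONSERVER_ErrorNew and relies on WrapTritonErrorInSharedPtr's deleter.
#define SET_ERROR_AND_RETURN_IF_EXCEPTION(E, X)  \
  do {                                           \
    try {                                        \
      (X);                                       \
    }                                            \
    catch (const BackendException& ex) {         \
      *E = new Error{ex.what()};                 \
      return E;                                  \
    }                                            \
  } while (false)

std::shared_ptr<Error*>
DoWork(bool fail)
{
  auto err = std::make_shared<Error*>(nullptr);
  SET_ERROR_AND_RETURN_IF_EXCEPTION(
      err, [&] { if (fail) throw BackendException("shm allocation failed"); }());
  return err;  // *err stays nullptr on success
}

int
main()
{
  auto err = DoWork(true);
  std::cout << (*err ? (*err)->message : std::string("ok")) << "\n";
  delete *err;
  return 0;
}
```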
#include "pb_stub.h" - #include #include #include @@ -59,65 +58,6 @@ using namespace pybind11::literals; namespace bi = boost::interprocess; namespace triton { namespace backend { namespace python { -#define LOG_IF_EXCEPTION(X) \ - do { \ - try { \ - (X); \ - } \ - catch (const PythonBackendException& pb_exception) { \ - LOG_INFO << pb_exception.what(); \ - } \ - } while (false) - -#define LOG_EXCEPTION(E) \ - do { \ - LOG_INFO << E.what(); \ - } while (false) - -// Macros that use current filename and line number. -#define LOG_INFO LOG_INFO_FL(__FILE__, __LINE__) - -class Logger { - public: - // Log a message. - void Log(const std::string& msg) { std::cerr << msg << std::endl; } - - // Flush the log. - void Flush() { std::cerr << std::flush; } -}; - -Logger gLogger_; -class LogMessage { - public: - LogMessage(const char* file, int line) - { - std::string path(file); - size_t pos = path.rfind('/'); - if (pos != std::string::npos) { - path = path.substr(pos + 1, std::string::npos); - } - - struct timeval tv; - gettimeofday(&tv, NULL); - struct tm tm_time; - gmtime_r(((time_t*)&(tv.tv_sec)), &tm_time); - stream_ << std::setfill('0') << std::setw(2) << (tm_time.tm_mon + 1) - << std::setw(2) << tm_time.tm_mday << " " << std::setw(2) - << tm_time.tm_hour << ':' << std::setw(2) << tm_time.tm_min << ':' - << std::setw(2) << tm_time.tm_sec << "." << std::setw(6) - << tv.tv_usec << ' ' << static_cast(getpid()) << ' ' - << path << ':' << line << "] "; - } - - ~LogMessage() { gLogger_.Log(stream_.str()); } - - std::stringstream& stream() { return stream_; } - - private: - std::stringstream stream_; -}; - -#define LOG_INFO_FL(FN, LN) LogMessage((char*)(FN), LN).stream() std::atomic non_graceful_exit = {false}; @@ -458,52 +398,20 @@ Stub::LoadGPUBuffers(std::unique_ptr& ipc_message) return; } - // We need to hold the cpu_buffers until the main process makes a copy from - // them. - std::vector> cpu_buffers; std::vector> dst_buffers; - bool has_cpu_buffer = false; for (size_t i = 0; i < gpu_tensors_.size(); i++) { std::unique_ptr dst_buffer = PbMemory::LoadFromSharedMemory( shm_pool_, gpu_buffers_handle_shm[i], true /* open_cuda_handle */); - if (dst_buffer->MemoryType() == TRITONSERVER_MEMORY_CPU) { - has_cpu_buffer = true; - } dst_buffers.emplace_back(std::move(dst_buffer)); } - // Pop a dummy message from the stub message queue indicating that the parent - // has finished copying the tensors. - ScopedDefer _([this, has_cpu_buffer] { - if (has_cpu_buffer) { - stub_message_queue_->Pop(); - } - }); - ScopedDefer load_gpu_buffer_response( - [this, has_cpu_buffer] { parent_message_queue_->Push(DUMMY_MESSAGE); }); + [this] { parent_message_queue_->Push(DUMMY_MESSAGE); }); for (size_t i = 0; i < gpu_tensors_.size(); i++) { std::shared_ptr& src_buffer = gpu_tensors_[i]; - - // If the memory type is CPU, the buffer is empty and we need to create - // a buffer. - if (dst_buffers[i]->MemoryType() == TRITONSERVER_MEMORY_CPU) { - dst_buffers[i] = PbMemory::Create( - shm_pool_, dst_buffers[i]->MemoryType(), - dst_buffers[i]->MemoryTypeId(), src_buffer->ByteSize(), - nullptr /* buffer */); - - // Update the handle so that the main process can load it. 
- gpu_buffers_handle_shm[i] = dst_buffers[i]->ShmHandle(); - } - PbMemory::CopyBuffer(dst_buffers[i], src_buffer->Memory()); - - if (dst_buffers[i]->MemoryType() == TRITONSERVER_MEMORY_CPU) { - cpu_buffers.push_back(std::move(dst_buffers[i])); - } } gpu_tensors_.clear(); @@ -541,7 +449,7 @@ Stub::ProcessRequestsDecoupled(RequestBatch* request_batch_shm_ptr) LoadRequestsFromSharedMemory(request_batch_shm_ptr); std::unique_ptr execute_response = IPCMessage::Create(shm_pool_, false /* Inline response */); - execute_response->Command() = PYTHONSTUB_ExecuteResposne; + execute_response->Command() = PYTHONSTUB_ExecuteResponse; AllocatedSharedMemory response_batch = shm_pool_->Construct(); @@ -615,7 +523,7 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) { std::unique_ptr execute_response = IPCMessage::Create(shm_pool_, false /* Inline response */); - execute_response->Command() = PYTHONSTUB_ExecuteResposne; + execute_response->Command() = PYTHONSTUB_ExecuteResponse; AllocatedSharedMemory response_batch = shm_pool_->Construct( request_batch_shm_ptr->batch_size * @@ -693,7 +601,7 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) size_t response_size = py::len(responses); // If the number of request objects do not match the number of - // resposne objects throw an error. + // response objects throw an error. if (response_size != batch_size) { std::string err = "Number of InferenceResponse objects do not match the number " diff --git a/src/pb_stub.h b/src/pb_stub.h index e503b7fc..312c7ca0 100644 --- a/src/pb_stub.h +++ b/src/pb_stub.h @@ -24,6 +24,8 @@ // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +#pragma once + #include #include #include @@ -31,6 +33,9 @@ #include #include #include +#include +#include +#include #include #include #include "infer_request.h" @@ -39,7 +44,6 @@ #include "message_queue.h" #include "pb_utils.h" -#pragma once namespace bi = boost::interprocess; namespace py = pybind11; @@ -47,6 +51,67 @@ using namespace pybind11::literals; namespace triton { namespace backend { namespace python { +#define LOG_IF_EXCEPTION(X) \ + do { \ + try { \ + (X); \ + } \ + catch (const PythonBackendException& pb_exception) { \ + LOG_INFO << pb_exception.what(); \ + } \ + } while (false) + +#define LOG_EXCEPTION(E) \ + do { \ + LOG_INFO << E.what(); \ + } while (false) + +// Macros that use current filename and line number. +#define LOG_INFO LOG_INFO_FL(__FILE__, __LINE__) + +class Logger { + public: + // Log a message. + void Log(const std::string& msg) { std::cerr << msg << std::endl; } + + // Flush the log. + void Flush() { std::cerr << std::flush; } +}; + +static Logger gLogger_; + +class LogMessage { + public: + LogMessage(const char* file, int line) + { + std::string path(file); + size_t pos = path.rfind('/'); + if (pos != std::string::npos) { + path = path.substr(pos + 1, std::string::npos); + } + + struct timeval tv; + gettimeofday(&tv, NULL); + struct tm tm_time; + gmtime_r(((time_t*)&(tv.tv_sec)), &tm_time); + stream_ << std::setfill('0') << std::setw(2) << (tm_time.tm_mon + 1) + << std::setw(2) << tm_time.tm_mday << " " << std::setw(2) + << tm_time.tm_hour << ':' << std::setw(2) << tm_time.tm_min << ':' + << std::setw(2) << tm_time.tm_sec << "." 
<< std::setw(6) + << tv.tv_usec << ' ' << static_cast(getpid()) << ' ' + << path << ':' << line << "] "; + } + + ~LogMessage() { gLogger_.Log(stream_.str()); } + + std::stringstream& stream() { return stream_; } + + private: + std::stringstream stream_; +}; + +#define LOG_INFO_FL(FN, LN) LogMessage((char*)(FN), LN).stream() + class Stub { public: Stub(){}; diff --git a/src/pb_utils.cc b/src/pb_utils.cc index 4de37d33..db6f83a4 100644 --- a/src/pb_utils.cc +++ b/src/pb_utils.cc @@ -211,4 +211,23 @@ CUDAHandler::~CUDAHandler() noexcept(false) } } #endif + +#ifndef TRITON_PB_STUB +std::shared_ptr +WrapTritonErrorInSharedPtr(TRITONSERVER_Error* error) +{ + std::shared_ptr response_error( + new TRITONSERVER_Error*, [](TRITONSERVER_Error** error) { + if (error != nullptr && *error != nullptr) { + TRITONSERVER_ErrorDelete(*error); + } + + if (error != nullptr) { + delete error; + } + }); + *response_error = error; + return response_error; +} +#endif }}} // namespace triton::backend::python diff --git a/src/pb_utils.h b/src/pb_utils.h index 3d5cbb3d..5af7a9dd 100644 --- a/src/pb_utils.h +++ b/src/pb_utils.h @@ -153,6 +153,13 @@ struct ResponseSenderBase { struct ResponseSendMessage : ResponseSenderBase { bi::managed_external_buffer::handle_t response; + + // GPU Buffers handle + bi::managed_external_buffer::handle_t gpu_buffers_handle; + + // GPU buffers count + uint32_t gpu_buffers_count; + uint32_t flags; }; @@ -198,4 +205,9 @@ class CUDAHandler { }; #endif // TRITON_ENABLE_GPU +#ifndef TRITON_PB_STUB +std::shared_ptr WrapTritonErrorInSharedPtr( + TRITONSERVER_Error* error); +#endif + }}} // namespace triton::backend::python diff --git a/src/python.cc b/src/python.cc index 891304d4..93fb4927 100644 --- a/src/python.cc +++ b/src/python.cc @@ -353,7 +353,8 @@ class ModelInstanceState : public BackendModelInstance { // Set error for response send message void SetErrorForResponseSendMessage( - ResponseSendMessage* response_send_message, TRITONSERVER_Error* error, + ResponseSendMessage* response_send_message, + std::shared_ptr error, std::unique_ptr& error_message); TRITONSERVER_Error* SaveRequestsToSharedMemory( @@ -475,14 +476,15 @@ ModelInstanceState::ExistsInClosedRequests(intptr_t closed_request) void ModelInstanceState::SetErrorForResponseSendMessage( - ResponseSendMessage* response_send_message, TRITONSERVER_Error* error, + ResponseSendMessage* response_send_message, + std::shared_ptr error, std::unique_ptr& error_message) { - if (error != nullptr) { + if (error && *error != nullptr) { response_send_message->has_error = true; LOG_IF_EXCEPTION( error_message = - PbString::Create(shm_pool_, TRITONSERVER_ErrorMessage(error))); + PbString::Create(shm_pool_, TRITONSERVER_ErrorMessage(*error))); response_send_message->error = error_message->ShmHandle(); response_send_message->is_error_set = true; } @@ -862,7 +864,7 @@ ModelInstanceState::StartStubProcess() return TRITONSERVER_ErrorNew( TRITONSERVER_ERROR_INTERNAL, (std::string( - "Received unexpected resposne from Python backend stub: ") + + "Received unexpected response from Python backend stub: ") + name_) .c_str()); } @@ -1094,16 +1096,9 @@ ModelInstanceState::GetInputTensor( HostPolicyName().c_str()); } - ModelState* model_state = reinterpret_cast(Model()); bool cpu_only_tensors = model_state->ForceCPUOnlyInputTensors(); - if (!cpu_only_tensors && model_state->IsDecoupled()) { - return TRITONSERVER_ErrorNew( - TRITONSERVER_ERROR_INTERNAL, - "FORCE_CPU_ONLY_INPUT_TENSORS set to OFF is not yet supported in the " - "decoupled API."); - } if 
(input_dtype == TRITONSERVER_TYPE_BYTES) { cpu_only_tensors = true; } @@ -1147,12 +1142,9 @@ ModelInstanceState::GetInputTensor( input_name, input_buffer, input_byte_size, TRITONSERVER_MEMORY_CPU /* memory_type */, 0 /* memory_type_id */); } else { - bool cuda_used = false; - CopyBuffer( - "Failed to copy the output tensor to buffer.", src_memory_type, - src_memory_type_id, TRITONSERVER_MEMORY_CPU /* memory_type */, - 0 /* memory type id */, input_byte_size, src_ptr, input_buffer, - CudaStream(), &cuda_used); + size_t byte_size = input_byte_size; + RETURN_IF_ERROR(backend::ReadInputTensor( + request, input_name, input_buffer, &byte_size)); } } else { #ifdef TRITON_ENABLE_GPU @@ -1162,54 +1154,82 @@ ModelInstanceState::GetInputTensor( std::vector> alloc_perference; alloc_perference = {{TRITONSERVER_MEMORY_GPU, src_memory_type_id}}; + // collector is used in the non-decoupled mode. if (collector) { RETURN_IF_ERROR(collector->ProcessTensor( input_name, nullptr, 0, alloc_perference, reinterpret_cast(&buffer), &input_byte_size, &src_memory_type, &src_memory_type_id)); - } - - // If the tensor is using the cuda shared memory, we need to extract the - // handle that was used to create the device pointer. This is because of a - // limitation in the legacy CUDA IPC API that doesn't allow getting the - // handle of an exported pointer. If the cuda handle exists, it indicates - // that the cuda shared memory was used and the input is in a single buffer. - // [FIXME] for the case where the input is in cuda shared memory and uses - // multiple input buffers this needs to be changed. - TRITONSERVER_BufferAttributes* buffer_attributes; - // This value is not used. - const void* buffer_p; - RETURN_IF_ERROR(TRITONBACKEND_InputBufferAttributes( - in, 0, &buffer_p, &buffer_attributes)); + // If the tensor is using the cuda shared memory, we need to extract the + // handle that was used to create the device pointer. This is because of a + // limitation in the legacy CUDA IPC API that doesn't allow getting the + // handle of an exported pointer. If the cuda handle exists, it indicates + // that the cuda shared memory was used and the input is in a single + // buffer. + // [FIXME] For the case where the input is in cuda shared memory and uses + // multiple input buffers this needs to be changed. + TRITONSERVER_BufferAttributes* buffer_attributes; + + // This value is not used. 
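Since the input path here relies on CUDA IPC handles directly, a generic CUDA runtime sketch of the export side may help; this is not the backend's PbMemory code, just the underlying API calls with error handling trimmed:

```cpp
#include <cuda_runtime_api.h>
#include <cstdio>

// Parent-process side only: allocate device memory and export a CUDA IPC
// handle that another process (the stub) can open.
int
main()
{
  void* dev_ptr = nullptr;
  if (cudaMalloc(&dev_ptr, 1 << 20) != cudaSuccess) {
    return 1;
  }

  cudaIpcMemHandle_t handle;
  // cudaIpcGetMemHandle only works for allocations owned by this process; a
  // pointer that was itself imported via CUDA IPC (e.g. client-registered CUDA
  // shared memory) cannot be re-exported, which is why the existing handle
  // from the buffer attributes is forwarded whenever it is present.
  if (cudaIpcGetMemHandle(&handle, dev_ptr) != cudaSuccess) {
    return 1;
  }

  // In the backend, 'handle' ends up in shared memory; the stub side then maps
  // the allocation with cudaIpcOpenMemHandle(&mapped_ptr, handle,
  // cudaIpcMemLazyEnablePeerAccess).
  std::printf("exported a %zu-byte CUDA IPC handle\n", sizeof(handle));

  cudaFree(dev_ptr);
  return 0;
}
```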
+ const void* buffer_p; + RETURN_IF_ERROR(TRITONBACKEND_InputBufferAttributes( + in, 0, &buffer_p, &buffer_attributes)); - if (collector) { input_tensor = std::make_shared( std::string(input_name), std::vector(input_shape, input_shape + input_dims_count), input_dtype, src_memory_type, src_memory_type_id, const_cast(buffer), input_byte_size, nullptr /* DLManagedTensor */); + + cudaIpcMemHandle_t* cuda_ipc_handle; + RETURN_IF_ERROR(TRITONSERVER_BufferAttributesCudaIpcHandle( + buffer_attributes, reinterpret_cast(&cuda_ipc_handle))); + if (cuda_ipc_handle != nullptr) { + RETURN_IF_EXCEPTION( + input_tensor->SaveToSharedMemory(shm_pool_, false /* copy_gpu */)); + RETURN_IF_EXCEPTION( + input_tensor->Memory()->SetCudaIpcHandle(cuda_ipc_handle)); + } else { + RETURN_IF_EXCEPTION( + input_tensor->SaveToSharedMemory(shm_pool_, true /* copy_gpu */)); + } } else { + void* dev_ptr; + RETURN_IF_CUDA_ERROR( + cudaMalloc(&dev_ptr, input_byte_size), TRITONSERVER_ERROR_INTERNAL, + std::string("Failed to allocated CUDA memory")); + + size_t byte_size = input_byte_size; + + bool cuda_used = false; + RETURN_IF_ERROR(backend::ReadInputTensor( + request, input_name, reinterpret_cast(dev_ptr), &byte_size, + TRITONSERVER_MEMORY_GPU, src_memory_type_id, CudaStream(), + &cuda_used)); + + if (cuda_used) { +#ifdef TRITON_ENABLE_GPU + cudaStreamSynchronize(stream_); +#endif + } + input_tensor = std::make_shared( std::string(input_name), std::vector(input_shape, input_shape + input_dims_count), input_dtype, src_memory_type, src_memory_type_id, - const_cast(buffer_p), input_byte_size, + const_cast(dev_ptr), input_byte_size, nullptr /* DLManagedTensor */); - } - cudaIpcMemHandle_t* cuda_ipc_handle; - RETURN_IF_ERROR(TRITONSERVER_BufferAttributesCudaIpcHandle( - buffer_attributes, reinterpret_cast(&cuda_ipc_handle))); - if (cuda_ipc_handle != nullptr) { - RETURN_IF_EXCEPTION( - input_tensor->SaveToSharedMemory(shm_pool_, false /* copy_gpu */)); - RETURN_IF_EXCEPTION( - input_tensor->Memory()->SetCudaIpcHandle(cuda_ipc_handle)); - } else { RETURN_IF_EXCEPTION( input_tensor->SaveToSharedMemory(shm_pool_, true /* copy_gpu */)); + + std::unique_ptr gpu_memory_record = + std::make_unique(input_tensor->Memory()->DataPtr()); + uint64_t memory_release_id = + memory_manager_->AddRecord(std::move(gpu_memory_record)); + input_tensor->Memory()->SetMemoryReleaseId(memory_release_id); } #else return TRITONSERVER_ErrorNew( @@ -1410,7 +1430,7 @@ ModelInstanceState::DecoupledMessageQueueMonitor() // Need to notify the model instance thread that the execute response has // been received. 
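The non-collector branch added to GetInputTensor above allocates a device buffer, reads the input into it, and synchronizes the stream before the tensor is saved to shared memory. A standalone sketch of that staging pattern using only the CUDA runtime, with hypothetical sizes and names:

```cpp
#include <cuda_runtime_api.h>
#include <cstdio>
#include <vector>

int
main()
{
  std::vector<float> host_input(1024, 1.0f);
  const size_t byte_size = host_input.size() * sizeof(float);

  cudaStream_t stream;
  if (cudaStreamCreate(&stream) != cudaSuccess) {
    return 1;
  }
  void* dev_ptr = nullptr;
  if (cudaMalloc(&dev_ptr, byte_size) != cudaSuccess) {
    return 1;
  }

  // ReadInputTensor ultimately performs a copy along these lines when the
  // destination is GPU memory; a 'cuda_used' of true means the copy ran
  // asynchronously on the stream.
  if (cudaMemcpyAsync(
          dev_ptr, host_input.data(), byte_size, cudaMemcpyHostToDevice,
          stream) != cudaSuccess) {
    return 1;
  }

  // Without this synchronization the stub could map the buffer before the
  // copy has finished.
  cudaStreamSynchronize(stream);

  std::puts("input staged on the GPU");
  cudaFree(dev_ptr);
  cudaStreamDestroy(stream);
  return 0;
}
```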
- if (message->Command() == PYTHONSTUB_ExecuteResposne) { + if (message->Command() == PYTHONSTUB_ExecuteResponse) { std::lock_guard guard{mu_}; received_message_ = std::move(message); cv_.notify_one(); @@ -1468,13 +1488,76 @@ ModelInstanceState::ResponseSendDecoupled( InferResponse::LoadFromSharedMemory( shm_pool_, send_message_payload->response, false /* open cuda ipc handle */); - TRITONSERVER_Error* error = infer_response->Send( - response_factory, CudaStream(), send_message_payload->flags); + + bool requires_deferred_callback = false; + std::vector, void*>> gpu_output_buffers; + std::shared_ptr error = infer_response->Send( + response_factory, CudaStream(), requires_deferred_callback, + send_message_payload->flags, shm_pool_, gpu_output_buffers); SetErrorForResponseSendMessage(send_message_payload, error, error_message); + + if (requires_deferred_callback) { + AllocatedSharedMemory gpu_buffers_handle = + shm_pool_->Construct( + sizeof(uint64_t) + + gpu_output_buffers.size() * + sizeof(bi::managed_external_buffer::handle_t)); + uint64_t* gpu_buffer_count = + reinterpret_cast(gpu_buffers_handle.data_.get()); + *gpu_buffer_count = gpu_output_buffers.size(); + bi::managed_external_buffer::handle_t* gpu_buffers_handle_shm = + reinterpret_cast( + gpu_buffers_handle.data_.get() + sizeof(uint64_t)); + send_message_payload->gpu_buffers_handle = gpu_buffers_handle.handle_; + + size_t index = 0; + for (auto& output_buffer_pair : gpu_output_buffers) { + std::unique_ptr& pb_memory = output_buffer_pair.first; + gpu_buffers_handle_shm[index] = pb_memory->ShmHandle(); + ++index; + } + + // Additional round trip so that the stub can fill the GPU output buffers. + { + bi::scoped_lock guard{send_message_payload->mu}; + send_message_payload->is_stub_turn = true; + send_message_payload->cv.notify_all(); + + while (send_message_payload->is_stub_turn) { + send_message_payload->cv.wait(guard); + } + } + + index = 0; + bool cuda_copy = false; + for (auto& output_buffer_pair : gpu_output_buffers) { + auto& pb_memory = output_buffer_pair.first; + + if (pb_memory->MemoryType() == TRITONSERVER_MEMORY_CPU) { + bool cuda_used; + void* pointer = output_buffer_pair.second; + + CopyBuffer( + "Failed to copy the output tensor to buffer.", + TRITONSERVER_MEMORY_CPU, 0, TRITONSERVER_MEMORY_CPU, 0, + pb_memory->ByteSize(), pb_memory->DataPtr(), pointer, + CudaStream(), &cuda_used); + cuda_copy |= cuda_used; + } + gpu_buffers_handle_shm[index] = pb_memory->ShmHandle(); + ++index; +#ifdef TRITON_ENABLE_GPU + if (cuda_copy) { + cudaStreamSynchronize(stream_); + } +#endif // TRITON_ENABLE_GPU + } + } } else { TRITONSERVER_Error* error = TRITONBACKEND_ResponseFactorySendFlags( response_factory, send_message_payload->flags); - SetErrorForResponseSendMessage(send_message_payload, error, error_message); + SetErrorForResponseSendMessage( + send_message_payload, WrapTritonErrorInSharedPtr(error), error_message); if (send_message_payload->flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL) { std::unique_ptr< @@ -1717,9 +1800,11 @@ ModelInstanceState::ProcessRequests( // If the output provided by the model is in GPU, we will pass the list of // buffers provided by Triton to the stub process. 
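ResponseSendDecoupled above, and ProcessRequests and ResponseSender::Send below, all exchange GPU buffers through the same shared-memory layout: a uint64_t count followed by that many PbMemory shm handles. A small standalone sketch of that packing over a plain byte buffer, with hypothetical helper names rather than the backend's managed shm pool:

```cpp
#include <cstdint>
#include <cstring>
#include <iostream>
#include <vector>

// Stand-in for bi::managed_external_buffer::handle_t (an offset into shm).
using Handle = uint64_t;

// Layout used for 'gpu_buffers_handle': count, then 'count' handles.
std::vector<char>
PackGpuBufferHandles(const std::vector<Handle>& handles)
{
  std::vector<char> block(sizeof(uint64_t) + handles.size() * sizeof(Handle));
  const uint64_t count = handles.size();
  std::memcpy(block.data(), &count, sizeof(count));
  std::memcpy(
      block.data() + sizeof(count), handles.data(),
      handles.size() * sizeof(Handle));
  return block;
}

std::vector<Handle>
UnpackGpuBufferHandles(const char* block)
{
  uint64_t count = 0;
  std::memcpy(&count, block, sizeof(count));
  std::vector<Handle> handles(count);
  std::memcpy(handles.data(), block + sizeof(count), count * sizeof(Handle));
  return handles;
}

int
main()
{
  auto block = PackGpuBufferHandles({10, 20, 30});
  for (Handle h : UnpackGpuBufferHandles(block.data())) {
    std::cout << h << " ";  // prints: 10 20 30
  }
  std::cout << "\n";
  return 0;
}
```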
bool has_gpu_output = false; + std::vector requires_deferred_callback; - // GPU output buffers - std::vector, std::pair>> + std::vector> shm_responses; + std::unordered_map< + uint32_t, std::vector, void*>>> gpu_output_buffers; for (uint32_t r = 0; r < request_count; ++r) { @@ -1728,7 +1813,8 @@ ModelInstanceState::ProcessRequests( TRITONBACKEND_Request* request = requests[r]; uint32_t requested_output_count = 0; - std::unique_ptr infer_response; + shm_responses.emplace_back(nullptr); + std::unique_ptr& infer_response = shm_responses.back(); try { infer_response = InferResponse::LoadFromSharedMemory( shm_pool_, response_shm_handle[r], false /* open_cuda_handle */); @@ -1764,7 +1850,6 @@ ModelInstanceState::ProcessRequests( responses, r, TRITONBACKEND_RequestOutputCount(request, &requested_output_count)); - bool cuda_copy = false; std::set requested_output_names; for (size_t j = 0; j < requested_output_count; ++j) { const char* output_name; @@ -1774,106 +1859,24 @@ ModelInstanceState::ProcessRequests( requested_output_names.insert(output_name); } - for (auto& output_tensor : infer_response->OutputTensors()) { - if (requested_output_names.find(output_tensor->Name()) == - requested_output_names.end()) { - continue; - } - - TRITONSERVER_MemoryType src_memory_type = output_tensor->MemoryType(); - int64_t src_memory_type_id = output_tensor->MemoryTypeId(); - - TRITONSERVER_MemoryType actual_memory_type = src_memory_type; - int64_t actual_memory_type_id = src_memory_type_id; - - if (actual_memory_type == TRITONSERVER_MEMORY_GPU) - has_gpu_output = true; - - TRITONBACKEND_Output* response_output; - GUARDED_RESPOND_IF_ERROR( - responses, r, - TRITONBACKEND_ResponseOutput( - response, &response_output, output_tensor->Name().c_str(), - static_cast(output_tensor->TritonDtype()), - output_tensor->Dims().data(), output_tensor->Dims().size())); - - void* buffer; - bool cuda_used = false; - GUARDED_RESPOND_IF_ERROR( - responses, r, - TRITONBACKEND_OutputBuffer( - response_output, &buffer, output_tensor->ByteSize(), - &actual_memory_type, &actual_memory_type_id)); - - TRITONSERVER_BufferAttributes* output_buffer_attributes; - GUARDED_RESPOND_IF_ERROR( - responses, r, - TRITONBACKEND_OutputBufferAttributes( - response_output, &output_buffer_attributes)); + bool require_deferred_callback = false; - std::unique_ptr output_buffer; - if (src_memory_type == TRITONSERVER_MEMORY_GPU && - actual_memory_type == TRITONSERVER_MEMORY_GPU) { - if ((*responses)[r] != nullptr) { -#ifdef TRITON_ENABLE_GPU - cudaIpcMemHandle_t* cuda_ipc_mem_handle_p; - GUARDED_RESPOND_IF_ERROR( - responses, r, - TRITONSERVER_BufferAttributesCudaIpcHandle( - output_buffer_attributes, - reinterpret_cast(&cuda_ipc_mem_handle_p))); - - if (cuda_ipc_mem_handle_p != nullptr) { - GUARDED_RESPOND_IF_EXCEPTION( - responses, r, - output_buffer = PbMemory::Create( - shm_pool_, actual_memory_type, actual_memory_type_id, - output_tensor->ByteSize(), reinterpret_cast(buffer), - false /* copy_gpu */)); - output_buffer->SetCudaIpcHandle(cuda_ipc_mem_handle_p); - } else { - GUARDED_RESPOND_IF_EXCEPTION( - responses, r, - output_buffer = PbMemory::Create( - shm_pool_, actual_memory_type, actual_memory_type_id, - output_tensor->ByteSize(), reinterpret_cast(buffer), - true /* copy_gpu */)); - } - gpu_output_buffers.push_back({std::move(output_buffer), {buffer, r}}); -#endif - } - } + gpu_output_buffers[r] = + std::vector, void*>>{}; + std::shared_ptr error = infer_response->Send( + nullptr, CudaStream(), require_deferred_callback, + 
TRITONSERVER_RESPONSE_COMPLETE_FINAL, shm_pool_, gpu_output_buffers[r], + requested_output_names, response); + GUARDED_RESPOND_IF_ERROR(responses, r, *error); - // When we requested a GPU buffer but received a CPU buffer. - if (src_memory_type == TRITONSERVER_MEMORY_GPU && - (actual_memory_type == TRITONSERVER_MEMORY_CPU || - actual_memory_type == TRITONSERVER_MEMORY_CPU_PINNED)) { - GUARDED_RESPOND_IF_EXCEPTION( - responses, r, - output_buffer = PbMemory::Create( - shm_pool_, actual_memory_type, actual_memory_type_id, - 0 /* byte size */, nullptr /* data ptr */)); - - gpu_output_buffers.push_back({std::move(output_buffer), {buffer, r}}); - } + // Error object will be deleted by the GUARDED_RESPOND macro + *error = nullptr; + error.reset(); - if (src_memory_type != TRITONSERVER_MEMORY_GPU) { - GUARDED_RESPOND_IF_ERROR( - responses, r, - CopyBuffer( - "Failed to copy the output tensor to buffer.", src_memory_type, - src_memory_type_id, actual_memory_type, actual_memory_type_id, - output_tensor->ByteSize(), output_tensor->DataPtr(), buffer, - CudaStream(), &cuda_used)); - } - - cuda_copy |= cuda_used; - } -#ifdef TRITON_ENABLE_GPU - if (cuda_copy) { - cudaStreamSynchronize(stream_); + if (require_deferred_callback) { + has_gpu_output = true; } -#endif // TRITON_ENABLE_GPU + requires_deferred_callback.push_back(require_deferred_callback); } // Finalize the execute. @@ -1882,18 +1885,26 @@ ModelInstanceState::ProcessRequests( // If the output tensor is in GPU, there will be a second round trip // required for filling the GPU buffers provided by the main process. if (has_gpu_output) { + size_t total_gpu_buffers_count = 0; + for (auto& gpu_output_buffer : gpu_output_buffers) { + total_gpu_buffers_count += gpu_output_buffer.second.size(); + } AllocatedSharedMemory gpu_buffers_handle = shm_pool_->Construct( - sizeof(uint64_t) + gpu_output_buffers.size() * + sizeof(uint64_t) + total_gpu_buffers_count * sizeof(bi::managed_external_buffer::handle_t)); uint64_t* gpu_buffer_count = reinterpret_cast(gpu_buffers_handle.data_.get()); - *gpu_buffer_count = gpu_output_buffers.size(); + *gpu_buffer_count = total_gpu_buffers_count; bi::managed_external_buffer::handle_t* gpu_buffers_handle_shm = reinterpret_cast( gpu_buffers_handle.data_.get() + sizeof(uint64_t)); - for (size_t i = 0; i < gpu_output_buffers.size(); i++) { - gpu_buffers_handle_shm[i] = gpu_output_buffers[i].first->ShmHandle(); + size_t index = 0; + for (auto& gpu_output_buffer : gpu_output_buffers) { + for (auto& buffer_memory_pair : gpu_output_buffer.second) { + gpu_buffers_handle_shm[index] = buffer_memory_pair.first->ShmHandle(); + ++index; + } } ipc_message->Command() = PYTHONSTUB_CommandType::PYTHONSTUB_LoadGPUBuffers; @@ -1904,50 +1915,40 @@ ModelInstanceState::ProcessRequests( bool cuda_copy = false; - // CPU tensors require an additional notification to the stub process. - // This is to ask the stub process to release the tensor. 
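ProcessRequests now groups GPU output buffers by response index yet still publishes one flat handle array to the stub, so the groups are flattened while remembering which response owns each staged buffer. A sketch of that bookkeeping with hypothetical names; std::map is used here only for a deterministic iteration order, whereas the code above iterates the same unordered map twice in a consistent order:

```cpp
#include <cstdint>
#include <iostream>
#include <map>
#include <vector>

int
main()
{
  using Handle = uint64_t;

  // Per-response staged buffers; request 1 produced no GPU outputs.
  std::map<uint32_t, std::vector<Handle>> per_response = {
      {0, {100, 101}}, {2, {200}}};

  std::vector<Handle> flat;     // what gets written to shared memory
  std::vector<uint32_t> owner;  // parallel array: which response owns flat[i]
  for (const auto& [response_index, handles] : per_response) {
    for (Handle h : handles) {
      flat.push_back(h);
      owner.push_back(response_index);
    }
  }

  for (size_t i = 0; i < flat.size(); ++i) {
    std::cout << "buffer " << flat[i] << " -> response " << owner[i] << "\n";
  }
  return 0;
}
```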
- bool has_cpu_tensor = false; - for (size_t i = 0; i < gpu_output_buffers.size(); i++) { - std::unique_ptr& memory = gpu_output_buffers[i].first; - if (memory->MemoryType() == TRITONSERVER_MEMORY_CPU) { - bool cuda_used; - has_cpu_tensor = true; - std::unique_ptr pb_cpu_memory = - PbMemory::LoadFromSharedMemory( - shm_pool_, gpu_buffers_handle_shm[i], - false /* open cuda handle */); - uint32_t response_index = gpu_output_buffers[i].second.second; - void* pointer = gpu_output_buffers[i].second.first; - - GUARDED_RESPOND_IF_ERROR( - responses, response_index, - CopyBuffer( - "Failed to copy the output tensor to buffer.", - TRITONSERVER_MEMORY_CPU, 0, TRITONSERVER_MEMORY_CPU, 0, - pb_cpu_memory->ByteSize(), pb_cpu_memory->DataPtr(), pointer, - CudaStream(), &cuda_used)); - cuda_copy |= cuda_used; - } - } - - if (has_cpu_tensor) { - stub_message_queue_->Push(DUMMY_MESSAGE); - } + index = 0; + for (auto& gpu_output_buffer : gpu_output_buffers) { + for (auto& buffer_memory_pair : gpu_output_buffer.second) { + auto& pb_memory = buffer_memory_pair.first; + if (pb_memory->MemoryType() == TRITONSERVER_MEMORY_CPU) { + bool cuda_used; + uint32_t response_index = gpu_output_buffer.first; + void* pointer = buffer_memory_pair.second; + GUARDED_RESPOND_IF_ERROR( + responses, response_index, + CopyBuffer( + "Failed to copy the output tensor to buffer.", + TRITONSERVER_MEMORY_CPU, 0, TRITONSERVER_MEMORY_CPU, 0, + pb_memory->ByteSize(), pb_memory->DataPtr(), pointer, + CudaStream(), &cuda_used)); + cuda_copy |= cuda_used; + } + gpu_buffers_handle_shm[index] = pb_memory->ShmHandle(); + ++index; + } #ifdef TRITON_ENABLE_GPU - if (cuda_copy) { - cudaStreamSynchronize(stream_); - } + if (cuda_copy) { + cudaStreamSynchronize(stream_); + } #endif // TRITON_ENABLE_GPU + } } bls_defer.Complete(); for (uint32_t r = 0; r < request_count; ++r) { - // If error happens at this stage, we can only log it - GUARDED_RESPOND_IF_ERROR( - responses, r, - TRITONBACKEND_ResponseSend( - (*responses)[r], TRITONSERVER_RESPONSE_COMPLETE_FINAL, nullptr)); + if (requires_deferred_callback[r]) { + shm_responses[r]->DeferredSendCallback(); + } } uint64_t exec_end_ns = 0; @@ -2457,13 +2458,13 @@ TRITONBACKEND_ModelInstanceExecute( TRITONBACKEND_Response* response = nullptr; LOG_IF_ERROR( TRITONBACKEND_ResponseNew(&response, request), - "Failed to create a new resposne."); + "Failed to create a new response."); if (response != nullptr) { LOG_IF_ERROR( TRITONBACKEND_ResponseSend( response, TRITONSERVER_RESPONSE_COMPLETE_FINAL, error), - "Failed to send the error resposne."); + "Failed to send the error response."); } } } diff --git a/src/request_executor.cc b/src/request_executor.cc index c338defc..5118c8bf 100644 --- a/src/request_executor.cc +++ b/src/request_executor.cc @@ -306,7 +306,7 @@ RequestExecutor::Infer( if (response != nullptr) { LOG_IF_ERROR( TRITONSERVER_InferenceResponseDelete(response), - "Failed to delete inference resposne."); + "Failed to delete inference response."); *triton_response = nullptr; } diff --git a/src/response_sender.cc b/src/response_sender.cc index 44b5de7f..e8394df9 100644 --- a/src/response_sender.cc +++ b/src/response_sender.cc @@ -28,6 +28,7 @@ #include #include #include "pb_stub.h" +#include "pb_stub_utils.h" #include "scoped_defer.h" namespace triton { namespace backend { namespace python { @@ -67,17 +68,6 @@ ResponseSender::Send( "set to zero."); } - if (infer_response) { - for (auto& tensor : infer_response->OutputTensors()) { - if (!tensor->IsCPU()) { - throw PythonBackendException( - "Tensor '" 
+ tensor->Name() + - "' is stored in GPU. GPU tensors are not supported yet in the " - "decoupled response sender."); - } - } - } - std::unique_ptr& stub = Stub::GetOrCreateInstance(); AllocatedSharedMemory response_send_message = @@ -112,10 +102,9 @@ ResponseSender::Send( ipc_message->Command() = PYTHONSTUB_ResponseSend; ipc_message->Args() = response_send_message.handle_; - ScopedDefer _([&send_message_payload] { + ScopedDefer _([send_message_payload] { { bi::scoped_lock guard{send_message_payload->mu}; - send_message_payload->is_stub_turn = false; send_message_payload->cv.notify_all(); } @@ -129,6 +118,57 @@ ResponseSender::Send( } } + bool has_gpu_output = false; + std::vector> gpu_tensors; + if (infer_response) { + for (auto& tensor : infer_response->OutputTensors()) { + if (!tensor->IsCPU()) { + has_gpu_output = true; + gpu_tensors.push_back(tensor); + } + } + } + + if (has_gpu_output) { + AllocatedSharedMemory gpu_buffers_handle = + shm_pool_->Load(send_message_payload->gpu_buffers_handle); + + bi::managed_external_buffer::handle_t* gpu_buffers_handle_shm = + reinterpret_cast( + gpu_buffers_handle.data_.get() + sizeof(uint64_t)); + uint64_t* gpu_buffer_count = + reinterpret_cast(gpu_buffers_handle.data_.get()); + if (gpu_tensors.size() != *gpu_buffer_count) { + LOG_INFO + << (std::string( + "GPU buffers size does not match the provided buffers: ") + + std::to_string(gpu_tensors.size()) + + " != " + std::to_string(*gpu_buffer_count)); + return; + } + + std::vector> dst_buffers; + + for (size_t i = 0; i < gpu_tensors.size(); i++) { + std::unique_ptr dst_buffer = PbMemory::LoadFromSharedMemory( + shm_pool_, gpu_buffers_handle_shm[i], true /* open_cuda_handle */); + dst_buffers.emplace_back(std::move(dst_buffer)); + std::shared_ptr& src_buffer = gpu_tensors[i]; + PbMemory::CopyBuffer(dst_buffers[i], src_buffer->Memory()); + } + + { + bi::scoped_lock guard{send_message_payload->mu}; + send_message_payload->is_stub_turn = false; + send_message_payload->cv.notify_one(); + while (!send_message_payload->is_stub_turn) { + // Wait for the stub process to send the response and populate error + // message if any. + send_message_payload->cv.wait(guard); + } + } + } + if (send_message_payload->has_error) { if (send_message_payload->is_error_set) { std::unique_ptr error = PbString::LoadFromSharedMemory( @@ -140,5 +180,4 @@ ResponseSender::Send( } } } - }}} // namespace triton::backend::python
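Finally, the extra round trip added to ResponseSender::Send is a turn-taking handshake over ResponseSendMessage: the parent hands the turn to the stub, the stub copies its GPU tensors into the provided buffers and hands the turn back, and the parent finishes any CPU-staged copies before the response goes out. The single-process sketch below stands in two threads and std:: primitives for the two processes and their boost::interprocess mutex and condition variable, purely to illustrate the protocol:

```cpp
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

struct Payload {
  std::mutex mu;
  std::condition_variable cv;
  bool is_stub_turn = true;  // the parent starts by handing the turn over
};

int
main()
{
  Payload p;

  std::thread stub([&] {
    std::unique_lock<std::mutex> guard(p.mu);
    p.cv.wait(guard, [&] { return p.is_stub_turn; });
    std::cout << "stub: GPU tensors copied into the provided buffers\n";
    p.is_stub_turn = false;  // hand the turn back to the parent
    p.cv.notify_one();
    // Wait until the parent has consumed any CPU-staged buffers.
    p.cv.wait(guard, [&] { return p.is_stub_turn; });
    std::cout << "stub: parent done, error state (if any) can be read\n";
  });

  {
    std::unique_lock<std::mutex> guard(p.mu);
    p.cv.wait(guard, [&] { return !p.is_stub_turn; });
    std::cout << "parent: copying CPU-staged buffers, sending the response\n";
    p.is_stub_turn = true;  // let the stub continue
    p.cv.notify_one();
  }

  stub.join();
  return 0;
}
```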