Commit

Add GPU tensor support to decoupled API (triton-inference-server#154)
Tabrizian authored May 21, 2022
1 parent 2b98842 commit 92245a7
Showing 11 changed files with 489 additions and 339 deletions.
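
For orientation before the per-file diffs: with this change, a decoupled response that contains GPU tensors is no longer sent in a single call. Send() registers the response and records which GPU output buffers still need data, the Python stub fills those buffers, and only then does DeferredSendCallback() perform the actual send. The sketch below illustrates that two-step pattern using only standard C++; the DeferredResponse class and its members are invented for the example and are not part of the backend's API.

```cpp
#include <functional>
#include <iostream>

// Stand-in for a response whose GPU buffers must be filled by another
// process before it can actually be sent.
class DeferredResponse {
 public:
  // Either send immediately (no GPU tensors) or remember the send action
  // so it can run later, once the buffers have been populated.
  void Send(bool has_gpu_tensors) {
    auto send_action = [] { std::cout << "response sent\n"; };
    if (!has_gpu_tensors) {
      send_action();  // one-step send
      return;
    }
    deferred_send_ = send_action;  // postpone until buffers are ready
  }

  // Called after the other process has filled the GPU buffers.
  void DeferredSendCallback() {
    if (deferred_send_) {
      deferred_send_();
      deferred_send_ = nullptr;
    }
  }

 private:
  std::function<void()> deferred_send_;
};

int main() {
  DeferredResponse cpu_only;
  cpu_only.Send(/*has_gpu_tensors=*/false);  // sent right away

  DeferredResponse with_gpu;
  with_gpu.Send(/*has_gpu_tensors=*/true);   // nothing sent yet
  // ... the stub would copy tensor data into the GPU buffers here ...
  with_gpu.DeferredSendCallback();           // now the response goes out
  return 0;
}
```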
4 changes: 2 additions & 2 deletions examples/bls/async_model.py
@@ -120,11 +120,11 @@ async def execute(self, requests):
raise pb_utils.TritonModelException(
infer_response.error().message())

# Get the OUTPUT0 from the "pytorch" model inference resposne
# Get the OUTPUT0 from the "pytorch" model inference response
pytorch_output0_tensor = pb_utils.get_output_tensor_by_name(
inference_responses[0], "OUTPUT0")

# Get the OUTPUT1 from the "addsub" model inference resposne
# Get the OUTPUT1 from the "addsub" model inference response
addsub_output1_tensor = pb_utils.get_output_tensor_by_name(
inference_responses[1], "OUTPUT1")

139 changes: 110 additions & 29 deletions src/infer_response.cc
@@ -164,53 +164,77 @@ InferResponse::Error()
}

#ifndef TRITON_PB_STUB
TRITONSERVER_Error*
std::shared_ptr<TRITONSERVER_Error*>
InferResponse::Send(
TRITONBACKEND_ResponseFactory* response_factory, void* cuda_stream,
const uint32_t flags)
bool& requires_deferred_callback, const uint32_t flags,
std::unique_ptr<SharedMemoryManager>& shm_pool,
std::vector<std::pair<std::unique_ptr<PbMemory>, void*>>& output_buffers,
const std::set<std::string>& requested_output_names,
TRITONBACKEND_Response* response)
{
// [FIXME] Use this code to send responses in non-decoupled mode.
TRITONBACKEND_Response* response = nullptr;
TRITONSERVER_Error* response_error = nullptr;
ScopedDefer response_error_handling([&response, &response_error, flags,
response_factory] {
if (response != nullptr) {
LOG_IF_ERROR(
TRITONBACKEND_ResponseSend(response, flags, response_error),
"failed to send the response.");
if (flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL) {
std::unique_ptr<
TRITONBACKEND_ResponseFactory, backend::ResponseFactoryDeleter>
response_factory_ptr(
reinterpret_cast<TRITONBACKEND_ResponseFactory*>(response_factory));
}
}
});
std::shared_ptr<TRITONSERVER_Error*> response_error =
WrapTritonErrorInSharedPtr(nullptr);
std::unique_ptr<ScopedDefer> response_error_handling;
requires_deferred_callback = false;

// Only destruct the response factory when this call created the response
// itself (i.e. no response object was passed in).
bool destruct_response_factor = (response == nullptr);

if (response == nullptr) {
SET_ERROR_AND_RETURN(
response_error,
TRITONBACKEND_ResponseNewFromFactory(&response, response_factory));
}

SET_ERROR_AND_RETURN(
response_error,
TRITONBACKEND_ResponseNewFromFactory(&response, response_factory));
// This lambda expression will be called when this function exits, if the
// inference response doesn't have any GPU tensors. Otherwise, it will be
// called when the object is destructed or DeferredSendCallback is called.
response_error_handling = std::make_unique<ScopedDefer>(
[response, response_error, flags, response_factory,
destruct_response_factor] {
if (response != nullptr) {
LOG_IF_ERROR(
TRITONBACKEND_ResponseSend(response, flags, *response_error),
"failed to send the response.");
if (flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL &&
destruct_response_factor) {
std::unique_ptr<
TRITONBACKEND_ResponseFactory, backend::ResponseFactoryDeleter>
response_factory_ptr(
reinterpret_cast<TRITONBACKEND_ResponseFactory*>(
response_factory));
}
}
});

// Moves the response sending callback so that it is not called until the stub
// process fills in the GPU buffers.
ScopedDefer deferred_task(
[this, &requires_deferred_callback, &response_error_handling] {
if (requires_deferred_callback) {
deferred_send_callback_ = std::move(response_error_handling);
}
});

if (HasError()) {
response_error = TRITONSERVER_ErrorNew(
*response_error = TRITONSERVER_ErrorNew(
TRITONSERVER_ERROR_INTERNAL, Error()->Message().c_str());
return nullptr;
}

bool cuda_copy = false;

for (auto& output_tensor : OutputTensors()) {
TRITONSERVER_MemoryType src_memory_type = output_tensor->MemoryType();
int64_t src_memory_type_id = output_tensor->MemoryTypeId();

TRITONSERVER_MemoryType actual_memory_type = src_memory_type;
int64_t actual_memory_type_id = src_memory_type_id;

// [FIXME] GPU tensors are not supported in the decoupled API mode.
if (actual_memory_type == TRITONSERVER_MEMORY_GPU) {
response_error = TRITONSERVER_ErrorNew(
TRITONSERVER_ERROR_INTERNAL,
"GPU tensors are not supported in decoupled API.");
return response_error;
requires_deferred_callback = true;
}

TRITONBACKEND_Output* response_output;
@@ -222,12 +246,61 @@ InferResponse::Send(
output_tensor->Dims().data(), output_tensor->Dims().size()));

void* buffer;
bool cuda_used = false;
SET_ERROR_AND_RETURN(
response_error, TRITONBACKEND_OutputBuffer(
response_output, &buffer, output_tensor->ByteSize(),
&actual_memory_type, &actual_memory_type_id));

bool cuda_used = false;
TRITONSERVER_BufferAttributes* output_buffer_attributes;
SET_ERROR_AND_RETURN(
response_error, TRITONBACKEND_OutputBufferAttributes(
response_output, &output_buffer_attributes));

std::unique_ptr<PbMemory> output_buffer;
if (src_memory_type == TRITONSERVER_MEMORY_GPU &&
actual_memory_type == TRITONSERVER_MEMORY_GPU) {
#ifdef TRITON_ENABLE_GPU
cudaIpcMemHandle_t* cuda_ipc_mem_handle_p;
SET_ERROR_AND_RETURN(
response_error,
TRITONSERVER_BufferAttributesCudaIpcHandle(
output_buffer_attributes,
reinterpret_cast<void**>(&cuda_ipc_mem_handle_p)));

if (cuda_ipc_mem_handle_p != nullptr) {
SET_ERROR_AND_RETURN_IF_EXCEPTION(
response_error,
output_buffer = PbMemory::Create(
shm_pool, actual_memory_type, actual_memory_type_id,
output_tensor->ByteSize(), reinterpret_cast<char*>(buffer),
false /* copy_gpu */));
output_buffer->SetCudaIpcHandle(cuda_ipc_mem_handle_p);
} else {
SET_ERROR_AND_RETURN_IF_EXCEPTION(
response_error,
output_buffer = PbMemory::Create(
shm_pool, actual_memory_type, actual_memory_type_id,
output_tensor->ByteSize(), reinterpret_cast<char*>(buffer),
true /* copy_gpu */));
}
output_buffers.push_back({std::move(output_buffer), buffer});
#endif
}

// When we requested a GPU buffer but received a CPU buffer.
if (src_memory_type == TRITONSERVER_MEMORY_GPU &&
(actual_memory_type == TRITONSERVER_MEMORY_CPU ||
actual_memory_type == TRITONSERVER_MEMORY_CPU_PINNED)) {
SET_ERROR_AND_RETURN_IF_EXCEPTION(
response_error,
output_buffer = PbMemory::Create(
shm_pool, actual_memory_type, actual_memory_type_id,
output_tensor->ByteSize(), nullptr /* data ptr */));

output_buffers.push_back({std::move(output_buffer), buffer});
}

if (src_memory_type != TRITONSERVER_MEMORY_GPU) {
SET_ERROR_AND_RETURN(
response_error,
@@ -251,4 +324,12 @@ InferResponse::Send(
}
#endif

#ifndef TRITON_PB_STUB
void
InferResponse::DeferredSendCallback()
{
deferred_send_callback_.reset();
}
#endif

}}} // namespace triton::backend::python
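
As the comments in the hunk above explain, the TRITONBACKEND_ResponseSend call is wrapped in a ScopedDefer; when the response carries GPU tensors (requires_deferred_callback is true), that defer object is moved into deferred_send_callback_, so the send fires only when DeferredSendCallback() later resets the member. The following self-contained sketch shows the underlying RAII idea, assuming a minimal ScopedDefer that simply runs a callable in its destructor; it is an illustration of the pattern, not the backend's actual implementation.

```cpp
#include <functional>
#include <iostream>
#include <memory>
#include <utility>

// Minimal RAII helper: runs the stored callable when destroyed.
class ScopedDefer {
 public:
  explicit ScopedDefer(std::function<void()> task) : task_(std::move(task)) {}
  ~ScopedDefer() {
    if (task_) {
      task_();
    }
  }

 private:
  std::function<void()> task_;
};

int main() {
  std::unique_ptr<ScopedDefer> deferred_send_callback;

  {
    // Inside "Send": schedule the actual send for scope exit.
    auto send_task = std::make_unique<ScopedDefer>(
        [] { std::cout << "TRITONBACKEND_ResponseSend (simulated)\n"; });

    bool requires_deferred_callback = true;  // GPU tensors are present
    if (requires_deferred_callback) {
      // Moving the ScopedDefer out of this scope postpones its destructor,
      // and with it the send, until the owner releases it.
      deferred_send_callback = std::move(send_task);
    }
    // Scope exits here: nothing is sent, because the defer was moved out.
  }

  std::cout << "stub fills the GPU buffers...\n";

  // Equivalent of InferResponse::DeferredSendCallback(): destroying the
  // stored ScopedDefer finally performs the send.
  deferred_send_callback.reset();
  return 0;
}
```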
33 changes: 29 additions & 4 deletions src/infer_response.h
@@ -29,6 +29,7 @@
#include "pb_error.h"
#include "pb_tensor.h"
#include "pb_utils.h"
#include "scoped_defer.h"

namespace triton { namespace backend { namespace python {

@@ -44,11 +45,24 @@ struct ResponseShm {
do { \
TRITONSERVER_Error* raasnie_err__ = (X); \
if (raasnie_err__ != nullptr) { \
E = raasnie_err__; \
*E = raasnie_err__; \
return E; \
} \
} while (false)

#define SET_ERROR_AND_RETURN_IF_EXCEPTION(E, X) \
do { \
try { \
(X); \
} \
catch (const PythonBackendException& pb_exception) { \
TRITONSERVER_Error* rarie_err__ = TRITONSERVER_ErrorNew( \
TRITONSERVER_ERROR_INTERNAL, pb_exception.what()); \
*E = rarie_err__; \
return E; \
} \
} while (false)

class InferResponse {
public:
InferResponse(
@@ -66,10 +80,19 @@ class InferResponse {
bi::managed_external_buffer::handle_t ShmHandle();

#ifndef TRITON_PB_STUB
/// Send an inference response
TRITONSERVER_Error* Send(
/// Send an inference response. If the response has a GPU tensor, sending the
/// response needs to be done in two steps. The boolean
/// 'requires_deferred_callback' indicates whether the DeferredSendCallback
/// method needs to be called.
std::shared_ptr<TRITONSERVER_Error*> Send(
TRITONBACKEND_ResponseFactory* response_factory, void* cuda_stream,
const uint32_t flags);
bool& requires_deferred_callback, const uint32_t flags,
std::unique_ptr<SharedMemoryManager>& shm_pool,
std::vector<std::pair<std::unique_ptr<PbMemory>, void*>>& output_buffers,
const std::set<std::string>& requested_output_names = {},
TRITONBACKEND_Response* response = nullptr);

void DeferredSendCallback();
#endif

// Disallow copying the inference response object.
@@ -84,5 +107,7 @@ class InferResponse {
std::shared_ptr<PbError> error_;
bi::managed_external_buffer::handle_t shm_handle_;
AllocatedSharedMemory<char> response_shm_;
std::vector<std::pair<std::unique_ptr<PbMemory>, void*>> gpu_output_buffers_;
std::unique_ptr<ScopedDefer> deferred_send_callback_;
};
}}} // namespace triton::backend::python
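
Note that Send() now reports errors through a std::shared_ptr<TRITONSERVER_Error*>, and the SET_ERROR_AND_RETURN / SET_ERROR_AND_RETURN_IF_EXCEPTION macros assign through that pointer (*E = ...). The shared pointer matters because the send lambda captures the error object at construction time but may run much later, after the deferred callback, so the caller and the lambda must observe the same error slot. Below is a minimal sketch of that sharing pattern, using a plain string as a stand-in for TRITONSERVER_Error*; it is illustrative and not the backend's real error type.

```cpp
#include <functional>
#include <iostream>
#include <memory>
#include <string>

// Stand-in for the shared TRITONSERVER_Error* slot: empty means "no error".
using ErrorSlot = std::shared_ptr<std::string>;

int main() {
  ErrorSlot error = std::make_shared<std::string>();

  // The send callback captures the shared slot by value when it is built,
  // long before any error could have been recorded.
  std::function<void()> send_callback = [error] {
    if (error->empty()) {
      std::cout << "sending response\n";
    } else {
      std::cout << "sending error response: " << *error << "\n";
    }
  };

  // Later, while filling output buffers, something fails and the error is
  // written through the shared pointer (analogous to *E = raasnie_err__ in
  // SET_ERROR_AND_RETURN).
  *error = "GPU buffer allocation failed";

  // When the deferred callback finally runs, it sees the error that was set
  // after it was created, because both sides share the same slot.
  send_callback();
  return 0;
}
```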
2 changes: 1 addition & 1 deletion src/ipc_message.h
@@ -37,7 +37,7 @@ namespace bi = boost::interprocess;

typedef enum PYTHONSTUB_commandtype_enum {
PYTHONSTUB_ExecuteRequest,
PYTHONSTUB_ExecuteResposne,
PYTHONSTUB_ExecuteResponse,
PYTHONSTUB_InitializeRequest,
PYTHONSTUB_InitializeResponse,
PYTHONSTUB_FinalizeRequest,
(Diffs for the remaining 7 changed files are not shown here.)
