From a6fff975a214ff00221790dd0a5521fb05ce3ac9 Mon Sep 17 00:00:00 2001 From: KrishnanPrash <140860868+KrishnanPrash@users.noreply.github.com> Date: Tue, 3 Sep 2024 17:53:56 -0500 Subject: [PATCH 1/6] fix: Adding copyright info (#7591) --- docs/customization_guide/tritonfrontend.md | 27 ++++++++++++++++++++++ qa/L0_python_api/test_kserve.py | 26 +++++++++++++++++++++ src/python/tritonfrontend/__init__.pyi | 26 +++++++++++++++++++++ 3 files changed, 79 insertions(+) diff --git a/docs/customization_guide/tritonfrontend.md b/docs/customization_guide/tritonfrontend.md index caaac9308d..0ec4b32749 100644 --- a/docs/customization_guide/tritonfrontend.md +++ b/docs/customization_guide/tritonfrontend.md @@ -1,3 +1,30 @@ + ### Triton Server (tritonfrontend) Bindings The `tritonfrontend` python package is a set of bindings to Triton's existing frontends implemented in C++. Currently, `tritonfrontend` supports starting up `KServeHttp` and `KServeGrpc` frontends. These bindings used in-combination with Triton's Python In-Process API ([`tritonserver`](https://github.com/triton-inference-server/core/tree/main/python/tritonserver)) and [`tritonclient`](https://github.com/triton-inference-server/client/tree/main/src/python/library) extend the ability to use Triton's full feature set with a couple of lines of Python. diff --git a/qa/L0_python_api/test_kserve.py b/qa/L0_python_api/test_kserve.py index ab77783d0c..703d86ca43 100644 --- a/qa/L0_python_api/test_kserve.py +++ b/qa/L0_python_api/test_kserve.py @@ -1,3 +1,29 @@ +# Copyright 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# * Neither the name of NVIDIA CORPORATION nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY +# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY +# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + import time from functools import partial diff --git a/src/python/tritonfrontend/__init__.pyi b/src/python/tritonfrontend/__init__.pyi index 17847e4038..0afb0cb886 100644 --- a/src/python/tritonfrontend/__init__.pyi +++ b/src/python/tritonfrontend/__init__.pyi @@ -1 +1,27 @@ +# Copyright 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# * Neither the name of NVIDIA CORPORATION nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY +# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY +# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + # Need to automate stubgen process as a part of build: https://github.com/triton-inference-server/server/pull/7501#discussion_r1720135228 From ca8ae28bbf1e1de0d7b1d1cb71f930fccdac5a84 Mon Sep 17 00:00:00 2001 From: Yingge He <157551214+yinggeh@users.noreply.github.com> Date: Wed, 4 Sep 2024 13:54:43 -0700 Subject: [PATCH 2/6] test: Refactor core input size checks (#7592) --- qa/L0_input_validation/input_validation_test.py | 4 ++-- qa/L0_input_validation/test.sh | 17 +++++++++++------ 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/qa/L0_input_validation/input_validation_test.py b/qa/L0_input_validation/input_validation_test.py index 33360b7a08..8e7f58bb0c 100755 --- a/qa/L0_input_validation/input_validation_test.py +++ b/qa/L0_input_validation/input_validation_test.py @@ -195,7 +195,7 @@ def get_input_array(input_size, np_dtype): triton_client.infer(model_name=model_name, inputs=inputs) err_str = str(e.exception) self.assertIn( - f"expected {input_size} string elements for inference input 'INPUT1', got {input_size-2}", + f"expected {input_size} string elements for inference input 'INPUT1' for model '{model_name}', got {input_size-2}", err_str, ) @@ -208,7 +208,7 @@ def get_input_array(input_size, np_dtype): triton_client.infer(model_name=model_name, inputs=inputs) err_str = str(e.exception) self.assertIn( - f"expected {input_size} string elements for inference input 'INPUT1', got {input_size+2}", + f"unexpected number of string elements {input_size+1} for inference input 'INPUT1' for model '{model_name}', expecting {input_size}", err_str, ) diff --git a/qa/L0_input_validation/test.sh b/qa/L0_input_validation/test.sh index fc70abd969..22e0560959 100755 --- a/qa/L0_input_validation/test.sh +++ b/qa/L0_input_validation/test.sh @@ -68,7 +68,9 @@ set +e python3 -m pytest --junitxml="input_validation.report.xml" $TEST_PY::InputValTest >> $CLIENT_LOG 2>&1 if [ $? -ne 0 ]; then - echo -e "\n***\n*** input_validation_test.py FAILED. 
\n***" + cat $CLIENT_LOG + cat $SERVER_LOG + echo -e "\n***\n*** input_validation_test.py::InputValTest FAILED. \n***" RET=1 fi set -e @@ -138,7 +140,9 @@ set +e python3 -m pytest --junitxml="input_shape_validation.report.xml" $TEST_PY::InputShapeTest >> $CLIENT_LOG 2>&1 if [ $? -ne 0 ]; then - echo -e "\n***\n*** input_validation_test.py FAILED. \n***" + cat $CLIENT_LOG + cat $SERVER_LOG + echo -e "\n***\n*** input_validation_test.py::InputShapeTest FAILED. \n***" RET=1 fi set -e @@ -147,10 +151,13 @@ kill $SERVER_PID wait $SERVER_PID # input_byte_size_test +cp -r /data/inferenceserver/${REPO_VERSION}/qa_identity_model_repository/{savedmodel_zero_1_float32,savedmodel_zero_1_object} ./models + set +e -LD_LIBRARY_PATH=/opt/tritonserver/lib:$LD_LIBRARY_PATH $TEST_EXEC >>$TEST_LOG 2>&1 +LD_LIBRARY_PATH=/opt/tritonserver/lib:$LD_LIBRARY_PATH $TEST_EXEC >> $TEST_LOG 2>&1 if [ $? -ne 0 ]; then - echo -e "\n***\n*** Query Unit Test Failed\n***" + cat $TEST_LOG + echo -e "\n***\n*** input_byte_size_test FAILED\n***" RET=1 fi set -e @@ -158,8 +165,6 @@ set -e if [ $RET -eq 0 ]; then echo -e "\n***\n*** Input Validation Test Passed\n***" else - cat $CLIENT_LOG - cat $SERVER_LOG echo -e "\n***\n*** Input Validation Test FAILED\n***" fi From be557b6ffc8d180b86ddbd0e1ddad615dd913df2 Mon Sep 17 00:00:00 2001 From: Francesco Petrini Date: Fri, 6 Sep 2024 17:13:23 -0700 Subject: [PATCH 3/6] Don't Build `tritonfrontend` for Windows. (#7599) Don't Build `tritonfrontend` for Windows. --- src/CMakeLists.txt | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 2e0380470a..9488fc6233 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -782,8 +782,11 @@ if (NOT WIN32) endif() # TRITON_ENABLE_GPU endif() # NOT WIN32 -# tritonfrontend python package -add_subdirectory(python) +# DLIS-7292: Extend tritonfrontend to build for Windows +if (NOT WIN32) + # tritonfrontend python package + add_subdirectory(python) +endif (NOT WIN32) # Currently unit tests do not build for windows... 
if ( NOT WIN32) From edd0ac1b02f415a658758410903900fc5017e4f8 Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Wed, 11 Sep 2024 09:08:29 +0530 Subject: [PATCH 4/6] fix: Add reference count tracking for shared memory regions (#7567) Co-authored-by: GuanLuo <41310872+GuanLuo@users.noreply.github.com> --- .../cuda_shared_memory_test.py | 312 ++++++++++++++---- qa/L0_cuda_shared_memory/test.sh | 41 +++ qa/L0_shared_memory/shared_memory_test.py | 290 ++++++++++++---- qa/L0_shared_memory/test.sh | 40 +++ qa/L0_trt_shape_tensors/test.sh | 2 +- .../execute_delayed_model/config.pbtxt | 55 +++ .../execute_delayed_model/model.py | 72 ++++ src/grpc/infer_handler.cc | 54 +-- src/grpc/infer_handler.h | 32 +- src/grpc/stream_infer_handler.cc | 36 +- src/http_server.cc | 13 +- src/http_server.h | 15 + src/shared_memory_manager.cc | 47 ++- src/shared_memory_manager.h | 99 +++--- 14 files changed, 886 insertions(+), 222 deletions(-) create mode 100644 qa/python_models/execute_delayed_model/config.pbtxt create mode 100644 qa/python_models/execute_delayed_model/model.py diff --git a/qa/L0_cuda_shared_memory/cuda_shared_memory_test.py b/qa/L0_cuda_shared_memory/cuda_shared_memory_test.py index 07f9c05a88..51137e8934 100755 --- a/qa/L0_cuda_shared_memory/cuda_shared_memory_test.py +++ b/qa/L0_cuda_shared_memory/cuda_shared_memory_test.py @@ -31,18 +31,20 @@ sys.path.append("../common") import os +import time import unittest +from functools import partial import infer_util as iu import numpy as np import test_util as tu import tritonclient.grpc as grpcclient import tritonclient.http as httpclient -import tritonshmutils.cuda_shared_memory as cshm +import tritonclient.utils.cuda_shared_memory as cshm from tritonclient.utils import * -class CudaSharedMemoryTest(tu.TestResultCollector): +class CudaSharedMemoryTestBase(tu.TestResultCollector): DEFAULT_SHM_BYTE_SIZE = 64 def setUp(self): @@ -61,76 +63,6 @@ def _setup_client(self): self.url, verbose=True ) - def test_invalid_create_shm(self): - # Raises error since tried to create invalid cuda shared memory region - try: - shm_op0_handle = cshm.create_shared_memory_region("dummy_data", -1, 0) - cshm.destroy_shared_memory_region(shm_op0_handle) - except Exception as ex: - self.assertEqual(str(ex), "unable to create cuda shared memory handle") - - def test_valid_create_set_register(self): - # Create a valid cuda shared memory region, fill data in it and register - shm_op0_handle = cshm.create_shared_memory_region("dummy_data", 8, 0) - cshm.set_shared_memory_region( - shm_op0_handle, [np.array([1, 2], dtype=np.float32)] - ) - self.triton_client.register_cuda_shared_memory( - "dummy_data", cshm.get_raw_handle(shm_op0_handle), 0, 8 - ) - shm_status = self.triton_client.get_cuda_shared_memory_status() - if self.protocol == "http": - self.assertEqual(len(shm_status), 1) - else: - self.assertEqual(len(shm_status.regions), 1) - cshm.destroy_shared_memory_region(shm_op0_handle) - - def test_unregister_before_register(self): - # Create a valid cuda shared memory region and unregister before register - shm_op0_handle = cshm.create_shared_memory_region("dummy_data", 8, 0) - self.triton_client.unregister_cuda_shared_memory("dummy_data") - shm_status = self.triton_client.get_cuda_shared_memory_status() - if self.protocol == "http": - self.assertEqual(len(shm_status), 0) - else: - self.assertEqual(len(shm_status.regions), 0) - cshm.destroy_shared_memory_region(shm_op0_handle) - - def test_unregister_after_register(self): - # Create a valid cuda shared memory region and 
unregister after register - shm_op0_handle = cshm.create_shared_memory_region("dummy_data", 8, 0) - self.triton_client.register_cuda_shared_memory( - "dummy_data", cshm.get_raw_handle(shm_op0_handle), 0, 8 - ) - self.triton_client.unregister_cuda_shared_memory("dummy_data") - shm_status = self.triton_client.get_cuda_shared_memory_status() - if self.protocol == "http": - self.assertEqual(len(shm_status), 0) - else: - self.assertEqual(len(shm_status.regions), 0) - cshm.destroy_shared_memory_region(shm_op0_handle) - - def test_reregister_after_register(self): - # Create a valid cuda shared memory region and unregister after register - shm_op0_handle = cshm.create_shared_memory_region("dummy_data", 8, 0) - self.triton_client.register_cuda_shared_memory( - "dummy_data", cshm.get_raw_handle(shm_op0_handle), 0, 8 - ) - try: - self.triton_client.register_cuda_shared_memory( - "dummy_data", cshm.get_raw_handle(shm_op0_handle), 0, 8 - ) - except Exception as ex: - self.assertIn( - "shared memory region 'dummy_data' already in manager", str(ex) - ) - shm_status = self.triton_client.get_cuda_shared_memory_status() - if self.protocol == "http": - self.assertEqual(len(shm_status), 1) - else: - self.assertEqual(len(shm_status.regions), 1) - cshm.destroy_shared_memory_region(shm_op0_handle) - def _configure_server( self, create_byte_size=DEFAULT_SHM_BYTE_SIZE, @@ -205,6 +137,78 @@ def _cleanup_server(self, shm_handles): for shm_handle in shm_handles: cshm.destroy_shared_memory_region(shm_handle) + +class CudaSharedMemoryTest(CudaSharedMemoryTestBase): + def test_invalid_create_shm(self): + # Raises error since tried to create invalid cuda shared memory region + try: + shm_op0_handle = cshm.create_shared_memory_region("dummy_data", -1, 0) + cshm.destroy_shared_memory_region(shm_op0_handle) + except Exception as ex: + self.assertEqual(str(ex), "unable to create cuda shared memory handle") + + def test_valid_create_set_register(self): + # Create a valid cuda shared memory region, fill data in it and register + shm_op0_handle = cshm.create_shared_memory_region("dummy_data", 8, 0) + cshm.set_shared_memory_region( + shm_op0_handle, [np.array([1, 2], dtype=np.float32)] + ) + self.triton_client.register_cuda_shared_memory( + "dummy_data", cshm.get_raw_handle(shm_op0_handle), 0, 8 + ) + shm_status = self.triton_client.get_cuda_shared_memory_status() + if self.protocol == "http": + self.assertEqual(len(shm_status), 1) + else: + self.assertEqual(len(shm_status.regions), 1) + cshm.destroy_shared_memory_region(shm_op0_handle) + + def test_unregister_before_register(self): + # Create a valid cuda shared memory region and unregister before register + shm_op0_handle = cshm.create_shared_memory_region("dummy_data", 8, 0) + self.triton_client.unregister_cuda_shared_memory("dummy_data") + shm_status = self.triton_client.get_cuda_shared_memory_status() + if self.protocol == "http": + self.assertEqual(len(shm_status), 0) + else: + self.assertEqual(len(shm_status.regions), 0) + cshm.destroy_shared_memory_region(shm_op0_handle) + + def test_unregister_after_register(self): + # Create a valid cuda shared memory region and unregister after register + shm_op0_handle = cshm.create_shared_memory_region("dummy_data", 8, 0) + self.triton_client.register_cuda_shared_memory( + "dummy_data", cshm.get_raw_handle(shm_op0_handle), 0, 8 + ) + self.triton_client.unregister_cuda_shared_memory("dummy_data") + shm_status = self.triton_client.get_cuda_shared_memory_status() + if self.protocol == "http": + self.assertEqual(len(shm_status), 0) + 
else: + self.assertEqual(len(shm_status.regions), 0) + cshm.destroy_shared_memory_region(shm_op0_handle) + + def test_reregister_after_register(self): + # Create a valid cuda shared memory region and unregister after register + shm_op0_handle = cshm.create_shared_memory_region("dummy_data", 8, 0) + self.triton_client.register_cuda_shared_memory( + "dummy_data", cshm.get_raw_handle(shm_op0_handle), 0, 8 + ) + try: + self.triton_client.register_cuda_shared_memory( + "dummy_data", cshm.get_raw_handle(shm_op0_handle), 0, 8 + ) + except Exception as ex: + self.assertIn( + "shared memory region 'dummy_data' already in manager", str(ex) + ) + shm_status = self.triton_client.get_cuda_shared_memory_status() + if self.protocol == "http": + self.assertEqual(len(shm_status), 1) + else: + self.assertEqual(len(shm_status.regions), 1) + cshm.destroy_shared_memory_region(shm_op0_handle) + def test_unregister_after_inference(self): # Unregister after inference error_msg = [] @@ -396,5 +400,169 @@ def test_infer_byte_size_out_of_bound(self): self._cleanup_server(shm_handles) +class TestCudaSharedMemoryUnregister(CudaSharedMemoryTestBase): + def _test_unregister_shm_fail(self): + second_client = httpclient.InferenceServerClient("localhost:8000", verbose=True) + + with self.assertRaises(InferenceServerException) as ex: + second_client.unregister_cuda_shared_memory() + self.assertIn( + "Failed to unregister the following cuda shared memory regions: input0_data ,input1_data ,output0_data ,output1_data", + str(ex.exception), + ) + + with self.assertRaises(InferenceServerException) as ex: + second_client.unregister_cuda_shared_memory("input0_data") + self.assertIn( + "Cannot unregister shared memory region 'input0_data', it is currently in use.", + str(ex.exception), + ) + + with self.assertRaises(InferenceServerException) as ex: + second_client.unregister_cuda_shared_memory("input1_data") + self.assertIn( + "Cannot unregister shared memory region 'input1_data', it is currently in use.", + str(ex.exception), + ) + + with self.assertRaises(InferenceServerException) as ex: + second_client.unregister_cuda_shared_memory("output0_data") + self.assertIn( + "Cannot unregister shared memory region 'output0_data', it is currently in use.", + str(ex.exception), + ) + + with self.assertRaises(InferenceServerException) as ex: + second_client.unregister_cuda_shared_memory("output1_data") + self.assertIn( + "Cannot unregister shared memory region 'output1_data', it is currently in use.", + str(ex.exception), + ) + + def _test_shm_not_found(self): + second_client = httpclient.InferenceServerClient("localhost:8000", verbose=True) + + with self.assertRaises(InferenceServerException) as ex: + second_client.get_cuda_shared_memory_status("input0_data") + self.assertIn( + "Unable to find cuda shared memory region: 'input0_data'", + str(ex.exception), + ) + + with self.assertRaises(InferenceServerException) as ex: + second_client.get_cuda_shared_memory_status("input1_data") + self.assertIn( + "Unable to find cuda shared memory region: 'input1_data'", + str(ex.exception), + ) + + with self.assertRaises(InferenceServerException) as ex: + second_client.get_cuda_shared_memory_status("output0_data") + self.assertIn( + "Unable to find cuda shared memory region: 'output0_data'", + str(ex.exception), + ) + + with self.assertRaises(InferenceServerException) as ex: + second_client.get_cuda_shared_memory_status("output1_data") + self.assertIn( + "Unable to find cuda shared memory region: 'output1_data'", + str(ex.exception), + ) + + def 
test_unregister_shm_during_inference_http(self): + try: + self.triton_client.unregister_cuda_shared_memory() + shm_handles = self._configure_server() + + inputs = [ + httpclient.InferInput("INPUT0", [1, 16], "INT32"), + httpclient.InferInput("INPUT1", [1, 16], "INT32"), + ] + outputs = [ + httpclient.InferRequestedOutput("OUTPUT0", binary_data=True), + httpclient.InferRequestedOutput("OUTPUT1", binary_data=False), + ] + + inputs[0].set_shared_memory("input0_data", self.DEFAULT_SHM_BYTE_SIZE) + inputs[1].set_shared_memory("input1_data", self.DEFAULT_SHM_BYTE_SIZE) + outputs[0].set_shared_memory("output0_data", self.DEFAULT_SHM_BYTE_SIZE) + outputs[1].set_shared_memory("output1_data", self.DEFAULT_SHM_BYTE_SIZE) + + async_request = self.triton_client.async_infer( + model_name="simple", inputs=inputs, outputs=outputs + ) + + # Ensure inference started + time.sleep(2) + + # Try unregister shm regions during inference + self._test_unregister_shm_fail() + + # Blocking call + async_request.get_result() + + # Try unregister shm regions after inference + self.triton_client.unregister_cuda_shared_memory() + self._test_shm_not_found() + + finally: + self._cleanup_server(shm_handles) + + def test_unregister_shm_during_inference_grpc(self): + try: + self.triton_client.unregister_cuda_shared_memory() + shm_handles = self._configure_server() + + inputs = [ + grpcclient.InferInput("INPUT0", [1, 16], "INT32"), + grpcclient.InferInput("INPUT1", [1, 16], "INT32"), + ] + outputs = [ + grpcclient.InferRequestedOutput("OUTPUT0"), + grpcclient.InferRequestedOutput("OUTPUT1"), + ] + + inputs[0].set_shared_memory("input0_data", self.DEFAULT_SHM_BYTE_SIZE) + inputs[1].set_shared_memory("input1_data", self.DEFAULT_SHM_BYTE_SIZE) + outputs[0].set_shared_memory("output0_data", self.DEFAULT_SHM_BYTE_SIZE) + outputs[1].set_shared_memory("output1_data", self.DEFAULT_SHM_BYTE_SIZE) + + def callback(user_data, result, error): + if error: + user_data.append(error) + else: + user_data.append(result) + + user_data = [] + + self.triton_client.async_infer( + model_name="simple", + inputs=inputs, + outputs=outputs, + callback=partial(callback, user_data), + ) + + # Ensure inference started + time.sleep(2) + + # Try unregister shm regions during inference + self._test_unregister_shm_fail() + + # Wait until the results are available in user_data + time_out = 20 + while (len(user_data) == 0) and time_out > 0: + time_out = time_out - 1 + time.sleep(1) + time.sleep(2) + + # Try unregister shm regions after inference + self.triton_client.unregister_cuda_shared_memory() + self._test_shm_not_found() + + finally: + self._cleanup_server(shm_handles) + + if __name__ == "__main__": unittest.main() diff --git a/qa/L0_cuda_shared_memory/test.sh b/qa/L0_cuda_shared_memory/test.sh index 02857b2153..b7126a9295 100755 --- a/qa/L0_cuda_shared_memory/test.sh +++ b/qa/L0_cuda_shared_memory/test.sh @@ -84,6 +84,47 @@ for i in \ done done +mkdir -p python_models/simple/1/ +cp ../python_models/execute_delayed_model/model.py ./python_models/simple/1/ +cp ../python_models/execute_delayed_model/config.pbtxt ./python_models/simple/ +sed -i 's/KIND_CPU/KIND_GPU/g' ./python_models/simple/config.pbtxt + +for client_type in http grpc; do + SERVER_ARGS="--model-repository=`pwd`/python_models --log-verbose=1 ${SERVER_ARGS_EXTRA}" + SERVER_LOG="./unregister_shm.$client_type.server.log" + run_server + if [ "$SERVER_PID" == "0" ]; then + echo -e "\n***\n*** Failed to start $SERVER\n***" + cat $SERVER_LOG + exit 1 + fi + + export CLIENT_TYPE=$client_type + 
CLIENT_LOG="./unregister_shm.$client_type.client.log" + set +e + python3 $SHM_TEST TestCudaSharedMemoryUnregister.test_unregister_shm_during_inference_$client_type >>$CLIENT_LOG 2>&1 + if [ $? -ne 0 ]; then + cat $CLIENT_LOG + echo -e "\n***\n*** Test Failed\n***" + RET=1 + else + check_test_results $TEST_RESULT_FILE 1 + if [ $? -ne 0 ]; then + cat $TEST_RESULT_FILE + echo -e "\n***\n*** Test Result Verification Failed\n***" + RET=1 + fi + fi + + kill $SERVER_PID + wait $SERVER_PID + if [ $? -ne 0 ]; then + echo -e "\n***\n*** Test Server shut down non-gracefully\n***" + RET=1 + fi + set -e + done + if [ $RET -eq 0 ]; then echo -e "\n***\n*** Test Passed\n***" else diff --git a/qa/L0_shared_memory/shared_memory_test.py b/qa/L0_shared_memory/shared_memory_test.py index c38ecb4814..871fca9b2a 100755 --- a/qa/L0_shared_memory/shared_memory_test.py +++ b/qa/L0_shared_memory/shared_memory_test.py @@ -31,7 +31,9 @@ sys.path.append("../common") import os +import time import unittest +from functools import partial import infer_util as iu import numpy as np @@ -43,7 +45,7 @@ from tritonclient import utils -class SharedMemoryTest(tu.TestResultCollector): +class SystemSharedMemoryTestBase(tu.TestResultCollector): DEFAULT_SHM_BYTE_SIZE = 64 def setUp(self): @@ -62,6 +64,68 @@ def _setup_client(self): self.url, verbose=True ) + def _configure_server( + self, + create_byte_size=DEFAULT_SHM_BYTE_SIZE, + register_byte_size=DEFAULT_SHM_BYTE_SIZE, + register_offset=0, + ): + """Creates and registers shared memory regions for testing. + + Parameters + ---------- + create_byte_size: int + Size of each system shared memory region to create. + NOTE: This should be sufficiently large to hold the inputs/outputs + stored in shared memory. + + register_byte_size: int + Size of each system shared memory region to register with server. + NOTE: The (offset + register_byte_size) should be less than or equal + to the create_byte_size. Otherwise an exception will be raised for + an invalid set of registration args. + + register_offset: int + Offset into the shared memory object to start the registered region. 
+ + """ + shm_ip0_handle = shm.create_shared_memory_region( + "input0_data", "/input0_data", create_byte_size + ) + shm_ip1_handle = shm.create_shared_memory_region( + "input1_data", "/input1_data", create_byte_size + ) + shm_op0_handle = shm.create_shared_memory_region( + "output0_data", "/output0_data", create_byte_size + ) + shm_op1_handle = shm.create_shared_memory_region( + "output1_data", "/output1_data", create_byte_size + ) + # Implicit assumption that input and output byte_sizes are 64 bytes for now + input0_data = np.arange(start=0, stop=16, dtype=np.int32) + input1_data = np.ones(shape=16, dtype=np.int32) + shm.set_shared_memory_region(shm_ip0_handle, [input0_data]) + shm.set_shared_memory_region(shm_ip1_handle, [input1_data]) + self.triton_client.register_system_shared_memory( + "input0_data", "/input0_data", register_byte_size, offset=register_offset + ) + self.triton_client.register_system_shared_memory( + "input1_data", "/input1_data", register_byte_size, offset=register_offset + ) + self.triton_client.register_system_shared_memory( + "output0_data", "/output0_data", register_byte_size, offset=register_offset + ) + self.triton_client.register_system_shared_memory( + "output1_data", "/output1_data", register_byte_size, offset=register_offset + ) + return [shm_ip0_handle, shm_ip1_handle, shm_op0_handle, shm_op1_handle] + + def _cleanup_server(self, shm_handles): + for shm_handle in shm_handles: + shm.destroy_shared_memory_region(shm_handle) + + +class SharedMemoryTest(SystemSharedMemoryTestBase): def test_invalid_create_shm(self): # Raises error since tried to create invalid system shared memory region try: @@ -128,66 +192,6 @@ def test_reregister_after_register(self): self.assertTrue(len(shm_status.regions) == 1) shm.destroy_shared_memory_region(shm_op0_handle) - def _configure_server( - self, - create_byte_size=DEFAULT_SHM_BYTE_SIZE, - register_byte_size=DEFAULT_SHM_BYTE_SIZE, - register_offset=0, - ): - """Creates and registers shared memory regions for testing. - - Parameters - ---------- - create_byte_size: int - Size of each system shared memory region to create. - NOTE: This should be sufficiently large to hold the inputs/outputs - stored in shared memory. - - register_byte_size: int - Size of each system shared memory region to register with server. - NOTE: The (offset + register_byte_size) should be less than or equal - to the create_byte_size. Otherwise an exception will be raised for - an invalid set of registration args. - - register_offset: int - Offset into the shared memory object to start the registered region. 
- - """ - shm_ip0_handle = shm.create_shared_memory_region( - "input0_data", "/input0_data", create_byte_size - ) - shm_ip1_handle = shm.create_shared_memory_region( - "input1_data", "/input1_data", create_byte_size - ) - shm_op0_handle = shm.create_shared_memory_region( - "output0_data", "/output0_data", create_byte_size - ) - shm_op1_handle = shm.create_shared_memory_region( - "output1_data", "/output1_data", create_byte_size - ) - # Implicit assumption that input and output byte_sizes are 64 bytes for now - input0_data = np.arange(start=0, stop=16, dtype=np.int32) - input1_data = np.ones(shape=16, dtype=np.int32) - shm.set_shared_memory_region(shm_ip0_handle, [input0_data]) - shm.set_shared_memory_region(shm_ip1_handle, [input1_data]) - self.triton_client.register_system_shared_memory( - "input0_data", "/input0_data", register_byte_size, offset=register_offset - ) - self.triton_client.register_system_shared_memory( - "input1_data", "/input1_data", register_byte_size, offset=register_offset - ) - self.triton_client.register_system_shared_memory( - "output0_data", "/output0_data", register_byte_size, offset=register_offset - ) - self.triton_client.register_system_shared_memory( - "output1_data", "/output1_data", register_byte_size, offset=register_offset - ) - return [shm_ip0_handle, shm_ip1_handle, shm_op0_handle, shm_op1_handle] - - def _cleanup_server(self, shm_handles): - for shm_handle in shm_handles: - shm.destroy_shared_memory_region(shm_handle) - def test_unregister_after_inference(self): # Unregister after inference error_msg = [] @@ -443,5 +447,169 @@ def test_python_client_leak(self): ) +class TestSharedMemoryUnregister(SystemSharedMemoryTestBase): + def _test_unregister_shm_fail(self): + second_client = httpclient.InferenceServerClient("localhost:8000", verbose=True) + + with self.assertRaises(utils.InferenceServerException) as ex: + second_client.unregister_system_shared_memory() + self.assertIn( + "Failed to unregister the following system shared memory regions: input0_data ,input1_data ,output0_data ,output1_data", + str(ex.exception), + ) + + with self.assertRaises(utils.InferenceServerException) as ex: + second_client.unregister_system_shared_memory("input0_data") + self.assertIn( + "Cannot unregister shared memory region 'input0_data', it is currently in use.", + str(ex.exception), + ) + + with self.assertRaises(utils.InferenceServerException) as ex: + second_client.unregister_system_shared_memory("input1_data") + self.assertIn( + "Cannot unregister shared memory region 'input1_data', it is currently in use.", + str(ex.exception), + ) + + with self.assertRaises(utils.InferenceServerException) as ex: + second_client.unregister_system_shared_memory("output0_data") + self.assertIn( + "Cannot unregister shared memory region 'output0_data', it is currently in use.", + str(ex.exception), + ) + + with self.assertRaises(utils.InferenceServerException) as ex: + second_client.unregister_system_shared_memory("output1_data") + self.assertIn( + "Cannot unregister shared memory region 'output1_data', it is currently in use.", + str(ex.exception), + ) + + def _test_shm_not_found(self): + second_client = httpclient.InferenceServerClient("localhost:8000", verbose=True) + + with self.assertRaises(utils.InferenceServerException) as ex: + second_client.get_system_shared_memory_status("input0_data") + self.assertIn( + "Unable to find system shared memory region: 'input0_data'", + str(ex.exception), + ) + + with self.assertRaises(utils.InferenceServerException) as ex: + 
second_client.get_system_shared_memory_status("input1_data") + self.assertIn( + "Unable to find system shared memory region: 'input1_data'", + str(ex.exception), + ) + + with self.assertRaises(utils.InferenceServerException) as ex: + second_client.get_system_shared_memory_status("output0_data") + self.assertIn( + "Unable to find system shared memory region: 'output0_data'", + str(ex.exception), + ) + + with self.assertRaises(utils.InferenceServerException) as ex: + second_client.get_system_shared_memory_status("output1_data") + self.assertIn( + "Unable to find system shared memory region: 'output1_data'", + str(ex.exception), + ) + + def test_unregister_shm_during_inference_http(self): + try: + self.triton_client.unregister_system_shared_memory() + shm_handles = self._configure_server() + + inputs = [ + httpclient.InferInput("INPUT0", [1, 16], "INT32"), + httpclient.InferInput("INPUT1", [1, 16], "INT32"), + ] + outputs = [ + httpclient.InferRequestedOutput("OUTPUT0", binary_data=True), + httpclient.InferRequestedOutput("OUTPUT1", binary_data=False), + ] + + inputs[0].set_shared_memory("input0_data", self.DEFAULT_SHM_BYTE_SIZE) + inputs[1].set_shared_memory("input1_data", self.DEFAULT_SHM_BYTE_SIZE) + outputs[0].set_shared_memory("output0_data", self.DEFAULT_SHM_BYTE_SIZE) + outputs[1].set_shared_memory("output1_data", self.DEFAULT_SHM_BYTE_SIZE) + + async_request = self.triton_client.async_infer( + model_name="simple", inputs=inputs, outputs=outputs + ) + + # Ensure inference started + time.sleep(2) + + # Try unregister shm regions during inference + self._test_unregister_shm_fail() + + # Blocking call + async_request.get_result() + + # Try unregister shm regions after inference + self.triton_client.unregister_system_shared_memory() + self._test_shm_not_found() + + finally: + self._cleanup_server(shm_handles) + + def test_unregister_shm_during_inference_grpc(self): + try: + self.triton_client.unregister_system_shared_memory() + shm_handles = self._configure_server() + + inputs = [ + grpcclient.InferInput("INPUT0", [1, 16], "INT32"), + grpcclient.InferInput("INPUT1", [1, 16], "INT32"), + ] + outputs = [ + grpcclient.InferRequestedOutput("OUTPUT0"), + grpcclient.InferRequestedOutput("OUTPUT1"), + ] + + inputs[0].set_shared_memory("input0_data", self.DEFAULT_SHM_BYTE_SIZE) + inputs[1].set_shared_memory("input1_data", self.DEFAULT_SHM_BYTE_SIZE) + outputs[0].set_shared_memory("output0_data", self.DEFAULT_SHM_BYTE_SIZE) + outputs[1].set_shared_memory("output1_data", self.DEFAULT_SHM_BYTE_SIZE) + + def callback(user_data, result, error): + if error: + user_data.append(error) + else: + user_data.append(result) + + user_data = [] + + self.triton_client.async_infer( + model_name="simple", + inputs=inputs, + outputs=outputs, + callback=partial(callback, user_data), + ) + + # Ensure inference started + time.sleep(2) + + # Try unregister shm regions during inference + self._test_unregister_shm_fail() + + # Wait until the results are available in user_data + time_out = 20 + while (len(user_data) == 0) and time_out > 0: + time_out = time_out - 1 + time.sleep(1) + time.sleep(2) + + # Try unregister shm regions after inference + self.triton_client.unregister_system_shared_memory() + self._test_shm_not_found() + + finally: + self._cleanup_server(shm_handles) + + if __name__ == "__main__": unittest.main() diff --git a/qa/L0_shared_memory/test.sh b/qa/L0_shared_memory/test.sh index ba6a2fa8f2..e711de9cff 100755 --- a/qa/L0_shared_memory/test.sh +++ b/qa/L0_shared_memory/test.sh @@ -95,6 +95,46 @@ for i in \ 
done done +mkdir -p python_models/simple/1/ +cp ../python_models/execute_delayed_model/model.py ./python_models/simple/1/ +cp ../python_models/execute_delayed_model/config.pbtxt ./python_models/simple/ + +for client_type in http grpc; do + SERVER_ARGS="--model-repository=`pwd`/python_models --log-verbose=1 ${SERVER_ARGS_EXTRA}" + SERVER_LOG="./unregister_shm.$client_type.server.log" + run_server + if [ "$SERVER_PID" == "0" ]; then + echo -e "\n***\n*** Failed to start $SERVER\n***" + cat $SERVER_LOG + exit 1 + fi + + export CLIENT_TYPE=$client_type + CLIENT_LOG="./unregister_shm.$client_type.client.log" + set +e + python3 $SHM_TEST TestSharedMemoryUnregister.test_unregister_shm_during_inference_$client_type >>$CLIENT_LOG 2>&1 + if [ $? -ne 0 ]; then + cat $CLIENT_LOG + echo -e "\n***\n*** Test Failed\n***" + RET=1 + else + check_test_results $TEST_RESULT_FILE 1 + if [ $? -ne 0 ]; then + cat $TEST_RESULT_FILE + echo -e "\n***\n*** Test Result Verification Failed\n***" + RET=1 + fi + fi + + kill $SERVER_PID + wait $SERVER_PID + if [ $? -ne 0 ]; then + echo -e "\n***\n*** Test Server shut down non-gracefully\n***" + RET=1 + fi + set -e + done + if [ $RET -eq 0 ]; then echo -e "\n***\n*** Test Passed\n***" else diff --git a/qa/L0_trt_shape_tensors/test.sh b/qa/L0_trt_shape_tensors/test.sh index f08ed339b0..548ebb55af 100755 --- a/qa/L0_trt_shape_tensors/test.sh +++ b/qa/L0_trt_shape_tensors/test.sh @@ -45,7 +45,7 @@ CLIENT_LOG="./client.log" SHAPE_TENSOR_TEST=trt_shape_tensor_test.py SERVER=/opt/tritonserver/bin/tritonserver -SERVER_ARGS="--model-repository=`pwd`/models" +SERVER_ARGS="--model-repository=`pwd`/models --log-verbose=1" SERVER_LOG="./inference_server.log" source ../common/util.sh diff --git a/qa/python_models/execute_delayed_model/config.pbtxt b/qa/python_models/execute_delayed_model/config.pbtxt new file mode 100644 index 0000000000..0a4ee59d3e --- /dev/null +++ b/qa/python_models/execute_delayed_model/config.pbtxt @@ -0,0 +1,55 @@ +# Copyright 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# * Neither the name of NVIDIA CORPORATION nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY +# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY +# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ +name: "simple" +backend: "python" +max_batch_size: 8 +input [ + { + name: "INPUT0" + data_type: TYPE_INT32 + dims: [ 16 ] + }, + { + name: "INPUT1" + data_type: TYPE_INT32 + dims: [ 16 ] + } +] +output [ + { + name: "OUTPUT0" + data_type: TYPE_INT32 + dims: [ 16 ] + }, + { + name: "OUTPUT1" + data_type: TYPE_INT32 + dims: [ 16 ] + } +] + +instance_group [ { kind: KIND_CPU }] diff --git a/qa/python_models/execute_delayed_model/model.py b/qa/python_models/execute_delayed_model/model.py new file mode 100644 index 0000000000..055b321a93 --- /dev/null +++ b/qa/python_models/execute_delayed_model/model.py @@ -0,0 +1,72 @@ +# Copyright 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# * Neither the name of NVIDIA CORPORATION nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY +# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY +# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ +import json +import time + +import triton_python_backend_utils as pb_utils + + +class TritonPythonModel: + def initialize(self, args): + self.model_config = model_config = json.loads(args["model_config"]) + output0_config = pb_utils.get_output_config_by_name(model_config, "OUTPUT0") + output1_config = pb_utils.get_output_config_by_name(model_config, "OUTPUT1") + self.output0_dtype = pb_utils.triton_string_to_numpy( + output0_config["data_type"] + ) + self.output1_dtype = pb_utils.triton_string_to_numpy( + output1_config["data_type"] + ) + + def execute(self, requests): + output0_dtype = self.output0_dtype + output1_dtype = self.output1_dtype + responses = [] + + time.sleep(15) + + for request in requests: + in_0 = pb_utils.get_input_tensor_by_name(request, "INPUT0") + in_1 = pb_utils.get_input_tensor_by_name(request, "INPUT1") + + out_0, out_1 = ( + in_0.as_numpy() + in_1.as_numpy(), + in_0.as_numpy() - in_1.as_numpy(), + ) + + out_tensor_0 = pb_utils.Tensor("OUTPUT0", out_0.astype(output0_dtype)) + out_tensor_1 = pb_utils.Tensor("OUTPUT1", out_1.astype(output1_dtype)) + + inference_response = pb_utils.InferenceResponse( + output_tensors=[out_tensor_0, out_tensor_1] + ) + responses.append(inference_response) + + return responses + + def finalize(self): + print("Cleaning up...") diff --git a/src/grpc/infer_handler.cc b/src/grpc/infer_handler.cc index 916230381b..c4ba9338cb 100644 --- a/src/grpc/infer_handler.cc +++ b/src/grpc/infer_handler.cc @@ -158,18 +158,6 @@ InferResponseFree( return nullptr; // Success } -TRITONSERVER_Error* InferGRPCToInputHelper( - const std::string& input_name, const std::string& model_name, - const TRITONSERVER_DataType tensor_dt, const TRITONSERVER_DataType input_dt, - const size_t binary_data_byte_size); - -TRITONSERVER_Error* InferGRPCToInput( - const std::shared_ptr& tritonserver, - const std::shared_ptr& shm_manager, - const inference::ModelInferRequest& request, - std::list* serialized_data, - TRITONSERVER_InferenceRequest* inference_request); - TRITONSERVER_Error* InferGRPCToInputHelper( const std::string& input_name, const std::string& model_name, @@ -391,7 +379,9 @@ InferGRPCToInput( const std::shared_ptr& shm_manager, const inference::ModelInferRequest& request, std::list* serialized_data, - TRITONSERVER_InferenceRequest* inference_request) + TRITONSERVER_InferenceRequest* inference_request, + std::vector>* + shm_regions_info) { // Verify that the batch-byte-size of each input matches the size of // the provided tensor data (provided raw or from shared memory) @@ -432,9 +422,14 @@ InferGRPCToInput( .c_str()); } void* tmp; + std::shared_ptr shm_info = + nullptr; RETURN_IF_ERR(shm_manager->GetMemoryInfo( - region_name, offset, byte_size, &tmp, &memory_type, &memory_type_id)); + region_name, offset, byte_size, &tmp, &memory_type, &memory_type_id, + &shm_info)); base = tmp; + shm_regions_info->emplace_back(shm_info); + if (memory_type == TRITONSERVER_MEMORY_GPU) { #ifdef TRITON_ENABLE_GPU RETURN_IF_ERR(shm_manager->GetCUDAHandle( @@ -911,18 +906,32 @@ ModelInferHandler::Execute(InferHandler::State* state) // tensors are present in the request. std::list serialized_data; + // Maintain shared pointers(read-only reference) to the shared memory block's + // information for the shared memory regions used by the request. These + // pointers will automatically increase the usage count, preventing + // unregistration of the shared memory. 
This vector must be cleared in the + // `InferResponseComplete` callback (after inference) to decrease the count + // and permit unregistration. The vector will be included in + // `response_release_payload` for the callback. + std::vector> + shm_regions_info; + if (err == nullptr) { err = InferGRPCToInput( - tritonserver_, shm_manager_, request, &serialized_data, irequest); + tritonserver_, shm_manager_, request, &serialized_data, irequest, + &shm_regions_info); } if (err == nullptr) { err = InferAllocatorPayload( tritonserver_, shm_manager_, request, std::move(serialized_data), - response_queue, &state->alloc_payload_); + response_queue, &state->alloc_payload_, &shm_regions_info); } auto request_release_payload = std::make_unique(state->inference_request_); + auto response_release_payload = std::make_unique( + state, std::move(shm_regions_info)); + if (err == nullptr) { err = TRITONSERVER_InferenceRequestSetReleaseCallback( irequest, InferRequestComplete, @@ -932,7 +941,8 @@ ModelInferHandler::Execute(InferHandler::State* state) err = TRITONSERVER_InferenceRequestSetResponseCallback( irequest, allocator_, &state->alloc_payload_ /* response_allocator_userp */, - InferResponseComplete, reinterpret_cast(state)); + InferResponseComplete, + response_release_payload.get() /* response_userp */); } // Get request ID for logging in case of error. const char* request_id = ""; @@ -970,8 +980,9 @@ ModelInferHandler::Execute(InferHandler::State* state) // to handle gRPC stream cancellation. if (err == nullptr) { state->context_->InsertInflightState(state); - // The payload will be cleaned in request release callback. + // The payload will be cleaned in release callback. request_release_payload.release(); + response_release_payload.release(); } else { // If error go immediately to COMPLETE. LOG_VERBOSE(1) << "[request id: " << request_id << "] " @@ -1000,7 +1011,9 @@ ModelInferHandler::InferResponseComplete( TRITONSERVER_InferenceResponse* iresponse, const uint32_t flags, void* userp) { - State* state = reinterpret_cast(userp); + ResponseReleasePayload* response_release_payload( + static_cast(userp)); + auto state = response_release_payload->state_; // There are multiple handlers registered in the gRPC service // Hence, we would need to properly synchronize this thread @@ -1042,6 +1055,7 @@ ModelInferHandler::InferResponseComplete( // in the next cycle. 
state->context_->PutTaskBackToQueue(state); + delete response_release_payload; return; } @@ -1104,6 +1118,8 @@ ModelInferHandler::InferResponseComplete( if (response_created) { delete response; } + + delete response_release_payload; } }}} // namespace triton::server::grpc diff --git a/src/grpc/infer_handler.h b/src/grpc/infer_handler.h index 51307d4ae0..87536dd173 100644 --- a/src/grpc/infer_handler.h +++ b/src/grpc/infer_handler.h @@ -299,7 +299,9 @@ InferAllocatorPayload( const inference::ModelInferRequest& request, std::list&& serialized_data, std::shared_ptr> response_queue, - AllocPayload* alloc_payload) + AllocPayload* alloc_payload, + std::vector>* + shm_regions_info) { alloc_payload->response_queue_ = response_queue; alloc_payload->shm_map_.clear(); @@ -335,9 +337,12 @@ InferAllocatorPayload( void* base; TRITONSERVER_MemoryType memory_type; int64_t memory_type_id; + std::shared_ptr shm_info = + nullptr; RETURN_IF_ERR(shm_manager->GetMemoryInfo( - region_name, offset, byte_size, &base, &memory_type, - &memory_type_id)); + region_name, offset, byte_size, &base, &memory_type, &memory_type_id, + &shm_info)); + shm_regions_info->emplace_back(shm_info); if (memory_type == TRITONSERVER_MEMORY_GPU) { #ifdef TRITON_ENABLE_GPU @@ -373,7 +378,9 @@ TRITONSERVER_Error* InferGRPCToInput( const std::shared_ptr& shm_manager, const inference::ModelInferRequest& request, std::list* serialized_data, - TRITONSERVER_InferenceRequest* inference_request); + TRITONSERVER_InferenceRequest* inference_request, + std::vector>* + shm_regions_info); TRITONSERVER_Error* ResponseAllocatorHelper( TRITONSERVER_ResponseAllocator* allocator, const char* tensor_name, @@ -1263,6 +1270,23 @@ class InferHandler : public HandlerBase { delete state; } + // Simple structure that carries the payload needed for + // response release callback. + struct ResponseReleasePayload final { + State* state_; + std::vector> + shm_regions_info_; + + ResponseReleasePayload( + State* state, + std::vector< + std::shared_ptr>&& + shm_regions_info) + : state_(state), shm_regions_info_(std::move(shm_regions_info)) + { + } + }; + virtual void StartNewRequest() = 0; virtual bool Process(State* state, bool rpc_ok) = 0; bool ExecutePrecondition(InferHandler::State* state); diff --git a/src/grpc/stream_infer_handler.cc b/src/grpc/stream_infer_handler.cc index 6651eca813..1f554db83c 100644 --- a/src/grpc/stream_infer_handler.cc +++ b/src/grpc/stream_infer_handler.cc @@ -282,18 +282,32 @@ ModelStreamInferHandler::Process(InferHandler::State* state, bool rpc_ok) // tensors are present in the request. std::list serialized_data; + // Maintain shared pointers(read-only reference) to the shared memory + // block's information for the shared memory regions used by the request. + // These pointers will automatically increase the usage count, preventing + // unregistration of the shared memory. This vector must be cleared in the + // `StreamInferResponseComplete` callback (after inference) to decrease the + // count and permit unregistration. The vector will be included in + // `response_release_payload` for the callback. 
+ std::vector> + shm_regions_info; + if (err == nullptr) { err = InferGRPCToInput( - tritonserver_, shm_manager_, request, &serialized_data, irequest); + tritonserver_, shm_manager_, request, &serialized_data, irequest, + &shm_regions_info); } if (err == nullptr) { err = InferAllocatorPayload( tritonserver_, shm_manager_, request, std::move(serialized_data), - response_queue_, &state->alloc_payload_); + response_queue_, &state->alloc_payload_, &shm_regions_info); } auto request_release_payload = std::make_unique(state->inference_request_); + auto response_release_payload = std::make_unique( + state, std::move(shm_regions_info)); + if (err == nullptr) { err = TRITONSERVER_InferenceRequestSetReleaseCallback( irequest, InferRequestComplete, @@ -303,7 +317,8 @@ ModelStreamInferHandler::Process(InferHandler::State* state, bool rpc_ok) err = TRITONSERVER_InferenceRequestSetResponseCallback( irequest, allocator_, &state->alloc_payload_ /* response_allocator_userp */, - StreamInferResponseComplete, reinterpret_cast(state)); + StreamInferResponseComplete, + response_release_payload.get() /* response_userp */); } if (err == nullptr) { @@ -330,8 +345,9 @@ ModelStreamInferHandler::Process(InferHandler::State* state, bool rpc_ok) // irequest to handle gRPC stream cancellation. if (err == nullptr) { state->context_->InsertInflightState(state); - // The payload will be cleaned in request release callback. + // The payload will be cleaned in release callback. request_release_payload.release(); + response_release_payload.release(); } else { // If there was an error then enqueue the error response and show // it to be ready for writing. @@ -594,7 +610,10 @@ ModelStreamInferHandler::StreamInferResponseComplete( TRITONSERVER_InferenceResponse* iresponse, const uint32_t flags, void* userp) { - State* state = reinterpret_cast(userp); + ResponseReleasePayload* response_release_payload( + static_cast(userp)); + auto state = response_release_payload->state_; + // Ignore Response from CORE in case GRPC Strict as we dont care about if (state->context_->gRPCErrorTracker_->triton_grpc_error_) { std::lock_guard lock(state->context_->mu_); @@ -648,6 +667,7 @@ ModelStreamInferHandler::StreamInferResponseComplete( if (is_complete) { state->step_ = Steps::CANCELLED; state->context_->PutTaskBackToQueue(state); + delete response_release_payload; } state->complete_ = is_complete; @@ -695,6 +715,7 @@ ModelStreamInferHandler::StreamInferResponseComplete( LOG_TRITONSERVER_ERROR( TRITONSERVER_InferenceResponseDelete(iresponse), "deleting GRPC inference response"); + delete response_release_payload; return; } } @@ -774,6 +795,7 @@ ModelStreamInferHandler::StreamInferResponseComplete( if (is_complete) { state->step_ = Steps::CANCELLED; state->context_->PutTaskBackToQueue(state); + delete response_release_payload; } state->complete_ = is_complete; @@ -818,6 +840,10 @@ ModelStreamInferHandler::StreamInferResponseComplete( } state->complete_ = is_complete; } + + if (is_complete) { + delete response_release_payload; + } } // Changes the state of grpc_stream_error_state_ to ERROR_HANDLING_COMPLETE, diff --git a/src/http_server.cc b/src/http_server.cc index cfd1da88ae..2fa395fc98 100644 --- a/src/http_server.cc +++ b/src/http_server.cc @@ -2681,9 +2681,13 @@ HTTPAPIServer::ParseJsonTritonIO( void* base; TRITONSERVER_MemoryType memory_type; int64_t memory_type_id; + std::shared_ptr shm_info = + nullptr; RETURN_IF_ERR(shm_manager_->GetMemoryInfo( shm_region, shm_offset, byte_size, &base, &memory_type, - &memory_type_id)); + 
&memory_type_id, &shm_info)); + infer_req->AddShmRegionInfo(shm_info); + if (memory_type == TRITONSERVER_MEMORY_GPU) { #ifdef TRITON_ENABLE_GPU cudaIpcMemHandle_t* cuda_handle; @@ -2796,9 +2800,12 @@ HTTPAPIServer::ParseJsonTritonIO( void* base; TRITONSERVER_MemoryType memory_type; int64_t memory_type_id; + std::shared_ptr shm_info = + nullptr; RETURN_IF_ERR(shm_manager_->GetMemoryInfo( - shm_region, offset, byte_size, &base, &memory_type, - &memory_type_id)); + shm_region, offset, byte_size, &base, &memory_type, &memory_type_id, + &shm_info)); + infer_req->AddShmRegionInfo(shm_info); if (memory_type == TRITONSERVER_MEMORY_GPU) { #ifdef TRITON_ENABLE_GPU diff --git a/src/http_server.h b/src/http_server.h index 3ad3d60cc4..3949f97e27 100644 --- a/src/http_server.h +++ b/src/http_server.h @@ -311,6 +311,13 @@ class HTTPAPIServer : public HTTPServer { static void ReplyCallback(evthr_t* thr, void* arg, void* shared); + void AddShmRegionInfo( + const std::shared_ptr& + shm_info) + { + shm_regions_info_.push_back(shm_info); + } + protected: TRITONSERVER_Server* server_{nullptr}; evhtp_request_t* req_{nullptr}; @@ -330,6 +337,14 @@ class HTTPAPIServer : public HTTPServer { // TRITONSERVER_ServerInferAsync (except for cancellation). std::shared_ptr triton_request_{nullptr}; + // Maintain shared pointers(read-only reference) to the shared memory + // block's information for the shared memory regions used by the request. + // These pointers will automatically increase the usage count, preventing + // unregistration of the shared memory. This vector must be cleared when no + // longer needed to decrease the count and permit unregistration. + std::vector> + shm_regions_info_; + evhtp_res response_code_{EVHTP_RES_OK}; }; diff --git a/src/shared_memory_manager.cc b/src/shared_memory_manager.cc index 1f4a77e887..7b845709a1 100644 --- a/src/shared_memory_manager.cc +++ b/src/shared_memory_manager.cc @@ -69,7 +69,8 @@ TRITONSERVER_Error* SharedMemoryManager::GetMemoryInfo( const std::string& name, size_t offset, size_t byte_size, void** shm_mapped_addr, TRITONSERVER_MemoryType* memory_type, - int64_t* device_id) + int64_t* device_id, + std::shared_ptr* shm_info) { return TRITONSERVER_ErrorNew( TRITONSERVER_ERROR_UNSUPPORTED, @@ -408,9 +409,9 @@ SharedMemoryManager::RegisterSystemSharedMemory( } shared_memory_map_.insert(std::make_pair( - name, std::unique_ptr(new SharedMemoryInfo( + name, std::make_shared( name, shm_key, offset, byte_size, shm_fd, mapped_addr, - TRITONSERVER_MEMORY_CPU, 0)))); + TRITONSERVER_MEMORY_CPU, 0))); return nullptr; // success } @@ -444,9 +445,9 @@ SharedMemoryManager::RegisterCUDASharedMemory( name, reinterpret_cast(mapped_addr), byte_size)); shared_memory_map_.insert(std::make_pair( - name, std::unique_ptr(new CUDASharedMemoryInfo( + name, std::make_shared( name, "", 0, byte_size, 0, mapped_addr, TRITONSERVER_MEMORY_GPU, - device_id, cuda_shm_handle)))); + device_id, cuda_shm_handle))); return nullptr; // success } @@ -456,7 +457,8 @@ TRITONSERVER_Error* SharedMemoryManager::GetMemoryInfo( const std::string& name, size_t offset, size_t byte_size, void** shm_mapped_addr, TRITONSERVER_MemoryType* memory_type, - int64_t* device_id) + int64_t* device_id, + std::shared_ptr* shm_info) { // protect shared_memory_map_ from concurrent access std::lock_guard lock(mu_); @@ -494,6 +496,10 @@ SharedMemoryManager::GetMemoryInfo( .c_str()); } + if (shm_info != nullptr) { + *shm_info = std::static_pointer_cast(it->second); + } + if (it->second->kind_ == TRITONSERVER_MEMORY_CPU) { 
*shm_mapped_addr = (void*)((uint8_t*)it->second->mapped_addr_ + it->second->offset_ + offset); @@ -561,11 +567,19 @@ SharedMemoryManager::GetStatus( } else { auto it = shared_memory_map_.find(name); if (it == shared_memory_map_.end()) { - return TRITONSERVER_ErrorNew( - TRITONSERVER_ERROR_NOT_FOUND, - std::string( - "Unable to find system shared memory region: '" + name + "'") - .c_str()); + if (memory_type == TRITONSERVER_MEMORY_GPU) { + return TRITONSERVER_ErrorNew( + TRITONSERVER_ERROR_NOT_FOUND, + std::string( + "Unable to find cuda shared memory region: '" + name + "'") + .c_str()); + } else { + return TRITONSERVER_ErrorNew( + TRITONSERVER_ERROR_NOT_FOUND, + std::string( + "Unable to find system shared memory region: '" + name + "'") + .c_str()); + } } if (it->second->kind_ != memory_type) { @@ -632,6 +646,7 @@ SharedMemoryManager::UnregisterAll(TRITONSERVER_MemoryType memory_type) TRITONSERVER_Error* err = UnregisterHelper(it->first, memory_type); if (err != nullptr) { unregister_fails.push_back(it->first); + LOG_VERBOSE(1) << TRITONSERVER_ErrorMessage(err); } } } @@ -645,6 +660,7 @@ SharedMemoryManager::UnregisterAll(TRITONSERVER_MemoryType memory_type) ; if (err != nullptr) { unregister_fails.push_back(it->first); + LOG_VERBOSE(1) << TRITONSERVER_ErrorMessage(err); } } } @@ -669,6 +685,15 @@ SharedMemoryManager::UnregisterHelper( // Must hold the lock on register_mu_ while calling this function. auto it = shared_memory_map_.find(name); if (it != shared_memory_map_.end() && it->second->kind_ == memory_type) { + if (it->second.use_count() > 1) { + return TRITONSERVER_ErrorNew( + TRITONSERVER_ERROR_INTERNAL, + std::string( + "Cannot unregister shared memory region '" + name + + "', it is currently in use.") + .c_str()); + } + if (it->second->kind_ == TRITONSERVER_MEMORY_CPU) { RETURN_IF_ERR( UnmapSharedMemory(it->second->mapped_addr_, it->second->byte_size_)); diff --git a/src/shared_memory_manager.h b/src/shared_memory_manager.h index 51eb0f0786..393fd29128 100644 --- a/src/shared_memory_manager.h +++ b/src/shared_memory_manager.h @@ -1,4 +1,4 @@ -// Copyright 2019-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright 2019-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions @@ -50,6 +50,48 @@ class SharedMemoryManager { SharedMemoryManager() = default; ~SharedMemoryManager(); + /// A struct that records the shared memory regions registered by the shared + /// memory manager. 
+ struct SharedMemoryInfo { + SharedMemoryInfo( + const std::string& name, const std::string& shm_key, + const size_t offset, const size_t byte_size, int shm_fd, + void* mapped_addr, const TRITONSERVER_MemoryType kind, + const int64_t device_id) + : name_(name), shm_key_(shm_key), offset_(offset), + byte_size_(byte_size), shm_fd_(shm_fd), mapped_addr_(mapped_addr), + kind_(kind), device_id_(device_id) + { + } + + std::string name_; + std::string shm_key_; + size_t offset_; + size_t byte_size_; + int shm_fd_; + void* mapped_addr_; + TRITONSERVER_MemoryType kind_; + int64_t device_id_; + }; + +#ifdef TRITON_ENABLE_GPU + struct CUDASharedMemoryInfo : SharedMemoryInfo { + CUDASharedMemoryInfo( + const std::string& name, const std::string& shm_key, + const size_t offset, const size_t byte_size, int shm_fd, + void* mapped_addr, const TRITONSERVER_MemoryType kind, + const int64_t device_id, const cudaIpcMemHandle_t* cuda_ipc_handle) + : SharedMemoryInfo( + name, shm_key, offset, byte_size, shm_fd, mapped_addr, kind, + device_id), + cuda_ipc_handle_(*cuda_ipc_handle) + { + } + + cudaIpcMemHandle_t cuda_ipc_handle_; + }; +#endif + /// Add a shared memory block representing shared memory in system /// (CPU) memory to the manager. Return TRITONSERVER_ERROR_ALREADY_EXISTS /// if a shared memory block of the same name already exists in the manager. @@ -90,11 +132,18 @@ class SharedMemoryManager { /// \param memory_type Returns the type of the memory /// \param device_id Returns the device id associated with the /// memory block - /// \return a TRITONSERVER_Error indicating success or failure. + /// \param shm_info Returns a shared pointer reference(read-only) to the + /// shared memory block's information. + /// This pointer will automatically increase the usage count, preventing + /// unregistration while the reference is held. The reference must be cleared + /// or set to nullptr when no longer needed, to decrease the count and allow + /// unregistration. + /// \return a TRITONSERVER_Error indicating success or + /// failure. TRITONSERVER_Error* GetMemoryInfo( const std::string& name, size_t offset, size_t byte_size, void** shm_mapped_addr, TRITONSERVER_MemoryType* memory_type, - int64_t* device_id); + int64_t* device_id, std::shared_ptr* shm_info); #ifdef TRITON_ENABLE_GPU /// Get the CUDA memory handle associated with the block name. @@ -139,50 +188,8 @@ class SharedMemoryManager { TRITONSERVER_Error* UnregisterHelper( const std::string& name, TRITONSERVER_MemoryType memory_type); - /// A struct that records the shared memory regions registered by the shared - /// memory manager. 
- struct SharedMemoryInfo { - SharedMemoryInfo( - const std::string& name, const std::string& shm_key, - const size_t offset, const size_t byte_size, int shm_fd, - void* mapped_addr, const TRITONSERVER_MemoryType kind, - const int64_t device_id) - : name_(name), shm_key_(shm_key), offset_(offset), - byte_size_(byte_size), shm_fd_(shm_fd), mapped_addr_(mapped_addr), - kind_(kind), device_id_(device_id) - { - } - - std::string name_; - std::string shm_key_; - size_t offset_; - size_t byte_size_; - int shm_fd_; - void* mapped_addr_; - TRITONSERVER_MemoryType kind_; - int64_t device_id_; - }; - -#ifdef TRITON_ENABLE_GPU - struct CUDASharedMemoryInfo : SharedMemoryInfo { - CUDASharedMemoryInfo( - const std::string& name, const std::string& shm_key, - const size_t offset, const size_t byte_size, int shm_fd, - void* mapped_addr, const TRITONSERVER_MemoryType kind, - const int64_t device_id, const cudaIpcMemHandle_t* cuda_ipc_handle) - : SharedMemoryInfo( - name, shm_key, offset, byte_size, shm_fd, mapped_addr, kind, - device_id), - cuda_ipc_handle_(*cuda_ipc_handle) - { - } - - cudaIpcMemHandle_t cuda_ipc_handle_; - }; -#endif - using SharedMemoryStateMap = - std::map>; + std::map>; // A map between the name and the details of the associated // shared memory block SharedMemoryStateMap shared_memory_map_; From 363bcdcd03cddcd00979c7fd3315557328221c6d Mon Sep 17 00:00:00 2001 From: Francesco Petrini Date: Wed, 11 Sep 2024 16:27:28 -0700 Subject: [PATCH 5/6] build/test: RHEL8 EA3 (#7595) --- build.py | 11 +++++++---- qa/L0_sequence_batcher/test.sh | 21 ++++++++++++++++++--- 2 files changed, 25 insertions(+), 7 deletions(-) diff --git a/build.py b/build.py index 3195c50cbb..4d4d911468 100755 --- a/build.py +++ b/build.py @@ -1374,12 +1374,15 @@ def dockerfile_prepare_container_linux(argmap, backends, enable_gpu, target_mach if enable_gpu: df += install_dcgm_libraries(argmap["DCGM_VERSION"], target_machine) - df += """ + # This segment will break the RHEL SBSA build. Need to determine whether + # this is necessary to incorporate. + if target_platform() != "rhel": + df += """ # Extra defensive wiring for CUDA Compat lib RUN ln -sf ${_CUDA_COMPAT_PATH}/lib.real ${_CUDA_COMPAT_PATH}/lib \\ - && echo ${_CUDA_COMPAT_PATH}/lib > /etc/ld.so.conf.d/00-cuda-compat.conf \\ - && ldconfig \\ - && rm -f ${_CUDA_COMPAT_PATH}/lib + && echo ${_CUDA_COMPAT_PATH}/lib > /etc/ld.so.conf.d/00-cuda-compat.conf \\ + && ldconfig \\ + && rm -f ${_CUDA_COMPAT_PATH}/lib """ else: df += add_cpu_libs_to_linux_dockerfile(backends, target_machine) diff --git a/qa/L0_sequence_batcher/test.sh b/qa/L0_sequence_batcher/test.sh index 23ee387b55..ac34458b4e 100755 --- a/qa/L0_sequence_batcher/test.sh +++ b/qa/L0_sequence_batcher/test.sh @@ -1,5 +1,5 @@ #!/bin/bash -# Copyright 2018-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# Copyright 2018-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
# # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions @@ -183,6 +183,16 @@ export USE_SINGLE_BUFFER # models4 - four instances with batch-size 1 rm -fr *.log models{0,1,2,4} queue_delay_models && mkdir models{0,1,2,4} queue_delay_models +# Search BACKENDS to determine if a backend should be tested +function should_test_backend() { + local target_backend=$1 + if [[ $(echo "${BACKENDS[@]}" | grep -c "${target_backend}") -ne 0 ]]; then + echo "true" + return + fi + echo "false" +} + # Get the datatype to use based on the backend function get_datatype () { local dtype="int32 bool" @@ -827,8 +837,13 @@ fi ### Start Preserve Ordering Tests ### -# Test only supported on windows currently due to use of python backend models -if [ ${WINDOWS} -ne 1 ]; then +# FIXME: Test only supported on windows currently due to use of python backend models. +# Now that Windows supports the PYBE, we should check that this tests works once Windows +# CI is stable. + +# These subtests use python models. They should not be executed if 'python' is not one +# of the backends under test. +if [[ $(should_test_backend "python") == "true" && !( -v WSL_DISTRO_NAME || -v MSYSTEM )]]; then # Test preserve ordering true/false and decoupled/non-decoupled TEST_CASE=SequenceBatcherPreserveOrderingTest MODEL_PATH=preserve_ordering_models From 68d4c01e4491e6bb033a4063b67eb41b55cb4ea4 Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Tue, 17 Sep 2024 23:37:30 +0530 Subject: [PATCH 6/6] Fix: Add mutex lock for state completion check in gRPC streaming to prevent race condition (#7617) --- src/grpc/stream_infer_handler.cc | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/src/grpc/stream_infer_handler.cc b/src/grpc/stream_infer_handler.cc index 1f554db83c..cf788b1e09 100644 --- a/src/grpc/stream_infer_handler.cc +++ b/src/grpc/stream_infer_handler.cc @@ -537,15 +537,18 @@ ModelStreamInferHandler::Process(InferHandler::State* state, bool rpc_ok) } else if (state->step_ == Steps::WRITEREADY) { // Finish the state if all the transactions associated with // the state have completed. - if (state->IsComplete()) { - state->context_->DecrementRequestCounter(); - finished = Finish(state); - } else { - LOG_ERROR << "Should not print this! Decoupled should NOT write via " - "WRITEREADY!"; - // Remove the state from the completion queue - std::lock_guard lock(state->step_mtx_); - state->step_ = Steps::ISSUED; + std::lock_guard lk1(state->context_->mu_); + { + if (state->IsComplete()) { + state->context_->DecrementRequestCounter(); + finished = Finish(state); + } else { + LOG_ERROR << "Should not print this! Decoupled should NOT write via " + "WRITEREADY!"; + // Remove the state from the completion queue + std::lock_guard lock(state->step_mtx_); + state->step_ = Steps::ISSUED; + } } } }
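
The shared-memory patches in this series switch the manager's registry from std::unique_ptr to std::shared_ptr and make UnregisterHelper() refuse to remove a region while use_count() > 1; GetMemoryInfo() now hands each in-flight HTTP request a shared_ptr so the regions it reads from stay pinned until the request drops them. Below is a minimal, self-contained sketch of that pinning pattern under stated assumptions: RegionRegistry and RegionInfo are invented names for illustration and do not reproduce Triton's actual SharedMemoryManager API.

    // Sketch only: hypothetical registry illustrating shared_ptr pinning.
    #include <cstddef>
    #include <iostream>
    #include <map>
    #include <memory>
    #include <mutex>
    #include <string>

    struct RegionInfo {
      std::string name;
      void* mapped_addr = nullptr;
      size_t byte_size = 0;
    };

    class RegionRegistry {
     public:
      bool Register(const std::string& name, size_t byte_size)
      {
        std::lock_guard<std::mutex> lk(mu_);
        auto info = std::make_shared<RegionInfo>();
        info->name = name;
        info->byte_size = byte_size;
        return regions_.emplace(name, std::move(info)).second;
      }

      // Hands out a shared_ptr so the caller pins the region for as long as
      // the reference is held, analogous to the new GetMemoryInfo() parameter.
      bool Get(const std::string& name, std::shared_ptr<RegionInfo>* info)
      {
        std::lock_guard<std::mutex> lk(mu_);
        auto it = regions_.find(name);
        if (it == regions_.end()) {
          return false;
        }
        *info = it->second;
        return true;
      }

      // Refuses to drop a region while any request still holds a reference,
      // mirroring the use_count() > 1 guard added in UnregisterHelper().
      bool Unregister(const std::string& name)
      {
        std::lock_guard<std::mutex> lk(mu_);
        auto it = regions_.find(name);
        if (it == regions_.end()) {
          return true;  // nothing to unregister
        }
        if (it->second.use_count() > 1) {
          std::cerr << "region '" << name << "' is in use; not unregistered\n";
          return false;
        }
        regions_.erase(it);
        return true;
      }

     private:
      std::mutex mu_;
      std::map<std::string, std::shared_ptr<RegionInfo>> regions_;
    };

    int main()
    {
      RegionRegistry registry;
      registry.Register("input_data", 1024);

      {
        std::shared_ptr<RegionInfo> pinned;
        registry.Get("input_data", &pinned);  // a request takes a reference
        registry.Unregister("input_data");    // rejected: region still pinned
      }                                       // reference released here

      registry.Unregister("input_data");      // now succeeds
      return 0;
    }

Note that a use_count() check is only meaningful while the registry lock is held, since counts can change concurrently; the patch takes the same precaution by requiring register_mu_ to be held around UnregisterHelper().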
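
The final patch closes a check-then-act race in the streaming handler: without the context lock, one thread can test IsComplete() in the WRITEREADY path while another is still updating the state's transaction bookkeeping. The sketch below shows the same locking discipline collapsed onto a single context mutex; StreamState, StreamContext, and the callback names are invented for illustration and are not Triton's InferHandler types.

    // Sketch only: mutex-guarded completion check for a streaming handler.
    #include <iostream>
    #include <memory>
    #include <mutex>
    #include <thread>

    struct StreamContext {
      std::mutex mu;                 // guards all completion bookkeeping
      int outstanding_requests = 0;  // requests still in flight on the stream
    };

    struct StreamState {
      std::shared_ptr<StreamContext> context;
      int pending_transactions = 0;  // responses not yet delivered

      bool IsComplete() const { return pending_transactions == 0; }
    };

    // Write-ready path: the IsComplete() test and the counter update run
    // under the same context lock, so a racing completion callback cannot
    // slip in between the check and the act.
    bool OnWriteReady(StreamState* state)
    {
      std::lock_guard<std::mutex> lk(state->context->mu);
      if (state->IsComplete()) {
        state->context->outstanding_requests--;
        return true;  // caller may finish and release the state
      }
      return false;   // responses still pending; keep the state queued
    }

    // Completion path: takes the same lock before touching the counters.
    void OnResponseComplete(StreamState* state)
    {
      std::lock_guard<std::mutex> lk(state->context->mu);
      state->pending_transactions--;
    }

    int main()
    {
      auto ctx = std::make_shared<StreamContext>();
      ctx->outstanding_requests = 1;

      StreamState state{ctx, /*pending_transactions=*/1};

      std::thread responder([&] { OnResponseComplete(&state); });
      responder.join();

      std::cout << "finished: " << std::boolalpha << OnWriteReady(&state)
                << "\n";
      return 0;
    }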