diff --git a/Dockerfile.QA b/Dockerfile.QA index 4b14de77c5..7183952111 100644 --- a/Dockerfile.QA +++ b/Dockerfile.QA @@ -154,8 +154,19 @@ RUN cd ${TRITONTMP_DIR}/tritonbuild/identity && \ -DTRITON_BACKEND_REPO_TAG:STRING=${TRITON_BACKEND_REPO_TAG} .. && \ make -j16 install +# L0_backend_python tests require triton_shm_monitor +RUN cd ${TRITONTMP_DIR}/tritonbuild/python && \ + rm -rf install build && mkdir build && cd build && \ + cmake -DCMAKE_INSTALL_PREFIX:PATH=${TRITONTMP_DIR}/tritonbuild/python/install \ + -DTRITON_COMMON_REPO_TAG:STRING=${TRITON_COMMON_REPO_TAG} \ + -DTRITON_CORE_REPO_TAG:STRING=${TRITON_CORE_REPO_TAG} \ + -DTRITON_BACKEND_REPO_TAG:STRING=${TRITON_BACKEND_REPO_TAG} .. && \ + make -j18 triton-shm-monitor install + RUN cp ${TRITONTMP_DIR}/tritonbuild/identity/install/backends/identity/libtriton_identity.so \ qa/L0_lifecycle/. && \ + cp ${TRITONTMP_DIR}/tritonbuild/python/install/backends/python/triton_shm_monitor*.so \ + qa/common/. && \ mkdir -p qa/L0_perf_nomodel/custom_models/custom_zero_1_float32/1 && \ mkdir -p qa/L0_perf_pyclients/custom_models/custom_zero_1_int32/1 && \ mkdir -p qa/L0_infer_shm && \ diff --git a/qa/L0_backend_python/ensemble/ensemble_test.py b/qa/L0_backend_python/ensemble/ensemble_test.py index f9dc0e29ae..831f1fa5a3 100644 --- a/qa/L0_backend_python/ensemble/ensemble_test.py +++ b/qa/L0_backend_python/ensemble/ensemble_test.py @@ -1,4 +1,4 @@ -# Copyright (c) 2021, NVIDIA CORPORATION. All rights reserved. +# Copyright 2021-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions @@ -28,6 +28,7 @@ sys.path.append("../../common") import test_util as tu +import shm_util import tritonclient.http as httpclient from tritonclient.utils import * import numpy as np @@ -36,49 +37,58 @@ class EnsembleTest(tu.TestResultCollector): + def setUp(self): + self._shm_leak_detector = shm_util.ShmLeakDetector() + def test_ensemble(self): model_name = "ensemble" shape = [16] - with httpclient.InferenceServerClient("localhost:8000") as client: - input_data_0 = np.random.random(shape).astype(np.float32) - input_data_1 = np.random.random(shape).astype(np.float32) - inputs = [ - httpclient.InferInput("INPUT0", input_data_0.shape, - np_to_triton_dtype(input_data_0.dtype)), - httpclient.InferInput("INPUT1", input_data_1.shape, - np_to_triton_dtype(input_data_1.dtype)) - ] - inputs[0].set_data_from_numpy(input_data_0) - inputs[1].set_data_from_numpy(input_data_1) - result = client.infer(model_name, inputs) - output0 = result.as_numpy('OUTPUT0') - output1 = result.as_numpy('OUTPUT1') - self.assertIsNotNone(output0) - self.assertIsNotNone(output1) + with self._shm_leak_detector.Probe() as shm_probe: + with httpclient.InferenceServerClient("localhost:8000") as client: + input_data_0 = np.random.random(shape).astype(np.float32) + input_data_1 = np.random.random(shape).astype(np.float32) + inputs = [ + httpclient.InferInput( + "INPUT0", input_data_0.shape, + np_to_triton_dtype(input_data_0.dtype)), + httpclient.InferInput( + "INPUT1", input_data_1.shape, + np_to_triton_dtype(input_data_1.dtype)) + ] + inputs[0].set_data_from_numpy(input_data_0) + inputs[1].set_data_from_numpy(input_data_1) + result = client.infer(model_name, inputs) + output0 = result.as_numpy('OUTPUT0') + output1 = result.as_numpy('OUTPUT1') + self.assertIsNotNone(output0) + self.assertIsNotNone(output1) - self.assertTrue(np.allclose(output0, 2 * input_data_0)) -
self.assertTrue(np.allclose(output1, 2 * input_data_1)) + self.assertTrue(np.allclose(output0, 2 * input_data_0)) + self.assertTrue(np.allclose(output1, 2 * input_data_1)) model_name = "ensemble_gpu" - with httpclient.InferenceServerClient("localhost:8000") as client: - input_data_0 = np.random.random(shape).astype(np.float32) - input_data_1 = np.random.random(shape).astype(np.float32) - inputs = [ - httpclient.InferInput("INPUT0", input_data_0.shape, - np_to_triton_dtype(input_data_0.dtype)), - httpclient.InferInput("INPUT1", input_data_1.shape, - np_to_triton_dtype(input_data_1.dtype)) - ] - inputs[0].set_data_from_numpy(input_data_0) - inputs[1].set_data_from_numpy(input_data_1) - result = client.infer(model_name, inputs) - output0 = result.as_numpy('OUTPUT0') - output1 = result.as_numpy('OUTPUT1') - self.assertIsNotNone(output0) - self.assertIsNotNone(output1) + with self._shm_leak_detector.Probe() as shm_probe: + with httpclient.InferenceServerClient("localhost:8000") as client: + input_data_0 = np.random.random(shape).astype(np.float32) + input_data_1 = np.random.random(shape).astype(np.float32) + inputs = [ + httpclient.InferInput( + "INPUT0", input_data_0.shape, + np_to_triton_dtype(input_data_0.dtype)), + httpclient.InferInput( + "INPUT1", input_data_1.shape, + np_to_triton_dtype(input_data_1.dtype)) + ] + inputs[0].set_data_from_numpy(input_data_0) + inputs[1].set_data_from_numpy(input_data_1) + result = client.infer(model_name, inputs) + output0 = result.as_numpy('OUTPUT0') + output1 = result.as_numpy('OUTPUT1') + self.assertIsNotNone(output0) + self.assertIsNotNone(output1) - self.assertTrue(np.allclose(output0, 2 * input_data_0)) - self.assertTrue(np.allclose(output1, 2 * input_data_1)) + self.assertTrue(np.allclose(output0, 2 * input_data_0)) + self.assertTrue(np.allclose(output1, 2 * input_data_1)) if __name__ == '__main__': diff --git a/qa/L0_backend_python/io/io_test.py b/qa/L0_backend_python/io/io_test.py index 2bf9a406e6..fd891b4248 100644 --- a/qa/L0_backend_python/io/io_test.py +++ b/qa/L0_backend_python/io/io_test.py @@ -1,4 +1,4 @@ -# Copyright (c) 2021, NVIDIA CORPORATION. All rights reserved. +# Copyright 2021-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
# # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions @@ -29,6 +29,7 @@ sys.path.append("../../common") import test_util as tu +import shm_util import tritonclient.http as httpclient from tritonclient.utils import * import numpy as np @@ -37,30 +38,35 @@ class IOTest(tu.TestResultCollector): + def setUp(self): + self._shm_leak_detector = shm_util.ShmLeakDetector() + def test_ensemble_io(self): model_name = "ensemble_io" - with httpclient.InferenceServerClient("localhost:8000") as client: - input0 = np.random.random([1000]).astype(np.float32) - for model_1_in_gpu in [True, False]: - for model_2_in_gpu in [True, False]: - for model_3_in_gpu in [True, False]: - gpu_output = np.asarray( - [model_1_in_gpu, model_2_in_gpu, model_3_in_gpu], - dtype=bool) - inputs = [ - httpclient.InferInput( - "INPUT0", input0.shape, - np_to_triton_dtype(input0.dtype)), - httpclient.InferInput( - "GPU_OUTPUT", gpu_output.shape, - np_to_triton_dtype(gpu_output.dtype)) - ] - inputs[0].set_data_from_numpy(input0) - inputs[1].set_data_from_numpy(gpu_output) - result = client.infer(model_name, inputs) - output0 = result.as_numpy('OUTPUT0') - self.assertIsNotNone(output0) - self.assertTrue(np.all(output0 == input0)) + with self._shm_leak_detector.Probe() as shm_probe: + with httpclient.InferenceServerClient("localhost:8000") as client: + input0 = np.random.random([1000]).astype(np.float32) + for model_1_in_gpu in [True, False]: + for model_2_in_gpu in [True, False]: + for model_3_in_gpu in [True, False]: + gpu_output = np.asarray([ + model_1_in_gpu, model_2_in_gpu, model_3_in_gpu + ], + dtype=bool) + inputs = [ + httpclient.InferInput( + "INPUT0", input0.shape, + np_to_triton_dtype(input0.dtype)), + httpclient.InferInput( + "GPU_OUTPUT", gpu_output.shape, + np_to_triton_dtype(gpu_output.dtype)) + ] + inputs[0].set_data_from_numpy(input0) + inputs[1].set_data_from_numpy(gpu_output) + result = client.infer(model_name, inputs) + output0 = result.as_numpy('OUTPUT0') + self.assertIsNotNone(output0) + self.assertTrue(np.all(output0 == input0)) if __name__ == '__main__': diff --git a/qa/L0_backend_python/lifecycle/lifecycle_test.py b/qa/L0_backend_python/lifecycle/lifecycle_test.py index f2832de9e4..f9805d7984 100644 --- a/qa/L0_backend_python/lifecycle/lifecycle_test.py +++ b/qa/L0_backend_python/lifecycle/lifecycle_test.py @@ -1,4 +1,4 @@ -# Copyright (c) 2019-2021, NVIDIA CORPORATION. All rights reserved. +# Copyright 2019-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions @@ -28,6 +28,7 @@ sys.path.append("../../common") import test_util as tu +import shm_util from functools import partial import tritonclient.http as httpclient import tritonclient.grpc as grpcclient @@ -52,6 +53,9 @@ def callback(user_data, result, error): class LifecycleTest(tu.TestResultCollector): + def setUp(self): + self._shm_leak_detector = shm_util.ShmLeakDetector() + def test_batch_error(self): # The execute_error model returns an error for the first request and # sucessfully processes the second request. 
This is making sure that @@ -63,87 +67,91 @@ def test_batch_error(self): triton_client = grpcclient.InferenceServerClient("localhost:8001") triton_client.start_stream(callback=partial(callback, user_data)) - input_datas = [] - for i in range(number_of_requests): - input_data = np.random.randn(*shape).astype(np.float32) - input_datas.append(input_data) - inputs = [ - grpcclient.InferInput("IN", input_data.shape, - np_to_triton_dtype(input_data.dtype)) - ] - inputs[0].set_data_from_numpy(input_data) - triton_client.async_stream_infer(model_name=model_name, - inputs=inputs) - - for i in range(number_of_requests): - result = user_data._completed_requests.get() - if i == 0: - self.assertIs(type(result), InferenceServerException) - continue - - print(result) - output_data = result.as_numpy("OUT") - self.assertIsNotNone(output_data, "error: expected 'OUT'") - self.assertTrue( - np.array_equal(output_data, input_datas[i]), - "error: expected output {} to match input {}".format( - output_data, input_datas[i])) + with self._shm_leak_detector.Probe() as shm_probe: + input_datas = [] + for i in range(number_of_requests): + input_data = np.random.randn(*shape).astype(np.float32) + input_datas.append(input_data) + inputs = [ + grpcclient.InferInput("IN", input_data.shape, + np_to_triton_dtype(input_data.dtype)) + ] + inputs[0].set_data_from_numpy(input_data) + triton_client.async_stream_infer(model_name=model_name, + inputs=inputs) + + for i in range(number_of_requests): + result = user_data._completed_requests.get() + if i == 0: + self.assertIs(type(result), InferenceServerException) + continue + + print(result) + output_data = result.as_numpy("OUT") + self.assertIsNotNone(output_data, "error: expected 'OUT'") + self.assertTrue( + np.array_equal(output_data, input_datas[i]), + "error: expected output {} to match input {}".format( + output_data, input_datas[i])) def test_infer_pymodel_error(self): model_name = "wrong_model" shape = [2, 2] - with httpclient.InferenceServerClient("localhost:8000") as client: - input_data = (16384 * np.random.randn(*shape)).astype(np.uint32) - inputs = [ - httpclient.InferInput("IN", input_data.shape, - np_to_triton_dtype(input_data.dtype)) - ] - inputs[0].set_data_from_numpy(input_data) - try: - client.infer(model_name, inputs) - except InferenceServerException as e: - print(e.message()) - self.assertTrue( - e.message().startswith( - "Failed to process the request(s) for model instance"), - "Exception message is not correct") - else: - self.assertTrue( - False, - "Wrong exception raised or did not raise an exception") + + with self._shm_leak_detector.Probe() as shm_probe: + with httpclient.InferenceServerClient("localhost:8000") as client: + input_data = (16384 * np.random.randn(*shape)).astype(np.uint32) + inputs = [ + httpclient.InferInput("IN", input_data.shape, + np_to_triton_dtype(input_data.dtype)) + ] + inputs[0].set_data_from_numpy(input_data) + try: + client.infer(model_name, inputs) + except InferenceServerException as e: + print(e.message()) + self.assertTrue( + e.message().startswith( + "Failed to process the request(s) for model instance" + ), "Exception message is not correct") + else: + self.assertTrue( + False, + "Wrong exception raised or did not raise an exception") def test_incorrect_execute_return(self): model_name = 'execute_return_error' shape = [1, 1] - with httpclient.InferenceServerClient("localhost:8000") as client: - input_data = (5 * np.random.randn(*shape)).astype(np.float32) - inputs = [ - httpclient.InferInput("INPUT", input_data.shape, - 
np_to_triton_dtype(input_data.dtype)) - ] - inputs[0].set_data_from_numpy(input_data) - - # The first request to this model will return None. - with self.assertRaises(InferenceServerException) as e: - client.infer(model_name, inputs) - - self.assertTrue( - str(e.exception).startswith( - "Failed to process the request(s) for model instance " - "'execute_return_error_0', message: Expected a list in the " - "execute return"), "Exception message is not correct.") - - # The second inference request will return a list of None object - # instead of Python InferenceResponse objects. - with self.assertRaises(InferenceServerException) as e: - client.infer(model_name, inputs) - - self.assertTrue( - str(e.exception).startswith( - "Failed to process the request(s) for model instance " - "'execute_return_error_0', message: Expected an " - "'InferenceResponse' object in the execute function return" - " list"), "Exception message is not correct.") + with self._shm_leak_detector.Probe() as shm_probe: + with httpclient.InferenceServerClient("localhost:8000") as client: + input_data = (5 * np.random.randn(*shape)).astype(np.float32) + inputs = [ + httpclient.InferInput("INPUT", input_data.shape, + np_to_triton_dtype(input_data.dtype)) + ] + inputs[0].set_data_from_numpy(input_data) + + # The first request to this model will return None. + with self.assertRaises(InferenceServerException) as e: + client.infer(model_name, inputs) + + self.assertTrue( + str(e.exception).startswith( + "Failed to process the request(s) for model instance " + "'execute_return_error_0', message: Expected a list in the " + "execute return"), "Exception message is not correct.") + + # The second inference request will return a list of None object + # instead of Python InferenceResponse objects. + with self.assertRaises(InferenceServerException) as e: + client.infer(model_name, inputs) + + self.assertTrue( + str(e.exception).startswith( + "Failed to process the request(s) for model instance " + "'execute_return_error_0', message: Expected an " + "'InferenceResponse' object in the execute function return" + " list"), "Exception message is not correct.") if __name__ == '__main__': diff --git a/qa/L0_backend_python/model_control/model_control_test.py b/qa/L0_backend_python/model_control/model_control_test.py index 8dc7dd2aec..feceda01e4 100644 --- a/qa/L0_backend_python/model_control/model_control_test.py +++ b/qa/L0_backend_python/model_control/model_control_test.py @@ -1,4 +1,4 @@ -# Copyright (c) 2021, NVIDIA CORPORATION. All rights reserved. +# Copyright 2021-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
# # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions @@ -33,20 +33,26 @@ from tritonclient.utils import * import numpy as np import unittest +import shm_util class ExplicitModelTest(tu.TestResultCollector): + def setUp(self): + self._shm_leak_detector = shm_util.ShmLeakDetector() + def send_identity_request(self, client, model_name): inputs = [] inputs.append(httpclient.InferInput('INPUT0', [1, 16], "FP32")) input0_data = np.arange(start=0, stop=16, dtype=np.float32) input0_data = np.expand_dims(input0_data, axis=0) inputs[0].set_data_from_numpy(input0_data) - result = client.infer( - model_name=model_name, - inputs=inputs, - outputs=[httpclient.InferRequestedOutput('OUTPUT0')]) + + with self._shm_leak_detector.Probe() as shm_probe: + result = client.infer( + model_name=model_name, + inputs=inputs, + outputs=[httpclient.InferRequestedOutput('OUTPUT0')]) output_numpy = result.as_numpy('OUTPUT0') self.assertTrue(np.all(input0_data == output_numpy)) diff --git a/qa/L0_backend_python/python_test.py b/qa/L0_backend_python/python_test.py index 3c34423865..9d7bac54ba 100644 --- a/qa/L0_backend_python/python_test.py +++ b/qa/L0_backend_python/python_test.py @@ -1,6 +1,6 @@ #!/usr/bin/python -# Copyright 2019-2021, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# Copyright 2019-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions @@ -33,6 +33,7 @@ import unittest import numpy as np import test_util as tu +import shm_util import requests as httpreq import os @@ -44,6 +45,9 @@ class PythonTest(tu.TestResultCollector): + def setUp(self): + self._shm_leak_detector = shm_util.ShmLeakDetector() + def _infer_help(self, model_name, shape, data_type): with httpclient.InferenceServerClient("localhost:8000") as client: input_data_0 = np.array(np.random.randn(*shape), dtype=data_type) @@ -52,12 +56,14 @@ def _infer_help(self, model_name, shape, data_type): np_to_triton_dtype(input_data_0.dtype)) ] inputs[0].set_data_from_numpy(input_data_0) + result = client.infer(model_name, inputs) output0 = result.as_numpy('OUTPUT0') self.assertTrue(np.all(input_data_0 == output0)) - # We do not use a docker on Jetson so it does not impose a shared memory allocation limit of 1GB. - # This means test will pass without the expected error on jetson and is hence unnecessary. + # We do not use a docker on Jetson so it does not impose a shared memory + # allocation limit of 1GB. This means test will pass without the expected + # error on jetson and is hence unnecessary. if not TEST_JETSON: def test_growth_error(self): @@ -66,7 +72,8 @@ def test_growth_error(self): shape = [total_byte_size] model_name = 'identity_uint8_nobatch' dtype = np.uint8 - self._infer_help(model_name, shape, dtype) + with self._shm_leak_detector.Probe() as shm_probe: + self._infer_help(model_name, shape, dtype) # 1 GiB payload leads to error in the main Python backned process. # Total shared memory available is 1GiB. 
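The pattern these hunks apply is the same throughout this change: the client interaction is wrapped in shm_util.ShmLeakDetector.Probe(), which records the free size of every Python-backend shared memory region on entry and asserts the sizes are unchanged on exit (the detector itself is added to qa/common/shm_util.py further down in this diff). A minimal sketch of that usage, assuming a running server on localhost:8000 and an identity model named "identity_fp32" (both are illustrative assumptions, not taken from this patch):

import numpy as np
import tritonclient.http as httpclient
from tritonclient.utils import np_to_triton_dtype

import shm_util  # qa/common/shm_util.py; needs the triton_shm_monitor extension built above

# Attach a monitor to every /dev/shm region created by the Python backend.
leak_detector = shm_util.ShmLeakDetector()

# Probe() snapshots each region's free_memory() on __enter__ and asserts the
# values are unchanged on __exit__, so a request that leaks backend shared
# memory fails this block with an AssertionError.
with leak_detector.Probe():
    with httpclient.InferenceServerClient("localhost:8000") as client:
        data = np.zeros([1, 16], dtype=np.float32)
        inp = httpclient.InferInput("INPUT0", data.shape,
                                    np_to_triton_dtype(data.dtype))
        inp.set_data_from_numpy(data)
        result = client.infer("identity_fp32", [inp])  # hypothetical model name
        assert result.as_numpy("OUTPUT0") is not None

Wrapping the whole client block, rather than a single infer call, mirrors how the tests in this file use the probe.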
@@ -89,181 +96,197 @@ def test_growth_error(self): # Send a small paylaod to make sure it is still working properly total_byte_size = 2 * 1024 * 1024 shape = [total_byte_size] - self._infer_help(model_name, shape, dtype) + with self._shm_leak_detector.Probe() as shm_probe: + self._infer_help(model_name, shape, dtype) def test_async_infer(self): model_name = "identity_uint8" request_parallelism = 4 shape = [2, 2] - with httpclient.InferenceServerClient( - "localhost:8000", concurrency=request_parallelism) as client: - input_datas = [] - requests = [] - for i in range(request_parallelism): - input_data = (16384 * np.random.randn(*shape)).astype(np.uint8) - input_datas.append(input_data) + + with self._shm_leak_detector.Probe() as shm_probe: + with httpclient.InferenceServerClient( + "localhost:8000", + concurrency=request_parallelism) as client: + input_datas = [] + requests = [] + for i in range(request_parallelism): + input_data = (16384 * np.random.randn(*shape)).astype( + np.uint8) + input_datas.append(input_data) + inputs = [ + httpclient.InferInput( + "INPUT0", input_data.shape, + np_to_triton_dtype(input_data.dtype)) + ] + inputs[0].set_data_from_numpy(input_data) + requests.append(client.async_infer(model_name, inputs)) + + for i in range(request_parallelism): + # Get the result from the initiated asynchronous inference request. + # Note the call will block till the server responds. + results = requests[i].get_result() + + output_data = results.as_numpy("OUTPUT0") + self.assertIsNotNone(output_data, + "error: expected 'OUTPUT0'") + self.assertTrue( + np.array_equal(output_data, input_datas[i]), + "error: expected output {} to match input {}".format( + output_data, input_datas[i])) + + # Make sure the requests ran in parallel. + stats = client.get_inference_statistics(model_name) + test_cond = (len(stats['model_stats']) != 1) or ( + stats['model_stats'][0]['name'] != model_name) + self.assertFalse( + test_cond, + "error: expected statistics for {}".format(model_name)) + + stat = stats['model_stats'][0] + self.assertFalse((stat['inference_count'] != 8) or ( + stat['execution_count'] != 1 + ), "error: expected execution_count == 1 and inference_count == 8, got {} and {}" + .format(stat['execution_count'], + stat['inference_count'])) + batch_stat = stat['batch_stats'][0] + self.assertFalse( + batch_stat['batch_size'] != 8, + f"error: expected batch_size == 8, got {batch_stat['batch_size']}" + ) + # Check metrics to make sure they are reported correctly + metrics = httpreq.get('http://localhost:8002/metrics') + print(metrics.text) + + success_str = 'nv_inference_request_success{model="identity_uint8",version="1"}' + infer_count_str = 'nv_inference_count{model="identity_uint8",version="1"}' + infer_exec_str = 'nv_inference_exec_count{model="identity_uint8",version="1"}' + + success_val = None + infer_count_val = None + infer_exec_val = None + for line in metrics.text.splitlines(): + if line.startswith(success_str): + success_val = float(line[len(success_str):]) + if line.startswith(infer_count_str): + infer_count_val = float(line[len(infer_count_str):]) + if line.startswith(infer_exec_str): + infer_exec_val = float(line[len(infer_exec_str):]) + + self.assertFalse( + success_val != 4, + "error: expected metric {} == 4, got {}".format( + success_str, success_val)) + self.assertFalse( + infer_count_val != 8, + "error: expected metric {} == 8, got {}".format( + infer_count_str, infer_count_val)) + self.assertFalse( + infer_exec_val != 1, + "error: expected metric {} == 1, got {}".format( + 
infer_exec_str, infer_exec_val)) + + def test_bool(self): + model_name = 'identity_bool' + with self._shm_leak_detector.Probe() as shm_probe: + with httpclient.InferenceServerClient("localhost:8000") as client: + input_data = np.array([[True, False, True]], dtype=bool) inputs = [ httpclient.InferInput("INPUT0", input_data.shape, np_to_triton_dtype(input_data.dtype)) ] inputs[0].set_data_from_numpy(input_data) - requests.append(client.async_infer(model_name, inputs)) - - for i in range(request_parallelism): - # Get the result from the initiated asynchronous inference request. - # Note the call will block till the server responds. - results = requests[i].get_result() - print(results) - - output_data = results.as_numpy("OUTPUT0") - self.assertIsNotNone(output_data, "error: expected 'OUTPUT0'") - self.assertTrue( - np.array_equal(output_data, input_datas[i]), - "error: expected output {} to match input {}".format( - output_data, input_datas[i])) - - # Make sure the requests ran in parallel. - stats = client.get_inference_statistics(model_name) - test_cond = (len(stats['model_stats']) != - 1) or (stats['model_stats'][0]['name'] != model_name) - self.assertFalse( - test_cond, - "error: expected statistics for {}".format(model_name)) - - stat = stats['model_stats'][0] - self.assertFalse((stat['inference_count'] != 8) or ( - stat['execution_count'] != 1 - ), "error: expected execution_count == 1 and inference_count == 8, got {} and {}" - .format(stat['execution_count'], - stat['inference_count'])) - batch_stat = stat['batch_stats'][0] - self.assertFalse( - batch_stat['batch_size'] != 8, - f"error: expected batch_size == 8, got {batch_stat['batch_size']}" - ) - # Check metrics to make sure they are reported correctly - metrics = httpreq.get('http://localhost:8002/metrics') - print(metrics.text) - - success_str = 'nv_inference_request_success{model="identity_uint8",version="1"}' - infer_count_str = 'nv_inference_count{model="identity_uint8",version="1"}' - infer_exec_str = 'nv_inference_exec_count{model="identity_uint8",version="1"}' - - success_val = None - infer_count_val = None - infer_exec_val = None - for line in metrics.text.splitlines(): - if line.startswith(success_str): - success_val = float(line[len(success_str):]) - if line.startswith(infer_count_str): - infer_count_val = float(line[len(infer_count_str):]) - if line.startswith(infer_exec_str): - infer_exec_val = float(line[len(infer_exec_str):]) - - self.assertFalse( - success_val != 4, - "error: expected metric {} == 4, got {}".format( - success_str, success_val)) - self.assertFalse( - infer_count_val != 8, - "error: expected metric {} == 8, got {}".format( - infer_count_str, infer_count_val)) - self.assertFalse( - infer_exec_val != 1, - "error: expected metric {} == 1, got {}".format( - infer_exec_str, infer_exec_val)) - - def test_bool(self): - model_name = 'identity_bool' - with httpclient.InferenceServerClient("localhost:8000") as client: - input_data = np.array([[True, False, True]], dtype=bool) - inputs = [ - httpclient.InferInput("INPUT0", input_data.shape, - np_to_triton_dtype(input_data.dtype)) - ] - inputs[0].set_data_from_numpy(input_data) - result = client.infer(model_name, inputs) - output0 = result.as_numpy('OUTPUT0') - self.assertIsNotNone(output0) - self.assertTrue(np.all(output0 == input_data)) + result = client.infer(model_name, inputs) + output0 = result.as_numpy('OUTPUT0') + self.assertIsNotNone(output0) + self.assertTrue(np.all(output0 == input_data)) def test_infer_pytorch(self): model_name = "pytorch_fp32_fp32" 
shape = [1, 1, 28, 28] - with httpclient.InferenceServerClient("localhost:8000") as client: - input_data = np.zeros(shape, dtype=np.float32) - inputs = [ - httpclient.InferInput("IN", input_data.shape, - np_to_triton_dtype(input_data.dtype)) - ] - inputs[0].set_data_from_numpy(input_data) - result = client.infer(model_name, inputs) - output_data = result.as_numpy('OUT') - self.assertIsNotNone(output_data, "error: expected 'OUT'") + with self._shm_leak_detector.Probe() as shm_probe: + with httpclient.InferenceServerClient("localhost:8000") as client: + input_data = np.zeros(shape, dtype=np.float32) + inputs = [ + httpclient.InferInput("IN", input_data.shape, + np_to_triton_dtype(input_data.dtype)) + ] + inputs[0].set_data_from_numpy(input_data) + result = client.infer(model_name, inputs) + output_data = result.as_numpy('OUT') + self.assertIsNotNone(output_data, "error: expected 'OUT'") - # expected inference resposne from a zero tensor - expected_result = [ - -2.2377274, -2.3976364, -2.2464046, -2.2790744, -2.3828976, - -2.2940576, -2.2928185, -2.340665, -2.275219, -2.292135 - ] - self.assertTrue(np.allclose(output_data[0], expected_result), - 'Inference result is not correct') + # expected inference resposne from a zero tensor + expected_result = [ + -2.2377274, -2.3976364, -2.2464046, -2.2790744, -2.3828976, + -2.2940576, -2.2928185, -2.340665, -2.275219, -2.292135 + ] + self.assertTrue(np.allclose(output_data[0], expected_result), + 'Inference result is not correct') def test_init_args(self): model_name = "init_args" shape = [2, 2] - with httpclient.InferenceServerClient("localhost:8000") as client: - input_data = np.zeros(shape, dtype=np.float32) - inputs = [ - httpclient.InferInput("IN", input_data.shape, - np_to_triton_dtype(input_data.dtype)) - ] - inputs[0].set_data_from_numpy(input_data) - result = client.infer(model_name, inputs) - # output respone in this model is the number of keys in the args - self.assertTrue( - result.as_numpy("OUT") == 7, - "Number of keys in the init args is not correct") - - def test_unicode(self): - model_name = "string" - shape = [1] - - for i in range(3): + with self._shm_leak_detector.Probe() as shm_probe: with httpclient.InferenceServerClient("localhost:8000") as client: - utf8 = '😀' - input_data = np.array([bytes(utf8, encoding='utf-8')], - dtype=np.bytes_) + input_data = np.zeros(shape, dtype=np.float32) inputs = [ - httpclient.InferInput("INPUT0", shape, + httpclient.InferInput("IN", input_data.shape, np_to_triton_dtype(input_data.dtype)) ] inputs[0].set_data_from_numpy(input_data) result = client.infer(model_name, inputs) - output0 = result.as_numpy('OUTPUT0') - self.assertIsNotNone(output0) - self.assertEqual(output0[0], input_data) + # output respone in this model is the number of keys in the args + self.assertTrue( + result.as_numpy("OUT") == 7, + "Number of keys in the init args is not correct") + + def test_unicode(self): + model_name = "string" + shape = [1] + + for i in range(3): + with self._shm_leak_detector.Probe() as shm_probe: + with httpclient.InferenceServerClient( + "localhost:8000") as client: + utf8 = '😀' + input_data = np.array([bytes(utf8, encoding='utf-8')], + dtype=np.bytes_) + inputs = [ + httpclient.InferInput( + "INPUT0", shape, + np_to_triton_dtype(input_data.dtype)) + ] + inputs[0].set_data_from_numpy(input_data) + result = client.infer(model_name, inputs) + output0 = result.as_numpy('OUTPUT0') + self.assertIsNotNone(output0) + self.assertEqual(output0[0], input_data) def test_string(self): model_name = "string_fixed" 
shape = [1] for i in range(6): - with httpclient.InferenceServerClient("localhost:8000") as client: - input_data = np.array(['123456'], dtype=np.object_) - inputs = [ - httpclient.InferInput("INPUT0", shape, - np_to_triton_dtype(input_data.dtype)) - ] - inputs[0].set_data_from_numpy(input_data) - result = client.infer(model_name, inputs) - output0 = result.as_numpy('OUTPUT0') - self.assertIsNotNone(output0) - - if i % 2 == 0: - self.assertEqual(output0[0], input_data.astype(np.bytes_)) - else: - self.assertEqual(output0.size, 0) + with self._shm_leak_detector.Probe() as shm_probe: + with httpclient.InferenceServerClient( + "localhost:8000") as client: + input_data = np.array(['123456'], dtype=np.object_) + inputs = [ + httpclient.InferInput( + "INPUT0", shape, + np_to_triton_dtype(input_data.dtype)) + ] + inputs[0].set_data_from_numpy(input_data) + result = client.infer(model_name, inputs) + output0 = result.as_numpy('OUTPUT0') + self.assertIsNotNone(output0) + + if i % 2 == 0: + self.assertEqual(output0[0], + input_data.astype(np.bytes_)) + else: + self.assertEqual(output0.size, 0) def test_non_contiguous(self): model_name = 'non_contiguous' diff --git a/qa/L0_backend_python/python_unittest.py b/qa/L0_backend_python/python_unittest.py index 2ecc19eb84..557671729e 100644 --- a/qa/L0_backend_python/python_unittest.py +++ b/qa/L0_backend_python/python_unittest.py @@ -1,4 +1,4 @@ -# Copyright (c) 2021, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# Copyright 2021-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions @@ -29,6 +29,7 @@ sys.path.append("../../common") import test_util as tu +import shm_util import unittest import tritonclient.grpc as grpcclient from tritonclient.utils import * @@ -37,8 +38,10 @@ class PythonUnittest(tu.TestResultCollector): - def test_python_unittest(self): - model_name = os.environ['MODEL_NAME'] + def setUp(self): + self._shm_leak_detector = shm_util.ShmLeakDetector() + + def _run_unittest(self, model_name): with grpcclient.InferenceServerClient("localhost:8001") as client: # No input is required result = client.infer(model_name, [], client_timeout=120) @@ -48,6 +51,20 @@ def test_python_unittest(self): # Otherwise, it will return 0. self.assertEqual(output0, [1]) + def test_python_unittest(self): + model_name = os.environ['MODEL_NAME'] + + if model_name == 'bls_memory' or model_name == 'bls_memory_async': + # For these tests, the memory region size will be grown. Because of + # this we need to use the shared memory probe only on the second + # call so that the probe can detect the leak correctly. + self._run_unittest(model_name) + with self._shm_leak_detector.Probe() as shm_probe: + self._run_unittest(model_name) + else: + with self._shm_leak_detector.Probe() as shm_probe: + self._run_unittest(model_name) + if __name__ == '__main__': unittest.main() diff --git a/qa/L0_backend_python/restart/models/restart/1/model.py b/qa/L0_backend_python/restart/models/restart/1/model.py index 882b893945..72bce2933a 100644 --- a/qa/L0_backend_python/restart/models/restart/1/model.py +++ b/qa/L0_backend_python/restart/models/restart/1/model.py @@ -42,11 +42,9 @@ def execute(self, requests): if path.exists(file_name): with open(file_name, 'r') as f: expected_free_memory = f.read() - assert ( - expected_free_memory == current_free_memory, - f'Free shared memory before and after restart are not equal. 
' - '{expected_free_memory} (before) != {current_free_memory} (after).' - ) + assert expected_free_memory == current_free_memory, \ + (f'Free shared memory before and after restart are not equal. ' + f'{expected_free_memory} (before) != {current_free_memory} (after).') else: with open(file_name, 'w') as f: f.write(current_free_memory) diff --git a/qa/common/shm_util.py b/qa/common/shm_util.py index 51b32f8c37..a351ac09ad 100644 --- a/qa/common/shm_util.py +++ b/qa/common/shm_util.py @@ -25,11 +25,14 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. import os +from os import listdir import numpy as np +import time from ctypes import * import tritonclient.http as httpclient from tritonclient.utils import * +import triton_shm_monitor # By default, find tritonserver on "localhost", but can be overridden # with TRITONSERVER_IPADDR envvar @@ -229,7 +232,8 @@ def unregister_cleanup_shm_regions(shm_regions, shm_handles, if not (use_system_shared_memory or use_cuda_shared_memory): return None - triton_client = httpclient.InferenceServerClient(f"{_tritonserver_ipaddr}:8000") + triton_client = httpclient.InferenceServerClient( + f"{_tritonserver_ipaddr}:8000") if use_cuda_shared_memory: triton_client.unregister_cuda_shared_memory(shm_regions[0] + '_data') @@ -344,3 +348,45 @@ def register_add_either_shm_regions(inputs, outputs, shm_region_prefix, input_byte_size) outputs[io_num].set_shared_memory(output_shm_name + '_data', output_byte_size) + + +class ShmLeakDetector: + """Detect shared memory leaks when testing Python backend.""" + + class ShmLeakProbe: + + def __init__(self, shm_monitors): + self._shm_monitors = shm_monitors + + def __enter__(self): + self._shm_region_free_sizes = [] + for shm_monitor in self._shm_monitors: + self._shm_region_free_sizes.append(shm_monitor.free_memory()) + + return self + + def __exit__(self, type, value, traceback): + current_shm_sizes = [] + for shm_monitor in self._shm_monitors: + current_shm_sizes.append(shm_monitor.free_memory()) + + shm_leak_detected = False + for current_shm_size, prev_shm_size in zip( + current_shm_sizes, self._shm_region_free_sizes): + if current_shm_size != prev_shm_size: + shm_leak_detected = True + print( + f'Shared memory leak detected: {current_shm_size} (current) != {prev_shm_size} (prev).' + ) + assert not shm_leak_detected, "Shared memory leak detected." + + def __init__(self, prefix='triton_python_backend_shm_region'): + self._shm_monitors = [] + shm_regions = listdir('/dev/shm') + for shm_region in shm_regions: + if shm_region.startswith(prefix): + self._shm_monitors.append( + triton_shm_monitor.SharedMemoryManager(shm_region)) + + def Probe(self): + return self.ShmLeakProbe(self._shm_monitors) diff --git a/qa/python_models/bls/model.py b/qa/python_models/bls/model.py index 83850eaf94..0efd6804d5 100644 --- a/qa/python_models/bls/model.py +++ b/qa/python_models/bls/model.py @@ -286,6 +286,8 @@ def test_multiprocess(self): # Test multiprocess Pool with sync BLS pool = Pool(10) pool.map(bls_add_sub, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) + pool.close() + pool.join() def test_bls_sync(self): infer_request = pb_utils.InferenceRequest(