Modify L0_backend_python bls test for BLS decoupled support (#5455)
* Lower the tensor size to avoid intermittent issue

* Disable multiprocessing for decoupled case

* Fix indent

* Lower the tensor size to avoid intermittent issue

* Add unittest for response iterator

* Address comment

* Add checks for the last empty response

* Fix up for decoupled test
krishung5 committed Mar 8, 2023
1 parent 1b0fe1f commit 6629deb
Showing 5 changed files with 150 additions and 51 deletions.
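
The theme of the diff below: for a decoupled BLS call, infer_request.exec(decoupled=True) returns an iterator whose final response carries no output tensors and only signals completion. The tests therefore now guard tensor reads with len(infer_response.output_tensors()) > 0 and compare expected counts against response_count - 1. The following minimal sketch illustrates that pattern; the square_int32 model and the IN/OUT tensor names are taken from the tests in this commit, while the helper name count_square_responses is hypothetical and for illustration only.

# A minimal sketch of the decoupled BLS pattern exercised by these tests.
# Assumptions: the square_int32 model and the IN/OUT tensor names come from
# the tests in this commit; count_square_responses is a hypothetical helper.
import numpy as np
import triton_python_backend_utils as pb_utils


def count_square_responses(request_value):
    # Issue a BLS request against the decoupled square_int32 model.
    input0 = pb_utils.Tensor('IN', np.array([request_value], dtype=np.int32))
    infer_request = pb_utils.InferenceRequest(
        model_name='square_int32',
        inputs=[input0],
        requested_output_names=['OUT'])

    # exec(decoupled=True) returns an iterator over the response stream.
    infer_responses = infer_request.exec(decoupled=True)

    response_count = 0
    for infer_response in infer_responses:
        if infer_response.has_error():
            raise RuntimeError(infer_response.error().message())

        # The stream ends with an empty response that only carries the final
        # flag, so guard before reading OUT.
        if len(infer_response.output_tensors()) > 0:
            output0 = pb_utils.get_output_tensor_by_name(infer_response, 'OUT')
            assert np.all(output0.as_numpy() == request_value)

        response_count += 1

    # The trailing empty response is counted, hence the "- 1" checks that this
    # commit adds to the verification code.
    return response_count - 1

With the square model, a request value of 4 yields 4 data-carrying responses plus the trailing empty one, which is why test_response_iterator drains 5 items from the iterator while the logical response count stays 4.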
@@ -80,34 +80,35 @@ def response_thread(self, response_sender, in_value):

         response_count = 0
         for infer_response in infer_responses:
-            output0 = pb_utils.get_output_tensor_by_name(
-                infer_response, "OUT")
-            if infer_response.has_error():
-                response = pb_utils.InferenceResponse(
-                    error=infer_response.error().message())
-                response_sender.send(
-                    response,
-                    flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL)
-            elif np.any(in_value != output0.as_numpy()):
-                error_message = (
-                    "BLS Request input and BLS response output do not match."
-                    f" {in_value} != {output0.as_numpy()}")
-                response = pb_utils.InferenceResponse(error=error_message)
-                response_sender.send(
-                    response,
-                    flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL)
-            else:
-                output_tensors = [pb_utils.Tensor('OUT', output0.as_numpy())]
-                response = pb_utils.InferenceResponse(
-                    output_tensors=output_tensors)
-                response_sender.send(response)
+            if len(infer_response.output_tensors()) > 0:
+                output0 = pb_utils.get_output_tensor_by_name(
+                    infer_response, "OUT")
+                if infer_response.has_error():
+                    response = pb_utils.InferenceResponse(
+                        error=infer_response.error().message())
+                    response_sender.send(
+                        response,
+                        flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL)
+                elif np.any(in_value != output0.as_numpy()):
+                    error_message = (
+                        "BLS Request input and BLS response output do not match."
+                        f" {in_value} != {output0.as_numpy()}")
+                    response = pb_utils.InferenceResponse(error=error_message)
+                    response_sender.send(
+                        response,
+                        flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL)
+                else:
+                    output_tensors = [pb_utils.Tensor('OUT', output0.as_numpy())]
+                    response = pb_utils.InferenceResponse(
+                        output_tensors=output_tensors)
+                    response_sender.send(response)

             response_count += 1

-        if response_count != in_value:
+        if in_value != response_count-1:
             error_message = (
                 "Expected {} responses, got {}".format(
-                    in_value, len(infer_responses)))
+                    in_value, len(infer_responses)-1))
             response = pb_utils.InferenceResponse(
                 error=error_message)
             response_sender.send(
120 changes: 108 additions & 12 deletions qa/python_models/bls/model.py
@@ -84,18 +84,19 @@ def bls_square(_=None):
         if infer_response.has_error():
             return False

-        output0 = pb_utils.get_output_tensor_by_name(infer_response, 'OUT')
-        if output0 is None:
-            return False
+        if len(infer_response.output_tensors()) > 0:
+            output0 = pb_utils.get_output_tensor_by_name(infer_response, 'OUT')
+            if output0 is None:
+                return False

-        expected_output = input0.as_numpy()
+            expected_output = input0.as_numpy()

-        if not np.all(expected_output == output0.as_numpy()):
-            return False
+            if not np.all(expected_output == output0.as_numpy()):
+                return False

         response_count += 1

-    if not np.all(response_count == input0.as_numpy()):
+    if not np.all(input0.as_numpy() == response_count-1):
         return False

     return True
@@ -459,14 +460,16 @@ def test_gpu_bls(self):
     def test_multiprocess(self):
         # Test multiprocess Pool with sync BLS
         if self._is_decoupled:
-            func_name = bls_square
+            # Fixme: DLIS-4630
+            # func_name = bls_square
+            pass
         else:
             func_name = bls_add_sub

-        pool = Pool(10)
-        pool.map(func_name, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
-        pool.close()
-        pool.join()
+            pool = Pool(10)
+            pool.map(func_name, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
+            pool.close()
+            pool.join()

     def test_bls_sync(self):
         infer_request = pb_utils.InferenceRequest(
@@ -553,6 +556,99 @@ def test_timeout(self):
                       infer_response.error().message())
         self.assertTrue(len(infer_response.output_tensors()) == 0)

+    def _test_response_iterator_square(self,
+                                       expected_output_cnt,
+                                       expected_output_value,
+                                       response_iterator):
+        response_count = 0
+        expected_output_cnt = np.array([expected_output_cnt], dtype=np.int32)
+
+        for infer_response in response_iterator:
+            self.assertFalse(infer_response.has_error())
+            if len(infer_response.output_tensors()) > 0:
+                output0 = pb_utils.get_output_tensor_by_name(infer_response, 'OUT')
+                self.assertIsNotNone(output0)
+                self.assertEqual(expected_output_value, output0.as_numpy())
+
+            response_count += 1
+
+        self.assertEqual(response_count, expected_output_cnt)
+
+        # Make sure the iterator is exhausted.
+        with self.assertRaises(StopIteration):
+            next(response_iterator)
+
+        return response_iterator
+
+    def test_response_iterator(self):
+        if self._is_decoupled:
+            # Test the response iterator for decoupled responses. The request
+            # has 4 decoupled responses followed by an empty response.
+            response_value = 4
+            input0_np = np.array([response_value], dtype=np.int32)
+            input0 = pb_utils.Tensor('IN', input0_np)
+            infer_request = pb_utils.InferenceRequest(
+                model_name='square_int32',
+                inputs=[input0],
+                requested_output_names=['OUT'])
+            infer_responses = infer_request.exec(decoupled=True)
+
+            # case 1. Use Next() to get the next response first, then use
+            # for-loop to get the remaining responses.
+            infer_response = next(infer_responses)
+            self.assertFalse(infer_response.has_error())
+            output0 = pb_utils.get_output_tensor_by_name(infer_response, 'OUT')
+            self.assertIsNotNone(output0)
+            self.assertEqual(response_value, output0.as_numpy())
+            # The iterator now should only have 4 remaining responses.
+            infer_responses = self._test_response_iterator_square(
+                4, response_value, infer_responses)
+
+            # case 2. Call for-loop to get all the responses multiple times.
+            infer_responses = self._test_response_iterator_square(
+                5, response_value, infer_responses)
+            infer_responses = self._test_response_iterator_square(
+                5, response_value, infer_responses)
+            infer_responses = self._test_response_iterator_square(
+                5, response_value, infer_responses)
+
+            # case 3. Break from the iteration, then use Next() and for-loop to
+            # get the remaining responses.
+            response_count = 0
+            for infer_response in infer_responses:
+                self.assertFalse(infer_response.has_error())
+                output0 = pb_utils.get_output_tensor_by_name(infer_response,
+                                                             'OUT')
+                self.assertIsNotNone(output0)
+                self.assertEqual(response_value, output0.as_numpy())
+
+                response_count += 1
+                if response_count == 2:
+                    break
+
+            infer_response = next(infer_responses)
+            self.assertFalse(infer_response.has_error())
+            output0 = pb_utils.get_output_tensor_by_name(infer_response, 'OUT')
+            self.assertIsNotNone(output0)
+            self.assertEqual(response_value, output0.as_numpy())
+
+            # The iterator now should only have 2 remaining responses.
+            infer_responses = self._test_response_iterator_square(
+                2, response_value, infer_responses)
+
+            # case 4. Delete the iterator before all the responses have been
+            # retrieved.
+            infer_responses = infer_request.exec(decoupled=True)
+
+            infer_response = next(infer_responses)
+            self.assertFalse(infer_response.has_error())
+            output0 = pb_utils.get_output_tensor_by_name(infer_response, 'OUT')
+            self.assertIsNotNone(output0)
+            self.assertEqual(response_value, output0.as_numpy())
+
+            del infer_responses
+

 class TritonPythonModel:

     def execute(self, requests):
30 changes: 16 additions & 14 deletions qa/python_models/bls_async/model.py
@@ -97,27 +97,29 @@ def verify_square_results(input0, infer_responses):
                   flush=True)
             return False

-        output0 = pb_utils.get_output_tensor_by_name(infer_response, 'OUT')
+        if len(infer_response.output_tensors()) > 0:
+            output0 = pb_utils.get_output_tensor_by_name(infer_response, 'OUT')

-        if (output0 is None):
-            return False
+            if (output0 is None):
+                return False

-        if not output0.is_cpu():
-            output0 = from_dlpack(
-                output0.to_dlpack()).to('cpu').cpu().detach().numpy()
-        else:
-            output0 = output0.as_numpy()
+            if not output0.is_cpu():
+                output0 = from_dlpack(
+                    output0.to_dlpack()).to('cpu').cpu().detach().numpy()
+            else:
+                output0 = output0.as_numpy()

-        expected_output = input0
+            expected_output = input0

-        if not np.all(expected_output == input0):
-            print(f'For OUT expected {expected_output} found {output0}')
-            return False
+            if not np.all(expected_output == input0):
+                print(f'For OUT expected {expected_output} found {output0}')
+                return False

         response_count += 1

-    if not np.all(response_count == input0):
-        print('Expected {} responses, got {}'.format(input0, response_count))
+    if not np.all(input0 == response_count-1):
+        print('Expected {} responses, got {}'.format(input0, response_count-1))
         return False

     return True
2 changes: 1 addition & 1 deletion qa/python_models/bls_memory/model.py
@@ -56,7 +56,7 @@ def _send_identity_tensor(self, size, is_decoupled):
         return input0_np, infer_response

     def test_bls_out_of_memory(self):
-        tensor_size = 1024 * 1024 * 1024
+        tensor_size = 256 * 1024 * 1024
         input0_np, infer_response = self._send_identity_tensor(
             tensor_size, self._is_decoupled)
         out_of_memory_message = "Failed to increase the shared memory pool size for key"
2 changes: 1 addition & 1 deletion qa/python_models/bls_memory_async/model.py
@@ -50,7 +50,7 @@ async def _send_identity_tensor(size, is_decoupled):
 async def test_bls_out_of_memory():
     is_decoupled = True if os.environ['BLS_KIND'] == "decoupled" else False

-    tensor_size = 1024 * 1024 * 1024
+    tensor_size = 256 * 1024 * 1024
     input0_np, infer_response = await _send_identity_tensor(
         tensor_size, is_decoupled)

