Modify L0_backend_python bls test for BLS decoupled support #5455

Merged · 8 commits · Mar 8, 2023

@@ -80,34 +80,35 @@ def response_thread(self, response_sender, in_value):

         response_count = 0
         for infer_response in infer_responses:
-            output0 = pb_utils.get_output_tensor_by_name(
-                infer_response, "OUT")
-            if infer_response.has_error():
-                response = pb_utils.InferenceResponse(
-                    error=infer_response.error().message())
-                response_sender.send(
-                    response,
-                    flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL)
-            elif np.any(in_value != output0.as_numpy()):
-                error_message = (
-                    "BLS Request input and BLS response output do not match."
-                    f" {in_value} != {output0.as_numpy()}")
-                response = pb_utils.InferenceResponse(error=error_message)
-                response_sender.send(
-                    response,
-                    flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL)
-            else:
-                output_tensors = [pb_utils.Tensor('OUT', output0.as_numpy())]
-                response = pb_utils.InferenceResponse(
-                    output_tensors=output_tensors)
-                response_sender.send(response)
+            if len(infer_response.output_tensors()) > 0:
+                output0 = pb_utils.get_output_tensor_by_name(
+                    infer_response, "OUT")
+                if infer_response.has_error():
+                    response = pb_utils.InferenceResponse(
+                        error=infer_response.error().message())
+                    response_sender.send(
+                        response,
+                        flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL)
+                elif np.any(in_value != output0.as_numpy()):
+                    error_message = (
+                        "BLS Request input and BLS response output do not match."
+                        f" {in_value} != {output0.as_numpy()}")
+                    response = pb_utils.InferenceResponse(error=error_message)
+                    response_sender.send(
+                        response,
+                        flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL)
+                else:
+                    output_tensors = [pb_utils.Tensor('OUT', output0.as_numpy())]
+                    response = pb_utils.InferenceResponse(
+                        output_tensors=output_tensors)
+                    response_sender.send(response)

             response_count += 1

-        if response_count != in_value:
+        if in_value != response_count-1:
             error_message = (
                 "Expected {} responses, got {}".format(
-                    in_value, len(infer_responses)))
+                    in_value, len(infer_responses)-1))
             response = pb_utils.InferenceResponse(
                 error=error_message)
             response_sender.send(
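
Note on the change above (not part of the diff itself): in decoupled mode a BLS call returns a response iterator, and the stream is terminated by a final response that carries only the `TRITONSERVER_RESPONSE_COMPLETE_FINAL` flag and no output tensors. That is why the loop now guards on `len(infer_response.output_tensors()) > 0` and why the count check compares against `response_count - 1`. A minimal consumer sketch follows; it assumes the `square_int32` test model with `IN`/`OUT` tensors used elsewhere in this PR, and the helper name `count_square_responses` is illustrative only.

```python
import numpy as np
import triton_python_backend_utils as pb_utils


def count_square_responses(n):
    # Ask the decoupled square_int32 model for `n` responses via BLS.
    input0 = pb_utils.Tensor('IN', np.array([n], dtype=np.int32))
    infer_request = pb_utils.InferenceRequest(
        model_name='square_int32',
        inputs=[input0],
        requested_output_names=['OUT'])

    data_responses = 0
    for infer_response in infer_request.exec(decoupled=True):
        if infer_response.has_error():
            raise pb_utils.TritonModelException(
                infer_response.error().message())
        # The closing response of the stream has no output tensors; skip it.
        if len(infer_response.output_tensors()) > 0:
            data_responses += 1

    # The loop iterates n data responses plus one empty final response.
    return data_responses
```

For a request with `n = 4`, the loop sees five responses in total and the helper returns 4, which matches the `response_count - 1` bookkeeping in the test code above.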
120 changes: 108 additions & 12 deletions qa/python_models/bls/model.py
@@ -84,18 +84,19 @@ def bls_square(_=None):
         if infer_response.has_error():
             return False

-        output0 = pb_utils.get_output_tensor_by_name(infer_response, 'OUT')
-        if output0 is None:
-            return False
+        if len(infer_response.output_tensors()) > 0:
+            output0 = pb_utils.get_output_tensor_by_name(infer_response, 'OUT')
+            if output0 is None:
+                return False

-        expected_output = input0.as_numpy()
+            expected_output = input0.as_numpy()

-        if not np.all(expected_output == output0.as_numpy()):
-            return False
+            if not np.all(expected_output == output0.as_numpy()):
+                return False

         response_count += 1

-    if not np.all(response_count == input0.as_numpy()):
+    if not np.all(input0.as_numpy() == response_count-1):
         return False

     return True
@@ -459,14 +460,16 @@ def test_gpu_bls(self):
     def test_multiprocess(self):
         # Test multiprocess Pool with sync BLS
         if self._is_decoupled:
-            func_name = bls_square
+            # Fixme: DLIS-4630
+            # func_name = bls_square
+            pass
         else:
             func_name = bls_add_sub

-        pool = Pool(10)
-        pool.map(func_name, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
-        pool.close()
-        pool.join()
+            pool = Pool(10)
+            pool.map(func_name, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
+            pool.close()
+            pool.join()

     def test_bls_sync(self):
         infer_request = pb_utils.InferenceRequest(
@@ -553,6 +556,99 @@ def test_timeout(self):
                           infer_response.error().message())
             self.assertTrue(len(infer_response.output_tensors()) == 0)

+    def _test_response_iterator_square(self,
+                                       expected_output_cnt,
+                                       expected_output_value,
+                                       response_iterator):
+        response_count = 0
+        expected_output_cnt = np.array([expected_output_cnt], dtype=np.int32)
+
+        for infer_response in response_iterator:
+            self.assertFalse(infer_response.has_error())
+            if len(infer_response.output_tensors()) > 0:
+                output0 = pb_utils.get_output_tensor_by_name(infer_response, 'OUT')
+                self.assertIsNotNone(output0)
+                self.assertEqual(expected_output_value, output0.as_numpy())
+
+            response_count += 1
+
+        self.assertEqual(response_count, expected_output_cnt)
+
+        # Make sure the iterator is exhausted.
+        with self.assertRaises(StopIteration):
+            next(response_iterator)
+
+        return response_iterator
+
+    def test_response_iterator(self):
+        if self._is_decoupled:
+            # Test the response iterator for decoupled responses. The request
+            # has 4 decoupled responses followed by an empty response.
+            response_value = 4
+            input0_np = np.array([response_value], dtype=np.int32)
+            input0 = pb_utils.Tensor('IN', input0_np)
+            infer_request = pb_utils.InferenceRequest(
+                model_name='square_int32',
+                inputs=[input0],
+                requested_output_names=['OUT'])
+            infer_responses = infer_request.exec(decoupled=True)
+
+            # case 1. Use Next() to get the next response first, then use
+            # for-loop to get the remaining responses.
+            infer_response = next(infer_responses)
+            self.assertFalse(infer_response.has_error())
+            output0 = pb_utils.get_output_tensor_by_name(infer_response, 'OUT')
+            self.assertIsNotNone(output0)
+            self.assertEqual(response_value, output0.as_numpy())
+            # The iterator now should only have 4 remaining responses.
+            infer_responses = self._test_response_iterator_square(
+                4, response_value, infer_responses)
+
+            # case 2. Call for-loop to get all the responses multiple times.
+            infer_responses = self._test_response_iterator_square(
+                5, response_value, infer_responses)
+            infer_responses = self._test_response_iterator_square(
+                5, response_value, infer_responses)
+            infer_responses = self._test_response_iterator_square(
+                5, response_value, infer_responses)
+
+            # case 3. Break from the iteration, then use Next() and for-loop to
+            # get the remaining responses.
+            response_count = 0
+            for infer_response in infer_responses:
+                self.assertFalse(infer_response.has_error())
+                output0 = pb_utils.get_output_tensor_by_name(infer_response,
+                                                             'OUT')
+                self.assertIsNotNone(output0)
+                self.assertEqual(response_value, output0.as_numpy())
+
+                response_count += 1
+                if response_count == 2:
+                    break
+
+            infer_response = next(infer_responses)
+            self.assertFalse(infer_response.has_error())
+            output0 = pb_utils.get_output_tensor_by_name(infer_response, 'OUT')
+            self.assertIsNotNone(output0)
+            self.assertEqual(response_value, output0.as_numpy())
+
+            # The iterator now should only have 2 remaining responses.
+            infer_responses = self._test_response_iterator_square(
+                2, response_value, infer_responses)
+
+            # case 4. Delete the iterator before all the responses have been
+            # retrieved.
+            infer_responses = infer_request.exec(decoupled=True)
+
+            infer_response = next(infer_responses)
+            self.assertFalse(infer_response.has_error())
+            output0 = pb_utils.get_output_tensor_by_name(infer_response, 'OUT')
+            self.assertIsNotNone(output0)
+            self.assertEqual(response_value, output0.as_numpy())
+
+            del infer_responses
+

 class TritonPythonModel:

     def execute(self, requests):
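
For context on the iterator semantics exercised by `test_response_iterator` (a trailing response without output tensors, `StopIteration` once the stream is exhausted), below is a heavily simplified sketch of what a decoupled square-style model can look like on the server side. It is illustrative only; the actual `square_int32` model in the qa folder may differ in details.

```python
import numpy as np
import triton_python_backend_utils as pb_utils


class TritonPythonModel:
    """Sketch of a decoupled model that sends one response per requested value."""

    def execute(self, requests):
        for request in requests:
            in_value = pb_utils.get_input_tensor_by_name(
                request, 'IN').as_numpy()[0]
            response_sender = request.get_response_sender()

            # Send `in_value` responses, each carrying the input value back.
            for _ in range(in_value):
                out_tensor = pb_utils.Tensor(
                    'OUT', np.array([in_value], dtype=np.int32))
                response_sender.send(
                    pb_utils.InferenceResponse(output_tensors=[out_tensor]))

            # Close the stream; only the final flag is sent, no tensors.
            response_sender.send(
                flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL)

        # Decoupled models return None from execute().
        return None
```

The key point is that the final flag is sent without an accompanying response object, which the BLS response iterator surfaces to the caller as one extra response with an empty `output_tensors()` list.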
30 changes: 16 additions & 14 deletions qa/python_models/bls_async/model.py
@@ -97,27 +97,29 @@ def verify_square_results(input0, infer_responses):
                   flush=True)
             return False

-        output0 = pb_utils.get_output_tensor_by_name(infer_response, 'OUT')
+        if len(infer_response.output_tensors()) > 0:
+            output0 = pb_utils.get_output_tensor_by_name(infer_response, 'OUT')

-        if (output0 is None):
-            return False
+            if (output0 is None):
+                return False

-        if not output0.is_cpu():
-            output0 = from_dlpack(
-                output0.to_dlpack()).to('cpu').cpu().detach().numpy()
-        else:
-            output0 = output0.as_numpy()
+            if not output0.is_cpu():
+                output0 = from_dlpack(
+                    output0.to_dlpack()).to('cpu').cpu().detach().numpy()
+            else:
+                output0 = output0.as_numpy()

-        expected_output = input0
+            expected_output = input0

-        if not np.all(expected_output == input0):
-            print(f'For OUT expected {expected_output} found {output0}')
-            return False
+            if not np.all(expected_output == input0):
+                print(f'For OUT expected {expected_output} found {output0}')
+                return False

         response_count += 1

-    if not np.all(response_count == input0):
-        print('Expected {} responses, got {}'.format(input0, response_count))
+    if not np.all(input0 == response_count-1):
+        print('Expected {} responses, got {}'.format(input0, response_count-1))
         return False

     return True
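
Side note on the GPU branch above: `as_numpy()` is only valid for CPU tensors, so the test converts GPU outputs through DLPack and PyTorch. A small helper sketch of that conversion, assuming PyTorch is installed in the test environment (as the async BLS tests already require); the function name `bls_output_to_numpy` is illustrative.

```python
from torch.utils.dlpack import from_dlpack


def bls_output_to_numpy(output_tensor):
    """Return a NumPy copy of a pb_utils.Tensor, whether CPU- or GPU-backed."""
    if not output_tensor.is_cpu():
        # GPU tensors cannot be read with as_numpy(); detour through DLPack.
        return from_dlpack(
            output_tensor.to_dlpack()).to('cpu').detach().numpy()
    return output_tensor.as_numpy()
```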
2 changes: 1 addition & 1 deletion qa/python_models/bls_memory/model.py
@@ -56,7 +56,7 @@ def _send_identity_tensor(self, size, is_decoupled):
         return input0_np, infer_response

     def test_bls_out_of_memory(self):
-        tensor_size = 1024 * 1024 * 1024
+        tensor_size = 256 * 1024 * 1024
         input0_np, infer_response = self._send_identity_tensor(
             tensor_size, self._is_decoupled)
         out_of_memory_message = "Failed to increase the shared memory pool size for key"
2 changes: 1 addition & 1 deletion qa/python_models/bls_memory_async/model.py
@@ -50,7 +50,7 @@ async def _send_identity_tensor(size, is_decoupled):
 async def test_bls_out_of_memory():
     is_decoupled = True if os.environ['BLS_KIND'] == "decoupled" else False

-    tensor_size = 1024 * 1024 * 1024
+    tensor_size = 256 * 1024 * 1024
     input0_np, infer_response = await _send_identity_tensor(
         tensor_size, is_decoupled)
