
Commit 48cf6b7

Add cancellation into response statistics (#6904)
* Add cancellation into response statistics
* Add test for response statistics cancel
* Remove debugging print
* Use is None comparison
* Fix docs
* Use default args None
* Refactor RegisterModelStatistics()
1 parent 5732163 commit 48cf6b7

4 files changed (+211 −282 lines)

docs/protocol/extension_statistics.md

Lines changed: 4 additions & 1 deletion
````diff
@@ -195,7 +195,8 @@ $response_stats =
   "compute_output" : $duration_stat,
   "success" : $duration_stat,
   "fail" : $duration_stat,
-  "empty_response" : $duration_stat
+  "empty_response" : $duration_stat,
+  "cancel" : $duration_stat
 }
 ```
 
@@ -208,6 +209,8 @@ $response_stats =
   is the sum of infer and output durations.
 - "empty_response" : The count and cumulative duration of an inference with an
   empty / no response. The duration is infer durations.
+- "cancel" : The count and cumulative duration of a inference cancellation. The
+  duration is for cleaning up resources held by cancelled inference requests.
 
 
 ```
````
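Since the new "cancel" entry is reported through the same statistics endpoint as the other per-response stats, it can be read with the standard Python client. The snippet below is a minimal sketch and not part of this commit: the server address, model name, and the `model_stats[0]["response_stats"]` indexing are assumptions based on the statistics extension layout shown above.

```python
import tritonclient.http as httpclient

# Minimal sketch: fetch inference statistics over HTTP and print the new
# per-response "cancel" counters. Server address and model name are assumed.
client = httpclient.InferenceServerClient("localhost:8000")
stats = client.get_inference_statistics(model_name="square_int32_slow")

# Per the extension doc, "response_stats" maps a response key to its stats.
response_stats = stats["model_stats"][0]["response_stats"]
for key, per_response in response_stats.items():
    cancel = per_response["cancel"]
    print(f"response {key}: cancel count={cancel['count']}, ns={cancel['ns']}")
```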

qa/L0_response_statistics/response_statistics_test.py

Lines changed: 82 additions & 22 deletions
````diff
@@ -36,11 +36,12 @@
 
 class TestResponseStatistics(unittest.TestCase):
     def setUp(self):
-        self._model_name = "square_int32"
-        self._min_infer_delay_ns = 400000000
-        self._min_output_delay_ns = 200000000
-        self._number_of_fail_responses = 2
-        self._number_of_empty_responses = 1
+        self._model_name = "set_by_test_case"
+        self._min_infer_delay_ns = 0
+        self._min_output_delay_ns = 0
+        self._min_cancel_delay_ns = 0
+        self._number_of_fail_responses = 0
+        self._number_of_empty_responses = 0
         self._statistics_counts = []
         self._grpc_client = grpcclient.InferenceServerClient(
             "localhost:8001", verbose=True
@@ -59,8 +60,10 @@ def callback(result, error):
 
     # Send an infer request and return its responses. 'number_of_responses' is the sum
     # of success, fail and empty responses the model should return for this request.
-    # This function waits until all success and fail responses are received.
-    def _stream_infer(self, number_of_responses):
+    # 'cancel_at_response_size' will cancel the stream when the number of responses
+    # received equals the size, set to None if cancellation is not required. This
+    # function waits until all success and fail responses are received, or cancelled.
+    def _stream_infer(self, number_of_responses, cancel_at_response_size=None):
         callback, responses = self._generate_streaming_callback_and_response_pair()
         self._grpc_client.start_stream(callback)
         input_data = np.array([number_of_responses], dtype=np.int32)
@@ -70,15 +73,27 @@ def _stream_infer(self, number_of_responses):
         self._grpc_client.async_stream_infer(
             model_name=self._model_name, inputs=inputs, outputs=outputs
         )
-        while len(responses) < (number_of_responses - self._number_of_empty_responses):
-            time.sleep(0.1)  # poll until all expected responses are received
-        self._grpc_client.stop_stream()
+        if cancel_at_response_size is None:
+            # poll until all expected responses are received
+            while len(responses) < (
+                number_of_responses - self._number_of_empty_responses
+            ):
+                time.sleep(0.1)
+            self._grpc_client.stop_stream(cancel_requests=False)
+        else:
+            # poll until cancellation response size is reached
+            while len(responses) < cancel_at_response_size:
+                time.sleep(0.1)
+            self._grpc_client.stop_stream(cancel_requests=True)
         return responses
 
     # Update expected statistics counts for the response at 'current_index'.
     # 'number_of_responses' is the sum of success, fail and empty responses expected
-    # from this inference request.
-    def _update_statistics_counts(self, current_index, number_of_responses):
+    # from this inference request. 'cancel_at_index' is the index at which the request
+    # should be cancelled.
+    def _update_statistics_counts(
+        self, current_index, number_of_responses, cancel_at_index
+    ):
         if current_index >= len(self._statistics_counts):
             self._statistics_counts.append(
                 {
@@ -87,9 +102,13 @@ def _update_statistics_counts(self, current_index, number_of_responses):
                     "success": 0,
                     "fail": 0,
                     "empty_response": 0,
+                    "cancel": 0,
                 }
             )
-        if (
+        if current_index == cancel_at_index:
+            # cancel
+            self._statistics_counts[current_index]["cancel"] += 1
+        elif (
             current_index
             + self._number_of_fail_responses
             + self._number_of_empty_responses
@@ -118,10 +137,16 @@ def _check_statistics_count_and_duration(
             delay_ns = self._min_infer_delay_ns
         elif stats_name == "compute_output":
             delay_ns = self._min_output_delay_ns
+        elif stats_name == "cancel":
+            delay_ns = self._min_cancel_delay_ns
         else:  # success or fail
             delay_ns = self._min_infer_delay_ns + self._min_output_delay_ns
-        upper_bound_ns = 1.1 * delay_ns * expected_count
-        lower_bound_ns = 0.9 * delay_ns * expected_count
+        if delay_ns == 0:
+            upper_bound_ns = 10000000 * expected_count
+            lower_bound_ns = 0
+        else:
+            upper_bound_ns = 1.1 * delay_ns * expected_count
+            lower_bound_ns = 0.9 * delay_ns * expected_count
         stats = response_stats[str(current_index)][stats_name]
         self.assertEqual(stats["count"], expected_count)
         self.assertLessEqual(stats["ns"], upper_bound_ns)
@@ -162,12 +187,14 @@ def _get_response_statistics(self):
         return response_stats_http
 
     # Check the response statistics is valid for a given infer request, providing its
-    # 'responses' and 'number_of_responses'.
-    def _check_response_stats(self, responses, number_of_responses):
+    # 'responses', expected 'number_of_responses' and 'cancel_at_index'.
+    def _check_response_stats(
+        self, responses, number_of_responses, cancel_at_index=None
+    ):
         response_stats = self._get_response_statistics()
         self.assertGreaterEqual(len(response_stats), number_of_responses)
         for i in range(number_of_responses):
-            self._update_statistics_counts(i, number_of_responses)
+            self._update_statistics_counts(i, number_of_responses, cancel_at_index)
             self._check_statistics_count_and_duration(
                 response_stats, i, "compute_infer"
             )
@@ -179,24 +206,57 @@ def _check_response_stats(self, responses, number_of_responses):
             self._check_statistics_count_and_duration(
                 response_stats, i, "empty_response"
             )
+            self._check_statistics_count_and_duration(response_stats, i, "cancel")
 
     # Test response statistics. The statistics must be valid over two or more infers.
     def test_response_statistics(self):
+        self._model_name = "square_int32"
+        self._min_infer_delay_ns = 400000000
+        self._min_output_delay_ns = 200000000
+        self._number_of_fail_responses = 2
+        self._number_of_empty_responses = 1
         # Send a request that generates 4 responses.
         number_of_responses = 4
         responses = self._stream_infer(number_of_responses)
         self._check_response_stats(responses, number_of_responses)
-        # Send a request that generates 6 responses, and make sure the
-        # statistics are aggregated with the previous request.
+        # Send a request that generates 6 responses, and make sure the statistics are
+        # aggregated with the previous request.
         number_of_responses = 6
         responses = self._stream_infer(number_of_responses)
         self._check_response_stats(responses, number_of_responses)
-        # Send a request that generates 3 responses, and make sure the
-        # statistics are aggregated with the previous requests.
+        # Send a request that generates 3 responses, and make sure the statistics are
+        # aggregated with the previous requests.
        number_of_responses = 3
         responses = self._stream_infer(number_of_responses)
         self._check_response_stats(responses, number_of_responses)
 
+    # Test response statistics with cancellation.
+    def test_response_statistics_cancel(self):
+        self._model_name = "square_int32_slow"
+        self._min_infer_delay_ns = 1200000000
+        self._min_output_delay_ns = 800000000
+        self._min_cancel_delay_ns = 400000000
+
+        # Send a request that generates 4 responses.
+        number_of_responses = 4
+        responses = self._stream_infer(number_of_responses)
+        self._check_response_stats(responses, number_of_responses)
+
+        # Send a request that generates 4 responses, and cancel on the 3rd response.
+        # Make sure the statistics are aggregated with the previous request.
+        responses = self._stream_infer(number_of_responses=4, cancel_at_response_size=1)
+        # There is an infer and output delay on the 1st and 2nd response, and a cancel
+        # delay on the 3rd response.
+        min_total_delay_ns = (
+            self._min_infer_delay_ns + self._min_output_delay_ns
+        ) * 2 + self._min_cancel_delay_ns
+        # Make sure the inference and cancellation is completed before checking.
+        time.sleep(min_total_delay_ns * 1.5 / 1000000000)
+        # The request is cancelled when the 2nd response is computing, so the
+        # cancellation should be received at the 3rd response (index 2), making a total
+        # of 3 responses on the statistics.
+        self._check_response_stats(responses, number_of_responses=3, cancel_at_index=2)
+
 
 if __name__ == "__main__":
     unittest.main()
````
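The cancellation path that `test_response_statistics_cancel` exercises reduces to cancelling an in-flight decoupled request mid-stream. Below is a stripped-down, hypothetical sketch of the client side of `_stream_infer(..., cancel_at_response_size=1)`; the address, model name, and tensor name simply mirror the test setup above and are not general defaults.

```python
import time

import numpy as np
import tritonclient.grpc as grpcclient

# Sketch: stream from a decoupled model, take the first response, then cancel
# the rest of the request so the server records a "cancel" response statistic.
responses = []
client = grpcclient.InferenceServerClient("localhost:8001")
client.start_stream(lambda result, error: responses.append(error or result))

inputs = [grpcclient.InferInput("IN", [1], "INT32")]
inputs[0].set_data_from_numpy(np.array([4], dtype=np.int32))  # ask for 4 responses
client.async_stream_infer(model_name="square_int32_slow", inputs=inputs)

while len(responses) < 1:  # poll until the first response arrives
    time.sleep(0.1)
client.stop_stream(cancel_requests=True)  # cancel the remaining responses
```

With the delays configured in test.sh, the cancellation lands while the second response is still computing, which is why the test expects the cancel count to show up on the third response slot (index 2).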

qa/L0_response_statistics/test.sh

Lines changed: 9 additions & 0 deletions
````diff
@@ -56,6 +56,15 @@ mkdir -p models/square_int32/1 && (cd models/square_int32 && \
     echo -e 'parameters [{ key: "CUSTOM_OUTPUT_DELAY_NS" \n value: { string_value: "200000000" } }]' >> config.pbtxt && \
     echo -e 'parameters [{ key: "CUSTOM_FAIL_COUNT" \n value: { string_value: "2" } }]' >> config.pbtxt && \
     echo -e 'parameters [{ key: "CUSTOM_EMPTY_COUNT" \n value: { string_value: "1" } }]' >> config.pbtxt)
+mkdir -p models/square_int32_slow/1 && (cd models/square_int32_slow && \
+    echo 'backend: "square"' >> config.pbtxt && \
+    echo 'max_batch_size: 0' >> config.pbtxt && \
+    echo 'model_transaction_policy { decoupled: True }' >> config.pbtxt && \
+    echo -e 'input [{ name: "IN" \n data_type: TYPE_INT32 \n dims: [ 1 ] }]' >> config.pbtxt && \
+    echo -e 'output [{ name: "OUT" \n data_type: TYPE_INT32 \n dims: [ 1 ] }]' >> config.pbtxt && \
+    echo -e 'parameters [{ key: "CUSTOM_INFER_DELAY_NS" \n value: { string_value: "1200000000" } }]' >> config.pbtxt && \
+    echo -e 'parameters [{ key: "CUSTOM_OUTPUT_DELAY_NS" \n value: { string_value: "800000000" } }]' >> config.pbtxt && \
+    echo -e 'parameters [{ key: "CUSTOM_CANCEL_DELAY_NS" \n value: { string_value: "400000000" } }]' >> config.pbtxt)
 
 TEST_LOG="response_statistics_test.log"
 SERVER_LOG="./response_statistics_test.server.log"
````
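For readability, this is roughly the `config.pbtxt` that the `echo` chain above assembles for `square_int32_slow` (line breaks differ because of the `\n` escapes, which the protobuf text-format parser ignores):

```protobuf
backend: "square"
max_batch_size: 0
model_transaction_policy { decoupled: True }
input [{ name: "IN" data_type: TYPE_INT32 dims: [ 1 ] }]
output [{ name: "OUT" data_type: TYPE_INT32 dims: [ 1 ] }]
parameters [{ key: "CUSTOM_INFER_DELAY_NS" value: { string_value: "1200000000" } }]
parameters [{ key: "CUSTOM_OUTPUT_DELAY_NS" value: { string_value: "800000000" } }]
parameters [{ key: "CUSTOM_CANCEL_DELAY_NS" value: { string_value: "400000000" } }]
```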
