diff --git a/docs/user_guide/model_management.md b/docs/user_guide/model_management.md index dc323a087c..4ce698feee 100644 --- a/docs/user_guide/model_management.md +++ b/docs/user_guide/model_management.md @@ -212,9 +212,8 @@ repository, copy in the new shared libraries, and then reload the model. * If only the model instance configuration on the 'config.pbtxt' is modified -(i.e. increasing/decreasing the instance count) for non-sequence models, -then Triton will update the model rather then reloading it, when either a load -request is received under +(i.e. increasing/decreasing the instance count), then Triton will update the +model rather than reloading it, when either a load request is received under [Model Control Mode EXPLICIT](#model-control-mode-explicit) or change to the 'config.pbtxt' is detected under [Model Control Mode POLL](#model-control-mode-poll). @@ -225,11 +224,17 @@ request is received under configuration, so its presence in the model directory may be detected as a new file and cause the model to fully reload when only an update is expected. -* If a sequence model is updated with in-flight sequence(s), Triton does not -guarantee any remaining request(s) from the in-flight sequence(s) will be routed -to the same model instance for processing. It is currently the responsibility of -the user to ensure any in-flight sequence(s) is complete before updating a -sequence model. +* If a sequence model is *updated* (i.e. decreasing the instance count), Triton +will wait until the in-flight sequence is completed (or timed out) before the +instance behind the sequence is removed. + * If the instance count is decreased, arbitrary instance(s) are selected among +idle instances and instances with in-flight sequence(s) for removal. + +* If a sequence model is *reloaded* with in-flight sequence(s) (i.e. changes to +the model file), Triton does not guarantee any remaining request(s) from the +in-flight sequence(s) will be routed to the same model instance for processing. +It is currently the responsibility of the user to ensure any in-flight +sequence(s) are completed before reloading a sequence model.
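As a point of reference for the update-versus-reload behavior described in the doc change above, here is a minimal sketch of requesting an in-place instance-count update through the explicit load API. It assumes Triton is running with `--model-control-mode=explicit`, a gRPC endpoint at `localhost:8001`, and a hypothetical model named `my_model`; only the client calls that already appear in this PR's tests are used.

```python
# Sketch only: bump the instance count of a loaded model in place.
# Assumes EXPLICIT model control mode, a gRPC endpoint at localhost:8001,
# and a hypothetical model named "my_model".
import json

import tritonclient.grpc as grpcclient

client = grpcclient.InferenceServerClient("localhost:8001")

# Fetch the current configuration and increase the instance count by one.
config = client.get_model_config("my_model", as_json=True)["config"]
config["instance_group"][0]["count"] += 1

# Because only the instance_group changed, Triton updates the existing
# model (adds an instance) instead of fully reloading it.
client.load_model("my_model", config=json.dumps(config))
```

The same in-place update is triggered under POLL mode when an edited 'config.pbtxt' is detected, which is what the tests below exercise via `update_instance_group()`.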
## Concurrently Loading Models diff --git a/qa/L0_model_update/instance_update_test.py b/qa/L0_model_update/instance_update_test.py index 27a09486d9..a3c9ce3201 100755 --- a/qa/L0_model_update/instance_update_test.py +++ b/qa/L0_model_update/instance_update_test.py @@ -43,18 +43,42 @@ set_delay, update_instance_group, update_model_file, + update_sequence_batching, ) from tritonclient.utils import InferenceServerException class TestInstanceUpdate(unittest.TestCase): - __model_name = "model_init_del" + _model_name = "model_init_del" def setUp(self): + # Reset counters + reset_count("initialize") + reset_count("finalize") + # Reset batching + disable_batching() + # Reset delays + set_delay("initialize", 0) + set_delay("infer", 0) + # Reset sequence batching + update_sequence_batching("") # Initialize client - self.__triton = grpcclient.InferenceServerClient("localhost:8001") - - def __get_inputs(self, batching=False): + self._triton = grpcclient.InferenceServerClient("localhost:8001") + + def tearDown(self): + # Check if the test passed for this test case that is tearing down + r = self.defaultTestResult() + self._feedErrorsToResult(r, self._outcome.errors) + # Use `r = self._outcome.result` for the above, if Python >= 3.11 + passed = all(self != test_case for test_case, _ in r.errors + r.failures) + if passed: + # Do nothing if passed + return + # Best effort to reset the model state for the next test case + self._triton.unload_model(self._model_name) + time.sleep(30) # time for instances to finish unloading + + def _get_inputs(self, batching=False): self.assertIsInstance(batching, bool) if batching: shape = [random.randint(1, 2), random.randint(1, 16)] @@ -64,16 +88,16 @@ def __get_inputs(self, batching=False): inputs[0].set_data_from_numpy(np.ones(shape, dtype=np.float32)) return inputs - def __infer(self, batching=False): - self.__triton.infer(self.__model_name, self.__get_inputs(batching)) + def _infer(self, batching=False): + self._triton.infer(self._model_name, self._get_inputs(batching)) - def __concurrent_infer(self, concurrency=4, batching=False): + def _concurrent_infer(self, concurrency=4, batching=False): pool = concurrent.futures.ThreadPoolExecutor() stop = [False] def repeat_infer(): while not stop[0]: - self.__infer(batching) + self._infer(batching) infer_threads = [pool.submit(repeat_infer) for i in range(concurrency)] @@ -84,7 +108,7 @@ def stop_infer(): return stop_infer - def __check_count(self, kind, expected_count, poll=False): + def _check_count(self, kind, expected_count, poll=False): self.assertIsInstance(poll, bool) if poll: timeout = 30 # seconds @@ -96,21 +120,15 @@ def __check_count(self, kind, expected_count, poll=False): num_retry += 1 self.assertEqual(get_count(kind), expected_count) - def __load_model(self, instance_count, instance_config="", batching=False): - # Reset counters - reset_count("initialize") - reset_count("finalize") + def _load_model(self, instance_count, instance_config="", batching=False): # Set batching enable_batching() if batching else disable_batching() - # Reset delays - set_delay("initialize", 0) - set_delay("infer", 0) # Load model - self.__update_instance_count( + self._update_instance_count( instance_count, 0, instance_config, batching=batching ) - def __update_instance_count( + def _update_instance_count( self, add_count, del_count, @@ -132,175 +150,194 @@ def __update_instance_count( new_count = prev_count + add_count - del_count instance_config = "{\ncount: " + str(new_count) + "\nkind: KIND_CPU\n}" 
update_instance_group(instance_config) - self.__triton.load_model(self.__model_name) - self.__check_count("initialize", new_initialize_count) - self.__check_count("finalize", new_finalize_count, wait_for_finalize) - self.__infer(batching) + self._triton.load_model(self._model_name) + self._check_count("initialize", new_initialize_count) + self._check_count("finalize", new_finalize_count, wait_for_finalize) + self._infer(batching) - def __unload_model(self, batching=False): + def _unload_model(self, batching=False): prev_initialize_count = get_count("initialize") - self.__triton.unload_model(self.__model_name) - self.__check_count("initialize", prev_initialize_count) - self.__check_count("finalize", prev_initialize_count, True) + self._triton.unload_model(self._model_name) + self._check_count("initialize", prev_initialize_count) + self._check_count("finalize", prev_initialize_count, True) with self.assertRaises(InferenceServerException): - self.__infer(batching) - - # Test add -> remove -> add an instance - def test_add_rm_add_instance(self): - for batching in [False, True]: - self.__load_model(3, batching=batching) - stop = self.__concurrent_infer(batching=batching) - self.__update_instance_count(1, 0, batching=batching) # add - self.__update_instance_count(0, 1, batching=batching) # remove - self.__update_instance_count(1, 0, batching=batching) # add - stop() - self.__unload_model(batching=batching) - - # Test remove -> add -> remove an instance - def test_rm_add_rm_instance(self): - for batching in [False, True]: - self.__load_model(2, batching=batching) - stop = self.__concurrent_infer(batching=batching) - self.__update_instance_count(0, 1, batching=batching) # remove - self.__update_instance_count(1, 0, batching=batching) # add - self.__update_instance_count(0, 1, batching=batching) # remove - stop() - self.__unload_model(batching=batching) + self._infer(batching) + + # Test add -> remove -> add an instance without batching + def test_add_rm_add_instance_no_batching(self): + self._load_model(3, batching=False) + stop = self._concurrent_infer(batching=False) + self._update_instance_count(1, 0, batching=False) # add + self._update_instance_count(0, 1, batching=False) # remove + self._update_instance_count(1, 0, batching=False) # add + stop() + self._unload_model(batching=False) + + # Test add -> remove -> add an instance with batching + def test_add_rm_add_instance_with_batching(self): + self._load_model(4, batching=True) + stop = self._concurrent_infer(batching=True) + self._update_instance_count(1, 0, batching=True) # add + self._update_instance_count(0, 1, batching=True) # remove + self._update_instance_count(1, 0, batching=True) # add + stop() + self._unload_model(batching=True) + + # Test remove -> add -> remove an instance without batching + def test_rm_add_rm_instance_no_batching(self): + self._load_model(2, batching=False) + stop = self._concurrent_infer(batching=False) + self._update_instance_count(0, 1, batching=False) # remove + self._update_instance_count(1, 0, batching=False) # add + self._update_instance_count(0, 1, batching=False) # remove + stop() + self._unload_model(batching=False) + + # Test remove -> add -> remove an instance with batching + def test_rm_add_rm_instance_with_batching(self): + self._load_model(3, batching=True) + stop = self._concurrent_infer(batching=True) + self._update_instance_count(0, 1, batching=True) # remove + self._update_instance_count(1, 0, batching=True) # add + self._update_instance_count(0, 1, batching=True) # remove + stop() + 
self._unload_model(batching=True) # Test reduce instance count to zero def test_rm_instance_to_zero(self): - self.__load_model(1) + self._load_model(1) # Setting instance group count to 0 will be overwritten to 1, so no # instances should be created or removed. - self.__update_instance_count(0, 0, "{\ncount: 0\nkind: KIND_CPU\n}") - self.__unload_model() + self._update_instance_count(0, 0, "{\ncount: 0\nkind: KIND_CPU\n}") + self._unload_model() # Test add/remove multiple CPU instances at a time def test_cpu_instance_update(self): - self.__load_model(8) - self.__update_instance_count(0, 4) # remove 4 instances - self.__update_instance_count(0, 3) # remove 3 instances - self.__update_instance_count(0, 0) # no change + self._load_model(8) + self._update_instance_count(0, 4) # remove 4 instances + self._update_instance_count(0, 3) # remove 3 instances + self._update_instance_count(0, 0) # no change time.sleep(0.1) # larger the gap for config.pbtxt timestamp to update - self.__update_instance_count(2, 0) # add 2 instances - self.__update_instance_count(5, 0) # add 5 instances - self.__unload_model() + self._update_instance_count(2, 0) # add 2 instances + self._update_instance_count(5, 0) # add 5 instances + self._unload_model() # Test add/remove multiple GPU instances at a time def test_gpu_instance_update(self): - self.__load_model(6, "{\ncount: 6\nkind: KIND_GPU\n}") - self.__update_instance_count(0, 2, "{\ncount: 4\nkind: KIND_GPU\n}") - self.__update_instance_count(3, 0, "{\ncount: 7\nkind: KIND_GPU\n}") - self.__unload_model() + self._load_model(6, "{\ncount: 6\nkind: KIND_GPU\n}") + self._update_instance_count(0, 2, "{\ncount: 4\nkind: KIND_GPU\n}") + self._update_instance_count(3, 0, "{\ncount: 7\nkind: KIND_GPU\n}") + self._unload_model() # Test add/remove multiple CPU/GPU instances at a time def test_gpu_cpu_instance_update(self): # Load model with 1 GPU instance and 2 CPU instance - self.__load_model( + self._load_model( 3, "{\ncount: 2\nkind: KIND_CPU\n},\n{\ncount: 1\nkind: KIND_GPU\n}" ) # Add 2 GPU instance and remove 1 CPU instance - self.__update_instance_count( + self._update_instance_count( 2, 1, "{\ncount: 1\nkind: KIND_CPU\n},\n{\ncount: 3\nkind: KIND_GPU\n}" ) # Shuffle the instances - self.__update_instance_count( + self._update_instance_count( 0, 0, "{\ncount: 3\nkind: KIND_GPU\n},\n{\ncount: 1\nkind: KIND_CPU\n}" ) + time.sleep(0.1) # larger the gap for config.pbtxt timestamp to update # Remove 1 GPU instance and add 1 CPU instance - self.__update_instance_count( + self._update_instance_count( 1, 1, "{\ncount: 2\nkind: KIND_GPU\n},\n{\ncount: 2\nkind: KIND_CPU\n}" ) # Unload model - self.__unload_model() + self._unload_model() # Test model instance name update def test_instance_name_update(self): # Load 3 instances with 2 different names - self.__load_model( + self._load_model( 3, '{\nname: "old_1"\ncount: 1\nkind: KIND_CPU\n},\n{\nname: "old_2"\ncount: 2\nkind: KIND_GPU\n}', ) # Change the instance names - self.__update_instance_count( + self._update_instance_count( 0, 0, '{\nname: "new_1"\ncount: 1\nkind: KIND_CPU\n},\n{\nname: "new_2"\ncount: 2\nkind: KIND_GPU\n}', ) # Unload model - self.__unload_model() + self._unload_model() # Test instance signature grouping def test_instance_signature(self): # Load 2 GPU instances and 3 CPU instances - self.__load_model( + self._load_model( 5, '{\nname: "GPU_group"\ncount: 2\nkind: KIND_GPU\n},\n{\nname: "CPU_group"\ncount: 3\nkind: KIND_CPU\n}', ) # Flatten the instances representation - self.__update_instance_count( + 
self._update_instance_count( 0, 0, '{\nname: "CPU_1"\ncount: 1\nkind: KIND_CPU\n},\n{\nname: "CPU_2_3"\ncount: 2\nkind: KIND_CPU\n},\n{\nname: "GPU_1"\ncount: 1\nkind: KIND_GPU\n},\n{\nname: "GPU_2"\ncount: 1\nkind: KIND_GPU\n}', ) time.sleep(0.1) # larger the gap for config.pbtxt timestamp to update # Consolidate different representations - self.__update_instance_count( + self._update_instance_count( 0, 0, '{\nname: "CPU_group"\ncount: 3\nkind: KIND_CPU\n},\n{\nname: "GPU_group"\ncount: 2\nkind: KIND_GPU\n}', ) time.sleep(0.1) # larger the gap for config.pbtxt timestamp to update # Flatten the instances representation - self.__update_instance_count( + self._update_instance_count( 0, 0, '{\nname: "GPU_1"\ncount: 1\nkind: KIND_GPU\n},\n{\nname: "GPU_2"\ncount: 1\nkind: KIND_GPU\n},\n{\nname: "CPU_1"\ncount: 1\nkind: KIND_CPU\n},\n{\nname: "CPU_2"\ncount: 1\nkind: KIND_CPU\n},\n{\nname: "CPU_3"\ncount: 1\nkind: KIND_CPU\n}', ) # Unload model - self.__unload_model() + self._unload_model() # Test instance update with invalid instance group config def test_invalid_config(self): # Load model with 8 instances - self.__load_model(8) + self._load_model(8) # Set invalid config update_instance_group("--- invalid config ---") with self.assertRaises(InferenceServerException): - self.__triton.load_model("model_init_del") + self._triton.load_model("model_init_del") # Correct config by reducing instances to 4 - self.__update_instance_count(0, 4) + self._update_instance_count(0, 4) # Unload model - self.__unload_model() + self._unload_model() # Test instance update with model file changed def test_model_file_update(self): - self.__load_model(5) + self._load_model(5) update_model_file() - self.__update_instance_count( + self._update_instance_count( 6, 5, "{\ncount: 6\nkind: KIND_CPU\n}", wait_for_finalize=True ) - self.__unload_model() + self._unload_model() # Test instance update with non instance config changed in config.pbtxt def test_non_instance_config_update(self): - self.__load_model(4, batching=False) + self._load_model(4, batching=False) enable_batching() - self.__update_instance_count( + self._update_instance_count( 2, 4, "{\ncount: 2\nkind: KIND_CPU\n}", wait_for_finalize=True, batching=True, ) - self.__unload_model(batching=True) + self._unload_model(batching=True) # Test passing new instance config via load API def test_load_api_with_config(self): # Load model with 1 instance - self.__load_model(1) + self._load_model(1) # Get the model config from Triton - config = self.__triton.get_model_config(self.__model_name, as_json=True) + config = self._triton.get_model_config(self._model_name, as_json=True) self.assertIn("config", config) self.assertIsInstance(config["config"], dict) config = config["config"] @@ -313,26 +350,26 @@ def test_load_api_with_config(self): config["instance_group"][0]["count"] += 1 self.assertEqual(config["instance_group"][0]["count"], 2) # Load the extra instance via the load API - self.__triton.load_model(self.__model_name, config=json.dumps(config)) - self.__check_count("initialize", 2) # 2 instances in total - self.__check_count("finalize", 0) # no instance is removed - self.__infer() + self._triton.load_model(self._model_name, config=json.dumps(config)) + self._check_count("initialize", 2) # 2 instances in total + self._check_count("finalize", 0) # no instance is removed + self._infer() # Unload model - self.__unload_model() + self._unload_model() # Test instance update with an ongoing inference def test_update_while_inferencing(self): # Load model with 1 instance - 
self.__load_model(1) + self._load_model(1) # Add 1 instance while inferencing set_delay("infer", 10) update_instance_group("{\ncount: 2\nkind: KIND_CPU\n}") with concurrent.futures.ThreadPoolExecutor() as pool: infer_start_time = time.time() - infer_thread = pool.submit(self.__infer) + infer_thread = pool.submit(self._infer) time.sleep(2) # make sure inference has started update_start_time = time.time() - update_thread = pool.submit(self.__triton.load_model, self.__model_name) + update_thread = pool.submit(self._triton.load_model, self._model_name) update_thread.result() update_end_time = time.time() infer_thread.result() @@ -343,25 +380,25 @@ def test_update_while_inferencing(self): # ongoing inference should not block the update. self.assertGreaterEqual(infer_time, 10.0, "Invalid infer time") self.assertLess(update_time, 5.0, "Update blocked by infer") - self.__check_count("initialize", 2) - self.__check_count("finalize", 0) - self.__infer() + self._check_count("initialize", 2) + self._check_count("finalize", 0) + self._infer() # Unload model - self.__unload_model() + self._unload_model() # Test inference with an ongoing instance update def test_infer_while_updating(self): # Load model with 1 instance - self.__load_model(1) + self._load_model(1) # Infer while adding 1 instance set_delay("initialize", 10) update_instance_group("{\ncount: 2\nkind: KIND_CPU\n}") with concurrent.futures.ThreadPoolExecutor() as pool: update_start_time = time.time() - update_thread = pool.submit(self.__triton.load_model, self.__model_name) + update_thread = pool.submit(self._triton.load_model, self._model_name) time.sleep(2) # make sure update has started infer_start_time = time.time() - infer_thread = pool.submit(self.__infer) + infer_thread = pool.submit(self._infer) infer_thread.result() infer_end_time = time.time() update_thread.result() @@ -372,11 +409,11 @@ def test_infer_while_updating(self): # existing instances. 
self.assertGreaterEqual(update_time, 10.0, "Invalid update time") self.assertLess(infer_time, 5.0, "Infer blocked by update") - self.__check_count("initialize", 2) - self.__check_count("finalize", 0) - self.__infer() + self._check_count("initialize", 2) + self._check_count("finalize", 0) + self._infer() # Unload model - self.__unload_model() + self._unload_model() # Test instance resource requirement increase @unittest.skipUnless( @@ -385,12 +422,12 @@ def test_infer_while_updating(self): ) def test_instance_resource_increase(self): # Load model - self.__load_model( + self._load_model( 1, '{\ncount: 1\nkind: KIND_CPU\nrate_limiter {\nresources [\n{\nname: "R1"\ncount: 2\n}\n]\n}\n}', ) # Increase resource requirement - self.__update_instance_count( + self._update_instance_count( 1, 1, '{\ncount: 1\nkind: KIND_CPU\nrate_limiter {\nresources [\n{\nname: "R1"\ncount: 8\n}\n]\n}\n}', @@ -402,7 +439,7 @@ def test_instance_resource_increase(self): def infer(): for i in range(infer_count): - self.__infer() + self._infer() infer_complete[i] = True with concurrent.futures.ThreadPoolExecutor() as pool: @@ -411,7 +448,7 @@ def infer(): self.assertNotIn(False, infer_complete, "Infer possibly stuck") infer_thread.result() # Unload model - self.__unload_model() + self._unload_model() # Test instance resource requirement increase above explicit resource @unittest.skipUnless( @@ -420,25 +457,25 @@ def infer(): ) def test_instance_resource_increase_above_explicit(self): # Load model - self.__load_model( + self._load_model( 1, '{\ncount: 1\nkind: KIND_CPU\nrate_limiter {\nresources [\n{\nname: "R1"\ncount: 2\n}\n]\n}\n}', ) # Increase resource requirement with self.assertRaises(InferenceServerException): - self.__update_instance_count( + self._update_instance_count( 0, 0, '{\ncount: 1\nkind: KIND_CPU\nrate_limiter {\nresources [\n{\nname: "R1"\ncount: 32\n}\n]\n}\n}', ) # Correct the resource requirement to match the explicit resource - self.__update_instance_count( + self._update_instance_count( 1, 1, '{\ncount: 1\nkind: KIND_CPU\nrate_limiter {\nresources [\n{\nname: "R1"\ncount: 10\n}\n]\n}\n}', ) # Unload model - self.__unload_model() + self._unload_model() # Test instance resource requirement decrease @unittest.skipUnless( @@ -447,18 +484,18 @@ def test_instance_resource_increase_above_explicit(self): ) def test_instance_resource_decrease(self): # Load model - self.__load_model( + self._load_model( 1, '{\ncount: 1\nkind: KIND_CPU\nrate_limiter {\nresources [\n{\nname: "R1"\ncount: 4\n}\n]\n}\n}', ) # Decrease resource requirement - self.__update_instance_count( + self._update_instance_count( 1, 1, '{\ncount: 1\nkind: KIND_CPU\nrate_limiter {\nresources [\n{\nname: "R1"\ncount: 3\n}\n]\n}\n}', ) # Unload model - self.__unload_model() + self._unload_model() # The resource count of 3 is unique across this entire test, so check # the server output to make sure it is printed, which ensures the # max resource is actually decreased. @@ -479,15 +516,133 @@ def test_instance_resource_decrease(self): # explicit limit of 10 is set. 
self.assertNotIn("Resource: R1\t Count: 3", f.read()) - # Test for instance update on direct sequence scheduling - @unittest.skip("Sequence will not continue after update [FIXME: DLIS-4820]") - def test_instance_update_on_direct_sequence_scheduling(self): - pass + _direct_sequence_batching_str = ( + "direct { }\nmax_sequence_idle_microseconds: 8000000" + ) + _oldest_sequence_batching_str = ( + "oldest { max_candidate_sequences: 4 }\nmax_sequence_idle_microseconds: 8000000" + ) + + # Test instance update for direct scheduler without any ongoing sequences + def test_direct_scheduler_update_no_ongoing_sequences(self): + self._test_scheduler_update_no_ongoing_sequences( + self._direct_sequence_batching_str + ) - # Test for instance update on oldest sequence scheduling - @unittest.skip("Sequence will not continue after update [FIXME: DLIS-4820]") - def test_instance_update_on_oldest_sequence_scheduling(self): - pass + # Test instance update for direct scheduler with any ongoing sequences + def test_direct_scheduler_update_with_ongoing_sequences(self): + self._test_scheduler_update_with_ongoing_sequences( + self._direct_sequence_batching_str + ) + + # Test instance update for oldest scheduler without ongoing sequences + def test_oldest_scheduler_update_no_ongoing_sequences(self): + self._test_scheduler_update_no_ongoing_sequences( + self._oldest_sequence_batching_str + ) + + # Test instance update for oldest scheduler with ongoing sequences + def test_oldest_scheduler_update_with_ongoing_sequences(self): + self._test_scheduler_update_with_ongoing_sequences( + self._oldest_sequence_batching_str + ) + + # Helper function for testing the success of sequence instance updates + # without any ongoing sequences. + def _test_scheduler_update_no_ongoing_sequences(self, sequence_batching_str): + # Load model + update_instance_group("{\ncount: 2\nkind: KIND_CPU\n}") + update_sequence_batching(sequence_batching_str) + self._triton.load_model(self._model_name) + self._check_count("initialize", 2) + self._check_count("finalize", 0) + # Basic sequence inference + self._triton.infer( + self._model_name, self._get_inputs(), sequence_id=1, sequence_start=True + ) + self._triton.infer(self._model_name, self._get_inputs(), sequence_id=1) + self._triton.infer( + self._model_name, self._get_inputs(), sequence_id=1, sequence_end=True + ) + # Add 2 instances without in-flight sequence + update_instance_group("{\ncount: 4\nkind: KIND_CPU\n}") + self._triton.load_model(self._model_name) + self._check_count("initialize", 4) + self._check_count("finalize", 0) + # Basic sequence inference + self._triton.infer( + self._model_name, self._get_inputs(), sequence_id=1, sequence_start=True + ) + self._triton.infer( + self._model_name, self._get_inputs(), sequence_id=1, sequence_end=True + ) + # Remove 1 instance without in-flight sequence + update_instance_group("{\ncount: 3\nkind: KIND_CPU\n}") + self._triton.load_model(self._model_name) + self._check_count("initialize", 4) + self._check_count("finalize", 1, poll=True) + # Basic sequence inference + self._triton.infer( + self._model_name, self._get_inputs(), sequence_id=1, sequence_start=True + ) + self._triton.infer( + self._model_name, self._get_inputs(), sequence_id=1, sequence_end=True + ) + # Unload model + self._triton.unload_model(self._model_name) + self._check_count("initialize", 4) + self._check_count("finalize", 4, poll=True) + + # Helper function for testing if ongoing sequences may continue to infer on + # the same instance after the instance processing the sequence 
is removed + # by an instance update; the removed instance will live until its + # sequences end. + def _test_scheduler_update_with_ongoing_sequences(self, sequence_batching_str): + # Load model + update_instance_group("{\ncount: 3\nkind: KIND_CPU\n}") + update_sequence_batching(sequence_batching_str) + self._triton.load_model(self._model_name) + self._check_count("initialize", 3) + self._check_count("finalize", 0) + # Start sequence 1 and 2 on CPU instances + self._triton.infer( + self._model_name, self._get_inputs(), sequence_id=1, sequence_start=True + ) + self._triton.infer( + self._model_name, self._get_inputs(), sequence_id=2, sequence_start=True + ) + # Remove all 3 CPU and add 1 GPU instance with in-flight sequences. Both + # in-flight sequences are assigned to 2 of the 3 CPU instances, so exactly 1 + # CPU instance can be removed immediately. + update_instance_group("{\ncount: 1\nkind: KIND_GPU\n}") + self._triton.load_model(self._model_name) + self._check_count("initialize", 4) # 3 CPU + 1 GPU + self._check_count("finalize", 1, poll=True) # 1 CPU + # Sequence 1 and 2 may continue to infer + self._triton.infer(self._model_name, self._get_inputs(), sequence_id=1) + self._triton.infer(self._model_name, self._get_inputs(), sequence_id=2) + self._check_count("finalize", 1) # check 2 CPU instances not removed + # Start sequence 3 on GPU instance + self._triton.infer( + self._model_name, self._get_inputs(), sequence_id=3, sequence_start=True + ) + self._check_count("finalize", 1) # check 2 CPU instances not removed + # Ending sequences 1 and 2 will remove the 2 CPU instances + self._triton.infer( + self._model_name, self._get_inputs(), sequence_id=1, sequence_end=True + ) + self._triton.infer( + self._model_name, self._get_inputs(), sequence_id=2, sequence_end=True + ) + self._check_count("finalize", 3, poll=True) # 3 CPU + # End sequence 3 + self._triton.infer( + self._model_name, self._get_inputs(), sequence_id=3, sequence_end=True + ) + # Unload model + self._triton.unload_model(self._model_name) + self._check_count("initialize", 4) # 3 CPU + 1 GPU + self._check_count("finalize", 4, poll=True) # 3 CPU + 1 GPU if __name__ == "__main__": diff --git a/qa/L0_model_update/test.sh b/qa/L0_model_update/test.sh index 7f8c23e38a..aa9cf7fcc1 100755 --- a/qa/L0_model_update/test.sh +++ b/qa/L0_model_update/test.sh @@ -38,6 +38,10 @@ if [ ! -z "$TEST_REPO_ARCH" ]; then REPO_VERSION=${REPO_VERSION}_${TEST_REPO_ARCH} fi +# This L0_model_update test should make changes to models without restarting the +# server, unless restarting the server is the only way of accomplishing the +# change.
+ export CUDA_VISIBLE_DEVICES=0 export PYTHONDONTWRITEBYTECODE="True" export MODEL_LOG_DIR="`pwd`" diff --git a/qa/python_models/model_init_del/config.pbtxt b/qa/python_models/model_init_del/config.pbtxt index ee0ed17d26..be66468a0a 100644 --- a/qa/python_models/model_init_del/config.pbtxt +++ b/qa/python_models/model_init_del/config.pbtxt @@ -49,4 +49,4 @@ instance_group [ count: 1 kind: KIND_CPU } -] +] # end instance_group diff --git a/qa/python_models/model_init_del/model.py b/qa/python_models/model_init_del/model.py index 924132ecb1..578279f8ef 100644 --- a/qa/python_models/model_init_del/model.py +++ b/qa/python_models/model_init_del/model.py @@ -37,7 +37,7 @@ class TritonPythonModel: def initialize(self, args): inc_count("initialize") - self.__sleep("initialize") + self._sleep("initialize") def execute(self, requests): responses = [] @@ -45,13 +45,13 @@ def execute(self, requests): input_tensor = pb_utils.get_input_tensor_by_name(request, "INPUT0") out_tensor = pb_utils.Tensor("OUTPUT0", input_tensor.as_numpy()) responses.append(pb_utils.InferenceResponse([out_tensor])) - self.__sleep("infer") + self._sleep("infer") return responses def finalize(self): inc_count("finalize") - def __sleep(self, kind): + def _sleep(self, kind): delay = get_delay(kind) if delay > 0: time.sleep(delay) diff --git a/qa/python_models/model_init_del/util.py b/qa/python_models/model_init_del/util.py index f7d5c30d6b..a36f13eea9 100755 --- a/qa/python_models/model_init_del/util.py +++ b/qa/python_models/model_init_del/util.py @@ -29,14 +29,14 @@ import fcntl import os -__model_name = "model_init_del" +_model_name = "model_init_del" # # Helper functions for reading/writing state to disk # -def __get_number(filename): +def _get_number(filename): full_path = os.path.join(os.environ["MODEL_LOG_DIR"], filename) try: with open(full_path, mode="r", encoding="utf-8", errors="strict") as f: @@ -47,7 +47,7 @@ def __get_number(filename): return int(txt) -def __store_number(filename, number): +def _store_number(filename, number): full_path = os.path.join(os.environ["MODEL_LOG_DIR"], filename) txt = str(number) with open(full_path, mode="w", encoding="utf-8", errors="strict") as f: @@ -55,7 +55,7 @@ def __store_number(filename, number): f.write(txt) -def __inc_number(filename): +def _inc_number(filename): full_path = os.path.join(os.environ["MODEL_LOG_DIR"], filename) try: with open(full_path, mode="r+", encoding="utf-8", errors="strict") as f: @@ -68,7 +68,7 @@ def __inc_number(filename): f.write(txt) except FileNotFoundError: number = 1 - __store_number(filename, number) + _store_number(filename, number) return number @@ -78,24 +78,24 @@ def __inc_number(filename): # -def __get_count_filename(kind): +def _get_count_filename(kind): if kind != "initialize" and kind != "finalize": raise KeyError("Invalid count kind: " + str(kind)) - filename = __model_name + "_" + kind + "_count.txt" + filename = _model_name + "_" + kind + "_count.txt" return filename def get_count(kind): - return __get_number(__get_count_filename(kind)) + return _get_number(_get_count_filename(kind)) def inc_count(kind): - return __inc_number(__get_count_filename(kind)) + return _inc_number(_get_count_filename(kind)) def reset_count(kind): count = 0 - __store_number(__get_count_filename(kind), count) + _store_number(_get_count_filename(kind), count) return count @@ -104,19 +104,19 @@ def reset_count(kind): # -def __get_delay_filename(kind): +def _get_delay_filename(kind): if kind != "initialize" and kind != "infer": raise KeyError("Invalid delay kind: " + 
str(kind)) - filename = __model_name + "_" + kind + "_delay.txt" + filename = _model_name + "_" + kind + "_delay.txt" return filename def get_delay(kind): - return __get_number(__get_delay_filename(kind)) + return _get_number(_get_delay_filename(kind)) def set_delay(kind, delay): - __store_number(__get_delay_filename(kind), delay) + _store_number(_get_delay_filename(kind), delay) return delay @@ -129,10 +129,32 @@ def update_instance_group(instance_group_str): full_path = os.path.join(os.path.dirname(__file__), "config.pbtxt") with open(full_path, mode="r+", encoding="utf-8", errors="strict") as f: txt = f.read() - txt = txt.split("instance_group [")[0] + txt, post_match = txt.split("instance_group [") txt += "instance_group [\n" txt += instance_group_str - txt += "\n]\n" + txt += "\n] # end instance_group\n" + txt += post_match.split("\n] # end instance_group\n")[1] + f.truncate(0) + f.seek(0) + f.write(txt) + return txt + + +def update_sequence_batching(sequence_batching_str): + full_path = os.path.join(os.path.dirname(__file__), "config.pbtxt") + with open(full_path, mode="r+", encoding="utf-8", errors="strict") as f: + txt = f.read() + if "sequence_batching {" in txt: + txt, post_match = txt.split("sequence_batching {") + if sequence_batching_str != "": + txt += "sequence_batching {\n" + txt += sequence_batching_str + txt += "\n} # end sequence_batching\n" + txt += post_match.split("\n} # end sequence_batching\n")[1] + elif sequence_batching_str != "": + txt += "\nsequence_batching {\n" + txt += sequence_batching_str + txt += "\n} # end sequence_batching\n" f.truncate(0) f.seek(0) f.write(txt)
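For context, `update_instance_group()` and `update_sequence_batching()` rewrite 'config.pbtxt' in place and leave the `# end instance_group` / `# end sequence_batching` marker comments behind so that later calls can locate and replace the same sections. A rough usage sketch follows; the import path and the concrete config values are illustrative assumptions, not the test's actual wiring.

```python
# Rough usage sketch of the config-rewriting helpers above. The import path
# "util" is an assumption -- the real test imports them from the
# model_init_del utility module in the QA model repository.
import tritonclient.grpc as grpcclient

from util import update_instance_group, update_sequence_batching

# Rewrite the instance_group and sequence_batching sections of config.pbtxt.
# The "# end ..." marker comments written by the helpers let subsequent calls
# find and replace the same sections again.
update_instance_group("{\ncount: 2\nkind: KIND_CPU\n}")
update_sequence_batching("oldest { max_candidate_sequences: 4 }")

# Ask Triton to pick up the edited config via the explicit load API.
triton = grpcclient.InferenceServerClient("localhost:8001")
triton.load_model("model_init_del")
```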