diff --git a/clipper_admin/clipper_admin/clipper_admin.py b/clipper_admin/clipper_admin/clipper_admin.py
index f7957251c..2be1f91b7 100644
--- a/clipper_admin/clipper_admin/clipper_admin.py
+++ b/clipper_admin/clipper_admin/clipper_admin.py
@@ -281,7 +281,9 @@ def build_and_deploy_model(self,
             for a model can be changed at any time with
             :py:meth:`clipper.ClipperConnection.set_num_replicas`.
         batch_size : int, optional
-            The user-defined batch size.
+            The user-defined query batch size for the model. Replicas of the model will attempt
+            to process at most `batch_size` queries simultaneously. They may process smaller
+            batches if `batch_size` queries are not immediately available.
             If the default value of -1 is used, Clipper will adaptively calculate the batch size for individual
             replicas of this model.

         Raises
@@ -443,7 +445,9 @@ def deploy_model(self,
             for a model can be changed at any time with
             :py:meth:`clipper.ClipperConnection.set_num_replicas`.
         batch_size : int, optional
-            The user-defined batch size.
+            The user-defined query batch size for the model. Replicas of the model will attempt
+            to process at most `batch_size` queries simultaneously. They may process smaller
+            batches if `batch_size` queries are not immediately available.
             If the default value of -1 is used, Clipper will adaptively calculate the batch size for individual
             replicas of this model.

@@ -516,7 +520,9 @@ def register_model(self,
             A list of strings annotating the model. These are ignored by
             Clipper and used purely for user annotations.
         batch_size : int, optional
-            The user-defined batch size.
+            The user-defined query batch size for the model. Replicas of the model will attempt
+            to process at most `batch_size` queries simultaneously. They may process smaller
+            batches if `batch_size` queries are not immediately available.
             If the default value of -1 is used, Clipper will adaptively calculate the batch size for individual
             replicas of this model.
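For context, a minimal usage sketch of the new parameter at the `ClipperConnection` level. This is a sketch only, assuming a running local Docker deployment; the model name, version, data directory, and base image tag below are illustrative placeholders, not part of this diff:

```python
# Sketch: deploy a model with a fixed query batch size via the extended
# build_and_deploy_model API. Positional arguments follow the same order
# used by the deployer call sites in this patch.
from clipper_admin import ClipperConnection, DockerContainerManager

clipper_conn = ClipperConnection(DockerContainerManager())
clipper_conn.connect()

clipper_conn.build_and_deploy_model(
    "example-model",            # name (placeholder)
    1,                          # version
    "doubles",                  # input_type
    "/tmp/example_model_data",  # placeholder model data directory
    "clipper/python-closure-container:0.3.0",  # placeholder base image
    batch_size=16)              # replicas handle at most 16 queries at a time
```

Omitting `batch_size` (or passing -1) preserves the previous behavior, so existing callers are unaffected.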
""" clipper_conn.register_application(name, input_type, default_output, slo_micros) deploy_pyspark_model(clipper_conn, name, version, input_type, func, pyspark_model, sc, base_image, labels, registry, - num_replicas) + num_replicas, batch_size) clipper_conn.link_model_to_app(name, name) @@ -101,7 +108,8 @@ def deploy_pyspark_model( base_image="clipper/pyspark-container:{}".format(__version__), labels=None, registry=None, - num_replicas=1): + num_replicas=1, + batch_size=-1): """Deploy a Python function with a PySpark model. The function must take 3 arguments (in order): a SparkSession, the PySpark model, and a list of @@ -141,7 +149,12 @@ def deploy_pyspark_model( The number of replicas of the model to create. The number of replicas for a model can be changed at any time with :py:meth:`clipper.ClipperConnection.set_num_replicas`. - + batch_size : int, optional + The user-defined query batch size for the model. Replicas of the model will attempt + to process at most `batch_size` queries simultaneously. They may process smaller + batches if `batch_size` queries are not immediately available. + If the default value of -1 is used, Clipper will adaptively calculate the batch size for individual + replicas of this model. Example ------- @@ -214,7 +227,7 @@ def predict(spark, model, inputs): # Deploy model clipper_conn.build_and_deploy_model(name, version, input_type, serialization_dir, base_image, labels, - registry, num_replicas) + registry, num_replicas, batch_size) # Remove temp files shutil.rmtree(serialization_dir) diff --git a/clipper_admin/clipper_admin/deployers/python.py b/clipper_admin/clipper_admin/deployers/python.py index 1cf7cce0b..f3895464a 100644 --- a/clipper_admin/clipper_admin/deployers/python.py +++ b/clipper_admin/clipper_admin/deployers/python.py @@ -22,7 +22,8 @@ def create_endpoint( labels=None, registry=None, base_image="clipper/python-closure-container:{}".format(__version__), - num_replicas=1): + num_replicas=1, + batch_size=-1): """Registers an application and deploys the provided predict function as a model. Parameters @@ -70,12 +71,19 @@ def create_endpoint( The number of replicas of the model to create. The number of replicas for a model can be changed at any time with :py:meth:`clipper.ClipperConnection.set_num_replicas`. + batch_size : int, optional + The user-defined query batch size for the model. Replicas of the model will attempt + to process at most `batch_size` queries simultaneously. They may process smaller + batches if `batch_size` queries are not immediately available. + If the default value of -1 is used, Clipper will adaptively calculate the batch size for individual + replicas of this model. """ clipper_conn.register_application(name, input_type, default_output, slo_micros) deploy_python_closure(clipper_conn, name, version, input_type, func, - base_image, labels, registry, num_replicas) + base_image, labels, registry, num_replicas, + batch_size) clipper_conn.link_model_to_app(name, name) @@ -89,7 +97,8 @@ def deploy_python_closure( base_image="clipper/python-closure-container:{}".format(__version__), labels=None, registry=None, - num_replicas=1): + num_replicas=1, + batch_size=-1): """Deploy an arbitrary Python function to Clipper. The function should take a list of inputs of the type specified by `input_type` and @@ -125,7 +134,12 @@ def deploy_python_closure( The number of replicas of the model to create. The number of replicas for a model can be changed at any time with :py:meth:`clipper.ClipperConnection.set_num_replicas`. 
diff --git a/clipper_admin/clipper_admin/deployers/python.py b/clipper_admin/clipper_admin/deployers/python.py
index 1cf7cce0b..f3895464a 100644
--- a/clipper_admin/clipper_admin/deployers/python.py
+++ b/clipper_admin/clipper_admin/deployers/python.py
@@ -22,7 +22,8 @@ def create_endpoint(
         labels=None,
         registry=None,
         base_image="clipper/python-closure-container:{}".format(__version__),
-        num_replicas=1):
+        num_replicas=1,
+        batch_size=-1):
     """Registers an application and deploys the provided predict function as a model.

     Parameters
@@ -70,12 +71,19 @@
         The number of replicas of the model to create. The number of replicas
         for a model can be changed at any time with
         :py:meth:`clipper.ClipperConnection.set_num_replicas`.
+    batch_size : int, optional
+        The user-defined query batch size for the model. Replicas of the model will attempt
+        to process at most `batch_size` queries simultaneously. They may process smaller
+        batches if `batch_size` queries are not immediately available.
+        If the default value of -1 is used, Clipper will adaptively calculate the batch size for individual
+        replicas of this model.
     """

     clipper_conn.register_application(name, input_type, default_output,
                                       slo_micros)
     deploy_python_closure(clipper_conn, name, version, input_type, func,
-                          base_image, labels, registry, num_replicas)
+                          base_image, labels, registry, num_replicas,
+                          batch_size)

     clipper_conn.link_model_to_app(name, name)

@@ -89,7 +97,8 @@ def deploy_python_closure(
         base_image="clipper/python-closure-container:{}".format(__version__),
         labels=None,
         registry=None,
-        num_replicas=1):
+        num_replicas=1,
+        batch_size=-1):
     """Deploy an arbitrary Python function to Clipper.

     The function should take a list of inputs of the type specified by `input_type` and
@@ -125,7 +134,12 @@
         The number of replicas of the model to create. The number of replicas
         for a model can be changed at any time with
         :py:meth:`clipper.ClipperConnection.set_num_replicas`.
-
+    batch_size : int, optional
+        The user-defined query batch size for the model. Replicas of the model will attempt
+        to process at most `batch_size` queries simultaneously. They may process smaller
+        batches if `batch_size` queries are not immediately available.
+        If the default value of -1 is used, Clipper will adaptively calculate the batch size for individual
+        replicas of this model.

     Example
     -------
@@ -171,6 +185,6 @@ def centered_predict(inputs):
     # Deploy function
     clipper_conn.build_and_deploy_model(name, version, input_type,
                                         serialization_dir, base_image, labels,
-                                        registry, num_replicas)
+                                        registry, num_replicas, batch_size)
     # Remove temp files
     shutil.rmtree(serialization_dir)
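The closure deployer makes the batching behavior easy to observe, since the closure receives each batch as a plain list. A short sketch of both modes, assuming a connected `clipper_conn` and an already-registered app; `"sum-model"` and `"sum-app"` are placeholder names:

```python
# Sketch: deploy a closure with a fixed query batch size. `inputs` is the
# batch a replica received, so with batch_size=4 len(inputs) is at most 4.
from clipper_admin.deployers.python import deploy_python_closure

def predict(inputs):
    return [str(sum(x)) for x in inputs]

deploy_python_closure(clipper_conn, "sum-model", 1, "doubles", predict,
                      batch_size=4)

# Passing batch_size=-1 (or omitting it) keeps Clipper's adaptive batching.
clipper_conn.link_model_to_app("sum-app", "sum-model")
```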
diff --git a/clipper_admin/clipper_admin/deployers/pytorch.py b/clipper_admin/clipper_admin/deployers/pytorch.py
index dd3e63844..fe91011e6 100644
--- a/clipper_admin/clipper_admin/deployers/pytorch.py
+++ b/clipper_admin/clipper_admin/deployers/pytorch.py
@@ -28,7 +28,8 @@ def create_endpoint(
         labels=None,
         registry=None,
         base_image="clipper/pytorch-container:{}".format(__version__),
-        num_replicas=1):
+        num_replicas=1,
+        batch_size=-1):
     """Registers an app and deploys the provided predict function with PyTorch model as a Clipper model.

     Parameters
@@ -78,13 +79,19 @@
         The number of replicas of the model to create. The number of replicas
         for a model can be changed at any time with
         :py:meth:`clipper.ClipperConnection.set_num_replicas`.
+    batch_size : int, optional
+        The user-defined query batch size for the model. Replicas of the model will attempt
+        to process at most `batch_size` queries simultaneously. They may process smaller
+        batches if `batch_size` queries are not immediately available.
+        If the default value of -1 is used, Clipper will adaptively calculate the batch size for individual
+        replicas of this model.
     """

     clipper_conn.register_application(name, input_type, default_output,
                                       slo_micros)
     deploy_pytorch_model(clipper_conn, name, version, input_type, func,
                          pytorch_model, base_image, labels, registry,
-                         num_replicas)
+                         num_replicas, batch_size)

     clipper_conn.link_model_to_app(name, name)

@@ -99,7 +106,8 @@ def deploy_pytorch_model(
         base_image="clipper/pytorch-container:{}".format(__version__),
         labels=None,
         registry=None,
-        num_replicas=1):
+        num_replicas=1,
+        batch_size=-1):
     """Deploy a Python function with a PyTorch model.

     Parameters
     ----------
@@ -133,6 +141,13 @@
         The number of replicas of the model to create. The number of replicas
         for a model can be changed at any time with
         :py:meth:`clipper.ClipperConnection.set_num_replicas`.
+    batch_size : int, optional
+        The user-defined query batch size for the model. Replicas of the model will attempt
+        to process at most `batch_size` queries simultaneously. They may process smaller
+        batches if `batch_size` queries are not immediately available.
+        If the default value of -1 is used, Clipper will adaptively calculate the batch size for individual
+        replicas of this model.
+
     Example
     -------
@@ -185,7 +200,7 @@ def predict(model, inputs):
     # Deploy model
     clipper_conn.build_and_deploy_model(name, version, input_type,
                                         serialization_dir, base_image, labels,
-                                        registry, num_replicas)
+                                        registry, num_replicas, batch_size)

     # Remove temp files
     shutil.rmtree(serialization_dir)
diff --git a/clipper_admin/clipper_admin/deployers/tensorflow.py b/clipper_admin/clipper_admin/deployers/tensorflow.py
index 615bb261b..360b3d446 100644
--- a/clipper_admin/clipper_admin/deployers/tensorflow.py
+++ b/clipper_admin/clipper_admin/deployers/tensorflow.py
@@ -26,7 +26,8 @@ def create_endpoint(clipper_conn,
         labels=None,
         registry=None,
         base_image="clipper/tf-container:{}".format(__version__),
-        num_replicas=1):
+        num_replicas=1,
+        batch_size=-1):
     """Registers an app and deploys the provided predict function with TensorFlow model as
     a Clipper model.

@@ -76,13 +77,19 @@
         The number of replicas of the model to create. The number of replicas
         for a model can be changed at any time with
         :py:meth:`clipper.ClipperConnection.set_num_replicas`.
+    batch_size : int, optional
+        The user-defined query batch size for the model. Replicas of the model will attempt
+        to process at most `batch_size` queries simultaneously. They may process smaller
+        batches if `batch_size` queries are not immediately available.
+        If the default value of -1 is used, Clipper will adaptively calculate the batch size for individual
+        replicas of this model.
     """

     clipper_conn.register_application(name, input_type, default_output,
                                       slo_micros)
     deploy_tensorflow_model(clipper_conn, name, version, input_type, func,
                             tf_sess_or_saved_model_path, base_image, labels,
-                            registry, num_replicas)
+                            registry, num_replicas, batch_size)

     clipper_conn.link_model_to_app(name, name)

@@ -97,7 +104,8 @@ def deploy_tensorflow_model(
         base_image="clipper/tf-container:{}".format(__version__),
         labels=None,
         registry=None,
-        num_replicas=1):
+        num_replicas=1,
+        batch_size=-1):
     """Deploy a Python prediction function with a Tensorflow session or saved Tensorflow model.

     Parameters
     ----------
@@ -131,7 +139,12 @@
         The number of replicas of the model to create. The number of replicas
         for a model can be changed at any time with
         :py:meth:`clipper.ClipperConnection.set_num_replicas`.
-
+    batch_size : int, optional
+        The user-defined query batch size for the model. Replicas of the model will attempt
+        to process at most `batch_size` queries simultaneously. They may process smaller
+        batches if `batch_size` queries are not immediately available.
+        If the default value of -1 is used, Clipper will adaptively calculate the batch size for individual
+        replicas of this model.

     Example
     -------
@@ -239,7 +252,7 @@ def predict(sess, inputs):
     # Deploy model
     clipper_conn.build_and_deploy_model(name, version, input_type,
                                         serialization_dir, base_image, labels,
-                                        registry, num_replicas)
+                                        registry, num_replicas, batch_size)

     # Remove temp files
     shutil.rmtree(serialization_dir)
diff --git a/integration-tests/clipper_admin_tests.py b/integration-tests/clipper_admin_tests.py
index 584061638..42bc21ee3 100644
--- a/integration-tests/clipper_admin_tests.py
+++ b/integration-tests/clipper_admin_tests.py
@@ -352,9 +352,11 @@ def setUpClass(self):
         self.app_name_1 = "app3"
         self.app_name_2 = "app4"
         self.app_name_3 = "app5"
+        self.app_name_4 = "app6"
         self.model_name_1 = "m4"
         self.model_name_2 = "m5"
         self.model_name_3 = "m6"
+        self.model_name_4 = "m7"
         self.input_type = "doubles"
         self.default_output = "DEFAULT"
         self.latency_slo_micros = 30000
@@ -371,6 +373,10 @@ def setUpClass(self):
             self.app_name_3, self.input_type, self.default_output,
             self.latency_slo_micros)

+        self.clipper_conn.register_application(
+            self.app_name_4, self.input_type, self.default_output,
+            self.latency_slo_micros)
+
     @classmethod
     def tearDownClass(self):
         self.clipper_conn = create_docker_connection(
@@ -470,6 +476,45 @@ def predict_func(inputs):

         self.assertTrue(received_non_default_prediction)

+    def test_fixed_batch_size_model_processes_specified_query_batch_size_when_saturated(
+            self):
+        model_version = 1
+
+        def predict_func(inputs):
+            batch_size = len(inputs)
+            return [str(batch_size) for _ in inputs]
+
+        fixed_batch_size = 9
+        total_num_queries = fixed_batch_size * 50
+        deploy_python_closure(
+            self.clipper_conn,
+            self.model_name_4,
+            model_version,
+            self.input_type,
+            predict_func,
+            batch_size=fixed_batch_size)
+        self.clipper_conn.link_model_to_app(self.app_name_4, self.model_name_4)
+        time.sleep(60)
+
+        addr = self.clipper_conn.get_query_addr()
+        url = "http://{addr}/{app}/predict".format(
+            addr=addr, app=self.app_name_4)
+        test_input = [[float(x) + (j * .001) for x in range(5)]
+                      for j in range(total_num_queries)]
+        req_json = json.dumps({'input_batch': test_input})
+        headers = {'Content-type': 'application/json'}
+        response = requests.post(url, headers=headers, data=req_json)
+        parsed_response = response.json()
+        num_max_batch_queries = 0
+        for prediction in parsed_response["batch_predictions"]:
+            batch_size = prediction["output"]
+            if batch_size != self.default_output and int(
+                    batch_size) == fixed_batch_size:
+                num_max_batch_queries += 1
+
+        self.assertGreaterEqual(num_max_batch_queries,
+                                int(total_num_queries * .7))
+

 SHORT_TEST_ORDERING = [
     'test_register_model_correct',
@@ -495,7 +540,8 @@ def predict_func(inputs):
     'test_unlinked_app_returns_default_predictions',
     'test_deployed_model_queried_successfully',
     'test_batch_queries_returned_successfully',
-    'test_deployed_python_closure_queried_successfully'
+    'test_deployed_python_closure_queried_successfully',
+    'test_fixed_batch_size_model_processes_specified_query_batch_size_when_saturated'
 ]

 if __name__ == '__main__':
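The new test tolerates partial batches (it only requires that at least 70% of queries arrive in full batches), since the tail of the queue and replica warm-up can yield smaller batches. If the assertion ever flakes, a quick way to inspect what the replicas actually saw is to tally the reported batch sizes from the same response. A hypothetical debugging aid, not part of this diff; it reuses the `parsed_response` variable from the test:

```python
# Sketch: count how many queries were answered at each observed batch size.
from collections import Counter

observed = Counter(p["output"] for p in parsed_response["batch_predictions"])
print(observed)  # e.g. Counter({'9': 430, '5': 11, 'DEFAULT': 9})
```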