Skip to content

Commit

Permalink
Integration test for fixed-batch-size model deployment (#370)
Browse files Browse the repository at this point in the history
* batch size test

* fix

* varfix

* add missing param

* make inputs unique

* Clean up test

* Format code

* Variable fix

* Change query length to ensure queue saturation

* formatting fix

* Fix test format

* test fix

* test fix

* Test fix

* ..

* Doc fixes
  • Loading branch information
Corey-Zumar authored and dcrankshaw committed Jan 25, 2018
1 parent ffef540 commit 5e9cd65
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 23 deletions.
12 changes: 9 additions & 3 deletions clipper_admin/clipper_admin/clipper_admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,9 @@ def build_and_deploy_model(self,
for a model can be changed at any time with
:py:meth:`clipper.ClipperConnection.set_num_replicas`.
batch_size : int, optional
The user-defined batch size.
The user-defined query batch size for the model. Replicas of the model will attempt
to process at most `batch_size` queries simultaneously. They may process smaller
batches if `batch_size` queries are not immediately available.
If the default value of -1 is used, Clipper will adaptively calculate the batch size for individual
replicas of this model.
Raises
Expand Down Expand Up @@ -443,7 +445,9 @@ def deploy_model(self,
for a model can be changed at any time with
:py:meth:`clipper.ClipperConnection.set_num_replicas`.
batch_size : int, optional
The user-defined batch size.
The user-defined query batch size for the model. Replicas of the model will attempt
to process at most `batch_size` queries simultaneously. They may process smaller
batches if `batch_size` queries are not immediately available.
If the default value of -1 is used, Clipper will adaptively calculate the batch size for individual
replicas of this model.
Expand Down Expand Up @@ -516,7 +520,9 @@ def register_model(self,
A list of strings annotating the model. These are ignored by Clipper
and used purely for user annotations.
batch_size : int, optional
The user-defined batch size.
The user-defined query batch size for the model. Replicas of the model will attempt
to process at most `batch_size` queries simultaneously. They may process smaller
batches if `batch_size` queries are not immediately available.
If the default value of -1 is used, Clipper will adaptively calculate the batch size for individual
replicas of this model.
Expand Down
23 changes: 18 additions & 5 deletions clipper_admin/clipper_admin/deployers/pyspark.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ def create_endpoint(
labels=None,
registry=None,
base_image="clipper/pyspark-container:{}".format(__version__),
num_replicas=1):
num_replicas=1,
batch_size=-1):
"""Registers an app and deploys the provided predict function with PySpark model as
a Clipper model.
Expand Down Expand Up @@ -79,13 +80,19 @@ def create_endpoint(
The number of replicas of the model to create. The number of replicas
for a model can be changed at any time with
:py:meth:`clipper.ClipperConnection.set_num_replicas`.
batch_size : int, optional
The user-defined query batch size for the model. Replicas of the model will attempt
to process at most `batch_size` queries simultaneously. They may process smaller
batches if `batch_size` queries are not immediately available.
If the default value of -1 is used, Clipper will adaptively calculate the batch size for individual
replicas of this model.
"""

clipper_conn.register_application(name, input_type, default_output,
slo_micros)
deploy_pyspark_model(clipper_conn, name, version, input_type, func,
pyspark_model, sc, base_image, labels, registry,
num_replicas)
num_replicas, batch_size)

clipper_conn.link_model_to_app(name, name)

Expand All @@ -101,7 +108,8 @@ def deploy_pyspark_model(
base_image="clipper/pyspark-container:{}".format(__version__),
labels=None,
registry=None,
num_replicas=1):
num_replicas=1,
batch_size=-1):
"""Deploy a Python function with a PySpark model.
The function must take 3 arguments (in order): a SparkSession, the PySpark model, and a list of
Expand Down Expand Up @@ -141,7 +149,12 @@ def deploy_pyspark_model(
The number of replicas of the model to create. The number of replicas
for a model can be changed at any time with
:py:meth:`clipper.ClipperConnection.set_num_replicas`.
batch_size : int, optional
The user-defined query batch size for the model. Replicas of the model will attempt
to process at most `batch_size` queries simultaneously. They may process smaller
batches if `batch_size` queries are not immediately available.
If the default value of -1 is used, Clipper will adaptively calculate the batch size for individual
replicas of this model.
Example
-------
Expand Down Expand Up @@ -214,7 +227,7 @@ def predict(spark, model, inputs):
# Deploy model
clipper_conn.build_and_deploy_model(name, version, input_type,
serialization_dir, base_image, labels,
registry, num_replicas)
registry, num_replicas, batch_size)

# Remove temp files
shutil.rmtree(serialization_dir)
24 changes: 19 additions & 5 deletions clipper_admin/clipper_admin/deployers/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ def create_endpoint(
labels=None,
registry=None,
base_image="clipper/python-closure-container:{}".format(__version__),
num_replicas=1):
num_replicas=1,
batch_size=-1):
"""Registers an application and deploys the provided predict function as a model.
Parameters
Expand Down Expand Up @@ -70,12 +71,19 @@ def create_endpoint(
The number of replicas of the model to create. The number of replicas
for a model can be changed at any time with
:py:meth:`clipper.ClipperConnection.set_num_replicas`.
batch_size : int, optional
The user-defined query batch size for the model. Replicas of the model will attempt
to process at most `batch_size` queries simultaneously. They may process smaller
batches if `batch_size` queries are not immediately available.
If the default value of -1 is used, Clipper will adaptively calculate the batch size for individual
replicas of this model.
"""

clipper_conn.register_application(name, input_type, default_output,
slo_micros)
deploy_python_closure(clipper_conn, name, version, input_type, func,
base_image, labels, registry, num_replicas)
base_image, labels, registry, num_replicas,
batch_size)

clipper_conn.link_model_to_app(name, name)

Expand All @@ -89,7 +97,8 @@ def deploy_python_closure(
base_image="clipper/python-closure-container:{}".format(__version__),
labels=None,
registry=None,
num_replicas=1):
num_replicas=1,
batch_size=-1):
"""Deploy an arbitrary Python function to Clipper.
The function should take a list of inputs of the type specified by `input_type` and
Expand Down Expand Up @@ -125,7 +134,12 @@ def deploy_python_closure(
The number of replicas of the model to create. The number of replicas
for a model can be changed at any time with
:py:meth:`clipper.ClipperConnection.set_num_replicas`.
batch_size : int, optional
The user-defined query batch size for the model. Replicas of the model will attempt
to process at most `batch_size` queries simultaneously. They may process smaller
batches if `batch_size` queries are not immediately available.
If the default value of -1 is used, Clipper will adaptively calculate the batch size for individual
replicas of this model.
Example
-------
Expand Down Expand Up @@ -171,6 +185,6 @@ def centered_predict(inputs):
# Deploy function
clipper_conn.build_and_deploy_model(name, version, input_type,
serialization_dir, base_image, labels,
registry, num_replicas)
registry, num_replicas, batch_size)
# Remove temp files
shutil.rmtree(serialization_dir)
23 changes: 19 additions & 4 deletions clipper_admin/clipper_admin/deployers/pytorch.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ def create_endpoint(
labels=None,
registry=None,
base_image="clipper/pytorch-container:{}".format(__version__),
num_replicas=1):
num_replicas=1,
batch_size=-1):
"""Registers an app and deploys the provided predict function with PyTorch model as
a Clipper model.
Parameters
Expand Down Expand Up @@ -78,13 +79,19 @@ def create_endpoint(
The number of replicas of the model to create. The number of replicas
for a model can be changed at any time with
:py:meth:`clipper.ClipperConnection.set_num_replicas`.
batch_size : int, optional
The user-defined query batch size for the model. Replicas of the model will attempt
to process at most `batch_size` queries simultaneously. They may process smaller
batches if `batch_size` queries are not immediately available.
If the default value of -1 is used, Clipper will adaptively calculate the batch size for individual
replicas of this model.
"""

clipper_conn.register_application(name, input_type, default_output,
slo_micros)
deploy_pytorch_model(clipper_conn, name, version, input_type, func,
pytorch_model, base_image, labels, registry,
num_replicas)
num_replicas, batch_size)

clipper_conn.link_model_to_app(name, name)

Expand All @@ -99,7 +106,8 @@ def deploy_pytorch_model(
base_image="clipper/pytorch-container:{}".format(__version__),
labels=None,
registry=None,
num_replicas=1):
num_replicas=1,
batch_size=-1):
"""Deploy a Python function with a PyTorch model.
Parameters
----------
Expand Down Expand Up @@ -133,6 +141,13 @@ def deploy_pytorch_model(
The number of replicas of the model to create. The number of replicas
for a model can be changed at any time with
:py:meth:`clipper.ClipperConnection.set_num_replicas`.
batch_size : int, optional
The user-defined query batch size for the model. Replicas of the model will attempt
to process at most `batch_size` queries simultaneously. They may process smaller
batches if `batch_size` queries are not immediately available.
If the default value of -1 is used, Clipper will adaptively calculate the batch size for individual
replicas of this model.
Example
-------
Expand Down Expand Up @@ -185,7 +200,7 @@ def predict(model, inputs):
# Deploy model
clipper_conn.build_and_deploy_model(name, version, input_type,
serialization_dir, base_image, labels,
registry, num_replicas)
registry, num_replicas, batch_size)

# Remove temp files
shutil.rmtree(serialization_dir)
23 changes: 18 additions & 5 deletions clipper_admin/clipper_admin/deployers/tensorflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ def create_endpoint(clipper_conn,
labels=None,
registry=None,
base_image="clipper/tf-container:{}".format(__version__),
num_replicas=1):
num_replicas=1,
batch_size=-1):
"""Registers an app and deploys the provided predict function with TensorFlow model as
a Clipper model.
Expand Down Expand Up @@ -76,13 +77,19 @@ def create_endpoint(clipper_conn,
The number of replicas of the model to create. The number of replicas
for a model can be changed at any time with
:py:meth:`clipper.ClipperConnection.set_num_replicas`.
batch_size : int, optional
The user-defined query batch size for the model. Replicas of the model will attempt
to process at most `batch_size` queries simultaneously. They may process smaller
batches if `batch_size` queries are not immediately available.
If the default value of -1 is used, Clipper will adaptively calculate the batch size for individual
replicas of this model.
"""

clipper_conn.register_application(name, input_type, default_output,
slo_micros)
deploy_tensorflow_model(clipper_conn, name, version, input_type, func,
tf_sess_or_saved_model_path, base_image, labels,
registry, num_replicas)
registry, num_replicas, batch_size)

clipper_conn.link_model_to_app(name, name)

Expand All @@ -97,7 +104,8 @@ def deploy_tensorflow_model(
base_image="clipper/tf-container:{}".format(__version__),
labels=None,
registry=None,
num_replicas=1):
num_replicas=1,
batch_size=-1):
"""Deploy a Python prediction function with a Tensorflow session or saved Tensorflow model.
Parameters
----------
Expand Down Expand Up @@ -131,7 +139,12 @@ def deploy_tensorflow_model(
The number of replicas of the model to create. The number of replicas
for a model can be changed at any time with
:py:meth:`clipper.ClipperConnection.set_num_replicas`.
batch_size : int, optional
The user-defined query batch size for the model. Replicas of the model will attempt
to process at most `batch_size` queries simultaneously. They may process smaller
batches if `batch_size` queries are not immediately available.
If the default value of -1 is used, Clipper will adaptively calculate the batch size for individual
replicas of this model.
Example
-------
Expand Down Expand Up @@ -239,7 +252,7 @@ def predict(sess, inputs):
# Deploy model
clipper_conn.build_and_deploy_model(name, version, input_type,
serialization_dir, base_image, labels,
registry, num_replicas)
registry, num_replicas, batch_size)

# Remove temp files
shutil.rmtree(serialization_dir)
48 changes: 47 additions & 1 deletion integration-tests/clipper_admin_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,9 +352,11 @@ def setUpClass(self):
self.app_name_1 = "app3"
self.app_name_2 = "app4"
self.app_name_3 = "app5"
self.app_name_4 = "app6"
self.model_name_1 = "m4"
self.model_name_2 = "m5"
self.model_name_3 = "m6"
self.model_name_4 = "m7"
self.input_type = "doubles"
self.default_output = "DEFAULT"
self.latency_slo_micros = 30000
Expand All @@ -371,6 +373,10 @@ def setUpClass(self):
self.app_name_3, self.input_type, self.default_output,
self.latency_slo_micros)

self.clipper_conn.register_application(
self.app_name_4, self.input_type, self.default_output,
self.latency_slo_micros)

@classmethod
def tearDownClass(self):
self.clipper_conn = create_docker_connection(
Expand Down Expand Up @@ -470,6 +476,45 @@ def predict_func(inputs):

self.assertTrue(received_non_default_prediction)

def test_fixed_batch_size_model_processes_specified_query_batch_size_when_saturated(
        self):
    """Verify that a model deployed with a fixed ``batch_size`` processes
    queries in batches of exactly that size once its queue is saturated.

    Deploys a closure that reports the size of each batch it receives,
    floods the app with many more queries than one batch can hold, and
    asserts that at least 70% of the returned predictions were produced
    as part of a full, fixed-size batch.
    """
    model_version = 1

    def predict_func(inputs):
        # Echo the batch size back as the prediction for every query, so
        # each response records how many queries were processed together.
        batch_size = len(inputs)
        return [str(batch_size) for _ in inputs]

    fixed_batch_size = 9
    # Send many multiples of the batch size so the model's queue stays
    # saturated and full-size batches can form for most queries.
    total_num_queries = fixed_batch_size * 50
    deploy_python_closure(
        self.clipper_conn,
        self.model_name_4,
        model_version,
        self.input_type,
        predict_func,
        batch_size=fixed_batch_size)
    self.clipper_conn.link_model_to_app(self.app_name_4, self.model_name_4)
    # NOTE(review): presumably waits for the model container to start and
    # register with Clipper before querying — confirm 60s is sufficient.
    time.sleep(60)

    addr = self.clipper_conn.get_query_addr()
    url = "http://{addr}/{app}/predict".format(
        addr=addr, app=self.app_name_4)
    # Offset each input vector by j * .001 to make every query unique
    # (per the "make inputs unique" change in this commit — presumably to
    # avoid identical queries being collapsed; verify against frontend).
    test_input = [[float(x) + (j * .001) for x in range(5)]
                  for j in range(total_num_queries)]
    req_json = json.dumps({'input_batch': test_input})
    headers = {'Content-type': 'application/json'}
    response = requests.post(url, headers=headers, data=req_json)
    parsed_response = response.json()
    num_max_batch_queries = 0
    for prediction in parsed_response["batch_predictions"]:
        batch_size = prediction["output"]
        # Ignore default (fallback) outputs; count only predictions whose
        # reported batch size equals the configured fixed batch size.
        if batch_size != self.default_output and int(
                batch_size) == fixed_batch_size:
            num_max_batch_queries += 1

    # Require that at least 70% of all queries were served from a
    # full-size batch while the queue was saturated.
    self.assertGreaterEqual(num_max_batch_queries,
                            int(total_num_queries * .7))


SHORT_TEST_ORDERING = [
'test_register_model_correct',
Expand All @@ -495,7 +540,8 @@ def predict_func(inputs):
'test_unlinked_app_returns_default_predictions',
'test_deployed_model_queried_successfully',
'test_batch_queries_returned_successfully',
'test_deployed_python_closure_queried_successfully'
'test_deployed_python_closure_queried_successfully',
'test_fixed_batch_size_model_processes_specified_query_batch_size_when_saturated'
]

if __name__ == '__main__':
Expand Down

0 comments on commit 5e9cd65

Please sign in to comment.