Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement delete_versioned_model API #602

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
28a2adf
Implement delete_versioned_model API
Jul 11, 2018
821e64e
Fix build error
Mar 25, 2019
e86468c
Fix unit test error
Mar 25, 2019
58fdef6
solved conflicts
Apr 1, 2019
21d74f4
Merge branch 'develop' into implement_delete_versioned_models_python_api
Apr 5, 2019
9ecb533
Refactor TaskExecutor and fix bugs
Apr 5, 2019
fc41d5a
Merge branch 'develop' into implement_delete_versioned_models_python_api
May 11, 2019
8c4aa24
Enhance the clean-up steps about useless model container
Apr 26, 2019
1a65ee9
Merge branch 'develop' into implement_delete_versioned_models_python_api
May 12, 2019
27996af
Merge branch 'develop' into implement_delete_versioned_models_python_api
May 12, 2019
b79c882
Add VersionedModelId to registered_containers_
May 13, 2019
3f30edb
Fix threadpool's bug
May 17, 2019
303d72a
Merge remote-tracking branch 'official/develop' into implement_delete…
May 17, 2019
3509fe5
Merge branch 'develop' into implement_delete_versioned_models_python_api
May 21, 2019
4e5eaf5
Merge branch 'develop' into implement_delete_versioned_models_python_api
simon-mo May 21, 2019
283499f
Call unregister_versioned_models() when stopped model containers
May 21, 2019
c7519e7
Update clipper_admin_tests.py
May 21, 2019
e70f089
Fix TaskExecutor's bug
May 22, 2019
5d2822c
Change _unregister_versioned_models API to private at clipper_admin
May 27, 2019
49ae746
Add three unittests to management_frontend_tests
May 29, 2019
03b6f6d
Merge remote-tracking branch 'official/develop' into implement_delete…
May 29, 2019
bb57433
Fix build errors
May 29, 2019
e5fc561
Add one unittest to management_frontend_tests
May 30, 2019
eab8e81
Fix TestDeleteVersionedModelCorrect unittest
May 30, 2019
45d29dd
Fix build error
May 30, 2019
b1cda95
Fix build error
May 30, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 58 additions & 3 deletions clipper_admin/clipper_admin/clipper_admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -1233,12 +1233,10 @@ def get_query_addr(self):

def get_metric_addr(self):
"""Get the IP address of Prometheus metric server.

Returns
-------
str
The address as an IP address or hostname.

Raises
------
:py:exc:`clipper.UnconnectedException`
Expand All @@ -1249,6 +1247,43 @@ def get_metric_addr(self):
raise UnconnectedException()
return self.cm.get_metric_addr()

def _unregister_versioned_models(self, model_versions_dict):
"""Unregister the specified versions of the specified models from Clipper internal.

This function does not be opened to public because it might cause critical operation.
Please use 'stop_models', 'stop_versioned_models', 'stop_inactive_model_versions',
and 'stop_all_model_containers' APIs according to your need.

Parameters
----------
model_versions_dict : dict(str, list(str))
For each entry in the dict, the key is a model name and the value is a list of model
Raises
------
:py:exc:`clipper.UnconnectedException`
versions. All replicas for each version of each model will be stopped.
"""
if not self.connected:
raise UnconnectedException()
url = "http://{host}/admin/delete_versioned_model".format(
host=self.cm.get_admin_addr())
headers = {"Content-type": "application/json"}
for model_name in model_versions_dict:
for model_version in model_versions_dict[model_name]:
req_json = json.dumps({"model_name": model_name,
"model_version": model_version})
r = requests.post(url, headers=headers, data=req_json)
logger.debug(r.text)
if r.status_code != requests.codes.ok:
msg = "Received error status code: {code} and message: " \
"{msg}".format(code=r.status_code, msg=r.text)
logger.error(msg)
raise ClipperException(msg)
else:
logger.info(
"Model {name}:{ver} was successfully deleted".format(
name=model_name, ver=model_version))

def stop_models(self, model_names):
"""Stops all versions of the specified models.

Expand Down Expand Up @@ -1277,6 +1312,7 @@ def stop_models(self, model_names):
else:
model_dict[m["model_name"]] = [m["model_version"]]
self.cm.stop_models(model_dict)
self._unregister_versioned_models(model_dict)
pp = pprint.PrettyPrinter(indent=4)
self.logger.info(
"Stopped all containers for these models and versions:\n{}".format(
Expand All @@ -1303,6 +1339,7 @@ def stop_versioned_models(self, model_versions_dict):
if not self.connected:
raise UnconnectedException()
self.cm.stop_models(model_versions_dict)
self._unregister_versioned_models(model_versions_dict)
pp = pprint.PrettyPrinter(indent=4)
self.logger.info(
"Stopped all containers for these models and versions:\n{}".format(
Expand Down Expand Up @@ -1339,6 +1376,7 @@ def stop_inactive_model_versions(self, model_names):
else:
model_dict[m["model_name"]] = [m["model_version"]]
self.cm.stop_models(model_dict)
self._unregister_versioned_models(model_dict)
pp = pprint.PrettyPrinter(indent=4)
self.logger.info(
"Stopped all containers for these models and versions:\n{}".format(
Expand All @@ -1350,9 +1388,26 @@ def stop_all_model_containers(self):
This method can be used to clean up leftover Clipper model containers even if the
Clipper management frontend or Redis has crashed. It can also be called without calling
``connect`` first.

Raises
------
:py:exc:`clipper.UnconnectedException`
versions. All replicas for each version of each model will be stopped.
"""
if not self.connected:
raise UnconnectedException()
model_info = self.get_all_models(verbose=True)
model_dict = {}
for m in model_info:
if m["model_name"] in model_dict:
model_dict[m["model_name"]].append(m["model_version"])
else:
model_dict[m["model_name"]] = [m["model_version"]]
self.cm.stop_all_model_containers()
self.logger.info("Stopped all Clipper model containers")
self._unregister_versioned_models(model_dict)
pp = pprint.PrettyPrinter(indent=4)
self.logger.info("Stopped all Clipper model containers:\n{}".format(
pp.pformat(model_dict)))

def stop_all(self, graceful=True):
"""Stops all processes that were started via Clipper admin commands.
Expand Down
28 changes: 25 additions & 3 deletions integration-tests/clipper_admin_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,17 @@ def get_containers(self, container_name):
val=self.clipper_conn.cm.cluster_name)
})

def check_registered_models(self, pairs):
all_models = self.clipper_conn.get_all_models(verbose=True)
if len(all_models) > 0:
try:
for model_info in all_models:
pairs.remove((model_info["model_name"],
model_info["model_version"]))
except ValueError:
self.assertTrue(False)
self.assertTrue(len(pairs) == 0)

def test_register_model_correct(self):
input_type = "doubles"
model_name = "m"
Expand Down Expand Up @@ -264,6 +275,7 @@ def test_remove_inactive_containers_succeeds(self):
num_replicas=2)
containers = self.get_containers(container_name)
self.assertEqual(len(containers), 2)
self.check_registered_models(pairs=[(model_name, "1")])

self.clipper_conn.build_and_deploy_model(
model_name,
Expand All @@ -274,10 +286,13 @@ def test_remove_inactive_containers_succeeds(self):
num_replicas=3)
containers = self.get_containers(container_name)
self.assertEqual(len(containers), 5)
self.check_registered_models(pairs=[(model_name, "1"),
(model_name, "2")])

self.clipper_conn.stop_inactive_model_versions([model_name])
containers = self.get_containers(container_name)
self.assertEqual(len(containers), 3)
self.check_registered_models(pairs=[(model_name, "2")])

def test_stop_models(self):
container_name = "{}/noop-container:{}".format(clipper_registry,
Expand All @@ -296,25 +311,32 @@ def test_stop_models(self):

containers = self.get_containers(container_name)
self.assertEqual(len(containers), len(mnames) * len(versions))
self.check_registered_models(
pairs=[(a, b) for a in mnames for b in versions])

# stop all versions of jimmypage model
self.clipper_conn.stop_models(mnames[:1])
containers = self.get_containers(container_name)

containers = self.get_containers(container_name)
self.assertEqual(len(containers), len(mnames[1:]) * len(versions))
self.check_registered_models(
pairs=[(a, b) for a in mnames[1:] for b in versions])

# After calling this method, the remaining model should be robertplant:i
self.clipper_conn.stop_versioned_models({
"robertplant": ["ii"],
})
containers = self.get_containers(container_name)

containers = self.get_containers(container_name)
self.assertEqual(len(containers), 1)
self.check_registered_models(pairs=[("robertplant", "i")])

# Stop all model containers
self.clipper_conn.stop_all_model_containers()
containers = self.get_containers(container_name)

containers = self.get_containers(container_name)
self.assertEqual(len(containers), 0)
self.check_registered_models(pairs=[])

def test_python_closure_deploys_successfully(self):
model_name = "m2"
Expand Down
14 changes: 13 additions & 1 deletion src/libclipper/include/clipper/redis.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,14 +130,26 @@ bool add_model(redox::Redox& redis, const VersionedModelId& model_id,
const std::string& container_name,
const std::string& model_data_path, int batch_size);

/**
* Marks a model for deletion if it exists.
*
* \return Returns true if the model was present in the table
* and was successfully marked for deletion. Returns false if there was a
* problem
* or if the model was not in the table.
*/
bool mark_versioned_model_for_delete(redox::Redox& redis,
const VersionedModelId& model_id);

/**
* Deletes a model from the model table if it exists.
*
* \return Returns true if the model was present in the table
* and was successfully deleted. Returns false if there was a problem
* or if the model was not in the table.
*/
bool delete_model(redox::Redox& redis, const VersionedModelId& model_id);
bool delete_versioned_model(redox::Redox& redis,
const VersionedModelId& model_id);

/**
* Looks up a model based on its model ID. This
Expand Down
Loading