Skip to content

Commit

Permalink
Implement delete_versioned_model API
Browse files Browse the repository at this point in the history
* Remove the specified versions of the specified models from Clipper's internal state.
* Implemented `unregister_versioned_models` Clipper API in clipper_admin.

> Description
> -----------
> unregister_versioned_models(model_versions_dict)

> Parameters
> ----------
> model_versions_dict : dict(str, list(str))
>     For each entry in the dict, the key is a model name and the value is a list of model
>     versions. All replicas for each version of each model will be stopped.

> Raises
> ------
> :py:exc:`clipper.UnconnectedException`
>     Raised if the client is not connected to a Clipper cluster.

* Implemented `delete_versioned_model` Management-Frontend API.

> API path
> --------
> POST /admin/delete_versioned_model

> Request schema
> --------------
> const std::string DELETE_VERSIONED_MODEL_JSON_SCHEMA = R"(
>   {
>    "model_name" := string,
>    "model_version" := string,
>   }
> )";
  • Loading branch information
Sungjun, Kim committed Mar 22, 2019
1 parent 847d6de commit 28a2adf
Show file tree
Hide file tree
Showing 10 changed files with 383 additions and 47 deletions.
34 changes: 34 additions & 0 deletions clipper_admin/clipper_admin/clipper_admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -1163,6 +1163,40 @@ def get_query_addr(self):
raise UnconnectedException()
return self.cm.get_query_addr()

def unregister_versioned_models(self, model_versions_dict):
    """Unregister the specified versions of the specified models from Clipper.

    Issues one ``POST /admin/delete_versioned_model`` request to the
    management frontend per (model, version) pair.

    Parameters
    ----------
    model_versions_dict : dict(str, list(str))
        For each entry in the dict, the key is a model name and the value
        is a list of model versions. All replicas for each version of each
        model will be stopped.

    Raises
    ------
    :py:exc:`clipper.UnconnectedException`
        If the client is not connected to a Clipper cluster.
    :py:exc:`clipper.ClipperException`
        If the management frontend returns a non-OK status for any
        deletion request.
    """
    if not self.connected:
        raise UnconnectedException()
    url = "http://{host}/admin/delete_versioned_model".format(
        host=self.cm.get_admin_addr())
    headers = {"Content-type": "application/json"}
    for model_name, model_versions in model_versions_dict.items():
        for model_version in model_versions:
            req_json = json.dumps({
                "model_name": model_name,
                "model_version": model_version
            })
            r = requests.post(url, headers=headers, data=req_json)
            logger.debug(r.text)
            if r.status_code != requests.codes.ok:
                msg = "Received error status code: {code} and message: " \
                      "{msg}".format(code=r.status_code, msg=r.text)
                logger.error(msg)
                raise ClipperException(msg)
            logger.info(
                "Model {name}:{ver} was successfully deleted".format(
                    name=model_name, ver=model_version))

def stop_models(self, model_names):
"""Stops all versions of the specified models.
Expand Down
37 changes: 37 additions & 0 deletions integration-tests/clipper_admin_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,17 @@ def get_containers(self, container_name):
val=self.clipper_conn.cm.cluster_name)
})

def check_registered_models(self, pairs):
    """Assert that the registered models are exactly the given pairs.

    Parameters
    ----------
    pairs : list((str, str))
        Expected (model_name, model_version) pairs. The list is consumed:
        each registered model removes its pair, and the list must be empty
        at the end.
    """
    all_models = self.clipper_conn.get_all_models(verbose=True)
    for model_info in all_models:
        pair = (model_info["model_name"], model_info["model_version"])
        if pair not in pairs:
            # An unexpected model is registered — fail with a useful message
            # instead of the opaque assertTrue(False) on ValueError.
            self.fail("Unexpected registered model: {}:{}".format(*pair))
        pairs.remove(pair)
    self.assertEqual(
        len(pairs), 0,
        "Expected models not registered: {}".format(pairs))

def test_register_model_correct(self):
input_type = "doubles"
model_name = "m"
Expand Down Expand Up @@ -259,6 +270,11 @@ def test_remove_inactive_containers_succeeds(self):
containers = self.get_containers(container_name)
self.assertEqual(len(containers), 3)

self.clipper_conn.unregister_versioned_models({
model_name: ["1"]
})
self.check_registered_models(pairs=[(model_name, "2")])

def test_stop_models(self):
container_name = "{}/noop-container:{}".format(clipper_registry,
clipper_version)
Expand All @@ -283,6 +299,13 @@ def test_stop_models(self):

self.assertEqual(len(containers), len(mnames[2:]) * len(versions))

self.clipper_conn.unregister_versioned_models({
"jimmypage": ["i", "ii", "iii", "iv"],
"robertplant": ["i", "ii", "iii", "iv"]
})
self.check_registered_models(
pairs=[(a, b) for a in mnames[:2] for b in versions])

# After calling this method, the remaining models should be:
# jpj:i, jpj:iii, johnbohman:ii
self.clipper_conn.stop_versioned_models({
Expand All @@ -293,11 +316,25 @@ def test_stop_models(self):

self.assertEqual(len(containers), 3)

self.clipper_conn.unregister_versioned_models({
"jpj": ["ii", "iv"],
"johnbohnam": ["i", "iv", "iii"],
})
self.check_registered_models(
pairs=[("jpj", "ii"), ("jpj", "iv"), ("johnbohnam", "i"),
("johnbohnam", "iv"), ("johnbohnam", "iii")])

self.clipper_conn.stop_all_model_containers()
containers = self.get_containers(container_name)

self.assertEqual(len(containers), 0)

self.clipper_conn.unregister_versioned_models({
"jpj": ["i", "iii"],
"johnbohnam": ["ii"]
})
self.check_registered_models(pairs=[])

def test_python_closure_deploys_successfully(self):
model_name = "m2"
model_version = 1
Expand Down
14 changes: 13 additions & 1 deletion src/libclipper/include/clipper/redis.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,14 +130,26 @@ bool add_model(redox::Redox& redis, const VersionedModelId& model_id,
const std::string& container_name,
const std::string& model_data_path, int batch_size);

/**
* Marks a model for deletion if it exists.
*
* \return Returns true if the model was present in the table
* and was successfully marked for deletion. Returns false if there was a
* problem
* or if the model was not in the table.
*/
bool mark_versioned_model_for_delete(redox::Redox& redis,
const VersionedModelId& model_id);

/**
* Deletes a model from the model table if it exists.
*
* \return Returns true if the model was present in the table
* and was successfully deleted. Returns false if there was a problem
* or if the model was not in the table.
*/
bool delete_model(redox::Redox& redis, const VersionedModelId& model_id);
bool delete_versioned_model(redox::Redox& redis,
const VersionedModelId& model_id);

/**
* Looks up a model based on its model ID. This
Expand Down
92 changes: 88 additions & 4 deletions src/libclipper/include/clipper/task_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ struct DeadlineCompare {
// thread safe model queue
class ModelQueue {
public:
ModelQueue() : queue_(ModelPQueue{}) {}
ModelQueue() : queue_(ModelPQueue{}), valid_(true) {}

// Disallow copy and assign
ModelQueue(const ModelQueue &) = delete;
Expand All @@ -125,6 +125,7 @@ class ModelQueue {
~ModelQueue() = default;

void add_task(PredictTask task) {
if (!valid_) return;
std::lock_guard<std::mutex> lock(queue_mutex_);
Deadline deadline = std::chrono::system_clock::now() +
std::chrono::microseconds(task.latency_slo_micros_);
Expand All @@ -142,11 +143,12 @@ class ModelQueue {
std::function<BatchSizeInfo(Deadline)> &&get_batch_size) {
std::unique_lock<std::mutex> lock(queue_mutex_);
remove_tasks_with_elapsed_deadlines();
queue_not_empty_condition_.wait(lock, [this]() { return !queue_.empty(); });
queue_not_empty_condition_.wait(
lock, [this]() { return !queue_.empty() || !valid_; });
remove_tasks_with_elapsed_deadlines();

std::vector<PredictTask> batch;
if (requesting_container->is_active()) {
if (requesting_container->is_active() && valid_) {
Deadline deadline = queue_.top().first;

size_t max_batch_size;
Expand Down Expand Up @@ -178,6 +180,13 @@ class ModelQueue {
return batch;
}

// Permanently shuts this queue down: once invalidated, add_task() drops new
// tasks (it checks valid_) and get_batch() returns an empty batch instead of
// dispatching work. All tasks still pending in the queue are discarded.
void invalidate() {
  std::lock_guard<std::mutex> l(queue_mutex_);
  valid_ = false;
  // Drop any queued tasks by replacing the priority queue with an empty one.
  queue_ = ModelPQueue();
  // Wake every thread blocked in get_batch()'s wait — its predicate also
  // checks !valid_, so waiters observe the shutdown rather than sleeping
  // forever on an empty queue.
  queue_not_empty_condition_.notify_all();
}

private:
// Min PriorityQueue so that the task with the earliest
// deadline is at the front of the queue
Expand All @@ -188,6 +197,7 @@ class ModelQueue {
ModelPQueue queue_;
std::mutex queue_mutex_;
std::condition_variable queue_not_empty_condition_;
bool valid_;

// Deletes tasks with deadlines prior or equivalent to the
// current system time. This method should only be called
Expand Down Expand Up @@ -316,6 +326,60 @@ class TaskExecutor {
"Registered batch size of {} for model {}:{}",
batch_size, model_id.get_name(), model_id.get_id());
active_containers_->register_batch_size(model_id, batch_size);

} else if (event_type == "hdel" && *task_executor_valid) {
std::vector<VersionedModelId> parsed_model_info = clipper::redis::str_to_models(key);
VersionedModelId model_id = parsed_model_info.front();
auto container_list = clipper::redis::get_all_containers(redis_connection_);

for (auto c : container_list) {
VersionedModelId vm = std::get<0>(c);
int model_replica_id = std::get<1>(c);
if (vm == model_id) {
TaskExecutionThreadPool::interrupt_thread(vm, model_replica_id);
}
}

bool deleted_queue = delete_model_queue_if_necessary(model_id);
if (deleted_queue) {
log_info_formatted(LOGGING_TAG_TASK_EXECUTOR,
"Deleted queue for model: {} : {}",
model_id.get_name(), model_id.get_id());
}

for (auto c : container_list) {
VersionedModelId vm = std::get<0>(c);
int model_replica_id = std::get<1>(c);
if (vm == model_id) {
if (clipper::redis::delete_container(redis_connection_, vm, model_replica_id)) {
std::stringstream ss;
ss << "Successfully deleted container with name "
<< "'" << model_id.get_name() << "'"
<< " and version "
<< "'" << model_id.get_id() << "'";
log_info_formatted(LOGGING_TAG_TASK_EXECUTOR, "{}", ss.str());

active_containers_->remove_container(vm, model_replica_id);
TaskExecutionThreadPool::delete_queue(vm, model_replica_id);
EstimatorFittingThreadPool::delete_queue(vm, model_replica_id);
} else {
std::stringstream ss;
ss << "Error deleting container with name "
<< "'" << model_id.get_name() << "'"
<< " and version "
<< "'" << model_id.get_id() << "'"
<< " from Redis";
throw std::runtime_error(ss.str());
}
}
}
clipper::redis::delete_versioned_model(redis_connection_, model_id);

} else if (!*task_executor_valid) {
log_info(LOGGING_TAG_TASK_EXECUTOR,
"Not running TaskExecutor's "
"subscribe_to_model_changes callback because "
"TaskExecutor has been destroyed.");
}
});

Expand Down Expand Up @@ -474,6 +538,20 @@ class TaskExecutor {
return queue_created;
}

bool delete_model_queue_if_necessary(const VersionedModelId &model_id) {
// Deletes an entry from the queues map, if one exists
boost::unique_lock<boost::shared_mutex> l(model_queues_mutex_);
if (model_queues_.count(model_id)) {
model_queues_[model_id]->invalidate();
model_queues_.erase(model_id);

boost::unique_lock<boost::shared_mutex> l(model_metrics_mutex_);
model_metrics_.erase(model_id);
return true;
}
return false;
}

void on_container_ready(VersionedModelId model_id, int replica_id) {
std::shared_ptr<ModelContainer> container =
active_containers_->get_model_replica(model_id, replica_id);
Expand Down Expand Up @@ -599,8 +677,14 @@ class TaskExecutor {
}

// Tears down bookkeeping for a model replica that is being removed. If the
// replica is no longer tracked by active_containers_, this is a no-op.
void on_remove_container(VersionedModelId model_id, int replica_id) {
  if (active_containers_->get_model_replica(model_id, replica_id) == nullptr) {
    return;
  }
  active_containers_->remove_container(model_id, replica_id);
  TaskExecutionThreadPool::delete_queue(model_id, replica_id);
  EstimatorFittingThreadPool::delete_queue(model_id, replica_id);
}
};

Expand Down
Loading

0 comments on commit 28a2adf

Please sign in to comment.