Skip to content

server: health: fix race condition on slots data using tasks queue #5634

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions examples/server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ node index.js
- 200 -> `{"status": "no slot available", "slots_idle": 0, "slots_processing": 32}` if no slot are currently available.
- 503 -> `{"status": "no slot available", "slots_idle": 0, "slots_processing": 32}` if the query parameter `fail_on_no_slot` is provided and no slot are currently available.

If the query parameter `include_slots` is passed, `slots` field will contain internal slots data except if `--slots-endpoint-disable` is set.

- **POST** `/completion`: Given a `prompt`, it returns the predicted completion.

*Options:*
Expand Down
122 changes: 80 additions & 42 deletions examples/server/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1394,6 +1394,46 @@ struct llama_server_context
case TASK_TYPE_NEXT_RESPONSE: {
// do nothing
} break;
case TASK_TYPE_SLOTS_DATA: {
json slots_data = json::array();
int n_idle_slots = 0;
int n_processing_slots = 0;

for (llama_client_slot &slot: slots) {
if (slot.available()) {
n_idle_slots++;
} else {
n_processing_slots++;
}
json slot_data = get_formated_generation(slot);
slot_data["id"] = slot.id;
slot_data["task_id"] = slot.task_id;
slot_data["state"] = slot.state;
slot_data["prompt"] = slot.prompt;
slot_data["next_token"] = {
{"has_next_token", slot.has_next_token},
{"n_remain", slot.n_remaining},
{"num_tokens_predicted", slot.n_decoded},
{"stopped_eos", slot.stopped_eos},
{"stopped_word", slot.stopped_word},
{"stopped_limit", slot.stopped_limit},
{"stopping_word", slot.stopping_word},
};
slots_data.push_back(slot_data);
}
LOG_TEE("task %i - slots data: idle=%i processing=%i\n", task.id, n_idle_slots, n_processing_slots);
task_result res;
res.id = task.id;
res.multitask_id = task.multitask_id;
res.stop = true;
res.error = false;
res.result_json = {
{ "idle", n_idle_slots },
{ "processing", n_processing_slots },
{ "slots", slots_data }
};
queue_results.send(res);
} break;
}
}

Expand Down Expand Up @@ -2557,34 +2597,38 @@ int main(int argc, char **argv)
server_state current_state = state.load();
switch(current_state) {
case SERVER_STATE_READY: {
int available_slots = 0;
int processing_slots = 0;
for (llama_client_slot &slot: llama.slots) {
if (slot.available()) {
available_slots++;
} else {
processing_slots++;
}
// request slots data using task queue
task_server task;
task.id = llama.queue_tasks.get_new_id();
task.type = TASK_TYPE_SLOTS_DATA;
task.target_id = -1;

llama.queue_results.add_waiting_task_id(task.id);
llama.queue_tasks.post(task);

// get the result
task_result result = llama.queue_results.recv(task.id);
llama.queue_results.remove_waiting_task_id(task.id);

int n_idle_slots = result.result_json["idle"];
int n_processing_slots = result.result_json["processing"];

json health = {
{"status", "ok"},
{"slots_idle", n_idle_slots},
{"slots_processing", n_processing_slots}};
res.status = 200; // HTTP OK
if (sparams.slots_endpoint && req.has_param("include_slots")) {
health["slots"] = result.result_json["slots"];
}
if (available_slots > 0) {
json health = {
{"status", "ok"},
{"slots_idle", available_slots},
{"slots_processing", processing_slots}};
res.set_content(health.dump(), "application/json");
res.status = 200; // HTTP OK
} else {
json health = {
{"status", "no slot available"},
{"slots_idle", available_slots},
{"slots_processing", processing_slots}};
res.set_content(health.dump(), "application/json");

if (n_idle_slots == 0) {
health["status"] = "no slot available";
if (req.has_param("fail_on_no_slot")) {
res.status = 503; // HTTP Service Unavailable
} else {
res.status = 200; // HTTP OK
}
}
res.set_content(health.dump(), "application/json");
break;
}
case SERVER_STATE_LOADING_MODEL:
Expand All @@ -2600,26 +2644,20 @@ int main(int argc, char **argv)

if (sparams.slots_endpoint) {
svr.Get("/slots", [&](const httplib::Request&, httplib::Response& res) {
json slots;
for (llama_client_slot & slot : llama.slots) {
json slot_data = llama.get_formated_generation(slot);
slot_data["id"] = slot.id;
slot_data["task_id"] = slot.task_id;
slot_data["state"] = slot.state;
slot_data["prompt"] = slot.prompt;
slot_data["next_token"] = {
{"has_next_token", slot.has_next_token},
{"n_remain", slot.n_remaining},
{"num_tokens_predicted", slot.n_decoded},
{"stopped_eos", slot.stopped_eos},
{"stopped_word", slot.stopped_word},
{"stopped_limit", slot.stopped_limit},
{"stopping_word", slot.stopping_word},
};
// request slots data using task queue
task_server task;
task.id = llama.queue_tasks.get_new_id();
task.type = TASK_TYPE_SLOTS_DATA;
task.target_id = -1;

slots.push_back(slot_data);
}
res.set_content(slots.dump(), "application/json");
llama.queue_results.add_waiting_task_id(task.id);
llama.queue_tasks.post(task);

// get the result
task_result result = llama.queue_results.recv(task.id);
llama.queue_results.remove_waiting_task_id(task.id);

res.set_content(result.result_json["slots"].dump(), "application/json");
res.status = 200; // HTTP OK
});
}
Expand Down
3 changes: 2 additions & 1 deletion examples/server/utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ enum server_state {
enum task_type {
TASK_TYPE_COMPLETION,
TASK_TYPE_CANCEL,
TASK_TYPE_NEXT_RESPONSE
TASK_TYPE_NEXT_RESPONSE,
TASK_TYPE_SLOTS_DATA
};

struct task_server {
Expand Down