
Commit 293b28e

phymbert authored and hodlen committed
server: health: fix race condition on slots data using tasks queue (ggml-org#5634)
* server: health: fix race condition on slots data using tasks queue
* server: health:
  * include_slots only if slots_endpoint
* fix compile warning task.target_id not initialized.
1 parent 040ea49 · commit 293b28e

File tree

3 files changed: +84 -43 lines changed

* examples/server/README.md
* examples/server/server.cpp
* examples/server/utils.hpp
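What the race was: before this change, the `/health` and `/slots` handlers iterated `llama.slots` directly on the HTTP thread while the inference loop could be mutating the same slot objects. The commit instead posts a `TASK_TYPE_SLOTS_DATA` task to `queue_tasks` and blocks on `queue_results`, so the snapshot is taken on the thread that owns the slots. The queue types themselves are not part of this diff; a minimal sketch of the post/recv pattern they would need to provide might look like the following (names and the single `id` field are illustrative, not the actual llama.cpp implementation):

```cpp
#include <algorithm>
#include <condition_variable>
#include <deque>
#include <mutex>

// Illustrative stand-in: task_server / task_result in server.cpp carry
// much more state than an id.
struct message {
    int id;
};

struct blocking_queue {
    std::deque<message>     items;
    std::mutex              mtx;
    std::condition_variable cv;

    // Producer side: enqueue and wake up any waiters
    // (mirrors queue_tasks.post / queue_results.send).
    void post(message m) {
        std::lock_guard<std::mutex> lock(mtx);
        items.push_back(std::move(m));
        cv.notify_all();
    }

    // Consumer side: block until the message with the requested id arrives,
    // then remove and return it (mirrors queue_results.recv(task.id)).
    message recv(int id) {
        std::unique_lock<std::mutex> lock(mtx);
        std::deque<message>::iterator it;
        cv.wait(lock, [&] {
            it = std::find_if(items.begin(), items.end(),
                              [&](const message & m) { return m.id == id; });
            return it != items.end();
        });
        message out = std::move(*it);
        items.erase(it);
        return out;
    }
};
```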

examples/server/README.md

Lines changed: 2 additions & 0 deletions

```diff
@@ -140,6 +140,8 @@ node index.js
 - 200 -> `{"status": "no slot available", "slots_idle": 0, "slots_processing": 32}` if no slot are currently available.
 - 503 -> `{"status": "no slot available", "slots_idle": 0, "slots_processing": 32}` if the query parameter `fail_on_no_slot` is provided and no slot are currently available.
 
+If the query parameter `include_slots` is passed, `slots` field will contain internal slots data except if `--slots-endpoint-disable` is set.
+
 - **POST** `/completion`: Given a `prompt`, it returns the predicted completion.
 
 *Options:*
```
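As a usage illustration (not part of the commit), a client could exercise the new parameter like this; it uses cpp-httplib, the same header-only HTTP library the server example is built on, and the host and port are assumptions:

```cpp
#include "httplib.h"
#include <cstdio>

int main() {
    httplib::Client cli("localhost", 8080);

    // The server only tests the parameter's presence (req.has_param), so any
    // value enables the slots dump; pass fail_on_no_slot the same way to get
    // a 503 instead of a 200 when every slot is busy.
    if (auto res = cli.Get("/health?include_slots=true")) {
        std::printf("GET /health -> %d\n%s\n", res->status, res->body.c_str());
    }
    return 0;
}
```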

examples/server/server.cpp

Lines changed: 80 additions & 42 deletions

```diff
@@ -1394,6 +1394,46 @@ struct llama_server_context
             case TASK_TYPE_NEXT_RESPONSE: {
                 // do nothing
             } break;
+            case TASK_TYPE_SLOTS_DATA: {
+                json slots_data = json::array();
+                int n_idle_slots = 0;
+                int n_processing_slots = 0;
+
+                for (llama_client_slot &slot: slots) {
+                    if (slot.available()) {
+                        n_idle_slots++;
+                    } else {
+                        n_processing_slots++;
+                    }
+                    json slot_data = get_formated_generation(slot);
+                    slot_data["id"] = slot.id;
+                    slot_data["task_id"] = slot.task_id;
+                    slot_data["state"] = slot.state;
+                    slot_data["prompt"] = slot.prompt;
+                    slot_data["next_token"] = {
+                        {"has_next_token", slot.has_next_token},
+                        {"n_remain", slot.n_remaining},
+                        {"num_tokens_predicted", slot.n_decoded},
+                        {"stopped_eos", slot.stopped_eos},
+                        {"stopped_word", slot.stopped_word},
+                        {"stopped_limit", slot.stopped_limit},
+                        {"stopping_word", slot.stopping_word},
+                    };
+                    slots_data.push_back(slot_data);
+                }
+                LOG_TEE("task %i - slots data: idle=%i processing=%i\n", task.id, n_idle_slots, n_processing_slots);
+                task_result res;
+                res.id = task.id;
+                res.multitask_id = task.multitask_id;
+                res.stop = true;
+                res.error = false;
+                res.result_json = {
+                    { "idle", n_idle_slots },
+                    { "processing", n_processing_slots },
+                    { "slots", slots_data }
+                };
+                queue_results.send(res);
+            } break;
         }
     }
 
```
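Because this case executes inside `llama_server_context`'s task processing, on the same thread that mutates `slots`, it can read slot state without locking; the snapshot travels back to the waiting HTTP handler through `queue_results.send(res)`, with `res.stop = true` so the waiting side treats the result as final.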

```diff
@@ -2557,34 +2597,38 @@ int main(int argc, char **argv)
         server_state current_state = state.load();
         switch(current_state) {
             case SERVER_STATE_READY: {
-                int available_slots = 0;
-                int processing_slots = 0;
-                for (llama_client_slot &slot: llama.slots) {
-                    if (slot.available()) {
-                        available_slots++;
-                    } else {
-                        processing_slots++;
-                    }
+                // request slots data using task queue
+                task_server task;
+                task.id = llama.queue_tasks.get_new_id();
+                task.type = TASK_TYPE_SLOTS_DATA;
+                task.target_id = -1;
+
+                llama.queue_results.add_waiting_task_id(task.id);
+                llama.queue_tasks.post(task);
+
+                // get the result
+                task_result result = llama.queue_results.recv(task.id);
+                llama.queue_results.remove_waiting_task_id(task.id);
+
+                int n_idle_slots = result.result_json["idle"];
+                int n_processing_slots = result.result_json["processing"];
+
+                json health = {
+                    {"status", "ok"},
+                    {"slots_idle", n_idle_slots},
+                    {"slots_processing", n_processing_slots}};
+                res.status = 200; // HTTP OK
+                if (sparams.slots_endpoint && req.has_param("include_slots")) {
+                    health["slots"] = result.result_json["slots"];
                 }
-                if (available_slots > 0) {
-                    json health = {
-                        {"status", "ok"},
-                        {"slots_idle", available_slots},
-                        {"slots_processing", processing_slots}};
-                    res.set_content(health.dump(), "application/json");
-                    res.status = 200; // HTTP OK
-                } else {
-                    json health = {
-                        {"status", "no slot available"},
-                        {"slots_idle", available_slots},
-                        {"slots_processing", processing_slots}};
-                    res.set_content(health.dump(), "application/json");
+
+                if (n_idle_slots == 0) {
+                    health["status"] = "no slot available";
                     if (req.has_param("fail_on_no_slot")) {
                         res.status = 503; // HTTP Service Unavailable
-                    } else {
-                        res.status = 200; // HTTP OK
                     }
                 }
+                res.set_content(health.dump(), "application/json");
                 break;
             }
             case SERVER_STATE_LOADING_MODEL:
```
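Note the control-flow simplification here: the handler now builds a single `health` object, defaults `res.status` to 200, and only overrides it with 503 when `fail_on_no_slot` is present and no slot is idle, calling `set_content` exactly once at the end instead of duplicating the JSON construction in each branch. Explicitly setting `task.target_id = -1` is the fix for the "task.target_id not initialized" compile warning mentioned in the commit message.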
```diff
@@ -2600,26 +2644,20 @@ int main(int argc, char **argv)
 
     if (sparams.slots_endpoint) {
         svr.Get("/slots", [&](const httplib::Request&, httplib::Response& res) {
-            json slots;
-            for (llama_client_slot & slot : llama.slots) {
-                json slot_data = llama.get_formated_generation(slot);
-                slot_data["id"] = slot.id;
-                slot_data["task_id"] = slot.task_id;
-                slot_data["state"] = slot.state;
-                slot_data["prompt"] = slot.prompt;
-                slot_data["next_token"] = {
-                    {"has_next_token", slot.has_next_token},
-                    {"n_remain", slot.n_remaining},
-                    {"num_tokens_predicted", slot.n_decoded},
-                    {"stopped_eos", slot.stopped_eos},
-                    {"stopped_word", slot.stopped_word},
-                    {"stopped_limit", slot.stopped_limit},
-                    {"stopping_word", slot.stopping_word},
-                };
+            // request slots data using task queue
+            task_server task;
+            task.id = llama.queue_tasks.get_new_id();
+            task.type = TASK_TYPE_SLOTS_DATA;
+            task.target_id = -1;
 
-                slots.push_back(slot_data);
-            }
-            res.set_content(slots.dump(), "application/json");
+            llama.queue_results.add_waiting_task_id(task.id);
+            llama.queue_tasks.post(task);
+
+            // get the result
+            task_result result = llama.queue_results.recv(task.id);
+            llama.queue_results.remove_waiting_task_id(task.id);
+
+            res.set_content(result.result_json["slots"].dump(), "application/json");
             res.status = 200; // HTTP OK
         });
     }
```

examples/server/utils.hpp

Lines changed: 2 additions & 1 deletion

```diff
@@ -49,7 +49,8 @@ enum server_state {
 enum task_type {
     TASK_TYPE_COMPLETION,
     TASK_TYPE_CANCEL,
-    TASK_TYPE_NEXT_RESPONSE
+    TASK_TYPE_NEXT_RESPONSE,
+    TASK_TYPE_SLOTS_DATA
 };
 
 struct task_server {
```
