Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions lightllm/server/api_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,18 @@ async def flush_cache():
)


@app.post("/pause_generation")
async def pause_generation():
    """HTTP entry point: delegate pausing to the httpserver manager."""
    manager = g_objs.httpserver_manager
    await manager.pause_generation()
    ok_msg = "Generation paused successfully."
    return Response(content=ok_msg, status_code=200)


@app.post("/continue_generation")
async def continue_generation():
    """HTTP entry point: delegate resuming to the httpserver manager."""
    manager = g_objs.httpserver_manager
    await manager.continue_generation()
    ok_msg = "Generation continued successfully."
    return Response(content=ok_msg, status_code=200)


@app.websocket("/pd_register")
async def register_and_keep_alive(websocket: WebSocket):
await websocket.accept()
Expand Down
24 changes: 24 additions & 0 deletions lightllm/server/httpserver/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ def __init__(
self.latest_success_infer_time_mark = SharedInt(f"{get_unique_server_name()}_latest_success_infer_time_mark")
self.latest_success_infer_time_mark.set_value(int(time.time()))

self.is_pause = False
self.is_pause_cond = asyncio.Condition()

# 交互式请求 event
self.flush_cache_event: Optional[asyncio.Event] = None
return
Expand Down Expand Up @@ -302,6 +305,10 @@ async def generate(

# 记录请求到达的相关信息
await self._log_req_header(request_headers, group_request_id)

async with self.is_pause_cond:
await self.is_pause_cond.wait_for(lambda: not self.is_pause)

# encode
prompt_ids = await self._encode(prompt, multimodal_params, sampling_params)

Expand Down Expand Up @@ -832,6 +839,23 @@ async def flush_cache(self):
self.flush_cache_event.clear()
return ret

async def pause_generation(self):
    """Pause generation: stop admitting new requests and drain in-flight ones.

    Because requests are forwarded from the master node to the slave nodes,
    pausing the master is sufficient to pause the slaves as well.
    """
    # Hold the condition lock only long enough to flip the flag. Holding it
    # across the drain loop below would deadlock: continue_generation() and
    # new generate() calls acquire the same lock, and asyncio.sleep() does
    # NOT release a held asyncio.Condition.
    async with self.is_pause_cond:
        self.is_pause = True
    # A single broadcast abort is enough to signal every in-flight request;
    # no need to re-send it on each poll iteration.
    await self.abort_request(AbortReq(request_id=None, abort_all=True))
    # Poll until every aborted request has drained out of the tracking map.
    while len(self.req_id_to_out_inf) != 0:
        await asyncio.sleep(1.0)
    return
Comment on lines +845 to +852
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The current implementation of pause_generation can lead to a deadlock. The async with self.is_pause_cond: block holds the condition lock for the entire duration of the while loop. The await asyncio.sleep(1.0) call yields control to the event loop but does not release the lock.

This will cause any other coroutine attempting to acquire the same lock (e.g., continue_generation or new generate calls) to block indefinitely, making it impossible to resume generation.

To fix this, the lock should only be held when modifying the shared is_pause state. The waiting loop should be outside the async with block. Additionally, abort_request only needs to be called once to signal all current requests to abort, so it can be moved out of the loop for efficiency.

Suggested change
async with self.is_pause_cond:
self.is_pause = True
while True:
await self.abort_request(AbortReq(request_id=None, abort_all=True))
running_req_num = len(list(self.req_id_to_out_inf.keys()))
if running_req_num == 0:
break
await asyncio.sleep(1.0)
async with self.is_pause_cond:
self.is_pause = True
await self.abort_request(AbortReq(request_id=None, abort_all=True))
while True:
running_req_num = len(list(self.req_id_to_out_inf.keys()))
if running_req_num == 0:
break
await asyncio.sleep(1.0)


async def continue_generation(self):
async with self.is_pause_cond:
self.is_pause = False
self.is_pause_cond.notify_all()


class ReqStatus:
def __init__(
Expand Down