Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[V1] [5/N] API Server: unify Detokenizer and EngineCore input #11545

Merged
merged 16 commits
Dec 28, 2024
Previous commit
Next commit
updated
  • Loading branch information
robertgshaw2-redhat committed Dec 27, 2024
commit 1789162d014f2133a459071d86dc5ba61d023708
17 changes: 8 additions & 9 deletions vllm/v1/engine/async_llm.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def __init__(
self.tokenizer.ping()

# Request streams (map of request_id -> queue).
self.rid_to_queues: Dict[str, asyncio.Queue] = {}
self.rid_to_queue: Dict[str, asyncio.Queue] = {}

# Processor (converts Inputs --> EngineCoreRequests).
self.processor = Processor(
Expand Down Expand Up @@ -248,15 +248,15 @@ def _add_request_to_queues(
request_id: str,
) -> asyncio.Queue[RequestOutput]:

if request_id in self.rid_to_queues:
if request_id in self.rid_to_queue:
raise ValueError(f"Request id {request_id} already running.")

self.rid_to_queues[request_id] = asyncio.Queue()
self.rid_to_queue[request_id] = asyncio.Queue()

if self.log_requests:
logger.info("Added request %s.", request_id)

return self.rid_to_queues[request_id]
return self.rid_to_queue[request_id]

def _process_request_outputs(self, request_outputs: List[RequestOutput]):
"""Process outputs by putting them into per-request queues."""
Expand All @@ -267,9 +267,8 @@ def _process_request_outputs(self, request_outputs: List[RequestOutput]):
# Note: it is possible a request was aborted and removed from
# the state due to client cancellations, so if we encounter a
# request id not in the state, we skip.
if request_id in self.rid_to_queues:
q = self.rid_to_queues[request_id]
q.put_nowait(request_output)
if request_id in self.rid_to_queue:
self.rid_to_queue[request_id].put_nowait(request_output)

async def _run_output_handler(self):
"""Background loop: pulls from EngineCore and pushes to AsyncStreams."""
Expand Down Expand Up @@ -303,8 +302,8 @@ async def abort(self, request_id: str) -> None:
# then it is possible that the request is already
# removed from the queues, so we do nothing if the
# request_id is no longer in the tracked queues.
if request_id in self.rid_to_queues:
del self.rid_to_queues[request_id]
if request_id in self.rid_to_queue:
del self.rid_to_queue[request_id]

def encode(
self,
Expand Down
Loading