[V1][PoC] Refactor EngineCoreOutputs #12853
```diff
@@ -2,9 +2,11 @@
 import enum
 import time
-from typing import Any, List, Optional, Union
+from typing import Any, Dict, List, Optional, Tuple, Union

 import msgspec
 import torch
+import numpy as np

 from vllm.lora.request import LoRARequest
 from vllm.multimodal import MultiModalKwargs
```
```diff
@@ -85,25 +87,25 @@ def new_event(cls,
         return cls(event_type, timestamp)


-class EngineCoreOutput(
-        msgspec.Struct,
-        array_like=True,  # type: ignore[call-arg]
-        omit_defaults=True,  # type: ignore[call-arg]
-        gc=False):  # type: ignore[call-arg]
-
-    request_id: str
-    new_token_ids: List[int]
-
-    new_logprobs: Optional[LogprobsLists] = None
-    new_prompt_logprobs_tensors: Optional[LogprobsTensors] = None
-
-    finish_reason: Optional[FinishReason] = None
-    stop_reason: Union[int, str, None] = None
-    events: Optional[List[EngineCoreEvent]] = None
-
-    @property
-    def finished(self) -> bool:
-        return self.finish_reason is not None
+#class EngineCoreOutput(
+#        msgspec.Struct,
+#        array_like=True,  # type: ignore[call-arg]
+#        omit_defaults=True,  # type: ignore[call-arg]
+#        gc=False):  # type: ignore[call-arg]
+#
+#    request_id: str
+#    new_token_ids: List[int]
+#
+#    new_logprobs: Optional[LogprobsLists] = None
+#    new_prompt_logprobs_tensors: Optional[LogprobsTensors] = None
+#
+#    finish_reason: Optional[FinishReason] = None
+#    stop_reason: Union[int, str, None] = None
+#    events: Optional[List[EngineCoreEvent]] = None
+#
+#    @property
+#    def finished(self) -> bool:
+#        return self.finish_reason is not None


 class UtilityOutput(
```
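For context on the serialization settings these structs use: in msgspec, `array_like=True` encodes a struct as a positional array rather than a name-to-value map, which keeps the wire format compact. A minimal standalone sketch (illustrative only, not from this PR; `Compact` and `Named` are toy structs):

```python
from typing import List

import msgspec

class Compact(msgspec.Struct, array_like=True):
    request_id: str
    new_token_ids: List[int]

class Named(msgspec.Struct):
    request_id: str
    new_token_ids: List[int]

# The array_like struct omits field names on the wire, so it encodes smaller.
compact = msgspec.msgpack.encode(Compact("req-0", [1, 2, 3]))
named = msgspec.msgpack.encode(Named("req-0", [1, 2, 3]))
assert len(compact) < len(named)
```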
```diff
@@ -124,11 +126,20 @@ class EngineCoreOutputs(
         omit_defaults=True,  # type: ignore[call-arg]
         gc=False):  # type: ignore[call-arg]

-    #NOTE(Nick): We could consider ways to make this more compact,
-    # e.g. columnwise layout
-
     # [num_reqs]
-    outputs: List[EngineCoreOutput] = []
+    request_ids: List[str] = []
+    new_token_id_offsets: List[int] = []
+    new_token_id_counts: Optional[List[int]] = None  # ndarray?
+    new_token_ids: np.ndarray = np.empty(0, dtype=int)  # Optional?
+
+    # req_id -> LogprobsLists
+    new_logprobs: Dict[str, LogprobsLists] = {}
```
**Member:** We should change these to …
|
|
||
| # req_id -> LogprobsTensors | ||
| new_prompt_logprobs_tensors: Dict[str, LogprobsTensors] = {} | ||
|
|
||
| finish_reason: Dict[str, Tuple[FinishReason, Union[int, str, None]]] = {} | ||
| events: Optional[List[Optional[List[EngineCoreEvent]]]] = None | ||
| scheduler_stats: Optional[SchedulerStats] = None | ||
| timestamp: float = 0.0 | ||
|
|
||
|
|
||
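To make the new columnwise layout concrete: a minimal sketch (not part of the diff; the `tokens_by_request` helper is hypothetical) of how a consumer could recover per-request token lists from the flat `new_token_ids` array, assuming `new_token_id_offsets[i]` is the start index and `new_token_id_counts[i]` the length of request `i`'s slice:

```python
from typing import Dict, List

import numpy as np

def tokens_by_request(
        request_ids: List[str],
        offsets: List[int],
        counts: List[int],
        token_ids: np.ndarray) -> Dict[str, List[int]]:
    # Slice each request's new tokens out of the shared flat array.
    return {
        req_id: token_ids[off:off + cnt].tolist()
        for req_id, off, cnt in zip(request_ids, offsets, counts)
    }

# e.g. two requests with 2 and 3 new tokens packed back to back:
flat = np.array([11, 12, 21, 22, 23])
print(tokens_by_request(["a", "b"], [0, 2], [2, 3], flat))
# {'a': [11, 12], 'b': [21, 22, 23]}
```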
---
```diff
@@ -4,8 +4,6 @@
 import os
 from typing import AsyncGenerator, List, Mapping, Optional, Type, Union

-import numpy as np
-
 from vllm.config import ModelConfig, VllmConfig
 from vllm.engine.arg_utils import AsyncEngineArgs
 from vllm.engine.protocol import EngineClient
```
```diff
@@ -30,6 +28,9 @@
                                      StatLoggerBase)
 from vllm.v1.metrics.stats import IterationStats, SchedulerStats

+import cProfile as profile
+import pyinstrument
+
 logger = init_logger(__name__)
```
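The `cProfile`/`pyinstrument` imports look like PoC profiling scaffolding. For reference, typical pyinstrument usage (illustrative, not from this diff) is:

```python
import pyinstrument

profiler = pyinstrument.Profiler()
profiler.start()
# ... run the code under measurement ...
profiler.stop()
print(profiler.output_text())
```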
```diff
@@ -254,23 +255,32 @@ async def _run_output_handler(self):
                 # Split outputs into chunks of at most
                 # VLLM_V1_OUTPUT_PROC_CHUNK_SIZE, so that we don't block the
                 # event loop for too long.
-                num_outputs = len(outputs.outputs)
-                if num_outputs <= VLLM_V1_OUTPUT_PROC_CHUNK_SIZE:
-                    slices = (outputs.outputs, )
+                num_requests = len(outputs.request_ids)
+                if num_requests <= VLLM_V1_OUTPUT_PROC_CHUNK_SIZE:
+                    num_chunks = 1
+                    chunk_size = num_requests
+                    rem = 0
                 else:
```
**Comment on lines +259 to 263**

**Member:** Could just keep the …

**Collaborator (author):** I got div by zero when I tried just the …

**Member:** Hmm, that should only be possible if …
```diff
-                    slices = np.array_split(
-                        outputs.outputs,
-                        cdiv(num_outputs, VLLM_V1_OUTPUT_PROC_CHUNK_SIZE))
+                    num_chunks = cdiv(num_requests,
+                                      VLLM_V1_OUTPUT_PROC_CHUNK_SIZE)
+                    chunk_size = num_requests // num_chunks
+                    rem = num_requests % num_chunks

-                for i, outputs_slice in enumerate(slices):
+                slice_start = 0
+                for i in range(num_chunks):
+                    adj = 1 if i < rem else 0
+                    slice_end = slice_start + chunk_size + adj
+
                     # 2) Process EngineCoreOutputs.
                     processed_outputs = self.output_processor.process_outputs(
-                        outputs_slice, outputs.timestamp, iteration_stats)
+                        outputs, slice_start, slice_end, outputs.timestamp,
+                        iteration_stats)
+                    slice_start = slice_end
                     # NOTE: RequestOutputs are pushed to their queues.
                     assert not processed_outputs.request_outputs

                     # Allow other asyncio tasks to run between chunks
-                    if i + 1 < len(slices):
+                    if i + 1 < num_chunks:
                         await asyncio.sleep(0)

                 # 3) Abort any reqs that finished due to stop strings.
```
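For clarity, the balanced chunking above can be checked in isolation. A standalone sketch (the `chunk_bounds` helper is hypothetical; `cdiv` mirrors the ceiling-division helper the diff uses), showing it reproduces `np.array_split`'s chunk sizes without going through numpy:

```python
from typing import List, Tuple

def cdiv(a: int, b: int) -> int:
    # Ceiling division, matching how the cdiv helper in the diff behaves.
    return -(-a // b)

def chunk_bounds(num_requests: int, max_chunk: int) -> List[Tuple[int, int]]:
    # Mirror of the logic above: divide into cdiv(n, max) chunks, then
    # spread the remainder one element at a time over the leading chunks.
    if num_requests <= max_chunk:
        return [(0, num_requests)]
    num_chunks = cdiv(num_requests, max_chunk)
    chunk_size, rem = divmod(num_requests, num_chunks)
    bounds, start = [], 0
    for i in range(num_chunks):
        end = start + chunk_size + (1 if i < rem else 0)
        bounds.append((start, end))
        start = end
    return bounds

# 10 requests with a chunk limit of 4 -> sizes 4, 3, 3, the same split
# np.array_split(range(10), 3) would produce.
print(chunk_bounds(10, 4))  # [(0, 4), (4, 7), (7, 10)]
```

Note that the `num_requests <= VLLM_V1_OUTPUT_PROC_CHUNK_SIZE` branch also covers `num_requests == 0`; skipping it would make `num_chunks` zero and the subsequent division would raise the division-by-zero mentioned in the thread above.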
**Comment:** Yes, keep as array ... and we don't need both offsets and counts, right?
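Supporting that question: offsets and counts are mutually derivable, so storing just one of them (plus the flat array's total length) would suffice. A quick illustrative sketch:

```python
import numpy as np

counts = np.array([2, 0, 3, 1])

# counts -> offsets: exclusive prefix sum.
offsets = np.concatenate(([0], np.cumsum(counts)[:-1]))
assert offsets.tolist() == [0, 2, 2, 5]

# offsets -> counts: adjacent differences, closed off by the total length.
total = int(counts.sum())
assert np.diff(np.append(offsets, total)).tolist() == counts.tolist()
```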