22import os
33from typing import AsyncGenerator , List , Mapping , Optional , Type , Union
44
5+ import numpy as np
6+
57from vllm .config import ModelConfig , VllmConfig
68from vllm .engine .arg_utils import AsyncEngineArgs
79from vllm .engine .protocol import EngineClient
10+ from vllm .envs import VLLM_V1_OUTPUT_PROC_CHUNK_SIZE
811from vllm .inputs import INPUT_REGISTRY , InputRegistry , PromptType
912from vllm .inputs .preprocess import InputPreprocessor
1013from vllm .logger import init_logger
1619from vllm .transformers_utils .tokenizer import AnyTokenizer
1720from vllm .transformers_utils .tokenizer_group import init_tokenizer_from_configs
1821from vllm .usage .usage_lib import UsageContext
19- from vllm .utils import kill_process_tree
22+ from vllm .utils import cdiv , kill_process_tree
2023from vllm .v1 .engine .core_client import EngineCoreClient
2124from vllm .v1 .engine .output_processor import OutputProcessor
2225from vllm .v1 .engine .processor import Processor
@@ -205,17 +208,15 @@ async def generate(
205208
206209 # The output_handler task pushes items into the queue.
207210 # This task pulls from the queue and yields to caller.
208- while True :
211+ finished = False
212+ while not finished :
209213 # Note: drain queue without await if possible (avoids
210214 # task switching under load which helps performance).
211- out = q .get_nowait () if q . qsize () > 0 else await q .get ()
215+ out = q .get_nowait () if not q . empty () else await q .get ()
212216
213217 # Note: both OutputProcessor and EngineCore handle their
214218 # own request cleanup based on finished.
215- if out .finished :
216- yield out
217- break
218-
219+ finished = out .finished
219220 yield out
220221
221222 # If the request is disconnected by the client, the
@@ -233,22 +234,41 @@ async def _run_output_handler(self):
233234 # 1) Pull EngineCoreOutputs from the EngineCore.
234235 outputs = await self .engine_core .get_output_async ()
235236
236- # 2) Process EngineCoreOutputs.
237- processed_outputs = self .output_processor .process_outputs (
238- outputs .outputs )
239- # NOTE: RequestOutputs are pushed to their queues.
240- assert len (processed_outputs .request_outputs ) == 0
241-
242- # 3) Abort any reqs that finished due to stop strings.
243- await self .engine_core .abort_requests_async (
244- processed_outputs .reqs_to_abort )
237+ # Split outputs into chunks of at most
238+ # VLLM_V1_OUTPUT_PROC_CHUNK_SIZE, so that we don't block the
239+ # event loop for too long.
240+ num_outputs = len (outputs .outputs )
241+ if num_outputs <= VLLM_V1_OUTPUT_PROC_CHUNK_SIZE :
242+ slices = (outputs .outputs , )
243+ else :
244+ slices = np .array_split (
245+ outputs .outputs ,
246+ cdiv (num_outputs , VLLM_V1_OUTPUT_PROC_CHUNK_SIZE ))
247+
248+ iteration_stats = None
249+ for i , outputs_slice in enumerate (slices ):
250+ # 2) Process EngineCoreOutputs.
251+ processed_outputs = self .output_processor .process_outputs (
252+ outputs_slice , iteration_stats )
253+ # NOTE: RequestOutputs are pushed to their queues.
254+ assert not processed_outputs .request_outputs
255+ iteration_stats = processed_outputs .iteration_stats
256+
257+ # Allow other asyncio tasks to run between chunks
258+ if i + 1 < len (slices ):
259+ await asyncio .sleep (0 )
260+
261+ # 3) Abort any reqs that finished due to stop strings.
262+ await self .engine_core .abort_requests_async (
263+ processed_outputs .reqs_to_abort )
245264
246265 # 4) Logging.
247266 # TODO(rob): make into a coroutine and launch it in
248267 # background thread once we add Prometheus.
268+ assert iteration_stats is not None
249269 self ._log_stats (
250270 scheduler_stats = outputs .scheduler_stats ,
251- iteration_stats = processed_outputs . iteration_stats ,
271+ iteration_stats = iteration_stats ,
252272 )
253273
254274 except Exception as e :
0 commit comments