@@ -301,12 +301,43 @@ async def start(self):
logger.info("Started vLLM engine.")

async def _start_engine(self) -> "EngineClient":
from vllm import envs

# Since vLLM 0.8.0, the logic to determine v0/v1 engine is as follows:
# 1. If VLLM_USE_V1 is not set, then it tries to use v1 engine. However,
# if any feature specified in the engine config is not supported, then
# it falls back to v0. Note that launching vLLM on a non-main thread
# is an experimental feature, so vLLM will fall back to v0 in this case.
# 2. If VLLM_USE_V1 is set to 1, then it will use v1 engine even with
# experimental features (such as launching vLLM on a non-main thread).
# 3. If VLLM_USE_V1 is set to 0, force using v0 engine.
if not envs.VLLM_USE_V1:
return await self._start_engine_v0()
return await self._start_engine_v1()

async def _start_engine_v1(self) -> "EngineClient":
"""Start the vLLM v1 engine. Note that we only use _get_async_engine_args
to get the engine args and don't use _get_vllm_engine_config, because
we integrate vLLM v1 using the highest-level async engine API.
TODO: Refactor vLLM v0 integration to use the same async engine API
to simplify the code.
"""
from vllm import AsyncLLMEngine

await self.initialize_node(self.llm_config)
engine_args = _get_async_engine_args(self.llm_config)

return AsyncLLMEngine.from_engine_args(
engine_args=engine_args,
)

async def _start_engine_v0(self) -> "EngineClient":
from vllm.engine.multiprocessing.client import MQLLMEngineClient

args: InitializeNodeOutput = await self.initialize_node(self.llm_config)
engine_args, engine_config = _get_vllm_engine_config(self.llm_config)

if MQLLMEngineClient.is_unsupported_config(engine_args):
if MQLLMEngineClient.is_unsupported_config(engine_config):
# If the engine is not supported, we fall back to the legacy async engine.
#
# Note (genesu): as of 2025-02-11, this code path is only triggered when
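
For reference, here is a minimal standalone sketch of the v0/v1 dispatch and v1 startup path introduced above, runnable outside Ray Serve. The model name is a placeholder, and constructing AsyncEngineArgs by hand stands in for what _get_async_engine_args derives from the llm_config.

from vllm import AsyncLLMEngine, envs
from vllm.engine.arg_utils import AsyncEngineArgs

# Placeholder model; in the Ray Serve code these args come from
# _get_async_engine_args(self.llm_config).
engine_args = AsyncEngineArgs(model="facebook/opt-125m")

if envs.VLLM_USE_V1:
    # v1 path: hand the args directly to the highest-level async engine API.
    engine = AsyncLLMEngine.from_engine_args(engine_args=engine_args)
else:
    # v0 path: goes through MQLLMEngineClient and the full engine config
    # (see _start_engine_v0 above); not reproduced in this sketch.
    raise NotImplementedError("v0 startup is not part of this sketch")

Setting VLLM_USE_V1=1 or VLLM_USE_V1=0 in the environment forces one path or the other; leaving it unset lets vLLM prefer v1 and fall back to v0 when the engine config requests an unsupported feature, as the comment in the hunk above describes.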
@@ -498,20 +529,36 @@ async def _generate(
                 )
 
             if request_output is not None:
-                time_in_queue_histogram.observe(request_output.metrics.time_in_queue)
                 total_request_time = time.perf_counter() - start
-                generation_time = (
-                    total_request_time - request_output.metrics.time_in_queue
-                )
+                if request_output.metrics is None:
+                    # vLLM V1 metrics are not included in the request output yet.
+                    queue_time = "N/A"
+                    generation_time_str = "N/A"
+                    tokens_s = "N/A"
+                    generated_tokens_s = "N/A"
+                else:
+                    time_in_queue_histogram.observe(
+                        request_output.metrics.time_in_queue
+                    )
+                    queue_time = f"{request_output.metrics.time_in_queue}s"
+                    generation_time = (
+                        total_request_time - request_output.metrics.time_in_queue
+                    )
+                    generation_time_str = f"{generation_time}s"
+                    tokens_s = (
+                        num_input_tokens + all_tokens_collected
+                    ) / generation_time
+                    generated_tokens_s = all_tokens_collected / generation_time
+
                 logger.info(
                     f"Request {vllm_generation_request.request_id} finished ({finish_reason}). "
                     f"Total time: {total_request_time}s, "
-                    f"Queue time: {request_output.metrics.time_in_queue}s, "
-                    f"Generation+async time: {generation_time}s, "
+                    f"Queue time: {queue_time}, "
+                    f"Generation+async time: {generation_time_str}, "
                     f"Input tokens: {num_input_tokens}, "
                     f"Generated tokens: {all_tokens_collected}, "
-                    f"tokens/s: {(num_input_tokens + all_tokens_collected) / generation_time}, "
-                    f"generated tokens/s: {all_tokens_collected / generation_time}."
+                    f"tokens/s: {tokens_s}, "
+                    f"generated tokens/s: {generated_tokens_s}."
                 )
             else:
                 logger.warning(
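
The guard added in this hunk exists because v1 request outputs can arrive with metrics set to None, which would previously have raised an AttributeError in the logging path. Below is a standalone sketch of the same accounting; the helper name and argument list are invented for illustration, since in the diff these values live in _generate()'s local state.

from typing import Optional

def throughput_fields(
    total_request_time: float,
    time_in_queue: Optional[float],  # None mirrors request_output.metrics is None
    num_input_tokens: int,
    generated_tokens: int,
) -> dict:
    if time_in_queue is None:
        # vLLM v1 does not attach metrics to the request output yet, so
        # report placeholders rather than crashing the log line.
        return {"queue_time": "N/A", "generation_time": "N/A",
                "tokens_s": "N/A", "generated_tokens_s": "N/A"}
    generation_time = total_request_time - time_in_queue
    return {
        "queue_time": f"{time_in_queue}s",
        "generation_time": f"{generation_time}s",
        "tokens_s": (num_input_tokens + generated_tokens) / generation_time,
        "generated_tokens_s": generated_tokens / generation_time,
    }

print(throughput_fields(2.5, None, 128, 256))  # v1 today: all fields "N/A"
print(throughput_fields(2.5, 0.5, 128, 256))   # v0-style: real throughput numbers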