
[Bugfix][V0] Another multi-sequence logprobs streaming edge case #16805


Status: Open. Wants to merge 6 commits into base: main.
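The edge case being fixed is a streamed completion that combines n > 1, per-token logprobs, and a usage chunk. A minimal reproduction sketch, assuming an already-running vLLM OpenAI-compatible server (the base URL and placeholder API key are assumptions; the model and request parameters mirror the regression test added below):

from openai import OpenAI

# Assumed endpoint for a locally served vLLM instance; adjust as needed.
client = OpenAI(base_url="http://localhost:8000/v1", api_key="EMPTY")

stream = client.completions.create(
    model="Qwen/Qwen2.5-1.5B-Instruct",
    prompt="1 2 3 4 5",
    max_tokens=3,
    n=2,                                     # two parallel sequences
    logprobs=0,                              # return the sampled token's logprob
    stream=True,
    stream_options={"include_usage": True},  # final usage-only chunk
    temperature=1.0,
)
for chunk in stream:
    # Each chunk carries one choice (index 0 or 1), followed by a usage chunk.
    print(chunk)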
6 changes: 3 additions & 3 deletions .buildkite/test-pipeline.yaml
@@ -180,11 +180,11 @@ steps:
   mirror_hardwares: [amdexperimental, amdproduction]
   source_file_dependencies:
   - vllm/
-  - tests/test_regression
+  - tests/regressions
   commands:
   - pip install modelscope
-  - pytest -v -s test_regression.py
-  working_dir: "/vllm-workspace/tests" # optional
+  - pytest -v -s regressions
+  - VLLM_USE_V1=0 pytest -v -s regressions
 
 - label: Engine Test # 10min
   mirror_hardwares: [amdexperimental, amdproduction]
Empty file added tests/regressions/__init__.py
File renamed without changes.
115 changes: 115 additions & 0 deletions tests/regressions/test_openai_requests.py
@@ -0,0 +1,115 @@
# SPDX-License-Identifier: Apache-2.0

from itertools import chain
from typing import Optional

import openai  # use the official client for correctness check
import pytest
import pytest_asyncio
from openai.types import Completion

from ..utils import RemoteOpenAIServer

MODEL_NAME = "Qwen/Qwen2.5-1.5B-Instruct"


@pytest.fixture(scope="module")
def default_server_args():
    return [
        # use half precision for speed and memory savings in CI environment
        "--dtype",
        "bfloat16",
        "--max-model-len",
        "8192",
        "--max-num-seqs",
        "128",
        "--enforce-eager",
    ]


@pytest.fixture(scope="module")
def server(default_server_args):
    with RemoteOpenAIServer(MODEL_NAME, default_server_args) as remote_server:
        yield remote_server


@pytest_asyncio.fixture()
async def client(server):
    async with server.get_async_client() as async_client:
        yield async_client


@pytest.mark.asyncio
async def test_multiseq_logprobs_streaming(client: openai.AsyncOpenAI):
    """Edge case request combining multiple functionalities

    https://github.com/vllm-project/vllm/pull/15259
    https://github.com/vllm-project/vllm/pull/16805
    """

    # completions
    stream = await client.completions.create(
        model=MODEL_NAME,
        prompt="1 2 3 4 5",
        max_tokens=3,
        # include usage chunk to make sure the stream is complete
        stream_options={"include_usage": True},
        stream=True,
        n=2,
        logprobs=0,  # include 1 top logprob per generated token
        temperature=1.0)

    n0_chunks: list[Completion] = []
    n1_chunks: list[Completion] = []
    usage_chunk: Optional[Completion] = None
    async for chunk in stream:
        print(chunk)
        if choices := chunk.choices:
            assert len(choices) == 1, \
                (f"Streamed chunk had {len(choices)} choices, when only 1 was"
                 " expected")
            choice = choices[0]
            if choice.index == 0:
                n0_chunks.append(chunk)
            elif choice.index == 1:
                n1_chunks.append(chunk)
            else:
                raise AssertionError(f"Unexpected choice index {choice.index}")

        elif chunk.usage is not None:
            usage_chunk = chunk

        else:
            raise AssertionError(f"Unexpected chunk {chunk}")

    # check that we got the requested number of tokens
    assert sum(
        len(chunk.choices[0].logprobs.tokens) for chunk in n0_chunks
        if chunk.choices[0].logprobs
    ) == 3, "Streamed response did not have the expected number of tokens."
    assert sum(
        len(chunk.choices[0].logprobs.tokens) for chunk in n1_chunks
        if chunk.choices[0].logprobs
    ) == 3, "Streamed response did not have the expected number of tokens."

    # check 1 logprob per token/chunk
    for chunk in chain(n0_chunks, n1_chunks):
        # a finish chunk may not have any text/logprobs:
        # V0 omits them, V1 includes them
        choice = chunk.choices[0]
        if choice.logprobs is None:
            assert choice.finish_reason
            assert choice.text == ''
            continue

        assert choice.logprobs.top_logprobs
        for top_logprobs in choice.logprobs.top_logprobs:
            assert len(top_logprobs) == 1

    # requested usage
    assert usage_chunk is not None
    assert usage_chunk.usage.completion_tokens == 6
    assert usage_chunk.usage.prompt_tokens == 9
    assert usage_chunk.usage.total_tokens == 15
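For reference, the usage assertions at the end follow directly from the request parameters; the only model-specific value is the prompt token count, which the test expects to be 9 for this tokenizer. A quick check of the arithmetic:

# Usage accounting for the request above; prompt_tokens is tokenizer-specific,
# and the test expects both sequences to run to the max_tokens limit.
n, max_tokens, prompt_tokens = 2, 3, 9
completion_tokens = n * max_tokens                 # 6
total_tokens = prompt_tokens + completion_tokens   # 15
assert (completion_tokens, total_tokens) == (6, 15)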
3 changes: 2 additions & 1 deletion vllm/entrypoints/openai/serving_chat.py
@@ -557,7 +557,8 @@ async def chat_completion_stream_generator(
                 if finish_reason_sent[i]:
                     continue
 
-                if request.logprobs and request.top_logprobs is not None:
+                if request.logprobs and request.top_logprobs is not None \
+                        and output.token_ids:
                     assert output.logprobs is not None, (
                         "Did not output logprobs")
                     logprobs = self._create_chat_logprobs(
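The fix: a streamed chunk can now arrive with no new token_ids (for example, a finish-only chunk for one of several sequences), in which case there are no logprobs to serialize. A simplified sketch of the guard; the field names follow the diff above, but the function itself and its parameters are illustrative, not the actual vLLM method:

from typing import Any, Optional


def chunk_logprobs_or_none(request: Any, output: Any,
                           create_logprobs) -> Optional[Any]:
    """Illustrative guard: build logprobs for a streamed chat chunk only when
    the chunk actually carries new tokens."""
    if request.logprobs and request.top_logprobs is not None \
            and output.token_ids:
        # Tokens were generated in this chunk, so logprobs must be present.
        assert output.logprobs is not None, "Did not output logprobs"
        return create_logprobs(output.token_ids, output.logprobs,
                               request.top_logprobs)
    # Finish-only chunk (empty token_ids) or logprobs not requested.
    return None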
2 changes: 1 addition & 1 deletion vllm/entrypoints/openai/serving_completion.py
@@ -318,7 +318,7 @@ async def completion_stream_generator(
                         # Chunked prefill case, don't return empty chunks
                         continue
 
-                    if request.logprobs is not None:
+                    if request.logprobs is not None and output.token_ids:
                         assert out_logprobs is not None, (
                             "Did not output logprobs")
                         logprobs = self._create_completion_logprobs(
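On the client side, the upshot (and what the regression test above checks) is that a V0 finish-only chunk streams with a finish_reason, empty text, and logprobs set to None, so consumers counting logprob entries should skip it. A small sketch, assuming the chunk objects follow the openai Completion types used in the test:

from openai.types import Completion


def count_logprob_tokens(chunks: list[Completion]) -> int:
    """Count streamed logprob entries, tolerating finish-only chunks."""
    total = 0
    for chunk in chunks:
        choice = chunk.choices[0]
        if choice.logprobs is None:
            # Finish-only chunk (V0 behaviour): no text, no logprobs.
            continue
        total += len(choice.logprobs.tokens)
    return total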
13 changes: 6 additions & 7 deletions vllm/sequence.py
@@ -1529,18 +1529,17 @@ def add_request(request_id: str, engine, params, **kwargs):
     def maybe_assemble_group(
             self, seq_group: SequenceGroup) -> Optional[SequenceGroup]:
 
-        # in the streaming mode, we will return the assembled sequence
-        # for the first remaining sequence, and then return None for the
-        # rest of sequences
-        if self.streaming:
+        # in the streaming mode, we will return the assembled sequence while
+        # sequences are still processing, but must choose only one of the
+        # remaining sequences
+        if self.streaming and not seq_group.is_finished():
             first_remaining_id = next(iter(self.to_be_finished))
             if seq_group.request_id == first_remaining_id:
                 return self.assembled_seq_group
             return None
 
-        # in the non-streaming mode, we will return the assembled sequence
-        # when the last sequences finishes, and then return None for the
-        # rest of the time
+        # for non-streaming and when finishing streaming, we will return the
+        # assembled sequence when the last sequence finishes
         if (len(self.to_be_finished) == 1
                 and seq_group.request_id in self.to_be_finished
                 and seq_group.is_finished()):
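The behavioural change here is limited to streamed parallel sampling: previously a sub-request that had just finished still took the streaming branch and could return None, so the final assembled group was not always emitted; with the added not seq_group.is_finished() check, a finished sub-request falls through to the "last sequence finishes" branch below. A condensed re-statement of the selection logic (simplified, not the real class bookkeeping in vllm/sequence.py):

from typing import Optional


def assemble_decision(streaming: bool, seq_group_finished: bool,
                      request_id: str, to_be_finished: dict,
                      assembled_group) -> Optional[object]:
    """Condensed re-statement of maybe_assemble_group's selection logic."""
    if streaming and not seq_group_finished:
        # Mid-stream: only the first remaining sub-request carries the
        # assembled group; every other sub-request yields nothing.
        first_remaining_id = next(iter(to_be_finished))
        return assembled_group if request_id == first_remaining_id else None

    # Non-streaming, or a streamed sub-request that just finished: emit the
    # assembled group exactly once, when the last remaining one finishes.
    if (len(to_be_finished) == 1 and request_id in to_be_finished
            and seq_group_finished):
        return assembled_group
    return None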