Skip to content

Return logprobs in delayed_sampling #1323

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
May 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions vllm/worker/hpu_enc_dec_model_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
from vllm.sequence import (CompletionSequenceGroupOutput, IntermediateTensors,
Logprob, SequenceGroupMetadata, SequenceOutput)
from vllm.utils import bind_kv_cache, is_fake_hpu
from vllm.worker.hpu_model_runner import (HpuModelAdapter, HPUModelRunnerBase,
from vllm.worker.hpu_model_runner import (CachedStepOutput, HpuModelAdapter,
HPUModelRunnerBase,
ModelInputForHPUWithSamplingMetadata,
setup_profiler, subtuple)
from vllm.worker.model_runner_base import (
Expand Down Expand Up @@ -668,7 +669,7 @@ def try_revert_dummy_output_tokens():
if num_steps > 1:
output = output.sampled_token_ids
self.cached_step_outputs.append(
output.detach().clone())
CachedStepOutput(output))
htorch.core.mark_step()
if i < num_steps - 1:
if i == 0:
Expand Down Expand Up @@ -761,8 +762,8 @@ def _decode_sampler_outputs(self, model_input):
sampler_outputs = []
num_outputs = len(self.cached_step_outputs)
for i in range(num_outputs):
next_token_ids = self.cached_step_outputs.pop(0)
next_token_ids = next_token_ids.cpu().tolist()
next_token_ids = self.cached_step_outputs.pop(
0).token_ids.cpu().tolist()
sampler_output = self._make_decode_output(
next_token_ids, model_input.sampling_metadata.seq_groups)
sampler_outputs.append(sampler_output)
Expand Down
76 changes: 64 additions & 12 deletions vllm/worker/hpu_model_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import habana_frameworks.torch.internal.bridge_config as bc
import torch
import vllm_hpu_extension.environment as environment
from attr import dataclass
from vllm_hpu_extension.bucketing.common import get_bucketing_context
from vllm_hpu_extension.flags import enabled_flags
from vllm_hpu_extension.ops import LoraMask as LoraMask
Expand All @@ -44,7 +45,10 @@
from vllm.model_executor import SamplingMetadata
from vllm.model_executor.layers.layernorm import RMSNorm
from vllm.model_executor.layers.rotary_embedding import MRotaryEmbedding
from vllm.model_executor.layers.sampler import SamplerOutput, get_sampler
from vllm.model_executor.layers.sampler import (SampleResultArgsType,
SamplerOutput, get_logprobs,
get_pythonized_sample_results,
get_sampler)
from vllm.model_executor.layers.vocab_parallel_embedding import (
VocabParallelEmbedding)
from vllm.model_executor.model_loader import get_model
Expand Down Expand Up @@ -644,6 +648,25 @@ def from_broadcasted_tensor_dict(
return cls(**tensor_dict)


@dataclass
class CachedStepOutput:
token_ids: torch.Tensor
logprobs: Optional[torch.Tensor] = None
deffered_sample_results: Optional[SampleResultArgsType] = None
sampling_metadata: Optional[SamplingMetadata] = None

def __init__(
self,
token_ids: torch.Tensor,
logprobs: Optional[torch.Tensor] = None,
deffered_sample_results: Optional[SampleResultArgsType] = None,
sampling_metadata: Optional[SamplingMetadata] = None):
self.token_ids = token_ids
self.logprobs = logprobs
self.deffered_sample_results = deffered_sample_results
self.sampling_metadata = sampling_metadata


class HPUModelRunnerBase(ModelRunnerBase[TModelInputForHPU]):
"""
Helper class for shared methods between GPU model runners.
Expand Down Expand Up @@ -750,7 +773,7 @@ def __init__(
# For both multi-step scheduling and delayed sampling
self.is_single_step = \
self.vllm_config.scheduler_config.num_scheduler_steps == 1
self.cached_step_outputs: List[torch.Tensor] = []
self.cached_step_outputs: List[CachedStepOutput] = []
self.is_pooler = False
self.is_causal = is_causal
# For delayed sampling
Expand Down Expand Up @@ -2671,15 +2694,15 @@ def execute_model(
target_indices = [
cur_seq_id_pos.get(psi, -1) for psi in prev_seq_ids
]
padding = self.cached_step_outputs[i].size(0) - len(
padding = self.cached_step_outputs[i].token_ids.size(0) - len(
target_indices)
target_indices.extend([-1] * padding)
target_indices = torch.tensor(
target_indices,
device=model_input.input_tokens.device,
dtype=model_input.input_tokens.dtype)
model_input.input_tokens.index_copy_(
0, target_indices, self.cached_step_outputs[i])
0, target_indices, self.cached_step_outputs[i].token_ids)
htorch.core.mark_step()

if not model_input.is_first_multi_step:
Expand Down Expand Up @@ -2901,11 +2924,16 @@ def try_revert_dummy_output_tokens():
)
if num_steps > 1:
output = output.sampled_token_ids
self.cached_step_outputs.append(output)
self.cached_step_outputs.append(
CachedStepOutput(output))
if use_delayed_sampling and self.is_driver_worker:
output = self._pad_to_max_num_seqs(
token_ids = self._pad_to_max_num_seqs(
output.sampled_token_ids, DUMMY_TOKEN_ID)
self.cached_step_outputs.append(output)
self.cached_step_outputs.append(
CachedStepOutput(
token_ids, output.logprobs,
output.deferred_sample_results_args,
sampling_metadata))
self.cached_step_inputs.append(model_input)
htorch.core.mark_step()
if use_delayed_sampling \
Expand Down Expand Up @@ -3037,7 +3065,7 @@ def _decode_sampler_outputs(self, model_input):
sampler_outputs = []
num_outputs = len(self.cached_step_outputs)
for i in range(num_outputs):
next_token_ids = self.cached_step_outputs.pop(0)
next_token_ids = self.cached_step_outputs.pop(0).token_ids
next_token_ids = next_token_ids.cpu().tolist()
sampler_output = self._make_decode_output(
next_token_ids, model_input.sampling_metadata.seq_groups)
Expand Down Expand Up @@ -3106,8 +3134,9 @@ def _patch_prev_output(self):
if len(self.cached_step_inputs) == 0:
return
model_input = self.cached_step_inputs.pop(0)
delayed_output = self.cached_step_outputs.pop(0).cpu().squeeze(
-1).tolist()
model_output = self.cached_step_outputs.pop(0)
delayed_tokens = model_output.token_ids.cpu().squeeze(-1).tolist()

ctx = model_input.async_callback.keywords["ctx"] # type: ignore
# If there's no output to patch with, which is usually the case when
# we're starting a new request after all requests are completed.
Expand All @@ -3117,13 +3146,36 @@ def _patch_prev_output(self):
ctx.output_queue) == 1, 'There should be exactly 1 output waiting!'
output_data = ctx.output_queue[0]
assert len(output_data.outputs) == 1
for fake_out, real_out in zip(output_data.outputs[0], delayed_output):
for fake_out, real_out in zip(output_data.outputs[0], delayed_tokens):
fake_out.samples[0].output_token = real_out
for sg, real_out in zip(output_data.seq_group_metadata_list,
delayed_output):
delayed_tokens):
assert len(sg.seq_data) == 1
seq_data = list(sg.seq_data.values())[0]
# This is a hack. Assigning output_token_ids triggers
# a cache recomputation and we only need to update the last token
seq_data.output_token_ids_array[-1] = real_out
seq_data._cached_all_token_ids[-1] = real_out

delayed_logprobs = None
assert model_output.sampling_metadata is not None, \
'Sampling metadata is required to patch the output!'
logprobs_required = any(
seq_group.sampling_params.logprobs is not None
for seq_group in model_output.sampling_metadata.seq_groups)
if logprobs_required:
sampling_results = get_pythonized_sample_results(
model_output.deffered_sample_results)
_, delayed_logprobs = get_logprobs(model_output.logprobs,
model_output.sampling_metadata,
sampling_results)

# Another hack. We need to pass the logprobs to the output data,
# which are part of scheduler output.
if logprobs_required and delayed_logprobs is not None:
for sg, real_logprobs in zip(
output_data.scheduler_outputs.scheduled_seq_groups,
delayed_logprobs):
assert len(sg.seq_group.seqs) == 1
assert len(real_logprobs) == 1
sg.seq_group.first_seq.output_logprobs[-1] = real_logprobs[0]