[wip/spmd] Serialization Optimization #6903

Closed
wants to merge 14 commits
2 changes: 2 additions & 0 deletions .buildkite/test-pipeline.yaml
@@ -88,6 +88,7 @@ steps:
- TEST_DIST_MODEL=facebook/opt-125m DISTRIBUTED_EXECUTOR_BACKEND=ray VLLM_USE_RAY_SPMD_WORKER=1 VLLM_USE_RAY_COMPILED_DAG=1 pytest -v -s distributed/test_basic_distributed_correctness.py
- TEST_DIST_MODEL=meta-llama/Llama-2-7b-hf DISTRIBUTED_EXECUTOR_BACKEND=ray VLLM_USE_RAY_SPMD_WORKER=1 VLLM_USE_RAY_COMPILED_DAG=1 pytest -v -s distributed/test_basic_distributed_correctness.py
- TEST_DIST_MODEL=facebook/opt-125m DISTRIBUTED_EXECUTOR_BACKEND=ray pytest -v -s distributed/test_chunked_prefill_distributed.py
- TEST_DIST_MODEL=facebook/opt-125m DISTRIBUTED_EXECUTOR_BACKEND=ray VLLM_USE_RAY_SPMD_WORKER=1 VLLM_USE_RAY_COMPILED_DAG=1 pytest -v -s distributed/test_chunked_prefill_distributed.py
- TEST_DIST_MODEL=meta-llama/Llama-2-7b-hf DISTRIBUTED_EXECUTOR_BACKEND=ray pytest -v -s distributed/test_chunked_prefill_distributed.py
- TEST_DIST_MODEL=llava-hf/llava-1.5-7b-hf DISTRIBUTED_EXECUTOR_BACKEND=ray pytest -v -s distributed/test_multimodal_broadcast.py
- TEST_DIST_MODEL=microsoft/Phi-3-vision-128k-instruct DISTRIBUTED_EXECUTOR_BACKEND=ray pytest -v -s distributed/test_multimodal_broadcast.py
@@ -97,6 +98,7 @@ steps:
- TEST_DIST_MODEL=meta-llama/Llama-2-7b-hf DISTRIBUTED_EXECUTOR_BACKEND=mp pytest -v -s distributed/test_chunked_prefill_distributed.py
- TEST_DIST_MODEL=llava-hf/llava-1.5-7b-hf DISTRIBUTED_EXECUTOR_BACKEND=mp pytest -v -s distributed/test_multimodal_broadcast.py
- TEST_DIST_MODEL=microsoft/Phi-3-vision-128k-instruct DISTRIBUTED_EXECUTOR_BACKEND=mp pytest -v -s distributed/test_multimodal_broadcast.py
- DISTRIBUTED_EXECUTOR_BACKEND=ray VLLM_USE_RAY_SPMD_WORKER=1 VLLM_USE_RAY_COMPILED_DAG=1 pytest -v -s spec_decode/e2e/test_integration_dist_tp2.py
- pytest -v -s spec_decode/e2e/test_integration_dist_tp2.py
- CUDA_VISIBLE_DEVICES=0,1 pytest -v -s test_sharded_state_loader.py
- CUDA_VISIBLE_DEVICES=0,1 pytest -v -s distributed/test_utils.py
74 changes: 74 additions & 0 deletions a.py
@@ -0,0 +1,74 @@
import time
import sys
from array import array
from vllm.sequence import ExecuteModelRequest, SequenceData, SequenceDataDelta, SequenceStage
import msgspec

with open('example.bin', 'rb') as file:
    data = file.read()


def dec_hook(type, obj):
    # `type` here is the value of the custom type annotation being decoded.
    if type is array:
        deserialized = array('l')
        deserialized.frombytes(obj)
        return deserialized


def enc_hook(obj):
    if isinstance(obj, array):
        # Serialize the array as its raw bytes.
        return obj.tobytes()


class Timer:

    def __init__(self, msg):
        self.msg = msg

    def __enter__(self):
        self.start = time.time()
        # Returning self gives access to the timer in the `as` clause.
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.end = time.time()
        self.elapsed_us = (self.end - self.start) * 1000 * 1000
        print(f"{self.msg=}. Elapsed time: {self.elapsed_us:.2f} us")


# encoder = msgspec.msgpack.Encoder(enc_hook=enc_hook)
# decoder = msgspec.msgpack.Decoder(ExecuteModelRequest, dec_hook=dec_hook)

# with Timer("Serialization"):
# serialized = encoder.encode(data)
# print(f"{sys.getsizeof(data)=}")
# with Timer("Deserialization original"):
# decoder.decode(data)
# with Timer("Deserialization original"):
# data = decoder.decode(data)

# with Timer("Serialization, big block tables"):
# data = encoder.encode(data)
# with Timer("Deserialization, big block tables"):
# data = decoder.decode(data)

# for i, metadata in enumerate(data.seq_group_metadata_list):
# for key, value in metadata.block_tables.items():
# metadata.block_tables[key] = [i]

# with Timer("Serialization, small block tables"):
# data = encoder.encode(data)
# with Timer("Deserialization, small block tables"):
# data = decoder.decode(data)

# print(decoder.decode(encoder.encode(data)))

encoder = msgspec.msgpack.Encoder(enc_hook=enc_hook)
decoder = msgspec.msgpack.Decoder(SequenceDataDelta, dec_hook=dec_hook)

data = SequenceDataDelta([i for i in range(2048)], 0, 0, SequenceStage.DECODE)
with Timer("Serialization, SequenceDataDelta (2048 token ids)"):
    data = encoder.encode(data)
with Timer("Deserialization, SequenceDataDelta (2048 token ids)"):
    data = decoder.decode(data)
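
Note: a.py above depends on vllm and a local example.bin, so it cannot be run standalone. The following is a standalone sketch (not code from this PR) of the same msgspec enc_hook/dec_hook pattern, using a made-up FakeDelta struct in place of SequenceDataDelta; only msgspec is assumed to be installed.

# Standalone sketch of the array <-> bytes hook pattern; FakeDelta is a
# hypothetical stand-in for SequenceDataDelta, not a vLLM class.
import time
from array import array

import msgspec


class FakeDelta(msgspec.Struct):
    # msgspec has no native support for array.array, so this field is
    # round-tripped through enc_hook/dec_hook below.
    new_token_ids: array
    new_num_computed_tokens: int


def enc_hook(obj):
    if isinstance(obj, array):
        # Encode the whole array as one opaque bytes payload.
        return obj.tobytes()
    raise NotImplementedError(f"Cannot encode {type(obj)}")


def dec_hook(typ, obj):
    if typ is array:
        out = array('l')
        out.frombytes(obj)
        return out
    raise NotImplementedError(f"Cannot decode {typ}")


encoder = msgspec.msgpack.Encoder(enc_hook=enc_hook)
decoder = msgspec.msgpack.Decoder(FakeDelta, dec_hook=dec_hook)

delta = FakeDelta(array('l', range(2048)), 0)
start = time.perf_counter()
payload = encoder.encode(delta)
restored = decoder.decode(payload)
elapsed_us = (time.perf_counter() - start) * 1e6
print(f"{len(payload)} bytes, round trip {elapsed_us:.1f} us")
assert restored.new_token_ids == delta.new_token_ids
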
76 changes: 76 additions & 0 deletions b.py
@@ -0,0 +1,76 @@
import time
from array import array
from vllm.sequence import SequenceData


def t():
    # Measure the cost of converting a 256-element list into array('l').
    l = [i for i in range(256)]
    s = time.time()
    a = array('l')
    a.fromlist(l)
    print((time.time() - s) * 1000 * 1000, "us")


t()

import msgspec


def dec_hook(type, obj):
    # `type` here is the value of the custom type annotation being decoded.
    if type is array:
        deserialized = array('l')
        deserialized.frombytes(obj)
        return deserialized


def enc_hook(obj):
    if isinstance(obj, array):
        # Serialize the array as its raw bytes.
        return obj.tobytes()


class Timer:

    def __init__(self, msg):
        self.msg = msg

    def __enter__(self):
        self.start = time.time()
        # Returning self gives access to the timer in the `as` clause.
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.end = time.time()
        self.elapsed_us = (self.end - self.start) * 1000 * 1000
        print(f"{self.msg=}. Elapsed time: {self.elapsed_us:.2f} us")


encoder = msgspec.msgpack.Encoder(enc_hook=enc_hook)
decoder = msgspec.msgpack.Decoder(dec_hook=dec_hook)

# l = [i for i in range(256)]
# d = {"1": l}

# with Timer("Serialization array"):
# # a = array('l')
# # a.fromlist(l)
# data = encoder.encode(a)
# with Timer("Deserialization"):
# data = decoder.decode(data)

l = [i for i in range(64 * 256)]
a = array('I')
a.fromlist(l)
# a = SequenceData(a)

# with Timer("Serialization sequence data"):
# # a = array('l')
# # a.fromlist(l)
# data = encoder.encode(a)
# with Timer("Deserialization"):
# data = decoder.decode(data)

with Timer("Serialization array"):
data = encoder.encode(a)
with Timer("Deserialization"):
data = decoder.decode(data)
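
For reference, a standalone sketch (not code from this PR) of what b.py is probing: encoding the same 64 * 256 ints once as a plain Python list and once as array('I') bytes routed through enc_hook.

# Side-by-side encode of 16384 ints as a list vs. as array('I') bytes.
import time
from array import array

import msgspec


def enc_hook(obj):
    if isinstance(obj, array):
        return obj.tobytes()
    raise NotImplementedError(f"Cannot encode {type(obj)}")


encoder = msgspec.msgpack.Encoder(enc_hook=enc_hook)
ints = list(range(64 * 256))

start = time.perf_counter()
as_list = encoder.encode(ints)
list_us = (time.perf_counter() - start) * 1e6

start = time.perf_counter()
as_array = encoder.encode(array('I', ints))
array_us = (time.perf_counter() - start) * 1e6

# The array path emits one bin payload (a straight memcpy of the buffer)
# instead of 16384 individually encoded msgpack integers.
print(f"list:  {len(as_list)} bytes in {list_us:.1f} us")
print(f"array: {len(as_array)} bytes in {array_us:.1f} us")
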
1 change: 1 addition & 0 deletions benchmarks/benchmark_throughput.py
@@ -106,6 +106,7 @@ def run_vllm(
max_num_batched_tokens=max_num_batched_tokens,
distributed_executor_backend=distributed_executor_backend,
load_format=load_format,
max_num_seqs=32,
)

# Add the requests to the engine.
20 changes: 20 additions & 0 deletions c.py
@@ -0,0 +1,20 @@
import time
import numpy as np

class Timer:

    def __init__(self, msg):
        self.msg = msg

    def __enter__(self):
        self.start = time.time()
        # Returning self gives access to the timer in the `as` clause.
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.end = time.time()
        self.elapsed_us = (self.end - self.start) * 1000 * 1000
        print(f"{self.msg=}. Elapsed time: {self.elapsed_us:.2f} us")

l = [i for i in range(4096)]
from array import array

with Timer("conversion"):
    arr = array("I", l)
Binary file added example.bin
Binary file not shown.
1 change: 1 addition & 0 deletions requirements-common.txt
@@ -22,3 +22,4 @@ outlines >= 0.0.43, < 0.1 # Requires torch >= 2.1.0
typing_extensions
filelock >= 3.10.4 # filelock starts to support `mode` argument from 3.10.4
pyzmq
msgspec
9 changes: 1 addition & 8 deletions tests/prompts/example.txt
@@ -1,8 +1 @@
vLLM is a high-throughput and memory-efficient inference and serving engine for LLMs.
Briefly describe the major milestones in the development of artificial intelligence from 1950 to 2020.
Compare and contrast artificial intelligence with human intelligence in terms of processing information.
Describe the basic components of a neural network and how it can be trained.
Write a short story about a robot that dreams for the first time.
Analyze the impact of the COVID-19 pandemic on global economic structures and future business models.
Explain the cultural significance of the Mona Lisa painting, and how its perception might vary in Western versus Eastern societies.
Translate the following English sentence into Japanese, French, and Swahili: 'The early bird catches the worm.'
vLLM is a high-throughput and memory-efficient inference and serving engine for LLMs.
2 changes: 0 additions & 2 deletions vllm/adapter_commons/request.py
@@ -1,8 +1,6 @@
from abc import abstractmethod
from dataclasses import dataclass


@dataclass
class AdapterRequest:
"""
Base class for adapter requests.
2 changes: 2 additions & 0 deletions vllm/config.py
@@ -6,6 +6,7 @@
import torch
from transformers import PretrainedConfig

import vllm.envs as envs
from vllm.logger import init_logger
from vllm.model_executor.layers.quantization import QUANTIZATION_METHODS
from vllm.model_executor.models import ModelRegistry
@@ -823,6 +824,7 @@ def __init__(self,
self.chunked_prefill_enabled = enable_chunked_prefill
self.embedding_mode = embedding_mode
self.preemption_mode = preemption_mode
self._use_delta = envs.VLLM_USE_DELTA_INPUT
self._verify_args()

def _verify_args(self) -> None:
74 changes: 49 additions & 25 deletions vllm/core/scheduler.py
@@ -6,14 +6,16 @@
from dataclasses import dataclass, field
from typing import Deque, Dict, Iterable, List, Optional, Set, Tuple, Union

import vllm.envs as envs
from vllm.config import CacheConfig, LoRAConfig, SchedulerConfig
from vllm.core.interfaces import AllocStatus, BlockSpaceManager
from vllm.core.policy import Policy, PolicyFactory
from vllm.logger import init_logger
from vllm.lora.request import LoRARequest
from vllm.prompt_adapter.request import PromptAdapterRequest
from vllm.sequence import (Sequence, SequenceData, SequenceGroup,
SequenceGroupMetadata, SequenceStatus)
SequenceGroupMetadata, SequenceGroupMetadataDelta,
SequenceStatus)

logger = init_logger(__name__)

@@ -996,49 +998,72 @@ def schedule(self) -> Tuple[List[SequenceGroupMetadata], SchedulerOutputs]:
for seq in seq_group.get_seqs(status=SequenceStatus.RUNNING):
seq_id = seq.seq_id
seq_data[seq_id] = seq.data
block_tables[seq_id] = self.block_manager.get_block_table(seq)
block_table = self.block_manager.get_block_table(seq)
block_tables[seq_id] = block_table
self.block_manager.access_all_blocks_in_seq(seq, now)

common_computed_block_nums = (
self.block_manager.get_common_computed_block_ids(
seq_group.get_seqs(status=SequenceStatus.RUNNING)))

do_sample = True
if seq_group.is_prefill():
is_prompt = seq_group.is_prefill()
# We should send the metadata to workers when the first prefill
# is sent. Subsequent requests could be chunked prefill or decode.
is_first_prefill = False
if is_prompt:
seqs = seq_group.get_seqs()
# Prefill has only 1 sequence.
assert len(seqs) == 1
num_computed_tokens = seqs[0].data.get_num_computed_tokens()
is_first_prefill = num_computed_tokens == 0
# In the next iteration, all prompt tokens are not computed.
# It means the prefill is chunked, and we don't need sampling.
# NOTE: We use get_len instead of get_prompt_len because when
# a sequence is preempted, prefill includes previous generated
# output tokens.
if (token_chunk_size + seqs[0].data.get_num_computed_tokens() <
if (token_chunk_size + num_computed_tokens <
seqs[0].data.get_len()):
do_sample = False

# It assumes the scheduled_seq_groups is ordered by
# prefill < decoding.
is_prompt = seq_group.is_prefill()
seq_group_metadata = SequenceGroupMetadata(
request_id=seq_group.request_id,
is_prompt=is_prompt,
seq_data=seq_data,
sampling_params=seq_group.sampling_params,
block_tables=block_tables,
do_sample=do_sample,
pooling_params=seq_group.pooling_params,
token_chunk_size=token_chunk_size,
lora_request=seq_group.lora_request,
computed_block_nums=common_computed_block_nums,
# `multi_modal_data` will only be present for the 1st comm
# between engine and worker.
# the subsequent comms can still use delta, but
# `multi_modal_data` will be None.
multi_modal_data=seq_group.multi_modal_data
if scheduler_outputs.num_prefill_groups > 0 else None,
prompt_adapter_request=seq_group.prompt_adapter_request,
)
# When SPMD mode is enabled, we only send delta data except for
# the first request to reduce serialization cost.
if is_first_prefill or not envs.VLLM_USE_RAY_SPMD_WORKER:
seq_group_metadata = SequenceGroupMetadata(
request_id=seq_group.request_id,
is_prompt=is_prompt,
seq_data=seq_data,
sampling_params=seq_group.sampling_params,
block_tables=block_tables,
do_sample=do_sample,
pooling_params=seq_group.pooling_params,
token_chunk_size=token_chunk_size,
lora_request=seq_group.lora_request,
computed_block_nums=common_computed_block_nums,
# `multi_modal_data` will only be present for the 1st comm
# between engine and worker.
# the subsequent comms can still use delta, but
# `multi_modal_data` will be None.
multi_modal_data=seq_group.multi_modal_data
if scheduler_outputs.num_prefill_groups > 0 else None,
prompt_adapter_request=seq_group.prompt_adapter_request,
)
else:
# Delta is used only for spmd workers.
seq_data_delta = {}
for id, data in seq_data.items():
seq_data_delta[id] = data.get_delta()
seq_group_metadata = SequenceGroupMetadataDelta(
seq_data_delta,
seq_group.request_id,
block_tables,
is_prompt,
do_sample=do_sample,
token_chunk_size=token_chunk_size,
computed_block_nums=common_computed_block_nums,
)
seq_group_metadata_list.append(seq_group_metadata)

# Now that the batch has been created, we can assume all blocks in the
@@ -1048,7 +1073,6 @@ def schedule(self) -> Tuple[List[SequenceGroupMetadata], SchedulerOutputs]:
for scheduled_seq_group in scheduler_outputs.scheduled_seq_groups:
self.block_manager.mark_blocks_as_computed(
scheduled_seq_group.seq_group)

return seq_group_metadata_list, scheduler_outputs

def fork_seq(self, parent_seq: Sequence, child_seq: Sequence) -> None:
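
The scheduler change above covers only the sending side: a full SequenceGroupMetadata on the first prefill, then SequenceGroupMetadataDelta on later steps when VLLM_USE_RAY_SPMD_WORKER is set. Below is a hypothetical sketch of how a worker-side cache could fold such deltas back into full metadata; the class and field names are illustrative only, not actual vLLM APIs.

# Hypothetical receiving-side sketch (not code from this PR).
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class CachedSeqGroupMetadata:
    request_id: str
    is_prompt: bool
    block_tables: Dict[int, List[int]]
    token_ids: Dict[int, List[int]] = field(default_factory=dict)


@dataclass
class MetadataDelta:
    request_id: str
    block_tables: Dict[int, List[int]]
    new_token_ids: Dict[int, List[int]]
    is_prompt: bool


class WorkerMetadataCache:

    def __init__(self) -> None:
        self._cache: Dict[str, CachedSeqGroupMetadata] = {}

    def put_full(self, meta: CachedSeqGroupMetadata) -> None:
        # First (prefill) request: store the complete metadata.
        self._cache[meta.request_id] = meta

    def apply_delta(self, delta: MetadataDelta) -> CachedSeqGroupMetadata:
        # Subsequent requests: patch only the fields that change per step.
        cached = self._cache[delta.request_id]
        cached.is_prompt = delta.is_prompt
        cached.block_tables = delta.block_tables
        for seq_id, new_ids in delta.new_token_ids.items():
            cached.token_ids.setdefault(seq_id, []).extend(new_ids)
        return cached
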
2 changes: 1 addition & 1 deletion vllm/engine/llm_engine.py
@@ -218,7 +218,6 @@ def __init__(
cache_config.enable_prefix_caching,
)
# TODO(woosuk): Print more configs in debug mode.

self.model_config = model_config
self.cache_config = cache_config
self.lora_config = lora_config
@@ -902,6 +901,7 @@ def step(self) -> List[Union[RequestOutput, EmbeddingRequestOutput]]:
"as performance will be severely degraded otherwise.")
seq_group_metadata_list, scheduler_outputs = self.scheduler[
0].schedule()
# print("SANG-TODO batch size,", len(seq_group_metadata_list))

if not scheduler_outputs.is_empty():
finished_requests_ids = self.scheduler[