Skip to content

Commit

Permalink
[Bugfix] Fix multi nodes TP+PP for XPU (vllm-project#8884)
Browse files Browse the repository at this point in the history
Signed-off-by: YiSheng5 <syhm@mail.ustc.edu.cn>
Signed-off-by: yan ma <yan.ma@intel.com>
Co-authored-by: YiSheng5 <syhm@mail.ustc.edu.cn>
Signed-off-by: Tyler Michael Smith <tyler@neuralmagic.com>
  • Loading branch information
2 people authored and tlrmchlsmth committed Nov 23, 2024
1 parent e5357fd commit 689980c
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 11 deletions.
18 changes: 18 additions & 0 deletions docs/source/getting_started/xpu-installation.rst
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,21 @@ Build from source
- FP16 is the default data type in the current XPU backend. The BF16 data
type will be supported in the future.


Distributed inference and serving
---------------------------------

XPU platform supports tensor-parallel inference/serving and also supports pipeline parallel as a beta feature for online serving. We requires Ray as the distributed runtime backend. For example, a reference execution likes following:

.. code-block:: console
$ python -m vllm.entrypoints.openai.api_server \
$ --model=facebook/opt-13b \
$ --dtype=bfloat16 \
$ --device=xpu \
$ --max_model_len=1024 \
$ --distributed-executor-backend=ray \
$ --pipeline-parallel-size=2 \
$ -tp=8
By default, a ray instance will be launched automatically if no existing one is detected in system, with ``num-gpus`` equals to ``parallel_config.world_size``. We recommend properly starting a ray cluster before execution, referring helper `script <https://github.com/vllm-project/vllm/tree/main/examples/run_cluster.sh>`_.
2 changes: 1 addition & 1 deletion requirements-xpu.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ torch == 2.3.1+cxx11.abi
intel-extension-for-pytorch == 2.3.110+xpu
oneccl_bind_pt == 2.3.100+xpu

triton-xpu == 3.0.0b2
triton-xpu == 3.0.0b1
22 changes: 22 additions & 0 deletions vllm/distributed/parallel_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,28 @@ def gather(self,
if dim < 0:
# Convert negative dim to positive.
dim += input_.dim()
# For xpu path, gather doesn't work properly together with ray
# cluster so we use all_gather instead for now.
if current_platform.is_xpu():
input_size = input_.size()
# Allocate output tensor.
output_tensor = torch.empty((world_size, ) + input_size,
dtype=input_.dtype,
device=input_.device)
# All-gather.
torch.distributed.all_gather_into_tensor(output_tensor,
input_,
group=self.device_group)
if self.rank_in_group == dst:
# Reshape
output_tensor = output_tensor.movedim(0, dim)
output_tensor = output_tensor.reshape(input_size[:dim] +
(world_size *
input_size[dim], ) +
input_size[dim + 1:])
else:
output_tensor = None
return output_tensor
# Allocate output tensor.
if self.rank_in_group == dst:
gather_list = [torch.empty_like(input_) for _ in range(world_size)]
Expand Down
12 changes: 11 additions & 1 deletion vllm/executor/xpu_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def __init__(
self.cache_config = cache_config
self.load_config = load_config
self.lora_config = lora_config
self.parallel_config = parallel_config
self.parallel_config = _verify_and_get_parallel_config(parallel_config)
self.scheduler_config = scheduler_config
self.device_config = device_config
self.prompt_adapter_config = prompt_adapter_config
Expand Down Expand Up @@ -94,3 +94,13 @@ def _verify_and_get_model_config(config: ModelConfig) -> ModelConfig:
"mode.")
config.enforce_eager = True
return config


def _verify_and_get_parallel_config(config: ParallelConfig) -> ParallelConfig:
if (config.distributed_executor_backend is not None
and config.distributed_executor_backend != "ray"):
logger.warning(
"%s is not supported on XPU, fallback to ray distributed executor "
"backend.", config.distributed_executor_backend)
config.distributed_executor_backend = "ray"
return config
3 changes: 3 additions & 0 deletions vllm/platforms/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@
is_xpu = False

try:
# installed IPEX if the machine has XPUs.
import intel_extension_for_pytorch # noqa: F401
import oneccl_bindings_for_pytorch # noqa: F401
import torch
if hasattr(torch, 'xpu') and torch.xpu.is_available():
is_xpu = True
Expand Down
4 changes: 4 additions & 0 deletions vllm/platforms/xpu.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,7 @@ def get_device_name(device_id: int = 0) -> str:
def get_device_total_memory(cls, device_id: int = 0) -> int:
device_props = torch.xpu.get_device_properties(device_id)
return device_props.total_memory

@staticmethod
def inference_mode():
return torch.no_grad()
13 changes: 4 additions & 9 deletions vllm/worker/xpu_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
SpeculativeConfig)
from vllm.distributed import (ensure_model_parallel_initialized,
init_distributed_environment)
from vllm.distributed.parallel_state import get_pp_group
from vllm.logger import init_logger
from vllm.model_executor import set_random_seed
from vllm.platforms import current_platform
Expand Down Expand Up @@ -183,11 +182,10 @@ def init_worker_distributed_environment(self) -> None:
# use sockets as default Level zero IPC exchange backend. By
# default oneccl will use `drmfd` as mechanism which need extra
# dependency (libdrm and drm headers) on your system.
ENV_CCL_ZE_IPC_EXCHANGE = os.getenv("CCL_ZE_IPC_EXCHANGE",
"sockets")
ENV_CCL_ATL_TRANSPORT = os.getenv("CCL_ATL_TRANSPORT", "ofi")
ENV_LOCAL_WORLD_SIZE = os.getenv("LOCAL_WORLD_SIZE",
str(parallel_config.world_size))
os.environ['CCL_ZE_IPC_EXCHANGE'] = ENV_CCL_ZE_IPC_EXCHANGE
os.environ["CCL_ATL_TRANSPORT"] = ENV_CCL_ATL_TRANSPORT
os.environ["LOCAL_WORLD_SIZE"] = ENV_LOCAL_WORLD_SIZE
os.environ["LOCAL_RANK"] = str(self.local_rank)
init_distributed_environment(
Expand All @@ -200,8 +198,5 @@ def init_worker_distributed_environment(self) -> None:
ensure_model_parallel_initialized(
parallel_config.tensor_parallel_size,
parallel_config.pipeline_parallel_size)

if parallel_config.pipeline_parallel_size > 1:
# torch-ccl xpu need a collective API warm up
# before calling send/recv API
get_pp_group().all_reduce(torch.zeros(1).xpu())
# global all_reduce needed for overall oneccl warm up
torch.distributed.all_reduce(torch.zeros(1).xpu())

0 comments on commit 689980c

Please sign in to comment.