From 689980c07782d4e896213ab43cbb30318f94a10f Mon Sep 17 00:00:00 2001
From: Yan Ma
Date: Wed, 30 Oct 2024 12:34:45 +0800
Subject: [PATCH] [Bugfix] Fix multi nodes TP+PP for XPU (#8884)

Signed-off-by: YiSheng5
Signed-off-by: yan ma
Co-authored-by: YiSheng5
Signed-off-by: Tyler Michael Smith
---
 .../getting_started/xpu-installation.rst | 18 ++++++++++++++++++
 requirements-xpu.txt                     |  2 +-
 vllm/distributed/parallel_state.py       | 22 ++++++++++++++++++++++
 vllm/executor/xpu_executor.py            | 12 +++++++++++-
 vllm/platforms/__init__.py               |  3 +++
 vllm/platforms/xpu.py                    |  4 ++++
 vllm/worker/xpu_worker.py                | 13 ++++---------
 7 files changed, 63 insertions(+), 11 deletions(-)

diff --git a/docs/source/getting_started/xpu-installation.rst b/docs/source/getting_started/xpu-installation.rst
index 151ebb5f1811f..b1868acbc84b0 100644
--- a/docs/source/getting_started/xpu-installation.rst
+++ b/docs/source/getting_started/xpu-installation.rst
@@ -60,3 +60,21 @@ Build from source
 - FP16 is the default data type in the current XPU backend. The BF16 data type will be supported in the future.
+
+Distributed inference and serving
+---------------------------------
+
+The XPU platform supports tensor parallel inference/serving, and also supports pipeline parallel as a beta feature for online serving. We require Ray as the distributed runtime backend. For example, a reference execution looks like the following:
+
+.. code-block:: console
+
+    $ python -m vllm.entrypoints.openai.api_server \
+    $ --model=facebook/opt-13b \
+    $ --dtype=bfloat16 \
+    $ --device=xpu \
+    $ --max_model_len=1024 \
+    $ --distributed-executor-backend=ray \
+    $ --pipeline-parallel-size=2 \
+    $ -tp=8
+
+By default, a Ray instance will be launched automatically if no existing one is detected in the system, with ``num-gpus`` equal to ``parallel_config.world_size``. We recommend properly starting a Ray cluster before execution; refer to the helper `script `_.
diff --git a/requirements-xpu.txt b/requirements-xpu.txt
index ce83a178c618f..eb76a33dab5c2 100644
--- a/requirements-xpu.txt
+++ b/requirements-xpu.txt
@@ -13,4 +13,4 @@ torch == 2.3.1+cxx11.abi
 intel-extension-for-pytorch == 2.3.110+xpu
 oneccl_bind_pt == 2.3.100+xpu
 
-triton-xpu == 3.0.0b2
+triton-xpu == 3.0.0b1
diff --git a/vllm/distributed/parallel_state.py b/vllm/distributed/parallel_state.py
index ec39856b6f67c..b04bbc478534c 100644
--- a/vllm/distributed/parallel_state.py
+++ b/vllm/distributed/parallel_state.py
@@ -431,6 +431,28 @@ def gather(self,
         if dim < 0:
             # Convert negative dim to positive.
             dim += input_.dim()
+        # For xpu path, gather doesn't work properly together with ray
+        # cluster so we use all_gather instead for now.
+        if current_platform.is_xpu():
+            input_size = input_.size()
+            # Allocate output tensor.
+            output_tensor = torch.empty((world_size, ) + input_size,
+                                        dtype=input_.dtype,
+                                        device=input_.device)
+            # All-gather.
+            torch.distributed.all_gather_into_tensor(output_tensor,
+                                                     input_,
+                                                     group=self.device_group)
+            if self.rank_in_group == dst:
+                # Reshape
+                output_tensor = output_tensor.movedim(0, dim)
+                output_tensor = output_tensor.reshape(input_size[:dim] +
+                                                      (world_size *
+                                                       input_size[dim], ) +
+                                                      input_size[dim + 1:])
+            else:
+                output_tensor = None
+            return output_tensor
         # Allocate output tensor.
         if self.rank_in_group == dst:
             gather_list = [torch.empty_like(input_) for _ in range(world_size)]
diff --git a/vllm/executor/xpu_executor.py b/vllm/executor/xpu_executor.py
index bada56068507a..5f78993ddc4b4 100644
--- a/vllm/executor/xpu_executor.py
+++ b/vllm/executor/xpu_executor.py
@@ -44,7 +44,7 @@ def __init__(
         self.cache_config = cache_config
         self.load_config = load_config
         self.lora_config = lora_config
-        self.parallel_config = parallel_config
+        self.parallel_config = _verify_and_get_parallel_config(parallel_config)
         self.scheduler_config = scheduler_config
         self.device_config = device_config
         self.prompt_adapter_config = prompt_adapter_config
@@ -94,3 +94,13 @@ def _verify_and_get_model_config(config: ModelConfig) -> ModelConfig:
             "mode.")
         config.enforce_eager = True
     return config
+
+
+def _verify_and_get_parallel_config(config: ParallelConfig) -> ParallelConfig:
+    if (config.distributed_executor_backend is not None
+            and config.distributed_executor_backend != "ray"):
+        logger.warning(
+            "%s is not supported on XPU, fallback to ray distributed executor "
+            "backend.", config.distributed_executor_backend)
+        config.distributed_executor_backend = "ray"
+    return config
diff --git a/vllm/platforms/__init__.py b/vllm/platforms/__init__.py
index 7e9f8b1297b80..524150920b854 100644
--- a/vllm/platforms/__init__.py
+++ b/vllm/platforms/__init__.py
@@ -45,6 +45,9 @@
 is_xpu = False
 
 try:
+    # IPEX is installed if the machine has XPUs.
+    import intel_extension_for_pytorch  # noqa: F401
+    import oneccl_bindings_for_pytorch  # noqa: F401
     import torch
     if hasattr(torch, 'xpu') and torch.xpu.is_available():
         is_xpu = True
diff --git a/vllm/platforms/xpu.py b/vllm/platforms/xpu.py
index d00e0dca84fff..106e8eddf458f 100644
--- a/vllm/platforms/xpu.py
+++ b/vllm/platforms/xpu.py
@@ -20,3 +20,7 @@ def get_device_name(device_id: int = 0) -> str:
     def get_device_total_memory(cls, device_id: int = 0) -> int:
         device_props = torch.xpu.get_device_properties(device_id)
         return device_props.total_memory
+
+    @staticmethod
+    def inference_mode():
+        return torch.no_grad()
diff --git a/vllm/worker/xpu_worker.py b/vllm/worker/xpu_worker.py
index 917866f2d985b..c1d836bb0d318 100644
--- a/vllm/worker/xpu_worker.py
+++ b/vllm/worker/xpu_worker.py
@@ -14,7 +14,6 @@
                          SpeculativeConfig)
 from vllm.distributed import (ensure_model_parallel_initialized,
                               init_distributed_environment)
-from vllm.distributed.parallel_state import get_pp_group
 from vllm.logger import init_logger
 from vllm.model_executor import set_random_seed
 from vllm.platforms import current_platform
@@ -183,11 +182,10 @@ def init_worker_distributed_environment(self) -> None:
         # use sockets as default Level zero IPC exchange backend. By
         # default oneccl will use `drmfd` as mechanism which need extra
         # dependency (libdrm and drm headers) on your system.
-        ENV_CCL_ZE_IPC_EXCHANGE = os.getenv("CCL_ZE_IPC_EXCHANGE",
-                                            "sockets")
+        ENV_CCL_ATL_TRANSPORT = os.getenv("CCL_ATL_TRANSPORT", "ofi")
         ENV_LOCAL_WORLD_SIZE = os.getenv("LOCAL_WORLD_SIZE",
                                          str(parallel_config.world_size))
-        os.environ['CCL_ZE_IPC_EXCHANGE'] = ENV_CCL_ZE_IPC_EXCHANGE
+        os.environ["CCL_ATL_TRANSPORT"] = ENV_CCL_ATL_TRANSPORT
         os.environ["LOCAL_WORLD_SIZE"] = ENV_LOCAL_WORLD_SIZE
         os.environ["LOCAL_RANK"] = str(self.local_rank)
         init_distributed_environment(
@@ -200,8 +198,5 @@ def init_worker_distributed_environment(self) -> None:
         ensure_model_parallel_initialized(
             parallel_config.tensor_parallel_size,
             parallel_config.pipeline_parallel_size)
-
-        if parallel_config.pipeline_parallel_size > 1:
-            # torch-ccl xpu need a collective API warm up
-            # before calling send/recv API
-            get_pp_group().all_reduce(torch.zeros(1).xpu())
+        # global all_reduce needed for overall oneccl warm up
+        torch.distributed.all_reduce(torch.zeros(1).xpu())
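
Reviewer note (not part of the patch): the reshape in the new XPU ``gather`` path can be sanity-checked without a multi-rank run. The sketch below uses toy sizes and simulates the ``(world_size, *input_size)`` buffer that ``all_gather_into_tensor`` fills by stacking per-rank tensors with ``torch.stack``; it then checks that ``movedim(0, dim)`` plus the same ``reshape`` reproduces a plain concatenation of the per-rank tensors along ``dim``, which is the result the destination rank is expected to return.

.. code-block:: python

    import torch

    # Pretend there are 4 ranks, each holding an identically-shaped tensor,
    # and that we gather along dim=1 (toy sizes for illustration only).
    world_size, dim = 4, 1
    per_rank = [torch.randn(2, 3, 5) for _ in range(world_size)]
    input_size = per_rank[0].size()

    # all_gather_into_tensor fills a (world_size, *input_size) buffer with
    # one slice per rank in rank order; torch.stack gives the same layout.
    output_tensor = torch.stack(per_rank, dim=0)

    # The reshape used in the patch: move the world_size axis next to `dim`,
    # then fold it into `dim`.
    output_tensor = output_tensor.movedim(0, dim)
    output_tensor = output_tensor.reshape(input_size[:dim] +
                                          (world_size * input_size[dim], ) +
                                          input_size[dim + 1:])

    # Equivalent to concatenating the per-rank tensors along `dim`.
    assert torch.equal(output_tensor, torch.cat(per_rank, dim=dim))

This also illustrates the trade-off behind the swap: with ``all_gather_into_tensor`` every rank allocates and fills the full buffer, but only the destination rank reshapes and returns it, while the other ranks return ``None`` as before.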