From 23c45ae08f9c8169347b01aee58ba1f41248df5a Mon Sep 17 00:00:00 2001 From: wangshuai09 <391746016@qq.com> Date: Tue, 8 Oct 2024 09:25:48 +0000 Subject: [PATCH] fix type check --- examples/offline_inference_npu.py | 14 +- setup.py | 3 +- .../test_basic_correctness.py | 5 +- tests/engine/test_custom_executor.py | 54 ++-- tests/kernels/test_cache_npu.py | 248 ------------------ vllm/attention/backends/ascend.py | 45 ++-- vllm/engine/async_llm_engine.py | 6 +- vllm/engine/llm_engine.py | 8 +- vllm/executor/multiproc_npu_executor.py | 10 +- vllm/model_executor/layers/layernorm.py | 3 +- vllm/model_executor/models/commandr.py | 6 +- vllm/model_executor/sampling_metadata.py | 3 +- vllm/platforms/ascend.py | 39 ++- vllm/platforms/interface.py | 29 -- vllm/utils.py | 3 + vllm/worker/npu_model_runner.py | 52 ++-- 16 files changed, 119 insertions(+), 409 deletions(-) delete mode 100644 tests/kernels/test_cache_npu.py diff --git a/examples/offline_inference_npu.py b/examples/offline_inference_npu.py index ee451b8e4201c..4835d91fbb778 100644 --- a/examples/offline_inference_npu.py +++ b/examples/offline_inference_npu.py @@ -11,22 +11,18 @@ def clean_up(): # Sample prompts. prompts = [ - # "Hello, my name is", + "Hello, my name is", "The president of the United States is", + "The capital of France is", "The future of AI is", - - - # "美国的首都是" - # "The capital of France is", - ] + # Create a sampling params object. sampling_params = SamplingParams(max_tokens=100, temperature=0.0) # Create an LLM. -llm = LLM(model="facebook/opt-125m", tensor_parallel_size=2, distributed_executor_backend="mp") -# llm = LLM(model="Qwen/Qwen2-7B-Instruct") -# llm = LLM(model="/workspace/cmq/models/LLM-Research/Meta-Llama-3-8B-Instruct") +llm = LLM(model="facebook/opt-125m") + # Generate texts from the prompts. The output is a list of RequestOutput objects # that contain the prompt, generated text, and other information. 
outputs = llm.generate(prompts, sampling_params) diff --git a/setup.py b/setup.py index 9a7888281d0e6..58b7c7c6a31e5 100644 --- a/setup.py +++ b/setup.py @@ -294,7 +294,8 @@ def _build_custom_ops() -> bool: def _build_core_ext() -> bool: - return not (_is_neuron() or _is_tpu() or _is_openvino() or _is_xpu() or _is_npu()) + return not (_is_neuron() or _is_tpu() or _is_openvino() or _is_xpu() or + _is_npu()) def get_hipcc_rocm_version(): diff --git a/tests/basic_correctness/test_basic_correctness.py b/tests/basic_correctness/test_basic_correctness.py index 9a4b0ba98c560..44028028990a5 100644 --- a/tests/basic_correctness/test_basic_correctness.py +++ b/tests/basic_correctness/test_basic_correctness.py @@ -18,9 +18,8 @@ from ..utils import multi_gpu_test MODELS = [ - # "facebook/opt-125m", - # "meta-llama/Llama-2-7b-hf", - "/workspace/cmq/models/LLM-Research/Meta-Llama-3-8B-Instruct", + "facebook/opt-125m", + "meta-llama/Llama-2-7b-hf", ] TARGET_TEST_SUITE = os.environ.get("TARGET_TEST_SUITE", "L4") diff --git a/tests/engine/test_custom_executor.py b/tests/engine/test_custom_executor.py index 3d3fbbd83605c..3163241922808 100644 --- a/tests/engine/test_custom_executor.py +++ b/tests/engine/test_custom_executor.py @@ -7,7 +7,6 @@ from vllm.engine.async_llm_engine import AsyncLLMEngine from vllm.engine.llm_engine import LLMEngine from vllm.executor.gpu_executor import GPUExecutor, GPUExecutorAsync -from vllm.executor.npu_executor import NPUExecutor, NPUExecutorAsync from vllm.sampling_params import SamplingParams @@ -24,15 +23,6 @@ def execute_model(self, *args, **kwargs): return super().execute_model(*args, **kwargs) -class CustomNPUExecutor(NPUExecutor): - - def execute_model(self, *args, **kwargs): - # Drop marker to show that this was ran - with open(".marker", "w"): - ... 
- return super().execute_model(*args, **kwargs) - - class CustomGPUExecutorAsync(GPUExecutorAsync): async def execute_model_async(self, *args, **kwargs): @@ -51,13 +41,9 @@ def test_custom_executor_type_checking(model): engine_args = AsyncEngineArgs(model=model, distributed_executor_backend=Mock) AsyncLLMEngine.from_engine_args(engine_args) - # with pytest.raises(TypeError): - # engine_args = AsyncEngineArgs( - # model=model, distributed_executor_backend=CustomGPUExecutor) - # AsyncLLMEngine.from_engine_args(engine_args) with pytest.raises(TypeError): engine_args = AsyncEngineArgs( - model=model, distributed_executor_backend=CustomNPUExecutor) + model=model, distributed_executor_backend=CustomGPUExecutor) AsyncLLMEngine.from_engine_args(engine_args) @@ -69,7 +55,7 @@ def test_custom_executor(model, tmp_path): assert not os.path.exists(".marker") engine_args = EngineArgs( - model=model, distributed_executor_backend=CustomNPUExecutor) + model=model, distributed_executor_backend=CustomGPUExecutor) engine = LLMEngine.from_engine_args(engine_args) sampling_params = SamplingParams(max_tokens=1) @@ -81,25 +67,25 @@ def test_custom_executor(model, tmp_path): os.chdir(cwd) -# @pytest.mark.parametrize("model", ["facebook/opt-125m"]) -# def test_custom_executor_async(model, tmpdir): -# cwd = os.path.abspath(".") -# os.chdir(tmpdir) -# try: -# assert not os.path.exists(".marker") +@pytest.mark.parametrize("model", ["facebook/opt-125m"]) +def test_custom_executor_async(model, tmp_path): + cwd = os.path.abspath(".") + os.chdir(tmp_path) + try: + assert not os.path.exists(".marker") -# engine_args = AsyncEngineArgs( -# model=model, distributed_executor_backend=CustomGPUExecutorAsync) -# engine = AsyncLLMEngine.from_engine_args(engine_args) -# sampling_params = SamplingParams(max_tokens=1) + engine_args = AsyncEngineArgs( + model=model, distributed_executor_backend=CustomGPUExecutorAsync) + engine = AsyncLLMEngine.from_engine_args(engine_args) + sampling_params = SamplingParams(max_tokens=1) -# async def t(): -# stream = await engine.add_request("0", "foo", sampling_params) -# async for x in stream: -# ... + async def t(): + stream = await engine.add_request("0", "foo", sampling_params) + async for x in stream: + ... -# asyncio.run(t()) + asyncio.run(t()) -# assert os.path.exists(".marker") -# finally: -# os.chdir(cwd) + assert os.path.exists(".marker") + finally: + os.chdir(cwd) \ No newline at end of file diff --git a/tests/kernels/test_cache_npu.py b/tests/kernels/test_cache_npu.py deleted file mode 100644 index 14f00c1a64e84..0000000000000 --- a/tests/kernels/test_cache_npu.py +++ /dev/null @@ -1,248 +0,0 @@ -import random -from typing import List, Tuple - -import pytest -import torch -from typing import (List, Optional, Tuple, Union) - -# from vllm import _custom_ops as ops -from vllm.attention.backends.ascend import AscendAttentionBackend -from vllm.utils import get_kv_cache_torch_dtype - -COPYING_DIRECTION = [('npu', 'cpu'), ('npu', 'npu'), ('cpu', 'npu')] -DTYPES = [torch.half, torch.bfloat16, torch.float] -NUM_TOKENS = [42] # Arbitrary values for testing -NUM_LAYERS = [1] # Arbitrary values for testing -NUM_HEADS = [8] # Arbitrary values for testing -HEAD_SIZES = [64, 80, 96, 112, 120, 128, 192, 256] -BLOCK_SIZES = [8, 16, 32] - -# Arbitrary values for testing -# don't make it too large. e.g. [1024, 36000] will OOM -NUM_BLOCKS = [1024, 10000] - -NUM_MAPPINGS = [256] # Arbitrary values for testing -SEEDS = [0] -NPU_DEVICES = [ - "npu:0" -] - -# We assume fp8 is always enabled for testing. 
-# KV_CACHE_DTYPE = ["auto", "fp8"] -KV_CACHE_DTYPE = ["float"] - -copy_blocks = AscendAttentionBackend.copy_blocks -swap_blocks = AscendAttentionBackend.swap_blocks - - -def create_kv_caches_with_random_for_npu( - num_blocks: int, - block_size: int, - num_layers: int, - num_heads: int, - head_size: int, - cache_dtype: Optional[Union[str, torch.dtype]], - model_dtype: Optional[Union[str, torch.dtype]] = None, - seed: int = 0, - device: Optional[str] = "cuda", -) -> Tuple[List[torch.Tensor], List[torch.Tensor]]: - - if cache_dtype == "fp8" and head_size % 16: - raise ValueError( - f"Does not support key cache of type fp8 with head_size {head_size}" - ) - - torch.random.manual_seed(seed) - if torch.npu.is_available(): - torch.npu.manual_seed(seed) - - torch_dtype = get_kv_cache_torch_dtype(cache_dtype, model_dtype) - - scale = head_size**-0.5 - key_cache_shape = (num_blocks, block_size, num_heads * head_size) - key_caches: List[torch.Tensor] = [] - for _ in range(num_layers): - key_cache = torch.empty(size=key_cache_shape, - dtype=torch_dtype, - device=device) - if cache_dtype in ["auto", "half", "bfloat16", "float"]: - key_cache.uniform_(-scale, scale) - else: - raise ValueError( - f"Does not support key cache of type {cache_dtype}") - key_caches.append(key_cache) - - value_cache_shape = (num_blocks, block_size, num_heads * head_size) - value_caches: List[torch.Tensor] = [] - for _ in range(num_layers): - value_cache = torch.empty(size=value_cache_shape, - dtype=torch_dtype, - device=device) - if cache_dtype in ["auto", "half", "bfloat16", "float"]: - value_cache.uniform_(-scale, scale) - else: - raise ValueError( - f"Does not support value cache of type {cache_dtype}") - value_caches.append(value_cache) - return key_caches, value_caches - - -@pytest.mark.parametrize("num_mappings", NUM_MAPPINGS) -@pytest.mark.parametrize("num_layers", NUM_LAYERS) -@pytest.mark.parametrize("num_heads", NUM_HEADS) -@pytest.mark.parametrize("head_size", HEAD_SIZES) -@pytest.mark.parametrize("block_size", BLOCK_SIZES) -@pytest.mark.parametrize("num_blocks", NUM_BLOCKS) -@pytest.mark.parametrize("dtype", DTYPES) -@pytest.mark.parametrize("seed", SEEDS) -@pytest.mark.parametrize("device", NPU_DEVICES) -@pytest.mark.parametrize("kv_cache_dtype", KV_CACHE_DTYPE) -@torch.inference_mode() -def test_copy_blocks( - kv_cache_factory, - num_mappings: int, - num_layers: int, - num_heads: int, - head_size: int, - block_size: int, - num_blocks: int, - dtype: torch.dtype, - seed: int, - kv_cache_dtype: str, - device: str, -) -> None: - if kv_cache_dtype == "fp8" and head_size % 16: - pytest.skip() - random.seed(seed) - torch.random.manual_seed(seed) - if torch.npu.is_available(): - torch.npu.manual_seed(seed) - torch.set_default_device(device) - # Generate random block mappings where each source block is mapped to two - # destination blocks. - assert 2 * num_mappings <= num_blocks - src_blocks = random.sample(range(num_blocks), num_mappings) - remainig_blocks = list(set(range(num_blocks)) - set(src_blocks)) - dst_blocks = random.sample(remainig_blocks, 2 * num_mappings) - block_mapping: List[Tuple[int, int]] = [] - for i in range(num_mappings): - src = src_blocks[i] - dst1 = dst_blocks[2 * i] - dst2 = dst_blocks[2 * i + 1] - block_mapping.append((src, dst1)) - block_mapping.append((src, dst2)) - - # Create the KV caches. 
- key_caches, value_caches = create_kv_caches_with_random_for_npu(num_blocks, block_size, - num_layers, num_heads, - head_size, kv_cache_dtype, - dtype, seed, device) - - kv_caches = [] - for i in range(len(key_caches)): - kv_caches.append(torch.tensor(torch.cat((key_caches[i].unsqueeze(0), value_caches[i].unsqueeze(0)), dim=0))) - - # Clone the KV caches. - cloned_key_caches = [key_cache.clone() for key_cache in key_caches] - cloned_value_caches = [value_cache.clone() for value_cache in value_caches] - - # Call the copy blocks kernel. - block_mapping_tensor = torch.tensor(block_mapping, - dtype=torch.int64, - device=device).view(-1, 2) - # ops.copy_blocks(key_caches, value_caches, block_mapping_tensor) - copy_blocks(kv_caches, block_mapping_tensor) - - # Run the reference implementation. - for src, dst in block_mapping: - for cloned_key_cache in cloned_key_caches: - cloned_key_cache[dst].copy_(cloned_key_cache[src]) - for cloned_value_cache in cloned_value_caches: - cloned_value_cache[dst].copy_(cloned_value_cache[src]) - - # Compare the results. - for kv_cache, cloned_key_cache, cloned_value_cache in zip(kv_caches, cloned_key_caches, cloned_value_caches): - k = kv_cache[0] - v = kv_cache[1] - torch.testing.assert_close(k, cloned_key_cache) - torch.testing.assert_close(v, cloned_value_cache) - - -@pytest.mark.parametrize("direction", COPYING_DIRECTION) -@pytest.mark.parametrize("num_mappings", NUM_MAPPINGS) -@pytest.mark.parametrize("num_heads", NUM_HEADS) -@pytest.mark.parametrize("head_size", HEAD_SIZES) -@pytest.mark.parametrize("block_size", BLOCK_SIZES) -@pytest.mark.parametrize("num_blocks", NUM_BLOCKS) -@pytest.mark.parametrize("dtype", DTYPES) -@pytest.mark.parametrize("seed", SEEDS) -@pytest.mark.parametrize("device", NPU_DEVICES) -@pytest.mark.parametrize("kv_cache_dtype", KV_CACHE_DTYPE) -@torch.inference_mode() -def test_swap_blocks( - kv_cache_factory, - direction: Tuple[str, str], - num_mappings: int, - num_heads: int, - head_size: int, - block_size: int, - num_blocks: int, - dtype: torch.dtype, - seed: int, - device: str, - kv_cache_dtype: str, -) -> None: - if kv_cache_dtype == "fp8" and "cpu" in direction: - pytest.skip() - if kv_cache_dtype == "fp8" and head_size % 16: - pytest.skip() - random.seed(seed) - torch.random.manual_seed(seed) - if torch.npu.is_available(): - torch.npu.manual_seed(seed) - - src_device = device if direction[0] == "npu" else 'cpu' - dst_device = device if direction[1] == "npu" else 'cpu' - - src_blocks = random.sample(range(num_blocks), num_mappings) - # For the same device, mapping must not overlap - if src_device == dst_device: - remaining_blocks = list(set(range(num_blocks)) - set(src_blocks)) - dst_blocks = random.sample(remaining_blocks, num_mappings) - else: - dst_blocks = random.sample(range(num_blocks), num_mappings) - - block_mapping = list(zip(src_blocks, dst_blocks)) - block_mapping_tensor = torch.tensor(block_mapping, - dtype=torch.int64, - device="cpu").view(-1, 2) - - # Create the KV caches on the first device. - src_key_caches, src_value_caches = create_kv_caches_with_random_for_npu( - num_blocks, block_size, 1, num_heads, head_size, kv_cache_dtype, dtype, - seed, src_device) - - # Create the KV caches on the second device. - dist_key_caches, dist_value_caches = create_kv_caches_with_random_for_npu( - num_blocks, block_size, 1, num_heads, head_size, kv_cache_dtype, dtype, - seed, dst_device) - - src_key_caches_clone = src_key_caches[0].clone() - src_value_caches_clone = src_value_caches[0].clone() - - # Call the swap_blocks kernel. 
- src_kv_caches = [] - dist_kv_caches = [] - for i in range(len(src_key_caches)): - src_kv_caches.append(torch.tensor(torch.cat((src_key_caches[i].unsqueeze(0), src_value_caches[i].unsqueeze(0)), dim=0))) - for i in range(len(dist_key_caches)): - dist_kv_caches.append(torch.tensor(torch.cat((dist_key_caches[i].unsqueeze(0), dist_value_caches[i].unsqueeze(0)), dim=0))) - - swap_blocks(src_kv_caches[0], dist_kv_caches[0], block_mapping_tensor) - - - for src, dst in block_mapping: - torch.testing.assert_close(src_key_caches_clone[src].cpu(), - dist_kv_caches[0][0][dst].cpu()) - torch.testing.assert_close(src_value_caches_clone[src].cpu(), - dist_kv_caches[0][1][dst].cpu()) diff --git a/vllm/attention/backends/ascend.py b/vllm/attention/backends/ascend.py index 87693d4743cfb..d8a68bb0dbcec 100644 --- a/vllm/attention/backends/ascend.py +++ b/vllm/attention/backends/ascend.py @@ -54,8 +54,10 @@ def swap_blocks( src_indices = src_to_dst[:, 0] dst_indices = src_to_dst[:, 1] - dst_key_cache[dst_indices] = src_key_cache[src_indices].to(dst_key_cache.device) - dst_value_cache[dst_indices] = src_value_cache[src_indices].to(dst_key_cache.device) + dst_key_cache[dst_indices] = src_key_cache[src_indices].to( + dst_key_cache.device) + dst_value_cache[dst_indices] = src_value_cache[src_indices].to( + dst_key_cache.device) @staticmethod @@ -266,9 +268,9 @@ class AscendMetadataBuilder(CommonMetadataBuilder[AscendMetadata]): _metadata_cls = AscendMetadata - def compute_npu_slot_indices(self, is_profile_run, slot_indices, seq_id, seq_len, - context_len, start_idx, block_size, block_tables, - max_query_len): + def compute_npu_slot_indices(self, is_profile_run, slot_indices, seq_id, + seq_len, context_len, start_idx, block_size, + block_tables, max_query_len): """ compute slot indices @@ -317,7 +319,8 @@ def _add_seq_group( """ is_prompt = inter_data.is_prompt block_tables = inter_data.block_tables - max_query_len = max(max(data.query_lens) for data in self.input_builder.inter_data_list) + max_query_len = max(max(data.query_lens) + for data in self.input_builder.inter_data_list) is_prompt = inter_data.is_prompt block_tables = inter_data.block_tables @@ -359,9 +362,11 @@ def _add_seq_group( is_prompt, query_len, context_len, self.sliding_window, self.use_v2_block_manager) - self.compute_npu_slot_indices(is_profile_run, self.slot_mapping, seq_id, seq_len, - context_len, start_idx, self.block_size, inter_data.block_tables, - max_query_len) + self.compute_npu_slot_indices(is_profile_run, self.slot_mapping, + seq_id, seq_len, context_len, + start_idx, self.block_size, + inter_data.block_tables, + max_query_len) class AscendAttentionBackendImpl(AttentionImpl): @@ -451,7 +456,8 @@ def forward( ) attn_metadata.attn_mask = attention_mask - if self.alibi_slopes is not None and attn_metadata.pse_shift is None: + if (self.alibi_slopes is not None + and attn_metadata.pse_shift is None): attn_metadata.pse_shift = _make_alibi_bias( self.alibi_slopes, self.num_kv_heads, @@ -461,15 +467,13 @@ def forward( ) # shape of q/k/v [B,S*H] --> [B,S,N,D] - query = query.view( - -1, attn_metadata.max_prefill_seq_len, self.num_heads, self.head_size - ).transpose(1, 2) - key = key.view( - -1, attn_metadata.max_prefill_seq_len, self.num_kv_heads, self.head_size - ).transpose(1, 2) - value = value.view( - -1, attn_metadata.max_prefill_seq_len, self.num_kv_heads, self.head_size - ).transpose(1, 2) + query = query.view(-1, attn_metadata.max_prefill_seq_len, + self.num_heads, self.head_size).transpose(1, 2) + key = key.view(-1, 
attn_metadata.max_prefill_seq_len, + self.num_kv_heads, self.head_size).transpose(1, 2) + value = value.view(-1, attn_metadata.max_prefill_seq_len, + self.num_kv_heads, + self.head_size).transpose(1, 2) # FA for prefill phase output = torch_npu.npu_prompt_flash_attention( @@ -490,7 +494,7 @@ def forward( num_tokens, -1, self.num_heads * self.head_size ) - elif decode_meta := attn_metadata.decode_metadata: + elif attn_metadata.decode_metadata: # FA for decoding phase assert kv_cache is not None # shape of query [B,S*H] --> [B,S,H] @@ -523,7 +527,6 @@ def gen_input_mask(seq_len, sliding_window, len): Generating lower triangular matrix """ if len > 16384: - # TODO (cmq): test me # improve computing performance on NPU when input tokens are huge global SHARE_MASK_TRIL_PREFIX_CACHE if SHARE_MASK_TRIL_PREFIX_CACHE is None: diff --git a/vllm/engine/async_llm_engine.py b/vllm/engine/async_llm_engine.py index 8bf1904b8b351..ff2bc685c3ee7 100644 --- a/vllm/engine/async_llm_engine.py +++ b/vllm/engine/async_llm_engine.py @@ -594,10 +594,12 @@ def _get_executor_cls( "Not supported distributed execution model on XPU device.") elif engine_config.device_config.device_type == "npu": if distributed_executor_backend == "mp": - from vllm.executor.multiproc_npu_executor import MultiprocessingNPUExecutorAsync + from vllm.executor.multiproc_npu_executor import ( + MultiprocessingNPUExecutorAsync) executor_class = MultiprocessingNPUExecutorAsync elif distributed_executor_backend == "ray": - raise NotImplementedError("ray is not implemented in Ascend NPU currently") + raise NotImplementedError( + "ray is not implemented in Ascend NPU currently") else: from vllm.executor.npu_executor import NPUExecutorAsync executor_class = NPUExecutorAsync diff --git a/vllm/engine/llm_engine.py b/vllm/engine/llm_engine.py index 4fb40ad5dcdbe..62634b0b50167 100644 --- a/vllm/engine/llm_engine.py +++ b/vllm/engine/llm_engine.py @@ -545,10 +545,12 @@ def _get_executor_cls(cls, executor_class = XPUExecutor elif engine_config.device_config.device_type == "npu": if distributed_executor_backend == "mp": - from vllm.executor.multiproc_npu_executor import MultiprocessingNPUExecutorAsync + from vllm.executor.multiproc_npu_executor import ( + MultiprocessingNPUExecutorAsync) executor_class = MultiprocessingNPUExecutorAsync elif distributed_executor_backend == "ray": - raise NotImplementedError("ray is not implemented in Ascend NPU currently") + raise NotImplementedError( + "ray is not implemented in Ascend NPU currently") else: from vllm.executor.npu_executor import NPUExecutorAsync executor_class = NPUExecutorAsync @@ -993,7 +995,7 @@ def update_prefill_num_computed_tokens( prefill seq_group num_outputs: int - number of output tokens being processed for the given seq_group - is_first_step_output: Optional[bool] - + is_first_step_output: Optional[bool] - If multi-step is enabled and num_outputs is 1, this value indicates if this outputs belongs to the first step in the multi-step. 
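For reference, a minimal torch-only sketch of the layout change the Ascend prefill path above applies before calling torch_npu.npu_prompt_flash_attention: activations arriving as [batch * seq, heads * head_size] are viewed to [batch, seq, heads, head_size] and then transposed to [batch, heads, seq, head_size]. The sizes below are illustrative only and are not taken from the patch.

import torch

# Illustrative sizes; the real code uses attn_metadata.max_prefill_seq_len
# and the layer's num_heads / head_size.
batch, seq_len, num_heads, head_size = 2, 8, 4, 16
query = torch.randn(batch * seq_len, num_heads * head_size)
# [B*S, N*D] -> [B, S, N, D] -> [B, N, S, D]
q_bnsd = query.view(-1, seq_len, num_heads, head_size).transpose(1, 2)
assert q_bnsd.shape == (batch, num_heads, seq_len, head_size)
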
diff --git a/vllm/executor/multiproc_npu_executor.py b/vllm/executor/multiproc_npu_executor.py
index c649d0c2cf986..9d04755450121 100644
--- a/vllm/executor/multiproc_npu_executor.py
+++ b/vllm/executor/multiproc_npu_executor.py
@@ -3,14 +3,12 @@
 from vllm.executor.npu_executor import NPUExecutor
 from vllm.logger import init_logger
-from vllm.utils import (make_async, update_environment_variables)
+from vllm.utils import update_environment_variables
 from vllm.executor.multiproc_gpu_executor import (
     MultiprocessingGPUExecutor, MultiprocessingGPUExecutorAsync)
 
 logger = init_logger(__name__)
 
-# TODO (cmq) fix daemon process cannot have children process error
-# os.environ["TORCHINDUCTOR_COMPILE_THREADS"] = "1" doesn't work in _init_executor
 
 class MultiprocessingNPUExecutor(MultiprocessingGPUExecutor, NPUExecutor):
     """Python multiprocessing-based multi-NPU executor"""
 
@@ -22,7 +20,8 @@ def _check_executor_parameters(self):
         # Set ASCEND_RT_VISIBLE_DEVICES for the driver, inherited by workers
         if "ASCEND_RT_VISIBLE_DEVICES" not in os.environ:
             update_environment_variables({
-                "ASCEND_RT_VISIBLE_DEVICES": (",".join(map(str, range(world_size))))
+                "ASCEND_RT_VISIBLE_DEVICES":
+                (",".join(map(str, range(world_size))))
             })
 
         npu_device_count = torch.npu.device_count()
@@ -33,7 +32,8 @@ def _check_executor_parameters(self):
 
         assert world_size <= npu_device_count, (
             f"please ensure that world_size ({world_size}) "
-            f"is less than than max local Ascend npu count ({npu_device_count})")
+            f"is no larger than the max local Ascend npu count "
+            f"({npu_device_count})")
 
 
 class MultiprocessingNPUExecutorAsync(MultiprocessingNPUExecutor,
diff --git a/vllm/model_executor/layers/layernorm.py b/vllm/model_executor/layers/layernorm.py
index 1221a6518fea0..331b71d7ab961 100644
--- a/vllm/model_executor/layers/layernorm.py
+++ b/vllm/model_executor/layers/layernorm.py
@@ -102,7 +102,8 @@ def forward_npu(
                                                         self.variance_epsilon)
             return x, residual
 
-        x, residual = torch_npu.npu_rms_norm(x, self.weight, self.variance_epsilon)
+        x, residual = torch_npu.npu_rms_norm(x, self.weight,
+                                             self.variance_epsilon)
         return x
 
 
diff --git a/vllm/model_executor/models/commandr.py b/vllm/model_executor/models/commandr.py
index 649dc798d22dc..ddceb15c61bd1 100644
--- a/vllm/model_executor/models/commandr.py
+++ b/vllm/model_executor/models/commandr.py
@@ -45,12 +45,16 @@
     default_weight_loader, row_parallel_weight_loader)
 from vllm.model_executor.sampling_metadata import SamplingMetadata
 from vllm.model_executor.utils import set_weight_attrs
+from vllm.platforms import current_platform
 from vllm.sequence import IntermediateTensors
 
 from .interfaces import SupportsLoRA
 
 
-@torch.compile
+current_backend = "inductor"
+if current_platform.is_npu():
+    current_backend = "npu"
+@torch.compile(backend=current_backend)
 def layer_norm_func(hidden_states, weight, variance_epsilon):
     input_dtype = hidden_states.dtype
     hidden_states = hidden_states.to(torch.float32)
diff --git a/vllm/model_executor/sampling_metadata.py b/vllm/model_executor/sampling_metadata.py
index 549b1c1c32598..4c307b692cdb5 100644
--- a/vllm/model_executor/sampling_metadata.py
+++ b/vllm/model_executor/sampling_metadata.py
@@ -159,7 +159,8 @@ def prepare(
             categorized_sample_indices,
             num_prompts,
         ) = _prepare_seq_groups(seq_group_metadata_list, seq_lens, query_lens,
-                                device, generators, cache, pad_for_invariant_seq_len)
+                                device, generators, cache,
+                                pad_for_invariant_seq_len)
         selected_token_indices = async_tensor_h2d(selected_token_indices,
                                                   dtype=torch.long,
                                                   target_device=device,
diff --git a/vllm/platforms/ascend.py b/vllm/platforms/ascend.py
index 7dcf632ba80bb..bd31b5d72c4ad 100644
--- a/vllm/platforms/ascend.py
+++ b/vllm/platforms/ascend.py
@@ -10,8 +10,9 @@ def device_id_to_physical_device_id(device_id: int) -> int:
     if "ASCEND_RT_VISIBLE_DEVICES" in os.environ:
         device_ids = os.environ["ASCEND_RT_VISIBLE_DEVICES"].split(",")
         if device_ids == [""]:
-            raise RuntimeError("ASCEND_RT_VISIBLE_DEVICES is set to empty string,"
-                               " which means Ascend NPU support is disabled.")
+            raise RuntimeError("ASCEND_RT_VISIBLE_DEVICES is set to empty "
+                               "string, which means Ascend NPU support is "
+                               "disabled.")
         physical_device_id = device_ids[device_id]
         return int(physical_device_id)
     else:
@@ -21,37 +22,31 @@ def device_id_to_physical_device_id(device_id: int) -> int:
 class AscendPlatform(Platform):
     _enum = PlatformEnum.ASCEND
 
-    @staticmethod
-    def get_device_capability(device_id: int = 0) -> Tuple[int, int]:
+    @classmethod
+    def get_device_capability(cls, device_id: int = 0):
         raise RuntimeError("Ascend NPU does not have device capability.")
 
-    @staticmethod
-    def get_device_name(device_id: int = 0) -> str:
+    @classmethod
+    def get_device_name(cls, device_id: int = 0) -> str:
         physical_device_id = device_id_to_physical_device_id(device_id)
         return torch.npu.get_device_name(physical_device_id)
 
-    @staticmethod
-    def inference_mode():
+    @classmethod
+    def inference_mode(cls):
         return torch.inference_mode()
 
-    @staticmethod
-    def set_device(device: torch.device) -> torch.device:
+    @classmethod
+    def set_device(cls, device: torch.device) -> torch.device:
         torch.npu.set_device(device)
 
-    @staticmethod
-    def empty_cache():
+    @classmethod
+    def empty_cache(cls):
         torch.npu.empty_cache()
 
-    @staticmethod
-    def synchronize():
+    @classmethod
+    def synchronize(cls):
         torch.npu.synchronize()
 
-    @staticmethod
-    def mem_get_info() -> Tuple[int, int]:
+    @classmethod
+    def mem_get_info(cls) -> Tuple[int, int]:
         return torch.npu.mem_get_info()
-
-    @staticmethod
-    def current_memory_usage(device: torch.device) -> float:
-        torch.npu.reset_peak_memory_stats(device)  # type: ignore
-        mem = torch.npu.max_memory_allocated(device)  # type: ignore
-        return mem
diff --git a/vllm/platforms/interface.py b/vllm/platforms/interface.py
index 9ab71516b3252..6e376a8c0e925 100644
--- a/vllm/platforms/interface.py
+++ b/vllm/platforms/interface.py
@@ -1,5 +1,4 @@
 import enum
-import gc
 from typing import NamedTuple, Optional, Tuple, Union
 
 import torch
@@ -108,34 +107,6 @@ def inference_mode(cls):
         """
         return torch.inference_mode(mode=True)
 
-    @staticmethod
-    def current_memory_usage():
-        return None
-
-    def memory_profiler(self):
-        return PlatformMemoryProfiler(self)
-
 
 class UnspecifiedPlatform(Platform):
     _enum = PlatformEnum.UNSPECIFIED
-
-
-class PlatformMemoryProfiler:
-
-    def __init__(self,
-                 platform: Platform,
-                 device: Optional[torch.types.Device] = None):
-        self.device = device
-        self.platform = platform
-
-    def __enter__(self):
-        self.initial_memory = self.platform.current_memory_usage(self.device)
-        # This allows us to call methods of the context manager if needed
-        return self
-
-    def __exit__(self, exc_type, exc_val, exc_tb):
-        self.final_memory = self.platform.current_memory_usage(self.device)
-        self.consumed_memory = self.final_memory - self.initial_memory
-
-        # Force garbage collection
-        gc.collect()
diff --git a/vllm/utils.py b/vllm/utils.py
index a025c3c40a434..808b0a9ec3f31 100644
--- a/vllm/utils.py
+++ b/vllm/utils.py
@@ -784,6 +784,9 @@ def current_memory_usage(self) -> float:
         elif is_xpu():
torch.xpu.reset_peak_memory_stats(self.device) # type: ignore mem = torch.xpu.max_memory_allocated(self.device) # type: ignore + elif current_platform.is_npu(): + torch.npu.reset_peak_memory_stats(self.device) # type: ignore + mem = torch.npu.max_memory_allocated(self.device) # type: ignore return mem def __enter__(self): diff --git a/vllm/worker/npu_model_runner.py b/vllm/worker/npu_model_runner.py index 723c5ddce1c99..13c614f7c00d4 100644 --- a/vllm/worker/npu_model_runner.py +++ b/vllm/worker/npu_model_runner.py @@ -1,6 +1,6 @@ import dataclasses from dataclasses import dataclass -from typing import (TYPE_CHECKING, Any, Dict, List, Optional, Set, Type, +from typing import (Any, Dict, List, Optional, Set, Type, TypeVar) import torch @@ -30,11 +30,12 @@ from vllm.sampling_params import SamplingParams from vllm.sequence import SequenceGroupMetadata -from vllm.utils import (flatten_2d_lists, make_tensor_with_pad) -from vllm.worker.model_runner import ModelInputForGPU, ModelInputForGPUBuilder, ModelInputForGPUWithSamplingMetadata, GPUModelRunnerBase, ModelRunner +from vllm.utils import (DeviceMemoryProfiler, flatten_2d_lists, + make_tensor_with_pad) +from vllm.worker.model_runner import (ModelInputForGPU, ModelInputForGPUBuilder, + ModelInputForGPUWithSamplingMetadata, + ModelRunner) -if TYPE_CHECKING: - from vllm.attention.backends.abstract import AttentionBackend logger = init_logger(__name__) @@ -55,7 +56,8 @@ class ModelInputForNPU(ModelInputForGPU): @dataclass(frozen=True) -class ModelInputForNPUWithSamplingMetadata(ModelInputForGPUWithSamplingMetadata): +class ModelInputForNPUWithSamplingMetadata( + ModelInputForGPUWithSamplingMetadata): """ Used by the ModelRunner. """ @@ -102,42 +104,34 @@ def build(self) -> ModelInputForNPU: } batch_size = len(input_tokens) - use_captured_graph = self._use_captured_graph(batch_size, - max_decode_seq_len) # If cuda graph can be used, pad tensors accordingly. # See `capture_model` API for more details. # vLLM uses cuda graph only for decoding requests. cuda_graph_pad_size = -1 - # # TODO (cmq): support cuda graph - # if use_captured_graph: - # graph_batch_size = _get_graph_batch_size(batch_size) - # assert graph_batch_size >= batch_size - # cuda_graph_pad_size = graph_batch_size - batch_size - # batch_size = graph_batch_size - - # # Tokens and positions. 
- # input_tokens.extend([0] * cuda_graph_pad_size) - # input_positions.extend([0] * cuda_graph_pad_size) if self.inter_data_list[0].is_prompt: - input_tokens_tensor = make_tensor_with_pad(input_tokens, 0, - dtype=torch.int, - device=self.runner.device) - input_positions_tensor = make_tensor_with_pad(input_positions, 0, - dtype=torch.int, - device=self.runner.device) + input_tokens_tensor = make_tensor_with_pad( + input_tokens, 0, + dtype=torch.int, + device=self.runner.device) + input_positions_tensor = make_tensor_with_pad( + input_positions, 0, + dtype=torch.int, + device=self.runner.device) input_tokens_tensor = torch.flatten(input_tokens_tensor) input_positions_tensor = torch.flatten(input_positions_tensor) max_seq_len = max(seq_lens) seq_lens = len(seq_lens) * [max_seq_len] else: - input_tokens_tensor = torch.tensor(flatten_2d_lists(input_tokens), + input_tokens_tensor = torch.tensor( + flatten_2d_lists(input_tokens), + dtype=torch.long, + device=self.runner.device) + input_positions_tensor = torch.tensor( + flatten_2d_lists(input_positions), dtype=torch.long, device=self.runner.device) - input_positions_tensor = torch.tensor(flatten_2d_lists(input_positions), - dtype=torch.long, - device=self.runner.device) # Sequence and query lengths. seq_lens.extend([1] * cuda_graph_pad_size) @@ -274,7 +268,7 @@ def load_model(self) -> None: def get_vllm_model(self) -> None: logger.info("Starting to load model %s...", self.model_config.model) - with current_platform.memory_profiler() as m: + with DeviceMemoryProfiler() as m: self.model = get_model(model_config=self.model_config, device_config=self.device_config, load_config=self.load_config,
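As a rough, torch-only illustration of the prompt branch in ModelInputForNPUBuilder.build() above: each prompt is padded to the batch's longest sequence (pad id 0, as in the patch), the padded matrix is flattened to 1-D, and seq_lens is replaced by the padded length for every sequence. The real builder uses vLLM's make_tensor_with_pad helper; the token ids below are made up.

import torch

prompts = [[1, 2, 3], [4, 5]]  # made-up token ids
max_seq_len = max(len(p) for p in prompts)
# Pad every prompt to max_seq_len with 0, mirroring make_tensor_with_pad(..., 0, ...).
padded = torch.tensor([p + [0] * (max_seq_len - len(p)) for p in prompts],
                      dtype=torch.int)
input_tokens = torch.flatten(padded)      # tensor([1, 2, 3, 4, 5, 0])
seq_lens = [max_seq_len] * len(prompts)   # [3, 3]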