
[DP] Tiny fix of dp and update example #1273

Merged
merged 1 commit on Jun 25, 2025
10 changes: 8 additions & 2 deletions .github/workflows/vllm_ascend_test.yaml
@@ -363,7 +363,10 @@ jobs:
pytest -sv tests/e2e/multicard/test_offline_inference_distributed.py::test_models_distributed_DeepSeek_W8A8
pytest -sv tests/e2e/multicard/test_offline_inference_distributed.py::test_models_distributed_DeepSeek_dbo
pytest -sv tests/e2e/multicard/test_offline_inference_distributed.py::test_models_distributed_DeepSeekV3_dbo
- pytest -sv tests/e2e/multicard/ --ignore=tests/e2e/multicard/test_ilama_lora_tp2.py --ignore=tests/e2e/multicard/test_offline_inference_distributed.py
+ pytest -sv tests/e2e/multicard/test_data_parallel.py
+ pytest -sv tests/e2e/multicard/ --ignore=tests/e2e/multicard/test_ilama_lora_tp2.py \
+   --ignore=tests/e2e/multicard/test_offline_inference_distributed.py \
+   --ignore=tests/e2e/multicard/test_data_parallel.py

- name: Run vllm-project/vllm-ascend test on V0 engine
if: ${{ github.event_name == 'schedule' }}
@@ -380,4 +383,7 @@ jobs:
pytest -sv tests/e2e/multicard/test_offline_inference_distributed.py::test_models_distributed_DeepSeek
pytest -sv tests/e2e/multicard/test_offline_inference_distributed.py::test_models_distributed_topk
pytest -sv tests/e2e/multicard/test_offline_inference_distributed.py::test_models_distributed_DeepSeek_W8A8
- pytest -sv tests/e2e/multicard/ --ignore=tests/e2e/multicard/test_ilama_lora_tp2.py --ignore=tests/e2e/multicard/test_offline_inference_distributed.py
+ pytest -sv tests/e2e/multicard/test_data_parallel.py
+ pytest -sv tests/e2e/multicard/ --ignore=tests/e2e/multicard/test_ilama_lora_tp2.py \
+   --ignore=tests/e2e/multicard/test_offline_inference_distributed.py \
+   --ignore=tests/e2e/multicard/test_data_parallel.py
85 changes: 0 additions & 85 deletions examples/dp_offline/data_parallel.py

This file was deleted.

19 changes: 0 additions & 19 deletions examples/dp_offline/run_dp.sh

This file was deleted.

241 changes: 241 additions & 0 deletions examples/offline_data_parallel.py
@@ -0,0 +1,241 @@
#
# Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved.
# Copyright 2023 The vLLM team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This file is a part of the vllm-ascend project.
# Adapted from vllm-project/vllm/examples/offline_inference/data_parallel.py
#
"""
Usage:
Single node:
Dense models:
python examples/offline_data_parallel.py \
--model="Qwen/Qwen2.5-0.5B-Instruct" \
--dp-size=2 \
--tp-size=2
MOE models:
python examples/offline_data_parallel.py \
--model="ibm-research/PowerMoE-3b" \
--dp-size=2 \
--tp-size=2 \
--enable-expert-parallel
Multi-node:
Node 0 (assume the node's IP is 10.99.48.128):
python examples/offline_data_parallel.py \
--model="ibm-research/PowerMoE-3b" \
--dp-size=2 \
--tp-size=2 \
--node-size=2 \
--node-rank=0 \
--enable-expert-parallel \
--master-addr=10.99.48.128 \
--master-port=13345
Node 1:
python examples/offline_data_parallel.py \
--model="ibm-research/PowerMoE-3b" \
--dp-size=2 \
--tp-size=2 \
--node-size=2 \
--node-rank=1 \
--enable-expert-parallel \
--master-addr=10.99.48.128 \
--master-port=13345
"""

import os
from time import sleep

from vllm import LLM, SamplingParams
from vllm.utils import get_open_port


def parse_args():
import argparse

parser = argparse.ArgumentParser(description="Data Parallel Inference")
parser.add_argument(
"--model",
type=str,
default="ibm-research/PowerMoE-3b",
help="Model name or path",
)
parser.add_argument("--dp-size",
type=int,
default=2,
help="Data parallel size")
parser.add_argument("--tp-size",
type=int,
default=1,
help="Tensor parallel size")
parser.add_argument("--node-size",
type=int,
default=1,
help="Total number of nodes")
parser.add_argument("--node-rank",
type=int,
default=0,
help="Rank of the current node")
parser.add_argument("--master-addr",
type=str,
default="",
help="Master node IP address")
parser.add_argument("--master-port",
type=int,
default=0,
help="Master node port")
parser.add_argument("--enforce-eager",
action="store_true",
help="Enforce eager mode execution.")
parser.add_argument("--trust-remote-code",
action="store_true",
help="Trust remote code.")
parser.add_argument("--enable-expert-parallel",
action="store_true",
help="Enable expert parallel, used in MOE models.")
return parser.parse_args()


def main(
model,
dp_size,
local_dp_rank,
global_dp_rank,
dp_master_ip,
dp_master_port,
GPUs_per_dp_rank,
enable_expert_parallel,
enforce_eager,
trust_remote_code,
):
# DP is only supported on the V1 engine.
os.environ["VLLM_USE_V1"] = "1"
os.environ["VLLM_DP_RANK"] = str(global_dp_rank)
os.environ["VLLM_DP_RANK_LOCAL"] = str(local_dp_rank)
os.environ["VLLM_DP_SIZE"] = str(dp_size)
os.environ["VLLM_DP_MASTER_IP"] = dp_master_ip
os.environ["VLLM_DP_MASTER_PORT"] = str(dp_master_port)

# CUDA_VISIBLE_DEVICES for each DP rank is set automatically inside the
# engine processes.

# Sample prompts.
prompts = [
"Hello, my name is",
"The president of the United States is",
"The capital of France is",
"The future of AI is",
] * 100

# With DP, each rank should process different prompts.
# Usually all DP ranks together cover the full dataset,
# with each rank processing a different part of it.
floor = len(prompts) // dp_size
remainder = len(prompts) % dp_size

# Distribute prompts into even groups.
def start(rank):
return rank * floor + min(rank, remainder)
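
# Illustrative example of the split above: with the 400 prompts here and
# dp_size=3, floor=133 and remainder=1, so rank 0 gets prompts[0:134]
# while ranks 1 and 2 get 133 prompts each.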

prompts = prompts[start(global_dp_rank):start(global_dp_rank + 1)]
if len(prompts) == 0:
# if any rank has no prompts to process,
# we need to set a placeholder prompt
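# (all DP ranks step their engines in lockstep, so a rank left with no
# work would otherwise stall the others)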
prompts = ["Placeholder"]
print(f"DP rank {global_dp_rank} needs to process {len(prompts)} prompts")

# Create a sampling params object.
# Since we are doing data parallel, every rank can have different
# sampling params; here we set a different max_tokens per rank
# for demonstration.
sampling_params = SamplingParams(temperature=0.8,
top_p=0.95,
max_tokens=[16, 20][global_dp_rank % 2])

# Create an LLM.
llm = LLM(
model=model,
tensor_parallel_size=GPUs_per_dp_rank,
enforce_eager=enforce_eager,
enable_expert_parallel=enable_expert_parallel,
trust_remote_code=trust_remote_code,
)
outputs = llm.generate(prompts, sampling_params)
# Print the outputs.
for i, output in enumerate(outputs):
if i >= 5:
# print only the first 5 outputs
break
prompt = output.prompt
generated_text = output.outputs[0].text
print(f"DP rank {global_dp_rank}, Prompt: {prompt!r}, "
f"Generated text: {generated_text!r}")

# Give engines time to pause their processing loops before exiting.
sleep(1)


if __name__ == "__main__":
args = parse_args()

dp_size = args.dp_size
tp_size = args.tp_size
node_size = args.node_size
node_rank = args.node_rank

if node_size == 1:
dp_master_ip = "127.0.0.1"
dp_master_port = get_open_port()
else:
dp_master_ip = args.master_addr
dp_master_port = args.master_port

assert dp_size % node_size == 0, "dp_size should be divisible by node_size"
dp_per_node = dp_size // node_size

from multiprocessing import Process

procs = []
for local_dp_rank, global_dp_rank in enumerate(
range(node_rank * dp_per_node, (node_rank + 1) * dp_per_node)):
proc = Process(
target=main,
args=(
args.model,
dp_size,
local_dp_rank,
global_dp_rank,
dp_master_ip,
dp_master_port,
tp_size,
args.enable_expert_parallel,
args.enforce_eager,
args.trust_remote_code,
),
)
proc.start()
procs.append(proc)
exit_code = 0
for proc in procs:
proc.join(timeout=300)
if proc.exitcode is None:
print(
f"Killing process {proc.pid} that didn't stop within 5 minutes."
)
proc.kill()
exit_code = 1
elif proc.exitcode:
exit_code = proc.exitcode

exit(exit_code)