
support_dynamic_server for wintx #10589


Open
wants to merge 10 commits into base: develop
72 changes: 61 additions & 11 deletions llm/server/server/scripts/start_server.sh
@@ -14,6 +14,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.

export PYTHONPATH=/root/paddlejob/workspace/env_run/output/changwenbin/PaddleNLP
export PYTHONPATH=/root/paddlejob/workspace/env_run/output/changwenbin/PaddleNLP/llm/server/server/:$PYTHONPATH
export PYTHONPATH=/root/paddlejob/workspace/env_run/output/changwenbin/PaddleNLP/llm/:$PYTHONPATH
export PATH=/opt/tritonserver/bin/:$PATH

export FLAGS_cascade_attention_max_partition_size=163840
export FLAGS_mla_use_tensorcore=0
export USE_DYNAMIC_GRAPH=1

export GLOG_v=0
export GLOG_logtostderr=1
export PYTHONIOENCODING=utf8
@@ -23,22 +32,30 @@ export LC_ALL=C.UTF-8
export FLAGS_gemm_use_half_precision_compute_type=0
export NVIDIA_TF32_OVERRIDE=0

export NCCL_ALGO=Tree
export FLAGS_use_wintx_gemm=True


# Model hyperparameters
export MP_NUM=${MP_NUM:-"1"} # number of model parallelism
export MP_NUM=${MP_NUM:-"4"} # number of model parallelism
export MP_NNODE=${MP_NNODE:-"1"} # number of nodes
export CUDA_VISIBLE_DEVICES=${CUDA_VISIBLE_DEVICES:-"0"} # GPU ids
export CUDA_VISIBLE_DEVICES=${CUDA_VISIBLE_DEVICES:-"0,1,2,3"} # GPU ids
export MAX_SEQ_LEN=${MAX_SEQ_LEN:-"8192"}
export MAX_DEC_LEN=${MAX_DEC_LEN:-"8192"}
export BATCH_SIZE=${BATCH_SIZE:-"20"}
export BLOCK_BS=${BLOCK_BS:-"4"}
export MAX_DEC_LEN=${MAX_DEC_LEN:-"4096"}
export BATCH_SIZE=${BATCH_SIZE:-"128"}
export BLOCK_BS=${BLOCK_BS:-"1"}
export BLOCK_RATIO=${BLOCK_RATIO:-"0.75"}
export ENC_DEC_BLOCK_NUM=${ENC_DEC_BLOCK_NUM:-"4"}
export MAX_PREFILL_BATCH=${MAX_PREFILL_BATCH:-"4"}
export STOP_THRESHOLD=${STOP_THRESHOLD:-"0"}
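As a reference for how these exports reach the Python side, here is a minimal sketch of reading them the way infer.py does further down. Only USE_DYNAMIC_GRAPH is read verbatim in this PR; the other names simply mirror the exports above and are shown for illustration.

```python
import os

# Minimal sketch: read the launcher's exports the same way the server process does.
# USE_DYNAMIC_GRAPH is checked verbatim in infer.py in this PR; the remaining reads
# mirror the exports above and are illustrative only.
use_dynamic_graph = int(os.getenv("USE_DYNAMIC_GRAPH", "0"))  # 1 selects the dygraph path
mp_num = int(os.getenv("MP_NUM", "4"))                        # model-parallel degree
max_seq_len = int(os.getenv("MAX_SEQ_LEN", "8192"))
max_dec_len = int(os.getenv("MAX_DEC_LEN", "4096"))
batch_size = int(os.getenv("BATCH_SIZE", "128"))

print(f"dygraph={bool(use_dynamic_graph)} mp={mp_num} "
      f"seq={max_seq_len} dec={max_dec_len} bs={batch_size}")
```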

export tag=${tag:-"3.0.0.b4"}
export model_name=$1
export MODEL_DIR=${MODEL_DIR:-"/models"}
export MODEL_DIR=/root/paddlejob/workspace/env_run/output/model/dsv3_w2_tq2.75_w4_channel_tp4_new
# export MODEL_DIR=/root/paddlejob/workspace/env_run/output/bos_model/community/deepseek-ai/DeepSeek-V3-0324
# export MODEL_DIR=/root/.paddlenlp/models/Qwen/Qwen1.5-MoE-A2.7B-Chat
# export MODEL_DIR=/root/paddlejob/workspace/env_run/output/changwenbin/PaddleNLP/llm/predict/static/Qwen/Qwen1.5-MoE-A2.7B-Chat
# ${MODEL_DIR:-"/models"}

if [ ! "$model_name" == "" ]; then
export MODEL_DIR=${MODEL_DIR}/${model_name}
@@ -71,9 +88,9 @@ check_port_occupied ${SERVICE_HTTP_PORT}



if [ ! -d "llm_model" ];then
ln -s /opt/source/PaddleNLP/llm/server/server/llm_model llm_model
fi
# if [ ! -d "llm_model" ];then
# ln -s /opt/source/PaddleNLP/llm/server/server/llm_model llm_model
# fi

mkdir -p log
rm -rf console.log log/*
@@ -86,6 +103,39 @@ if [ "$MP_NNODE" -gt 1 ]; then
else
POD_0_IP="127.0.0.1"
HOST_IP="127.0.0.1"
# Mask the environment variables preset by the platform; the framework uses a backward-compatible upgrade, and if it detects these settings it falls back to the original launch method
# unset PADDLE_ELASTIC_JOB_ID
# unset PADDLE_TRAINER_ENDPOINTS
# unset DISTRIBUTED_TRAINER_ENDPOINTS
# unset FLAGS_START_PORT
# unset PADDLE_ELASTIC_TIMEOUT
# nnodes=$PADDLE_TRAINERS_NUM
# rank=$PADDLE_TRAINER_ID

# for name in `env | grep -E 'PADDLE|ENDPOINT' | awk -F'=' '{print $1}'`; do
# unset ${name}
# done

# START_RANK=0
# END_RANK=$nnodes
# END_RANK=1

# if [[ $rank -lt $START_RANK ]]; then
# echo "rank exit"
# exit 0
# fi

# if [[ $rank -ge $END_RANK ]]; then
# echo "rank exit"
# exit 0
# fi

# rank=$(($rank-$START_RANK))
# nnodes=$(($END_RANK-$START_RANK))
# master=`cat /root/paddlejob/workspace/hostfile | head -n $(($START_RANK+1)) | tail -n 1 | awk '{print $1}'`
# port=36677

# set -ex
fi

echo "POD_0_IP: $POD_0_IP HOST_IP: $HOST_IP"
@@ -105,10 +155,10 @@ LOG_REDIRECT=""
if [ "$OUTPUT_LOG_TO_CONSOLE" == "1" ]; then
LOG_REDIRECT="> log/console.log 2>&1"
fi
eval nohup tritonserver --exit-timeout-secs 100000 --cuda-memory-pool-byte-size 0:0 --cuda-memory-pool-byte-size 1:0 \
eval nohup fastdeployserver --exit-timeout-secs 100000 --cuda-memory-pool-byte-size 0:0 --cuda-memory-pool-byte-size 1:0 \
--cuda-memory-pool-byte-size 2:0 --cuda-memory-pool-byte-size 3:0 --cuda-memory-pool-byte-size 4:0 \
--cuda-memory-pool-byte-size 5:0 --cuda-memory-pool-byte-size 6:0 --cuda-memory-pool-byte-size 7:0 \
--pinned-memory-pool-byte-size 0 --model-repository llm_model/ \
--pinned-memory-pool-byte-size 0 --model-repository /root/paddlejob/workspace/env_run/output/changwenbin/PaddleNLP/llm/server/server/llm_model/ \
--allow-http false \
--grpc-port=${SERVICE_GRPC_PORT} \
--metrics-port=${METRICS_HTTP_PORT} \
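A quick way to verify the launch is to poll the gRPC health endpoint. The sketch below assumes the fastdeployserver binary keeps Triton's standard gRPC health API and that the tritonclient[grpc] package is installed; the port default is only a placeholder for whatever SERVICE_GRPC_PORT is set to earlier in this script.

```python
import os

import tritonclient.grpc as grpcclient  # assumption: Triton-compatible health API

grpc_port = os.getenv("SERVICE_GRPC_PORT", "8001")  # placeholder default
client = grpcclient.InferenceServerClient(url=f"localhost:{grpc_port}")

# is_server_ready() returns True once all models in the repository are loaded.
print("ready" if client.is_server_ready() else "still loading")
```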
8 changes: 4 additions & 4 deletions llm/server/server/scripts/stop_server.sh
@@ -1,9 +1,9 @@
# /bin/bash

pids=($(ps aux | grep -E 'tritonserver' | grep -v grep | awk '{print $2}'))
pids=($(ps aux | grep -E 'fastdeployserver' | grep -v grep | awk '{print $2}'))

if [ ${#pids[@]} -eq 0 ]; then
echo "Can not find tritonserver."
echo "Can not find fastdeployserver."
timeout=1
else
timeout=300
@@ -30,15 +30,15 @@ while : ; do

if [ $elapsed_time -ge $timeout ]; then
echo "forcibly kill all process ..."
pids=$(ps auxww | grep -E "tritonserver|triton_python_backend_stub|infer|multiprocessing.resource_tracker|paddle.distributed.launch|task_queue_manager|app.py|spawn_main" | grep -v grep | grep -v start_both | awk '{print $2}');
pids=$(ps auxww | grep -E "fastdeployserver|triton_python_backend_stub|infer|multiprocessing.resource_tracker|paddle.distributed.launch|task_queue_manager|app.py|spawn_main" | grep -v grep | grep -v start_both | awk '{print $2}');
echo $pids;
for pid in ${pids[@]}; do
kill -9 ${pid}
done
break
fi

pids=$(ps auxww | grep -E "tritonserver|triton_python_backend_stub|multiprocessing.resource_tracker|paddle.distributed.launch|app.py|spawn_main" | grep -v grep | awk '{print $2}');
pids=$(ps auxww | grep -E "fastdeployserver|triton_python_backend_stub|multiprocessing.resource_tracker|paddle.distributed.launch|app.py|spawn_main" | grep -v grep | awk '{print $2}');
array=($(echo "$pids" | tr ' ' '\n'))

if [ ${#array[*]} -ne 0 ]; then
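In short, the stop script polls for matching processes and, once the timeout expires, force-kills whatever is left. A minimal Python sketch of the same pattern (the pattern string and the 300 s timeout are taken from the script; everything else is illustrative):

```python
import os
import signal
import subprocess
import time

PATTERN = "fastdeployserver|triton_python_backend_stub|paddle.distributed.launch"

def find_pids(pattern: str) -> list[int]:
    # Rough equivalent of: ps auxww | grep -E "$pattern" | grep -v grep | awk '{print $2}'
    out = subprocess.run(["pgrep", "-f", pattern], capture_output=True, text=True)
    return [int(p) for p in out.stdout.split()]

deadline = time.time() + 300  # the script uses a 300 s timeout when servers are found
while find_pids(PATTERN):
    if time.time() >= deadline:
        for pid in find_pids(PATTERN):
            os.kill(pid, signal.SIGKILL)  # same effect as the script's kill -9
        break
    time.sleep(1)
```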
18 changes: 9 additions & 9 deletions llm/server/server/server/engine/config.py
@@ -321,10 +321,10 @@ def read_from_config(self):
self._get_download_model()

config = self.get_model_config()
# check paddle nlp version
tag = os.getenv("tag")
if tag not in config["paddlenlp_version"]:
logger.warning(f"Current image paddlenlp version {tag} doesn't match the model paddlenlp version {config['paddlenlp_version']} ")
# # check paddle nlp version
# tag = os.getenv("tag")
# if tag not in config["paddlenlp_version"]:
# logger.warning(f"Current image paddlenlp version {tag} doesn't match the model paddlenlp version {config['paddlenlp_version']} ")

def reset_value(self, value_name, key, config):
if key in config:
@@ -336,11 +336,11 @@ def reset_value(self, value_name, key, config):
reset_value(self, "max_seq_len", "infer_model_max_seq_len", config)
reset_value(self, "return_full_hidden_states", "return_full_hidden_states", config)
reset_value(self, "dtype", "infer_model_dtype", config)
reset_value(self, "use_cache_kv_int8", "infer_model_cachekv_int8_type", config)
if self.use_cache_kv_int8 == None:
self.use_cache_kv_int8 = 0
else:
self.use_cache_kv_int8 = 1
# reset_value(self, "use_cache_kv_int8", "infer_model_cachekv_int8_type", config)
# if self.use_cache_kv_int8 == None:
# self.use_cache_kv_int8 = 0
# else:
# self.use_cache_kv_int8 = 1
if self.seq_len_limit > self.max_seq_len:
self.seq_len_limit = self.max_seq_len
logger.warning(f"The loading model requires len(input_ids) <= {self.max_seq_len}, now reset MAX_SEQ_LEN.")
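For clarity, the surviving logic in this hunk boils down to: copy a value from the model config onto the server config when the key exists, then clamp the request limit to the model's maximum sequence length. A standalone sketch (the class and values here are stand-ins, not the real Config object):

```python
def reset_value(obj, value_name, key, config):
    # Copy config[key] onto obj.<value_name> only when the key is present.
    if key in config:
        setattr(obj, value_name, config[key])

class Cfg:  # stand-in for the server config object
    max_seq_len = 8192
    seq_len_limit = 16384

cfg = Cfg()
model_config = {"infer_model_max_seq_len": 8192}
reset_value(cfg, "max_seq_len", "infer_model_max_seq_len", model_config)

if cfg.seq_len_limit > cfg.max_seq_len:
    cfg.seq_len_limit = cfg.max_seq_len  # mirrors the MAX_SEQ_LEN reset warning above
```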
113 changes: 97 additions & 16 deletions llm/server/server/server/engine/infer.py
@@ -18,7 +18,12 @@
import os
import sys
import time

import builtins
from paddlenlp.utils.import_utils import (
auto_dynamic_graph_pybind,
is_paddlenlp_ops_available,
custom_import,
)
# from concurrent.futures import ThreadPoolExecutor
from multiprocessing import shared_memory

@@ -60,6 +65,7 @@ def __init__(self, args):
# 2**63 - 1
self.MAX_INFER_SEED = 9223372036854775806

self.use_dynamic_graph = int(os.getenv("USE_DYNAMIC_GRAPH", "0"))
self.config = global_config
self.model_cfg = self.config.get_model_config()
self.speculate_config = self.config.get_speculate_config()
@@ -74,7 +80,7 @@ def __init__(self, args):
self.qk_rope_head_dim = int(self.model_cfg["qk_rope_head_dim"])
self.v_head_dim = int(self.model_cfg["v_head_dim"])
self.kv_lora_rank = int(self.model_cfg["kv_lora_rank"])
self.mla_use_absorb = bool(self.model_cfg["mla_use_matrix_absorption"])
self.mla_use_absorb = False #bool(self.model_cfg["mla_use_matrix_absorption"])

self.max_stop_seqs_num = int(os.getenv("MAX_STOP_SEQS_NUM", 5))
self.stop_seqs_max_len = int(os.getenv("STOP_SEQS_MAX_LEN", 8))
@@ -115,6 +121,7 @@ def __init__(self, args):
share_inputs=self.share_inputs,
cache_kvs=self.cache_kvs,
config=self.config,
args=self.args,
mp_degree=self.nranks,
)

@@ -258,6 +265,11 @@ def init_inputs(self):
dtype=cache_type,
)

if self.use_dynamic_graph:
self.share_inputs["cache_kvs"] = list(self.cache_kvs.values())
for value in self.cache_kvs.values():
del value

pre_max_block_num = (
self.args.max_seq_len + self.args.block_size - 1
) // self.args.block_size + self.args.enc_dec_block_num
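As a worked example of the ceiling division above, using the start_server.sh defaults (MAX_SEQ_LEN=8192, ENC_DEC_BLOCK_NUM=4) and an assumed block size of 64 (the block size is not part of this diff):

```python
max_seq_len = 8192       # MAX_SEQ_LEN default in start_server.sh
enc_dec_block_num = 4    # ENC_DEC_BLOCK_NUM default in start_server.sh
block_size = 64          # assumption: the block size is not shown in this diff

pre_max_block_num = (max_seq_len + block_size - 1) // block_size + enc_dec_block_num
print(pre_max_block_num)  # (8192 + 63) // 64 + 4 = 128 + 4 = 132
```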
@@ -662,11 +674,19 @@ def run(self):
self.share_inputs["seq_lens_this_time"] = copy.deepcopy(
self.helper_tensors["seq_lens_this_time"][:real_bsz]
)
if self.config.return_full_hidden_states:
self.share_inputs["seq_lens_this_time"].name = "seq_lens_this_time"
self.input_tensors[-1] = self.share_inputs["seq_lens_this_time"]
if not self.config.return_full_hidden_states:
self.infer_engine.seq_lens_handle.share_external_data(self.share_inputs["seq_lens_this_time"])
# if self.config.return_full_hidden_states:
# self.share_inputs["seq_lens_this_time"].name = "seq_lens_this_time"
# self.input_tensors[-1] = self.share_inputs["seq_lens_this_time"]
# if not self.config.return_full_hidden_states:
# self.infer_engine.seq_lens_handle.share_external_data(self.share_inputs["seq_lens_this_time"])

if not self.use_dynamic_graph:
if self.config.return_full_hidden_states:
self.share_inputs["seq_lens_this_time"].name = "seq_lens_this_time"
self.input_tensors[-1] = self.share_inputs["seq_lens_this_time"]
else:
self.infer_engine.seq_lens_handle.share_external_data(self.share_inputs["seq_lens_this_time"])

self.share_inputs["not_need_stop"][0] = True

if not self.share_inputs["not_need_stop"]:
Expand All @@ -685,11 +705,22 @@ def run(self):
insert_step=self.insert_step,
)

if self.config.return_full_hidden_states:
outputs = self.infer_engine.predictor.run(self.input_tensors)
self.helper_tensors["full_hidden_states"] = outputs[0]
# if self.config.return_full_hidden_states:
# outputs = self.infer_engine.predictor.run(self.input_tensors)
# self.helper_tensors["full_hidden_states"] = outputs[0]
if self.use_dynamic_graph:
if self.config.return_full_hidden_states:
outputs = self.infer_engine.predictor.generate(**self.share_inputs)
self.helper_tensors["full_hidden_states"] = outputs[0]
else:
self.infer_engine.predictor.generate(**self.share_inputs)
else:
self.infer_engine.predictor.run()
# self.infer_engine.predictor.run()
if self.config.return_full_hidden_states:
outputs = self.infer_engine.predictor.run(self.input_tensors)
self.helper_tensors["full_hidden_states"] = outputs[0]
else:
self.infer_engine.predictor.run()

self.share_inputs["infer_seed"].add_(infer_seed_increment)
self.share_inputs["infer_seed"][:] %= self.MAX_INFER_SEED
@@ -709,7 +740,9 @@ class InferenceEngine(object):
mp_degree (int): model parallel size
"""

def __init__(self, model_dir, share_inputs, cache_kvs, config, mp_degree=1):
# def __init__(self, model_dir, share_inputs, cache_kvs, config, mp_degree=1):
def __init__(self, model_dir, share_inputs, cache_kvs, config, args, mp_degree=1):
self.args = args
self.config = config
self.model_dir = model_dir
self.mp_degree = mp_degree
@@ -724,14 +757,62 @@ def __init__(self, model_dir, share_inputs, cache_kvs, config, mp_degree=1):
self.nranks = fleet.worker_num()
self.rank = fleet.worker_index()

self._init_predictor()
if not self.config.return_full_hidden_states:
self.share_data()

# self._init_predictor()
# if not self.config.return_full_hidden_states:
# self.share_data()
# print_memory("before _init_predictor")
self.use_dynamic_graph = int(os.getenv("USE_DYNAMIC_GRAPH", "0"))
if self.use_dynamic_graph: # dynamic graph
self._init_dygraph_predictor()
else: # static graph
self._init_predictor()
if not self.config.return_full_hidden_states:
self.share_data()
self.original_import = builtins.__import__
builtins.__import__ = custom_import
# print_memory("after _init_predictor")

def _init_dygraph_predictor(self):
llm_utils.set_triton_cache(self.args.model_dir, "dynamic")
from llm.predict.predictor import ModelArgument, PredictorArgument

predictor_args = PredictorArgument()
model_args = ModelArgument()

predictor_args.model_name_or_path = self.args.model_dir
predictor_args.max_length = self.args.max_dec_len
predictor_args.dtype = self.args.dtype
predictor_args.total_max_length = self.args.max_seq_len
predictor_args.inference_model = True
predictor_args.mode = "dynamic"
predictor_args.block_attn = True
predictor_args.append_attn = True
predictor_args.quant_type = "weight_only_intx"
predictor_args.mla_use_matrix_absorption = False

paddle.set_device(predictor_args.device)
paddle.set_default_dtype(predictor_args.dtype)

from paddlenlp.transformers import AutoConfig, AutoInferenceModelForCausalLM

config = AutoConfig.from_pretrained(predictor_args.model_name_or_path)

tensor_parallel_rank, tensor_parallel_degree = llm_utils.init_dist_env()

self.predictor = AutoInferenceModelForCausalLM.from_pretrained(
predictor_args.model_name_or_path,
config=config,
predictor_args=predictor_args,
model_args=model_args,
dtype=predictor_args.dtype,
tensor_parallel_degree=tensor_parallel_degree,
tensor_parallel_rank=tensor_parallel_rank,
)
def _init_predictor(self):
"""
predictor init
"""
llm_utils.set_triton_cache(self.args.model_dir, "static")
device_id = self.rank % self.config.mp_num_per_node
self.model_file = os.path.join(self.model_dir, f"model{PADDLE_INFERENCE_MODEL_SUFFIX}")
self.param_file = os.path.join(self.model_dir, f"model{PADDLE_INFERENCE_WEIGHTS_SUFFIX}")