fix sjf to las & delete conditional partial swap & break to continue with bugs
mchen644 committed Sep 3, 2024
1 parent 08e1f5b commit f12596b
Show file tree
Hide file tree
Showing 10 changed files with 86 additions and 32 deletions.
18 changes: 9 additions & 9 deletions benchmarks/1_serving_benchmark.sh
@@ -20,10 +20,10 @@ result_dir="/root/v1/vllm/benchmarks/result"
 # scheduler_policy=(infer)
 # swap_policies=(partial)
 declare -a scheduler_swap_policies
-scheduler_swap_policies[0]="tfittradeoff partial"
+# scheduler_swap_policies[0]="tfittradeoff partial"
 scheduler_swap_policies[1]="fcfs full"
 # scheduler_swap_policies[1]="tfittradeoff full"
-# scheduler_swap_policies[2]="sjf full"
+# scheduler_swap_policies[2]="las full"
 # scheduler_swap_policies[3]="sjmlfq full"
 # scheduler_swap_policies[3]="infer partial"
 # scheduler_swap_policies[4]="inferpreempt full"
@@ -36,17 +36,17 @@ swap_space=64
 max_tokens=2048
 iter_theshold=15
 
-request_rates[0]=0.5
-request_rates[1]=1.0
-request_rates[2]=2.0
-request_rates[3]=5.0
+# request_rates[0]=0.5
+# request_rates[1]=1.0
+# request_rates[2]=2.0
+# request_rates[3]=5.0
 request_rates[4]=10.0
-request_rates[5]=20.0
+# request_rates[5]=20.0
 
 # request_rates=(2.0)
 swap_out_partial_rates=(0.5)
 waiting_iter_base=(0.1)
-gpu_devices=1
+gpu_devices=3
 for i in {0..0}; do
   for waiting_iter in "${waiting_iter_base[@]}"; do
     for swap_out_partial_rate in "${swap_out_partial_rates[@]}"; do
@@ -57,7 +57,7 @@ for i in {0..0}; do
       swap_policy=${element[1]}
       # tmux new-session -s "api_server" -d bash start_server.sh $gpu_devices $model_name $swap_space $preemption_mode $policy $max_tokens $iter_theshold $max_num_seqs $swap_policy $swap_out_partial_rate $gpu_memory_utilization $waiting_iter
 
-      CUDA_VISIBLE_DEVICES=$gpu_devices taskset -c 10-11 python3 -m vllm.entrypoints.openai.api_server \
+      CUDA_VISIBLE_DEVICES=$gpu_devices taskset -c 20-21 python3 -m vllm.entrypoints.openai.api_server \
         --model $model_name --swap-space $swap_space --preemption-mode $preemption_mode --scheduler-policy $policy \
         --enable-chunked-prefill --max-num-batched-tokens $max_tokens --iter-threshold $iter_theshold --max-num-seqs $max_num_seqs --swap-out-tokens-policy $swap_policy --swap-out-partial-rate $swap_out_partial_rate --execution-budget $iter_theshold \
         --gpu-memory-utilization $gpu_memory_utilization --waiting-iter-base $waiting_iter --disable-log-requests >api_server_${policy}_${swap_policy}.log 2>&1 &
2 changes: 2 additions & 0 deletions benchmarks/backend_request_func.py
@@ -228,9 +228,11 @@ async def async_request_openai_completions(
             "prompt": request_func_input.prompt,
             "temperature": 0.0,
             "best_of": request_func_input.best_of,
+            "min_tokens": request_func_input.output_len,
             "max_tokens": request_func_input.output_len,
             "stream": True,
         }
+
         headers = {
             "Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}"
         }
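Setting `min_tokens` equal to `max_tokens` presumably pins each completion to exactly `output_len` generated tokens (vLLM's OpenAI-compatible server accepts `min_tokens` as an extra sampling parameter), so benchmark requests can no longer stop early at an EOS token and output lengths stay comparable across scheduler policies.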
2 changes: 1 addition & 1 deletion benchmarks/benchmark_serving.py
@@ -720,7 +720,7 @@ def main(args: argparse.Namespace):
     parser.add_argument("--scheduler-policy",
                         type=str,
                         default="fcfs",
-                        choices=["fcfs", "infer","sjmlfq", "inferpreempt","sjf","tfittradeoff"],
+                        choices=["fcfs", "infer","sjmlfq", "inferpreempt","sjf","tfittradeoff", "las"],
                         help="Specify the scheduler policy.")
     parser.add_argument("--execution-counter",
                         type=int,
21 changes: 21 additions & 0 deletions benchmarks/result/20240902/401/SJF.py
@@ -0,0 +1,21 @@
import json
import os

file_path_1 = "vllm-2.0qps-Llama-2-13b-chat-hf-203550-fcfs.json"
file_path_2 = "vllm-2.0qps-Llama-2-13b-chat-hf-201350-tfittradeoff.json"


with open(file_path_1, 'r', encoding="utf-8") as file1:
    data1 = json.load(file1)
    input_lens_list1 = data1["input_lens"]
    output_lens_list1 = data1["output_lens"]

with open(file_path_2, 'r', encoding="utf-8") as file2:
    data2 = json.load(file2)
    # Read from data2 here; reading data1 again would compare a run
    # against itself and always print True.
    input_lens_list2 = data2["input_lens"]
    output_lens_list2 = data2["output_lens"]

# True if both runs were driven by identical input lengths.
print(input_lens_list1 == input_lens_list2)

26 changes: 17 additions & 9 deletions benchmarks/result/analysis/result_analysis.py
@@ -40,7 +40,7 @@ def __(mo):
 @app.cell
 def __(base_dir, os):
     _date = "20240902"
-    _counters = [387]
+    _counters = [401]
     e2e_result_dir_names = [
         os.path.join(base_dir, _date, str(counter)) for counter in _counters
     ]
@@ -567,6 +567,14 @@ def __(execute_result_dir_names, os, pd, plt, sns):
     plt.grid(alpha=0.5, linestyle="dashdot")
 
     # Option 1: Draw all motivations
+    policy_moti = "SJF" # TFITTradeoff, SJF, FCFS
+    sns.jointplot(
+        data=execute_result_dfs_moti[policy_moti],
+        x=metric_labels_moti[1],
+        y=metric_labels_moti[0],
+        label=policy_moti,
+    )
+
     policy_moti = "FCFS" # TFITTradeoff, SJF, FCFS
     sns.jointplot(
         data=execute_result_dfs_moti[policy_moti],
@@ -576,14 +584,14 @@ def __(execute_result_dir_names, os, pd, plt, sns):
     )
 
     # Option 2: Draw single policy motivation
-    # for policy_moti in policies_moti:
-    #     # policy_moti = "TFITTradeoff" # TFITTradeoff, SJF, FCFS
-    #     sns.scatterplot(
-    #         data=execute_result_dfs_moti[policy_moti],
-    #         x=metric_labels_moti[1],
-    #         y=metric_labels_moti[0],
-    #         label=policy_moti,
-    #     )
+    for policy_moti in policies_moti:
+        # policy_moti = "TFITTradeoff" # TFITTradeoff, SJF, FCFS
+        sns.jointplot(
+            data=execute_result_dfs_moti[policy_moti],
+            x=metric_labels_moti[1],
+            y=metric_labels_moti[0],
+            label=policy_moti,
+        )
 
     plt.tight_layout()
     plt.gca()
2 changes: 1 addition & 1 deletion vllm/config.py
@@ -729,7 +729,7 @@ def _verify_args(self) -> None:
                 f"max_num_batched_tokens ({self.max_num_batched_tokens}) must "
                 "be greater than or equal to max_num_seqs "
                 f"({self.max_num_seqs}).")
-        if self.policy not in ["fcfs", "ljf", "sjf", "utf", "random", "wtf","bff",'infer', 'sjmlfq', 'inferpreempt','tfittradeoff']:
+        if self.policy not in ["fcfs", "ljf", "las", "sjf", "utf", "random", "wtf","bff",'infer', 'sjmlfq', 'inferpreempt','tfittradeoff']:
             raise NotImplementedError(
                 f"Scheduler policy {self.policy} is not implemented."
             )
4 changes: 2 additions & 2 deletions vllm/core/policy.py
@@ -324,7 +324,7 @@ def get_priority(
         return seq_group.metrics.waiting_iter_nums
 
 
-class ShortJobFirst(Policy):
+class LeastAttainedSvr(Policy):
 
     def get_priority(
         self,
@@ -356,7 +356,7 @@ class PolicyFactory:
         "utf": UncomputedTokensFirst,
         "random": Random,
         "wtf": WaitingTimeFirst,
-        "sjf": ShortJobFirst,
+        "las": LeastAttainedSvr,
         "ljf": LongJobFirst,
         "infer": TFTLatencyTrade,
         "sjmlfq": SkipJoinMLFQ,
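The diff above only renames `ShortJobFirst` to `LeastAttainedSvr` and rebinds the `sjf` factory key to `las`; the `get_priority` body is unchanged, presumably because it already ordered requests by service attained so far rather than by (unknown) total job length, which is what the commit title's "fix sjf to las" corrects. For intuition, a standalone sketch of least-attained-service ordering follows; it is not the vLLM implementation, and `attained_tokens` is an illustrative field name:

# Standalone sketch of least-attained-service (LAS) ordering.
# Assumption: "attained service" is the number of output tokens a
# request has generated so far; the field names are illustrative.
from dataclasses import dataclass
from typing import List

@dataclass
class Request:
    request_id: str
    attained_tokens: int  # service received so far

def sort_by_las(requests: List[Request]) -> List[Request]:
    # Least service first: the request that has run the least is
    # scheduled next. LAS needs no estimate of remaining length,
    # unlike a literal shortest-job-first, which is why it suits
    # LLM serving where output lengths are unknown up front.
    return sorted(requests, key=lambda r: r.attained_tokens)

queue = [Request("a", 120), Request("b", 8), Request("c", 50)]
print([r.request_id for r in sort_by_las(queue)])  # ['b', 'c', 'a']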
39 changes: 30 additions & 9 deletions vllm/core/scheduler.py
@@ -1386,10 +1386,10 @@ def _schedule_running_partial(
         else:
             running_queue = policy.sort_by_priority(now, running_queue)
 
-        if len(self.waiting)!=0:
-            partial_swapped_flag = False
-        else:
-            partial_swapped_flag = self.partial_swap_out_flag
+        # if len(self.waiting)!=0:
+        #     partial_swapped_flag = False
+        # else:
+        #     partial_swapped_flag = self.partial_swap_out_flag
 
         while running_queue:
             seq_group: SequenceGroup = running_queue[0]
@@ -1765,6 +1765,7 @@ def _schedule_swapped(
         leftover_swapped: Deque[SequenceGroup] = deque()
         while swapped_queue:
             seq_group: SequenceGroup = swapped_queue[0]
+            print("schedule current seq_group:", seq_group.get_seqs())
             if self.scheduler_config.policy in ["infer", "tfittradeoff"] and seq_group.request_id == self.seq_group_for_preempted[
                     0].request_id:
                 # seq_group.reset_execution_iter_nums()
@@ -1779,7 +1780,12 @@ def _schedule_swapped(
             if alloc_status == AllocStatus.LATER:
                 for seq_group in swapped_queue:
                     seq_group.update_waiting_iter_nums()
-                break
+                if self.scheduler_config.policy == "tfittradeoff":
+                    swapped_queue.popleft()
+                    leftover_swapped.appendleft(seq_group)
+                    continue
+                else:
+                    break
                 # swapped_queue.popleft()
                 # leftover_swapped.appendleft(seq_group)
             elif alloc_status == AllocStatus.NEVER:
@@ -1816,7 +1822,12 @@ def _schedule_swapped(
                     or not budget.can_schedule(num_new_tokens=num_new_tokens,
                                                num_new_seqs=num_new_seqs)):
                 seq_group.update_waiting_iter_nums()
-                break
+                if self.scheduler_config.policy == "tfittradeoff":
+                    swapped_queue.popleft()
+                    leftover_swapped.appendleft(seq_group)
+                    continue
+                else:
+                    break
 
             if lora_int_id > 0 and curr_loras is not None:
                 curr_loras.add(lora_int_id)
@@ -1967,7 +1978,12 @@ def _schedule_prefills(
             # If the sequence group cannot be allocated, stop.
             can_allocate = self.block_manager.can_allocate(seq_group)
             if can_allocate == AllocStatus.LATER:
-                break
+                if self.scheduler_config.policy == "tfittradeoff":
+                    leftover_waiting_sequences.appendleft(seq_group)
+                    waiting_queue.popleft()
+                    continue
+                else:
+                    break
             elif can_allocate == AllocStatus.NEVER:
                 logger.warning(
                     "Input prompt (%d tokens) is too long"
@@ -1997,7 +2013,12 @@ def _schedule_prefills(
             if (num_new_tokens == 0
                     or not budget.can_schedule(num_new_tokens=num_new_tokens,
                                                num_new_seqs=num_new_seqs)):
-                break
+                if self.scheduler_config.policy == "tfittradeoff":
+                    leftover_waiting_sequences.appendleft(seq_group)
+                    waiting_queue.popleft()
+                    continue
+                else:
+                    break
 
             # Can schedule this request.
             if curr_loras is not None and lora_int_id > 0:
@@ -2642,7 +2663,7 @@ def _get_num_new_tokens(self, seq_group: SequenceGroup,
             num_new_tokens += seq.get_num_new_tokens()
 
         assert num_new_tokens > 0, f"{seq_group.get_seqs()}, \
-            {seq_group.request_id}" # [Sequence(seq_id=80, status=SWAPPED, num_blocks=4)]
+            {seq_group.request_id}"
 
         # Chunk if a running request cannot fit in.
         # If number of seq > 1, it means it is doing beam search in a
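All four break-to-continue edits in this file share one pattern: under the `tfittradeoff` policy, a sequence group that cannot be allocated or budgeted right now is popped into a leftover deque and the scan moves on, instead of head-of-line blocking the whole queue; other policies keep the old `break`. The commit message itself flags the change as landing with bugs: in the first `_schedule_swapped` hunk the inner `for seq_group in swapped_queue:` loop rebinds `seq_group`, so the group pushed onto `leftover_swapped` is not necessarily the queue head, and the new `print` is leftover debug output. The commented-out block in `_schedule_running_partial` likewise drops the rule that suppressed partial swap-out while the waiting queue was non-empty. A minimal sketch of the intended skip-and-continue pattern follows, with `can_schedule` as an illustrative stand-in for the real `AllocStatus` and token-budget checks:

# Sketch of the skip-and-continue scheduling pattern. can_schedule()
# stands in for the AllocStatus / token-budget checks in the real code.
from collections import deque
from typing import Callable, Deque, List

def scan_queue(queue: Deque[str],
               can_schedule: Callable[[str], bool],
               skip_blocked: bool) -> List[str]:
    scheduled: List[str] = []
    leftover: Deque[str] = deque()
    while queue:
        group = queue[0]
        if not can_schedule(group):
            if skip_blocked:        # tfittradeoff: set the group aside
                queue.popleft()
                leftover.appendleft(group)
                continue            # and keep scanning the rest
            break                   # other policies: head-of-line block
        queue.popleft()
        scheduled.append(group)
    # Skipped groups return to the front, in order, for the next pass.
    queue.extendleft(leftover)
    return scheduled

q: Deque[str] = deque(["a", "b", "c"])
print(scan_queue(q, lambda g: g != "a", skip_blocked=True))  # ['b', 'c']
print(list(q))  # ['a']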
2 changes: 1 addition & 1 deletion vllm/engine/arg_utils.py
@@ -528,7 +528,7 @@ def add_cli_args(
             default="fcfs",
             help=
             "Scheduling policy. Can be 'fcfs', 'UncomputedTokensFirst (utf)',"
-            "'WaitingTimeFirst (wtf)','ShortJobFirst(sjf)', 'LongJobFirst (ljf)', "
+            "'WaitingTimeFirst (wtf)','ShortJobFirst(sjf)', 'LeastAttainedService(las)', 'LongJobFirst (ljf)', "
             "'infer', or 'random', default is 'fcfs'",
         )
 
2 changes: 2 additions & 0 deletions vllm/engine/async_llm_engine.py
@@ -351,6 +351,8 @@ async def add_request_async(
                 "not enabled!")
         if arrival_time is None:
             arrival_time = time.time()
+
+        # raise ValueError(f"Got requests parameters: {params}")
 
         processed_inputs = await self.process_model_inputs_async(
             request_id=request_id, inputs=inputs, lora_request=lora_request)
