diff --git a/benchmarks/1_serving_benchmark.sh b/benchmarks/1_serving_benchmark.sh
index cb7bd390058cb..50645c52556cb 100644
--- a/benchmarks/1_serving_benchmark.sh
+++ b/benchmarks/1_serving_benchmark.sh
@@ -20,10 +20,10 @@ result_dir="/root/v1/vllm/benchmarks/result"
 # scheduler_policy=(infer)
 # swap_policies=(partial)
 declare -a scheduler_swap_policies
-scheduler_swap_policies[0]="tfittradeoff partial"
+# scheduler_swap_policies[0]="tfittradeoff partial"
 scheduler_swap_policies[1]="fcfs full"
 # scheduler_swap_policies[1]="tfittradeoff full"
-# scheduler_swap_policies[2]="sjf full"
+# scheduler_swap_policies[2]="las full"
 # scheduler_swap_policies[3]="sjmlfq full"
 # scheduler_swap_policies[3]="infer partial"
 # scheduler_swap_policies[4]="inferpreempt full"
@@ -36,17 +36,17 @@ swap_space=64
 max_tokens=2048
 iter_theshold=15
 
-request_rates[0]=0.5
-request_rates[1]=1.0
-request_rates[2]=2.0
-request_rates[3]=5.0
+# request_rates[0]=0.5
+# request_rates[1]=1.0
+# request_rates[2]=2.0
+# request_rates[3]=5.0
 request_rates[4]=10.0
-request_rates[5]=20.0
+# request_rates[5]=20.0
 # request_rates=(2.0)
 swap_out_partial_rates=(0.5)
 waiting_iter_base=(0.1)
 
-gpu_devices=1
+gpu_devices=3
 for i in {0..0}; do
     for waiting_iter in "${waiting_iter_base[@]}"; do
         for swap_out_partial_rate in "${swap_out_partial_rates[@]}"; do
@@ -57,7 +57,7 @@ for i in {0..0}; do
             swap_policy=${element[1]}
             # tmux new-session -s "api_server" -d bash start_server.sh $gpu_devices $model_name $swap_space $preemption_mode $policy $max_tokens $iter_theshold $max_num_seqs $swap_policy $swap_out_partial_rate $gpu_memory_utilization $waiting_iter
-            CUDA_VISIBLE_DEVICES=$gpu_devices taskset -c 10-11 python3 -m vllm.entrypoints.openai.api_server \
+            CUDA_VISIBLE_DEVICES=$gpu_devices taskset -c 20-21 python3 -m vllm.entrypoints.openai.api_server \
                 --model $model_name --swap-space $swap_space --preemption-mode $preemption_mode --scheduler-policy $policy \
                 --enable-chunked-prefill --max-num-batched-tokens $max_tokens --iter-threshold $iter_theshold --max-num-seqs $max_num_seqs --swap-out-tokens-policy $swap_policy --swap-out-partial-rate $swap_out_partial_rate --execution-budget $iter_theshold \
                 --gpu-memory-utilization $gpu_memory_utilization --waiting-iter-base $waiting_iter --disable-log-requests >api_server_${policy}_${swap_policy}.log 2>&1 &
 
diff --git a/benchmarks/backend_request_func.py b/benchmarks/backend_request_func.py
index 58dcc6167efa6..dcb696deec996 100644
--- a/benchmarks/backend_request_func.py
+++ b/benchmarks/backend_request_func.py
@@ -228,9 +228,11 @@ async def async_request_openai_completions(
             "prompt": request_func_input.prompt,
             "temperature": 0.0,
             "best_of": request_func_input.best_of,
+            "min_tokens": request_func_input.output_len,
             "max_tokens": request_func_input.output_len,
             "stream": True,
         }
+
         headers = {
             "Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}"
         }
diff --git a/benchmarks/benchmark_serving.py b/benchmarks/benchmark_serving.py
index fcbe4ee66660e..1484e3196904e 100644
--- a/benchmarks/benchmark_serving.py
+++ b/benchmarks/benchmark_serving.py
@@ -720,7 +720,7 @@ def main(args: argparse.Namespace):
     parser.add_argument("--scheduler-policy",
                         type=str,
                         default="fcfs",
-                        choices=["fcfs", "infer","sjmlfq", "inferpreempt","sjf","tfittradeoff"],
+                        choices=["fcfs", "infer","sjmlfq", "inferpreempt","sjf","tfittradeoff", "las"],
                         help="Specify the scheduler policy.")
     parser.add_argument("--execution-counter",
                         type=int,
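Note on the `min_tokens` change in backend_request_func.py above: vLLM's OpenAI-compatible completions endpoint accepts `min_tokens` as an extension to the standard OpenAI fields, and setting it equal to `max_tokens` suppresses early EOS so every request generates exactly `output_len` tokens, which keeps output lengths identical across the scheduler policies under test. A minimal sketch of the resulting request, assuming a vLLM server on localhost; the URL, model name, and prompt are placeholders, and streaming is disabled for brevity:

    # Non-streaming sketch of the payload built in
    # async_request_openai_completions; URL, model, and prompt are placeholders.
    import json
    import urllib.request

    payload = {
        "model": "meta-llama/Llama-2-13b-chat-hf",  # placeholder
        "prompt": "San Francisco is a",             # placeholder
        "temperature": 0.0,
        "best_of": 1,
        "min_tokens": 128,  # min_tokens == max_tokens pins the output length
        "max_tokens": 128,
        "stream": False,
    }
    req = urllib.request.Request(
        "http://localhost:8000/v1/completions",  # assumed server address
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req) as resp:
        print(json.loads(resp.read())["choices"][0]["text"])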
diff --git a/benchmarks/result/20240902/401/SJF.py b/benchmarks/result/20240902/401/SJF.py
new file mode 100644
index 0000000000000..fa4ebc3f55320
--- /dev/null
+++ b/benchmarks/result/20240902/401/SJF.py
@@ -0,0 +1,21 @@
+import json
+import os
+
+file_path_1 = "vllm-2.0qps-Llama-2-13b-chat-hf-203550-fcfs.json"
+file_path_2 = "vllm-2.0qps-Llama-2-13b-chat-hf-201350-tfittradeoff.json"
+
+
+with open(file_path_1, 'r', encoding="utf-8") as file1:
+    data1 = json.load(file1)
+# print(type(data))
+input_lens_list1 = data1["input_lens"]
+output_lens_list1 = data1["output_lens"]
+
+with open(file_path_2, 'r', encoding="utf-8") as file2:
+    data2 = json.load(file2)
+# print(type(data))
+input_lens_list2 = data2["input_lens"]
+output_lens_list2 = data2["output_lens"]
+
+print(input_lens_list1 == input_lens_list2)
+
diff --git a/benchmarks/result/analysis/result_analysis.py b/benchmarks/result/analysis/result_analysis.py
index 8d658d2ed5f21..2525a6dd7cbf9 100644
--- a/benchmarks/result/analysis/result_analysis.py
+++ b/benchmarks/result/analysis/result_analysis.py
@@ -40,7 +40,7 @@ def __(mo):
 @app.cell
 def __(base_dir, os):
     _date = "20240902"
-    _counters = [387]
+    _counters = [401]
     e2e_result_dir_names = [
         os.path.join(base_dir, _date, str(counter)) for counter in _counters
     ]
@@ -567,6 +567,14 @@ def __(execute_result_dir_names, os, pd, plt, sns):
     plt.grid(alpha=0.5, linestyle="dashdot")
 
     # Option 1: Draw all motivations
+    policy_moti = "SJF"  # TFITTradeoff, SJF, FCFS
+    sns.jointplot(
+        data=execute_result_dfs_moti[policy_moti],
+        x=metric_labels_moti[1],
+        y=metric_labels_moti[0],
+        label=policy_moti,
+    )
+
     policy_moti = "FCFS"  # TFITTradeoff, SJF, FCFS
     sns.jointplot(
         data=execute_result_dfs_moti[policy_moti],
@@ -576,14 +584,14 @@
     )
 
     # Option 2: Draw single policy motivation
-    # for policy_moti in policies_moti:
-    #     # policy_moti = "TFITTradeoff" # TFITTradeoff, SJF, FCFS
-    #     sns.scatterplot(
-    #         data=execute_result_dfs_moti[policy_moti],
-    #         x=metric_labels_moti[1],
-    #         y=metric_labels_moti[0],
-    #         label=policy_moti,
-    #     )
+    for policy_moti in policies_moti:
+        # policy_moti = "TFITTradeoff" # TFITTradeoff, SJF, FCFS
+        sns.jointplot(
+            data=execute_result_dfs_moti[policy_moti],
+            x=metric_labels_moti[1],
+            y=metric_labels_moti[0],
+            label=policy_moti,
+        )
 
     plt.tight_layout()
     plt.gca()
diff --git a/vllm/config.py b/vllm/config.py
index 3e13ab51bf2a8..ee5d073fe5821 100644
--- a/vllm/config.py
+++ b/vllm/config.py
@@ -729,7 +729,7 @@ def _verify_args(self) -> None:
                 f"max_num_batched_tokens ({self.max_num_batched_tokens}) must "
                 "be greater than or equal to max_num_seqs "
                 f"({self.max_num_seqs}).")
-        if self.policy not in ["fcfs", "ljf", "sjf", "utf", "random", "wtf","bff",'infer', 'sjmlfq', 'inferpreempt','tfittradeoff']:
+        if self.policy not in ["fcfs", "ljf", "las", "sjf", "utf", "random", "wtf","bff",'infer', 'sjmlfq', 'inferpreempt','tfittradeoff']:
             raise NotImplementedError(
                 f"Scheduler policy {self.policy} is not implemented."
             )
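The vllm/config.py change above adds "las" to the accepted policy names. Note that "sjf" remains in this list (and in the benchmark's --scheduler-policy choices) even though the vllm/core/policy.py hunk below removes the "sjf" registry entry, so passing "sjf" would now pass validation but fail at policy lookup. The hunk below renames ShortJobFirst to LeastAttainedSvr and registers it as "las"; its get_priority body is not part of this diff, but the idea of Least-Attained-Service is to run whichever request has received the least service so far. A toy ordering sketch with hypothetical names (Req and las_priority are illustrations, not this fork's code):

    # Toy Least-Attained-Service ordering; Req and las_priority are
    # hypothetical, for intuition only.
    from dataclasses import dataclass

    @dataclass
    class Req:
        request_id: str
        num_generated_tokens: int  # service attained so far

    def las_priority(req: Req) -> int:
        # Policies here sort descending by priority, so negate the
        # attained service: the request with the fewest generated tokens wins.
        return -req.num_generated_tokens

    reqs = [Req("a", 40), Req("b", 3), Req("c", 17)]
    ordered = sorted(reqs, key=las_priority, reverse=True)
    print([r.request_id for r in ordered])  # ['b', 'c', 'a']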
diff --git a/vllm/core/policy.py b/vllm/core/policy.py
index 82cadbb8682a4..74891acf67942 100644
--- a/vllm/core/policy.py
+++ b/vllm/core/policy.py
@@ -324,7 +324,7 @@ def get_priority(
         return seq_group.metrics.waiting_iter_nums
 
 
-class ShortJobFirst(Policy):
+class LeastAttainedSvr(Policy):
 
     def get_priority(
         self,
@@ -356,7 +356,7 @@ class PolicyFactory:
         "utf": UncomputedTokensFirst,
         "random": Random,
         "wtf": WaitingTimeFirst,
-        "sjf": ShortJobFirst,
+        "las": LeastAttainedSvr,
         "ljf": LongJobFirst,
         "infer": TFTLatencyTrade,
         "sjmlfq": SkipJoinMLFQ,
diff --git a/vllm/core/scheduler.py b/vllm/core/scheduler.py
index fea9355fbc432..66d15ebebad57 100644
--- a/vllm/core/scheduler.py
+++ b/vllm/core/scheduler.py
@@ -1386,10 +1386,10 @@ def _schedule_running_partial(
         else:
             running_queue = policy.sort_by_priority(now, running_queue)
 
-        if len(self.waiting)!=0:
-            partial_swapped_flag = False
-        else:
-            partial_swapped_flag = self.partial_swap_out_flag
+        # if len(self.waiting)!=0:
+        #     partial_swapped_flag = False
+        # else:
+        #     partial_swapped_flag = self.partial_swap_out_flag
 
         while running_queue:
             seq_group: SequenceGroup = running_queue[0]
@@ -1765,6 +1765,7 @@ def _schedule_swapped(
         leftover_swapped: Deque[SequenceGroup] = deque()
         while swapped_queue:
             seq_group: SequenceGroup = swapped_queue[0]
+            print("schedule current seq_group:", seq_group.get_seqs())
             if self.scheduler_config.policy in ["infer", "tfittradeoff"] and seq_group.request_id == self.seq_group_for_preempted[
                     0].request_id:
                 # seq_group.reset_execution_iter_nums()
@@ -1779,7 +1780,12 @@
             if alloc_status == AllocStatus.LATER:
                 for seq_group in swapped_queue:
                     seq_group.update_waiting_iter_nums()
-                break
+                if self.scheduler_config.policy == "tfittradeoff":
+                    swapped_queue.popleft()
+                    leftover_swapped.appendleft(seq_group)
+                    continue
+                else:
+                    break
                 # swapped_queue.popleft()
                 # leftover_swapped.appendleft(seq_group)
             elif alloc_status == AllocStatus.NEVER:
@@ -1816,7 +1822,12 @@
                     or not budget.can_schedule(num_new_tokens=num_new_tokens,
                                                num_new_seqs=num_new_seqs)):
                 seq_group.update_waiting_iter_nums()
-                break
+                if self.scheduler_config.policy == "tfittradeoff":
+                    swapped_queue.popleft()
+                    leftover_swapped.appendleft(seq_group)
+                    continue
+                else:
+                    break
 
             if lora_int_id > 0 and curr_loras is not None:
                 curr_loras.add(lora_int_id)
@@ -1967,7 +1978,12 @@ def _schedule_prefills(
             # If the sequence group cannot be allocated, stop.
             can_allocate = self.block_manager.can_allocate(seq_group)
             if can_allocate == AllocStatus.LATER:
-                break
+                if self.scheduler_config.policy == "tfittradeoff":
+                    leftover_waiting_sequences.appendleft(seq_group)
+                    waiting_queue.popleft()
+                    continue
+                else:
+                    break
             elif can_allocate == AllocStatus.NEVER:
                 logger.warning(
                     "Input prompt (%d tokens) is too long"
@@ -1997,7 +2013,12 @@
             if (num_new_tokens == 0
                     or not budget.can_schedule(num_new_tokens=num_new_tokens,
                                                num_new_seqs=num_new_seqs)):
-                break
+                if self.scheduler_config.policy == "tfittradeoff":
+                    leftover_waiting_sequences.appendleft(seq_group)
+                    waiting_queue.popleft()
+                    continue
+                else:
+                    break
 
             # Can schedule this request.
             if curr_loras is not None and lora_int_id > 0:
@@ -2642,7 +2663,7 @@ def _get_num_new_tokens(self, seq_group: SequenceGroup,
                 num_new_tokens += seq.get_num_new_tokens()
         assert num_new_tokens > 0, f"{seq_group.get_seqs()}, \
-            {seq_group.request_id}" # [Sequence(seq_id=80, status=SWAPPED, num_blocks=4)]
+            {seq_group.request_id}"
 
         # Chunk if a running request cannot fit in.
         # If number of seq > 1, it means it is doing beam search in a
         # decoding phase.
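The vllm/core/scheduler.py hunks above all apply one pattern: under the "tfittradeoff" policy, a request that cannot be placed right now (AllocStatus.LATER, or a token/sequence budget miss) is moved into a leftover queue and the scan continues, instead of break-ing and stalling every request queued behind it. A standalone toy sketch of that skip-instead-of-stall loop (not the scheduler itself; `fits` stands in for the allocation and budget checks):

    # Toy version of the skip-instead-of-stall scheduling loop used in
    # _schedule_swapped and _schedule_prefills for "tfittradeoff".
    from collections import deque

    def schedule(queue, fits, policy):
        scheduled, leftover = [], deque()
        while queue:
            item = queue[0]
            if not fits(item):
                if policy == "tfittradeoff":
                    queue.popleft()
                    leftover.appendleft(item)  # retry on a later pass
                    continue                   # keep scanning the queue
                break                          # FCFS-style: stop at first misfit
            queue.popleft()
            scheduled.append(item)
        queue.extendleft(leftover)  # skipped items return to the front, in order
        return scheduled

    # Items are token counts; only items of <= 4 tokens fit this step.
    print(schedule(deque([8, 2, 3]), lambda n: n <= 4, "tfittradeoff"))  # -> [2, 3]
    print(schedule(deque([8, 2, 3]), lambda n: n <= 4, "fcfs"))          # -> []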
diff --git a/vllm/engine/arg_utils.py b/vllm/engine/arg_utils.py
index 662cd84e11332..4610c9fbdb1b6 100644
--- a/vllm/engine/arg_utils.py
+++ b/vllm/engine/arg_utils.py
@@ -528,7 +528,7 @@ def add_cli_args(
             default="fcfs",
             help=
             "Scheduling policy. Can be 'fcfs', 'UncomputedTokensFirst (utf)',"
-            "'WaitingTimeFirst (wtf)','ShortJobFirst(sjf)', 'LongJobFirst (ljf)', "
+            "'WaitingTimeFirst (wtf)','ShortJobFirst(sjf)', 'LeastAttainedService(las)', 'LongJobFirst (ljf)', "
             "'infer', or 'random', default is 'fcfs'",
         )
 
diff --git a/vllm/engine/async_llm_engine.py b/vllm/engine/async_llm_engine.py
index bcddaf072041a..c21a0578efe66 100644
--- a/vllm/engine/async_llm_engine.py
+++ b/vllm/engine/async_llm_engine.py
@@ -351,6 +351,8 @@ async def add_request_async(
                     "not enabled!")
         if arrival_time is None:
             arrival_time = time.time()
+
+        # raise ValueError(f"Got requests parameters: {params}")
         processed_inputs = await self.process_model_inputs_async(
             request_id=request_id, inputs=inputs,
             lora_request=lora_request)
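With the help text updated, the "las" name is now plumbed through the CLI, config validation, and the policy registry. A hypothetical smoke test for that wiring; it assumes this fork exposes the flag as a `scheduler_policy` field on EngineArgs and keeps upstream's `create_engine_config()`, so the attribute names may need adjusting:

    # Hypothetical smoke test: verify "las" survives the trip from
    # EngineArgs into SchedulerConfig. Field and method names are assumptions.
    from vllm.engine.arg_utils import EngineArgs

    args = EngineArgs(
        model="facebook/opt-125m",  # small placeholder model
        scheduler_policy="las",     # new Least-Attained-Service policy
    )
    config = args.create_engine_config()
    assert config.scheduler_config.policy == "las"
    print("las accepted:", config.scheduler_config.policy)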