change the partial swap to dynamic swap
blinkbear committed Oct 27, 2024
1 parent f10d012 commit a45bdd2
Showing 10 changed files with 284 additions and 316 deletions.
14 changes: 7 additions & 7 deletions benchmarks/1_serving_benchmark.sh
@@ -43,12 +43,12 @@ swap_space=64
max_tokens=2048
iter_theshold=15
max_serving_time=86400
request_duration=1200
request_rates[0]=0.5
# request_rates[1]=1.0
# request_rates[2]=2.0
request_rates[3]=5.0
# request_rates[2]=10.0
request_duration=120
# request_rates[0]=0.5
#request_rates[1]=1.0
#request_rates[2]=2.0
#request_rates[3]=5.0
request_rates[2]=10.0
# request_rates[4]=10.0
# request_rates[5]=20.0
# request_rates[5]=50.0
@@ -79,7 +79,7 @@ for i in {0..0}; do
# run benchmark and save the output to benchmark.log
taskset -c 20-39 python3 benchmark_serving.py --execution-counter $COUNTER --dataset-path $dataset_path \
--dataset-name $dataset_name --request-rate $request_rate \
--num-prompts 500 --request-duration $request_duration --sharegpt-output-len 2000 --model $model_name --scheduler-policy $policy \
--num-prompts 500 --request-duration $request_duration --sharegpt-output-len 2000 --model $model_name --scheduler-policy $policy \
--save-result --result-dir $result_dir \
--metadata swap_space=$swap_space preemption_mode=$preemption_mode \
scheduler_policy=$policy gpu_memory_utilization=$gpu_memory_utilization\
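For context on these settings: the changed rate and duration values are consumed by benchmark_serving.py below through its get_request_duration generator. As a hedged illustration only (the generator's actual implementation is not shown in this diff), a fixed request rate over a bounded window is commonly realized as Poisson arrivals, roughly as sketched here; the function name and request tuple layout are assumptions, not the repository's code.

import asyncio
import time
from typing import AsyncGenerator, List, Tuple

import numpy as np


async def poisson_arrivals(
    requests: List[Tuple[str, int, int]],   # (prompt, prompt_len, output_len)
    request_rate: float,                    # e.g. request_rates[2]=10.0 above
    request_duration: float,                # e.g. request_duration=120 above
) -> AsyncGenerator[Tuple[str, int, int], None]:
    # Illustrative sketch: yield requests with exponential inter-arrival gaps
    # (a Poisson process at `request_rate`) and stop once the window closes.
    start = time.perf_counter()
    for request in requests:
        if time.perf_counter() - start >= request_duration:
            break
        yield request
        await asyncio.sleep(np.random.exponential(1.0 / request_rate))

At 10 QPS over a 120-second window this is at most about 1,200 arrivals, further capped here by --num-prompts 500.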
3 changes: 1 addition & 2 deletions benchmarks/backend_request_func.py
@@ -8,7 +8,6 @@
import fnmatch
import aiohttp
from tqdm.asyncio import tqdm
import asyncio

AIOHTTP_TIMEOUT = aiohttp.ClientTimeout(total=6 * 60 * 60)

@@ -234,7 +233,7 @@ async def async_request_openai_completions(
async with aiohttp.ClientSession(timeout=AIOHTTP_TIMEOUT) as session:
assert not request_func_input.use_beam_search
if policy in ["srjf", "sjf"]:
if request_func_input.min_tokens == None:
if request_func_input.min_tokens is None:
raise ValueError(f"For policy: {policy}, should specify min_tokens")
payload = {
"model": request_func_input.model,
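The hunk above only tightens the None check in the srjf/sjf guard from == None to is None. A small, self-contained sketch of that guard, with RequestFuncInput and the payload reduced to illustrative stand-ins for the real ones:

from dataclasses import dataclass
from typing import Optional


@dataclass
class RequestFuncInput:          # reduced stand-in for the real dataclass
    model: str
    prompt: str
    min_tokens: Optional[int] = None


def build_payload(policy: str, request_func_input: RequestFuncInput) -> dict:
    if policy in ["srjf", "sjf"]:
        # `is None` is the identity test; `== None` can be fooled by objects
        # that override __eq__ and is what flake8 flags as E711.
        if request_func_input.min_tokens is None:
            raise ValueError(f"For policy: {policy}, should specify min_tokens")
    return {
        "model": request_func_input.model,
        "prompt": request_func_input.prompt,
        "min_tokens": request_func_input.min_tokens,
    }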
57 changes: 30 additions & 27 deletions benchmarks/benchmark_serving.py
@@ -23,7 +23,6 @@
to the end of the command above.
"""
import argparse
from concurrent.futures import ProcessPoolExecutor
import asyncio
import json
import os
@@ -33,7 +32,7 @@
import copy
from dataclasses import dataclass
from datetime import datetime
from typing import AsyncGenerator, List, Optional, Tuple, Generator
from typing import AsyncGenerator, List, Optional, Tuple
import fnmatch

import numpy as np
@@ -44,12 +43,11 @@

from vllm.transformers_utils.tokenizer import get_tokenizer
import multiprocessing
from multiprocessing import Pool
from typing import List
import queue
import nest_asyncio
nest_asyncio.apply()

os.environ["TOKENIZERS_PARALLELISM"] = "false"


@dataclass
class BenchmarkMetrics:
@@ -404,9 +402,9 @@ async def process_multiple_requests(requests, model_id, api_url, best_of, use_be
results = await asyncio.gather(*tasks)
return results

def process_requests(backend, args, pbar, request_func, data1=None):
def process_requests(backend, args, request_func, data1=None):
async def handle_requests():
tasks = []
tasks:List[asyncio.Task] = []
while True:
request_func_input = await asyncio.get_event_loop().run_in_executor(None, request_queue.get)
if request_func_input is None:
@@ -416,26 +414,30 @@ async def handle_requests():
tasks.append(
asyncio.create_task(
request_func(args.scheduler_policy,
request_func_input=request_func_input,
pbar=pbar,)
request_func_input=request_func_input)
)
)

else:
tasks.append(
asyncio.create_task(
request_func(request_func_input=request_func_input,
pbar=pbar,)
request_func(request_func_input=request_func_input)
)
)

# Gather the results of all tasks
outputs = await asyncio.gather(*tasks)
try:
outputs = await asyncio.gather(*tasks)
except Exception as e:
raise e
await asyncio.get_event_loop().run_in_executor(None, result_queue.put, outputs)

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(handle_requests())
# asyncio.run(handle_requests())
try:
loop.run_until_complete(handle_requests())
finally:
loop.close()
loop.stop()

async def benchmark(
backend: str,
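The rewritten process_requests above has each worker drain a multiprocessing queue, fan the items out as asyncio tasks on its own event loop, and push the gathered outputs to a result queue; the shared pbar argument is gone, and the progress bar is instead advanced in the parent as results come back (see the benchmark() hunk below). A self-contained sketch of that worker pattern, with a placeholder coroutine standing in for request_func:

import asyncio
import multiprocessing
from typing import List


async def fake_request(item: str) -> str:
    # Placeholder for request_func: pretend to call the serving backend.
    await asyncio.sleep(0.01)
    return f"done:{item}"


def worker(request_queue: multiprocessing.Queue,
           result_queue: multiprocessing.Queue) -> None:
    async def handle_requests() -> None:
        tasks: List[asyncio.Task] = []
        loop = asyncio.get_running_loop()
        while True:
            # The blocking Queue.get runs in a thread pool so the loop stays responsive.
            item = await loop.run_in_executor(None, request_queue.get)
            if item is None:              # sentinel: no more work for this worker
                break
            tasks.append(asyncio.create_task(fake_request(item)))
        outputs = await asyncio.gather(*tasks)
        await loop.run_in_executor(None, result_queue.put, outputs)

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(handle_requests())
    finally:
        loop.close()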
@@ -464,16 +466,15 @@ async def benchmark(
workers = []

for i in range(num_workers):
worker = multiprocessing.Process(target=process_requests, args=(backend, args, pbar, request_func))
worker = multiprocessing.Process(target=process_requests,
args=(backend, args,request_func))
worker.start()
workers.append(worker)

send_request_num = 0
async for request in get_request_duration(input_requests, request_rate, request_duration, scheduler_policy):
prompt, prompt_len, output_len = request
if scheduler_policy in ["srjf", "sjf"]:
min_tokens = copy.deepcopy(data[prompt])
else:
min_tokens = None
min_tokens = copy.deepcopy(data[prompt]) if scheduler_policy in ["srjf", "sjf"] else None
request_func_input = RequestFuncInput(
model=model_id,
prompt=prompt,
@@ -484,21 +485,23 @@
use_beam_search=use_beam_search,
min_tokens=min_tokens,
)
send_request_num += 1
request_queue.put(request_func_input)

for _ in range(num_workers):
request_queue.put(None)

outputs: List[RequestFuncOutput] = []

while True:
try:
result = result_queue.get(timeout=2)
if len(outputs) == send_request_num:
break
result = result_queue.get()
for res in result:
pbar.update(1)
outputs.append(res)
# break
except Exception as error:
break
raise error

for worker in workers:
worker.join()
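On the dispatch side, benchmark() above now counts every queued request (send_request_num), drains the result queue until that many outputs have been collected, and only then joins the workers, re-raising any worker error instead of silently breaking on a timeout. A compact sketch of that accounting, reusing worker() from the previous sketch (fork start method assumed, consistent with the Linux taskset invocation in the shell script):

import multiprocessing
from typing import List


def run_benchmark(items: List[str], num_workers: int = 4) -> List[str]:
    # Assumes worker() from the previous sketch is defined in the same module.
    request_queue: multiprocessing.Queue = multiprocessing.Queue()
    result_queue: multiprocessing.Queue = multiprocessing.Queue()

    workers = [
        multiprocessing.Process(target=worker, args=(request_queue, result_queue))
        for _ in range(num_workers)
    ]
    for w in workers:
        w.start()

    send_request_num = 0
    for item in items:                       # the real loop is rate-limited
        request_queue.put(item)
        send_request_num += 1
    for _ in range(num_workers):
        request_queue.put(None)              # one shutdown sentinel per worker

    outputs: List[str] = []
    while len(outputs) < send_request_num:   # each get() returns one worker's batch
        outputs.extend(result_queue.get())

    for w in workers:
        w.join()
    return outputs


if __name__ == "__main__":
    print(len(run_benchmark([f"req-{i}" for i in range(20)], num_workers=2)))

Counting dispatched requests avoids the old result_queue.get(timeout=2) pattern, where a slow worker could end the collection loop early and drop in-flight results.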
@@ -676,7 +679,7 @@ def main(args: argparse.Namespace):
else:
print("No JSON file found in the current directory.")

if data != None:
if data is not None:
data = {k:v for k, v in data.items() if v != 0}
input_requests = [req for req in input_requests if req[0] in data]

@@ -760,12 +763,12 @@ def main(args: argparse.Namespace):
prompt_output_lens_json = {}
for i in range(len(outputs)):
prompt_output_lens_json[outputs[i].prompt] = benchmark_result["output_lens"][i]
prompt_output_lens_file_name = f"prompt_output_{backend}-{args.request_rate}qps-{base_model_id}-{seconds}-{args.scheduler_policy}.json"
prompt_output_lens_file_name = f"prompt_output_{backend}-{args.request_rate}qps-{base_model_id}-{seconds}-{args.scheduler_policy}.json" # noqa: E501

if args.result_dir:
if not os.path.exists(os.path.join(args.result_dir, dir_name,"prompt")):
os.makedirs(os.path.join(args.result_dir, dir_name,"prompt"))
prompt_output_lens_file_name = os.path.join(args.result_dir, dir_name,"prompt", prompt_output_lens_file_name)
prompt_output_lens_file_name = os.path.join(args.result_dir, dir_name,"prompt", prompt_output_lens_file_name) # noqa: E501
with open(prompt_output_lens_file_name, "w") as prompt_output_lens_file_name_outfile:
json.dump(prompt_output_lens_json, prompt_output_lens_file_name_outfile)

