
Commit dd60db0

More comments (vllm-project#11)
* Comments done above worker
* format
* fixed missing arguments
* fix
* format
1 parent 2497a14 commit dd60db0

File tree

8 files changed: 241 additions & 153 deletions


tests/under_models/send_mock_request.py

Lines changed: 99 additions & 26 deletions
@@ -15,25 +15,35 @@
 import asyncio
 
 # This is the model to load for workers
-MODEL_PATH="/models/vicuna-7b/"
-
-
+MODEL_PATH = "YOUR_MODEL_PATH"
 """
 1. Prepare a faked sequencegroup meta data
 2. Start a mocked AsyncLLMEngine, and modify its step_async function
 3. invoke the step_async function manually
+4. this test tries to kick off the `model_execution` part for the
+   model so that we can perform tests
 """
 
+
 class UglyAsyncLLMEngine(LLMEngine):
     """Extension of LLMEngine to add async methods."""
 
     async def step_async(self) -> List[RequestOutput]:
-        sampling_para = SamplingParams(n=2, best_of=5, temperature=0.8, top_p=0.95, max_tokens=7)
+        sampling_para = SamplingParams(n=2,
+                                       best_of=5,
+                                       temperature=0.8,
+                                       top_p=0.95,
+                                       max_tokens=7)
         seq_data = {}
         seq_data[0] = SequenceData(prompt_token_ids=[1, 3087, 8970, 338, 263])
         request_id = "cmpl-7bef75eaa4394a3d895b5508dd5f69f6"
 
-        seq_group_meta_data = SequenceGroupMetadata(request_id=request_id, is_prompt=True, seq_data=seq_data, sampling_params=sampling_para, block_tables={})
+        seq_group_meta_data = SequenceGroupMetadata(
+            request_id=request_id,
+            is_prompt=True,
+            seq_data=seq_data,
+            sampling_params=sampling_para,
+            block_tables={})
         seq_group_meta_data_lists = [seq_group_meta_data]
 
         output = await self._run_workers_async(
@@ -44,24 +54,66 @@ async def step_async(self) -> List[RequestOutput]:
             blocks_to_copy={},
             finished_seqs=[],
         )
-        print(output)
 
-        # TODO: change this to real one
-        return RequestOutput(request_id=request_id, prompt="", prompt_token_ids=[1, 3087, 8970, 338, 263], outputs=[], finished=False)
+        # Co(gc): we cannot use the real one as it contains private methods that cannot be invoked
+        return RequestOutput(request_id=request_id,
+                             prompt="",
+                             prompt_token_ids=[1, 3087, 8970, 338, 263],
+                             outputs=[],
+                             finished=False)
 
     async def step_async_multiple(self) -> List[RequestOutput]:
+        """
+        Same but send two requests in a batch
+        """
         seq_group_metadata_lists = []
-        request_id_0= "cmpl-81e2b9767b5b47bca7e649482698d385"
-        seq_data_0 = {0: SequenceData(prompt_token_ids=[1, 3087, 8970, 338, 263])}
-        sampling_params_0 = SamplingParams(n=1, best_of=1, presence_penalty=0.0, frequency_penalty=0.0, temperature=0.0, top_p=1.0, top_k=-1, use_beam_search=False, length_penalty=1.0, early_stopping=False, stop=[], ignore_eos=False, max_tokens=7, logprobs=None, skip_special_tokens=True)
-
-        seq_group_metadata_lists.append(SequenceGroupMetadata(request_id_0, True, seq_data_0, sampling_params_0, {}))
+        request_id_0 = "cmpl-81e2b9767b5b47bca7e649482698d385"
+        seq_data_0 = {
+            0: SequenceData(prompt_token_ids=[1, 3087, 8970, 338, 263])
+        }
+        sampling_params_0 = SamplingParams(n=1,
+                                           best_of=1,
+                                           presence_penalty=0.0,
+                                           frequency_penalty=0.0,
+                                           temperature=0.0,
+                                           top_p=1.0,
+                                           top_k=-1,
+                                           use_beam_search=False,
+                                           length_penalty=1.0,
+                                           early_stopping=False,
+                                           stop=[],
+                                           ignore_eos=False,
+                                           max_tokens=7,
+                                           logprobs=None,
+                                           skip_special_tokens=True)
+
+        seq_group_metadata_lists.append(
+            SequenceGroupMetadata(request_id_0, True, seq_data_0,
+                                  sampling_params_0, {}))
 
         request_id_1 = "cmpl-81e2b9767b5b47bca7e649482698d385"
-        seq_data_1 = {1: SequenceData(prompt_token_ids=[1, 3087, 8970, 338, 263])}
-        sampling_params_1 = SamplingParams(n=1, best_of=1, presence_penalty=0.0, frequency_penalty=0.0, temperature=0.0, top_p=1.0, top_k=-1, use_beam_search=False, length_penalty=1.0, early_stopping=False, stop=[], ignore_eos=False, max_tokens=7, logprobs=None, skip_special_tokens=True)
-
-        seq_group_metadata_lists.append(SequenceGroupMetadata(request_id_1, True, seq_data_1, sampling_params_1, {}))
+        seq_data_1 = {
+            1: SequenceData(prompt_token_ids=[1, 3087, 8970, 338, 263])
+        }
+        sampling_params_1 = SamplingParams(n=1,
+                                           best_of=1,
+                                           presence_penalty=0.0,
+                                           frequency_penalty=0.0,
+                                           temperature=0.0,
+                                           top_p=1.0,
+                                           top_k=-1,
+                                           use_beam_search=False,
+                                           length_penalty=1.0,
+                                           early_stopping=False,
+                                           stop=[],
+                                           ignore_eos=False,
+                                           max_tokens=7,
+                                           logprobs=None,
+                                           skip_special_tokens=True)
+
+        seq_group_metadata_lists.append(
+            SequenceGroupMetadata(request_id_1, True, seq_data_1,
+                                  sampling_params_1, {}))
 
         output = await self._run_workers_async(
             "execute_model",
@@ -72,9 +124,11 @@ async def step_async_multiple(self) -> List[RequestOutput]:
             finished_seqs=[],
         )
 
-        # TODO: change this to real one
-        return RequestOutput(request_id=request_id_0, prompt="", prompt_token_ids=[1, 3087, 8970, 338, 263], outputs=[], finished=False)
-
+        return RequestOutput(request_id=request_id_0,
+                             prompt="",
+                             prompt_token_ids=[1, 3087, 8970, 338, 263],
+                             outputs=[],
+                             finished=False)
 
     async def _run_workers_async(
         self,
@@ -106,13 +160,36 @@ async def _run_workers_async(
             assert output == other_output
         return output
 
+
 setattr(AsyncLLMEngine, "_engine_class", UglyAsyncLLMEngine)
 
 
 @pytest.mark.asyncio
 async def test_model_execution():
-    # Let's build an engine_args
-    engine_args = AsyncEngineArgs(model='/models/vicuna-7b/', tokenizer='/models/vicuna-7b/', tokenizer_mode='auto', trust_remote_code=False, download_dir=None, load_format='dummy', dtype='auto', seed=0, max_model_len=None, worker_use_ray=False, pipeline_parallel_size=1, tensor_parallel_size=1, block_size=16, swap_space=16, gpu_memory_utilization=0.9, max_num_batched_tokens=None, max_num_seqs=256, disable_log_stats=False, revision=None, tokenizer_revision=None, quantization=None, engine_use_ray=False, disable_log_requests=True, max_log_len=None)
+    # Let's build an engine_args
+    engine_args = AsyncEngineArgs(model=MODEL_PATH,
+                                  tokenizer=MODEL_PATH,
+                                  tokenizer_mode='auto',
+                                  trust_remote_code=False,
+                                  download_dir=None,
+                                  dtype='auto',
+                                  seed=0,
+                                  max_model_len=None,
+                                  worker_use_ray=False,
+                                  pipeline_parallel_size=1,
+                                  tensor_parallel_size=1,
+                                  block_size=16,
+                                  swap_space=16,
+                                  gpu_memory_utilization=0.9,
+                                  max_num_batched_tokens=None,
+                                  max_num_seqs=256,
+                                  disable_log_stats=False,
+                                  revision=None,
+                                  tokenizer_revision=None,
+                                  quantization=None,
+                                  engine_use_ray=False,
+                                  disable_log_requests=True,
+                                  max_log_len=None)
     # Start the engine
     engine = AsyncLLMEngine.from_engine_args(engine_args)
 
@@ -121,7 +198,3 @@ async def test_model_execution():
     await engine.engine.step_async()
     # Now let's try something difficult
     await engine.engine.step_async_multiple()
-
-
-
-

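Note: the test above mocks step_async so that a hand-built SequenceGroupMetadata is pushed straight into the workers' execute_model call, and it is driven by pytest-asyncio. A minimal sketch of driving the same coroutine without pytest (assuming the repository root is on PYTHONPATH and MODEL_PATH points at a real checkpoint; this driver script is not part of the commit):

import asyncio

# Hypothetical driver: reuses the test coroutine defined in the file above.
from tests.under_models.send_mock_request import test_model_execution

if __name__ == "__main__":
    # pytest-asyncio normally supplies the event loop; here we use a plain one.
    asyncio.run(test_model_execution())
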
tests/upper_frontends/test_start_async_llm_engine.py

Lines changed: 28 additions & 8 deletions
@@ -1,26 +1,46 @@
-"""Try sending a mocked request to the underlying model execute stage"""
+"""Try start the AsyncLLMEngine"""
 
 from vllm.engine.async_llm_engine import AsyncLLMEngine
 from vllm.engine.arg_utils import AsyncEngineArgs
 import pytest
 import asyncio
 
 # This is the model to load for workers
-MODEL_PATH="/models/vicuna-7b/"
-
-
+MODEL_PATH = "YOUR_MODEL_PATH"
 """
 1. Test to start a AsyncLLMEngine, to ensure that all goes well before start serving.
 """
 
+
 @pytest.mark.asyncio
 async def test_model_execution():
-    # Let's build an engine_args
-    engine_args = AsyncEngineArgs(model='/models/vicuna-7b/', tokenizer='/models/vicuna-7b/', tokenizer_mode='auto', trust_remote_code=False, download_dir=None, load_format='auto', dtype='auto', seed=0, max_model_len=None, worker_use_ray=False, pipeline_parallel_size=1, tensor_parallel_size=1, block_size=16, swap_space=16, gpu_memory_utilization=0.9, max_num_batched_tokens=None, max_num_seqs=256, disable_log_stats=False, revision=None, tokenizer_revision=None, quantization=None, engine_use_ray=False, disable_log_requests=True, max_log_len=None)
+    # Let's build an engine_args
+    engine_args = AsyncEngineArgs(model=MODEL_PATH,
+                                  tokenizer=MODEL_PATH,
+                                  tokenizer_mode='auto',
+                                  trust_remote_code=False,
+                                  download_dir=None,
+                                  load_format='auto',
+                                  dtype='auto',
+                                  seed=0,
+                                  max_model_len=None,
+                                  worker_use_ray=False,
+                                  pipeline_parallel_size=1,
+                                  tensor_parallel_size=1,
+                                  block_size=16,
+                                  swap_space=16,
+                                  gpu_memory_utilization=0.9,
+                                  max_num_batched_tokens=None,
+                                  max_num_seqs=256,
+                                  disable_log_stats=False,
+                                  revision=None,
+                                  tokenizer_revision=None,
+                                  quantization=None,
+                                  engine_use_ray=False,
+                                  disable_log_requests=True,
+                                  max_log_len=None)
     # Start the engine
     engine = AsyncLLMEngine.from_engine_args(engine_args)
 
     engine.start_background_loop()
     await asyncio.sleep(5)
-
-

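Note: this test only verifies that the engine comes up and its background loop survives for a few seconds; no request is submitted. For reference, once the loop is running a request would normally go through the engine's async generate API; a rough sketch (the generate(prompt, sampling_params, request_id) signature is assumed from upstream vLLM and is not exercised by this commit):

from vllm import SamplingParams

async def send_one_prompt(engine) -> None:
    # Stream partial RequestOutputs for a single prompt; parameters mirror the tests above.
    params = SamplingParams(temperature=0.0, max_tokens=7)
    async for request_output in engine.generate("Hello, my name is",
                                                params,
                                                request_id="smoke-test-0"):
        print(request_output.outputs)
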
vllm/core/scheduler.py

Lines changed: 31 additions & 20 deletions
@@ -64,20 +64,19 @@ def __init__(
         cache_config: CacheConfig,
     ) -> None:
         self.scheduler_config = scheduler_config
-        #self.cache_config = cache_config
+        self.cache_config = cache_config
 
         self.prompt_limit = min(self.scheduler_config.max_model_len,
                                 self.scheduler_config.max_num_batched_tokens)
 
         # Instantiate the scheduling policy.
         self.policy = PolicyFactory.get_policy(policy_name="fcfs")
         # Create the block space manager.
-        # CO(gc): disable the block_manager
-        # self.block_manager = BlockSpaceManager(
-        #     block_size=self.cache_config.block_size,
-        #     num_gpu_blocks=self.cache_config.num_gpu_blocks,
-        #     num_cpu_blocks=self.cache_config.num_cpu_blocks,
-        #     sliding_window=self.cache_config.sliding_window)
+        self.block_manager = BlockSpaceManager(
+            block_size=self.cache_config.block_size,
+            num_gpu_blocks=self.cache_config.num_gpu_blocks,
+            num_cpu_blocks=self.cache_config.num_cpu_blocks,
+            sliding_window=self.cache_config.sliding_window)
 
         # TODO(zhuohan): Use deque instead of list for better performance.
         # Sequence groups in the WAITING state.
@@ -188,6 +187,8 @@ def _schedule(self) -> SchedulerOutputs:
             blocks_to_swap_out=blocks_to_swap_out,
             blocks_to_copy=blocks_to_copy,
             ignored_seq_groups=ignored_seq_groups,
+            # Co(gc): not used
+            finished_seqs=[],
         )
         return scheduler_outputs
 
@@ -260,6 +261,8 @@ def _schedule(self) -> SchedulerOutputs:
             blocks_to_swap_out=blocks_to_swap_out,
             blocks_to_copy=blocks_to_copy,
             ignored_seq_groups=[],
+            # Co(gc): not used
+            finished_seqs=[],
         )
         return scheduler_outputs
 
@@ -398,16 +401,24 @@ def _swap_out(
             seq.status = SequenceStatus.SWAPPED
 
 
-
-
 class FixedWindowScheduler:
 
     def __init__(
         self,
         scheduler_config: SchedulerConfig,
         cache_config: CacheConfig,
    ) -> None:
+        """
+        Co(gc): A fixed window scheduler; the request limit is controlled entirely by SchedulerConfig.
+        We disable the block_manager in this class, which means we cannot know whether a request will
+        cause OOM in the backend worker.
+        To enable that, we would need the support of a PageTable, which in turn needs the relevant
+        CUDA functions in ./csrc/cache_kernels.cu implemented using oneAPI? So here's a TODO for you.
+        TODO: Write a block manager so that we can have fine-grained batch control
+
+        """
         self.scheduler_config = scheduler_config
+        # Co(gc): disable the cache_config as we are not using it
         #self.cache_config = cache_config
 
         self.prompt_limit = min(self.scheduler_config.max_model_len,
@@ -416,11 +427,14 @@ def __init__(
         # Instantiate the scheduling policy.
         self.policy = PolicyFactory.get_policy(policy_name="fcfs")
 
+        # Co(gc): disable the block manager
+
         # Sequence groups in the WAITING state.
         self.waiting: List[SequenceGroup] = []
         # Sequence groups in the RUNNING state.
         self.running: List[SequenceGroup] = []
         self.cleaned: List[int] = []
+        # Co(gc): We no longer have the swapped space as we are not deciding which to swap
 
     def add_seq_group(self, seq_group: SequenceGroup) -> None:
         # Add sequence groups to the waiting queue.
@@ -461,7 +475,7 @@ def _schedule(self) -> SchedulerOutputs:
         ignored_seq_groups: List[SequenceGroup] = []
         scheduled: List[SequenceGroup] = []
         finished_seqs: List[int] = self.cleaned.copy()
-        self.cleaned=[]
+        self.cleaned = []
         # The total number of sequences on the fly, including the
         # requests in the generation phase.
         num_curr_seqs = sum(seq_group.get_max_num_running_seqs()
@@ -489,6 +503,7 @@ def _schedule(self) -> SchedulerOutputs:
                 self.waiting.pop(0)
                 continue
 
+            # TODO(gc): If you can manage to make block_manager work, then this will be fine.
             # If the sequence group cannot be allocated, stop.
             # if not self.block_manager.can_allocate(seq_group):
             #     break
@@ -508,14 +523,13 @@ def _schedule(self) -> SchedulerOutputs:
             seq_group = self.waiting.pop(0)
             for seq in seq_group.get_seqs():
                 seq.status = SequenceStatus.RUNNING
+            # TODO(gc): same here
             #self._allocate(seq_group)
             self.running.append(seq_group)
             num_batched_tokens += num_prompt_tokens
             num_curr_seqs += num_new_seqs
             scheduled.append(seq_group)
 
-            print("We have waited sequence_groups")
-
         scheduler_outputs = SchedulerOutputs(
             scheduled_seq_groups=scheduled,
             prompt_run=True,
@@ -561,9 +575,6 @@ def schedule(self) -> Tuple[List[SequenceGroupMetadata], SchedulerOutputs]:
         for seq_group in scheduler_outputs.scheduled_seq_groups:
             seq_data: Dict[int, List[SequenceData]] = {}
             block_tables: Dict[int, List[int]] = {}
-            print("Here we print the length of the seq_groups")
-            print(len(seq_group.get_seqs()))
-            print("The following sequences are scheduled")
             for seq in seq_group.get_seqs(status=SequenceStatus.RUNNING):
                 seq_id = seq.seq_id
                 seq_data[seq_id] = seq.data
@@ -576,7 +587,6 @@ def schedule(self) -> Tuple[List[SequenceGroupMetadata], SchedulerOutputs]:
                 sampling_params=seq_group.sampling_params,
                 block_tables=block_tables,
             )
-            print(seq_group_metadata.seq_data.keys())
             seq_group_metadata_list.append(seq_group_metadata)
         return seq_group_metadata_list, scheduler_outputs
 
@@ -588,10 +598,11 @@ def free_seq(self, seq: Sequence) -> None:
         self.cleaned.append(seq.seq_id)
 
     def free_finished_seq_groups(self) -> None:
-        for seq_group in self.running:
-            if seq_group.is_finished():
-                print("A finished seq_group")
-                print(seq_group)
+        # Co(gc): just some debug statements
+        # for seq_group in self.running:
+        #     if seq_group.is_finished():
+        #         print("A finished seq_group")
+        #         print(seq_group)
         self.running = [
             seq_group for seq_group in self.running
             if not seq_group.is_finished()

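Note: the FixedWindowScheduler introduced above admits sequence groups using only the SchedulerConfig limits, since its block manager is disabled, and its docstring leaves a TODO for a block manager to regain fine-grained batch control. A minimal sketch of the kind of KV-cache budget check such a manager could restore (the SimpleBlockBudget class below is hypothetical and not part of this commit):

class SimpleBlockBudget:
    """Hypothetical coarse KV-cache budget: counts whole blocks per sequence,
    ignoring block sharing, swapping, and copy-on-write."""

    def __init__(self, block_size: int, num_gpu_blocks: int) -> None:
        self.block_size = block_size
        self.free_blocks = num_gpu_blocks

    def _blocks_needed(self, prompt_len: int, max_tokens: int) -> int:
        # Worst case: prompt plus every generated token, rounded up to whole blocks.
        total_tokens = prompt_len + max_tokens
        return -(-total_tokens // self.block_size)  # ceiling division

    def can_allocate(self, prompt_len: int, max_tokens: int) -> bool:
        return self._blocks_needed(prompt_len, max_tokens) <= self.free_blocks

    def allocate(self, prompt_len: int, max_tokens: int) -> None:
        assert self.can_allocate(prompt_len, max_tokens)
        self.free_blocks -= self._blocks_needed(prompt_len, max_tokens)

    def free(self, prompt_len: int, max_tokens: int) -> None:
        self.free_blocks += self._blocks_needed(prompt_len, max_tokens)

A real implementation would also need per-sequence block tables and swap-in/swap-out handling, which is what the BlockSpaceManager re-enabled in the Scheduler class above provides.
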
vllm/engine/arg_utils.py

Lines changed: 0 additions & 1 deletion
@@ -185,7 +185,6 @@ def create_engine_configs(
             self.dtype, self.seed, self.revision,
             self.tokenizer_revision, self.max_model_len,
             self.quantization)
-        # gc-TODO: disable cache_config later
         cache_config = CacheConfig(
             self.block_size, self.gpu_memory_utilization, self.swap_space,
             getattr(model_config.hf_config, 'sliding_window', None))
