Skip to content

Commit 0c28b6a

Browse files
committed
Determine the number of tasks on a per-process basis
1 parent cb4bf2e commit 0c28b6a

File tree

3 files changed

+49
-13
lines changed

3 files changed

+49
-13
lines changed

src/guidellm/scheduler/scheduler.py

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -189,9 +189,6 @@ async def _start_processes(
189189
maxsize=scheduling_strategy.queued_requests_limit
190190
)
191191
responses_queue = manager.Queue()
192-
per_process_requests_limit = scheduling_strategy.processing_requests_limit // (
193-
scheduling_strategy.processes_limit
194-
)
195192

196193
futures = []
197194
loop = asyncio.get_event_loop()
@@ -207,16 +204,17 @@ async def _start_processes(
207204
)
208205
)
209206
elif scheduling_strategy.processing_mode == "async":
210-
futures.append(
211-
loop.run_in_executor(
212-
executor,
213-
self.worker.process_loop_asynchronous,
214-
requests_queue,
215-
responses_queue,
216-
per_process_requests_limit,
217-
process_id,
207+
if scheduling_strategy.process_requests_limits[process_id]:
208+
futures.append(
209+
loop.run_in_executor(
210+
executor,
211+
self.worker.process_loop_asynchronous,
212+
requests_queue,
213+
responses_queue,
214+
scheduling_strategy.process_requests_limits[process_id],
215+
process_id,
216+
)
218217
)
219-
)
220218
else:
221219
raise ValueError(
222220
f"Invalid processing mode: {scheduling_strategy.processing_mode} "

src/guidellm/scheduler/strategy.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import time
55
from typing import (
66
Generator,
7+
List,
78
Literal,
89
Optional,
910
Union,
@@ -94,6 +95,23 @@ def processing_requests_limit(self) -> int:
9495
"""
9596
return settings.max_concurrency
9697

98+
@property
99+
def process_requests_limits(self) -> List[int]:
100+
"""
101+
The maximum number of requests per process for the scheduling strategy.
102+
processing_requests_limit is divided evenly across processes_limit processes;
103+
the remainder is handed out one request each to the lowest-indexed processes.
104+
105+
:return: One limit per process; they sum to processing_requests_limit.
106+
"""
107+
split = self.processing_requests_limit // self.processes_limit
108+
remain = self.processing_requests_limit % self.processes_limit
109+
110+
return [
111+
split + 1 if i < remain else split
112+
for i in range(self.processes_limit)
113+
]
114+
97115
def request_times(self) -> Generator[float, None, None]:
98116
"""
99117
A generator that yields timestamps for when requests should be sent.
@@ -168,6 +186,18 @@ def processing_requests_limit(self) -> int:
168186
"""
169187
return 1
170188

189+
@property
190+
def process_requests_limits(self) -> List[int]:
191+
"""
192+
The maximum number of requests per process for the scheduling strategy.
193+
This strategy always reports exactly one entry, with a limit of a single
194+
request.
195+
196+
:return: A single-element list containing 1.
197+
"""
198+
199+
# NOTE(review): callers that index this list by process id presumably rely on
# processes_limit being 1 for this strategy — confirm.
return [1]
200+
171201
def request_times(self) -> Generator[float, None, None]:
172202
"""
173203
A generator that yields time.time() so requests are sent immediately,

src/guidellm/scheduler/worker.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,15 @@ def process_loop_asynchronous(
226226
process_id: int,
227227
):
228228
async def _process_runner():
229-
pending = asyncio.Semaphore(max_concurrency) if max_concurrency else None
229+
if max_concurrency is not None:
230+
if max_concurrency < 1:
231+
raise ValueError(
232+
f"max_concurrency must be greater than 0, got {max_concurrency}"
233+
)
234+
235+
pending = asyncio.Semaphore(max_concurrency)
236+
else:
237+
pending = None
230238

231239
while (
232240
process_request := await self.get_request(requests_queue)

0 commit comments

Comments
 (0)