Skip to content

Commit ab38935

Browse files
authored
Fix issue if async task count does not evenly divide across process pool (#120)
If `SchedulingStrategy.processing_requests_limit` does not divide evenly by `SchedulingStrategy.processes_limit`, then we will either run too few requests or compute a per-process limit of "0", which causes an unlimited number of requests to be scheduled at a time.
1 parent 3be2b5d commit ab38935

File tree

2 files changed

+35
-20
lines changed

2 files changed

+35
-20
lines changed

src/guidellm/scheduler/scheduler.py

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -188,34 +188,46 @@ async def _start_processes(
188188
maxsize=scheduling_strategy.queued_requests_limit
189189
)
190190
responses_queue = manager.Queue()
191-
per_process_requests_limit = scheduling_strategy.processing_requests_limit // (
192-
scheduling_strategy.processes_limit
191+
192+
num_processes = min(
193+
scheduling_strategy.processes_limit,
194+
scheduling_strategy.processing_requests_limit
195+
)
196+
requests_limit_split = (scheduling_strategy.processing_requests_limit //
197+
scheduling_strategy.processes_limit)
198+
requests_limit_remain = (scheduling_strategy.processing_requests_limit %
199+
scheduling_strategy.processes_limit)
200+
process_ids = (id_ for id_ in range(num_processes))
201+
process_requests_limits = (
202+
requests_limit_split + 1
203+
if i < requests_limit_remain else requests_limit_split
204+
for i in range(num_processes)
193205
)
194206

195207
futures = []
196208
loop = asyncio.get_event_loop()
197-
for process_id in range(scheduling_strategy.processes_limit):
209+
for id_, requests_limit in zip(process_ids, process_requests_limits):
198210
if scheduling_strategy.processing_mode == "sync":
199211
futures.append(
200212
loop.run_in_executor(
201213
executor,
202214
self.worker.process_loop_synchronous,
203215
requests_queue,
204216
responses_queue,
205-
process_id,
217+
id_,
206218
)
207219
)
208220
elif scheduling_strategy.processing_mode == "async":
209-
futures.append(
210-
loop.run_in_executor(
211-
executor,
212-
self.worker.process_loop_asynchronous,
213-
requests_queue,
214-
responses_queue,
215-
per_process_requests_limit,
216-
process_id,
221+
futures.append(
222+
loop.run_in_executor(
223+
executor,
224+
self.worker.process_loop_asynchronous,
225+
requests_queue,
226+
responses_queue,
227+
requests_limit,
228+
id_,
229+
)
217230
)
218-
)
219231
else:
220232
raise ValueError(
221233
f"Invalid processing mode: {scheduling_strategy.processing_mode} "

src/guidellm/scheduler/worker.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -220,24 +220,27 @@ def process_loop_asynchronous(
220220
self,
221221
requests_queue: multiprocessing.Queue,
222222
results_queue: multiprocessing.Queue,
223-
max_concurrency: Optional[int],
223+
max_concurrency: int,
224224
process_id: int,
225225
):
226226
async def _process_runner():
227-
pending = asyncio.Semaphore(max_concurrency) if max_concurrency else None
227+
pending = asyncio.Semaphore(max_concurrency)
228+
229+
if pending.locked():
230+
raise ValueError(
231+
"Async worker called with max_concurrency < 1"
232+
)
228233

229234
while (
230235
process_request := await self.get_request(requests_queue)
231236
) is not None:
232237
dequeued_time = time.time()
233238

234-
if pending:
235-
await pending.acquire()
239+
await pending.acquire()
236240

237241
def _task_done(_: asyncio.Task):
238242
nonlocal pending
239-
if pending:
240-
pending.release()
243+
pending.release()
241244

242245
task = asyncio.create_task(
243246
self.resolve_scheduler_request(
@@ -325,7 +328,7 @@ def process_loop_asynchronous(
325328
self,
326329
requests_queue: multiprocessing.Queue,
327330
results_queue: multiprocessing.Queue,
328-
max_concurrency: Optional[int],
331+
max_concurrency: int,
329332
process_id: int,
330333
):
331334
asyncio.run(self.backend.validate())

0 commit comments

Comments
 (0)