Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 25 additions & 13 deletions src/guidellm/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,34 +188,46 @@ async def _start_processes(
maxsize=scheduling_strategy.queued_requests_limit
)
responses_queue = manager.Queue()
per_process_requests_limit = scheduling_strategy.processing_requests_limit // (
scheduling_strategy.processes_limit

num_processes = min(
scheduling_strategy.processes_limit,
scheduling_strategy.processing_requests_limit
)
requests_limit_split = (scheduling_strategy.processing_requests_limit //
scheduling_strategy.processes_limit)
requests_limit_remain = (scheduling_strategy.processing_requests_limit %
scheduling_strategy.processes_limit)
process_ids = (id_ for id_ in range(num_processes))
process_requests_limits = (
requests_limit_split + 1
if i < requests_limit_remain else requests_limit_split
for i in range(num_processes)
)

futures = []
loop = asyncio.get_event_loop()
for process_id in range(scheduling_strategy.processes_limit):
for id_, requests_limit in zip(process_ids, process_requests_limits):
if scheduling_strategy.processing_mode == "sync":
futures.append(
loop.run_in_executor(
executor,
self.worker.process_loop_synchronous,
requests_queue,
responses_queue,
process_id,
id_,
)
)
elif scheduling_strategy.processing_mode == "async":
futures.append(
loop.run_in_executor(
executor,
self.worker.process_loop_asynchronous,
requests_queue,
responses_queue,
per_process_requests_limit,
process_id,
futures.append(
loop.run_in_executor(
executor,
self.worker.process_loop_asynchronous,
requests_queue,
responses_queue,
requests_limit,
id_,
)
)
)
else:
raise ValueError(
f"Invalid processing mode: {scheduling_strategy.processing_mode} "
Expand Down
17 changes: 10 additions & 7 deletions src/guidellm/scheduler/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,24 +220,27 @@ def process_loop_asynchronous(
self,
requests_queue: multiprocessing.Queue,
results_queue: multiprocessing.Queue,
max_concurrency: Optional[int],
max_concurrency: int,
process_id: int,
):
async def _process_runner():
pending = asyncio.Semaphore(max_concurrency) if max_concurrency else None
pending = asyncio.Semaphore(max_concurrency)

if pending.locked():
raise ValueError(
"Async worker called with max_concurrency < 1"
)

while (
process_request := await self.get_request(requests_queue)
) is not None:
dequeued_time = time.time()

if pending:
await pending.acquire()
await pending.acquire()

def _task_done(_: asyncio.Task):
nonlocal pending
if pending:
pending.release()
pending.release()

task = asyncio.create_task(
self.resolve_scheduler_request(
Expand Down Expand Up @@ -325,7 +328,7 @@ def process_loop_asynchronous(
self,
requests_queue: multiprocessing.Queue,
results_queue: multiprocessing.Queue,
max_concurrency: Optional[int],
max_concurrency: int,
process_id: int,
):
asyncio.run(self.backend.validate())
Expand Down