
Commit 42dd326

Enable the temp testing script to work; refactor strategies to be more generic in how the scheduler uses them
1 parent 4b6e9f8 commit 42dd326

File tree: 4 files changed, +410 −88 lines changed


src/guidellm/executor/profile_generator.py

Lines changed: 2 additions & 3 deletions
```diff
@@ -8,7 +8,6 @@
 from guidellm.config import settings
 from guidellm.core import TextGenerationBenchmark, TextGenerationBenchmarkReport
 from guidellm.core.serializable import Serializable
-from guidellm.scheduler import LoadGenerationMode
 
 __all__ = [
     "Profile",
@@ -33,7 +32,7 @@ class Profile(Serializable):
     :type args: Optional[Dict[str, Any]]
     """
 
-    load_gen_mode: LoadGenerationMode
+    load_gen_mode: Any
     load_gen_rate: Optional[float] = None
     args: Dict[str, Any] = Field(default_factory=dict)
 
@@ -229,7 +228,7 @@ def create_fixed_rate_profile(
         :return: The generated profile or None if index is out of range.
         :rtype: Optional[Profile]
         """
-        modes_map: Dict[str, LoadGenerationMode] = {
+        modes_map: Dict[str, Any] = {
             "constant": "constant",
             "poisson": "poisson",
         }
```
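With the `LoadGenerationMode` import gone, `load_gen_mode` is now an untyped field that carries plain values such as `"constant"` or `"poisson"`. A minimal sketch of what construction looks like after this change, using a hypothetical simplified stand-in for the real `Profile` (which extends `Serializable`):

```python
from typing import Any, Dict, Optional

from pydantic import BaseModel, Field


# Simplified stand-in for guidellm's Profile model, shown only to illustrate that
# load_gen_mode now accepts plain values instead of a LoadGenerationMode enum.
class Profile(BaseModel):
    load_gen_mode: Any
    load_gen_rate: Optional[float] = None
    args: Dict[str, Any] = Field(default_factory=dict)


# Hypothetical usage: rate modes are passed through as plain strings,
# matching the keys used in modes_map above.
profile = Profile(load_gen_mode="constant", load_gen_rate=10.0)
print(profile.load_gen_mode, profile.load_gen_rate)
```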

src/guidellm/scheduler/scheduler.py

Lines changed: 23 additions & 49 deletions
```diff
@@ -23,9 +23,6 @@
 
 from guidellm.config import settings
 from guidellm.scheduler.strategy import (
-    AsyncConstantStrategy,
-    AsyncPoissonStrategy,
-    ConcurrentStrategy,
     SchedulingStrategy,
     SynchronousStrategy,
     ThroughputStrategy,
@@ -398,55 +395,29 @@ async def _start_processes(
         multiprocessing.Queue,
         multiprocessing.Queue,
     ]:
-        cpu_cores = os.cpu_count() or 1
-
-        worker_type: Literal["sync", "async"]
-        requests_queue_limit: Optional[int]
-        max_concurrency: int
-        num_processes: int
-
-        if isinstance(self._scheduling_strategy, SynchronousStrategy):
-            worker_type = "sync"
-            requests_queue_limit = 2
-            num_processes = 1
-            max_concurrency = -1
-        elif isinstance(self._scheduling_strategy, ConcurrentStrategy):
-            worker_type = "sync"
-            num_processes = self._scheduling_strategy.streams
-            requests_queue_limit = (
-                num_processes * 2
-            )  # add 2 per process to ensure no idling
-            max_concurrency = -1
-        elif isinstance(
-            self._scheduling_strategy,
-            (ThroughputStrategy, AsyncConstantStrategy, AsyncPoissonStrategy),
-        ):
-            worker_type = "async"
-            num_processes = self._num_processes or min(
-                max(1, cpu_cores - 1), settings.max_worker_processes
-            )
-            max_concurrency = (
-                self._scheduling_strategy.max_concurrency
-                if isinstance(self._scheduling_strategy, ThroughputStrategy)
-                else None
-            ) or settings.max_concurrency
-            requests_queue_limit = (
-                max_concurrency
-                + num_processes  # add 1 extra per process to ensure no idling
-            )
-            max_concurrency = max_concurrency // num_processes  # convert to per process
-        else:
-            raise ValueError(
-                f"Invalid scheduling strategy: {self._scheduling_strategy}"
-            )
+        processing_mode = self._scheduling_strategy.processing_mode
 
-        requests_queue = manager.Queue(maxsize=requests_queue_limit)
+        num_processes = self._scheduling_strategy.processes_limit
+        if num_processes is None:
+            cpu_cores = os.cpu_count() or 1
+            num_processes = min(max(1, cpu_cores - 1), settings.max_worker_processes)
+
+        num_processing_requests = self._scheduling_strategy.processing_requests_limit
+        if num_processing_requests is None:
+            num_processing_requests = settings.max_concurrency
+        num_processing_requests_per_process = num_processing_requests // num_processes
+
+        num_queued_requests = self._scheduling_strategy.queued_requests_limit
+        if num_queued_requests is None:
+            num_queued_requests = num_processing_requests + num_processes
+
+        requests_queue = manager.Queue(maxsize=num_queued_requests)
         responses_queue = manager.Queue()
 
         futures = []
         loop = asyncio.get_event_loop()
         for process_id in range(num_processes):
-            if worker_type == "sync":
+            if processing_mode == "sync":
                 futures.append(
                     loop.run_in_executor(
                         executor,
@@ -456,19 +427,22 @@ async def _start_processes(
                         process_id,
                     )
                 )
-            elif worker_type == "async":
+            elif processing_mode == "async":
                 futures.append(
                     loop.run_in_executor(
                         executor,
                         self._worker_process_async,
                         requests_queue,
                         responses_queue,
-                        max_concurrency,
+                        num_processing_requests_per_process,
                         process_id,
                     )
                 )
             else:
-                raise ValueError(f"Invalid worker type: {worker_type}")
+                raise ValueError(
+                    f"Invalid processing mode: {processing_mode} "
+                    f"for strategy: {self._scheduling_strategy}"
+                )
 
         await asyncio.sleep(0.1)  # give time for processes to start
 
```
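The strategy-side half of this refactor lives in the commit's other files and is not shown in this diff, but the scheduler above now depends only on four strategy attributes. A rough sketch of the interface it assumes, with illustrative (not actual) values for the synchronous case:

```python
from typing import Literal, Optional


# Sketch of the generic surface the refactored scheduler reads from a strategy
# (not the actual guidellm classes). Attribute names come from the diff above;
# None means "let the scheduler fall back to its own default limit".
class SchedulingStrategy:
    processing_mode: Literal["sync", "async"] = "async"
    processes_limit: Optional[int] = None            # fallback: derived from CPU count and settings.max_worker_processes
    processing_requests_limit: Optional[int] = None  # fallback: settings.max_concurrency
    queued_requests_limit: Optional[int] = None      # fallback: processing limit + process count


class SynchronousStrategy(SchedulingStrategy):
    # Illustrative values only: one worker process, one in-flight request, a short queue.
    processing_mode = "sync"
    processes_limit = 1
    processing_requests_limit = 1
    queued_requests_limit = 2
```

With the limits expressed this way, `_start_processes` no longer branches on strategy classes with `isinstance`. As a worked example under the assumed defaults, an async strategy that leaves every limit unset with `settings.max_concurrency = 512` and 8 worker processes gets `512 // 8 = 64` in-flight requests per process and a request queue bounded at `512 + 8 = 520` entries.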
