Skip to content

--rate-type concurrent CLI parameter is implemented #79

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/guidellm/executor/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class Executor:
:type backend: Backend
:param request_generator: The generator that creates requests for execution.
:type request_generator: RequestGenerator
:param mode: The mode for profile generation (e.g., sweep, synchronous).
:param mode: The mode for profile generation (e.g., sweep, synchronous, concurrent).
:type mode: ProfileGenerationMode
:param rate: The list of rates for load generation, or None.
:type rate: Optional[List[float]]
Expand Down
55 changes: 48 additions & 7 deletions src/guidellm/executor/profile_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
]

ProfileGenerationMode = Literal[
"sweep", "synchronous", "throughput", "constant", "poisson"
"sweep", "synchronous", "throughput", "constant", "poisson", "concurrent"
]


Expand All @@ -34,7 +34,7 @@ class Profile(Serializable):
"""

load_gen_mode: LoadGenerationMode
load_gen_rate: Optional[float] = None
load_gen_rate: Optional[Union[float, int]] = None
args: Dict[str, Any] = Field(default_factory=dict)


Expand All @@ -45,6 +45,7 @@ class ProfileGenerator:
:param mode: The mode for profile generation (e.g., sweep, synchronous).
:type mode: ProfileGenerationMode
:param rate: The rate(s) for load generation; could be a float or list of floats.
    If ``mode`` is ``concurrent``, an integer giving the number of concurrent streams.
:type rate: Optional[Union[float, Sequence[float]]]
"""

Expand All @@ -61,7 +62,7 @@ def __init__(
logger.error(err)
raise err

self._mode = mode
self._mode: ProfileGenerationMode = mode

if self._mode in ("sweep", "throughput", "synchronous"):
if rate is not None:
Expand All @@ -74,6 +75,7 @@ def __init__(
err = ValueError(f"Rates are required for {self._mode} mode")
logger.error(err)
raise err

self._rates = rate if isinstance(rate, Sequence) else [rate]

for rt in self._rates:
Expand All @@ -96,13 +98,13 @@ def __len__(self) -> int:
if self._mode == "sweep":
return settings.num_sweep_profiles + 2

if self._mode in ("throughput", "synchronous"):
if self._mode in ("throughput", "synchronous", "concurrent"):
return 1

if not self._rates:
if not self.rates:
raise ValueError(f"Rates are required for {self._mode} mode")

return len(self._rates)
return len(self.rates)

@property
def mode(self) -> ProfileGenerationMode:
Expand Down Expand Up @@ -147,7 +149,7 @@ def profile_generation_modes(self) -> Sequence[ProfileGenerationMode]:
settings.num_sweep_profiles
)

if self._mode in ["throughput", "synchronous"]:
if self._mode in ["throughput", "synchronous", "concurrent"]:
return [self._mode]

if self._rates is None:
Expand Down Expand Up @@ -188,6 +190,19 @@ def next(self, current_report: TextGenerationBenchmarkReport) -> Optional[Profil
profile = self.create_synchronous_profile(self.generated_count)
elif self.mode == "throughput":
profile = self.create_throughput_profile(self.generated_count)
elif self.mode == "concurrent":
err = ValueError(
f"Can not create concurrent profile with rate {self.rates}"
)
try:
if not self.rates:
raise err

_rate: int = int(self.rates[0])
except IndexError as error:
logger.error(err)
raise err from error
profile = self.create_concurrent_profile(self.generated_count, _rate)
elif self.mode == "sweep":
profile = self.create_sweep_profile(
self.generated_count,
Expand All @@ -211,6 +226,7 @@ def next(self, current_report: TextGenerationBenchmarkReport) -> Optional[Profil
profile,
self._generated_count,
)

return profile

@staticmethod
Expand All @@ -229,9 +245,11 @@ def create_fixed_rate_profile(
:return: The generated profile or None if index is out of range.
:rtype: Optional[Profile]
"""

modes_map: Dict[str, LoadGenerationMode] = {
"constant": "constant",
"poisson": "poisson",
"concurrent": "concurrent",
}

if mode not in modes_map:
Expand Down Expand Up @@ -348,3 +366,26 @@ def create_sweep_profile(
else 1.0 # the fallback value
),
)

@staticmethod
def create_concurrent_profile(index: int, rate: int) -> Optional[Profile]:
    """
    Creates a profile with concurrent load generation mode.

    :param index: The index of the profile to create.
    :type index: int
    :param rate: The number of concurrent streams to run with.
    :type rate: int
    :return: The generated profile, or None if index is out of range
        (only a single concurrent profile is generated, at index 0).
    :rtype: Optional[Profile]
    """

    # Only one concurrent profile is ever produced; any index >= 1
    # signals the caller that generation is exhausted.
    profile = (
        Profile(
            load_gen_mode="concurrent",
            load_gen_rate=rate,
        )
        if index < 1
        else None
    )
    logger.debug("Created concurrent profile: {}", profile)

    return profile
2 changes: 1 addition & 1 deletion src/guidellm/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ def generate_benchmark_report(
backend=backend_inst,
request_generator=request_generator,
mode=rate_type,
rate=rate if rate_type in ("constant", "poisson") else None,
rate=rate if rate_type in ("constant", "poisson", "concurrent") else None,
max_number=(
len(request_generator) if max_requests == "dataset" else max_requests
),
Expand Down
58 changes: 34 additions & 24 deletions src/guidellm/scheduler/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import math
import time
from dataclasses import dataclass
from typing import AsyncGenerator, Literal, Optional, Union, get_args
from typing import AsyncGenerator, Literal, Optional, Union, get_args, Generator

from loguru import logger

Expand Down Expand Up @@ -109,17 +109,17 @@ def __init__(
logger.error(err)
raise err

if mode in ["constant", "poisson"] and not rate:
if mode in ["constant", "poisson", "concurrent"] and not rate:
err = ValueError(f"Rate must be > 0 for mode: {mode}. Given: {rate}")
logger.error(err)
raise err

self._generator = generator
self._worker = worker
self._mode = mode
self._rate = rate
self._max_number = max_number
self._max_duration = max_duration
self._mode: LoadGenerationMode = mode
self._rate: Optional[float] = rate
self._max_number: Optional[int] = max_number
self._max_duration: Optional[float] = max_duration

self._load_generator = LoadGenerator(mode, rate)

Expand Down Expand Up @@ -227,9 +227,7 @@ async def run(self) -> AsyncGenerator[SchedulerResult, None]:
count_total = (
self.max_number
if self.max_number
else round(self.max_duration)
if self.max_duration
else 0
else round(self.max_duration) if self.max_duration else 0
)

# yield initial result for progress tracking
Expand All @@ -246,9 +244,7 @@ async def run(self) -> AsyncGenerator[SchedulerResult, None]:
count_completed = (
min(run_count, self.max_number)
if self.max_number
else round(time.time() - start_time)
if self.max_duration
else 0
else round(time.time() - start_time) if self.max_duration else 0
)

yield SchedulerResult(
Expand All @@ -267,9 +263,7 @@ async def run(self) -> AsyncGenerator[SchedulerResult, None]:
count_completed=(
benchmark.request_count + benchmark.error_count
if self.max_number
else round(time.time() - start_time)
if self.max_duration
else 0
else round(time.time() - start_time) if self.max_duration else 0
),
benchmark=benchmark,
)
Expand Down Expand Up @@ -311,9 +305,7 @@ async def _run_async(
break

logger.debug(
"Running asynchronous request={} at submit_at={}",
request,
submit_at,
"Running asynchronous request={} at submit_at={}", request, submit_at
)

def _completed(_task: asyncio.Task) -> None:
Expand All @@ -326,14 +318,32 @@ def _completed(_task: asyncio.Task) -> None:
logger.debug("Request completed: {}", _res)

benchmark.request_started()
task = asyncio.create_task(
self._submit_task_coroutine(request, submit_at, end_time)
)
task.add_done_callback(_completed)
tasks.append(task)

if self.mode == "concurrent":
if self.rate is None:
raise ValueError(
"Can not use concurrent mode with no rate specified"
)

_tasks: Generator[asyncio.Task, None, None] = (
asyncio.create_task(
self._submit_task_coroutine(request, submit_at, end_time)
)
for _ in range(int(self.rate))
)

for task in _tasks:
task.add_done_callback(_completed)
tasks.append(task)
else:
task = asyncio.create_task(
self._submit_task_coroutine(request, submit_at, end_time)
)
task.add_done_callback(_completed)
tasks.append(task)

# release control to the event loop for other tasks
await asyncio.sleep(0.001)
await asyncio.sleep(0)

for compl_task in asyncio.as_completed(tasks):
task_res = await compl_task
Expand Down
11 changes: 8 additions & 3 deletions src/guidellm/scheduler/load_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@

__all__ = ["LoadGenerationMode", "LoadGenerator"]

LoadGenerationMode = Literal["synchronous", "constant", "poisson", "throughput"]
LoadGenerationMode = Literal[
"synchronous", "constant", "poisson", "throughput", "concurrent"
]


class LoadGenerator:
Expand Down Expand Up @@ -52,8 +54,9 @@ def __init__(self, mode: LoadGenerationMode, rate: Optional[float] = None):
logger.error(error)
raise error

self._mode = mode
self._rate = rate
self._mode: LoadGenerationMode = mode
self._rate: Optional[float] = rate

logger.debug(
"Initialized LoadGenerator with mode: {mode}, rate: {rate}",
mode=mode,
Expand Down Expand Up @@ -100,6 +103,8 @@ def times(self) -> Generator[float, None, None]:
yield from self.poisson_times()
elif self._mode == "synchronous":
yield from self.synchronous_times()
elif self._mode == "concurrent":
yield from self.throughput_times()
else:
logger.error(f"Invalid mode encountered: {self._mode}")
raise ValueError(f"Invalid mode: {self._mode}")
Expand Down
18 changes: 18 additions & 0 deletions tests/unit/executor/test_profile_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ def test_profile_generator_mode():
"throughput",
"constant",
"poisson",
"concurrent",
}


Expand All @@ -41,6 +42,7 @@ def test_profile_instantiation():
("constant", [10, 20, 30]),
("poisson", 10),
("poisson", [10, 20, 30]),
("concurrent", 2),
],
)
def test_profile_generator_instantiation(mode, rate):
Expand Down Expand Up @@ -93,6 +95,8 @@ def test_profile_generator_instantiation(mode, rate):
("poisson", None),
("poisson", -1),
("poisson", 0),
("concurrent", 0),
("concurrent", -1),
],
)
def test_profile_generator_invalid_instantiation(mode, rate):
Expand Down Expand Up @@ -158,6 +162,20 @@ def test_profile_generator_next_throughput():
assert generator.next(current_report) is None


@pytest.mark.sanity()
def test_profile_generator_next_concurrent():
    """Concurrent mode yields exactly one profile, then only None."""
    gen = ProfileGenerator(mode="concurrent", rate=2.0)
    report = TextGenerationBenchmarkReport()

    first: Profile = gen.next(report)  # type: ignore
    assert first.load_gen_mode == "concurrent"
    assert first.load_gen_rate == 2
    assert gen.generated_count == 1

    # Once exhausted, every subsequent call must keep returning None.
    assert gen.next(report) is None
    assert gen.next(report) is None
    assert gen.next(report) is None


@pytest.mark.sanity()
@pytest.mark.parametrize(
"rate",
Expand Down
4 changes: 4 additions & 0 deletions tests/unit/scheduler/test_load_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ def test_load_generator_mode():
"constant",
"poisson",
"throughput",
"concurrent",
}


Expand All @@ -25,6 +26,7 @@ def test_load_generator_mode():
("poisson", 5),
("throughput", None),
("synchronous", None),
("concurrent", 3),
],
)
def test_load_generator_instantiation(mode, rate):
Expand All @@ -40,6 +42,8 @@ def test_load_generator_instantiation(mode, rate):
("invalid_mode", None, ValueError),
("constant", 0, ValueError),
("poisson", -1, ValueError),
("concurrent", -1, ValueError),
("concurrent", 0, ValueError),
],
)
def test_load_generator_invalid_instantiation(mode, rate, expected_error):
Expand Down
1 change: 1 addition & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ env_list = py38,py39,py310,py311,py312

[testenv]
description = Run all tests
usedevelop = true
deps =
.[dev]
commands =
Expand Down
Loading