Skip to content

Concurrent load generation option is implemented #71

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,10 @@ GuideLLM provides various CLI and environment options to customize evaluations,

Some typical configurations for the CLI include:

- `--rate-type`: The rate to use for benchmarking. Options include `sweep`, `synchronous`, `throughput`, `constant`, and `poisson`.
- `--rate-type`: The rate to use for benchmarking. Options include `sweep`, `synchronous`, `concurrent`, `throughput`, `constant`, and `poisson`.
- `--rate-type sweep`: (default) Sweep runs through the full range of the server's performance, starting with a `synchronous` rate, then `throughput`, and finally, 10 `constant` rates between the min and max request rate found.
- `--rate-type synchronous`: Synchronous runs requests synchronously, one after the other.
- `--rate-type concurrent`: Concurrent runs requests concurrently in multiple threads, with one request per thread. The number of threads is specified with the `--rate` argument.
- `--rate-type throughput`: Throughput runs requests in a throughput manner, sending requests as fast as possible.
- `--rate-type constant`: Constant runs requests at a constant rate. Specify the request rate per second with the `--rate` argument. For example, `--rate 10` or multiple rates with `--rate 10 --rate 20 --rate 30`.
- `--rate-type poisson`: Poisson draws from a Poisson distribution with the mean at the specified rate, adding some real-world variance to the runs. Specify the request rate per second with the `--rate` argument. For example, `--rate 10` or multiple rates with `--rate 10 --rate 20 --rate 30`.
Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ ignore = [
"TCH002",
"PLW1514", # allow Path.open without encoding
"RET505", # allow `else` blocks
"RET506" # allow `else` blocks
"RET506", # allow `else` blocks
"C901" # allow small if/else complexity

]
select = [
Expand Down
20 changes: 14 additions & 6 deletions src/guidellm/executor/profile_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@
]

ProfileGenerationMode = Literal[
"sweep", "synchronous", "throughput", "constant", "poisson"
"sweep",
"synchronous",
"throughput",
"constant",
"poisson",
"concurrent",
]


Expand Down Expand Up @@ -61,7 +66,7 @@ def __init__(
logger.error(err)
raise err

self._mode = mode
self._mode: ProfileGenerationMode = mode

if self._mode in ("sweep", "throughput", "synchronous"):
if rate is not None:
Expand Down Expand Up @@ -135,7 +140,7 @@ def generated_count(self) -> int:
return self._generated_count

@property
def profile_generation_modes(self) -> Sequence[ProfileGenerationMode]:
def profile_generation_modes(self) -> List[ProfileGenerationMode]:
"""
Return the list of profile modes to be run in the report.

Expand All @@ -147,7 +152,8 @@ def profile_generation_modes(self) -> Sequence[ProfileGenerationMode]:
settings.num_sweep_profiles
)

if self._mode in ["throughput", "synchronous"]:
# WIP: think about moving this concurrent above
if self._mode in ["throughput", "synchronous", "concurrent"]:
return [self._mode]

if self._rates is None:
Expand All @@ -173,13 +179,13 @@ def next(self, current_report: TextGenerationBenchmarkReport) -> Optional[Profil
current_report,
)

if self.mode in ["constant", "poisson"]:
if self.mode in ("constant", "poisson", "concurrent"):
if not self.rates:
err = ValueError(f"Rates are required for {self.mode} mode")
logger.error(err)
raise err

profile = self.create_fixed_rate_profile(
profile: Optional[Profile] = self.create_fixed_rate_profile(
self.generated_count,
self.mode,
self.rates,
Expand Down Expand Up @@ -229,9 +235,11 @@ def create_fixed_rate_profile(
:return: The generated profile or None if index is out of range.
:rtype: Optional[Profile]
"""

modes_map: Dict[str, LoadGenerationMode] = {
"constant": "constant",
"poisson": "poisson",
"concurrent": "consistent",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any rationale for remapping this to consistent rather than keeping it as concurrent through the entire codebase?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TLDR:

Makes sense. Will change it to concurrent

Detailed

Well, I would say that "consistent" relates to the LoadGenerationMode, which differs from the ProfileGenerationMode (the latter includes the "concurrent" mode).
This load-generation mode produces a new request immediately, so I called it "consistent" — as in producing a consistent stream of load.

}

if mode not in modes_map:
Expand Down
2 changes: 1 addition & 1 deletion src/guidellm/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ def generate_benchmark_report(
backend=backend_inst,
request_generator=request_generator,
mode=rate_type,
rate=rate if rate_type in ("constant", "poisson") else None,
rate=rate if rate_type in ("constant", "poisson", "concurrent") else None,
max_number=(
len(request_generator) if max_requests == "dataset" else max_requests
),
Expand Down
123 changes: 79 additions & 44 deletions src/guidellm/scheduler/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,12 @@ def __init__(
logger.error(err)
raise err

self._generator = generator
self._worker = worker
self._mode = mode
self._rate = rate
self._max_number = max_number
self._max_duration = max_duration
self._generator: RequestGenerator = generator
self._worker: Backend = worker
self._mode: LoadGenerationMode = mode
self._rate: Optional[float] = rate
self._max_number: Optional[int] = max_number
self._max_duration: Optional[float] = max_duration

self._load_generator = LoadGenerator(mode, rate)

Expand Down Expand Up @@ -227,9 +227,7 @@ async def run(self) -> AsyncGenerator[SchedulerResult, None]:
count_total = (
self.max_number
if self.max_number
else round(self.max_duration)
if self.max_duration
else 0
else round(self.max_duration) if self.max_duration else 0
)

# yield initial result for progress tracking
Expand All @@ -246,9 +244,7 @@ async def run(self) -> AsyncGenerator[SchedulerResult, None]:
count_completed = (
min(run_count, self.max_number)
if self.max_number
else round(time.time() - start_time)
if self.max_duration
else 0
else round(time.time() - start_time) if self.max_duration else 0
)

yield SchedulerResult(
Expand All @@ -267,16 +263,16 @@ async def run(self) -> AsyncGenerator[SchedulerResult, None]:
count_completed=(
benchmark.request_count + benchmark.error_count
if self.max_number
else round(time.time() - start_time)
if self.max_duration
else 0
else round(time.time() - start_time) if self.max_duration else 0
),
benchmark=benchmark,
)

async def _run_sync(
self, benchmark: TextGenerationBenchmark, end_time: float, max_number: float
) -> AsyncGenerator[Union[TextGenerationResult, TextGenerationError], None]:
"""Runs only for "synchronous" mode."""

for index, (request, submit_at) in enumerate(
zip(self.generator, self.load_generator.times())
):
Expand All @@ -298,42 +294,81 @@ async def _run_sync(
async def _run_async(
self, benchmark: TextGenerationBenchmark, end_time: float, max_number: float
) -> AsyncGenerator[Union[TextGenerationResult, TextGenerationError], None]:
"""
Notes:
if the load generation mode is set to 'consistent', timestamps are
not generated, so that requests are issued as fast as possible to
simulate concurrent clients interacting with the server.
"""

tasks = []
completed = 0

for index, (request, submit_at) in enumerate(
zip(self.generator, self.load_generator.times())
):
while (index + 1 - completed) >= settings.max_concurrency:
await asyncio.sleep(0.1)
def _completed(_task: asyncio.Task) -> None:
nonlocal completed
completed += 1
_res = _task.result()

if index >= max_number or time.time() >= end_time or submit_at >= end_time:
break
if _res:
benchmark.request_completed(_res)
logger.debug("Request completed: {}", _res)

logger.debug(
"Running asynchronous request={} at submit_at={}",
request,
submit_at,
)

def _completed(_task: asyncio.Task) -> None:
nonlocal completed
completed += 1
_res = _task.result()

if _res:
benchmark.request_completed(_res)
logger.debug("Request completed: {}", _res)
if self.mode == "consistent":
if self.rate is None:
raise ValueError(
"The `rate` must be specified in order to provide "
"the concurrent execution"
)
for index, request in enumerate(self.generator):
while (index + 1 - completed) >= settings.max_concurrency:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we ignore the max_concurrency setting in this case / raise an error if the concurrent requests number is smaller than the rate type we need to run at?

Additionally, our check should be on the rate here and we would only allow a new request if we're below our rate or have passed our time / request count restrictions, right?

await asyncio.sleep(0.1)

if index >= max_number or time.time() >= end_time:
break

logger.debug(f"Running concurrently request={request}")

benchmark.request_started()

# Create multiple concurrent tasks
tasks: list[asyncio.Task] = []
for _ in range(int(self.rate)):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not following how this is going to keep the concurrent request count fixed. Due to the outer loop, this would always just max out around max concurrency. I say around, because this inner for loop enables it to go above the max concurrency by appending N=rate new requests before checking max concurrency again

task: asyncio.Task = asyncio.create_task(
self._submit_task_coroutine( # submit the call with 'Backend'
request=request, submit_at=0.0, end_time=end_time
)
)
task.add_done_callback(_completed)
tasks.append(task)
else:
for index, (request, submit_at) in enumerate(
zip(self.generator, self.load_generator.times())
):
while (index + 1 - completed) >= settings.max_concurrency:
await asyncio.sleep(0.1)

if (
index >= max_number
or time.time() >= end_time
or submit_at >= end_time
):
break

logger.debug(
"Running asynchronous request={} at submit_at={}",
request,
submit_at,
)

benchmark.request_started()
task = asyncio.create_task(
self._submit_task_coroutine(request, submit_at, end_time)
)
task.add_done_callback(_completed)
tasks.append(task)
benchmark.request_started()
task = asyncio.create_task(
self._submit_task_coroutine(request, submit_at, end_time)
)
task.add_done_callback(_completed)
tasks.append(task)

# release control to the event loop for other tasks
await asyncio.sleep(0.001)
# release control to the event loop for other tasks
await asyncio.sleep(0.001)

for compl_task in asyncio.as_completed(tasks):
task_res = await compl_task
Expand Down
10 changes: 6 additions & 4 deletions src/guidellm/scheduler/load_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@

__all__ = ["LoadGenerationMode", "LoadGenerator"]

LoadGenerationMode = Literal["synchronous", "constant", "poisson", "throughput"]
LoadGenerationMode = Literal[
"synchronous", "constant", "poisson", "throughput", "consistent"
]


class LoadGenerator:
Expand All @@ -18,7 +20,7 @@ class LoadGenerator:
timestamps based on the rate provided during initialization.

:param mode: The mode of load generation. Valid options are "constant",
"poisson", "throughput", and "synchronous".
"poisson", "throughput", "synchronous", and "consistent".
:type mode: LoadGenerationMode
:param rate: The rate at which to generate timestamps. This value is
interpreted differently depending on the mode.
Expand Down Expand Up @@ -52,8 +54,8 @@ def __init__(self, mode: LoadGenerationMode, rate: Optional[float] = None):
logger.error(error)
raise error

self._mode = mode
self._rate = rate
self._mode: LoadGenerationMode = mode
self._rate: Optional[float] = rate
logger.debug(
"Initialized LoadGenerator with mode: {mode}, rate: {rate}",
mode=mode,
Expand Down
4 changes: 3 additions & 1 deletion tests/unit/scheduler/test_load_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,19 @@ def test_load_generator_mode():
"constant",
"poisson",
"throughput",
"consistent",
}


@pytest.mark.smoke()
@pytest.mark.parametrize(
("mode", "rate"),
[
("synchronous", None),
("constant", 10),
("poisson", 5),
("throughput", None),
("synchronous", None),
("consistent", 2),
],
)
def test_load_generator_instantiation(mode, rate):
Expand Down
Loading