-
Notifications
You must be signed in to change notification settings - Fork 46
Concurrent load generation option is implemented #71
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -114,12 +114,12 @@ def __init__( | |
logger.error(err) | ||
raise err | ||
|
||
self._generator = generator | ||
self._worker = worker | ||
self._mode = mode | ||
self._rate = rate | ||
self._max_number = max_number | ||
self._max_duration = max_duration | ||
self._generator: RequestGenerator = generator | ||
self._worker: Backend = worker | ||
self._mode: LoadGenerationMode = mode | ||
self._rate: Optional[float] = rate | ||
self._max_number: Optional[int] = max_number | ||
self._max_duration: Optional[float] = max_duration | ||
|
||
self._load_generator = LoadGenerator(mode, rate) | ||
|
||
|
@@ -227,9 +227,7 @@ async def run(self) -> AsyncGenerator[SchedulerResult, None]: | |
count_total = ( | ||
self.max_number | ||
if self.max_number | ||
else round(self.max_duration) | ||
if self.max_duration | ||
else 0 | ||
else round(self.max_duration) if self.max_duration else 0 | ||
) | ||
|
||
# yield initial result for progress tracking | ||
|
@@ -246,9 +244,7 @@ async def run(self) -> AsyncGenerator[SchedulerResult, None]: | |
count_completed = ( | ||
min(run_count, self.max_number) | ||
if self.max_number | ||
else round(time.time() - start_time) | ||
if self.max_duration | ||
else 0 | ||
else round(time.time() - start_time) if self.max_duration else 0 | ||
) | ||
|
||
yield SchedulerResult( | ||
|
@@ -267,16 +263,16 @@ async def run(self) -> AsyncGenerator[SchedulerResult, None]: | |
count_completed=( | ||
benchmark.request_count + benchmark.error_count | ||
if self.max_number | ||
else round(time.time() - start_time) | ||
if self.max_duration | ||
else 0 | ||
else round(time.time() - start_time) if self.max_duration else 0 | ||
), | ||
benchmark=benchmark, | ||
) | ||
|
||
async def _run_sync( | ||
self, benchmark: TextGenerationBenchmark, end_time: float, max_number: float | ||
) -> AsyncGenerator[Union[TextGenerationResult, TextGenerationError], None]: | ||
"""Runs only for "synchronous" mode.""" | ||
|
||
for index, (request, submit_at) in enumerate( | ||
zip(self.generator, self.load_generator.times()) | ||
): | ||
|
@@ -298,42 +294,81 @@ async def _run_sync( | |
async def _run_async( | ||
self, benchmark: TextGenerationBenchmark, end_time: float, max_number: float | ||
) -> AsyncGenerator[Union[TextGenerationResult, TextGenerationError], None]: | ||
""" | ||
Notes: | ||
if the Load Generation Mode is set to 'consistent' - timestamps should | ||
not be generated in order to make as many requests as possible to | ||
simulate concurrent clients interaction. | ||
""" | ||
|
||
tasks = [] | ||
completed = 0 | ||
|
||
for index, (request, submit_at) in enumerate( | ||
zip(self.generator, self.load_generator.times()) | ||
): | ||
while (index + 1 - completed) >= settings.max_concurrency: | ||
await asyncio.sleep(0.1) | ||
def _completed(_task: asyncio.Task) -> None: | ||
nonlocal completed | ||
completed += 1 | ||
_res = _task.result() | ||
|
||
if index >= max_number or time.time() >= end_time or submit_at >= end_time: | ||
break | ||
if _res: | ||
benchmark.request_completed(_res) | ||
logger.debug("Request completed: {}", _res) | ||
|
||
logger.debug( | ||
"Running asynchronous request={} at submit_at={}", | ||
request, | ||
submit_at, | ||
) | ||
|
||
def _completed(_task: asyncio.Task) -> None: | ||
nonlocal completed | ||
completed += 1 | ||
_res = _task.result() | ||
|
||
if _res: | ||
benchmark.request_completed(_res) | ||
logger.debug("Request completed: {}", _res) | ||
if self.mode == "consistent": | ||
parfeniukink marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if self.rate is None: | ||
raise ValueError( | ||
"The `rate` must be specified in order to provide " | ||
"the concurrent execution" | ||
) | ||
for index, request in enumerate(self.generator): | ||
while (index + 1 - completed) >= settings.max_concurrency: | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Shouldn't we ignore the max_concurrency setting in this case / raise an error if the number of concurrent requests is smaller than the rate type we need to run at? Additionally, our check should be on the rate here and we would only allow a new request if we're below our rate or have passed our time / request count restrictions, right? |
||
await asyncio.sleep(0.1) | ||
|
||
if index >= max_number or time.time() >= end_time: | ||
break | ||
|
||
logger.debug(f"Running concurrently request={request}") | ||
|
||
benchmark.request_started() | ||
|
||
# Create multiple concurrent tasks | ||
tasks: list[asyncio.Task] = [] | ||
for _ in range(int(self.rate)): | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I'm not following how this is going to keep the concurrent request count fixed. Due to the outer loop, this would always just max out around max concurrency. I say around, because this inner for loop enables it to go above the max concurrency by appending N=rate new requests before checking max concurrency again |
||
task: asyncio.Task = asyncio.create_task( | ||
self._submit_task_coroutine( # submit the call with 'Backend' | ||
request=request, submit_at=0.0, end_time=end_time | ||
) | ||
) | ||
task.add_done_callback(_completed) | ||
tasks.append(task) | ||
else: | ||
for index, (request, submit_at) in enumerate( | ||
zip(self.generator, self.load_generator.times()) | ||
): | ||
while (index + 1 - completed) >= settings.max_concurrency: | ||
await asyncio.sleep(0.1) | ||
|
||
if ( | ||
index >= max_number | ||
or time.time() >= end_time | ||
or submit_at >= end_time | ||
): | ||
break | ||
|
||
logger.debug( | ||
"Running asynchronous request={} at submit_at={}", | ||
request, | ||
submit_at, | ||
) | ||
|
||
benchmark.request_started() | ||
task = asyncio.create_task( | ||
self._submit_task_coroutine(request, submit_at, end_time) | ||
) | ||
task.add_done_callback(_completed) | ||
tasks.append(task) | ||
benchmark.request_started() | ||
task = asyncio.create_task( | ||
self._submit_task_coroutine(request, submit_at, end_time) | ||
) | ||
task.add_done_callback(_completed) | ||
tasks.append(task) | ||
|
||
# release control to the event loop for other tasks | ||
await asyncio.sleep(0.001) | ||
# release control to the event loop for other tasks | ||
await asyncio.sleep(0.001) | ||
|
||
for compl_task in asyncio.as_completed(tasks): | ||
task_res = await compl_task | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
any rationale for remapping this to consistent rather than keeping it as concurrent through the entire codebase?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TLDR:
Makes sense. Will change it to `concurrent`.
Detailed
Well, I would say that `consistent` is related to the `LoadGenerationMode`, which differs from the `ProfileGenerationMode`, which includes the `concurrent` mode. This LoadGeneration will produce a new request immediately, so I called this `consistent`. Like we produce consistent load generation.