-
Notifications
You must be signed in to change notification settings - Fork 5.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Serve] Stop @serve.batch
's batch handler when event loop closes
#42911
[Serve] Stop @serve.batch
's batch handler when event loop closes
#42911
Conversation
Signed-off-by: Shreyas Krishnaswamy <shrekris@anyscale.com>
This is a cleaned version of #42895. |
Signed-off-by: Shreyas Krishnaswamy <shrekris@anyscale.com>
Signed-off-by: Shreyas Krishnaswamy <shrekris@anyscale.com>
Signed-off-by: Shreyas Krishnaswamy <shrekris@anyscale.com>
Signed-off-by: Shreyas Krishnaswamy <shrekris@anyscale.com>
try: | ||
self.curr_iteration_start_time = time.time() | ||
await self._process_batch(func) | ||
except Exception: | ||
logger.exception( | ||
"_process_batches asyncio task ran into an " "unexpected exception." | ||
"_process_batches asyncio task ran into an unexpected exception." | ||
) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you try adding a log statement when the loop closes (and therefore we exit the while loop) and see if it's logged on every replica shutdown?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Strangely, it doesn't get logged. I changed the function to:
Modified _process_batches
async def _process_batches(self, func: Callable) -> None:
"""Loops infinitely and processes queued request batches."""
while not self._loop.is_closed():
logger.warning("Starting a while loop iteration.")
try:
self.curr_iteration_start_time = time.time()
await self._process_batch(func)
logger.warning("Finished a while loop iteration.")
except Exception:
logger.exception(
"_process_batches asyncio task ran into an unexpected exception."
)
logger.warning("Do you see me?")
I used this app:
Serve app
# File name: try.py
from ray import serve
@serve.deployment(graceful_shutdown_timeout_s=0.1)
class GetList:
def __init__(self):
self.count = 0
@serve.batch(max_batch_size=2, batch_wait_timeout_s=10)
async def handle_batch(self, requests):
return [0] * len(requests)
async def __call__(self, request):
return await self.handle_batch(request)
app = GetList.bind()
I started the app with serve run try:app
, and then I ran a few requests. Then I ran one request (so the batch wouldn't run), and then killed the app with ctrl-C.
Here's the log file:
replica_default_GetList_qsc3m3uz.log
We don't see either the Finished a while loop iteration.
or Do you see me?
logs. This also strange because yesterday when I ran unit tests with a log after the while loop, it did get printed. Maybe the logs only show up if we start Serve using serve.run()
and the event loop gets killed explicitly (e.g. by pytest
)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In my experience the asyncio
loop closing behavior is very finnicky/confusing. So it's possible it doesn't get closed and the process just shuts down.
@@ -399,27 +408,29 @@ async def streaming_func(key1, key2): | |||
else: | |||
coros = [func("hi1", "hi2"), func("hi3", "hi4")] | |||
|
|||
tasks = [asyncio.create_task(coro) for coro in coros] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can just inline this above
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I inlined it.
assert len(done) == 0 | ||
assert len(pending) == 2 | ||
|
||
print("Requests are blocked, as expected.") | ||
|
||
# Cancel the first request. The second request should still be blocked on | ||
# the long request_timeout | ||
coros[0].close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't even know coroutine.close()
existed...
Signed-off-by: Shreyas Krishnaswamy <shrekris@anyscale.com>
Signed-off-by: Shreyas Krishnaswamy <shrekris@anyscale.com>
@serve.batch
's batch handler when event loop closes
Why are these changes needed?
test_batching.py
is currently flaky:It seems to only be flaky on Python 3.11. They typically fail with a timeout (examples: 1, 2) When I ran the tests locally with Python 3.11, some tests hung with the following error repeated indefinitely:
#42631 wraps the
@serve.batch
logic in an infinite while loop that catches any exception and keeps retrying. This may have made it impossible for some of the unit tests to stop, which causes them to hang and enter timeout.This change makes
@serve.batch
's while loop exit if the handler's event loop closes.Related issue number
Addresses #42777.
Checks
git commit -s
) in this PR.scripts/format.sh
to lint the changes in this PR.method in Tune, I've added it in
doc/source/tune/api/
under thecorresponding
.rst
file.test_batching
unit test.