[Serve] Stop @serve.batch's batch handler when event loop closes #42911

Merged

shrekris-anyscale
Contributor

Why are these changes needed?

test_batching.py is currently flaky:

[Image: screenshot, 2024-01-31 at 3:27:05 PM]

It seems to be flaky only on Python 3.11. The tests typically fail with a timeout (examples: 1, 2). When I ran the tests locally with Python 3.11, some tests hung with the following error repeated indefinitely:

Traceback (most recent call last):
  File "/Users/shrekris/Desktop/ray/python/ray/serve/batching.py", line 256, in _process_batches
    await self._process_batch(func)
  File "/Users/shrekris/Desktop/ray/python/ray/serve/batching.py", line 265, in _process_batch
    batch: List[_SingleRequest] = await self.wait_for_batch()
                                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/shrekris/Desktop/ray/python/ray/serve/batching.py", line 131, in wait_for_batch
    batch.append(await self.queue.get())
                 ^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/shrekris/miniforge3/envs/py311/lib/python3.11/asyncio/queues.py", line 155, in get
    getter = self._get_loop().create_future()
             ^^^^^^^^^^^^^^^^
  File "/Users/shrekris/miniforge3/envs/py311/lib/python3.11/asyncio/mixins.py", line 20, in _get_loop
    raise RuntimeError(f'{self!r} is bound to a different event loop')
RuntimeError: <Queue at 0x107097f90 maxsize=0 _getters[1] tasks=2> is bound to a different event loop
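
The error itself can be reproduced outside Serve. The following is a minimal sketch, assuming Python 3.10 or newer (where asyncio.Queue binds to the running loop the first time it has to wait); the function names are hypothetical and none of this is Ray Serve code:

```python
import asyncio


async def wait_briefly(queue: asyncio.Queue) -> None:
    """Wait on an empty queue so that it binds to the current event loop."""
    try:
        await asyncio.wait_for(queue.get(), timeout=0.01)
    except asyncio.TimeoutError:
        pass  # Expected: nothing was ever put on the queue.


async def get_from_other_loop(queue: asyncio.Queue):
    """Try to use the same queue from a different event loop."""
    try:
        await queue.get()
    except RuntimeError as exc:
        return str(exc)
    return None


def reproduce():
    queue = asyncio.Queue()
    # First loop: the queue becomes bound to this loop.
    asyncio.run(wait_briefly(queue))
    # Second loop: asyncio.run() creates a new loop, so get() raises.
    return asyncio.run(get_from_other_loop(queue))


if __name__ == "__main__":
    print(reproduce())
```

On older Python versions the queue captures its loop differently, so this sketch should not be expected to behave the same way there.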

#42631 wraps the @serve.batch logic in an infinite while loop that catches any exception and keeps retrying. This may have made it impossible for some of the unit tests to stop: if the queue raises on every iteration, the loop never exits, so the tests hang until they time out.

This change makes @serve.batch's while loop exit if the handler's event loop closes.
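
As a rough illustration of the guard, here is a simplified sketch. BatchWorker and its max_iterations cap are hypothetical stand-ins (the cap exists only so the demo terminates); this is not the actual Serve implementation:

```python
import asyncio


class BatchWorker:
    """Hypothetical stand-in for the batch handler."""

    def __init__(self, loop: asyncio.AbstractEventLoop):
        self._loop = loop  # The loop this worker considers its own.
        self.iterations = 0
        self.errors = 0

    async def _process_batch(self) -> None:
        # Simulate a failure that would repeat on every iteration.
        raise RuntimeError("simulated: queue is bound to a different event loop")

    async def process_batches(self, max_iterations: int) -> None:
        # Exit once the owning loop is closed instead of retrying forever.
        while not self._loop.is_closed() and self.iterations < max_iterations:
            self.iterations += 1
            try:
                await self._process_batch()
            except Exception:
                self.errors += 1  # Stand-in for logger.exception(...)


def run_demo():
    owner_loop = asyncio.new_event_loop()
    runner_loop = asyncio.new_event_loop()
    try:
        # Owning loop still open: the worker retries until the cap.
        open_worker = BatchWorker(owner_loop)
        runner_loop.run_until_complete(open_worker.process_batches(5))

        # Owning loop closed: the while condition fails immediately.
        owner_loop.close()
        closed_worker = BatchWorker(owner_loop)
        runner_loop.run_until_complete(closed_worker.process_batches(5))
    finally:
        runner_loop.close()
        if not owner_loop.is_closed():
            owner_loop.close()
    return open_worker.iterations, closed_worker.iterations


if __name__ == "__main__":
    print(run_demo())
```

Without the is_closed() check and the demo-only cap, the first worker would spin indefinitely, which matches the hang described above.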

Related issue number

Addresses #42777.

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
      • This change deflakes the test_batching unit test.

Signed-off-by: Shreyas Krishnaswamy <shrekris@anyscale.com>
@shrekris-anyscale
Contributor Author

This is a cleaned version of #42895.

            try:
                self.curr_iteration_start_time = time.time()
                await self._process_batch(func)
            except Exception:
                logger.exception(
-                    "_process_batches asyncio task ran into an " "unexpected exception."
+                    "_process_batches asyncio task ran into an unexpected exception."
                )

Contributor

Can you try adding a log statement when the loop closes (and therefore we exit the while loop) and see if it's logged on every replica shutdown?

Contributor Author

@shrekris-anyscale shrekris-anyscale Feb 1, 2024

Strangely, it doesn't get logged. I changed the function to:

Modified _process_batches
    async def _process_batches(self, func: Callable) -> None:
        """Loops infinitely and processes queued request batches."""

        while not self._loop.is_closed():
            logger.warning("Starting a while loop iteration.")
            try:
                self.curr_iteration_start_time = time.time()
                await self._process_batch(func)
                logger.warning("Finished a while loop iteration.")
            except Exception:
                logger.exception(
                    "_process_batches asyncio task ran into an unexpected exception."
                )

        logger.warning("Do you see me?")

I used this app:

Serve app
# File name: try.py
from ray import serve

@serve.deployment(graceful_shutdown_timeout_s=0.1)
class GetList:
    def __init__(self):
        self.count = 0

    @serve.batch(max_batch_size=2, batch_wait_timeout_s=10)
    async def handle_batch(self, requests):
        return [0] * len(requests)

    async def __call__(self, request):
        return await self.handle_batch(request)

app = GetList.bind()

I started the app with serve run try:app and sent a few requests. Then I sent a single request (so the batch wouldn't run) and killed the app with Ctrl-C.

Here's the log file:

replica_default_GetList_qsc3m3uz.log

We don't see either the "Finished a while loop iteration." or the "Do you see me?" log. This is also strange because yesterday, when I ran the unit tests with a log statement after the while loop, it did get printed. Maybe the logs only show up if we start Serve using serve.run() and the event loop gets killed explicitly (e.g. by pytest)?
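
One general asyncio behavior could produce the same symptom, though nothing in this thread confirms it is what happens during replica shutdown. On Python 3.8+, asyncio.CancelledError derives from BaseException, so a task cancelled while waiting inside try/except Exception skips both the except branch and any code after the while loop. A minimal sketch, with hypothetical names and asyncio.sleep standing in for waiting on a batch:

```python
import asyncio


async def worker(loop: asyncio.AbstractEventLoop, events: list) -> None:
    while not loop.is_closed():
        events.append("iteration started")
        try:
            await asyncio.sleep(3600)  # Stand-in for waiting on a batch.
            events.append("iteration finished")
        except Exception:
            # CancelledError is not an Exception subclass on Python 3.8+,
            # so cancellation does not land here.
            events.append("exception logged")
    events.append("after loop")  # Skipped when the task is cancelled.


async def main() -> list:
    events = []
    task = asyncio.create_task(worker(asyncio.get_running_loop(), events))
    await asyncio.sleep(0)  # Let the worker reach its first await.
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        events.append("task cancelled")
    return events


def run_cancel_demo() -> list:
    return asyncio.run(main())


if __name__ == "__main__":
    print(run_cancel_demo())
```

In this sketch neither "iteration finished" nor "after loop" is recorded, which mirrors the two missing log lines.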

Contributor

In my experience, asyncio's loop-closing behavior is very finicky and confusing. So it's possible the loop never gets closed and the process just shuts down.

@@ -399,27 +408,29 @@ async def streaming_func(key1, key2):
    else:
        coros = [func("hi1", "hi2"), func("hi3", "hi4")]

    tasks = [asyncio.create_task(coro) for coro in coros]
Contributor

can just inline this above

Contributor Author

I inlined it.

assert len(done) == 0
assert len(pending) == 2

print("Requests are blocked, as expected.")

# Cancel the first request. The second request should still be blocked on
# the long request_timeout
coros[0].close()
Contributor

I didn't even know coroutine.close() existed...
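
For anyone else unfamiliar with it, here is a small sketch of what coroutine.close() does to a suspended coroutine object. The Suspend awaitable and request function are hypothetical helpers for the demo and are unrelated to the Serve test:

```python
import inspect


class Suspend:
    """Hypothetical awaitable that suspends the coroutine exactly once."""

    def __await__(self):
        yield


async def request(log: list) -> None:
    try:
        await Suspend()
        log.append("completed")
    finally:
        # close() raises GeneratorExit at the suspension point,
        # so cleanup code still runs.
        log.append("cleanup ran")


def close_demo():
    log = []
    coro = request(log)
    coro.send(None)  # Drive the coroutine manually to its first suspension.
    coro.close()     # Abandon it; the body after the await never runs.
    return log, inspect.getcoroutinestate(coro)


if __name__ == "__main__":
    print(close_demo())
```

The coroutine ends up in the CORO_CLOSED state without ever reaching the line after the await.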

@shrekris-anyscale shrekris-anyscale changed the title [Serve] Stop @serve.batch's batch handler when event loop closes [Serve] Stop @serve.batch's batch handler when event loop closes Feb 1, 2024
@edoakes edoakes merged commit f4d1900 into ray-project:master Feb 1, 2024
9 checks passed