-
Notifications
You must be signed in to change notification settings - Fork 6.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Serve] Revisiting ProxyState
to fix draining sequence
#41722
Merged
Merged
Changes from 1 commit
Commits
Show all changes
29 commits
Select commit
Hold shift + click to select a range
39c1447
Merge `start_new_ready_check` into `is_ready`
alexeykudinkin 0231ccf
Cleaned up `try_update_status`;
alexeykudinkin 0ebfe1b
Streamlined `is_healthy`, `is_ready` checks in `ProxyActorWrapper`
alexeykudinkin b6b3660
Tidying up
alexeykudinkin feffe80
Simplified `is_healthy`, `is_ready` checks even more
alexeykudinkin 2144898
Fixed `is_drained` check to properly return whether proxy actor is pr…
alexeykudinkin d04d010
Streamlined proxy's state reconciliation flow
alexeykudinkin ab1cc40
Simplified readiness, health, drain checks;
alexeykudinkin 5ba6068
Make sure exceptions are properly handled in proxy state reconciliation
alexeykudinkin d9ef34b
Cleaned up docs
alexeykudinkin e7c5e1e
Added test for ActorProxyWrapper
alexeykudinkin 759d904
Fixed typos;
alexeykudinkin 953343a
Fixed tests
alexeykudinkin 73d881e
Fixed more tests
alexeykudinkin 9e05832
Fixed some more tests
alexeykudinkin 8d0cdcd
`lint`
alexeykudinkin 136df1c
Fixing typo
alexeykudinkin 189cad4
After rebase clean-up
alexeykudinkin c8dcb57
Tidying up
alexeykudinkin 06b1755
Make proxy health-check configurable
alexeykudinkin f4283df
Unify `is_drained` API to require timeout be explicitly provided
alexeykudinkin b787188
Missing log statement
alexeykudinkin adfd2f3
Tidying up
alexeykudinkin eff6f23
Cleaned up exception handling
alexeykudinkin 959feba
`_try_cancel_future` -> `_try_set_exception`
alexeykudinkin c5d2034
Make timeout_s optional
alexeykudinkin 7002e11
Fixing typo
alexeykudinkin cc99b4a
Fixing another typo
alexeykudinkin 8a1e954
Fixing tests
alexeykudinkin File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Added test for ActorProxyWrapper
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
- Loading branch information
commit e7c5e1ed5495975cfac35f36c478d9147a00aef8
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,269 @@ | ||
import asyncio | ||
import concurrent.futures | ||
from unittest.mock import Mock, patch | ||
|
||
import pytest | ||
|
||
from ray.exceptions import RayTaskError | ||
from ray.serve._private.proxy_state import ActorProxyWrapper, wrap_as_future | ||
from ray.serve.schema import LoggingConfig | ||
|
||
|
||
def _create_object_ref_mock(): | ||
fut = concurrent.futures.Future() | ||
|
||
ray_object_ref_mock = Mock( | ||
future=Mock( | ||
return_value=fut | ||
) | ||
) | ||
|
||
return ray_object_ref_mock, fut | ||
|
||
|
||
def _create_mocked_actor_proxy_wrapper(actor_handle_mock): | ||
return ActorProxyWrapper( | ||
logging_config=LoggingConfig(), | ||
actor_handle=actor_handle_mock, | ||
node_id="some_node_id" | ||
) | ||
|
||
|
||
@pytest.mark.asyncio | ||
async def test_wrap_as_future_timeout(): | ||
object_ref_mock, fut = _create_object_ref_mock() | ||
|
||
aio_fut = wrap_as_future(ref=object_ref_mock, timeout_s=0) | ||
|
||
assert not aio_fut.done() | ||
# Yield the event-loop | ||
await asyncio.sleep(0.001) | ||
|
||
assert aio_fut.done() | ||
with pytest.raises(TimeoutError) as exc_info: | ||
aio_fut.result() | ||
|
||
assert "Future cancelled after timeout 0s" in str(exc_info.value) | ||
|
||
|
||
@pytest.mark.asyncio | ||
async def test_wrap_as_future_success(): | ||
# Test #1: Validate wrapped asyncio future completes, upon completion of the | ||
# ObjectRef's one | ||
|
||
object_ref_mock, fut = _create_object_ref_mock() | ||
|
||
aio_fut = wrap_as_future(ref=object_ref_mock, timeout_s=3600) | ||
|
||
assert not aio_fut.done() | ||
# Complete source future (ObjectRef one) | ||
fut.set_result("test") | ||
# Yield the event-loop | ||
await asyncio.sleep(0.001) | ||
|
||
assert aio_fut.done() | ||
assert aio_fut.result() == "test" | ||
|
||
# Test #2: Wrapped asyncio future completed before time-out expiration, | ||
# should not be affected by the cancellation callback | ||
|
||
object_ref_mock, fut = _create_object_ref_mock() | ||
# Purposefully set timeout to 0, ie future has to be cancelled upon next event-loop iteration | ||
aio_fut = wrap_as_future(ref=object_ref_mock, timeout_s=0) | ||
|
||
assert not aio_fut.done() | ||
# Complete source future (ObjectRef one) | ||
fut.set_result("test") | ||
# Yield the event-loop | ||
await asyncio.sleep(0.001) | ||
|
||
assert aio_fut.done() | ||
assert aio_fut.result() == "test" | ||
|
||
|
||
@pytest.mark.parametrize( | ||
("response", "is_ready"), [ | ||
# ProxyActor.ready responds with an tuple/array of 2 strings | ||
('["foo", "bar"]', True), | ||
('malformed_json', False), | ||
(Exception(), False) | ||
] | ||
) | ||
@pytest.mark.asyncio | ||
async def test_is_ready_check_success(response, is_ready): | ||
"""Tests calling is_ready method on ProxyActorWrapper, mocking out underlying ActorHandle response | ||
""" | ||
object_ref_mock, fut = _create_object_ref_mock() | ||
actor_handle_mock = Mock( | ||
ready=Mock( | ||
remote=Mock( | ||
return_value=object_ref_mock | ||
) | ||
) | ||
) | ||
|
||
proxy_wrapper = _create_mocked_actor_proxy_wrapper(actor_handle_mock) | ||
|
||
for _ in range(10): | ||
assert proxy_wrapper.is_ready(timeout_s=1) is None | ||
# Yield loop! | ||
await asyncio.sleep(0.01) | ||
|
||
# Complete source future | ||
if isinstance(response, Exception): | ||
object_ref_mock.future().set_exception(response) | ||
else: | ||
object_ref_mock.future().set_result(response) | ||
|
||
# Yield loop! | ||
await asyncio.sleep(0) | ||
# NOTE: Timeout setting is only relevant, in case there's no pending request | ||
# and one will be issued | ||
assert proxy_wrapper.is_ready(timeout_s=1) is is_ready | ||
|
||
|
||
@pytest.mark.asyncio | ||
async def test_is_ready_check_timeout(): | ||
object_ref_mock, fut = _create_object_ref_mock() | ||
actor_handle_mock = Mock( | ||
ready=Mock( | ||
remote=Mock( | ||
return_value=object_ref_mock | ||
) | ||
) | ||
) | ||
|
||
proxy_wrapper = _create_mocked_actor_proxy_wrapper(actor_handle_mock) | ||
|
||
# First call, invokes ProxyActor.ready call | ||
assert proxy_wrapper.is_ready(timeout_s=0) is None | ||
# Yield loop! | ||
await asyncio.sleep(0.001) | ||
|
||
assert proxy_wrapper.is_ready(timeout_s=0) is False | ||
|
||
|
||
@pytest.mark.parametrize( | ||
("response", "is_healthy"), [ | ||
(None, True), | ||
(RayTaskError("check_health", "<traceback>", "cuz"), False), | ||
] | ||
) | ||
@pytest.mark.asyncio | ||
async def test_is_healthy_check_success(response, is_healthy): | ||
"""Tests calling is_healthy method on ProxyActorWrapper, mocking out underlying ActorHandle response | ||
""" | ||
object_ref_mock, fut = _create_object_ref_mock() | ||
actor_handle_mock = Mock( | ||
check_health=Mock( | ||
remote=Mock( | ||
return_value=object_ref_mock | ||
) | ||
) | ||
) | ||
|
||
proxy_wrapper = _create_mocked_actor_proxy_wrapper(actor_handle_mock) | ||
|
||
for _ in range(10): | ||
assert proxy_wrapper.is_healthy(timeout_s=1) is None | ||
# Yield loop! | ||
await asyncio.sleep(0.01) | ||
|
||
object_ref_mock.future.assert_called_once() | ||
|
||
# Complete source future | ||
if isinstance(response, Exception): | ||
object_ref_mock.future.return_value.set_exception(response) | ||
else: | ||
object_ref_mock.future.return_value.set_result(response) | ||
|
||
# Yield loop! | ||
await asyncio.sleep(0) | ||
# NOTE: Timeout setting is only relevant, in case there's no pending request | ||
# and one will be issued | ||
assert proxy_wrapper.is_healthy(timeout_s=1) is is_healthy | ||
|
||
|
||
@pytest.mark.asyncio | ||
async def test_is_healthy_check_timeout(): | ||
object_ref_mock, fut = _create_object_ref_mock() | ||
actor_handle_mock = Mock( | ||
check_health=Mock( | ||
remote=Mock( | ||
return_value=object_ref_mock | ||
) | ||
) | ||
) | ||
|
||
proxy_wrapper = _create_mocked_actor_proxy_wrapper(actor_handle_mock) | ||
|
||
# First call, invokes ProxyActor.ready call | ||
assert proxy_wrapper.is_healthy(timeout_s=0) is None | ||
# Yield loop! | ||
await asyncio.sleep(0.001) | ||
|
||
assert proxy_wrapper.is_healthy(timeout_s=0) is False | ||
|
||
|
||
@pytest.mark.parametrize( | ||
("response", "is_drained"), [ | ||
(True, True), | ||
(False, False), | ||
(RayTaskError("is_drained", "<traceback>", "cuz"), False), | ||
] | ||
) | ||
@pytest.mark.asyncio | ||
async def test_is_drained_check_success(response, is_drained): | ||
"""Tests calling is_drained method on ProxyActorWrapper, mocking out underlying ActorHandle response | ||
""" | ||
object_ref_mock, fut = _create_object_ref_mock() | ||
actor_handle_mock = Mock( | ||
is_drained=Mock( | ||
remote=Mock( | ||
return_value=object_ref_mock | ||
) | ||
) | ||
) | ||
|
||
proxy_wrapper = _create_mocked_actor_proxy_wrapper(actor_handle_mock) | ||
|
||
for _ in range(10): | ||
assert proxy_wrapper.is_drained() is None | ||
# Yield loop! | ||
await asyncio.sleep(0.01) | ||
|
||
object_ref_mock.future.assert_called_once() | ||
|
||
# Complete source future | ||
if isinstance(response, Exception): | ||
object_ref_mock.future.return_value.set_exception(response) | ||
else: | ||
object_ref_mock.future.return_value.set_result(response) | ||
|
||
# Yield loop! | ||
await asyncio.sleep(0) | ||
# NOTE: Timeout setting is only relevant, in case there's no pending request | ||
# and one will be issued | ||
assert proxy_wrapper.is_drained() is is_drained | ||
|
||
|
||
@pytest.mark.asyncio | ||
async def test_is_drained_check_timeout(): | ||
object_ref_mock, fut = _create_object_ref_mock() | ||
actor_handle_mock = Mock( | ||
is_drained=Mock( | ||
remote=Mock( | ||
return_value=object_ref_mock | ||
) | ||
) | ||
) | ||
|
||
proxy_wrapper = _create_mocked_actor_proxy_wrapper(actor_handle_mock) | ||
|
||
# First call, invokes ProxyActor.ready call | ||
assert proxy_wrapper.is_drained(timeout_s=0) is None | ||
# Yield loop! | ||
await asyncio.sleep(0.001) | ||
|
||
assert proxy_wrapper.is_drained(timeout_s=0) is False | ||
|
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can use
ray._private.test_utils.async_wait_for_condition
as a common pattern for adding these assertions that require the loop to be yielded