[core][cgraph] Collapse other params into max_inflight_executions and adjust execution_index counting #49565

Merged 16 commits on Jan 13, 2025
add async tests
Signed-off-by: dayshah <dhyey2019@gmail.com>
dayshah committed Jan 10, 2025
commit 757b33e80e3d01b80e4944e72ab21d365a195268
69 changes: 57 additions & 12 deletions python/ray/dag/tests/experimental/test_accelerated_dag.py
@@ -2175,6 +2175,12 @@ async def main():


 def test_inflight_requests_exceed_capacity(ray_start_regular):
+    expected_error_message = (
+        r".*You cannot execute more than 2 in-flight "
+        r"requests, and you currently have 2 in-flight "
+        r"requests. Retrieve an output using ray.get before submitting "
+        r"more requests or increase `max_inflight_executions`. "
+    )
     a = Actor.remote(0)
     with InputNode() as inp:
         dag = a.sleep.bind(inp)
@@ -2183,21 +2189,43 @@ def test_inflight_requests_exceed_capacity(ray_start_regular):
     ref2 = compiled_dag.execute(1)
     with pytest.raises(
         ray.exceptions.RayCgraphCapacityExceeded,
-        match=(
-            r"You cannot execute more than 2 in-flight "
-            r"requests, and you currently have 2 in-flight "
-            r"requests. Retrieve an output using ray.get before submitting "
-            r"more requests or increase `max_inflight_executions`. "
-        ),
+        match=(expected_error_message),
     ):
         _ = compiled_dag.execute(1)
+
+    # test same with asyncio
+    async def main():
+        a = Actor.remote(0)
+        with InputNode() as inp:
+            dag = a.sleep.bind(inp)
+        async_compiled_dag = dag.experimental_compile(
+            enable_asyncio=True, _max_inflight_executions=2
+        )
+        ref1 = await async_compiled_dag.execute_async(1)
+        ref2 = await async_compiled_dag.execute_async(1)
+        print(async_compiled_dag._execution_index)
+        with pytest.raises(
+            ray.exceptions.RayCgraphCapacityExceeded,
+            match=(expected_error_message),
+        ):
+            _ = await async_compiled_dag.execute_async(1)
+        (ref1, ref2)
+
+    loop = get_or_create_event_loop()
+    loop.run_until_complete(main())
     # to show variables are being used and avoid destruction since
     # CompiledDagRef __del__ will release buffers and
     # increment _max_finished_execution_index
Contributor Author

@dayshah dayshah Jan 9, 2025

This seems prone to issues, e.g. if refs are destroyed in a different order than execute was called:

if not self._ray_get_called:
    self._dag.release_output_channel_buffers(self._execution_index)

The continuation of this in the SynchronousReader also just releases all buffers, so anything that was executed up to that point would also be gone. For example, this script fails:

a = Actor.remote(0)
with InputNode() as inp:
    dag = a.sleep.bind(inp)
    dag = a.inc.bind(dag)
compiled_dag = dag.experimental_compile()
ref = compiled_dag.execute(1)
compiled_dag.execute(2)
ray.get(ref)

This is also inconsistent with the async behavior.

Contributor

Yeah that looks problematic. cc @kevin85421 who reviewed the skip deserialization PR.

One idea is to deserialize the prior indexes and save the results to the buffer, and only skip deserialization for the exact index.
When a ref goes out of scope, first check whether its result is already in the buffer; if so, delete it, otherwise do the above.
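
A minimal sketch of that idea, as a toy model and not Ray's actual reader. `ToyReader`, `get`, and `drop_ref` are hypothetical names, and the output channel is assumed to be a plain FIFO that can only be read in execution order:

```python
from collections import deque


class ToyReader:
    """Toy stand-in for an in-order output channel plus a result buffer."""

    def __init__(self, outputs):
        # Assumption: the channel yields results strictly in execution order.
        self._channel = deque(outputs)
        self._next_index = 0  # execution index of the next unread result
        self._buffer = {}     # execution index -> cached result

    def _read_one(self, keep):
        # Consume the next result; cache it only if some ref may still need it.
        value = self._channel.popleft()
        if keep:
            self._buffer[self._next_index] = value
        self._next_index += 1

    def _advance_to(self, index):
        # Cache every earlier result so its ref can still be resolved later.
        while self._next_index < index:
            self._read_one(keep=True)

    def get(self, index):
        if index not in self._buffer:
            if index < self._next_index:
                raise KeyError(f"result {index} was already consumed")
            self._advance_to(index)
            self._read_one(keep=True)
        return self._buffer.pop(index)

    def drop_ref(self, index):
        # A ref went out of scope without its result being retrieved.
        if index in self._buffer:
            del self._buffer[index]
            return
        if index < self._next_index:
            return  # already consumed, nothing left to release
        self._advance_to(index)
        self._read_one(keep=False)  # skip only this exact index


reader = ToyReader(["out0", "out1", "out2"])
reader.drop_ref(1)     # the ref for execution 1 is destroyed first
first = reader.get(0)  # execution 0 is still retrievable from the buffer
third = reader.get(2)
```

The cost is the one raised in this thread: dropping a ref forces every earlier unread result to be read and cached, even if those refs are about to be dropped too.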

Contributor Author

Ya, that sounds ok. There's the overhead of needing to get the prior indices, but there isn't really any way around that. One thing is that Python might not call __del__ in order, so we may force deserialization for refs that are also being deleted, but I'm not sure we can do anything about that.

Should I open an issue to track this? Is it a beta blocker, or should I just do it in a follow-up PR without creating an issue?

Contributor

I think we should ensure correctness first. But open an issue for optimization. We can leave the optimization out of beta for now.

Contributor

Looks like this one is not addressed yet?

Contributor Author

@dayshah dayshah Jan 12, 2025

Opened #49781 as the fix and #49782 as an issue for future work.

     (ref1, ref2)


 def test_result_buffer_exceeds_capacity(ray_start_regular):
+    expected_error_message = (
+        r".*You cannot execute more than 2 in-flight "
+        r"requests, and you currently have 2 in-flight "
+        r"requests. Retrieve an output using ray.get before submitting "
+        r"more requests or increase `max_inflight_executions`. "
+    )
     a = Actor.remote(0)
     with InputNode() as inp:
         dag = a.inc.bind(inp)
@@ -2208,14 +2236,31 @@ def test_result_buffer_exceeds_capacity(ray_start_regular):
     ref3 = compiled_dag.execute(3)
     with pytest.raises(
         ray.exceptions.RayCgraphCapacityExceeded,
-        match=(
-            r".*You cannot execute more than 2 in-flight "
-            r"requests, and you currently have 2 in-flight "
-            r"requests. Retrieve an output using ray.get before submitting "
-            r"more requests or increase `max_inflight_executions`. "
-        ),
+        match=(expected_error_message),
     ):
         _ = compiled_dag.execute(4)
+
+    # test same with asyncio
+    async def main():
+        a = Actor.remote(0)
+        with InputNode() as inp:
+            dag = a.inc.bind(inp)
+        async_compiled_dag = dag.experimental_compile(
+            enable_asyncio=True, _max_inflight_executions=2
+        )
+        ref1 = await async_compiled_dag.execute_async(1)
+        ref2 = await async_compiled_dag.execute_async(2)
+        await ref2
+        ref3 = await async_compiled_dag.execute_async(3)
+        with pytest.raises(
+            ray.exceptions.RayCgraphCapacityExceeded,
+            match=(expected_error_message),
+        ):
+            _ = await async_compiled_dag.execute_async(4)
+        (ref1, ref3)
+
+    loop = get_or_create_event_loop()
+    loop.run_until_complete(main())
     # same reason as comment for test_inflight_requests_exceed_capacity
     (ref1, ref3)
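
The counting rule that test_result_buffer_exceeds_capacity asserts can be sketched with a toy model. This is an illustration under assumptions, not Ray's implementation: `ToyGraph` and `CapacityExceeded` are hypothetical names, and the model assumes that every submitted execution whose result has not been retrieved counts as in-flight, including one whose result is only sitting in the buffer.

```python
class CapacityExceeded(Exception):
    """Hypothetical stand-in for ray.exceptions.RayCgraphCapacityExceeded."""


class ToyGraph:
    def __init__(self, max_inflight_executions):
        self._max_inflight = max_inflight_executions
        self._next_index = 0
        # Assumption: submitted executions whose result was never retrieved.
        self._unretrieved = set()

    def execute(self):
        if len(self._unretrieved) >= self._max_inflight:
            raise CapacityExceeded(
                f"You cannot execute more than {self._max_inflight} in-flight "
                f"requests, and you currently have {len(self._unretrieved)} "
                "in-flight requests."
            )
        index = self._next_index
        self._next_index += 1
        self._unretrieved.add(index)
        return index

    def get(self, index):
        # Retrieving a result frees one slot.
        self._unretrieved.remove(index)


graph = ToyGraph(max_inflight_executions=2)
ref1 = graph.execute()
ref2 = graph.execute()
graph.get(ref2)         # ref1 is still unretrieved and keeps its slot
ref3 = graph.execute()  # fills the slot freed by ref2
try:
    graph.execute()     # ref1 and ref3 already occupy both slots
    rejected = False
except CapacityExceeded:
    rejected = True
```

This mirrors the sequence in the test: two executions, one retrieval, a third execution, and a fourth that is rejected.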
