
Conversation

@Qiaolin-Yu (Member) commented Oct 1, 2025

Why are these changes needed?

Previously, there was a possible data race pattern like this:

1. Add object xxx to the object store.
2. Start an async ray_send of xxx; the send has not actually finished yet.
3. Start an async ray_recv of xxx.
4. free_object_primary_copy is called on xxx because it has gone out of scope.
5. The in-flight async ray_send fails because xxx no longer exists in the object store.

In this PR, we make two main changes:

  1. Move the GC task for GPU objects to the _ray_system thread (the same thread as ray_send and ray_recv) to control the execution order.
  2. Use torch.Tensor.record_stream to record the send stream, ensuring the tensor is not freed before the send task finishes.
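
For context, here is a minimal, hypothetical sketch of what torch.Tensor.record_stream provides for the second change. It assumes a CUDA device and is purely illustrative, not the PR's actual send path:

```python
import torch

# Stream that stands in for the send stream used by the async GPU-to-GPU transfer.
send_stream = torch.cuda.Stream()

tensor = torch.ones(1024, device="cuda")
with torch.cuda.stream(send_stream):
    # Imagine the async send of `tensor` being enqueued on send_stream here.
    pass

# Tell the caching allocator that `tensor` is in use on send_stream, so its memory
# is not reused until the work queued on that stream has completed.
tensor.record_stream(send_stream)

# Without record_stream, dropping the last reference here could let the allocator
# hand the memory to another allocation while the send is still running.
del tensor
```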

Related issue number

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run pre-commit jobs to lint the changes in this PR. (pre-commit setup)
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Note

Plumbs a new FreeActorObject flow: TaskManager triggers a callback to RPC the source actor to free a GPU object’s primary copy, with client/server handlers and a Python GPUObjectManager entry point.

  • GPU Object Freeing (Primary Copy):
    • Add a TaskManager free callback (FreeActorObjectCallback) and pass it from CoreWorkerProcess to TaskManager.
    • When a GPU object goes out of scope, TaskManager invokes the callback instead of the previously inlined RPC logic.
    • Python: _raylet.pyx now calls gpu_object_manager.free_object_primary_copy(...).
    • Python: GPUObjectManager.free_object_primary_copy added; it calls the source actor's __ray_free__ (stub in gpu_object_store.py) via the _ray_system concurrency group (see the sketch after this note).
  • RPC/Plumbing:
    • Define and expose the FreeActorObject RPC in core_worker.proto and the gRPC service (grpc_service.{h,cc}) and proxy (core_worker_rpc_proxy.h).
    • Implement client methods in CoreWorkerClient{,Interface} and the fake client; update mocks to drop old paths.
  • Refactors/Logging:
    • Remove the old inlined free-object paths; route them through the new callback/RPC.
    • Minor debug logs (e.g., pop_object output).

Written by Cursor Bugbot for commit db5243d.
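
To make the freeing path above concrete, here is a minimal, hypothetical Python sketch of the flow. Class and attribute names (e.g. managed_gpu_object_metadata, src_actor, _gpu_object_store) are simplified stand-ins, not necessarily the PR's exact code:

```python
def __ray_free__(self, object_id: str):
    # Runs on the source actor in the _ray_system concurrency group, i.e. on the same
    # thread that executes ray_send/ray_recv, so it runs only after any previously
    # queued send/recv tasks for the same object.
    self._gpu_object_store.pop_object(object_id)


class GPUObjectManager:
    def free_object_primary_copy(self, object_id: str):
        # Entry point called from _raylet.pyx when the owner decides the primary copy
        # of the GPU object can be freed.
        metadata = self.managed_gpu_object_metadata[object_id]
        src_actor = metadata.src_actor
        # Schedule the free on the source actor's _ray_system concurrency group.
        src_actor.__ray_call__.options(concurrency_group="_ray_system").remote(
            __ray_free__, object_id
        )
```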

@Qiaolin-Yu Qiaolin-Yu requested a review from a team as a code owner October 1, 2025 23:15
@Qiaolin-Yu Qiaolin-Yu marked this pull request as draft October 1, 2025 23:15
Signed-off-by: dayshah <dhyey2019@gmail.com>
@gemini-code-assist (bot, Contributor) left a comment

Code Review

This pull request refactors the mechanism for freeing GPU objects. It replaces the FreeActorObject C++ RPC with a Python-level RPC (__ray_free__) initiated from the object owner's process. This change centralizes the logic for freeing GPU objects in the Python GPUObjectManager.

While the overall direction of the refactoring is sound, the current implementation appears to be a work in progress. There are several debugging print statements that should be replaced with proper logging. More importantly, the core logic in __ray_free__ is commented out, which would prevent GPU objects from being freed. Additionally, there's an overly broad exception handler that could mask potential bugs.

I've left specific comments on these points. Please address them to ensure the new mechanism is robust and production-ready.

@Qiaolin-Yu Qiaolin-Yu changed the title draft Fix data race in RDT when using async gpu to gpu transfer Oct 2, 2025
@Qiaolin-Yu Qiaolin-Yu marked this pull request as ready for review October 2, 2025 20:25

@dayshah (Contributor) left a comment

awesome, some nits but generally lgtm, you can't repro the crash anymore right?

I'm wondering if we could write a test for this, but I'm not really sure how...

p2p_fn(tensor, comms[i], streams[i], peer_p2p_rank)
# Record the stream to avoid tensor being freed before the send/recv is completed.
torch_stream = torch.cuda.ExternalStream(streams[i].ptr)
tensor.record_stream(torch_stream)
Contributor:

I don't love that this is so deep in the stack, but I guess it doesn't apply to gloo / nixl so it needs to be here?

Maybe we could do something like have a tensor_transport_manager.wait_for_stream_on_send and then wait on it until we finish.

Member Author:

I think the issue is we can't get the stream on the tensor_transport_manager level, since the stream is maintained in this file.

Contributor:

Yes, I think this would also be fixed if we implement @dayshah's proposed change of just implementing a nccl backend directly on cupy instead of going through ray.util.collective.

Let's keep it for now and add a NOTE?

Contributor:

Actually on second thought, I think we can just keep this. The same bug can appear just in ray.util.collective code too.

src_actor.__ray_call__.options(concurrency_group="_ray_system").remote(
    __ray_free__, object_id
)
except Exception:
Contributor:

Do we need this except Exception? In what cases will it get hit?

Member Author:

I think it will be useful after we add GC for GPU object metadata. In that case, managed_gpu_object_metadata[object_id] may raise a KeyError if the object metadata has already been cleaned up.

Contributor:

Ya, that makes sense. Can you just use self.managed_gpu_object_metadata.get instead?

Contributor:

Let's log the error for now. It's easy for these kinds of codepaths to fail in unexpected ways and we don't want to bring down the whole application from a bad assert.

@dayshah (Contributor) Oct 2, 2025:

Makes sense. We can log a "something went wrong on freeing" error plus the exception for now; we should also do the .get(..., None) for metadata when we implement metadata cleanup.
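
A hypothetical sketch of the pattern discussed in this thread (illustrative only; the merged code may differ in names and structure):

```python
import logging

logger = logging.getLogger(__name__)


def __ray_free__(actor_self, object_id):
    ...  # stand-in for the stub defined in gpu_object_store.py


def free_object_primary_copy(gpu_object_manager, object_id):
    # .get avoids a KeyError once metadata GC can remove the entry before the free runs.
    metadata = gpu_object_manager.managed_gpu_object_metadata.get(object_id, None)
    if metadata is None:
        return  # metadata already cleaned up; nothing to free
    try:
        metadata.src_actor.__ray_call__.options(
            concurrency_group="_ray_system"
        ).remote(__ray_free__, object_id)
    except Exception:
        # Best-effort freeing: log instead of bringing down the whole application.
        logger.exception("Something went wrong while freeing GPU object %s", object_id)
```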

@Qiaolin-Yu Qiaolin-Yu changed the title Fix data race in RDT when using async gpu to gpu transfer [core][RDT] Fix data race when using async gpu to gpu transfer Oct 2, 2025
@Qiaolin-Yu (Member Author) commented Oct 2, 2025

awesome, some nits but generally lgtm, you can't repro the crash anymore right?

Yes, I can't reproduce the issue with the previous script now. @dayshah
Maybe we can add tests in the future. I need some time to figure out how to test it.

@Qiaolin-Yu Qiaolin-Yu requested a review from dayshah October 2, 2025 22:34
@stephanie-wang (Contributor) left a comment

Niiice!

@stephanie-wang (Contributor) commented:

awesome, some nits but generally lgtm, you can't repro the crash anymore right?

Yes, I can't reproduce the issue with the previous script now. @dayshah Maybe we can add tests in the future. I need some time to figure out how to test it.

Yes that sounds good. Maybe there is a way to mock it. Otherwise we could try to launch a blocking kernel on the send stream...?
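
One possible way to realize the "blocking kernel" idea in a test, sketched here as a hypothesis; torch.cuda._sleep is a private PyTorch helper, and a real test would have to hook into RDT's actual send path:

```python
import torch

# Keep the send stream busy long enough that freeing the tensor right after the
# (still running) send would reproduce the race if record_stream were missing.
send_stream = torch.cuda.Stream()
tensor = torch.ones(1 << 20, device="cuda")

with torch.cuda.stream(send_stream):
    torch.cuda._sleep(1_000_000_000)  # spin the GPU for ~1e9 cycles on this stream
    # ... the async GPU-to-GPU send of `tensor` would be enqueued here ...

tensor.record_stream(send_stream)  # with the fix, the memory outlives the busy stream
del tensor
torch.cuda.synchronize()
```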

@ray-gardener bot added the core (Issues that should be addressed in Ray Core) and gpu (GPU related issues) labels Oct 3, 2025
Signed-off-by: dayshah <dhyey2019@gmail.com>
@dayshah dayshah added the go (add ONLY when ready to merge, run all tests) label Oct 3, 2025
@dayshah dayshah enabled auto-merge (squash) October 3, 2025 19:19
@dayshah (Contributor) left a comment

Fixed the test build; now logging the exception.

@dayshah dayshah merged commit ca92d11 into ray-project:master Oct 3, 2025
6 of 7 checks passed
dstrodtman pushed a commit to dstrodtman/ray that referenced this pull request Oct 6, 2025
eicherseiji pushed a commit to eicherseiji/ray that referenced this pull request Oct 6, 2025
liulehui pushed a commit to liulehui/ray that referenced this pull request Oct 9, 2025
joshkodi pushed a commit to joshkodi/ray that referenced this pull request Oct 13, 2025
justinyeh1995 pushed a commit to justinyeh1995/ray that referenced this pull request Oct 20, 2025
landscapepainter pushed a commit to landscapepainter/ray that referenced this pull request Nov 17, 2025
Aydin-ab pushed a commit to Aydin-ab/ray-aydin that referenced this pull request Nov 19, 2025

Labels

core (Issues that should be addressed in Ray Core), go (add ONLY when ready to merge, run all tests), gpu (GPU related issues), rdt (Ray Direct Transport)

Projects

None yet

Development

3 participants