
[core][cgraph] Fix eager release if destruction out of order #49781

Closed. Wanted to merge 7 commits from the fix-skip-deserialize branch.

Conversation

@dayshah (Contributor) commented Jan 12, 2025

Why are these changes needed?

The current implementation of the CompiledDagRef destructor releases the buffer for everything up to the execution index of the destructed dagref. This means that calling get on a previous dagref will fail. Now we ensure that execution for previous dagrefs completes and their results are cached before we release.

Because Python doesn't guarantee destruction order, this has a possible pitfall: we may be forced to finish executing other previous dagrefs that are themselves about to be destructed, just because the dagref with the higher execution_index had del called on it first. Opened issue #49782 for this.
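The fix described above can be sketched with a toy model. This is a hypothetical, simplified illustration (DagDriver and its methods are invented names, not Ray's actual classes): the destructor-triggered release caches any earlier unconsumed results instead of dropping them, so get on a previous ref still succeeds.

```python
# Hypothetical toy model of the eager-release fix; not Ray's real API.

class DagDriver:
    def __init__(self):
        self.buffer = {}     # execution_index -> result still in the channel buffer
        self.cache = {}      # execution_index -> result cached on the driver
        self.next_index = 0

    def execute(self, value):
        idx = self.next_index
        self.next_index += 1
        self.buffer[idx] = value
        return idx

    def release(self, idx):
        # The fix: before releasing up to idx, cache earlier results that
        # have not been consumed yet, so previous refs can still get() them.
        for i in list(self.buffer):
            if i < idx:
                self.cache[i] = self.buffer.pop(i)
        self.buffer.pop(idx, None)

    def get(self, idx):
        if idx in self.cache:
            return self.cache.pop(idx)
        return self.buffer.pop(idx)


driver = DagDriver()
a = driver.execute("result-0")
b = driver.execute("result-1")

# The ref with the higher index is destructed first (Python does not
# guarantee destruction order), triggering an eager release.
driver.release(b)

# Without the caching step, this get() would fail with a missing buffer entry.
assert driver.get(a) == "result-0"
```

Before the fix, release(b) would have dropped index 0 from the buffer as well, and the final get would raise.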

Related issue number

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Signed-off-by: dayshah <dhyey2019@gmail.com>
Comment on lines +115 to +120
self._dag._execute_until(
    self._execution_index, self._channel_index, timeout
)
return_vals = self._dag._get_execution_results(
    self._execution_index, self._channel_index
)
Contributor:

why do we prefer using an extra method call?

Contributor (Author):

Because we want to reuse _execute_until for the release-buffer path, and we don't want to call get there, since get would also remove the result from the buffer.
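The split the author describes can be sketched as follows. This is a hypothetical illustration (FakeDag is invented; only _execute_until and _get_execution_results mirror names from the diff above): get() is two steps, and the release path can reuse the first step without consuming any result.

```python
# Hypothetical sketch of splitting get() into execute-then-consume steps.

class FakeDag:
    def __init__(self):
        self.results = {}        # execution_index -> result
        self.executed_until = -1

    def _execute_until(self, idx, channel_idx, timeout=None):
        # Drive execution forward; does NOT consume anything.
        while self.executed_until < idx:
            self.executed_until += 1
            self.results[self.executed_until] = f"out-{self.executed_until}"

    def _get_execution_results(self, idx, channel_idx):
        # Consuming step: removes the result from the buffer.
        return self.results.pop(idx)


class Ref:
    def __init__(self, dag, execution_index, channel_index=0):
        self._dag = dag
        self._execution_index = execution_index
        self._channel_index = channel_index

    def get(self, timeout=None):
        self._dag._execute_until(
            self._execution_index, self._channel_index, timeout
        )
        return self._dag._get_execution_results(
            self._execution_index, self._channel_index
        )


dag = FakeDag()
ref = Ref(dag, 2)
assert ref.get() == "out-2"

# The release path can call _execute_until alone, leaving results in place.
dag._execute_until(5, 0)
assert 5 in dag.results
```

If get() were a single method, the release path would have no way to wait for execution without also discarding results another ref may still need.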

Signed-off-by: dayshah <dhyey2019@gmail.com>
@dayshah dayshah requested a review from ruisearch42 January 13, 2025 18:14
@dayshah dayshah requested a review from ruisearch42 January 13, 2025 21:02
Signed-off-by: dayshah <dhyey2019@gmail.com>
@kevin85421 (Member) left a comment:

Discussed offline:

Calling _execute_until to consume all DAG references with a smaller execution_index than the one being destructed may cause memory usage to increase in the driver process.

Instead, we will try to remember the execution_index that should be released and release it only when all DAG references with smaller execution_index values have been consumed.
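The deferred-release idea discussed above can be sketched like this. This is a hypothetical illustration (DeferredReleaser and its method names are invented): destructed indices are only remembered, and the release frontier advances only once every smaller execution_index has been either consumed or destructed.

```python
# Hypothetical sketch of deferred release: remember indices to release,
# and only release once all smaller execution_index values are accounted for.

class DeferredReleaser:
    def __init__(self):
        self.pending_release = set()  # indices whose refs were destructed
        self.consumed = set()         # indices whose results were retrieved
        self.released_up_to = -1      # everything <= this has been released

    def mark_consumed(self, idx):
        self.consumed.add(idx)
        self._try_release()

    def mark_destructed(self, idx):
        self.pending_release.add(idx)
        self._try_release()

    def _try_release(self):
        # Advance the frontier only while each next index is accounted for,
        # so no ref is ever forced to execute early and no result is lost.
        nxt = self.released_up_to + 1
        while nxt in self.consumed or nxt in self.pending_release:
            self.released_up_to = nxt
            self.consumed.discard(nxt)
            self.pending_release.discard(nxt)
            nxt += 1


r = DeferredReleaser()
r.mark_destructed(1)   # ref 1 destructed first: cannot release yet
assert r.released_up_to == -1
r.mark_consumed(0)     # once index 0 is consumed, both 0 and 1 release
assert r.released_up_to == 1
```

Unlike the _execute_until approach, this never forces earlier executions to run just because a later ref was destructed, avoiding the driver-side memory growth mentioned above.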

Signed-off-by: dayshah <dhyey2019@gmail.com>
@dayshah dayshah closed this Jan 16, 2025
@dayshah dayshah deleted the fix-skip-deserialize branch January 16, 2025 22:01
Labels
go add ONLY when ready to merge, run all tests
Projects
None yet

3 participants