[xray] Improve flush algorithm for the lineage cache #2130
Test PASSed. |
Not necessarily related to this PR, however, for testing correctness, it would be nice to be able to modulate the frequency of flushing. E.g., if we were flushing strictly on a timer, then we could make it very frequent or very infrequent, which could help catch bugs. What do you think about that? Related to this PR, one category of scheduling bugs that we had over and over was related to scheduling. Basically, we never called |
Looks good to me! I left a couple small comments.
src/ray/raylet/lineage_cache.cc
Outdated
return ray::Status::OK();
void LineageCache::Flush() {
// Iterate through all tasks that are READY.
std::vector<TaskID> ready_task_ids;
This isn't used anywhere is it? If not, we should remove it.
src/ray/raylet/lineage_cache.cc
Outdated
@@ -328,6 +334,24 @@ void LineageCache::HandleEntryCommitted(const UniqueID &task_id) {
RAY_CHECK_OK(task_pubsub_.CancelNotifications(JobID::nil(), task_id, client_id_));
subscribed_tasks_.erase(it);
}

auto children_entry = uncommitted_ready_children_.find(task_id);
This code block probably deserves a comment explaining what it's doing.
Yeah, I left Yeah, that could definitely come up for the lineage cache as well. This PR should help with the issue, but it might make sense to add a warning for tasks that haven't been flushed after a certain time, for example. |
Test PASSed. |
Looks good, will merge once tests pass. |
Thanks, @robertnishihara! I merged, since the failing tests look unrelated. |
* master:
[autoscaler] GCP node provider (ray-project#2061)
[xray] Evict tasks from the lineage cache (ray-project#2152)
[ASV] Add ray.init and simple Ray benchmarks (ray-project#2166)
Re-encrypt key for uploading to S3 from travis to use travis-ci.com. (ray-project#2169)
[rllib] Fix A3C PyTorch implementation (ray-project#2036)
[JavaWorker] Do not kill local-scheduler-forked workers in RunManager.cleanup (ray-project#2151)
Update Travis CI badge from travis-ci.org to travis-ci.com. (ray-project#2155)
Implement Python global state API for xray. (ray-project#2125)
[xray] Improve flush algorithm for the lineage cache (ray-project#2130)
Fix support for actor classmethods (ray-project#2146)
Add empty df test (ray-project#1879)
[JavaWorker] Enable java worker support (ray-project#2094)
[DataFrame] Fixing the code formatting of the tests (ray-project#2123)
Update resource documentation (remove outdated limitations). (ray-project#2022)
bugfix: use array redis_primary_addr out of its scope (ray-project#2139)
Fix infinite retry in Push function. (ray-project#2133)
[JavaWorker] Changes to the directory under src for support java worker (ray-project#2093)
Integrate credis with Ray & route task table entries into credis. (ray-project#1841)
What do these changes do?
This improves the algorithm for flushing tasks from the lineage cache to the GCS. A task only gets flushed if all of its parents have already been flushed to the GCS. Previously, tasks were flushed only in batches: every time a new task was marked as ready, we would iterate over all tasks that were ready. However, this could cause some tasks to get "stuck". For example, if a child task's parents are all committed after the child was last checked, but no new tasks are marked as ready afterwards, then the child task would never be flushed.
This PR tracks the parent -> child dependencies for tasks that are ready to write. Then, when a task is committed, we use this map to check for any children that are now flushable.
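The parent -> child bookkeeping described above can be sketched roughly as follows. This is a minimal, standalone illustration and not the code in `lineage_cache.cc`: `ToyLineageCache`, `TaskId`, `AddReadyTask`, `HandleCommitted`, `children_of_`, and `pending_parents_` are hypothetical names invented for the example, task IDs are plain strings, and "flushing" just appends to a vector instead of writing to the GCS.

```cpp
#include <cassert>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

// Hypothetical stand-in for Ray's TaskID; a string keeps the sketch short.
using TaskId = std::string;

// Toy model of the idea in this PR, not the real LineageCache.
class ToyLineageCache {
 public:
  // Mark `task` as ready to write. It is flushed right away only if every
  // parent is already committed; otherwise it is registered as a waiting
  // child of each uncommitted parent.
  void AddReadyTask(const TaskId &task, const std::vector<TaskId> &parents) {
    std::unordered_set<TaskId> pending;
    for (const auto &parent : parents) {
      if (committed_.count(parent) == 0) {
        pending.insert(parent);
        children_of_[parent].insert(task);
      }
    }
    if (pending.empty()) {
      Flush(task);
    } else {
      pending_parents_[task] = std::move(pending);
    }
  }

  // Called when a commit of `task` is acknowledged. Only the children that
  // were waiting on this task are examined, rather than every ready task.
  void HandleCommitted(const TaskId &task) {
    committed_.insert(task);
    auto it = children_of_.find(task);
    if (it == children_of_.end()) return;
    std::unordered_set<TaskId> children = std::move(it->second);
    children_of_.erase(it);
    for (const auto &child : children) {
      auto pending = pending_parents_.find(child);
      assert(pending != pending_parents_.end());
      pending->second.erase(task);
      if (pending->second.empty()) {
        // The last uncommitted parent just committed: flush the child now.
        pending_parents_.erase(pending);
        Flush(child);
      }
    }
  }

  // Tasks in the order they were flushed.
  const std::vector<TaskId> &flushed() const { return flushed_; }

 private:
  // Stand-in for the write to the GCS (assumption: the real write is async).
  void Flush(const TaskId &task) { flushed_.push_back(task); }

  std::unordered_set<TaskId> committed_;
  // parent -> ready children still waiting on that parent.
  std::unordered_map<TaskId, std::unordered_set<TaskId>> children_of_;
  // ready child -> parents that have not been committed yet.
  std::unordered_map<TaskId, std::unordered_set<TaskId>> pending_parents_;
  std::vector<TaskId> flushed_;
};
```

In this sketch, a child whose last parent commits is flushed from inside `HandleCommitted`, so it does not depend on some later task being marked as ready, which is the "stuck" case described above.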