[xray] Lineage cache requests notifications from the GCS about remote tasks #1834
Conversation
src/ray/raylet/lineage_cache.cc
```cpp
// Stop listening for notifications about this task.
auto it = subscribed_tasks_.find(task_id);
if (it != subscribed_tasks_.end()) {
  task_pubsub_.CancelNotifications(JobID::nil(), task_id, client_id_);
```
It seems like for this use case, it would be fine for the `Cancel` to happen automatically once the GCS sends the notification, right? The issue with that is that it involves special-casing the behavior in the redis module?
Hmm yeah, for the `Table` interface, we could just call `Cancel` as soon as an entry is found, since table entries are supposed to be immutable. Should I change that in this PR?
It can be a subsequent PR if that makes sense.
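Because GCS table entries are immutable, the auto-cancel idea discussed above can be sketched as a thin subscription wrapper that cancels itself after delivering the first notification. This is a hypothetical, simplified sketch: `TaskPubsub`, `RequestNotifications`, `CancelNotifications`, and `Publish` are assumed stand-ins, not the actual GCS client API.

```cpp
#include <cstddef>
#include <functional>
#include <string>
#include <unordered_map>

// Hypothetical minimal pubsub: one callback per task ID (an assumption;
// the real GCS interface is keyed by job, task, and client IDs).
class TaskPubsub {
 public:
  using Callback = std::function<void(const std::string &)>;

  void RequestNotifications(const std::string &task_id, Callback cb) {
    callbacks_[task_id] = std::move(cb);
  }
  void CancelNotifications(const std::string &task_id) {
    callbacks_.erase(task_id);
  }
  // Deliver a notification if a subscription is active.
  void Publish(const std::string &task_id) {
    auto it = callbacks_.find(task_id);
    if (it == callbacks_.end()) return;
    Callback cb = it->second;  // Copy so self-cancel inside cb is safe.
    cb(task_id);
  }
  std::size_t NumSubscriptions() const { return callbacks_.size(); }

 private:
  std::unordered_map<std::string, Callback> callbacks_;
};

// Subscribe for exactly one notification: since table entries are
// immutable, the first notification carries all the data there will
// ever be, so the subscription cancels itself before invoking the
// user's callback.
void SubscribeOnce(TaskPubsub &pubsub, const std::string &task_id,
                   TaskPubsub::Callback cb) {
  pubsub.RequestNotifications(
      task_id, [&pubsub, cb](const std::string &id) {
        pubsub.CancelNotifications(id);  // Auto-cancel on first delivery.
        cb(id);
      });
}
```

With this wrapper, a second `Publish` for the same task is simply ignored, which removes the need for the explicit `CancelNotifications` call in the lineage cache itself.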
src/ray/raylet/lineage_cache.cc
```cpp
// Stop listening for notifications about this task.
auto it = subscribed_tasks_.find(task_id);
if (it != subscribed_tasks_.end()) {
  task_pubsub_.CancelNotifications(JobID::nil(), task_id, client_id_);
```
We should probably do something with the returned `Status` here rather than ignoring it.
```cpp
/// will remain unchanged.
///
/// \param task_id The ID of the waiting task to remove.
void RemoveWaitingTask(const TaskID &task_id);
```
Should all of these methods return a status? Or should they handle failures internally?
I added some status checks, but I wasn't really sure if they should be
```cpp
@@ -286,12 +295,22 @@ void PopAncestorTasks(const UniqueID &task_id, Lineage &lineage) {
}

void LineageCache::HandleEntryCommitted(const UniqueID &task_id) {
  RAY_LOG(DEBUG) << "task committed: " << task_id;
```
Under the current design, we can't actually guarantee that this method won't fire twice (or more) for the same task ID, right? Couldn't that cause issues here?
Hmm, it should be okay if this method fires twice. I think this function is idempotent.
Also, the only case I can think of right now where this would fire twice is if a task gets reconstructed and added to the table again. That should be pretty rare.
You were right :) I changed this to check that the status is already `COMMITTED` if the call to set the status fails.
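The fix discussed in this thread can be illustrated with a small sketch of an idempotent commit handler. All names here (`GcsStatus`, `LineageEntry`, `SetStatus`, `commits_handled_`) are simplified assumptions that only loosely mirror the PR, not the actual raylet code.

```cpp
#include <cassert>
#include <string>
#include <unordered_map>

// Assumed two-state subset of the lineage entry's GCS status.
enum class GcsStatus { UNCOMMITTED_REMOTE, COMMITTED };

struct LineageEntry {
  GcsStatus status = GcsStatus::UNCOMMITTED_REMOTE;
  // Attempt a state transition; report failure if already in new_status.
  bool SetStatus(GcsStatus new_status) {
    if (status == new_status) return false;
    status = new_status;
    return true;
  }
};

class LineageCache {
 public:
  std::unordered_map<std::string, LineageEntry> entries_;
  int commits_handled_ = 0;

  // Safe to call more than once per task: a duplicate notification either
  // misses the entry entirely (already evicted) or fails the SetStatus
  // transition, in which case we only check that the status is already
  // COMMITTED and return without repeating any work.
  void HandleEntryCommitted(const std::string &task_id) {
    auto it = entries_.find(task_id);
    if (it == entries_.end()) return;  // Already evicted; nothing to do.
    if (!it->second.SetStatus(GcsStatus::COMMITTED)) {
      assert(it->second.status == GcsStatus::COMMITTED);
      return;  // Duplicate notification; the first one did the work.
    }
    ++commits_handled_;  // Eviction of committed ancestors would go here.
  }
};
```

This keeps the handler correct even in the rare reconstruction case mentioned above, where the same task ID can be committed to the table more than once.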
Thanks, @robertnishihara, I think the code you added looks good. We're basically calling
Currently seeing
* master:
  Handle interrupts correctly for ASIO synchronous reads and writes. (ray-project#1929)
  [DataFrame] Adding read methods and tests (ray-project#1712)
  Allow task_table_update to fail when tasks are finished. (ray-project#1927)
  [rllib] Contribute DDPG to RLlib (ray-project#1877)
  [xray] Workers blocked in a `ray.get` release their resources (ray-project#1920)
  Raylet task dispatch and throttling worker startup (ray-project#1912)
  [DataFrame] Eval fix (ray-project#1903)
  [tune] Polishing docs (ray-project#1846)
  [tune] [rllib] Automatically determine RLlib resources and add queueing mechanism for autoscaling (ray-project#1848)
  Preemptively push local arguments for actor tasks (ray-project#1901)
  [tune] Allow fetching pinned objects from trainable functions (ray-project#1895)
  Multithreading refactor for ObjectManager. (ray-project#1911)
  Add slice functionality (ray-project#1832)
  [DataFrame] Pass read_csv kwargs to _infer_column (ray-project#1894)
  Addresses missed comments from multichunk object transfer PR. (ray-project#1908)
  Allow numpy arrays to be passed by value into tasks (and inlined in the task spec). (ray-project#1816)
  [xray] Lineage cache requests notifications from the GCS about remote tasks (ray-project#1834)
  Fix UI issue for non-json-serializable task arguments. (ray-project#1892)
  Remove unnecessary calls to .hex() for object IDs. (ray-project#1910)
  Allow multiple raylets to be started on a single machine. (ray-project#1904)

# Conflicts:
#	python/ray/rllib/__init__.py
#	python/ray/rllib/dqn/dqn.py
What do these changes do?
This extends the lineage cache to request notifications about tasks that are scheduled to execute at a remote node. This allows each node to evict tasks from its local lineage cache once a notification about the remote task's commit is received.
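The end-to-end flow described above can be sketched as follows. This is a minimal illustration, not the actual raylet implementation: the type names, the string-keyed containers, and the `RemoveWaitingTask`/`HandleEntryCommitted` bodies are all simplifying assumptions, with the real GCS subscription calls reduced to set operations.

```cpp
#include <cstddef>
#include <set>
#include <string>
#include <unordered_map>

// Assumed subset of the lineage entry's GCS status values.
enum class GcsStatus { UNCOMMITTED_LOCAL, UNCOMMITTED_REMOTE, COMMITTED };

class LineageCache {
 public:
  // A task created locally starts out uncommitted.
  void AddTask(const std::string &task_id) {
    entries_[task_id] = GcsStatus::UNCOMMITTED_LOCAL;
  }

  // Called when a task is forwarded to another node for execution. The
  // local node will not commit this task itself, so it requests a
  // notification from the GCS in order to learn when it can evict the entry.
  void RemoveWaitingTask(const std::string &task_id) {
    auto it = entries_.find(task_id);
    if (it == entries_.end()) return;
    it->second = GcsStatus::UNCOMMITTED_REMOTE;
    subscribed_tasks_.insert(task_id);  // Stand-in for RequestNotifications.
  }

  // Called when the GCS notifies us that the task was committed remotely:
  // evict the entry and cancel the subscription.
  void HandleEntryCommitted(const std::string &task_id) {
    auto it = entries_.find(task_id);
    if (it == entries_.end()) return;  // Duplicate notification; already evicted.
    entries_.erase(it);
    subscribed_tasks_.erase(task_id);  // Stand-in for CancelNotifications.
  }

  std::size_t NumEntries() const { return entries_.size(); }
  std::size_t NumSubscriptions() const { return subscribed_tasks_.size(); }

 private:
  std::unordered_map<std::string, GcsStatus> entries_;
  std::set<std::string> subscribed_tasks_;
};
```

The key property is that every forwarded task eventually produces exactly one eviction: either the commit notification arrives and the entry is removed, or the task is still local and the cache commits (and evicts) it itself.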