[xray] Track ray.get calls as task dependencies #2362
Conversation
Test FAILed.
python/ray/worker.py
Outdated
self.local_scheduler_client.reconstruct_objects(
    ray_object_ids_to_fetch[i:(
        i + ray._config.worker_fetch_request_size())],
    False)
Did this batching work? Legacy Ray doesn't seem to support batching here:
ray/src/local_scheduler/local_scheduler.cc, line 1123 in e3534c4:
RAY_CHECK(message->object_ids()->size() == 1);
Oops, thanks, yeah it should only work for xray.
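The batching under discussion can be sketched as follows. This is an illustrative stand-in, not Ray's actual API: `reconstruct_objects` here is any callable playing the role of `self.local_scheduler_client.reconstruct_objects`, and `fetch_request_size` stands in for `ray._config.worker_fetch_request_size()`.

```python
def fetch_in_batches(object_ids, reconstruct_objects, fetch_request_size=10000):
    """Issue fetch requests for object_ids in fixed-size batches.

    Splitting one large fetch into batches keeps each request sent to the
    raylet bounded in size. Per the discussion above, batched requests are
    only supported by xray, not the legacy local scheduler.
    """
    for i in range(0, len(object_ids), fetch_request_size):
        batch = object_ids[i:i + fetch_request_size]
        # The second argument mirrors fetch_only=False in the snippet above.
        reconstruct_objects(batch, False)
```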
src/ray/raylet/scheduling_queue.h
Outdated
@@ -12,7 +12,8 @@ namespace ray {

 namespace raylet {

-enum TaskState { INIT, PLACEABLE, WAITING, READY, RUNNING };
+enum class TaskState { INIT, PLACEABLE, WAITING, READY, RUNNING, BLOCKED };
thank you for switching to enum class :)
src/ray/raylet/node_manager.cc
Outdated
// Subscribe to the objects required by the ray.get. These objects will
// be fetched and/or reconstructed as necessary, until the objects become
// local or are unsubscribed.
task_dependency_manager_.SubscribeDependencies(current_task_id,
What does it mean if this is called and `current_task_id` is nil? Does that mean it's a driver? Won't that be an issue if there are multiple drivers?
Yes, that means it's a driver.
Ah yes, I think you're right. We'll have to include the driver ID in the initial request to register a client with the raylet.
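The subscribe/unsubscribe pattern being reviewed can be illustrated with a minimal sketch. All names here are illustrative; this is not the raylet's actual C++ `TaskDependencyManager` API.

```python
class TaskDependencyManager:
    """Sketch: track which task (or driver) is waiting on which objects."""

    def __init__(self, pull_object):
        # pull_object stands in for triggering a fetch/reconstruction.
        self.pull_object = pull_object
        self.subscriptions = {}  # task_id -> set of missing object IDs

    def subscribe_dependencies(self, task_id, object_ids, local_objects):
        """Subscribe task_id to object_ids; pull any that are not local.

        Returns True if all dependencies are already local (task is ready).
        """
        missing = {o for o in object_ids if o not in local_objects}
        self.subscriptions[task_id] = missing
        for obj in missing:
            self.pull_object(obj)
        return len(missing) == 0

    def unsubscribe_dependencies(self, task_id):
        """Stop fetching/reconstructing on behalf of task_id."""
        self.subscriptions.pop(task_id, None)
```

Note that the discussion above hinges on what `task_id` means when the caller is a driver rather than a worker: some non-nil per-driver ID is needed, or multiple drivers would collide on the same subscription key.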
      it++;
    }
  }
}
I've had to do awkward things with the scheduling queues before, not sure what the best way to fix this is, but some kinds of queries are a bit hard to do, e.g.,
ray/src/ray/raylet/node_manager.cc
Lines 384 to 391 in 35f4a30
// TODO(rkn): This is too heavyweight just to get the task's driver ID.
auto const it = std::find_if(
    running_tasks.begin(), running_tasks.end(), [task_id](const Task &task) {
      return task.GetTaskSpecification().TaskId() == task_id;
    });
RAY_CHECK(running_tasks.size() != 0);
RAY_CHECK(it != running_tasks.end());
JobID job_id = it->GetTaskSpecification().DriverId();
Yeah, until we have a way to do fast lookup of a task's state, I think this is the best option for now...
For that snippet that you posted, maybe it makes sense to just remove the task from the queue? I don't know if we should be leaving that task in the running tasks queue anyway.
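The "fast lookup of a task's state" mentioned above would replace the linear `std::find_if` scan with an index keyed by task ID. A hypothetical sketch (names are illustrative, not the raylet's API):

```python
class RunningTaskIndex:
    """Sketch of an O(1) task-ID -> driver-ID lookup, replacing the
    linear scan over running_tasks in the snippet above."""

    def __init__(self):
        self._driver_by_task = {}

    def add(self, task_id, driver_id):
        # Called when a task transitions to RUNNING.
        self._driver_by_task[task_id] = driver_id

    def remove(self, task_id):
        # Called when a task leaves the running queue.
        self._driver_by_task.pop(task_id, None)

    def driver_id(self, task_id):
        # Equivalent to the RAY_CHECK(it != running_tasks.end()) path:
        # raises KeyError if the task is not actually running.
        return self._driver_by_task[task_id]
```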
Oh good point, I commented about this above.
src/ray/raylet/scheduling_queue.h
Outdated
@@ -70,9 +71,12 @@ class SchedulingQueue {
 /// Remove tasks from the task queue.
 ///
 /// \param tasks The set of task IDs to remove from the queue. The
-/// corresponding tasks must be contained in the queue.
+/// corresponding tasks must be contained in the queue. The IDs of
+/// removed tasks will be erased from the set.
unindent
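The documented semantics, where the IDs of removed tasks are erased from the caller's set, can be sketched like this (illustrative Python, not the C++ implementation; here the queue is modeled as a list of task IDs):

```python
def remove_tasks(queue, task_ids):
    """Remove from queue every task whose ID is in task_ids.

    Erases the IDs of removed tasks from task_ids, so any IDs still in
    the set afterwards were not found in this queue. This lets a caller
    try several queues in turn with the same shrinking set.
    """
    removed = [t for t in queue if t in task_ids]
    queue[:] = [t for t in queue if t not in task_ids]
    task_ids.difference_update(removed)
    return removed
```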
src/ray/raylet/scheduling_queue.cc
Outdated
    FilterStateFromQueue(blocked_tasks_, task_ids, filter_state);
    break;
  default:
    RAY_LOG(ERROR) << "Attempting to filter tasks on unrecognized state "
ERROR -> FATAL
src/ray/raylet/scheduling_queue.cc
Outdated
  break;
default:
-  RAY_LOG(ERROR) << "Attempting to move tasks from unrecognized state " << src_state;
+  RAY_LOG(ERROR) << "Attempting to move tasks from unrecognized state "
ERROR -> FATAL
Test FAILed.
This looks pretty good to me (though there are a few parts I haven't read through carefully yet).
cc @atumanov, who is also currently working on the scheduling queues code.
// The client is a driver.
const std::shared_ptr<Worker> driver = worker_pool_.GetRegisteredDriver(client);
RAY_CHECK(driver);
auto driver_id = driver->GetAssignedTaskId();
This seems like we're conflating "driver ID" with "task ID of the driver task". Is that correct? Is there a reason for doing that?
I felt like it made sense to treat them as the same thing, since it seems like they pretty much serve the same purpose. What do you think?
I agree they serve the same purpose.
I'm fine with unifying the IDs into the same thing, though I find it a little confusing to refer to it in some places as the "driver ID" and in some places as a "task ID". I think your approach is ok.
However, note that in the current PR, if we actually use it as a "task ID" (e.g., use it to look up something in the task table), that won't work because we're actually just passing in the "worker ID" (which I think is randomly generated) when we connect to the local scheduler.
Do you anticipate ever using the driver ID as a task ID in the node manager?
Ah, I see. I think that's actually fixed since the most recent commit, since I changed the driver's frontend to pass in `worker.current_task_id` instead of the worker ID. Do I understand that correctly?
Oh ok, I think that's correct then. We confusingly set that ID for drivers in two places (both at the top and bottom of the following snippet, lines 2181 to 2221 in 05490b8):
worker.current_task_id = ray.ObjectID(np.random.bytes(20))
# Reset the state of the numpy random number generator.
np.random.set_state(numpy_state)
# Set other fields needed for computing task IDs.
worker.task_index = 0
worker.put_index = 1
# Create an entry for the driver task in the task table. This task is
# added immediately with status RUNNING. This allows us to push errors
# related to this driver task back to the driver. For example, if the
# driver creates an object that is later evicted, we should notify the
# user that we're unable to reconstruct the object, since we cannot
# rerun the driver.
nil_actor_counter = 0
driver_task = ray.local_scheduler.Task(
    worker.task_driver_id, ray.ObjectID(NIL_FUNCTION_ID), [], 0,
    worker.current_task_id, worker.task_index,
    ray.ObjectID(NIL_ACTOR_ID), ray.ObjectID(NIL_ACTOR_ID),
    ray.ObjectID(NIL_ACTOR_ID), ray.ObjectID(NIL_ACTOR_ID),
    nil_actor_counter, False, [], {"CPU": 0}, worker.use_raylet)
# Add the driver task to the task table.
if not worker.use_raylet:
    global_state._execute_command(
        driver_task.task_id(), "RAY.TASK_TABLE_ADD",
        driver_task.task_id().id(), TASK_STATUS_RUNNING,
        NIL_LOCAL_SCHEDULER_ID,
        driver_task.execution_dependencies_string(), 0,
        ray.local_scheduler.task_to_string(driver_task))
else:
    global_state._execute_command(
        driver_task.task_id(), "RAY.TABLE_ADD",
        ray.gcs_utils.TablePrefix.RAYLET_TASK,
        ray.gcs_utils.TablePubsub.RAYLET_TASK,
        driver_task.task_id().id(),
        driver_task._serialized_raylet_task())
# Set the driver's current task ID to the task ID assigned to the
# driver task.
worker.current_task_id = driver_task.task_id()
src/ray/raylet/node_manager.cc
Outdated
// dependency manager.
RAY_CHECK_OK(object_manager_.Pull(object_id));
if (message->fetch_only()) {
  RAY_CHECK_OK(object_manager_.Pull(object_id));
Don't we want to `Pull` in the non-fetch-only case as well? I interpreted the `fetch_only` flag as "if true, then just fetch; if false, then fetch + reconstruct".
Oops, thanks!
Ah actually, the reason I did that was because the task dependency manager calls `Pull` internally once it determines that a dependency is missing. A little unfortunate, but I guess this makes sense for now while we have the `fetch_only` flag? I'll leave a comment about that.
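The intended `fetch_only` semantics from this exchange ("if true, just fetch; if false, fetch + reconstruct, with the dependency manager pulling internally") could look like the following sketch. The function and parameter names are placeholders for the raylet's internals, not its real API.

```python
def handle_reconstruct_objects(object_ids, fetch_only, pull,
                               subscribe_dependencies):
    """Sketch of the message handler's branching.

    fetch_only=True: just pull each object from remote nodes.
    fetch_only=False: hand the objects to the dependency manager, which
    pulls missing objects itself (and may trigger reconstruction), so
    pulling here as well would be redundant.
    """
    if fetch_only:
        for obj in object_ids:
            pull(obj)
    else:
        subscribe_dependencies(object_ids)
```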
src/ray/raylet/node_manager.cc
Outdated
  RAY_CHECK_OK(object_manager_.Pull(object_id));
} else {
  // Add any missing objects to the list to subscribe to in the task
  // dependency manager.
I think this `else` might be applied to the wrong `if` statement.
-// Clean up the assigned task's resources, push an error to the driver.
+// The client is a worker. Handle the case where the worker is killed
+// while executing a task. Clean up the assigned task's resources, push
+// an error to the driver.
I can address this in a different PR, but I noticed that when the client dies, we should be removing the in-progress task from the queues, which we are not doing.
      it++;
    }
  }
}
Oh good point, I commented about this above.
@@ -12,7 +12,8 @@ namespace ray {

 namespace raylet {

-enum TaskState { INIT, PLACEABLE, WAITING, READY, RUNNING };
+enum class TaskState { INIT, PLACEABLE, WAITING, READY, RUNNING, BLOCKED, DRIVER };
thank you for making it enum class
@@ -67,12 +68,31 @@ class SchedulingQueue {
 /// at runtime.
 const std::list<Task> &GetBlockedTasks() const;

+/// Get the set of driver task IDs.
A general comment: `SchedulingQueue` feels like a very "shallow" class, in the sense that it has a large API and each method has a fairly simple implementation. Also, we directly expose some of the internal data structures. These are things that are pointed out as "red flags" in A Philosophy of Software Design. However, I don't have a concrete suggestion here.
cc @pcmoritz
I agree :(
I was actually considering putting the set of driver tasks somewhere else, but I thought it made sense to keep all of the task status information in one place. We should probably rethink some of the methods on the `SchedulingQueue`.
python/ray/worker.py
Outdated
@@ -1998,6 +2002,8 @@ def connect(info,
 # the correct driver.
 if mode != WORKER_MODE:
     worker.task_driver_id = ray.ObjectID(worker.worker_id)
I'm pretty sure this "driver ID" is different from the task ID of the driver task.
Oops, I mixed this up with `worker.current_task_id`. Thanks!
Test FAILed.
Test PASSed.
src/ray/raylet/node_manager.cc
Outdated
@@ -766,7 +766,10 @@ void NodeManager::HandleWorkerBlocked(std::shared_ptr<Worker> worker) {
 }

 void NodeManager::HandleWorkerUnblocked(std::shared_ptr<Worker> worker) {
-  RAY_CHECK(worker->IsBlocked());
+  RAY_CHECK(worker);
+  if (!worker->IsBlocked()) {
Why has this changed? Shouldn't this only be called when the worker is blocked?
In legacy ray, a worker always gets marked as blocked if it sends a `ReconstructObjects` message, so once it sends the `NotifyUnblocked` message, it's guaranteed to be blocked. The thing that I changed here compared to legacy ray is that the worker only gets marked as blocked if there is an object requested in the `ReconstructObjects` message that is not local. That means that sometimes, an unblocked worker will send the `NotifyUnblocked` message.
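Given that an unblocked worker may now send `NotifyUnblocked`, the handler has to treat that case as a no-op rather than a fatal check. A hedged sketch of that control flow (the worker is modeled as a plain dict; names are illustrative, not the raylet's API):

```python
def handle_worker_unblocked(worker, unsubscribe):
    """Sketch: only undo blocked-state bookkeeping if the worker was
    actually marked blocked. A worker whose ray.get objects were all
    local never got marked blocked, so its NotifyUnblocked is ignored."""
    assert worker is not None  # mirrors RAY_CHECK(worker)
    if not worker["blocked"]:
        return  # NotifyUnblocked from a worker that never blocked.
    # Stop fetching/reconstructing on behalf of the blocked task.
    unsubscribe(worker["task_id"])
    worker["blocked"] = False
```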
Test FAILed.
Thanks @stephanie-wang, this looks good to me assuming the tests are passing.
@eric-jj @kingchin1218 do you understand the Java test failure in the attached log? I'm not really sure how to read what is going wrong. Log attached. EDIT: @stephanie-wang in this PR you change the signature of
jenkins, retest this please
@stephanie-wang I pushed a few changes showing which part of the Java code needs to be changed. However, the semantics of what I did don't quite match the semantics of the Python side. In particular, on the Python side, we pass in the "driver task ID" and on the Java side I'm not sure if there is a dummy task, so it's passing in something else. Please take a look.
Test PASSed.
@robertnishihara ah thanks. I took a brief look, but I'm not familiar enough with the code to see how we could do the same thing. @eric-jj @kingchin1218, do you guys know if a driver process gets assigned a dummy task representing the driver? And if so, is there a convenient way to pass it into the Java local scheduler client?
Test PASSed.
Test FAILed.
Test PASSed.
@stephanie-wang @robertnishihara I will change the notification email address to the one I use frequently.
namespace {

// A helper function to remove a worker from a list. Returns true if the worker
// was found and removed.
It seems that you forgot to update this comment.
## What do these changes do?
#2362 left a bug where it assumed that the driver task ID was nil. This fixes the bug to check the `SchedulingQueue` for any driver task IDs instead.
What do these changes do?
The task dependency manager is used to make any dependencies required by a task available, either by fetching them from a remote node or, eventually, by reconstructing them. This was previously used for scheduling submitted tasks with object arguments, but the logic needed for a task that calls `ray.get` is similar. This PR directs that logic through the task dependency manager as well. This also includes some cleanup in the `SchedulingQueue` code.