
[xray] Track ray.get calls as task dependencies #2362

Merged

Conversation

stephanie-wang (Contributor)

What do these changes do?

The task dependency manager is used to make any dependencies required by a task available, either by fetching it from a remote node, or eventually, by reconstructing it. This was previously used for scheduling submitted tasks with object arguments, but the logic needed for a task that calls ray.get is similar. This PR directs that logic through the task dependency manager as well. This also includes some cleanup in the SchedulingQueue code.
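At a high level, the new flow is roughly the following (a hedged sketch only; the method names mirror the review snippets later in this thread, but this is not the literal PR code):

// Sketch: when a worker blocks in ray.get, the raylet subscribes the
// worker's current task to the objects it is waiting on. The task
// dependency manager then fetches them from remote nodes (or, eventually,
// reconstructs them) until they become local or are unsubscribed.
void OnWorkerBlockedInGet(const TaskID &current_task_id,
                          const std::vector<ObjectID> &required_objects) {
  task_dependency_manager_.SubscribeDependencies(current_task_id,
                                                 required_objects);
}

// When the worker unblocks, it stops waiting on the objects.
void OnWorkerUnblocked(const TaskID &current_task_id) {
  task_dependency_manager_.UnsubscribeDependencies(current_task_id);
}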

@AmplabJenkins
Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/6518/

@AmplabJenkins
Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/6519/

self.local_scheduler_client.reconstruct_objects(
    ray_object_ids_to_fetch[i:(
        i + ray._config.worker_fetch_request_size())],
    False)
Collaborator:

Did this batching work? Legacy Ray doesn't seem to support batching here

RAY_CHECK(message->object_ids()->size() == 1);

Contributor Author:

Oops, thanks, yeah it should only work for xray.

@@ -12,7 +12,8 @@ namespace ray {

namespace raylet {

enum TaskState { INIT, PLACEABLE, WAITING, READY, RUNNING };
enum class TaskState { INIT, PLACEABLE, WAITING, READY, RUNNING, BLOCKED };
Collaborator:

thank you for switching to enum class :)
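For context, a scoped enum keeps the enumerators scoped to the enum and disallows implicit conversion to int; a minimal standalone illustration (not from the diff):

enum class TaskState { INIT, PLACEABLE, WAITING, READY, RUNNING, BLOCKED };

void Example() {
  TaskState state = TaskState::BLOCKED;  // enumerators must be qualified
  // int raw = state;                    // error: no implicit conversion
  int raw = static_cast<int>(state);     // conversions must be explicit
  (void)raw;
}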

// Subscribe to the objects required by the ray.get. These objects will
// be fetched and/or reconstructed as necessary, until the objects become
// local or are unsubscribed.
task_dependency_manager_.SubscribeDependencies(current_task_id,
Collaborator:

What does it mean if this is called and current_task_id is nil?

Does that mean it's a driver? Won't that be an issue if there are multiple drivers?

Contributor Author:

Yes, that means it's a driver.

Ah yes, I think you're right. We'll have to include the driver ID in the initial request to register a client with the raylet.

it++;
}
}
}
Collaborator:

I've had to do awkward things with the scheduling queues before. I'm not sure what the best way to fix this is, but some kinds of queries are a bit hard to express, e.g.:

// TODO(rkn): This is too heavyweight just to get the task's driver ID.
auto const it = std::find_if(
    running_tasks.begin(), running_tasks.end(), [task_id](const Task &task) {
      return task.GetTaskSpecification().TaskId() == task_id;
    });
RAY_CHECK(running_tasks.size() != 0);
RAY_CHECK(it != running_tasks.end());
JobID job_id = it->GetTaskSpecification().DriverId();

Contributor Author:

Yeah, until we have a way to do fast lookup of a task's state, I think this is the best option for now...

For that snippet that you posted, maybe it makes sense to just remove the task from the queue? I don't know if we should be leaving that task in the running tasks queue anyway.
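One possible direction, sketched here with hypothetical names (not part of this PR), is to keep a side index from task ID to state, so these queries become O(1) hash lookups instead of linear scans:

#include <string>
#include <unordered_map>

using TaskID = std::string;  // stand-in for Ray's TaskID, for illustration
enum class TaskState { INIT, PLACEABLE, WAITING, READY, RUNNING, BLOCKED };

// Hypothetical index, updated in the same places where a task moves
// between the per-state queues.
class TaskStateIndex {
 public:
  void OnTaskMoved(const TaskID &task_id, TaskState new_state) {
    state_by_task_[task_id] = new_state;
  }
  void OnTaskRemoved(const TaskID &task_id) { state_by_task_.erase(task_id); }
  // Returns true and sets *state if the task is tracked.
  bool GetState(const TaskID &task_id, TaskState *state) const {
    auto it = state_by_task_.find(task_id);
    if (it == state_by_task_.end()) {
      return false;
    }
    *state = it->second;
    return true;
  }

 private:
  std::unordered_map<TaskID, TaskState> state_by_task_;
};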

Collaborator:

Oh good point, I commented about this above.

@@ -70,9 +71,12 @@ class SchedulingQueue {
/// Remove tasks from the task queue.
///
/// \param tasks The set of task IDs to remove from the queue. The
/// corresponding tasks must be contained in the queue.
/// corresponding tasks must be contained in the queue. The IDs of
/// removed tasks will be erased from the set.
Collaborator:

unindent
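For what it's worth, the erase-on-remove contract quoted above supports a pattern like this (a sketch; RemoveTasksFromQueue is a hypothetical helper, not the PR's code):

// Each per-state sub-queue removes the tasks it holds and erases their
// IDs from the set, so after visiting every sub-queue the set should be
// empty if all requested tasks were queued somewhere.
std::unordered_set<TaskID> ids = {task_a, task_b, task_c};
std::vector<Task> removed;
RemoveTasksFromQueue(waiting_tasks_, ids, &removed);
RemoveTasksFromQueue(ready_tasks_, ids, &removed);
RemoveTasksFromQueue(running_tasks_, ids, &removed);
RAY_CHECK(ids.empty());  // every requested task was found in some queue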

FilterStateFromQueue(blocked_tasks_, task_ids, filter_state);
break;
default:
RAY_LOG(ERROR) << "Attempting to filter tasks on unrecognized state "
Collaborator:

ERROR -> FATAL

break;
default:
RAY_LOG(ERROR) << "Attempting to move tasks from unrecognized state " << src_state;
RAY_LOG(ERROR) << "Attempting to move tasks from unrecognized state "
Collaborator:

ERROR -> FATAL

@AmplabJenkins
Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/6749/

@AmplabJenkins
Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/6751/

@robertnishihara left a comment (Collaborator)

This looks pretty good to me (though there are a few parts I haven't read through carefully yet).

cc @atumanov, who is also currently working on the scheduling queues code.

// The client is a driver.
const std::shared_ptr<Worker> driver = worker_pool_.GetRegisteredDriver(client);
RAY_CHECK(driver);
auto driver_id = driver->GetAssignedTaskId();
Collaborator:

This seems like we're conflating "driver ID" with "task ID of the driver task". Is that correct? Is there a reason for doing that?

Contributor Author:

I felt like it made sense to treat them as the same thing, since it seems like they pretty much serve the same purpose. What do you think?

Collaborator:

I agree they serve the same purpose.

I'm fine with unifying the IDs into the same thing, though I find it a little confusing to refer to it in some places as the "driver ID" and in some places as a "task ID". I think your approach is ok.

However, note that in the current PR, if we actually use it as a "task ID" (e.g., use it to look up something in the task table), that won't work because we're actually just passing in the "worker ID" (which I think is randomly generated) when we connect to the local scheduler.

Do you anticipate ever using the driver ID as a task ID in the node manager?

Contributor Author:

Ah, I see. I think that's actually fixed as of the most recent commit, since I changed the driver's frontend to pass in worker.current_task_id instead of the worker ID. Do I understand that correctly?

Collaborator:

Oh ok, I think that's correct then. We confusingly set that ID for drivers in two places (both at the top and the bottom of the snippet below):

ray/python/ray/worker.py, lines 2181 to 2221 at 05490b8:

worker.current_task_id = ray.ObjectID(np.random.bytes(20))
# Reset the state of the numpy random number generator.
np.random.set_state(numpy_state)
# Set other fields needed for computing task IDs.
worker.task_index = 0
worker.put_index = 1
# Create an entry for the driver task in the task table. This task is
# added immediately with status RUNNING. This allows us to push errors
# related to this driver task back to the driver. For example, if the
# driver creates an object that is later evicted, we should notify the
# user that we're unable to reconstruct the object, since we cannot
# rerun the driver.
nil_actor_counter = 0
driver_task = ray.local_scheduler.Task(
    worker.task_driver_id, ray.ObjectID(NIL_FUNCTION_ID), [], 0,
    worker.current_task_id, worker.task_index,
    ray.ObjectID(NIL_ACTOR_ID), ray.ObjectID(NIL_ACTOR_ID),
    ray.ObjectID(NIL_ACTOR_ID), ray.ObjectID(NIL_ACTOR_ID),
    nil_actor_counter, False, [], {"CPU": 0}, worker.use_raylet)
# Add the driver task to the task table.
if not worker.use_raylet:
    global_state._execute_command(
        driver_task.task_id(), "RAY.TASK_TABLE_ADD",
        driver_task.task_id().id(), TASK_STATUS_RUNNING,
        NIL_LOCAL_SCHEDULER_ID,
        driver_task.execution_dependencies_string(), 0,
        ray.local_scheduler.task_to_string(driver_task))
else:
    global_state._execute_command(
        driver_task.task_id(), "RAY.TABLE_ADD",
        ray.gcs_utils.TablePrefix.RAYLET_TASK,
        ray.gcs_utils.TablePubsub.RAYLET_TASK,
        driver_task.task_id().id(),
        driver_task._serialized_raylet_task())
# Set the driver's current task ID to the task ID assigned to the
# driver task.
worker.current_task_id = driver_task.task_id()

// dependency manager.
RAY_CHECK_OK(object_manager_.Pull(object_id));
if (message->fetch_only()) {
RAY_CHECK_OK(object_manager_.Pull(object_id));
Collaborator:

Don't we want to Pull in the non-fetch-only case as well?

I interpreted the fetch_only flag as "if true, then just fetch, if false, then fetch + reconstruct".

Contributor Author:

Oops, thanks!

Contributor Author:

Ah actually, the reason I did that was because the task dependency manager calls Pull internally once it determines that a dependency is missing. A little unfortunate, but I guess this makes sense for now while we have the fetch_only flag? I'll leave a comment about that.
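To make the agreed behavior concrete, a rough sketch (hedged; it follows the snippets in this thread rather than the exact PR code):

// fetch_only == true: just try to fetch the object from a remote node.
// fetch_only == false: fetch + reconstruct; the task dependency manager
// calls Pull internally once it sees the dependency is missing, so no
// explicit Pull is needed on this branch.
if (message->fetch_only()) {
  RAY_CHECK_OK(object_manager_.Pull(object_id));
} else {
  task_dependency_manager_.SubscribeDependencies(current_task_id,
                                                 {object_id});
}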

RAY_CHECK_OK(object_manager_.Pull(object_id));
} else {
// Add any missing objects to the list to subscribe to in the task
// dependency manager.
Collaborator:

I think this else might be applied to the wrong if statement.

// Clean up the assigned task's resources, push an error to the driver.
// The client is a worker. Handle the case where the worker is killed
// while executing a task. Clean up the assigned task's resources, push
// an error to the driver.
Collaborator:

I can address this in a different PR, but I noticed that when the client dies, we should be removing the in-progress task from the queues, which we are not doing.


@@ -12,7 +12,8 @@ namespace ray {

namespace raylet {

enum TaskState { INIT, PLACEABLE, WAITING, READY, RUNNING };
enum class TaskState { INIT, PLACEABLE, WAITING, READY, RUNNING, BLOCKED, DRIVER };
Collaborator:

thank you for making it enum class

@@ -67,12 +68,31 @@ class SchedulingQueue {
/// at runtime.
const std::list<Task> &GetBlockedTasks() const;

/// Get the set of driver task IDs.
Collaborator:

A general comment: SchedulingQueue feels like a very "shallow" class, in the sense that it has a large API and each method has a fairly simple implementation. Also, we directly expose some of the internal data structures.

These are things that are pointed out as "red flags" in A Philosophy of Software Design. However, I don't have a concrete suggestion here.

cc @pcmoritz

Contributor Author:

I agree :(

I was actually considering putting the set of driver tasks somewhere else, but I thought it made sense to keep all of the task status information in one place. We should probably rethink some of the methods on the SchedulingQueue.

@@ -1998,6 +2002,8 @@ def connect(info,
# the correct driver.
if mode != WORKER_MODE:
worker.task_driver_id = ray.ObjectID(worker.worker_id)
Collaborator:

I'm pretty sure this "driver ID" is different from the task ID of the driver task.

Contributor Author:

Oops, I mixed this up with the worker.current_task_id. Thanks!

@AmplabJenkins
Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/6752/

@AmplabJenkins
Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/6753/

@AmplabJenkins
Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/6780/

@AmplabJenkins
Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/6804/

@AmplabJenkins
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/6812/

@@ -766,7 +766,10 @@ void NodeManager::HandleWorkerBlocked(std::shared_ptr<Worker> worker) {
}

void NodeManager::HandleWorkerUnblocked(std::shared_ptr<Worker> worker) {
RAY_CHECK(worker->IsBlocked());
RAY_CHECK(worker);
if (!worker->IsBlocked()) {
Collaborator:

Why has this changed? Shouldn't this only be called when the worker is blocked?

Contributor Author:

In legacy Ray, a worker always gets marked as blocked when it sends a ReconstructObjects message, so once it sends the NotifyUnblocked message, it is guaranteed to be blocked. What I changed here compared to legacy Ray is that the worker only gets marked as blocked if some object requested in the ReconstructObjects message is not local. That means that sometimes an unblocked worker will send the NotifyUnblocked message.
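In other words, the check becomes a guard rather than an invariant; roughly (a sketch following the diff above, not the exact PR code):

void NodeManager::HandleWorkerUnblocked(std::shared_ptr<Worker> worker) {
  RAY_CHECK(worker);
  if (!worker->IsBlocked()) {
    // The worker sent NotifyUnblocked without ever being marked blocked:
    // every object in its ReconstructObjects request was already local.
    return;
  }
  // ... unblock bookkeeping for a genuinely blocked worker ...
}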

@AmplabJenkins
Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/6837/

@robertnishihara left a comment (Collaborator)

Thanks @stephanie-wang, this looks good to me assuming the tests are passing.

@robertnishihara (Collaborator) commented Jul 25, 2018

@eric-jj @kingchin1218 do you understand the Java test failure in the attached log? I'm not really sure how to read what is going wrong.

Log attached.
travis_log_java_error.txt

EDIT: @stephanie-wang, in this PR you change the signature of LocalSchedulerConnection_init, which is called from the Java code, so I believe that needs to be updated.

@robertnishihara (Collaborator)

jenkins, retest this please

@robertnishihara (Collaborator)

@stephanie-wang I pushed a few changes showing which part of the Java code needs to be changed. However, the semantics of what I did don't quite match the semantics of the Python side. In particular, on the Python side, we pass in the "driver task ID" and on the Java side I'm not sure if there is a dummy task, so it's passing in something else. Please take a look.

@AmplabJenkins
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/6856/

@stephanie-wang (Contributor Author)

@robertnishihara ah thanks. I took a brief look, but I'm not familiar enough with the code to see how we could do the same thing. @eric-jj @kingchin1218, do you guys know if a driver process gets assigned a dummy task representing the driver? And if so, is there a convenient way to pass it into the Java local scheduler client?

@AmplabJenkins
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/6859/

@AmplabJenkins
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/6874/

@AmplabJenkins
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/6886/

@AmplabJenkins
Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/6891/

@AmplabJenkins
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/6908/

@robertnishihara robertnishihara merged commit 6675361 into ray-project:master Jul 27, 2018
@robertnishihara robertnishihara deleted the track-get-dependencies branch July 27, 2018 18:59
@jovany-wang (Contributor)

@stephanie-wang @robertnishihara
Sorry guys, I missed these messages :(
It looks like you guys have already solved this issue.

I will change my notification email address to the one I use more frequently.

namespace {

// A helper function to remove a worker from a list. Returns true if the worker
// was found and removed.
Contributor:

It seems that you forgot to update this comment.

robertnishihara pushed a commit that referenced this pull request Aug 23, 2018
## What do these changes do?

#2362 left a bug where it assumed that the driver task ID was nil. This fixes the bug to check the `SchedulingQueue` for any driver task IDs instead.