
[xray] Basic actor support #1835

Merged · 17 commits into ray-project:master on Apr 6, 2018

Conversation

@stephanie-wang (Contributor)

What do these changes do?

This adds basic support in the Raylet backend for actors: specifically, single-caller actors, without support for ray.put, reconstruction, or checkpointing.

Backend changes include:

  • An actor table to publish notifications about new actor creations.
  • A new TaskSpecification constructor for actor tasks.
  • An ActorRegistration class to store information about an actor's execution state.
  • NodeManager support for handling actor creation as well as queuing, forwarding, and assigning actor tasks.
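The actor-table flow in the first bullet can be sketched as a small pub/sub loop: the table stores an entry per actor and notifies subscribed node managers when a creation is published. This is a toy sketch; the names (`ActorTable`, `NodeManagerSketch`, `publish_creation`) are illustrative stand-ins, not Ray's actual GCS API.

```python
class ActorTable:
    """Stores actor entries and publishes creation notifications."""

    def __init__(self):
        self._entries = {}
        self._subscribers = []

    def subscribe(self, callback):
        self._subscribers.append(callback)

    def publish_creation(self, actor_id, node_manager_id):
        # Record the entry, then notify every subscriber.
        entry = {"node_manager_id": node_manager_id}
        self._entries[actor_id] = entry
        for callback in self._subscribers:
            callback(actor_id, entry)


class NodeManagerSketch:
    """Learns actor locations from creation notifications."""

    def __init__(self, actor_table):
        self.known_actors = {}
        actor_table.subscribe(self.handle_actor_creation)

    def handle_actor_creation(self, actor_id, entry):
        self.known_actors[actor_id] = entry["node_manager_id"]


table = ActorTable()
manager = NodeManagerSketch(table)
table.publish_creation("actor-1", "node-A")
```

After `publish_creation`, every subscribed node manager knows which node hosts the actor and can forward that actor's tasks there.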

@AmplabJenkins
Test PASSed. Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4679/

@@ -802,6 +802,8 @@ def remote(cls, *args, **kwargs):
actor_creation_resources,
actor_method_cpus,
ray.worker.global_worker)
# Increment the actor counter to account for the creation task.
actor_counter += 1
Collaborator

Later on I think it would make sense to have state like actor_counter in a C++ worker class instead of in Python. Certainly not in this PR, this is just a comment.

@@ -154,6 +206,46 @@ void NodeManager::HeartbeatAdded(gcs::AsyncGcsClient *client, const ClientID &cl
heartbeat_resource_available);
}

void NodeManager::HandleActorCreation(const ActorID &actor_id,
const std::vector<ActorTableDataT> &data) {
RAY_LOG(DEBUG) << "Actor creation notification received: " << actor_id.hex();
Collaborator

just actor_id, no hex()

// executing a task. Clean up the assigned task's resources, return an
// error to the driver.
// RAY_CHECK(worker->GetAssignedTaskId().is_nil())
// << "Worker died while executing task: " << worker->GetAssignedTaskId().hex();
Collaborator

remove .hex()

}
}

-void NodeManager::FinishTask(const TaskID &task_id) {
+void NodeManager::FinishAssignedTask(std::shared_ptr<Worker> worker) {
TaskID task_id = worker->GetAssignedTaskId();
Collaborator

not part of this PR really but can you remove the .hex() in the line below?

actor_notification->driver_id = JobID::nil().binary();
actor_notification->node_manager_id =
gcs_client_->client_table().GetLocalClientId().binary();
RAY_LOG(DEBUG) << "Publishing actor creation: " << actor_id.hex();
Collaborator

remove the .hex()

FunctionID function_id,
const std::vector<std::shared_ptr<TaskArgument>> &arguments,
int64_t num_returns,
const std::unordered_map<std::string, double> &required_resources);

TaskSpecification(UniqueID driver_id, TaskID parent_task_id, int64_t parent_counter,
Collaborator

all IDs should be const ref

@robertnishihara (Collaborator)

Still looking over things, overall the code looks way cleaner!

@AmplabJenkins
Test FAILed. Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4682/

auto &task_counters =
algorithm_state->local_actor_infos[actor_id].task_counters;
RAY_CHECK(task_counters[ActorHandleID::nil()] == 0);
task_counters[ActorHandleID::nil()]++;
Collaborator

Can you explain this change? I wouldn't have expected any changes to the old code path.

Contributor Author

It's because I now count the actor creation task as part of the actor counter on the Python side. Without this change, the old code path would never run any tasks after the actor creation task, since the counters wouldn't match.
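The counter-matching argument above can be modeled in a few lines: the backend only runs a task whose counter equals the number of tasks the actor has executed so far, and once the creation task is counted it consumes counter 0, so the first method call must carry counter 1. Names here are illustrative, not Ray's internals.

```python
def can_run(executed_count, task_counter):
    # A task is runnable only when its counter matches the number of
    # tasks the actor has already executed.
    return task_counter == executed_count


executed = 0
assert can_run(executed, 0)       # the creation task carries counter 0
executed += 1                     # creation task executed
assert not can_run(executed, 0)   # a method still numbered 0 would stall
assert can_run(executed, 1)       # first actor method carries counter 1
```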

// executing a task. Clean up the assigned task's resources, return an
// error to the driver.
// RAY_CHECK(worker->GetAssignedTaskId().is_nil())
// << "Worker died while executing task: " << worker->GetAssignedTaskId();
Collaborator

What changed here?

Contributor Author

This never happened before, but now it happens when the actor goes out of scope and executes the __ray_terminate__ task, which exits mid-task. I thought it might be a good idea to handle this case better in the new version, so I commented out the check for now.

// We do not have a registered location for the object, so either the
// actor has not yet been created or we missed the notification for the
// actor creation. Look up the actor's registered location in case we
// missed the creation notification.
Collaborator

When would we miss the creation notification? E.g., if a node joins late or something? It seems like it will be very easy for this code path to not get tested. What do you think?

Collaborator

Or will this actually be pretty likely to get called on the first actor method?

Contributor Author

Yes, I think it should only happen in the case where a node joins after the actor has already been published. It won't get tested in a single-node setting, unfortunately. I think we should keep it, though, since it's necessary for things like autoscaling. I can add a note about testing it in a cluster setting.
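The fallback being discussed (check the local cache, then query the actor table in case this node joined after the notification was published) can be sketched like this. The function and parameter names are hypothetical, chosen only to illustrate the check-then-lookup pattern.

```python
def get_actor_location(actor_id, cache, table_lookup):
    """Return the actor's node, falling back to a direct table lookup."""
    location = cache.get(actor_id)
    if location is None:
        # No cached location: we may have missed the creation
        # notification (e.g. this node joined the cluster late).
        location = table_lookup(actor_id)
        if location is not None:
            cache[actor_id] = location  # remember it for next time
    return location


cache = {}
table = {"actor-1": "node-A"}  # stand-in for the GCS actor table
location = get_actor_location("actor-1", cache, table.get)
```

As noted above, in a single-node test the cache is always warm, so only a multi-node (e.g. autoscaling) setting exercises the lookup branch.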


// Try to get an idle worker that can execute this task.
std::shared_ptr<Worker> worker = worker_pool_.PopWorker(spec.ActorId());
if (worker == nullptr) {
Collaborator

Is this a thing that could happen a lot? E.g., would AssignTask be called if there are no workers available?

Contributor Author

It can happen pretty often, either at the beginning when workers might not have started/connected yet, or during execution when existing workers are blocked in a ray.get.

@@ -312,61 +458,133 @@ void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineag
}
}

-void NodeManager::AssignTask(const Task &task) {
+void NodeManager::AssignTask(Task &task) {
Collaborator

Is it an assumption going into AssignTask that task is currently unqueued?

Contributor Author

Yeah, I can add that to the documentation.

@AmplabJenkins
Test PASSed. Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4684/

@AmplabJenkins
Test PASSed. Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4685/


void HeartbeatAdded(gcs::AsyncGcsClient *client, const ClientID &id,
const HeartbeatTableDataT &data);

private:
// Handler for the addition of a new GCS client.
void ClientAdded(gcs::AsyncGcsClient *client, const UniqueID &id,
Collaborator

I think we should document the arguments for these methods. In particular, I'm not sure what id is (part of the reason is that it is a UniqueID as opposed to something more specific).

Contributor Author

Good point, thanks. In this particular case, I don't think UniqueID should even be an argument here...I'll remove it.

@@ -56,6 +60,7 @@ std::vector<Task> SchedulingQueue::RemoveTasks(
std::vector<Task> removed_tasks;

// Try to find the tasks to remove from the waiting tasks.
removeTasksFromQueue(uncreated_actor_methods_, task_ids, removed_tasks);
Collaborator

I'm a bit worried about RemoveTasks being expensive, but that's an issue for later.

Contributor Author

Yeah, I think eventually the plan is to make the implementation of RemoveTasks store a hashmap from TaskID to iterator so that we can remove it from the queue quickly.
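The planned optimization (a hashmap from TaskID to queue iterator, so removal is a hash lookup rather than a linear scan) has a compact Python analogue in `OrderedDict`, which keeps insertion order while supporting constant-time deletion by key. This is a sketch of the idea, not the Raylet's `SchedulingQueue`.

```python
from collections import OrderedDict


class TaskQueueSketch:
    """FIFO task queue with O(1) removal by TaskID."""

    def __init__(self):
        # task_id -> task, in insertion order; plays the role of the
        # queue plus the TaskID -> iterator map.
        self._tasks = OrderedDict()

    def append(self, task_id, task):
        self._tasks[task_id] = task

    def remove_tasks(self, task_ids):
        removed = []
        for task_id in task_ids:
            task = self._tasks.pop(task_id, None)  # O(1), no scan
            if task is not None:
                removed.append(task)
        return removed


queue = TaskQueueSketch()
queue.append("t1", "task-1")
queue.append("t2", "task-2")
removed = queue.remove_tasks(["t1"])
```

The remaining entries keep their original order, so FIFO scheduling is preserved after arbitrary removals.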

@AmplabJenkins
Test PASSed. Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4687/

@AmplabJenkins
Test PASSed. Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4688/

@AmplabJenkins
Test PASSed. Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4690/

@robertnishihara robertnishihara merged commit bf194db into ray-project:master Apr 6, 2018
@robertnishihara robertnishihara deleted the xray-actors branch April 6, 2018 07:17