[xray] Basic actor support #1835
|
Test PASSed. |
| actor_method_cpus, | ||
| ray.worker.global_worker) | ||
| # Increment the actor counter to account for the creation task. | ||
| actor_counter += 1 |
Later on I think it would make sense to have state like actor_counter in a C++ worker class instead of in Python. Certainly not in this PR, this is just a comment.
src/ray/raylet/node_manager.cc
Outdated
|
|
||
| void NodeManager::HandleActorCreation(const ActorID &actor_id, | ||
| const std::vector<ActorTableDataT> &data) { | ||
| RAY_LOG(DEBUG) << "Actor creation notification received: " << actor_id.hex(); |
just actor_id, no hex()
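For context, dropping `.hex()` at the call site only works if the ID type can be streamed directly. A minimal sketch of that pattern, assuming a hypothetical `ExampleID` class (a stand-in, not Ray's actual `UniqueID`):

```cpp
#include <cstdint>
#include <iomanip>
#include <ostream>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for an ID type; not Ray's actual UniqueID.
class ExampleID {
 public:
  explicit ExampleID(std::vector<uint8_t> bytes) : bytes_(std::move(bytes)) {}

  // Hex-encode the raw bytes, two lowercase digits per byte.
  std::string hex() const {
    std::ostringstream out;
    out << std::hex << std::setfill('0');
    for (uint8_t b : bytes_) {
      out << std::setw(2) << static_cast<int>(b);
    }
    return out.str();
  }

 private:
  std::vector<uint8_t> bytes_;
};

// Streaming an ID prints its hex form, so log statements can omit .hex().
inline std::ostream &operator<<(std::ostream &os, const ExampleID &id) {
  return os << id.hex();
}
```

With an overload like this, `log << "Actor creation notification received: " << actor_id` and the `.hex()` variant would produce the same text.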
src/ray/raylet/node_manager.cc
Outdated
| // executing a task. Clean up the assigned task's resources, return an | ||
| // error to the driver. | ||
| // RAY_CHECK(worker->GetAssignedTaskId().is_nil()) | ||
| // << "Worker died while executing task: " << worker->GetAssignedTaskId().hex(); |
remove .hex()
|
|
||
| void NodeManager::FinishTask(const TaskID &task_id) { | ||
| void NodeManager::FinishAssignedTask(std::shared_ptr<Worker> worker) { | ||
| TaskID task_id = worker->GetAssignedTaskId(); |
not part of this PR really but can you remove the .hex() in the line below?
src/ray/raylet/node_manager.cc
Outdated
| actor_notification->driver_id = JobID::nil().binary(); | ||
| actor_notification->node_manager_id = | ||
| gcs_client_->client_table().GetLocalClientId().binary(); | ||
| RAY_LOG(DEBUG) << "Publishing actor creation: " << actor_id.hex(); |
remove the .hex()
src/ray/raylet/task_spec.h
Outdated
| int64_t num_returns, | ||
| const std::unordered_map<std::string, double> &required_resources); | ||
|
|
||
| TaskSpecification(UniqueID driver_id, TaskID parent_task_id, int64_t parent_counter, |
all IDs should be const ref
|
Still looking over things, overall the code looks way cleaner! |
|
Test FAILed. |
| auto &task_counters = | ||
| algorithm_state->local_actor_infos[actor_id].task_counters; | ||
| RAY_CHECK(task_counters[ActorHandleID::nil()] == 0); | ||
| task_counters[ActorHandleID::nil()]++; |
Can you explain this change? I wouldn't have expected any changes to the old code path.
It's because I now count the actor creation task as part of the actor counter on the Python side. Without this change, the old codepath will never run any tasks after the actor creation task since the counters don't match.
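To illustrate the mismatch, here is a simplified model of counter matching. It assumes the scheduler runs a task only when the task's counter equals the next expected value for its handle; the `ActorCounters` type and its method names are hypothetical and are not taken from the legacy scheduler.

```cpp
#include <cstdint>
#include <string>
#include <unordered_map>

// Hypothetical, simplified model; handle IDs are plain strings here.
using HandleId = std::string;

struct ActorCounters {
  // Next expected task counter for each actor handle (defaults to 0).
  std::unordered_map<HandleId, int64_t> expected;

  // The submitter counts the creation task as task 0, so the scheduler
  // must advance its expected counter once the actor has been created.
  void RecordCreation(const HandleId &handle) { expected[handle]++; }

  // A task is runnable only if its counter matches the expected value.
  bool CanRun(const HandleId &handle, int64_t task_counter) const {
    auto it = expected.find(handle);
    int64_t next = (it == expected.end()) ? 0 : it->second;
    return task_counter == next;
  }

  // Advance past a task that has finished executing.
  void RecordFinished(const HandleId &handle) { expected[handle]++; }
};
```

In this model, the first actor method arrives with counter 1. Without the `RecordCreation` step the scheduler still expects 0, so the method would never become runnable.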
| // executing a task. Clean up the assigned task's resources, return an | ||
| // error to the driver. | ||
| // RAY_CHECK(worker->GetAssignedTaskId().is_nil()) | ||
| // << "Worker died while executing task: " << worker->GetAssignedTaskId(); |
What changed here?
This never happened before, but now it happens if the actor goes out of scope and executes the __ray_terminate__ task, which exits mid-task. I thought it might be a good idea to try and handle this case better in the new version, so I held off on it for now.
src/ray/raylet/node_manager.cc
Outdated
| // We do not have a registered location for the object, so either the | ||
| // actor has not yet been created or we missed the notification for the | ||
| // actor creation. Look up the actor's registered location in case we | ||
| // missed the creation notification. |
When would we miss the creation notification? E.g., if a node joins late or something? It seems like it will be very easy for this code path to not get tested. What do you think?
Or will this actually be pretty likely to get called on the first actor method?
Yes, I think it should only happen in the case where a node joins after the actor has already been published. It won't get tested in a single-node setting, unfortunately. I think we should keep it, though, since it's necessary for things like autoscaling. I can add a note about testing it in a cluster setting.
|
|
||
| // Try to get an idle worker that can execute this task. | ||
| std::shared_ptr<Worker> worker = worker_pool_.PopWorker(spec.ActorId()); | ||
| if (worker == nullptr) { |
Is this a thing that could happen a lot? E.g., would AssignTask be called if there are no workers available?
It can happen pretty often, either at the beginning when workers might not have started/connected yet, or during execution when existing workers are blocked in a ray.get.
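A rough sketch of the "no idle worker" path, under the assumption that an unassignable task is simply kept in a queue and retried when a worker becomes available. `Scheduler`, `WorkerAvailable`, and the argument-free `PopWorker` below are hypothetical simplifications, not the raylet's actual classes or signatures.

```cpp
#include <cstddef>
#include <deque>
#include <memory>
#include <string>
#include <utility>

// Hypothetical, simplified types; not the actual raylet classes.
struct Worker {
  std::string name;
};

struct Task {
  std::string id;
};

class WorkerPool {
 public:
  void PushWorker(std::shared_ptr<Worker> worker) {
    idle_.push_back(std::move(worker));
  }

  // Returns nullptr when no idle worker is available.
  std::shared_ptr<Worker> PopWorker() {
    if (idle_.empty()) {
      return nullptr;
    }
    std::shared_ptr<Worker> worker = std::move(idle_.front());
    idle_.pop_front();
    return worker;
  }

 private:
  std::deque<std::shared_ptr<Worker>> idle_;
};

class Scheduler {
 public:
  // Returns true if the task was handed to a worker, false if it was queued.
  bool AssignTask(const Task &task) {
    std::shared_ptr<Worker> worker = pool_.PopWorker();
    if (worker == nullptr) {
      // No worker yet (still starting, or all blocked): keep the task queued.
      pending_.push_back(task);
      return false;
    }
    running_.emplace_back(std::move(worker), task);
    return true;
  }

  // Called when a worker registers or becomes idle; retries one queued task.
  void WorkerAvailable(std::shared_ptr<Worker> worker) {
    pool_.PushWorker(std::move(worker));
    if (!pending_.empty()) {
      Task task = pending_.front();
      pending_.pop_front();
      AssignTask(task);
    }
  }

  std::size_t NumPending() const { return pending_.size(); }
  std::size_t NumRunning() const { return running_.size(); }

 private:
  WorkerPool pool_;
  std::deque<Task> pending_;
  std::deque<std::pair<std::shared_ptr<Worker>, Task>> running_;
};
```

Since the `nullptr` case is common, treating it as a normal outcome (queue and retry) rather than an error keeps the assignment path cheap.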
| } | ||
|
|
||
| void NodeManager::AssignTask(const Task &task) { | ||
| void NodeManager::AssignTask(Task &task) { |
Is it an assumption going into AssignTask that task is currently unqueued?
Yeah, I can add that to the documentation.
|
Test PASSed. |
|
Test PASSed. |
src/ray/raylet/node_manager.h
Outdated
|
|
||
| private: | ||
| // Handler for the addition of a new GCS client. | ||
| void ClientAdded(gcs::AsyncGcsClient *client, const UniqueID &id, |
I think we should document the arguments for these methods. In particular, I'm not sure what id is (part of the reason is that it is a UniqueID as opposed to something more specific).
Good point, thanks. In this particular case, I don't think UniqueID should even be an argument here...I'll remove it.
| std::vector<Task> removed_tasks; | ||
|
|
||
| // Try to find the tasks to remove from the waiting tasks. | ||
| removeTasksFromQueue(uncreated_actor_methods_, task_ids, removed_tasks); |
I'm a bit worried about RemoveTasks being expensive, but that's an issue for later.
Yeah, I think eventually the plan is to make the implementation of RemoveTasks store a hashmap from TaskID to iterator so that we can remove it from the queue quickly.
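One way that plan could look, as a sketch only: a `std::list` of tasks plus a hash map from task ID to list iterator, which gives O(1) average-case removal. `IndexedTaskQueue` and the string-based `TaskId` are hypothetical, and the sketch assumes task IDs are unique within the queue.

```cpp
#include <cstddef>
#include <iterator>
#include <list>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical, simplified types; task IDs are plain strings here.
using TaskId = std::string;

struct Task {
  TaskId id;
};

class IndexedTaskQueue {
 public:
  // Append a task and remember where it lives. Assumes IDs are unique.
  void Push(Task task) {
    TaskId id = task.id;
    tasks_.push_back(std::move(task));
    index_[id] = std::prev(tasks_.end());
  }

  // Remove the task with the given ID, if present, without scanning the list.
  bool Remove(const TaskId &id, Task *removed) {
    auto it = index_.find(id);
    if (it == index_.end()) {
      return false;
    }
    *removed = std::move(*it->second);
    tasks_.erase(it->second);  // Other list iterators remain valid.
    index_.erase(it);
    return true;
  }

  std::size_t Size() const { return tasks_.size(); }

 private:
  std::list<Task> tasks_;  // Preserves queue order.
  std::unordered_map<TaskId, std::list<Task>::iterator> index_;
};
```

`std::list` is used because erasing one element does not invalidate iterators to the others, which is what makes storing iterators in the map safe.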
|
Test PASSed. |
|
Test PASSed. |
|
Test PASSed. |
What do these changes do?
This adds basic support in the Raylet backend for actors, specifically single-caller actors with no ray.put, reconstruction, or checkpointing.

Backend changes include:
- TaskSpecification constructor for actor tasks.
- ActorRegistration class to store information about an actor's execution state.
- NodeManager support for handling actor creation as well as queuing, forwarding, and assigning actor tasks.