[xray] Basic actor support #1835
Test PASSed.
@@ -802,6 +802,8 @@ def remote(cls, *args, **kwargs):
actor_creation_resources,
actor_method_cpus,
ray.worker.global_worker)
# Increment the actor counter to account for the creation task.
actor_counter += 1
Later on I think it would make sense to have state like `actor_counter` in a C++ worker class instead of in Python. Certainly not in this PR; this is just a comment.
src/ray/raylet/node_manager.cc
Outdated
@@ -154,6 +206,46 @@ void NodeManager::HeartbeatAdded(gcs::AsyncGcsClient *client, const ClientID &cl
heartbeat_resource_available);
}

void NodeManager::HandleActorCreation(const ActorID &actor_id,
const std::vector<ActorTableDataT> &data) {
RAY_LOG(DEBUG) << "Actor creation notification received: " << actor_id.hex();
Just `actor_id`, no `hex()`.
src/ray/raylet/node_manager.cc
Outdated
// executing a task. Clean up the assigned task's resources, return an
// error to the driver.
// RAY_CHECK(worker->GetAssignedTaskId().is_nil())
// << "Worker died while executing task: " << worker->GetAssignedTaskId().hex();
remove .hex()
src/ray/raylet/node_manager.cc
Outdated
}
}

void NodeManager::FinishTask(const TaskID &task_id) {
void NodeManager::FinishAssignedTask(std::shared_ptr<Worker> worker) {
TaskID task_id = worker->GetAssignedTaskId();
Not really part of this PR, but can you remove the `.hex()` in the line below?
src/ray/raylet/node_manager.cc
Outdated
actor_notification->driver_id = JobID::nil().binary();
actor_notification->node_manager_id =
gcs_client_->client_table().GetLocalClientId().binary();
RAY_LOG(DEBUG) << "Publishing actor creation: " << actor_id.hex();
remove the .hex()
src/ray/raylet/task_spec.h
Outdated
FunctionID function_id,
const std::vector<std::shared_ptr<TaskArgument>> &arguments,
int64_t num_returns,
const std::unordered_map<std::string, double> &required_resources);

TaskSpecification(UniqueID driver_id, TaskID parent_task_id, int64_t parent_counter,
All IDs should be passed by const reference.
Still looking over things; overall, the code looks way cleaner!
Test FAILed.
auto &task_counters =
algorithm_state->local_actor_infos[actor_id].task_counters;
RAY_CHECK(task_counters[ActorHandleID::nil()] == 0);
task_counters[ActorHandleID::nil()]++;
Can you explain this change? I wouldn't have expected any changes to the old code path.
It's because I now count the actor creation task as part of the actor counter on the Python side. Without this change, the old codepath will never run any tasks after the actor creation task since the counters don't match.
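The counter mismatch described in this reply can be illustrated with a small model. This is only a sketch: `ToyActorQueue` and `submit_methods` are hypothetical names, not Ray code, and the model assumes the backend runs an actor task only when the task's counter equals the number of tasks it has already accounted for on that actor.

```python
class ToyActorQueue:
    """Hypothetical model of a backend that runs actor tasks in counter order."""

    def __init__(self, count_creation_task):
        # If the creation task is counted, counter 0 is already consumed once
        # the actor exists (compare the task_counters increment in the hunk above).
        self.expected_counter = 1 if count_creation_task else 0
        self.pending = {}
        self.executed = []

    def submit(self, counter, name):
        self.pending[counter] = name
        # Run every queued task whose counter matches the expected value.
        while self.expected_counter in self.pending:
            self.executed.append(self.pending.pop(self.expected_counter))
            self.expected_counter += 1


def submit_methods(queue, first_counter, names):
    """Submit methods with consecutive counters and return what actually ran."""
    for offset, name in enumerate(names):
        queue.submit(first_counter + offset, name)
    return queue.executed


# The Python side now numbers the first method 1, since creation took counter 0.
matched = submit_methods(ToyActorQueue(count_creation_task=True), 1, ["f", "g"])
# A backend that does not count the creation task keeps waiting for counter 0.
stalled = submit_methods(ToyActorQueue(count_creation_task=False), 1, ["f", "g"])
```

In this model, `matched` contains both methods while `stalled` stays empty, which is the "never run any tasks after the actor creation task" behavior the reply describes.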
// executing a task. Clean up the assigned task's resources, return an
// error to the driver.
// RAY_CHECK(worker->GetAssignedTaskId().is_nil())
// << "Worker died while executing task: " << worker->GetAssignedTaskId();
What changed here?
This never happened before, but now it happens if the actor goes out of scope and executes the `__ray_terminate__` task, which exits mid-task. I thought it might be a good idea to try to handle this case better in the new version, so I held off on it for now.
src/ray/raylet/node_manager.cc
Outdated
// We do not have a registered location for the object, so either the
// actor has not yet been created or we missed the notification for the
// actor creation. Look up the actor's registered location in case we
// missed the creation notification.
When would we miss the creation notification? E.g., if a node joins late or something? It seems like it will be very easy for this code path to not get tested. What do you think?
Or will this actually be pretty likely to get called on the first actor method?
Yes, I think it should only happen in the case where a node joins after the actor has already been published. It won't get tested in a single-node setting, unfortunately. I think we should keep it, though, since it's necessary for things like autoscaling. I can add a note about testing it in a cluster setting.
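For readers following the late-joining-node case, here is a rough sketch of the fallback lookup being discussed. `ToyActorDirectory` and `ToyNode` are hypothetical stand-ins for the actor table and the node manager; none of these names or methods come from the Ray codebase.

```python
class ToyActorDirectory:
    """Hypothetical stand-in for the table where actor locations are published."""

    def __init__(self):
        self._locations = {}

    def publish(self, actor_id, node_id):
        self._locations[actor_id] = node_id

    def lookup(self, actor_id):
        return self._locations.get(actor_id)


class ToyNode:
    """Hypothetical node that learns actor locations from notifications."""

    def __init__(self, directory):
        self.directory = directory
        self.known_locations = {}  # filled in by creation notifications
        self.waiting = []  # methods for actors with no known location yet

    def on_actor_created(self, actor_id, node_id):
        self.known_locations[actor_id] = node_id

    def route(self, actor_id, method):
        location = self.known_locations.get(actor_id)
        if location is None:
            # The notification may have been published before this node joined,
            # so fall back to an explicit lookup.
            location = self.directory.lookup(actor_id)
            if location is not None:
                self.known_locations[actor_id] = location
        if location is None:
            # The actor has not been created yet; hold the method for later.
            self.waiting.append((actor_id, method))
        return location


directory = ToyActorDirectory()
directory.publish("actor-1", "node-A")  # published before the node joins
late_node = ToyNode(directory)  # never receives the creation notification
first = late_node.route("actor-1", "f")
second = late_node.route("actor-2", "g")
```

The first call only succeeds because of the lookup, which is the path a single-node test would never exercise.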

// Try to get an idle worker that can execute this task.
std::shared_ptr<Worker> worker = worker_pool_.PopWorker(spec.ActorId());
if (worker == nullptr) {
Is this a thing that could happen a lot? E.g., would `AssignTask` be called if there are no workers available?
It can happen pretty often, either at the beginning when workers might not have started/connected yet, or during execution when existing workers are blocked in a `ray.get`.
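A minimal sketch of the "no idle worker" path, assuming the task simply goes back on a ready queue until a worker shows up. `ToyWorkerPool` and `assign_task` are illustrative names only and do not mirror the actual `WorkerPool` or `AssignTask` signatures.

```python
from collections import deque


class ToyWorkerPool:
    """Hypothetical pool that hands out idle workers, or None if there are none."""

    def __init__(self, idle_workers):
        self._idle = deque(idle_workers)

    def pop_worker(self):
        return self._idle.popleft() if self._idle else None

    def push_worker(self, worker):
        self._idle.append(worker)


def assign_task(task, pool, ready_queue):
    """Try to assign an unqueued task; requeue it if no worker is idle."""
    worker = pool.pop_worker()
    if worker is None:
        # Workers may not have connected yet, or may all be blocked.
        ready_queue.append(task)
        return None
    return worker


pool = ToyWorkerPool([])  # no worker has connected yet
ready_queue = deque()
first_attempt = assign_task("task-1", pool, ready_queue)
queued_after_first = list(ready_queue)

pool.push_worker("worker-1")  # a worker registers later
# The task is taken off the queue before the retry.
second_attempt = assign_task(ready_queue.popleft(), pool, ready_queue)
```

Requeuing keeps the task alive across both situations mentioned in the reply, at the cost of retrying assignment whenever a worker becomes available.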
@@ -312,61 +458,133 @@ void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineag
}
}

void NodeManager::AssignTask(const Task &task) {
void NodeManager::AssignTask(Task &task) {
Is it an assumption going into `AssignTask` that `task` is currently unqueued?
Yeah, I can add that to the documentation.
Test PASSed.
Test PASSed.
src/ray/raylet/node_manager.h
Outdated

void HeartbeatAdded(gcs::AsyncGcsClient *client, const ClientID &id,
const HeartbeatTableDataT &data);

private:
// Handler for the addition of a new GCS client.
void ClientAdded(gcs::AsyncGcsClient *client, const UniqueID &id,
I think we should document the arguments for these methods. In particular, I'm not sure what `id` is (part of the reason is that it is a `UniqueID` as opposed to something more specific).
Good point, thanks. In this particular case, I don't think `UniqueID` should even be an argument here... I'll remove it.
@@ -56,6 +60,7 @@ std::vector<Task> SchedulingQueue::RemoveTasks(
std::vector<Task> removed_tasks;

// Try to find the tasks to remove from the waiting tasks.
removeTasksFromQueue(uncreated_actor_methods_, task_ids, removed_tasks);
I'm a bit worried about `RemoveTasks` being expensive, but that's an issue for later.
Yeah, I think eventually the plan is to make the implementation of `RemoveTasks` store a hashmap from `TaskID` to iterator so that we can remove it from the queue quickly.
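The indexing idea can be sketched as follows. This is not the C++ design itself: Python has no list iterators to store, so the sketch swaps in `collections.OrderedDict`, which keeps insertion order and supports constant-time removal by key. `ToyTaskQueue` is a hypothetical name.

```python
from collections import OrderedDict


class ToyTaskQueue:
    """Hypothetical FIFO queue with constant-time removal by task ID."""

    def __init__(self):
        # Maps task ID to task while remembering insertion (queue) order.
        self._tasks = OrderedDict()

    def push(self, task_id, task):
        self._tasks[task_id] = task

    def remove_tasks(self, task_ids):
        """Remove and return the tasks whose IDs are present in the queue."""
        removed = []
        for task_id in task_ids:
            if task_id in self._tasks:
                removed.append(self._tasks.pop(task_id))
        return removed

    def task_ids(self):
        return list(self._tasks)


queue = ToyTaskQueue()
for task_id in ["a", "b", "c"]:
    queue.push(task_id, "task-" + task_id)

removed = queue.remove_tasks(["b", "missing"])
remaining = queue.task_ids()
```

Each removal is a single keyed lookup rather than a scan of the queue, so the cost of `remove_tasks` grows with the number of IDs requested instead of the queue length.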
Test PASSed.
Test PASSed.
Test PASSed.
What do these changes do?
This adds basic support in the Raylet backend for actors, specifically single-caller actors with no `ray.put`, reconstruction, or checkpointing.

Backend changes include:
- `TaskSpecification` constructor for actor tasks.
- `ActorRegistration` class to store information about an actor's execution state.
- `NodeManager` support for handling actor creation as well as queuing, forwarding, and assigning actor tasks.