-
Notifications
You must be signed in to change notification settings - Fork 6.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Track newly created actor's parent actor #5098
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
src/ray/raylet/node_manager.cc
Outdated
auto parent_task_id = task.GetTaskSpecification().ParentTaskId(); | ||
RAY_CHECK_OK(gcs_client_->raylet_task_table().Lookup( | ||
JobID::Nil(), parent_task_id, | ||
//success_callback |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Our convention is to use /success_callback=/ for comments that describe a kwarg.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
src/ray/raylet/node_manager.cc
Outdated
// get the parent actor id. | ||
auto message = flatbuffers::GetRoot<protocol::Task>(parent_task_data.task().data()); | ||
Task parent_task(*message); | ||
new_actor_data.set_parent_actor_id(parent_task.GetTaskSpecification().ActorId().Binary()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a bit ugly, but there are actually two cases where an actor might start another actor, either in a task for the parent actor (which you have here), or during the parent actor's creation task. You can check for the latter with parent_task.GetTaskSpecification().IsActorCreationTask()
, and the actor ID will be parent_task.GetTaskSpecification().ActorCreationId()
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
src/ray/raylet/node_manager.cc
Outdated
if(!resumed_from_checkpoint){ | ||
// The actor was not resumed from a checkpoint. We extend the actor's | ||
// frontier as usual since there is no frontier to restore. | ||
ExtendActorFrontier(task, actor_id, actor_handle_id); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we move this if statement into FinishAssignedActorTaskHelper
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
src/ray/raylet/node_manager.cc
Outdated
} | ||
|
||
if (!resumed_from_checkpoint) { | ||
else if (!resumed_from_checkpoint) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks like it could just be an else
(you would never have resumed_from_checkpoint
for regular actor tasks). Also, please move to the previous line.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The if condition existed before, hence I stayed with it. Can remove if it's unnecessary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah it used to be necessary because we also wanted to call ExtendActorFrontier
for actor creation tasks that didn't resume from a checkpoint, but now we're calling that in the callback for the actor table lookup. So we can just make this an else
now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
src/ray/raylet/node_manager.cc
Outdated
} | ||
} | ||
|
||
void NodeManager::ExtendActorFrontier(const Task &task, ActorID &actor_id, ActorHandleID &actor_handle_id){ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks like it only needs to take in task.GetTaskSpecification().ActorDummyObject()
, not the whole task
. This would allow us to avoid capturing task
in FinishAssignedActorTask
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That makes sense. However, we would still need to capture one of TaskSpecification and ActorTableData as CreateActorTableDataFromCreationTask would require TaskSpecification if we move the CreateActorTableDataFromCreationTask call to the helper function
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Takes dummy_object now instead of the entire task.
src/ray/raylet/node_manager.cc
Outdated
RAY_CHECK_OK(gcs_client_->raylet_task_table().Lookup( | ||
JobID::Nil(), parent_task_id, | ||
//success_callback | ||
[this, task, actor_id, actor_handle_id, new_actor_data, resumed_from_checkpoint] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To avoid capturing new_actor_data
, we can just call CreateActorTableDataFromCreationTask
inside the callback, and maybe pass in the parent actor ID to that helper function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Skipped this for now as it would require capturing TaskSpecification for CreateActorTableDataFromCreationTask.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm I would prefer capturing the TaskSpecification
over new_actor_data
so we can avoid marking the lambda as mutable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
src/ray/raylet/node_manager.cc
Outdated
HandleObjectLocal(dummy_object); | ||
} | ||
|
||
void NodeManager::FinishAssignedActorTaskHelper(const ActorID& actor_id, const ActorTableData new_actor_data, bool resumed_from_checkpoint) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would name this something more descriptive, like FinishAssignedActorCreationTask
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
src/ray/raylet/node_manager.h
Outdated
/// \return Void. | ||
void FinishAssignedActorTaskHelper(const ActorID& actor_id, | ||
const ActorTableData new_actor_data, bool resumed_from_checkpoint); | ||
/// Extend actor frontier when an actor task or actor creation task arrives. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/// Extend actor frontier when an actor task or actor creation task arrives. | |
/// Extend actor frontier after an actor task or actor creation task executes. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
Co-Authored-By: Stephanie Wang <swang@cs.berkeley.edu>
Test FAILed. |
Co-Authored-By: Stephanie Wang <swang@cs.berkeley.edu>
Test FAILed. |
Test FAILed. |
Test FAILed. |
src/ray/raylet/node_manager.cc
Outdated
@@ -1917,45 +1917,48 @@ ActorTableData NodeManager::CreateActorTableDataFromCreationTask(const Task &tas | |||
void NodeManager::FinishAssignedActorTask(Worker &worker, const Task &task) { | |||
ActorID actor_id; | |||
ActorHandleID actor_handle_id; | |||
TaskSpecification task_spec = task.GetTaskSpecification(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
const TaskSpecification &task_spec
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
src/ray/raylet/node_manager.cc
Outdated
} | ||
|
||
if (!resumed_from_checkpoint) { | ||
else if (!resumed_from_checkpoint) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah it used to be necessary because we also wanted to call ExtendActorFrontier
for actor creation tasks that didn't resume from a checkpoint, but now we're calling that in the callback for the actor table lookup. So we can just make this an else
now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, a couple small comments, but otherwise looks good!
src/ray/raylet/node_manager.cc
Outdated
(ray::gcs::AsyncGcsClient *client, const TaskID &parent_task_id) mutable { | ||
// The parent task was not in the GCS task table. It must therefore be in the | ||
// lineage cache. | ||
RAY_CHECK(lineage_cache_.ContainsTask(parent_task_id)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunately, one of the travis tests for the new C++ worker is failing on this check, I think because it does not add the driver task to the GCS. Let's make this a non-fatal check for now. If the task is not in the GCS or the lineage cache, we can just set the parent actor ID to nil (and leave a note/todo explaining this case).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
Test FAILed. |
Test FAILed. |
Test PASSed. |
Test FAILed. |
Test FAILed. |
Test FAILed. |
Test FAILed. |
Test PASSed. |
Test FAILed. |
Change-Id: I9c1a65134dc23a2d175047e96b86ab9d9cf61971
Test FAILed. |
Test PASSed. |
Change-Id: I1def06218130b399d2527b999258aecf9abb98dd
Test FAILed. |
Test PASSed. |
@vipulharsh @stephanie-wang CI seems broken after merging this PR. Can you take a look? |
What do these changes do?
Related issue number
Linter
scripts/format.sh
to lint the changes in this PR.