Skip to content

[xray] Fix bug in updating actor execution dependencies #2064

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -616,10 +616,13 @@ void NodeManager::AssignTask(Task &task) {
// If the task was an actor task, then record this execution to guarantee
// consistency in the case of reconstruction.
if (spec.IsActorTask()) {
// Extend the frontier to include the executing task.
auto actor_entry = actor_registry_.find(spec.ActorId());
RAY_CHECK(actor_entry != actor_registry_.end());
actor_entry->second.ExtendFrontier(spec.ActorHandleId(), spec.ActorDummyObject());
auto execution_dependency = actor_entry->second.GetExecutionDependency();
// The execution dependency is initialized to the actor creation task's
// return value, and is subsequently updated to the assigned tasks'
// return values, so it should never be nil.
RAY_CHECK(!execution_dependency.is_nil());
// Update the task's execution dependencies to reflect the actual
// execution order, to support deterministic reconstruction.
// NOTE(swang): The update of an actor task's execution dependencies is
Expand All @@ -628,8 +631,9 @@ void NodeManager::AssignTask(Task &task) {
// guarantee deterministic reconstruction ordering for tasks whose
// updates are reflected in the task table.
TaskExecutionSpecification &mutable_spec = task.GetTaskExecutionSpec();
mutable_spec.SetExecutionDependencies(
{actor_entry->second.GetExecutionDependency()});
mutable_spec.SetExecutionDependencies({execution_dependency});
// Extend the frontier to include the executing task.
actor_entry->second.ExtendFrontier(spec.ActorHandleId(), spec.ActorDummyObject());
}
// We started running the task, so the task is ready to write to GCS.
lineage_cache_.AddReadyTask(task);
Expand Down Expand Up @@ -662,7 +666,7 @@ void NodeManager::FinishAssignedTask(Worker &worker) {
auto actor_notification = std::make_shared<ActorTableDataT>();
actor_notification->actor_id = actor_id.binary();
actor_notification->actor_creation_dummy_object_id =
task.GetTaskSpecification().ActorCreationDummyObjectId().binary();
task.GetTaskSpecification().ActorDummyObject().binary();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wasn't this correct before?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ActorCreationDummyObjectId is only set for actor methods, not for the actor creation task. :(

// TODO(swang): The driver ID.
actor_notification->driver_id = JobID::nil().binary();
actor_notification->node_manager_id =
Expand Down
1 change: 1 addition & 0 deletions src/ray/raylet/task_execution_spec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ std::vector<ObjectID> TaskExecutionSpecification::ExecutionDependencies() const

void TaskExecutionSpecification::SetExecutionDependencies(
const std::vector<ObjectID> &dependencies) {
execution_spec_.dependencies.clear();
for (const auto &dependency : dependencies) {
execution_spec_.dependencies.push_back(dependency.binary());
}
Expand Down