Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core] Decrement lineage ref count of an actor when the actor task return object reference is deleted #46230

Merged
merged 4 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion dashboard/modules/actor/tests/test_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,6 @@ def actor_table_data_to_dict(message):
"serializedRuntimeEnv",
"rayNamespace",
"functionDescriptor",
"actorCreationDummyObjectId",
}
else:
raise Exception("Unknown state: {}".format(actor_data_dict["state"]))
Expand Down
5 changes: 0 additions & 5 deletions src/ray/common/task/task_spec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -446,11 +446,6 @@ ObjectID TaskSpecification::ActorCreationDummyObjectId() const {
message_->actor_task_spec().actor_creation_dummy_object_id());
}

ObjectID TaskSpecification::ActorDummyObject() const {
RAY_CHECK(IsActorTask() || IsActorCreationTask());
return ReturnId(NumReturns() - 1);
}

int TaskSpecification::MaxActorConcurrency() const {
RAY_CHECK(IsActorCreationTask());
return message_->actor_creation_task_spec().max_concurrency();
Expand Down
2 changes: 0 additions & 2 deletions src/ray/common/task/task_spec.h
Original file line number Diff line number Diff line change
Expand Up @@ -454,8 +454,6 @@ class TaskSpecification : public MessageWrapper<rpc::TaskSpec> {

bool IsDetachedActor() const;

ObjectID ActorDummyObject() const;

std::string DebugString() const;

// A one-line summary of the runtime environment for the task. May contain sensitive
Expand Down
3 changes: 1 addition & 2 deletions src/ray/core_worker/actor_handle.cc
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,7 @@ void ActorHandle::SetActorTaskSpec(
task_counter_++);
}

void ActorHandle::SetResubmittedActorTaskSpec(TaskSpecification &spec,
const ObjectID new_cursor) {
void ActorHandle::SetResubmittedActorTaskSpec(TaskSpecification &spec) {
absl::MutexLock guard(&mutex_);
auto mutable_spec = spec.GetMutableMessage().mutable_actor_task_spec();
mutable_spec->set_actor_counter(task_counter_++);
Expand Down
4 changes: 1 addition & 3 deletions src/ray/core_worker/actor_handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,7 @@ class ActorHandle {
///
/// \param[in] spec An existing task spec that has executed on the actor
/// before.
/// \param[in] new_cursor Actor dummy object. This is legacy code needed for
/// raylet-based actor restart.
void SetResubmittedActorTaskSpec(TaskSpecification &spec, const ObjectID new_cursor);
void SetResubmittedActorTaskSpec(TaskSpecification &spec);

void Serialize(std::string *output);

Expand Down
4 changes: 2 additions & 2 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
if (spec.IsActorTask()) {
if (update_seqno) {
auto actor_handle = actor_manager_->GetActorHandle(spec.ActorId());
actor_handle->SetResubmittedActorTaskSpec(spec, spec.ActorDummyObject());
actor_handle->SetResubmittedActorTaskSpec(spec);
}
RAY_CHECK_OK(direct_actor_submitter_->SubmitTask(spec));
} else {
Expand Down Expand Up @@ -1075,7 +1075,7 @@ void CoreWorker::InternalHeartbeat() {
if (spec.IsActorTask()) {
if (task_to_retry.update_seqno) {
auto actor_handle = actor_manager_->GetActorHandle(spec.ActorId());
actor_handle->SetResubmittedActorTaskSpec(spec, spec.ActorDummyObject());
actor_handle->SetResubmittedActorTaskSpec(spec);
}
RAY_CHECK_OK(direct_actor_submitter_->SubmitTask(spec));
} else {
Expand Down
7 changes: 7 additions & 0 deletions src/ray/core_worker/task_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1241,6 +1241,13 @@ int64_t TaskManager::RemoveLineageReference(const ObjectID &object_id,
}
}

if (it->second.spec.IsActorTask()) {
// We need to decrement the actor lineage ref count here
// since it's incremented during TaskManager::AddPendingTask.
const auto actor_creation_return_id = it->second.spec.ActorCreationDummyObjectId();
released_objects->push_back(actor_creation_return_id);
}
Comment on lines +1244 to +1249
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actual fix


total_lineage_footprint_bytes_ -= it->second.lineage_footprint_bytes;
// The task has finished and none of the return IDs are in scope anymore,
// so it is safe to remove the task spec.
Expand Down
63 changes: 63 additions & 0 deletions src/ray/core_worker/test/task_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "mock/ray/pubsub/publisher.h"
#include "mock/ray/pubsub/subscriber.h"
#include "ray/common/task/task_spec.h"
#include "ray/common/task/task_util.h"
#include "ray/common/test_util.h"
#include "ray/core_worker/reference_count.h"
#include "ray/core_worker/store_provider/memory_store/memory_store.h"
Expand Down Expand Up @@ -655,6 +656,68 @@ TEST_F(TaskManagerTest, TestLocalityDataAdded) {
manager_.CompletePendingTask(spec.TaskId(), reply, worker_addr, false);
}

// Test to make sure that the task spec and actor
// for an actor task return object are
// pinned when lineage pinning is enabled in the ReferenceCounter.
TEST_F(TaskManagerLineageTest, TestActorLineagePinned) {
rpc::Address caller_address;
ActorID actor_id = ActorID::FromHex("f4ce02420592ca68c1738a0d01000000");
const ObjectID actor_creation_dummy_object_id =
ObjectID::FromIndex(TaskID::ForActorCreationTask(actor_id), /*index=*/1);
int num_retries = 3;
TaskSpecBuilder builder;
builder.SetCommonTaskSpec(
TaskID::ForActorTask(JobID::Nil(), TaskID::Nil(), 0, actor_id),
"dummy_actor_task",
Language::PYTHON,
FunctionDescriptorBuilder::BuildPython("a", "", "", ""),
JobID::Nil(),
rpc::JobConfig(),
TaskID::Nil(),
0,
TaskID::Nil(),
rpc::Address(),
1,
false,
false,
-1,
{},
{},
"",
0,
TaskID::Nil());
builder.SetActorTaskSpec(
actor_id, actor_creation_dummy_object_id, num_retries, false, "", 0);
TaskSpecification spec = builder.Build();

ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 0);
manager_.AddPendingTask(caller_address, spec, "", num_retries);
auto return_id = spec.ReturnId(0);
ASSERT_TRUE(manager_.IsTaskPending(spec.TaskId()));
// 2 objects are in scope: actor handle and actor task return object.
ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 2);

// The task completes.
rpc::PushTaskReply reply;
auto return_object = reply.add_return_objects();
return_object->set_object_id(return_id.Binary());
auto data = GenerateRandomBuffer();
return_object->set_data(data->Data(), data->Size());
return_object->set_in_plasma(true);
manager_.CompletePendingTask(spec.TaskId(), reply, rpc::Address(), false);

// The task should still be in the lineage because its return ID is in scope.
ASSERT_TRUE(manager_.IsTaskSubmissible(spec.TaskId()));
ASSERT_TRUE(reference_counter_->HasReference(spec.ActorCreationDummyObjectId()));
ASSERT_TRUE(reference_counter_->HasReference(return_id));

// All lineage should be erased.
reference_counter_->RemoveLocalReference(return_id, nullptr);
ASSERT_FALSE(manager_.IsTaskSubmissible(spec.TaskId()));
ASSERT_FALSE(reference_counter_->HasReference(spec.ActorCreationDummyObjectId()));
ASSERT_FALSE(reference_counter_->HasReference(return_id));
}

// Test to make sure that the task spec and dependencies for an object are
// pinned when lineage pinning is enabled in the ReferenceCounter.
TEST_F(TaskManagerLineageTest, TestLineagePinned) {
Expand Down
3 changes: 0 additions & 3 deletions src/ray/gcs/gcs_server/gcs_actor_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,6 @@ class GcsActor {
actor_table_data_.set_max_restarts(actor_creation_task_spec.max_actor_restarts());
actor_table_data_.set_num_restarts(0);

auto dummy_object = TaskSpecification(task_spec).ActorDummyObject().Binary();
actor_table_data_.set_actor_creation_dummy_object_id(dummy_object);

actor_table_data_.mutable_function_descriptor()->CopyFrom(
task_spec.function_descriptor());

Expand Down
27 changes: 0 additions & 27 deletions src/ray/gcs/pb_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,33 +82,6 @@ inline std::shared_ptr<ray::rpc::ErrorTableData> CreateErrorTableData(
return error_info_ptr;
}

/// Helper function to produce actor table data.
inline std::shared_ptr<ray::rpc::ActorTableData> CreateActorTableData(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this just unused?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, dead code

const TaskSpecification &task_spec,
const ray::rpc::Address &address,
ray::rpc::ActorTableData::ActorState state,
uint64_t num_restarts) {
RAY_CHECK(task_spec.IsActorCreationTask());
auto actor_id = task_spec.ActorCreationId();
auto actor_info_ptr = std::make_shared<ray::rpc::ActorTableData>();
// Set all of the static fields for the actor. These fields will not change
// even if the actor fails or is reconstructed.
actor_info_ptr->set_actor_id(actor_id.Binary());
actor_info_ptr->set_parent_id(task_spec.CallerId().Binary());
actor_info_ptr->set_actor_creation_dummy_object_id(
task_spec.ActorDummyObject().Binary());
actor_info_ptr->set_job_id(task_spec.JobId().Binary());
actor_info_ptr->set_max_restarts(task_spec.MaxActorRestarts());
actor_info_ptr->set_is_detached(task_spec.IsDetachedActor());
// Set the fields that change when the actor is restarted.
actor_info_ptr->set_num_restarts(num_restarts);
actor_info_ptr->mutable_address()->CopyFrom(address);
actor_info_ptr->mutable_owner_address()->CopyFrom(
task_spec.GetMessage().caller_address());
actor_info_ptr->set_state(state);
return actor_info_ptr;
}

/// Helper function to produce worker failure data.
inline std::shared_ptr<ray::rpc::WorkerTableData> CreateWorkerFailureData(
const WorkerID &worker_id,
Expand Down
4 changes: 0 additions & 4 deletions src/ray/protobuf/gcs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,6 @@ message ActorTableData {
bytes actor_id = 1;
// The ID of the caller of the actor creation task.
bytes parent_id = 2;
// The dummy object ID returned by the actor creation task. If the actor
// dies, then this is the object that should be restarted for the actor
// to be recreated.
bytes actor_creation_dummy_object_id = 3;
// The ID of the job that created the actor.
bytes job_id = 4;
// Current state of this actor.
Expand Down
Loading