From 6aefe9b36ef77a8e17c6f1bec449e33baae2b86e Mon Sep 17 00:00:00 2001 From: Tao Wang Date: Wed, 13 Apr 2022 03:24:26 +0800 Subject: [PATCH] [Core]Save task spec in separate table (#22650) This is a rebased version of #11592. Task spec info is only needed when the GCS creates or starts an actor, so we can remove it from the actor table and save the serialization time and memory/network cost when GCS clients get actor info from the GCS. Because the internal repository differs considerably from the community one, this PR just adds some manual checks on top of a simple cherry-pick. Comments are welcome; in the meantime I'll see whether any test cases fail or any points were missed. --- dashboard/modules/actor/actor_head.py | 17 +++--- dashboard/modules/actor/actor_utils.py | 8 +++ dashboard/modules/actor/tests/test_actor.py | 9 ++- dashboard/modules/snapshot/snapshot_head.py | 2 +- src/ray/core_worker/actor_handle.cc | 36 ++++++------ src/ray/core_worker/actor_handle.h | 5 +- src/ray/core_worker/actor_manager.cc | 11 ++-- src/ray/gcs/gcs_client/accessor.cc | 6 +- src/ray/gcs/gcs_client/accessor.h | 3 +- .../gcs/gcs_client/test/gcs_client_test.cc | 1 - src/ray/gcs/gcs_server/gcs_actor_manager.cc | 55 +++++++++++++------ src/ray/gcs/gcs_server/gcs_actor_manager.h | 26 ++++++++- src/ray/gcs/gcs_server/gcs_actor_scheduler.cc | 2 +- src/ray/gcs/gcs_server/gcs_init_data.cc | 17 +++++- src/ray/gcs/gcs_server/gcs_init_data.h | 8 +++ src/ray/gcs/gcs_server/gcs_table_storage.cc | 2 + src/ray/gcs/gcs_server/gcs_table_storage.h | 19 +++++++ .../gcs_server/test/gcs_actor_manager_test.cc | 36 ++++++------ .../test/gcs_actor_scheduler_mock_test.cc | 4 +- src/ray/protobuf/gcs.proto | 7 ++- src/ray/protobuf/gcs_service.proto | 2 + 21 files changed, 189 insertions(+), 87 deletions(-) diff --git a/dashboard/modules/actor/actor_head.py b/dashboard/modules/actor/actor_head.py index fdc3150366358..56f74151485e1 100644 --- a/dashboard/modules/actor/actor_head.py +++ b/dashboard/modules/actor/actor_head.py @@ -14,7 
+14,7 @@ import ray.dashboard.optional_utils as dashboard_optional_utils from ray.dashboard.optional_utils import rest_response from ray.dashboard.modules.actor import actor_consts -from ray.dashboard.modules.actor.actor_utils import actor_classname_from_task_spec +from ray.dashboard.modules.actor.actor_utils import actor_classname_from_func_descriptor from ray.core.generated import node_manager_pb2_grpc from ray.core.generated import gcs_service_pb2 from ray.core.generated import gcs_service_pb2_grpc @@ -56,20 +56,17 @@ def actor_table_data_to_dict(message): "state", "name", "numRestarts", - "taskSpec", + "functionDescriptor", "timestamp", "numExecutedTasks", } light_message = {k: v for (k, v) in orig_message.items() if k in fields} - if "taskSpec" in light_message: - actor_class = actor_classname_from_task_spec(light_message["taskSpec"]) + logger.info(light_message) + if "functionDescriptor" in light_message: + actor_class = actor_classname_from_func_descriptor( + light_message["functionDescriptor"] + ) light_message["actorClass"] = actor_class - if "functionDescriptor" in light_message["taskSpec"]: - light_message["taskSpec"] = { - "functionDescriptor": light_message["taskSpec"]["functionDescriptor"] - } - else: - light_message.pop("taskSpec") return light_message diff --git a/dashboard/modules/actor/actor_utils.py b/dashboard/modules/actor/actor_utils.py index 814e0eb796f8f..62f40b125b6c4 100644 --- a/dashboard/modules/actor/actor_utils.py +++ b/dashboard/modules/actor/actor_utils.py @@ -32,6 +32,14 @@ def actor_classname_from_task_spec(task_spec): ) +def actor_classname_from_func_descriptor(func_desc): + return ( + func_desc.get("pythonFunctionDescriptor", {}) + .get("className", "Unknown actor class") + .split(".")[-1] + ) + + def _group_actors_by_python_class(actors): groups = defaultdict(list) for actor in actors.values(): diff --git a/dashboard/modules/actor/tests/test_actor.py b/dashboard/modules/actor/tests/test_actor.py index 
79824306e5384..5a53b263e3b1f 100644 --- a/dashboard/modules/actor/tests/test_actor.py +++ b/dashboard/modules/actor/tests/test_actor.py @@ -118,9 +118,8 @@ class InfeasibleActor: assert len(actors) == 3 one_entry = list(actors.values())[0] assert "jobId" in one_entry - assert "taskSpec" in one_entry - assert "functionDescriptor" in one_entry["taskSpec"] - assert type(one_entry["taskSpec"]["functionDescriptor"]) is dict + assert "functionDescriptor" in one_entry + assert type(one_entry["functionDescriptor"]) is dict assert "address" in one_entry assert type(one_entry["address"]) is dict assert "state" in one_entry @@ -264,7 +263,7 @@ def actor_table_data_to_dict(message): including_default_value_fields=False, ) - non_state_keys = ("actorId", "jobId", "taskSpec") + non_state_keys = ("actorId", "jobId") for msg in msgs: actor_data_dict = actor_table_data_to_dict(msg) @@ -292,9 +291,9 @@ def actor_table_data_to_dict(message): "actorCreationDummyObjectId", "jobId", "ownerAddress", - "taskSpec", "className", "serializedRuntimeEnv", + "functionDescriptor", "rayNamespace", } else: diff --git a/dashboard/modules/snapshot/snapshot_head.py b/dashboard/modules/snapshot/snapshot_head.py index 4b6461f0d3c5d..289988498acd1 100644 --- a/dashboard/modules/snapshot/snapshot_head.py +++ b/dashboard/modules/snapshot/snapshot_head.py @@ -169,7 +169,7 @@ async def get_actor_info(self): "start_time": actor_table_entry.start_time, "end_time": actor_table_entry.end_time, "is_detached": actor_table_entry.is_detached, - "resources": dict(actor_table_entry.task_spec.required_resources), + "resources": dict(actor_table_entry.required_resources), "actor_class": actor_table_entry.class_name, "current_worker_id": actor_table_entry.address.worker_id.hex(), "current_raylet_id": actor_table_entry.address.raylet_id.hex(), diff --git a/src/ray/core_worker/actor_handle.cc b/src/ray/core_worker/actor_handle.cc index d012f074f3be7..92cdd52321766 100644 --- a/src/ray/core_worker/actor_handle.cc +++ 
b/src/ray/core_worker/actor_handle.cc @@ -57,29 +57,28 @@ rpc::ActorHandle CreateInnerActorHandleFromString(const std::string &serialized) return inner; } -rpc::ActorHandle CreateInnerActorHandleFromActorTableData( - const rpc::ActorTableData &actor_table_data) { +rpc::ActorHandle CreateInnerActorHandleFromActorData( + const rpc::ActorTableData &actor_table_data, const rpc::TaskSpec &task_spec) { rpc::ActorHandle inner; inner.set_actor_id(actor_table_data.actor_id()); inner.set_owner_id(actor_table_data.parent_id()); inner.mutable_owner_address()->CopyFrom(actor_table_data.owner_address()); inner.set_creation_job_id(actor_table_data.job_id()); - inner.set_actor_language(actor_table_data.task_spec().language()); + inner.set_actor_language(task_spec.language()); inner.mutable_actor_creation_task_function_descriptor()->CopyFrom( - actor_table_data.task_spec().function_descriptor()); - TaskSpecification task_spec(actor_table_data.task_spec()); - inner.set_actor_cursor(task_spec.ReturnId(0).Binary()); - inner.set_extension_data( - actor_table_data.task_spec().actor_creation_task_spec().extension_data()); - inner.set_max_task_retries( - actor_table_data.task_spec().actor_creation_task_spec().max_task_retries()); - inner.set_name(actor_table_data.task_spec().actor_creation_task_spec().name()); - inner.set_ray_namespace( - actor_table_data.task_spec().actor_creation_task_spec().ray_namespace()); + actor_table_data.function_descriptor()); + inner.set_actor_cursor( + ObjectID::FromIndex( + TaskID::ForActorCreationTask(ActorID::FromBinary(actor_table_data.actor_id())), + 1) + .Binary()); + inner.set_extension_data(task_spec.actor_creation_task_spec().extension_data()); + inner.set_max_task_retries(task_spec.actor_creation_task_spec().max_task_retries()); + inner.set_name(actor_table_data.name()); + inner.set_ray_namespace(actor_table_data.ray_namespace()); inner.set_execute_out_of_order( - actor_table_data.task_spec().actor_creation_task_spec().execute_out_of_order()); - 
inner.set_max_pending_calls( - actor_table_data.task_spec().actor_creation_task_spec().max_pending_calls()); + task_spec.actor_creation_task_spec().execute_out_of_order()); + inner.set_max_pending_calls(task_spec.actor_creation_task_spec().max_pending_calls()); return inner; } } // namespace @@ -115,8 +114,9 @@ ActorHandle::ActorHandle( ActorHandle::ActorHandle(const std::string &serialized) : ActorHandle(CreateInnerActorHandleFromString(serialized)) {} -ActorHandle::ActorHandle(const rpc::ActorTableData &actor_table_data) - : ActorHandle(CreateInnerActorHandleFromActorTableData(actor_table_data)) {} +ActorHandle::ActorHandle(const rpc::ActorTableData &actor_table_data, + const rpc::TaskSpec &task_spec) + : ActorHandle(CreateInnerActorHandleFromActorData(actor_table_data, task_spec)) {} void ActorHandle::SetActorTaskSpec(TaskSpecBuilder &builder, const ObjectID new_cursor) { absl::MutexLock guard(&mutex_); diff --git a/src/ray/core_worker/actor_handle.h b/src/ray/core_worker/actor_handle.h index 0b43c55e3599f..58bd0ae956412 100644 --- a/src/ray/core_worker/actor_handle.h +++ b/src/ray/core_worker/actor_handle.h @@ -49,8 +49,9 @@ class ActorHandle { /// Constructs an ActorHandle from a serialized string. explicit ActorHandle(const std::string &serialized); - /// Constructs an ActorHandle from a gcs::ActorTableData message. - ActorHandle(const rpc::ActorTableData &actor_table_data); + /// Constructs an ActorHandle from a rpc::ActorTableData and a rpc::TaskSpec message. 
+ ActorHandle(const rpc::ActorTableData &actor_table_data, + const rpc::TaskSpec &task_spec); ActorID GetActorID() const { return ActorID::FromBinary(inner_.actor_id()); }; diff --git a/src/ray/core_worker/actor_manager.cc b/src/ray/core_worker/actor_manager.cc index 5b35f5d36ea62..723cb90b3c953 100644 --- a/src/ray/core_worker/actor_manager.cc +++ b/src/ray/core_worker/actor_manager.cc @@ -64,13 +64,16 @@ std::pair, Status> ActorManager::GetNamedActo // implemented using a promise that's captured in the RPC callback. // There should be no risk of deadlock because we don't hold any // locks during the call and the RPCs run on a separate thread. - rpc::ActorTableData result; - const auto status = gcs_client_->Actors().SyncGetByName(name, ray_namespace, result); + rpc::ActorTableData actor_table_data; + rpc::TaskSpec task_spec; + const auto status = gcs_client_->Actors().SyncGetByName( + name, ray_namespace, actor_table_data, task_spec); if (status.ok()) { - auto actor_handle = std::make_unique(result); + auto actor_handle = std::make_unique(actor_table_data, task_spec); actor_id = actor_handle->GetActorID(); AddNewActorHandle(std::move(actor_handle), - GenerateCachedActorName(result.ray_namespace(), result.name()), + GenerateCachedActorName(actor_table_data.ray_namespace(), + actor_table_data.name()), call_site, caller_address, /*is_detached*/ true); diff --git a/src/ray/gcs/gcs_client/accessor.cc b/src/ray/gcs/gcs_client/accessor.cc index e1b8205390323..8414b564a9efb 100644 --- a/src/ray/gcs/gcs_client/accessor.cc +++ b/src/ray/gcs/gcs_client/accessor.cc @@ -194,15 +194,17 @@ Status ActorInfoAccessor::AsyncGetByName( Status ActorInfoAccessor::SyncGetByName(const std::string &name, const std::string &ray_namespace, - rpc::ActorTableData &actor_table_data) { + rpc::ActorTableData &actor_table_data, + rpc::TaskSpec &task_spec) { rpc::GetNamedActorInfoRequest request; rpc::GetNamedActorInfoReply reply; request.set_name(name); request.set_ray_namespace(ray_namespace); 
auto status = client_impl_->GetGcsRpcClient().SyncGetNamedActorInfo( request, &reply, /*timeout_ms*/ GetGcsTimeoutMs()); - if (status.ok() && reply.has_actor_table_data()) { + if (status.ok()) { actor_table_data = reply.actor_table_data(); + task_spec = reply.task_spec(); } return status; } diff --git a/src/ray/gcs/gcs_client/accessor.h b/src/ray/gcs/gcs_client/accessor.h index de4f3461435ae..b2e91dad2a6bd 100644 --- a/src/ray/gcs/gcs_client/accessor.h +++ b/src/ray/gcs/gcs_client/accessor.h @@ -80,7 +80,8 @@ class ActorInfoAccessor { /// NotFound if the name doesn't exist. virtual Status SyncGetByName(const std::string &name, const std::string &ray_namespace, - rpc::ActorTableData &actor_table_data); + rpc::ActorTableData &actor_table_data, + rpc::TaskSpec &task_spec); /// List all named actors from the GCS asynchronously. /// diff --git a/src/ray/gcs/gcs_client/test/gcs_client_test.cc b/src/ray/gcs/gcs_client/test/gcs_client_test.cc index 47cec2c799965..9deb1def87e4c 100644 --- a/src/ray/gcs/gcs_client/test/gcs_client_test.cc +++ b/src/ray/gcs/gcs_client/test/gcs_client_test.cc @@ -908,7 +908,6 @@ TEST_P(GcsClientTest, DISABLED_TestGetActorPerf) { } for (int index = 0; index < actor_count; ++index) { auto actor_table_data = Mocker::GenActorTableData(job_id); - actor_table_data->mutable_task_spec()->CopyFrom(task_spec); RegisterActor(actor_table_data, false, true); } diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.cc b/src/ray/gcs/gcs_server/gcs_actor_manager.cc index d4378d2fe3a5a..c8bca96cefe2e 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.cc @@ -172,8 +172,9 @@ std::string GcsActor::GetRayNamespace() const { } TaskSpecification GcsActor::GetCreationTaskSpecification() const { - const auto &task_spec = actor_table_data_.task_spec(); - return TaskSpecification(task_spec); + // The task spec is not available when the actor is dead. 
+ RAY_CHECK(actor_table_data_.state() != rpc::ActorTableData::DEAD); + return TaskSpecification(*task_spec_); } const rpc::ActorTableData &GcsActor::GetActorTableData() const { @@ -182,6 +183,8 @@ const rpc::ActorTableData &GcsActor::GetActorTableData() const { rpc::ActorTableData *GcsActor::GetMutableActorTableData() { return &actor_table_data_; } +rpc::TaskSpec *GcsActor::GetMutableTaskSpec() { return task_spec_.get(); } + std::shared_ptr GcsActor::GetActorWorkerAssignment() const { return assignment_ptr_; @@ -372,6 +375,8 @@ void GcsActorManager::HandleGetNamedActorInfo( } else { reply->unsafe_arena_set_allocated_actor_table_data( iter->second->GetMutableActorTableData()); + RAY_LOG(INFO) << "WANGTAO " << iter->second->GetState(); + reply->unsafe_arena_set_allocated_task_spec(iter->second->GetMutableTaskSpec()); RAY_LOG(DEBUG) << "Finished getting actor info, job id = " << actor_id.JobId() << ", actor id = " << actor_id; } @@ -502,6 +507,8 @@ Status GcsActorManager::RegisterActor(const ray::rpc::RegisterActorRequest &requ } // The backend storage is supposed to be reliable, so the status must be ok. 
+ RAY_CHECK_OK(gcs_table_storage_->ActorTaskSpecTable().Put( + actor_id, request.task_spec(), [](const Status &status) {})); RAY_CHECK_OK(gcs_table_storage_->ActorTable().Put( actor->GetActorID(), *actor->GetMutableActorTableData(), @@ -726,8 +733,6 @@ void GcsActorManager::DestroyActor(const ActorID &actor_id, gcs_actor_scheduler_->OnActorDestruction(it->second); } - const auto &task_id = it->second->GetCreationTaskSpecification().TaskId(); - it->second->GetMutableActorTableData()->mutable_task_spec()->Clear(); it->second->GetMutableActorTableData()->set_timestamp(current_sys_time_ms()); AddDestroyedActorToCache(it->second); const auto actor = std::move(it->second); @@ -768,7 +773,7 @@ void GcsActorManager::DestroyActor(const ActorID &actor_id, created_actors_.erase(node_it); } } else { - CancelActorInScheduling(actor, task_id); + CancelActorInScheduling(actor, TaskID::ForActorCreationTask(actor_id)); } } @@ -792,6 +797,7 @@ void GcsActorManager::DestroyActor(const ActorID &actor_id, [this, actor_id, actor_table_data](Status status) { RAY_CHECK_OK(gcs_publisher_->PublishActor( actor_id, *GenActorDataOnlyWithStates(*actor_table_data), nullptr)); + RAY_CHECK_OK(gcs_table_storage_->ActorTaskSpecTable().Delete(actor_id, nullptr)); // Destroy placement group owned by this actor. destroy_owned_placement_group_if_needed_(actor_id); })); @@ -1054,6 +1060,8 @@ void GcsActorManager::ReconstructActor(const ActorID &actor_id, } RAY_CHECK_OK(gcs_publisher_->PublishActor( actor_id, *GenActorDataOnlyWithStates(*mutable_actor_table_data), nullptr)); + RAY_CHECK_OK( + gcs_table_storage_->ActorTaskSpecTable().Delete(actor_id, nullptr)); })); // The actor is dead, but we should not remove the entry from the // registered actors yet. 
If the actor is owned, we will destroy the actor @@ -1172,27 +1180,31 @@ void GcsActorManager::SetSchedulePendingActorsPosted(bool posted) { void GcsActorManager::Initialize(const GcsInitData &gcs_init_data) { const auto &jobs = gcs_init_data.Jobs(); + const auto &actor_task_specs = gcs_init_data.ActorTaskSpecs(); absl::flat_hash_map> node_to_workers; - for (const auto &entry : gcs_init_data.Actors()) { - auto job_iter = jobs.find(entry.first.JobId()); + std::vector dead_actors; + for (const auto &[actor_id, actor_table_data] : gcs_init_data.Actors()) { + auto job_iter = jobs.find(actor_id.JobId()); auto is_job_dead = (job_iter == jobs.end() || job_iter->second.is_dead()); - auto actor = std::make_shared(entry.second); - if (entry.second.state() != ray::rpc::ActorTableData::DEAD && !is_job_dead) { - registered_actors_.emplace(entry.first, actor); + if (actor_table_data.state() != ray::rpc::ActorTableData::DEAD && !is_job_dead) { + const auto &iter = actor_task_specs.find(actor_id); + RAY_CHECK(iter != actor_task_specs.end()); + auto actor = std::make_shared(actor_table_data, iter->second); + registered_actors_.emplace(actor_id, actor); function_manager_.AddJobReference(actor->GetActorID().JobId()); if (!actor->GetName().empty()) { auto &actors_in_namespace = named_actors_[actor->GetRayNamespace()]; actors_in_namespace.emplace(actor->GetName(), actor->GetActorID()); } - if (entry.second.state() == ray::rpc::ActorTableData::DEPENDENCIES_UNREADY) { + if (actor_table_data.state() == ray::rpc::ActorTableData::DEPENDENCIES_UNREADY) { const auto &owner = actor->GetOwnerAddress(); const auto &owner_node = NodeID::FromBinary(owner.raylet_id()); const auto &owner_worker = WorkerID::FromBinary(owner.worker_id()); RAY_CHECK(unresolved_actors_[owner_node][owner_worker] .emplace(actor->GetActorID()) .second); - } else if (entry.second.state() == ray::rpc::ActorTableData::ALIVE) { + } else if (actor_table_data.state() == ray::rpc::ActorTableData::ALIVE) { 
created_actors_[actor->GetNodeID()].emplace(actor->GetWorkerID(), actor->GetActorID()); } @@ -1208,11 +1220,17 @@ void GcsActorManager::Initialize(const GcsInitData &gcs_init_data) { node_to_workers[actor->GetNodeID()].emplace_back(actor->GetWorkerID()); } } else { - destroyed_actors_.emplace(entry.first, actor); - sorted_destroyed_actor_list_.emplace_back(entry.first, - (int64_t)entry.second.timestamp()); + dead_actors.push_back(actor_id); + auto actor = std::make_shared(actor_table_data); + destroyed_actors_.emplace(actor_id, actor); + sorted_destroyed_actor_list_.emplace_back(actor_id, + (int64_t)actor_table_data.timestamp()); } } + if (!dead_actors.empty()) { + RAY_CHECK_OK( + gcs_table_storage_->ActorTaskSpecTable().BatchDelete(dead_actors, nullptr)); + } sorted_destroyed_actor_list_.sort([](const std::pair &left, const std::pair &right) { return left.second < right.second; @@ -1251,8 +1269,11 @@ void GcsActorManager::OnJobFinished(const JobID &job_id) { run_delayed_( [this, non_detached_actors = std::move(non_detached_actors)]() { - RAY_CHECK_OK(gcs_table_storage_->ActorTable().BatchDelete(non_detached_actors, - nullptr)); + RAY_CHECK_OK(gcs_table_storage_->ActorTable().BatchDelete( + non_detached_actors, [this, non_detached_actors](const Status &status) { + RAY_CHECK_OK(gcs_table_storage_->ActorTaskSpecTable().BatchDelete( + non_detached_actors, nullptr)); + })); }, actor_gc_delay_); diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.h b/src/ray/gcs/gcs_server/gcs_actor_manager.h index 788241df75ea5..187f721b079b0 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.h +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.h @@ -45,10 +45,22 @@ class GcsActor { explicit GcsActor(rpc::ActorTableData actor_table_data) : actor_table_data_(std::move(actor_table_data)) {} + /// Create a GcsActor by actor_table_data and task_spec. + /// This is only for ALIVE actors. + /// + /// \param actor_table_data Data of the actor (see gcs.proto). 
+ /// \param task_spec Task spec of the actor. + explicit GcsActor(rpc::ActorTableData actor_table_data, rpc::TaskSpec task_spec) + : actor_table_data_(std::move(actor_table_data)), + task_spec_(std::make_unique(task_spec)) { + RAY_CHECK(actor_table_data_.state() != rpc::ActorTableData::DEAD); + } + /// Create a GcsActor by TaskSpec. /// /// \param task_spec Contains the actor creation task specification. - explicit GcsActor(const ray::rpc::TaskSpec &task_spec, std::string ray_namespace) { + explicit GcsActor(const ray::rpc::TaskSpec &task_spec, std::string ray_namespace) + : task_spec_(std::make_unique(task_spec)) { RAY_CHECK(task_spec.type() == TaskType::ACTOR_CREATION_TASK); const auto &actor_creation_task_spec = task_spec.actor_creation_task_spec(); actor_table_data_.set_actor_id(actor_creation_task_spec.actor_id()); @@ -59,18 +71,26 @@ class GcsActor { auto dummy_object = TaskSpecification(task_spec).ActorDummyObject().Binary(); actor_table_data_.set_actor_creation_dummy_object_id(dummy_object); + actor_table_data_.mutable_function_descriptor()->CopyFrom( + task_spec.function_descriptor()); + actor_table_data_.set_is_detached(actor_creation_task_spec.is_detached()); actor_table_data_.set_name(actor_creation_task_spec.name()); actor_table_data_.mutable_owner_address()->CopyFrom(task_spec.caller_address()); actor_table_data_.set_state(rpc::ActorTableData::DEPENDENCIES_UNREADY); - actor_table_data_.mutable_task_spec()->CopyFrom(task_spec); actor_table_data_.mutable_address()->set_raylet_id(NodeID::Nil().Binary()); actor_table_data_.mutable_address()->set_worker_id(WorkerID::Nil().Binary()); actor_table_data_.set_ray_namespace(ray_namespace); + // Set required resources. 
+ auto resource_map = + GetCreationTaskSpecification().GetRequiredResources().GetResourceMap(); + actor_table_data_.mutable_required_resources()->insert(resource_map.begin(), + resource_map.end()); + const auto &function_descriptor = task_spec.function_descriptor(); switch (function_descriptor.function_descriptor_case()) { case rpc::FunctionDescriptor::FunctionDescriptorCase::kJavaFunctionDescriptor: @@ -127,6 +147,7 @@ class GcsActor { const rpc::ActorTableData &GetActorTableData() const; /// Get the mutable ActorTableData of this actor. rpc::ActorTableData *GetMutableActorTableData(); + rpc::TaskSpec *GetMutableTaskSpec(); std::shared_ptr GetActorWorkerAssignment() const; @@ -136,6 +157,7 @@ class GcsActor { /// The actor meta data which contains the task specification as well as the state of /// the gcs actor and so on (see gcs.proto). rpc::ActorTableData actor_table_data_; + const std::unique_ptr task_spec_; // TODO(Chong-Li): Considering shared assignments, this pointer would be moved out. std::shared_ptr assignment_ptr_ = nullptr; }; diff --git a/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc b/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc index 24a3aabd6ba97..028ddd0ba76a3 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc @@ -231,7 +231,7 @@ void GcsActorScheduler::LeaseWorkerFromNode(std::shared_ptr actor, // Actor leases should be sent to the raylet immediately, so we should never build up a // backlog in GCS. 
lease_client->RequestWorkerLease( - actor->GetActorTableData().task_spec(), + actor->GetCreationTaskSpecification().GetMessage(), RayConfig::instance().gcs_actor_scheduling_enabled(), [this, actor, node](const Status &status, const rpc::RequestWorkerLeaseReply &reply) { diff --git a/src/ray/gcs/gcs_server/gcs_init_data.cc b/src/ray/gcs/gcs_server/gcs_init_data.cc index 63ba042e6c182..0b83270c9796f 100644 --- a/src/ray/gcs/gcs_server/gcs_init_data.cc +++ b/src/ray/gcs/gcs_server/gcs_init_data.cc @@ -18,7 +18,7 @@ namespace ray { namespace gcs { void GcsInitData::AsyncLoad(const EmptyCallback &on_done) { // There are 6 kinds of table data need to be loaded. - auto count_down = std::make_shared(5); + auto count_down = std::make_shared(6); auto on_load_finished = [count_down, on_done] { if (--(*count_down) == 0) { if (on_done) { @@ -35,6 +35,8 @@ void GcsInitData::AsyncLoad(const EmptyCallback &on_done) { AsyncLoadActorTableData(on_load_finished); + AsyncLoadActorTaskSpecTableData(on_load_finished); + AsyncLoadPlacementGroupTableData(on_load_finished); } @@ -102,5 +104,18 @@ void GcsInitData::AsyncLoadActorTableData(const EmptyCallback &on_done) { load_actor_table_data_callback)); } +void GcsInitData::AsyncLoadActorTaskSpecTableData(const EmptyCallback &on_done) { + RAY_LOG(INFO) << "Loading actor task spec table data."; + auto load_actor_task_spec_table_data_callback = + [this, on_done](const absl::flat_hash_map &result) { + actor_task_spec_table_data_ = std::move(result); + RAY_LOG(INFO) << "Finished loading actor task spec table data, size = " + << actor_task_spec_table_data_.size(); + on_done(); + }; + RAY_CHECK_OK(gcs_table_storage_->ActorTaskSpecTable().GetAll( + load_actor_task_spec_table_data_callback)); +} + } // namespace gcs } // namespace ray \ No newline at end of file diff --git a/src/ray/gcs/gcs_server/gcs_init_data.h b/src/ray/gcs/gcs_server/gcs_init_data.h index ff54f552c29f7..6da8e9dc8a48b 100644 --- a/src/ray/gcs/gcs_server/gcs_init_data.h +++ 
b/src/ray/gcs/gcs_server/gcs_init_data.h @@ -59,6 +59,10 @@ class GcsInitData { return actor_table_data_; } + const absl::flat_hash_map &ActorTaskSpecs() const { + return actor_task_spec_table_data_; + } + /// Get placement group metadata. const absl::flat_hash_map &PlacementGroups() const { @@ -91,6 +95,8 @@ class GcsInitData { /// \param on_done The callback when actor metadata is loaded successfully. void AsyncLoadActorTableData(const EmptyCallback &on_done); + void AsyncLoadActorTaskSpecTableData(const EmptyCallback &on_done); + protected: /// The gcs table storage. std::shared_ptr gcs_table_storage_; @@ -110,6 +116,8 @@ class GcsInitData { /// Actor metadata. absl::flat_hash_map actor_table_data_; + + absl::flat_hash_map actor_task_spec_table_data_; }; } // namespace gcs diff --git a/src/ray/gcs/gcs_server/gcs_table_storage.cc b/src/ray/gcs/gcs_server/gcs_table_storage.cc index 2e8324ad065a0..a6b60714f871f 100644 --- a/src/ray/gcs/gcs_server/gcs_table_storage.cc +++ b/src/ray/gcs/gcs_server/gcs_table_storage.cc @@ -176,8 +176,10 @@ template class GcsTable; template class GcsTable; template class GcsTable; template class GcsTable; +template class GcsTable; template class GcsTable; template class GcsTableWithJobId; +template class GcsTableWithJobId; template class GcsTable; template class GcsTable; diff --git a/src/ray/gcs/gcs_server/gcs_table_storage.h b/src/ray/gcs/gcs_server/gcs_table_storage.h index 7fd2b4df58334..7c75dce44fbce 100644 --- a/src/ray/gcs/gcs_server/gcs_table_storage.h +++ b/src/ray/gcs/gcs_server/gcs_table_storage.h @@ -37,6 +37,7 @@ using rpc::ResourceTableData; using rpc::ResourceUsageBatchData; using rpc::ScheduleData; using rpc::StoredConfig; +using rpc::TaskSpec; using rpc::WorkerTableData; /// \class GcsTable @@ -173,6 +174,17 @@ class GcsActorTable : public GcsTableWithJobId { JobID GetJobIdFromKey(const ActorID &key) override { return key.JobId(); } }; +class GcsActorTaskSpecTable : public GcsTableWithJobId { + public: + explicit 
GcsActorTaskSpecTable(std::shared_ptr &store_client) + : GcsTableWithJobId(store_client) { + table_name_ = TablePrefix_Name(TablePrefix::ACTOR_TASK_SPEC); + } + + private: + JobID GetJobIdFromKey(const ActorID &key) override { return key.JobId(); } +}; + class GcsPlacementGroupTable : public GcsTable { public: @@ -248,6 +260,7 @@ class GcsTableStorage { : store_client_(std::move(store_client)) { job_table_ = std::make_unique(store_client_); actor_table_ = std::make_unique(store_client_); + actor_task_spec_table_ = std::make_unique(store_client_); placement_group_table_ = std::make_unique(store_client_); node_table_ = std::make_unique(store_client_); node_resource_table_ = std::make_unique(store_client_); @@ -270,6 +283,11 @@ class GcsTableStorage { return *actor_table_; } + GcsActorTaskSpecTable &ActorTaskSpecTable() { + RAY_CHECK(actor_task_spec_table_ != nullptr); + return *actor_task_spec_table_; + } + GcsPlacementGroupTable &PlacementGroupTable() { RAY_CHECK(placement_group_table_ != nullptr); return *placement_group_table_; @@ -319,6 +337,7 @@ class GcsTableStorage { std::shared_ptr store_client_; std::unique_ptr job_table_; std::unique_ptr actor_table_; + std::unique_ptr actor_task_spec_table_; std::unique_ptr placement_group_table_; std::unique_ptr node_table_; std::unique_ptr node_resource_table_; diff --git a/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc index 76f3f00845afc..8ceb2d68e0eab 100644 --- a/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc @@ -263,7 +263,7 @@ TEST_F(GcsActorManagerTest, TestBasic) { auto registered_actor = RegisterActor(job_id); rpc::CreateActorRequest create_actor_request; create_actor_request.mutable_task_spec()->CopyFrom( - registered_actor->GetActorTableData().task_spec()); + registered_actor->GetCreationTaskSpecification().GetMessage()); std::vector> finished_actors; Status status = 
gcs_actor_manager_->CreateActor( @@ -294,7 +294,7 @@ TEST_F(GcsActorManagerTest, TestSchedulingFailed) { auto registered_actor = RegisterActor(job_id); rpc::CreateActorRequest create_actor_request; create_actor_request.mutable_task_spec()->CopyFrom( - registered_actor->GetActorTableData().task_spec()); + registered_actor->GetCreationTaskSpecification().GetMessage()); std::vector> finished_actors; RAY_CHECK_OK(gcs_actor_manager_->CreateActor( @@ -328,7 +328,7 @@ TEST_F(GcsActorManagerTest, TestWorkerFailure) { auto registered_actor = RegisterActor(job_id); rpc::CreateActorRequest create_actor_request; create_actor_request.mutable_task_spec()->CopyFrom( - registered_actor->GetActorTableData().task_spec()); + registered_actor->GetCreationTaskSpecification().GetMessage()); std::vector> finished_actors; RAY_CHECK_OK(gcs_actor_manager_->CreateActor( @@ -376,7 +376,7 @@ TEST_F(GcsActorManagerTest, TestNodeFailure) { auto registered_actor = RegisterActor(job_id); rpc::CreateActorRequest create_actor_request; create_actor_request.mutable_task_spec()->CopyFrom( - registered_actor->GetActorTableData().task_spec()); + registered_actor->GetCreationTaskSpecification().GetMessage()); std::vector> finished_actors; Status status = gcs_actor_manager_->CreateActor( @@ -428,7 +428,7 @@ TEST_F(GcsActorManagerTest, TestActorReconstruction) { /*detached=*/false); rpc::CreateActorRequest create_actor_request; create_actor_request.mutable_task_spec()->CopyFrom( - registered_actor->GetActorTableData().task_spec()); + registered_actor->GetCreationTaskSpecification().GetMessage()); std::vector> finished_actors; Status status = gcs_actor_manager_->CreateActor( @@ -498,7 +498,7 @@ TEST_F(GcsActorManagerTest, TestActorRestartWhenOwnerDead) { /*detached=*/false); rpc::CreateActorRequest create_actor_request; create_actor_request.mutable_task_spec()->CopyFrom( - registered_actor->GetActorTableData().task_spec()); + registered_actor->GetCreationTaskSpecification().GetMessage()); std::vector> 
finished_actors; RAY_CHECK_OK(gcs_actor_manager_->CreateActor( @@ -550,7 +550,7 @@ TEST_F(GcsActorManagerTest, TestDetachedActorRestartWhenCreatorDead) { /*detached=*/true); rpc::CreateActorRequest create_actor_request; create_actor_request.mutable_task_spec()->CopyFrom( - registered_actor->GetActorTableData().task_spec()); + registered_actor->GetCreationTaskSpecification().GetMessage()); std::vector> finished_actors; RAY_CHECK_OK(gcs_actor_manager_->CreateActor( @@ -668,7 +668,7 @@ TEST_F(GcsActorManagerTest, TestNamedActorDeletionWorkerFailure) { /*name=*/actor_name); rpc::CreateActorRequest request1; request1.mutable_task_spec()->CopyFrom( - registered_actor_1->GetActorTableData().task_spec()); + registered_actor_1->GetCreationTaskSpecification().GetMessage()); Status status = gcs_actor_manager_->CreateActor( request1, @@ -705,7 +705,7 @@ TEST_F(GcsActorManagerTest, TestNamedActorDeletionWorkerFailure) { /*name=*/actor_name); rpc::CreateActorRequest request2; request2.mutable_task_spec()->CopyFrom( - registered_actor_2->GetActorTableData().task_spec()); + registered_actor_2->GetCreationTaskSpecification().GetMessage()); status = gcs_actor_manager_->CreateActor( request2, @@ -724,7 +724,7 @@ TEST_F(GcsActorManagerTest, TestNamedActorDeletionNodeFailure) { /*name=*/"actor"); rpc::CreateActorRequest request1; request1.mutable_task_spec()->CopyFrom( - registered_actor_1->GetActorTableData().task_spec()); + registered_actor_1->GetCreationTaskSpecification().GetMessage()); Status status = gcs_actor_manager_->CreateActor( request1, @@ -760,7 +760,7 @@ TEST_F(GcsActorManagerTest, TestNamedActorDeletionNodeFailure) { /*name=*/"actor"); rpc::CreateActorRequest request2; request2.mutable_task_spec()->CopyFrom( - registered_actor_2->GetActorTableData().task_spec()); + registered_actor_2->GetCreationTaskSpecification().GetMessage()); status = gcs_actor_manager_->CreateActor( request2, @@ -780,7 +780,7 @@ TEST_F(GcsActorManagerTest, 
TestNamedActorDeletionNotHappendWhenReconstructed) { /*name=*/"actor"); rpc::CreateActorRequest request1; request1.mutable_task_spec()->CopyFrom( - registered_actor_1->GetActorTableData().task_spec()); + registered_actor_1->GetCreationTaskSpecification().GetMessage()); Status status = gcs_actor_manager_->CreateActor( request1, @@ -825,7 +825,7 @@ TEST_F(GcsActorManagerTest, TestDestroyActorBeforeActorCreationCompletes) { auto registered_actor = RegisterActor(job_id); rpc::CreateActorRequest create_actor_request; create_actor_request.mutable_task_spec()->CopyFrom( - registered_actor->GetActorTableData().task_spec()); + registered_actor->GetCreationTaskSpecification().GetMessage()); std::vector> finished_actors; RAY_CHECK_OK(gcs_actor_manager_->CreateActor( @@ -861,7 +861,7 @@ TEST_F(GcsActorManagerTest, TestRaceConditionCancelLease) { /*detached=*/false); rpc::CreateActorRequest create_actor_request; create_actor_request.mutable_task_spec()->CopyFrom( - registered_actor->GetActorTableData().task_spec()); + registered_actor->GetCreationTaskSpecification().GetMessage()); std::vector> finished_actors; RAY_CHECK_OK(gcs_actor_manager_->CreateActor( @@ -886,8 +886,8 @@ TEST_F(GcsActorManagerTest, TestRaceConditionCancelLease) { address.set_worker_id(worker_id.Binary()); actor->UpdateAddress(address); const auto &actor_id = actor->GetActorID(); - const auto &task_id = - TaskID::FromBinary(registered_actor->GetActorTableData().task_spec().task_id()); + const auto &task_id = TaskID::FromBinary( + registered_actor->GetCreationTaskSpecification().GetMessage().task_id()); EXPECT_CALL(*mock_actor_scheduler_, CancelOnLeasing(node_id, actor_id, task_id)); gcs_actor_manager_->OnWorkerDead(owner_node_id, owner_worker_id); ASSERT_TRUE(actor->GetActorTableData().death_cause().has_actor_died_error_context()); @@ -907,7 +907,7 @@ TEST_F(GcsActorManagerTest, TestRegisterActor) { std::vector> finished_actors; rpc::CreateActorRequest request; request.mutable_task_spec()->CopyFrom( - 
registered_actor->GetActorTableData().task_spec()); + registered_actor->GetCreationTaskSpecification().GetMessage()); RAY_CHECK_OK(gcs_actor_manager_->CreateActor( request, [&finished_actors](std::shared_ptr actor, @@ -1025,7 +1025,7 @@ TEST_F(GcsActorManagerTest, TestOwnerAndChildDiedAtTheSameTimeRaceCondition) { /*detached=*/false); rpc::CreateActorRequest create_actor_request; create_actor_request.mutable_task_spec()->CopyFrom( - registered_actor->GetActorTableData().task_spec()); + registered_actor->GetCreationTaskSpecification().GetMessage()); std::vector> finished_actors; RAY_CHECK_OK(gcs_actor_manager_->CreateActor( diff --git a/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_mock_test.cc b/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_mock_test.cc index c0ddfdea6aeb0..6687cc7c371db 100644 --- a/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_mock_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_mock_test.cc @@ -82,7 +82,7 @@ TEST_F(GcsActorSchedulerTest, KillWorkerLeak1) { rpc::ActorTableData actor_data; actor_data.set_state(rpc::ActorTableData::PENDING_CREATION); actor_data.set_actor_id(actor_id.Binary()); - auto actor = std::make_shared(actor_data); + auto actor = std::make_shared(actor_data, rpc::TaskSpec()); std::function cb; EXPECT_CALL(*raylet_client, RequestWorkerLease(An(), _, _, _, _)) .WillOnce(testing::SaveArg<2>(&cb)); @@ -109,7 +109,7 @@ TEST_F(GcsActorSchedulerTest, KillWorkerLeak2) { rpc::ActorTableData actor_data; actor_data.set_state(rpc::ActorTableData::PENDING_CREATION); actor_data.set_actor_id(actor_id.Binary()); - auto actor = std::make_shared(actor_data); + auto actor = std::make_shared(actor_data, rpc::TaskSpec()); rpc::ClientCallback request_worker_lease_cb; // Ensure actor is killed EXPECT_CALL(*core_worker_client, KillActor(_, _)); diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index 4f100465bafbf..1d850e1e5e0fb 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -46,6 
+46,7 @@ enum TablePrefix { PLACEMENT_GROUP_SCHEDULE = 18; PLACEMENT_GROUP = 19; KV = 20; + ACTOR_TASK_SPEC = 21; } // The channel that Add operations to the Table should be published on, if any. @@ -126,13 +127,13 @@ message ActorTableData { string name = 12; // Last timestamp that the actor state was updated. double timestamp = 13; - // The task specification of this actor's creation task. - TaskSpec task_spec = 14; // Resource mapping ids acquired by the leased worker. This field is only set when this // actor already has a leased worker. repeated ResourceMapEntry resource_mapping = 15; // The process id of this actor. uint32 pid = 16; + // The function descriptor of the actor creation task. + FunctionDescriptor function_descriptor = 17; // The actor's namespace. Named `ray_namespace` to avoid confusions when invoked in c++. string ray_namespace = 19; // The unix ms timestamp the actor was started at. @@ -148,6 +149,8 @@ message ActorTableData { string class_name = 23; // Contains metadata about why the actor is dead. ActorDeathCause death_cause = 24; + // Quantities of the different resources required by this actor. + map required_resources = 28; } message ErrorTableData { diff --git a/src/ray/protobuf/gcs_service.proto b/src/ray/protobuf/gcs_service.proto index 6e2785e72361d..af8adf32b36e6 100644 --- a/src/ray/protobuf/gcs_service.proto +++ b/src/ray/protobuf/gcs_service.proto @@ -97,6 +97,8 @@ message GetNamedActorInfoReply { GcsStatus status = 1; // Data of actor. ActorTableData actor_table_data = 2; + // The task specification of this actor's creation task. + TaskSpec task_spec = 3; } message ListNamedActorsRequest {