Skip to content

Commit

Permalink
[Core]Save task spec in separate table (ray-project#22650)
Browse files Browse the repository at this point in the history
This is a rebase version of ray-project#11592. As task spec info is only needed when gcs create or start an actor, so we can remove it from actor table and save the serialization time and memory/network cost when gcs clients get actor infos from gcs.

As internal repository varies very much from the community. This pr just add some manual check with simple cherry pick. Welcome to comment first and at the meantime I'll see if there's any test case failed or some points were missed.
  • Loading branch information
WangTaoTheTonic authored Apr 12, 2022
1 parent c30491d commit 6aefe9b
Show file tree
Hide file tree
Showing 21 changed files with 189 additions and 87 deletions.
17 changes: 7 additions & 10 deletions dashboard/modules/actor/actor_head.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import ray.dashboard.optional_utils as dashboard_optional_utils
from ray.dashboard.optional_utils import rest_response
from ray.dashboard.modules.actor import actor_consts
from ray.dashboard.modules.actor.actor_utils import actor_classname_from_task_spec
from ray.dashboard.modules.actor.actor_utils import actor_classname_from_func_descriptor
from ray.core.generated import node_manager_pb2_grpc
from ray.core.generated import gcs_service_pb2
from ray.core.generated import gcs_service_pb2_grpc
Expand Down Expand Up @@ -56,20 +56,17 @@ def actor_table_data_to_dict(message):
"state",
"name",
"numRestarts",
"taskSpec",
"functionDescriptor",
"timestamp",
"numExecutedTasks",
}
light_message = {k: v for (k, v) in orig_message.items() if k in fields}
if "taskSpec" in light_message:
actor_class = actor_classname_from_task_spec(light_message["taskSpec"])
logger.info(light_message)
if "functionDescriptor" in light_message:
actor_class = actor_classname_from_func_descriptor(
light_message["functionDescriptor"]
)
light_message["actorClass"] = actor_class
if "functionDescriptor" in light_message["taskSpec"]:
light_message["taskSpec"] = {
"functionDescriptor": light_message["taskSpec"]["functionDescriptor"]
}
else:
light_message.pop("taskSpec")
return light_message


Expand Down
8 changes: 8 additions & 0 deletions dashboard/modules/actor/actor_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ def actor_classname_from_task_spec(task_spec):
)


def actor_classname_from_func_descriptor(func_desc):
return (
func_desc.get("pythonFunctionDescriptor", {})
.get("className", "Unknown actor class")
.split(".")[-1]
)


def _group_actors_by_python_class(actors):
groups = defaultdict(list)
for actor in actors.values():
Expand Down
9 changes: 4 additions & 5 deletions dashboard/modules/actor/tests/test_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,8 @@ class InfeasibleActor:
assert len(actors) == 3
one_entry = list(actors.values())[0]
assert "jobId" in one_entry
assert "taskSpec" in one_entry
assert "functionDescriptor" in one_entry["taskSpec"]
assert type(one_entry["taskSpec"]["functionDescriptor"]) is dict
assert "functionDescriptor" in one_entry
assert type(one_entry["functionDescriptor"]) is dict
assert "address" in one_entry
assert type(one_entry["address"]) is dict
assert "state" in one_entry
Expand Down Expand Up @@ -264,7 +263,7 @@ def actor_table_data_to_dict(message):
including_default_value_fields=False,
)

non_state_keys = ("actorId", "jobId", "taskSpec")
non_state_keys = ("actorId", "jobId")

for msg in msgs:
actor_data_dict = actor_table_data_to_dict(msg)
Expand Down Expand Up @@ -292,9 +291,9 @@ def actor_table_data_to_dict(message):
"actorCreationDummyObjectId",
"jobId",
"ownerAddress",
"taskSpec",
"className",
"serializedRuntimeEnv",
"functionDescriptor",
"rayNamespace",
}
else:
Expand Down
2 changes: 1 addition & 1 deletion dashboard/modules/snapshot/snapshot_head.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ async def get_actor_info(self):
"start_time": actor_table_entry.start_time,
"end_time": actor_table_entry.end_time,
"is_detached": actor_table_entry.is_detached,
"resources": dict(actor_table_entry.task_spec.required_resources),
"resources": dict(actor_table_entry.required_resources),
"actor_class": actor_table_entry.class_name,
"current_worker_id": actor_table_entry.address.worker_id.hex(),
"current_raylet_id": actor_table_entry.address.raylet_id.hex(),
Expand Down
36 changes: 18 additions & 18 deletions src/ray/core_worker/actor_handle.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,29 +57,28 @@ rpc::ActorHandle CreateInnerActorHandleFromString(const std::string &serialized)
return inner;
}

rpc::ActorHandle CreateInnerActorHandleFromActorTableData(
const rpc::ActorTableData &actor_table_data) {
rpc::ActorHandle CreateInnerActorHandleFromActorData(
const rpc::ActorTableData &actor_table_data, const rpc::TaskSpec &task_spec) {
rpc::ActorHandle inner;
inner.set_actor_id(actor_table_data.actor_id());
inner.set_owner_id(actor_table_data.parent_id());
inner.mutable_owner_address()->CopyFrom(actor_table_data.owner_address());
inner.set_creation_job_id(actor_table_data.job_id());
inner.set_actor_language(actor_table_data.task_spec().language());
inner.set_actor_language(task_spec.language());
inner.mutable_actor_creation_task_function_descriptor()->CopyFrom(
actor_table_data.task_spec().function_descriptor());
TaskSpecification task_spec(actor_table_data.task_spec());
inner.set_actor_cursor(task_spec.ReturnId(0).Binary());
inner.set_extension_data(
actor_table_data.task_spec().actor_creation_task_spec().extension_data());
inner.set_max_task_retries(
actor_table_data.task_spec().actor_creation_task_spec().max_task_retries());
inner.set_name(actor_table_data.task_spec().actor_creation_task_spec().name());
inner.set_ray_namespace(
actor_table_data.task_spec().actor_creation_task_spec().ray_namespace());
actor_table_data.function_descriptor());
inner.set_actor_cursor(
ObjectID::FromIndex(
TaskID::ForActorCreationTask(ActorID::FromBinary(actor_table_data.actor_id())),
1)
.Binary());
inner.set_extension_data(task_spec.actor_creation_task_spec().extension_data());
inner.set_max_task_retries(task_spec.actor_creation_task_spec().max_task_retries());
inner.set_name(actor_table_data.name());
inner.set_ray_namespace(actor_table_data.ray_namespace());
inner.set_execute_out_of_order(
actor_table_data.task_spec().actor_creation_task_spec().execute_out_of_order());
inner.set_max_pending_calls(
actor_table_data.task_spec().actor_creation_task_spec().max_pending_calls());
task_spec.actor_creation_task_spec().execute_out_of_order());
inner.set_max_pending_calls(task_spec.actor_creation_task_spec().max_pending_calls());
return inner;
}
} // namespace
Expand Down Expand Up @@ -115,8 +114,9 @@ ActorHandle::ActorHandle(
ActorHandle::ActorHandle(const std::string &serialized)
: ActorHandle(CreateInnerActorHandleFromString(serialized)) {}

ActorHandle::ActorHandle(const rpc::ActorTableData &actor_table_data)
: ActorHandle(CreateInnerActorHandleFromActorTableData(actor_table_data)) {}
ActorHandle::ActorHandle(const rpc::ActorTableData &actor_table_data,
const rpc::TaskSpec &task_spec)
: ActorHandle(CreateInnerActorHandleFromActorData(actor_table_data, task_spec)) {}

void ActorHandle::SetActorTaskSpec(TaskSpecBuilder &builder, const ObjectID new_cursor) {
absl::MutexLock guard(&mutex_);
Expand Down
5 changes: 3 additions & 2 deletions src/ray/core_worker/actor_handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ class ActorHandle {
/// Constructs an ActorHandle from a serialized string.
explicit ActorHandle(const std::string &serialized);

/// Constructs an ActorHandle from a gcs::ActorTableData message.
ActorHandle(const rpc::ActorTableData &actor_table_data);
/// Constructs an ActorHandle from a rpc::ActorTableData and a rpc::TaskSpec message.
ActorHandle(const rpc::ActorTableData &actor_table_data,
const rpc::TaskSpec &task_spec);

ActorID GetActorID() const { return ActorID::FromBinary(inner_.actor_id()); };

Expand Down
11 changes: 7 additions & 4 deletions src/ray/core_worker/actor_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,16 @@ std::pair<std::shared_ptr<const ActorHandle>, Status> ActorManager::GetNamedActo
// implemented using a promise that's captured in the RPC callback.
// There should be no risk of deadlock because we don't hold any
// locks during the call and the RPCs run on a separate thread.
rpc::ActorTableData result;
const auto status = gcs_client_->Actors().SyncGetByName(name, ray_namespace, result);
rpc::ActorTableData actor_table_data;
rpc::TaskSpec task_spec;
const auto status = gcs_client_->Actors().SyncGetByName(
name, ray_namespace, actor_table_data, task_spec);
if (status.ok()) {
auto actor_handle = std::make_unique<ActorHandle>(result);
auto actor_handle = std::make_unique<ActorHandle>(actor_table_data, task_spec);
actor_id = actor_handle->GetActorID();
AddNewActorHandle(std::move(actor_handle),
GenerateCachedActorName(result.ray_namespace(), result.name()),
GenerateCachedActorName(actor_table_data.ray_namespace(),
actor_table_data.name()),
call_site,
caller_address,
/*is_detached*/ true);
Expand Down
6 changes: 4 additions & 2 deletions src/ray/gcs/gcs_client/accessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -194,15 +194,17 @@ Status ActorInfoAccessor::AsyncGetByName(

Status ActorInfoAccessor::SyncGetByName(const std::string &name,
const std::string &ray_namespace,
rpc::ActorTableData &actor_table_data) {
rpc::ActorTableData &actor_table_data,
rpc::TaskSpec &task_spec) {
rpc::GetNamedActorInfoRequest request;
rpc::GetNamedActorInfoReply reply;
request.set_name(name);
request.set_ray_namespace(ray_namespace);
auto status = client_impl_->GetGcsRpcClient().SyncGetNamedActorInfo(
request, &reply, /*timeout_ms*/ GetGcsTimeoutMs());
if (status.ok() && reply.has_actor_table_data()) {
if (status.ok()) {
actor_table_data = reply.actor_table_data();
task_spec = reply.task_spec();
}
return status;
}
Expand Down
3 changes: 2 additions & 1 deletion src/ray/gcs/gcs_client/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ class ActorInfoAccessor {
/// NotFound if the name doesn't exist.
virtual Status SyncGetByName(const std::string &name,
const std::string &ray_namespace,
rpc::ActorTableData &actor_table_data);
rpc::ActorTableData &actor_table_data,
rpc::TaskSpec &task_spec);

/// List all named actors from the GCS asynchronously.
///
Expand Down
1 change: 0 additions & 1 deletion src/ray/gcs/gcs_client/test/gcs_client_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -908,7 +908,6 @@ TEST_P(GcsClientTest, DISABLED_TestGetActorPerf) {
}
for (int index = 0; index < actor_count; ++index) {
auto actor_table_data = Mocker::GenActorTableData(job_id);
actor_table_data->mutable_task_spec()->CopyFrom(task_spec);
RegisterActor(actor_table_data, false, true);
}

Expand Down
55 changes: 38 additions & 17 deletions src/ray/gcs/gcs_server/gcs_actor_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,9 @@ std::string GcsActor::GetRayNamespace() const {
}

TaskSpecification GcsActor::GetCreationTaskSpecification() const {
const auto &task_spec = actor_table_data_.task_spec();
return TaskSpecification(task_spec);
// The task spec is not available when the actor is dead.
RAY_CHECK(actor_table_data_.state() != rpc::ActorTableData::DEAD);
return TaskSpecification(*task_spec_);
}

const rpc::ActorTableData &GcsActor::GetActorTableData() const {
Expand All @@ -182,6 +183,8 @@ const rpc::ActorTableData &GcsActor::GetActorTableData() const {

rpc::ActorTableData *GcsActor::GetMutableActorTableData() { return &actor_table_data_; }

rpc::TaskSpec *GcsActor::GetMutableTaskSpec() { return task_spec_.get(); }

std::shared_ptr<const GcsActorWorkerAssignment> GcsActor::GetActorWorkerAssignment()
const {
return assignment_ptr_;
Expand Down Expand Up @@ -372,6 +375,8 @@ void GcsActorManager::HandleGetNamedActorInfo(
} else {
reply->unsafe_arena_set_allocated_actor_table_data(
iter->second->GetMutableActorTableData());
RAY_LOG(INFO) << "WANGTAO " << iter->second->GetState();
reply->unsafe_arena_set_allocated_task_spec(iter->second->GetMutableTaskSpec());
RAY_LOG(DEBUG) << "Finished getting actor info, job id = " << actor_id.JobId()
<< ", actor id = " << actor_id;
}
Expand Down Expand Up @@ -502,6 +507,8 @@ Status GcsActorManager::RegisterActor(const ray::rpc::RegisterActorRequest &requ
}

// The backend storage is supposed to be reliable, so the status must be ok.
RAY_CHECK_OK(gcs_table_storage_->ActorTaskSpecTable().Put(
actor_id, request.task_spec(), [](const Status &status) {}));
RAY_CHECK_OK(gcs_table_storage_->ActorTable().Put(
actor->GetActorID(),
*actor->GetMutableActorTableData(),
Expand Down Expand Up @@ -726,8 +733,6 @@ void GcsActorManager::DestroyActor(const ActorID &actor_id,
gcs_actor_scheduler_->OnActorDestruction(it->second);
}

const auto &task_id = it->second->GetCreationTaskSpecification().TaskId();
it->second->GetMutableActorTableData()->mutable_task_spec()->Clear();
it->second->GetMutableActorTableData()->set_timestamp(current_sys_time_ms());
AddDestroyedActorToCache(it->second);
const auto actor = std::move(it->second);
Expand Down Expand Up @@ -768,7 +773,7 @@ void GcsActorManager::DestroyActor(const ActorID &actor_id,
created_actors_.erase(node_it);
}
} else {
CancelActorInScheduling(actor, task_id);
CancelActorInScheduling(actor, TaskID::ForActorCreationTask(actor_id));
}
}

Expand All @@ -792,6 +797,7 @@ void GcsActorManager::DestroyActor(const ActorID &actor_id,
[this, actor_id, actor_table_data](Status status) {
RAY_CHECK_OK(gcs_publisher_->PublishActor(
actor_id, *GenActorDataOnlyWithStates(*actor_table_data), nullptr));
RAY_CHECK_OK(gcs_table_storage_->ActorTaskSpecTable().Delete(actor_id, nullptr));
// Destroy placement group owned by this actor.
destroy_owned_placement_group_if_needed_(actor_id);
}));
Expand Down Expand Up @@ -1054,6 +1060,8 @@ void GcsActorManager::ReconstructActor(const ActorID &actor_id,
}
RAY_CHECK_OK(gcs_publisher_->PublishActor(
actor_id, *GenActorDataOnlyWithStates(*mutable_actor_table_data), nullptr));
RAY_CHECK_OK(
gcs_table_storage_->ActorTaskSpecTable().Delete(actor_id, nullptr));
}));
// The actor is dead, but we should not remove the entry from the
// registered actors yet. If the actor is owned, we will destroy the actor
Expand Down Expand Up @@ -1172,27 +1180,31 @@ void GcsActorManager::SetSchedulePendingActorsPosted(bool posted) {

void GcsActorManager::Initialize(const GcsInitData &gcs_init_data) {
const auto &jobs = gcs_init_data.Jobs();
const auto &actor_task_specs = gcs_init_data.ActorTaskSpecs();
absl::flat_hash_map<NodeID, std::vector<WorkerID>> node_to_workers;
for (const auto &entry : gcs_init_data.Actors()) {
auto job_iter = jobs.find(entry.first.JobId());
std::vector<ActorID> dead_actors;
for (const auto &[actor_id, actor_table_data] : gcs_init_data.Actors()) {
auto job_iter = jobs.find(actor_id.JobId());
auto is_job_dead = (job_iter == jobs.end() || job_iter->second.is_dead());
auto actor = std::make_shared<GcsActor>(entry.second);
if (entry.second.state() != ray::rpc::ActorTableData::DEAD && !is_job_dead) {
registered_actors_.emplace(entry.first, actor);
if (actor_table_data.state() != ray::rpc::ActorTableData::DEAD && !is_job_dead) {
const auto &iter = actor_task_specs.find(actor_id);
RAY_CHECK(iter != actor_task_specs.end());
auto actor = std::make_shared<GcsActor>(actor_table_data, iter->second);
registered_actors_.emplace(actor_id, actor);
function_manager_.AddJobReference(actor->GetActorID().JobId());
if (!actor->GetName().empty()) {
auto &actors_in_namespace = named_actors_[actor->GetRayNamespace()];
actors_in_namespace.emplace(actor->GetName(), actor->GetActorID());
}

if (entry.second.state() == ray::rpc::ActorTableData::DEPENDENCIES_UNREADY) {
if (actor_table_data.state() == ray::rpc::ActorTableData::DEPENDENCIES_UNREADY) {
const auto &owner = actor->GetOwnerAddress();
const auto &owner_node = NodeID::FromBinary(owner.raylet_id());
const auto &owner_worker = WorkerID::FromBinary(owner.worker_id());
RAY_CHECK(unresolved_actors_[owner_node][owner_worker]
.emplace(actor->GetActorID())
.second);
} else if (entry.second.state() == ray::rpc::ActorTableData::ALIVE) {
} else if (actor_table_data.state() == ray::rpc::ActorTableData::ALIVE) {
created_actors_[actor->GetNodeID()].emplace(actor->GetWorkerID(),
actor->GetActorID());
}
Expand All @@ -1208,11 +1220,17 @@ void GcsActorManager::Initialize(const GcsInitData &gcs_init_data) {
node_to_workers[actor->GetNodeID()].emplace_back(actor->GetWorkerID());
}
} else {
destroyed_actors_.emplace(entry.first, actor);
sorted_destroyed_actor_list_.emplace_back(entry.first,
(int64_t)entry.second.timestamp());
dead_actors.push_back(actor_id);
auto actor = std::make_shared<GcsActor>(actor_table_data);
destroyed_actors_.emplace(actor_id, actor);
sorted_destroyed_actor_list_.emplace_back(actor_id,
(int64_t)actor_table_data.timestamp());
}
}
if (!dead_actors.empty()) {
RAY_CHECK_OK(
gcs_table_storage_->ActorTaskSpecTable().BatchDelete(dead_actors, nullptr));
}
sorted_destroyed_actor_list_.sort([](const std::pair<ActorID, int64_t> &left,
const std::pair<ActorID, int64_t> &right) {
return left.second < right.second;
Expand Down Expand Up @@ -1251,8 +1269,11 @@ void GcsActorManager::OnJobFinished(const JobID &job_id) {

run_delayed_(
[this, non_detached_actors = std::move(non_detached_actors)]() {
RAY_CHECK_OK(gcs_table_storage_->ActorTable().BatchDelete(non_detached_actors,
nullptr));
RAY_CHECK_OK(gcs_table_storage_->ActorTable().BatchDelete(
non_detached_actors, [this, non_detached_actors](const Status &status) {
RAY_CHECK_OK(gcs_table_storage_->ActorTaskSpecTable().BatchDelete(
non_detached_actors, nullptr));
}));
},

actor_gc_delay_);
Expand Down
Loading

0 comments on commit 6aefe9b

Please sign in to comment.