Skip to content

Commit

Permalink
avoid copying ActorTableData when NodeManager updates an actor to GCS
Browse files Browse the repository at this point in the history
  • Loading branch information
micafan authored and raulchen committed Jul 26, 2019
1 parent 3321555 commit 6f682db
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 34 deletions.
56 changes: 27 additions & 29 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -795,14 +795,14 @@ void NodeManager::HandleDisconnectedActor(const ActorID &actor_id, bool was_loca
}
}
// Update the actor's state.
ActorTableData new_actor_data = actor_entry->second.GetTableData();
new_actor_data.set_state(new_state);
ActorTableData new_actor_info = actor_entry->second.GetTableData();
new_actor_info.set_state(new_state);
if (was_local) {
// If the actor was local, immediately update the state in actor registry.
// So if we receive any actor tasks before we receive GCS notification,
// these tasks can be correctly routed to the `MethodsWaitingForActorCreation`
// queue, instead of being assigned to the dead actor.
HandleActorStateTransition(actor_id, ActorRegistration(new_actor_data));
HandleActorStateTransition(actor_id, ActorRegistration(new_actor_info));
}

auto done = [was_local, actor_id](Status status) {
Expand All @@ -812,7 +812,7 @@ void NodeManager::HandleDisconnectedActor(const ActorID &actor_id, bool was_loca
RAY_LOG(FATAL) << "Failed to update state for actor " << actor_id;
}
};
auto actor_notification = std::make_shared<ActorTableData>(new_actor_data);
auto actor_notification = std::make_shared<ActorTableData>(new_actor_info);
RAY_CHECK_OK(gcs_client_->Actors().AsyncUpdate(actor_id, actor_notification, done));
}

Expand Down Expand Up @@ -1829,45 +1829,46 @@ void NodeManager::FinishAssignedTask(Worker &worker) {
}
}

ActorTableData NodeManager::CreateActorTableDataFromCreationTask(
std::shared_ptr<ActorTableData> NodeManager::CreateActorTableDataFromCreationTask(
const TaskSpecification &task_spec) {
RAY_CHECK(task_spec.IsActorCreationTask());
auto actor_id = task_spec.ActorCreationId();
auto actor_entry = actor_registry_.find(actor_id);
ActorTableData new_actor_data;
std::shared_ptr<ActorTableData> actor_info_ptr;
// TODO(swang): If this is an actor that was reconstructed, and previous
// actor notifications were delayed, then this node may not have an entry for
// the actor in actor_registry_. Then, the fields for the number of
// reconstructions will be wrong.
if (actor_entry == actor_registry_.end()) {
actor_info_ptr.reset(new ActorTableData());
// Set all of the static fields for the actor. These fields will not
// change even if the actor fails or is reconstructed.
new_actor_data.set_actor_id(actor_id.Binary());
new_actor_data.set_actor_creation_dummy_object_id(
actor_info_ptr->set_actor_id(actor_id.Binary());
actor_info_ptr->set_actor_creation_dummy_object_id(
task_spec.ActorDummyObject().Binary());
new_actor_data.set_job_id(task_spec.JobId().Binary());
new_actor_data.set_max_reconstructions(task_spec.MaxActorReconstructions());
actor_info_ptr->set_job_id(task_spec.JobId().Binary());
actor_info_ptr->set_max_reconstructions(task_spec.MaxActorReconstructions());
// This is the first time that the actor has been created, so the number
// of remaining reconstructions is the max.
new_actor_data.set_remaining_reconstructions(task_spec.MaxActorReconstructions());
actor_info_ptr->set_remaining_reconstructions(task_spec.MaxActorReconstructions());
} else {
// If we've already seen this actor, it means that this actor was reconstructed.
// Thus, its previous state must be RECONSTRUCTING.
RAY_CHECK(actor_entry->second.GetState() == ActorTableData::RECONSTRUCTING);
// Copy the static fields from the current actor entry.
new_actor_data = actor_entry->second.GetTableData();
actor_info_ptr.reset(new ActorTableData(actor_entry->second.GetTableData()));
// We are reconstructing the actor, so decrement its
// remaining_reconstructions by 1.
new_actor_data.set_remaining_reconstructions(
new_actor_data.remaining_reconstructions() - 1);
actor_info_ptr->set_remaining_reconstructions(
actor_info_ptr->remaining_reconstructions() - 1);
}

// Set the new fields for the actor's state to indicate that the actor is
// now alive on this node manager.
new_actor_data.set_node_manager_id(
actor_info_ptr->set_node_manager_id(
gcs_client_->client_table().GetLocalClientId().Binary());
new_actor_data.set_state(ActorTableData::ALIVE);
return new_actor_data;
actor_info_ptr->set_state(ActorTableData::ALIVE);
return actor_info_ptr;
}

void NodeManager::FinishAssignedActorTask(Worker &worker, const Task &task) {
Expand Down Expand Up @@ -1973,8 +1974,8 @@ void NodeManager::FinishAssignedActorCreationTask(const ActorID &parent_actor_id
bool resumed_from_checkpoint) {
// Notify the other node managers that the actor has been created.
const ActorID actor_id = task_spec.ActorCreationId();
auto new_actor_data = CreateActorTableDataFromCreationTask(task_spec);
new_actor_data.set_parent_actor_id(parent_actor_id.Binary());
auto new_actor_info = CreateActorTableDataFromCreationTask(task_spec);
new_actor_info->set_parent_actor_id(parent_actor_id.Binary());
auto update_callback = [actor_id](Status status) {
if (!status.ok()) {
// Only one node at a time should succeed at creating or updating the actor.
Expand All @@ -1992,21 +1993,20 @@ void NodeManager::FinishAssignedActorCreationTask(const ActorID &parent_actor_id
<< actor_id;
RAY_CHECK_OK(gcs_client_->actor_checkpoint_table().Lookup(
JobID::Nil(), checkpoint_id,
[this, actor_id, new_actor_data, update_callback](
[this, actor_id, new_actor_info, update_callback](
ray::gcs::RedisGcsClient *client, const UniqueID &checkpoint_id,
const ActorCheckpointData &checkpoint_data) {
RAY_LOG(INFO) << "Restoring registration for actor " << actor_id
<< " from checkpoint " << checkpoint_id;
ActorRegistration actor_registration =
ActorRegistration(new_actor_data, checkpoint_data);
ActorRegistration(*new_actor_info, checkpoint_data);
// Mark the unreleased dummy objects in the checkpoint frontier as local.
for (const auto &entry : actor_registration.GetDummyObjects()) {
HandleObjectLocal(entry.first);
}
HandleActorStateTransition(actor_id, std::move(actor_registration));
auto actor_notification = std::make_shared<ActorTableData>(new_actor_data);
// The actor was created before.
RAY_CHECK_OK(gcs_client_->Actors().AsyncUpdate(actor_id, actor_notification,
RAY_CHECK_OK(gcs_client_->Actors().AsyncUpdate(actor_id, new_actor_info,
update_callback));
},
[actor_id](ray::gcs::RedisGcsClient *client, const UniqueID &checkpoint_id) {
Expand All @@ -2016,16 +2016,14 @@ void NodeManager::FinishAssignedActorCreationTask(const ActorID &parent_actor_id
} else {
// The actor did not resume from a checkpoint. Immediately notify the
// other node managers that the actor has been created.
HandleActorStateTransition(actor_id, ActorRegistration(new_actor_data));
auto actor_notification = std::make_shared<ActorTableData>(new_actor_data);
HandleActorStateTransition(actor_id, ActorRegistration(*new_actor_info));
if (actor_registry_.find(actor_id) != actor_registry_.end()) {
// The actor was created before.
RAY_CHECK_OK(gcs_client_->Actors().AsyncUpdate(actor_id, actor_notification,
update_callback));
RAY_CHECK_OK(
gcs_client_->Actors().AsyncUpdate(actor_id, new_actor_info, update_callback));
} else {
// The actor was never created before.
RAY_CHECK_OK(
gcs_client_->Actors().AsyncRegister(actor_notification, update_callback));
RAY_CHECK_OK(gcs_client_->Actors().AsyncRegister(new_actor_info, update_callback));
}
}
if (!resumed_from_checkpoint) {
Expand Down
11 changes: 6 additions & 5 deletions src/ray/raylet/node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,8 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
///
/// \param task_spec Task specification of the actor creation task that created the
/// actor.
ActorTableData CreateActorTableDataFromCreationTask(const TaskSpecification &task_spec);
std::shared_ptr<ActorTableData> CreateActorTableDataFromCreationTask(
const TaskSpecification &task_spec);
/// Handle a worker finishing an assigned actor task or actor creation task.
/// \param worker The worker that finished the task.
/// \param task The actor task or actor creation task.
Expand All @@ -296,11 +297,11 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
/// Helper function for handling worker to finish its assigned actor task
/// or actor creation task. Gets invoked when the task's parent actor is known.
///
/// \param actor_id The actor id corresponding to the actor (creation) task.
/// \param actor_handle_id The actor id corresponding to the actor (creation) task.
/// \param new_actor_data The struct which will be used to register the task.
/// \param parent_actor_id The actor id corresponding to the actor which creates
/// the new actor.
/// \param task_spec Task specification of the actor creation task that created the
/// actor.
/// \param resumed_from_checkpoint If the actor was resumed from a checkpoint.
/// \param dummy_object Dummy object corresponding to the actor creation task.
/// \return Void.
void FinishAssignedActorCreationTask(const ActorID &parent_actor_id,
const TaskSpecification &task_spec,
Expand Down

0 comments on commit 6f682db

Please sign in to comment.