diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 2a97931db6666..50af61c9e1e33 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -795,14 +795,14 @@ void NodeManager::HandleDisconnectedActor(const ActorID &actor_id, bool was_loca } } // Update the actor's state. - ActorTableData new_actor_data = actor_entry->second.GetTableData(); - new_actor_data.set_state(new_state); + ActorTableData new_actor_info = actor_entry->second.GetTableData(); + new_actor_info.set_state(new_state); if (was_local) { // If the actor was local, immediately update the state in actor registry. // So if we receive any actor tasks before we receive GCS notification, // these tasks can be correctly routed to the `MethodsWaitingForActorCreation` // queue, instead of being assigned to the dead actor. - HandleActorStateTransition(actor_id, ActorRegistration(new_actor_data)); + HandleActorStateTransition(actor_id, ActorRegistration(new_actor_info)); } auto done = [was_local, actor_id](Status status) { @@ -812,7 +812,7 @@ void NodeManager::HandleDisconnectedActor(const ActorID &actor_id, bool was_loca RAY_LOG(FATAL) << "Failed to update state for actor " << actor_id; } }; - auto actor_notification = std::make_shared<ActorTableData>(new_actor_data); + auto actor_notification = std::make_shared<ActorTableData>(new_actor_info); RAY_CHECK_OK(gcs_client_->Actors().AsyncUpdate(actor_id, actor_notification, done)); } @@ -1829,45 +1829,46 @@ void NodeManager::FinishAssignedTask(Worker &worker) { } } -ActorTableData NodeManager::CreateActorTableDataFromCreationTask( +std::shared_ptr<ActorTableData> NodeManager::CreateActorTableDataFromCreationTask( const TaskSpecification &task_spec) { RAY_CHECK(task_spec.IsActorCreationTask()); auto actor_id = task_spec.ActorCreationId(); auto actor_entry = actor_registry_.find(actor_id); - ActorTableData new_actor_data; + std::shared_ptr<ActorTableData> actor_info_ptr; // TODO(swang): If this is an actor that was reconstructed, and previous // actor notifications were 
delayed, then this node may not have an entry for // the actor in actor_regisry_. Then, the fields for the number of // reconstructions will be wrong. if (actor_entry == actor_registry_.end()) { + actor_info_ptr.reset(new ActorTableData()); // Set all of the static fields for the actor. These fields will not // change even if the actor fails or is reconstructed. - new_actor_data.set_actor_id(actor_id.Binary()); - new_actor_data.set_actor_creation_dummy_object_id( + actor_info_ptr->set_actor_id(actor_id.Binary()); + actor_info_ptr->set_actor_creation_dummy_object_id( task_spec.ActorDummyObject().Binary()); - new_actor_data.set_job_id(task_spec.JobId().Binary()); - new_actor_data.set_max_reconstructions(task_spec.MaxActorReconstructions()); + actor_info_ptr->set_job_id(task_spec.JobId().Binary()); + actor_info_ptr->set_max_reconstructions(task_spec.MaxActorReconstructions()); // This is the first time that the actor has been created, so the number // of remaining reconstructions is the max. - new_actor_data.set_remaining_reconstructions(task_spec.MaxActorReconstructions()); + actor_info_ptr->set_remaining_reconstructions(task_spec.MaxActorReconstructions()); } else { // If we've already seen this actor, it means that this actor was reconstructed. // Thus, its previous state must be RECONSTRUCTING. RAY_CHECK(actor_entry->second.GetState() == ActorTableData::RECONSTRUCTING); // Copy the static fields from the current actor entry. - new_actor_data = actor_entry->second.GetTableData(); + actor_info_ptr.reset(new ActorTableData(actor_entry->second.GetTableData())); // We are reconstructing the actor, so subtract its // remaining_reconstructions by 1. - new_actor_data.set_remaining_reconstructions( - new_actor_data.remaining_reconstructions() - 1); + actor_info_ptr->set_remaining_reconstructions( + actor_info_ptr->remaining_reconstructions() - 1); } // Set the new fields for the actor's state to indicate that the actor is // now alive on this node manager. 
- new_actor_data.set_node_manager_id( + actor_info_ptr->set_node_manager_id( gcs_client_->client_table().GetLocalClientId().Binary()); - new_actor_data.set_state(ActorTableData::ALIVE); - return new_actor_data; + actor_info_ptr->set_state(ActorTableData::ALIVE); + return actor_info_ptr; } void NodeManager::FinishAssignedActorTask(Worker &worker, const Task &task) { @@ -1973,8 +1974,8 @@ void NodeManager::FinishAssignedActorCreationTask(const ActorID &parent_actor_id bool resumed_from_checkpoint) { // Notify the other node managers that the actor has been created. const ActorID actor_id = task_spec.ActorCreationId(); - auto new_actor_data = CreateActorTableDataFromCreationTask(task_spec); - new_actor_data.set_parent_actor_id(parent_actor_id.Binary()); + auto new_actor_info = CreateActorTableDataFromCreationTask(task_spec); + new_actor_info->set_parent_actor_id(parent_actor_id.Binary()); auto update_callback = [actor_id](Status status) { if (!status.ok()) { // Only one node at a time should succeed at creating or updating the actor. @@ -1992,21 +1993,20 @@ void NodeManager::FinishAssignedActorCreationTask(const ActorID &parent_actor_id << actor_id; RAY_CHECK_OK(gcs_client_->actor_checkpoint_table().Lookup( JobID::Nil(), checkpoint_id, - [this, actor_id, new_actor_data, update_callback]( + [this, actor_id, new_actor_info, update_callback]( ray::gcs::RedisGcsClient *client, const UniqueID &checkpoint_id, const ActorCheckpointData &checkpoint_data) { RAY_LOG(INFO) << "Restoring registration for actor " << actor_id << " from checkpoint " << checkpoint_id; ActorRegistration actor_registration = - ActorRegistration(new_actor_data, checkpoint_data); + ActorRegistration(*new_actor_info, checkpoint_data); // Mark the unreleased dummy objects in the checkpoint frontier as local. 
for (const auto &entry : actor_registration.GetDummyObjects()) { HandleObjectLocal(entry.first); } HandleActorStateTransition(actor_id, std::move(actor_registration)); - auto actor_notification = std::make_shared<ActorTableData>(new_actor_data); // The actor was created before. - RAY_CHECK_OK(gcs_client_->Actors().AsyncUpdate(actor_id, actor_notification, + RAY_CHECK_OK(gcs_client_->Actors().AsyncUpdate(actor_id, new_actor_info, update_callback)); }, [actor_id](ray::gcs::RedisGcsClient *client, const UniqueID &checkpoint_id) { @@ -2016,16 +2016,14 @@ void NodeManager::FinishAssignedActorCreationTask(const ActorID &parent_actor_id } else { // The actor did not resume from a checkpoint. Immediately notify the // other node managers that the actor has been created. - HandleActorStateTransition(actor_id, ActorRegistration(new_actor_data)); - auto actor_notification = std::make_shared<ActorTableData>(new_actor_data); + HandleActorStateTransition(actor_id, ActorRegistration(*new_actor_info)); if (actor_registry_.find(actor_id) != actor_registry_.end()) { // The actor was created before. - RAY_CHECK_OK(gcs_client_->Actors().AsyncUpdate(actor_id, actor_notification, - update_callback)); + RAY_CHECK_OK( + gcs_client_->Actors().AsyncUpdate(actor_id, new_actor_info, update_callback)); } else { // The actor was never created before. - RAY_CHECK_OK( - gcs_client_->Actors().AsyncRegister(actor_notification, update_callback)); + RAY_CHECK_OK(gcs_client_->Actors().AsyncRegister(new_actor_info, update_callback)); } } if (!resumed_from_checkpoint) { diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 2550587832027..d7418ea3a4c0a 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -287,7 +287,8 @@ class NodeManager : public rpc::NodeManagerServiceHandler, /// /// \param task_spec Task specification of the actor creation task that created the /// actor. 
- ActorTableData CreateActorTableDataFromCreationTask(const TaskSpecification &task_spec); + std::shared_ptr<ActorTableData> CreateActorTableDataFromCreationTask( + const TaskSpecification &task_spec); /// Handle a worker finishing an assigned actor task or actor creation task. /// \param worker The worker that finished the task. /// \param task The actor task or actor creation task. @@ -296,11 +297,11 @@ class NodeManager : public rpc::NodeManagerServiceHandler, /// Helper function for handling worker to finish its assigned actor task /// or actor creation task. Gets invoked when tasks's parent actor is known. /// - /// \param actor_id The actor id corresponding to the actor (creation) task. - /// \param actor_handle_id The actor id corresponding to the actor (creation) task. - /// \param new_actor_data The struct which will be used to register the task. + /// \param parent_actor_id The actor id corresponding to the actor which creates + /// the new actor. + /// \param task_spec Task specification of the actor creation task that created the + /// actor. /// \param resumed_from_checkpoint If the actor was resumed from a checkpoint. - /// \param dummy_object Dummy object corresponding to the actor creation task. /// \return Void. void FinishAssignedActorCreationTask(const ActorID &parent_actor_id, const TaskSpecification &task_spec,