From 3f28a8a2297e9019364104bf6c957f8879e8d40d Mon Sep 17 00:00:00 2001 From: ZhuSenlin Date: Mon, 20 Apr 2020 00:53:02 +0800 Subject: [PATCH] [GCS] reply to the owner only after the actor has been successfully created. (#8079) * reply to the owner only after the actor is successfully created. * reply immediately if the actor is already created * fix comment * add test_actor_creation_task provided by @Stephanie Wang Co-authored-by: senlin.zsl --- python/ray/tests/test_reference_counting.py | 23 ++++++++++ src/ray/gcs/gcs_server/gcs_actor_manager.cc | 45 ++++++++----------- src/ray/gcs/gcs_server/gcs_actor_manager.h | 5 ++- .../gcs_server/test/gcs_actor_manager_test.cc | 11 ++--- 4 files changed, 51 insertions(+), 33 deletions(-) diff --git a/python/ray/tests/test_reference_counting.py b/python/ray/tests/test_reference_counting.py index ef3462bb1f149..955f4c7d3346b 100644 --- a/python/ray/tests/test_reference_counting.py +++ b/python/ray/tests/test_reference_counting.py @@ -162,6 +162,29 @@ def one_dep_large(dep, signal=None): check_refcounts({}) +def test_actor_creation_task(ray_start_regular): + @ray.remote + def large_object(): + # This will be spilled to plasma. + return np.zeros(10 * 1024 * 1024, dtype=np.uint8) + + @ray.remote(resources={"init": 1}) + class Actor: + def __init__(self, dependency): + return + + def ping(self): + return + + a = Actor.remote(large_object.remote()) + ping = a.ping.remote() + ready, unready = ray.wait([ping], timeout=1) + assert not ready + + ray.experimental.set_resource("init", 1) + ray.get(ping) + + def test_basic_pinning(one_worker_100MiB): @ray.remote def f(array): diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.cc b/src/ray/gcs/gcs_server/gcs_actor_manager.cc index 8b36f742f3e02..f9fa2ae8b7a9d 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.cc @@ -111,14 +111,12 @@ void GcsActorManager::RegisterActor( auto actor_id = ActorID::FromBinary(actor_creation_task_spec.actor_id()); auto iter = registered_actors_.find(actor_id); - if (iter != registered_actors_.end()) { + if (iter != registered_actors_.end() && + iter->second->GetState() == rpc::ActorTableData::ALIVE) { // When the network fails, Driver/Worker is not sure whether GcsServer has received // the request of actor creation task, so Driver/Worker will try again and again until - // receiving the reply from GcsServer. If the actor is already records on the GCS - // Server side, the GCS Server will be responsible for creating or reconstructing the - // actor regardless of whether the Driver/Worker sends the request to create the actor - // again, so we just need fast reply OK to the Driver/Worker that the actor is already - // recorded by GCS Server. + // receiving the reply from GcsServer. If the actor has been created successfully then + // just reply to the caller. callback(iter->second); return; } @@ -126,33 +124,18 @@ void GcsActorManager::RegisterActor( auto pending_register_iter = actor_to_register_callbacks_.find(actor_id); if (pending_register_iter != actor_to_register_callbacks_.end()) { // It is a duplicate message, just mark the callback as pending and invoke it after - // the related actor is flushed. + // the actor has been successfully created. pending_register_iter->second.emplace_back(std::move(callback)); return; } - // Mark the callback as pending and invoke it after the related actor is flushed. + // Mark the callback as pending and invoke it after the actor has been successfully + // created. actor_to_register_callbacks_[actor_id].emplace_back(std::move(callback)); auto actor = std::make_shared(request); - auto actor_table_data = - std::make_shared(actor->GetActorTableData()); - // The backend storage is reliable in the future, so the status must be ok. - RAY_CHECK_OK(actor_info_accessor_.AsyncUpdate( - actor_id, actor_table_data, [this, actor](Status status) { - RAY_CHECK_OK(status); - RAY_CHECK(registered_actors_.emplace(actor->GetActorID(), actor).second); - // Invoke all callbacks for all registration requests of this actor (duplicated - // requests are included) and remove all of them from - // actor_to_register_callbacks_. - auto iter = actor_to_register_callbacks_.find(actor->GetActorID()); - RAY_CHECK(iter != actor_to_register_callbacks_.end() && !iter->second.empty()); - for (auto &callback : iter->second) { - callback(actor); - } - actor_to_register_callbacks_.erase(iter); - gcs_actor_scheduler_->Schedule(actor); - })); + RAY_CHECK(registered_actors_.emplace(actor->GetActorID(), actor).second); + gcs_actor_scheduler_->Schedule(actor); } void GcsActorManager::ReconstructActorOnWorker(const ray::ClientID &node_id, @@ -265,6 +248,16 @@ void GcsActorManager::OnActorCreateSuccess(std::shared_ptr actor) { std::make_shared(actor->GetActorTableData()); // The backend storage is reliable in the future, so the status must be ok. RAY_CHECK_OK(actor_info_accessor_.AsyncUpdate(actor_id, actor_table_data, nullptr)); + + // Invoke all callbacks for all registration requests of this actor (duplicated + // requests are included) and remove all of them from actor_to_register_callbacks_. + auto iter = actor_to_register_callbacks_.find(actor->GetActorID()); + if (iter != actor_to_register_callbacks_.end()) { + for (auto &callback : iter->second) { + callback(actor); + } + actor_to_register_callbacks_.erase(iter); + } } void GcsActorManager::SchedulePendingActors() { diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.h b/src/ray/gcs/gcs_server/gcs_actor_manager.h index 6e999654e156d..c4a1dc7c6fb01 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.h +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.h @@ -125,8 +125,9 @@ class GcsActorManager { /// Register actor asynchronously. /// /// \param request Contains the meta info to create the actor. - /// \param callback Will be invoked after the meta info is flushed to the storage or be - /// invoked immediately if the meta info already exists. + /// \param callback Will be invoked after the actor is created successfully or be + /// invoked immediately if the actor is already registered to `registered_actors_` and + /// its state is `ALIVE`. void RegisterActor(const rpc::CreateActorRequest &request, RegisterActorCallback callback); diff --git a/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc index 3697f41abd265..e7e4d85f16199 100644 --- a/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc @@ -108,23 +108,24 @@ TEST_F(GcsActorManagerTest, TestNormalFlow) { auto job_id = JobID::FromInt(1); auto create_actor_request = Mocker::GenCreateActorRequest(job_id, /*max_reconstructions=*/2); - std::vector> registered_actors; + std::vector> finished_actors; gcs_actor_manager_->RegisterActor( - create_actor_request, [®istered_actors](std::shared_ptr actor) { - registered_actors.emplace_back(actor); + create_actor_request, [&finished_actors](std::shared_ptr actor) { + finished_actors.emplace_back(actor); }); - ASSERT_EQ(1, registered_actors.size()); + ASSERT_EQ(0, finished_actors.size()); ASSERT_EQ(1, gcs_actor_manager_->GetAllRegisteredActors().size()); ASSERT_EQ(1, gcs_actor_manager_->GetAllPendingActors().size()); - auto actor = registered_actors.front(); + auto actor = gcs_actor_manager_->GetAllRegisteredActors().begin()->second; ASSERT_EQ(rpc::ActorTableData::PENDING, actor->GetState()); // Add node_1 and then check if the actor is in state `ALIVE` auto node_1 = Mocker::GenNodeInfo(); auto node_id_1 = ClientID::FromBinary(node_1->node_id()); gcs_node_manager_->AddNode(node_1); + ASSERT_EQ(1, finished_actors.size()); ASSERT_EQ(1, gcs_node_manager_->GetAllAliveNodes().size()); ASSERT_EQ(0, gcs_actor_manager_->GetAllPendingActors().size()); ASSERT_EQ(rpc::ActorTableData::ALIVE, actor->GetState());