diff --git a/src/mock/ray/raylet_client/raylet_client.h b/src/mock/ray/raylet_client/raylet_client.h index f918a3da27bfe..6c90e78c32e45 100644 --- a/src/mock/ray/raylet_client/raylet_client.h +++ b/src/mock/ray/raylet_client/raylet_client.h @@ -62,7 +62,7 @@ class MockResourceReserveInterface : public ResourceReserveInterface { public: MOCK_METHOD( void, PrepareBundleResources, - (const BundleSpecification &bundle_spec, + (const std::vector> &bundle_specs, const ray::rpc::ClientCallback &callback), (override)); MOCK_METHOD( @@ -146,7 +146,7 @@ class MockRayletClientInterface : public RayletClientInterface { (override)); MOCK_METHOD( void, PrepareBundleResources, - (const BundleSpecification &bundle_spec, + (const std::vector> &bundle_specs, const ray::rpc::ClientCallback &callback), (override)); MOCK_METHOD( diff --git a/src/ray/common/bundle_spec.cc b/src/ray/common/bundle_spec.cc index c5b4a711e0275..e5e785b9d4e2b 100644 --- a/src/ray/common/bundle_spec.cc +++ b/src/ray/common/bundle_spec.cc @@ -126,4 +126,13 @@ std::string GetOriginalResourceName(const std::string &resource) { return resource.substr(0, idx); } +std::string GetDebugStringForBundles( + const std::vector> &bundles) { + std::ostringstream debug_info; + for (const auto &bundle : bundles) { + debug_info << "{" << bundle->DebugString() << "},"; + } + return debug_info.str(); +}; + } // namespace ray diff --git a/src/ray/common/bundle_spec.h b/src/ray/common/bundle_spec.h index aeff9ebdfbd49..47aa68b8b6097 100644 --- a/src/ray/common/bundle_spec.h +++ b/src/ray/common/bundle_spec.h @@ -108,4 +108,8 @@ bool IsBundleIndex(const std::string &resource, const PlacementGroupID &group_id /// Return the original resource name of the placement group resource. std::string GetOriginalResourceName(const std::string &resource); +/// Generate debug information of given bundles. +std::string GetDebugStringForBundles( + const std::vector> &bundles); + } // namespace ray diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc index 37a3ee6b9a2db..c141ec0a6edc6 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc +++ b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc @@ -17,6 +17,31 @@ #include "ray/gcs/gcs_server/gcs_placement_group_manager.h" #include "src/ray/protobuf/gcs.pb.h" +namespace { + +using ray::BundleSpecification; +using ray::NodeID; + +// Get a set of bundle specifications grouped by the node. +std::unordered_map>> +GetUnplacedBundlesPerNode( + const std::vector> &bundles, + const ray::gcs::ScheduleMap &selected_nodes) { + std::unordered_map>> + node_to_bundles; + for (const auto &bundle : bundles) { + const auto &bundle_id = bundle->BundleId(); + const auto &iter = selected_nodes.find(bundle_id); + RAY_CHECK(iter != selected_nodes.end()); + if (node_to_bundles.find(iter->second) == node_to_bundles.end()) { + node_to_bundles[iter->second] = {}; + } + node_to_bundles[iter->second].push_back(bundle); + } + return node_to_bundles; +} +} // namespace + namespace ray { namespace gcs { @@ -169,18 +194,24 @@ void GcsPlacementGroupScheduler::ScheduleUnplacedBundles( .emplace(placement_group->GetPlacementGroupID(), lease_status_tracker) .second); - /// TODO(AlisaWu): Change the strategy when reserve resource failed. - for (const auto &bundle : bundles) { - const auto &bundle_id = bundle->BundleId(); - const auto &node_id = selected_nodes[bundle_id]; - lease_status_tracker->MarkPreparePhaseStarted(node_id, bundle); + const auto &pending_bundles = GetUnplacedBundlesPerNode(bundles, selected_nodes); + for (const auto &node_to_bundles : pending_bundles) { + const auto &node_id = node_to_bundles.first; + const auto &bundles_per_node = node_to_bundles.second; + for (const auto &bundle : bundles_per_node) { + lease_status_tracker->MarkPreparePhaseStarted(node_id, bundle); + } + // TODO(sang): The callback might not be called at all if nodes are dead. We should // handle this case properly. - PrepareResources(bundle, gcs_node_manager_.GetAliveNode(node_id), - [this, bundle, node_id, lease_status_tracker, failure_callback, - success_callback](const Status &status) { - lease_status_tracker->MarkPrepareRequestReturned(node_id, bundle, - status); + PrepareResources(bundles_per_node, gcs_node_manager_.GetAliveNode(node_id), + [this, bundles_per_node, node_id, lease_status_tracker, + failure_callback, success_callback](const Status &status) { + for (const auto &bundle : bundles_per_node) { + lease_status_tracker->MarkPrepareRequestReturned(node_id, bundle, + status); + } + if (lease_status_tracker->AllPrepareRequestsReturned()) { OnAllBundlePrepareRequestReturned( lease_status_tracker, failure_callback, success_callback); @@ -213,7 +244,7 @@ void GcsPlacementGroupScheduler::MarkScheduleCancelled( } void GcsPlacementGroupScheduler::PrepareResources( - const std::shared_ptr &bundle, + const std::vector> &bundles, const absl::optional> &node, const StatusCallback &callback) { if (!node.has_value()) { @@ -224,18 +255,19 @@ void GcsPlacementGroupScheduler::PrepareResources( const auto lease_client = GetLeaseClientFromNode(node.value()); const auto node_id = NodeID::FromBinary(node.value()->node_id()); RAY_LOG(DEBUG) << "Preparing resource from node " << node_id - << " for a bundle: " << bundle->DebugString(); + << " for bundles: " << GetDebugStringForBundles(bundles); + lease_client->PrepareBundleResources( - *bundle, [node_id, bundle, callback]( + bundles, [node_id, bundles, callback]( const Status &status, const rpc::PrepareBundleResourcesReply &reply) { auto result = reply.success() ? Status::OK() : Status::IOError("Failed to reserve resource"); if (result.ok()) { RAY_LOG(DEBUG) << "Finished leasing resource from " << node_id - << " for bundle: " << bundle->DebugString(); + << " for bundles: " << GetDebugStringForBundles(bundles); } else { RAY_LOG(DEBUG) << "Failed to lease resource from " << node_id - << " for bundle: " << bundle->DebugString(); + << " for bundles: " << GetDebugStringForBundles(bundles); } callback(result); }); diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h index 7bc15026519f1..ebf74d7ed30e6 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h +++ b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h @@ -469,14 +469,16 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface { &group_to_bundles) override; protected: - /// Send a bundle PREPARE request to a node. The PREPARE request will lock resources + /// Send bundles PREPARE requests to a node. The PREPARE requests will lock resources /// on a node until COMMIT or CANCEL requests are sent to a node. + /// NOTE: All of given bundles will be prepared on the same node. It is guaranteed that + /// all of bundles are atomically prepared on a given node. /// - /// \param bundle A bundle to schedule on a node. + /// \param bundles Bundles to be scheduled on a node. /// \param node A node to prepare resources for a given bundle. /// \param callback void PrepareResources( - const std::shared_ptr &bundle, + const std::vector> &bundles, const absl::optional> &node, const StatusCallback &callback); diff --git a/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc b/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc index a0cacaf364acd..ae743de274707 100644 --- a/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc @@ -159,10 +159,11 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test { success_placement_groups_.emplace_back(std::move(placement_group)); }); - ASSERT_EQ(2, raylet_clients_[0]->num_lease_requested); - ASSERT_EQ(2, raylet_clients_[0]->lease_callbacks.size()); - ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources()); + ASSERT_EQ(1, raylet_clients_[0]->num_lease_requested); + ASSERT_EQ(1, raylet_clients_[0]->lease_callbacks.size()); ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources()); + // TODO(@clay4444): It should be updated to 1 after we make the commit request + // batched. WaitPendingDone(raylet_clients_[0]->commit_callbacks, 2); ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources()); ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources()); @@ -197,7 +198,6 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test { scheduler_->ScheduleUnplacedBundles(placement_group, failure_handler, success_handler); ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources()); - ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources()); WaitPendingDone(raylet_clients_[0]->commit_callbacks, 2); ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources()); ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources()); @@ -276,12 +276,11 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestSchedulePlacementGroupReplyFailure) { success_placement_groups_.emplace_back(std::move(placement_group)); }); - ASSERT_EQ(2, raylet_clients_[0]->num_lease_requested); - ASSERT_EQ(2, raylet_clients_[0]->lease_callbacks.size()); + ASSERT_EQ(1, raylet_clients_[0]->num_lease_requested); + ASSERT_EQ(1, raylet_clients_[0]->lease_callbacks.size()); // Reply failure, so the placement group scheduling failed. ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources(false)); - ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources(false)); WaitPlacementGroupPendingDone(1, GcsPlacementGroupStatus::FAILURE); WaitPlacementGroupPendingDone(0, GcsPlacementGroupStatus::SUCCESS); @@ -336,12 +335,11 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestSchedulePlacementGroupReturnResource) success_placement_groups_.emplace_back(std::move(placement_group)); }); - ASSERT_EQ(2, raylet_clients_[0]->num_lease_requested); - ASSERT_EQ(2, raylet_clients_[0]->lease_callbacks.size()); - // One bundle success and the other failed. - ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources()); + ASSERT_EQ(1, raylet_clients_[0]->num_lease_requested); + ASSERT_EQ(1, raylet_clients_[0]->lease_callbacks.size()); + // Failed to create these two bundles. ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources(false)); - ASSERT_EQ(1, raylet_clients_[0]->num_return_requested); + ASSERT_EQ(0, raylet_clients_[0]->num_return_requested); // Reply the placement_group creation request, then the placement_group should be // scheduled successfully. WaitPlacementGroupPendingDone(1, GcsPlacementGroupStatus::FAILURE); @@ -377,7 +375,6 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestStrictPackStrategyBalancedScheduling) ++node_select_count[node_index]; node_commit_count[node_index] += 2; ASSERT_TRUE(raylet_clients_[node_index]->GrantPrepareBundleResources()); - ASSERT_TRUE(raylet_clients_[node_index]->GrantPrepareBundleResources()); WaitPendingDone(raylet_clients_[node_index]->commit_callbacks, 2); ASSERT_TRUE(raylet_clients_[node_index]->GrantCommitBundleResources()); ASSERT_TRUE(raylet_clients_[node_index]->GrantCommitBundleResources()); @@ -414,7 +411,6 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestStrictPackStrategyResourceCheck) { auto placement_group = std::make_shared(request, ""); scheduler_->ScheduleUnplacedBundles(placement_group, failure_handler, success_handler); ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources()); - ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources()); WaitPendingDone(raylet_clients_[0]->commit_callbacks, 2); ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources()); ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources()); @@ -430,7 +426,6 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestStrictPackStrategyResourceCheck) { std::make_shared(create_placement_group_request2, ""); scheduler_->ScheduleUnplacedBundles(placement_group2, failure_handler, success_handler); ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources()); - ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources()); WaitPendingDone(raylet_clients_[0]->commit_callbacks, 2); ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources()); ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources()); @@ -460,7 +455,6 @@ TEST_F(GcsPlacementGroupSchedulerTest, DestroyPlacementGroup) { success_placement_groups_.emplace_back(std::move(placement_group)); }); ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources()); - ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources()); WaitPendingDone(raylet_clients_[0]->commit_callbacks, 2); ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources()); ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources()); @@ -477,9 +471,11 @@ TEST_F(GcsPlacementGroupSchedulerTest, DestroyPlacementGroup) { } TEST_F(GcsPlacementGroupSchedulerTest, DestroyCancelledPlacementGroup) { - auto node = Mocker::GenNodeInfo(); - AddNode(node); - ASSERT_EQ(1, gcs_node_manager_->GetAllAliveNodes().size()); + auto node0 = Mocker::GenNodeInfo(0); + auto node1 = Mocker::GenNodeInfo(1); + AddNode(node0); + AddNode(node1); + ASSERT_EQ(2, gcs_node_manager_->GetAllAliveNodes().size()); auto create_placement_group_request = Mocker::GenCreatePlacementGroupRequest(); auto placement_group = @@ -503,16 +499,18 @@ TEST_F(GcsPlacementGroupSchedulerTest, DestroyCancelledPlacementGroup) { // Now, cancel the schedule request. ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources()); scheduler_->MarkScheduleCancelled(placement_group_id); - ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources()); - ASSERT_TRUE(raylet_clients_[0]->GrantCancelResourceReserve()); + ASSERT_TRUE(raylet_clients_[1]->GrantPrepareBundleResources()); ASSERT_TRUE(raylet_clients_[0]->GrantCancelResourceReserve()); + ASSERT_TRUE(raylet_clients_[1]->GrantCancelResourceReserve()); WaitPlacementGroupPendingDone(1, GcsPlacementGroupStatus::FAILURE); } TEST_F(GcsPlacementGroupSchedulerTest, PlacementGroupCancelledDuringCommit) { - auto node = Mocker::GenNodeInfo(); - AddNode(node); - ASSERT_EQ(1, gcs_node_manager_->GetAllAliveNodes().size()); + auto node0 = Mocker::GenNodeInfo(0); + auto node1 = Mocker::GenNodeInfo(1); + AddNode(node0); + AddNode(node1); + ASSERT_EQ(2, gcs_node_manager_->GetAllAliveNodes().size()); auto create_placement_group_request = Mocker::GenCreatePlacementGroupRequest(); auto placement_group = @@ -535,13 +533,14 @@ TEST_F(GcsPlacementGroupSchedulerTest, PlacementGroupCancelledDuringCommit) { // Now, cancel the schedule request. ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources()); - ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources()); + ASSERT_TRUE(raylet_clients_[1]->GrantPrepareBundleResources()); scheduler_->MarkScheduleCancelled(placement_group_id); - WaitPendingDone(raylet_clients_[0]->commit_callbacks, 2); - ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources()); + WaitPendingDone(raylet_clients_[0]->commit_callbacks, 1); + WaitPendingDone(raylet_clients_[1]->commit_callbacks, 1); ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources()); + ASSERT_TRUE(raylet_clients_[1]->GrantCommitBundleResources()); ASSERT_TRUE(raylet_clients_[0]->GrantCancelResourceReserve()); - ASSERT_TRUE(raylet_clients_[0]->GrantCancelResourceReserve()); + ASSERT_TRUE(raylet_clients_[1]->GrantCancelResourceReserve()); WaitPlacementGroupPendingDone(1, GcsPlacementGroupStatus::FAILURE); } @@ -568,19 +567,16 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestPackStrategyLargeBundlesScheduling) { Mocker::GenCreatePlacementGroupRequest("", rpc::PlacementStrategy::PACK, 15); auto placement_group = std::make_shared(request, ""); scheduler_->ScheduleUnplacedBundles(placement_group, failure_handler, success_handler); - RAY_CHECK(raylet_clients_[0]->num_lease_requested > 0); - RAY_CHECK(raylet_clients_[1]->num_lease_requested > 0); - for (int index = 0; index < raylet_clients_[0]->num_lease_requested; ++index) { - ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources()); - } - for (int index = 0; index < raylet_clients_[1]->num_lease_requested; ++index) { - ASSERT_TRUE(raylet_clients_[1]->GrantPrepareBundleResources()); - } + // Prepared resource is batched! + ASSERT_TRUE(raylet_clients_[0]->num_lease_requested == 1); + ASSERT_TRUE(raylet_clients_[1]->num_lease_requested == 1); + ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources()); + ASSERT_TRUE(raylet_clients_[1]->GrantPrepareBundleResources()); // Wait until all resources are prepared. WaitPendingDone(raylet_clients_[0]->commit_callbacks, - raylet_clients_[0]->num_lease_requested); + raylet_clients_[0]->num_prepared_bundle); WaitPendingDone(raylet_clients_[1]->commit_callbacks, - raylet_clients_[1]->num_lease_requested); + raylet_clients_[1]->num_prepared_bundle); for (int index = 0; index < raylet_clients_[0]->num_commit_requested; ++index) { ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources()); } diff --git a/src/ray/gcs/gcs_server/test/gcs_server_test_util.h b/src/ray/gcs/gcs_server/test/gcs_server_test_util.h index 2e19ac21ebab4..086195584a308 100644 --- a/src/ray/gcs/gcs_server/test/gcs_server_test_util.h +++ b/src/ray/gcs/gcs_server/test/gcs_server_test_util.h @@ -175,10 +175,11 @@ struct GcsServerMocker { /// ResourceReserveInterface void PrepareBundleResources( - const BundleSpecification &bundle_spec, + const std::vector> &bundle_specs, const ray::rpc::ClientCallback &callback) override { num_lease_requested += 1; + num_prepared_bundle = bundle_specs.size(); lease_callbacks.push_back(callback); } @@ -299,6 +300,8 @@ struct GcsServerMocker { int num_lease_requested = 0; int num_return_requested = 0; int num_commit_requested = 0; + // TODO(@clay4444): Remove this once we make the commit rpc request batched! + int num_prepared_bundle = 0; int num_release_unused_bundles_requested = 0; std::list> lease_callbacks = {}; diff --git a/src/ray/gcs/test/gcs_test_util.h b/src/ray/gcs/test/gcs_test_util.h index d0cb2d2df41b2..c79792795e60e 100644 --- a/src/ray/gcs/test/gcs_test_util.h +++ b/src/ray/gcs/test/gcs_test_util.h @@ -83,6 +83,26 @@ struct Mocker { return request; } + static std::vector> GenBundleSpecifications( + const PlacementGroupID &placement_group_id, + absl::flat_hash_map &unit_resource, int bundles_size = 1) { + std::vector> bundle_specs; + for (int i = 0; i < bundles_size; i++) { + rpc::Bundle bundle; + auto mutable_bundle_id = bundle.mutable_bundle_id(); + // The bundle index is start from 1. + mutable_bundle_id->set_bundle_index(i + 1); + mutable_bundle_id->set_placement_group_id(placement_group_id.Binary()); + auto mutable_unit_resources = bundle.mutable_unit_resources(); + for (auto &resource : unit_resource) { + mutable_unit_resources->insert({resource.first, resource.second}); + } + bundle_specs.emplace_back(std::make_shared(bundle)); + } + return bundle_specs; + } + + // TODO(@clay4444): Remove this once we did the batch rpc request refactor. static BundleSpecification GenBundleCreation( const PlacementGroupID &placement_group_id, const int bundle_index, absl::flat_hash_map &unit_resource) { diff --git a/src/ray/protobuf/node_manager.proto b/src/ray/protobuf/node_manager.proto index db7a2bd334d25..2355c17c7d944 100644 --- a/src/ray/protobuf/node_manager.proto +++ b/src/ray/protobuf/node_manager.proto @@ -73,8 +73,8 @@ message RequestWorkerLeaseReply { } message PrepareBundleResourcesRequest { - // Bundle containing the requested resources. - Bundle bundle_spec = 1; + // Bundles that containing the requested resources. + repeated Bundle bundle_specs = 1; } message PrepareBundleResourcesReply { diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 81f70af74d04f..954ace379d960 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1637,11 +1637,14 @@ void NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest void NodeManager::HandlePrepareBundleResources( const rpc::PrepareBundleResourcesRequest &request, rpc::PrepareBundleResourcesReply *reply, rpc::SendReplyCallback send_reply_callback) { - auto bundle_spec = BundleSpecification(request.bundle_spec()); - RAY_LOG(DEBUG) << "Request to prepare bundle resources is received, " - << bundle_spec.DebugString(); - - auto prepared = placement_group_resource_manager_->PrepareBundle(bundle_spec); + std::vector> bundle_specs; + for (int index = 0; index < request.bundle_specs_size(); index++) { + bundle_specs.emplace_back( + std::make_shared(request.bundle_specs(index))); + } + RAY_LOG(DEBUG) << "Request to prepare resources for bundles: " + << GetDebugStringForBundles(bundle_specs); + auto prepared = placement_group_resource_manager_->PrepareBundles(bundle_specs); reply->set_success(prepared); send_reply_callback(Status::OK(), nullptr, nullptr); } diff --git a/src/ray/raylet/placement_group_resource_manager.cc b/src/ray/raylet/placement_group_resource_manager.cc index 9103143c80d0e..b04ecbc70b60b 100644 --- a/src/ray/raylet/placement_group_resource_manager.cc +++ b/src/ray/raylet/placement_group_resource_manager.cc @@ -78,6 +78,31 @@ bool NewPlacementGroupResourceManager::PrepareBundle( return true; } +bool NewPlacementGroupResourceManager::PrepareBundles( + const std::vector> &bundle_specs) { + std::vector> prepared_bundles; + for (const auto &bundle_spec : bundle_specs) { + if (PrepareBundle(*bundle_spec)) { + prepared_bundles.emplace_back(bundle_spec); + } else { + // Terminate the preparation phase if any of bundle cannot be prepared. + break; + } + } + + if (prepared_bundles.size() != bundle_specs.size()) { + RAY_LOG(DEBUG) << "There are one or more bundles request resource failed, will " + "release the requested resources before."; + for (const auto &bundle : prepared_bundles) { + // `ReturnBundle` will return resource, erase from `pg_bundles_` and + // `bundle_spec_map_`. + ReturnBundle(*bundle); + } + return false; + } + return true; +} + void NewPlacementGroupResourceManager::CommitBundle( const BundleSpecification &bundle_spec) { auto it = pg_bundles_.find(bundle_spec.BundleId()); @@ -160,6 +185,11 @@ void NewPlacementGroupResourceManager::ReturnBundle( } } pg_bundles_.erase(it); + // Erase from `bundle_spec_map_`. + const auto &iter = bundle_spec_map_.find(bundle_spec.BundleId()); + if (iter != bundle_spec_map_.end()) { + bundle_spec_map_.erase(iter); + } delete_resources_(deleted); } diff --git a/src/ray/raylet/placement_group_resource_manager.h b/src/ray/raylet/placement_group_resource_manager.h index d68e52b42d985..88a4b57c0a8a8 100644 --- a/src/ray/raylet/placement_group_resource_manager.h +++ b/src/ray/raylet/placement_group_resource_manager.h @@ -50,26 +50,29 @@ struct BundleTransactionState { /// about allocated for placement group bundles. class PlacementGroupResourceManager { public: - /// Lock the required resources from local available resources. Note that this is phase - /// one of 2PC, it will not convert placement group resource(like CPU -> CPU_group_i). + /// Prepare a list of bundles. It is guaranteed that all bundles are atomically + /// prepared. + ///(e.g., if one of bundle cannot be prepared, all bundles are failed to be prepared) /// - /// \param bundle_spec: Specification of bundle whose resources will be prepared. - virtual bool PrepareBundle(const BundleSpecification &bundle_spec) = 0; + /// \param bundle_specs A set of bundles that waiting to be prepared. + /// \return bool True if all bundles successfully reserved resources, otherwise false. + virtual bool PrepareBundles( + const std::vector> &bundle_specs) = 0; /// Convert the required resources to placement group resources(like CPU -> /// CPU_group_i). This is phase two of 2PC. /// - /// \param bundle_spec: Specification of bundle whose resources will be commited. + /// \param bundle_spec Specification of bundle whose resources will be commited. virtual void CommitBundle(const BundleSpecification &bundle_spec) = 0; /// Return back all the bundle resource. /// - /// \param bundle_spec: Specification of bundle whose resources will be returned. + /// \param bundle_spec Specification of bundle whose resources will be returned. virtual void ReturnBundle(const BundleSpecification &bundle_spec) = 0; /// Return back all the bundle(which is unused) resource. /// - /// \param bundle_spec: A set of bundles which in use. + /// \param bundle_spec A set of bundles which in use. void ReturnUnusedBundle(const std::unordered_set &in_use_bundles); virtual ~PlacementGroupResourceManager() {} @@ -98,7 +101,8 @@ class NewPlacementGroupResourceManager : public PlacementGroupResourceManager { virtual ~NewPlacementGroupResourceManager() = default; - bool PrepareBundle(const BundleSpecification &bundle_spec); + bool PrepareBundles( + const std::vector> &bundle_specs); void CommitBundle(const BundleSpecification &bundle_spec); @@ -122,6 +126,13 @@ class NewPlacementGroupResourceManager : public PlacementGroupResourceManager { /// truth for the new scheduler. absl::flat_hash_map, pair_hash> pg_bundles_; + + /// Lock the required resources from local available resources. Note that this is phase + /// one of 2PC, it will not convert placement group resource(like CPU -> CPU_group_i). + /// + /// \param bundle_spec Specification of bundle whose resources will be prepared. + /// \return bool True if the bundle successfully reserved resources, otherwise false. + bool PrepareBundle(const BundleSpecification &bundle_spec); }; } // namespace raylet diff --git a/src/ray/raylet/placement_group_resource_manager_test.cc b/src/ray/raylet/placement_group_resource_manager_test.cc index fbd09727637da..0406c90a815e6 100644 --- a/src/ray/raylet/placement_group_resource_manager_test.cc +++ b/src/ray/raylet/placement_group_resource_manager_test.cc @@ -31,6 +31,7 @@ class NewPlacementGroupResourceManagerTest : public ::testing::Test { public: std::unique_ptr new_placement_group_resource_manager_; + std::shared_ptr cluster_resource_scheduler_; std::unique_ptr gcs_client_; rpc::GcsNodeInfo node_info_; void SetUp() { @@ -40,7 +41,7 @@ class NewPlacementGroupResourceManagerTest : public ::testing::Test { } void InitLocalAvailableResource( absl::flat_hash_map &unit_resource) { - auto cluster_resource_scheduler_ = + cluster_resource_scheduler_ = std::make_shared("local", unit_resource, *gcs_client_); new_placement_group_resource_manager_.reset( new raylet::NewPlacementGroupResourceManager( @@ -54,18 +55,23 @@ class NewPlacementGroupResourceManagerTest : public ::testing::Test { } void CheckAvailableResoueceEmpty(const std::string &resource) { - const auto cluster_resource_scheduler_ = - new_placement_group_resource_manager_->GetResourceScheduler(); ASSERT_TRUE(cluster_resource_scheduler_->IsAvailableResourceEmpty(resource)); } void CheckRemainingResourceCorrect(NodeResources &node_resources) { - const auto cluster_resource_scheduler_ = - new_placement_group_resource_manager_->GetResourceScheduler(); auto local_node_resource = cluster_resource_scheduler_->GetLocalNodeResources(); ASSERT_TRUE(local_node_resource == node_resources); } + // TODO(@clay4444): Remove this once we did the batch rpc request refactor! + std::vector> ConvertSingleSpecToVectorPtrs( + BundleSpecification bundle_spec) { + std::vector> bundle_specs; + bundle_specs.push_back( + std::make_shared(std::move(bundle_spec))); + return bundle_specs; + } + bool update_called_ = false; bool delete_called_ = false; }; @@ -79,7 +85,8 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewPrepareBundleResource) { /// 2. init local available resource. InitLocalAvailableResource(unit_resource); /// 3. prepare bundle resource. - ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundle(bundle_spec)); + ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundles( + ConvertSingleSpecToVectorPtrs(bundle_spec))); /// 4. check remaining resources is correct. CheckAvailableResoueceEmpty("CPU"); } @@ -96,7 +103,8 @@ TEST_F(NewPlacementGroupResourceManagerTest, init_unit_resource.insert({"CPU", 1.0}); InitLocalAvailableResource(init_unit_resource); /// 3. prepare bundle resource. - ASSERT_FALSE(new_placement_group_resource_manager_->PrepareBundle(bundle_spec)); + ASSERT_FALSE(new_placement_group_resource_manager_->PrepareBundles( + ConvertSingleSpecToVectorPtrs(bundle_spec))); } TEST_F(NewPlacementGroupResourceManagerTest, TestNewCommitBundleResource) { @@ -108,7 +116,8 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewCommitBundleResource) { /// 2. init local available resource. InitLocalAvailableResource(unit_resource); /// 3. prepare and commit bundle resource. - ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundle(bundle_spec)); + ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundles( + ConvertSingleSpecToVectorPtrs(bundle_spec))); ASSERT_FALSE(update_called_); new_placement_group_resource_manager_->CommitBundle(bundle_spec); ASSERT_TRUE(update_called_); @@ -139,7 +148,8 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewReturnBundleResource) { /// 2. init local available resource. InitLocalAvailableResource(unit_resource); /// 3. prepare and commit bundle resource. - ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundle(bundle_spec)); + ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundles( + ConvertSingleSpecToVectorPtrs(bundle_spec))); ASSERT_FALSE(update_called_); new_placement_group_resource_manager_->CommitBundle(bundle_spec); ASSERT_TRUE(update_called_); @@ -167,8 +177,10 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewMultipleBundlesCommitAndRetu init_unit_resource.insert({"CPU", 2.0}); InitLocalAvailableResource(init_unit_resource); /// 3. prepare and commit two bundle resource. - ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundle(first_bundle_spec)); - ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundle(second_bundle_spec)); + ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundles( + ConvertSingleSpecToVectorPtrs(first_bundle_spec))); + ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundles( + ConvertSingleSpecToVectorPtrs(second_bundle_spec))); ASSERT_FALSE(update_called_); ASSERT_FALSE(delete_called_); new_placement_group_resource_manager_->CommitBundle(first_bundle_spec); @@ -239,7 +251,8 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewIdempotencyWithMultiPrepare) InitLocalAvailableResource(available_resource); /// 3. prepare bundle resource 10 times. for (int i = 0; i < 10; i++) { - new_placement_group_resource_manager_->PrepareBundle(bundle_spec); + new_placement_group_resource_manager_->PrepareBundles( + ConvertSingleSpecToVectorPtrs(bundle_spec)); } /// 4. check remaining resources is correct. absl::flat_hash_map remaining_resources = {{"CPU", 3.0}}; @@ -265,9 +278,11 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewIdempotencyWithRandomOrder) std::make_pair("CPU", 3.0)}; InitLocalAvailableResource(available_resource); /// 3. prepare bundle -> commit bundle -> prepare bundle. - ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundle(bundle_spec)); + ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundles( + ConvertSingleSpecToVectorPtrs(bundle_spec))); new_placement_group_resource_manager_->CommitBundle(bundle_spec); - ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundle(bundle_spec)); + ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundles( + ConvertSingleSpecToVectorPtrs(bundle_spec))); /// 4. check remaining resources is correct. absl::flat_hash_map remaining_resources = { {"CPU_group_" + group_id.Hex(), 1.0}, @@ -286,14 +301,16 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewIdempotencyWithRandomOrder) CheckRemainingResourceCorrect(remaining_resource_instance); new_placement_group_resource_manager_->ReturnBundle(bundle_spec); // 5. prepare bundle -> commit bundle -> commit bundle. - ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundle(bundle_spec)); + ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundles( + ConvertSingleSpecToVectorPtrs(bundle_spec))); new_placement_group_resource_manager_->CommitBundle(bundle_spec); new_placement_group_resource_manager_->CommitBundle(bundle_spec); // 6. check remaining resources is correct. CheckRemainingResourceCorrect(remaining_resource_instance); new_placement_group_resource_manager_->ReturnBundle(bundle_spec); // 7. prepare bundle -> return bundle -> commit bundle. - ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundle(bundle_spec)); + ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundles( + ConvertSingleSpecToVectorPtrs(bundle_spec))); new_placement_group_resource_manager_->ReturnBundle(bundle_spec); new_placement_group_resource_manager_->CommitBundle(bundle_spec); // 8. check remaining resources is correct. @@ -303,6 +320,63 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewIdempotencyWithRandomOrder) CheckRemainingResourceCorrect(remaining_resource_instance); } +TEST_F(NewPlacementGroupResourceManagerTest, TestPreparedResourceBatched) { + // 1. create a placement group spec with 4 bundles and each required 1 CPU. + auto group_id = PlacementGroupID::FromRandom(); + absl::flat_hash_map unit_resource; + unit_resource.insert({"CPU", 1.0}); + auto bundle_specs = Mocker::GenBundleSpecifications(group_id, unit_resource, 4); + // 2. init local available resource with 3 CPUs. + absl::flat_hash_map available_resource = { + std::make_pair("CPU", 3.0)}; + InitLocalAvailableResource(available_resource); + // 3. prepare resources for the four bundles. + ASSERT_FALSE(new_placement_group_resource_manager_->PrepareBundles(bundle_specs)); + // make sure it keeps Idempotency. + ASSERT_FALSE(new_placement_group_resource_manager_->PrepareBundles(bundle_specs)); + // 4. check remaining resources is correct. + absl::flat_hash_map remaining_resources = {{"CPU", 3.0}}; + auto remaining_resource_scheduler = std::make_shared( + "remaining", remaining_resources, *gcs_client_); + auto remaining_resource_instance = + remaining_resource_scheduler->GetLocalNodeResources(); + CheckRemainingResourceCorrect(remaining_resource_instance); + // 5. re-init the local available resource with 4 CPUs. + available_resource = {std::make_pair("CPU", 4.0)}; + InitLocalAvailableResource(available_resource); + // 6. re-prepare resources for the four bundles, but this time it should be + // successfully. + ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundles(bundle_specs)); + ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundles(bundle_specs)); + for (const auto &bundle_spec : bundle_specs) { + new_placement_group_resource_manager_->CommitBundle(*bundle_spec); + } + // 7. re-check remaining resources is correct. + remaining_resources = {{"CPU_group_" + group_id.Hex(), 4.0}, + {"CPU_group_1_" + group_id.Hex(), 1.0}, + {"CPU_group_2_" + group_id.Hex(), 1.0}, + {"CPU_group_3_" + group_id.Hex(), 1.0}, + {"CPU_group_4_" + group_id.Hex(), 1.0}, + {"CPU", 4.0}, + {"bundle_group_1_" + group_id.Hex(), 1000}, + {"bundle_group_2_" + group_id.Hex(), 1000}, + {"bundle_group_3_" + group_id.Hex(), 1000}, + {"bundle_group_4_" + group_id.Hex(), 1000}, + {"bundle_group_" + group_id.Hex(), 4000}}; + remaining_resource_scheduler = std::make_shared( + "remaining", remaining_resources, *gcs_client_); + std::shared_ptr resource_instances = + std::make_shared(); + absl::flat_hash_map allocating_resource; + allocating_resource.insert({"CPU", 4.0}); + ASSERT_TRUE(remaining_resource_scheduler->AllocateLocalTaskResources( + allocating_resource, resource_instances)); + remaining_resource_instance = remaining_resource_scheduler->GetLocalNodeResources(); + RAY_LOG(INFO) << "The current local resource view: " + << cluster_resource_scheduler_->DebugString(); + CheckRemainingResourceCorrect(remaining_resource_instance); +} + } // namespace ray int main(int argc, char **argv) { diff --git a/src/ray/raylet_client/raylet_client.cc b/src/ray/raylet_client/raylet_client.cc index 0fef98b3d971a..161aefeed1856 100644 --- a/src/ray/raylet_client/raylet_client.cc +++ b/src/ray/raylet_client/raylet_client.cc @@ -373,10 +373,16 @@ void raylet::RayletClient::CancelWorkerLease( } void raylet::RayletClient::PrepareBundleResources( - const BundleSpecification &bundle_spec, + const std::vector> &bundle_specs, const ray::rpc::ClientCallback &callback) { rpc::PrepareBundleResourcesRequest request; - request.mutable_bundle_spec()->CopyFrom(bundle_spec.GetMessage()); + std::set nodes; + for (const auto &bundle_spec : bundle_specs) { + nodes.insert(bundle_spec->NodeId().Hex()); + auto message_bundle = request.add_bundle_specs(); + message_bundle->CopyFrom(bundle_spec->GetMessage()); + } + RAY_CHECK(nodes.size() == 1); grpc_client_->PrepareBundleResources(request, callback); } diff --git a/src/ray/raylet_client/raylet_client.h b/src/ray/raylet_client/raylet_client.h index 4b01d3137713e..bafc350aa7808 100644 --- a/src/ray/raylet_client/raylet_client.h +++ b/src/ray/raylet_client/raylet_client.h @@ -109,14 +109,13 @@ class WorkerLeaseInterface { /// Interface for leasing resource. class ResourceReserveInterface { public: - /// Request a raylet to prepare resources of a given bundle for atomic placement group + /// Request a raylet to prepare resources of given bundles for atomic placement group /// creation. This is used for the first phase of atomic placement group creation. The /// callback will be sent via gRPC. - /// \param resource_spec Resources that should be - /// allocated for the worker. + /// \param bundle_specs Bundles to be scheduled at this raylet. /// \return ray::Status virtual void PrepareBundleResources( - const BundleSpecification &bundle_spec, + const std::vector> &bundle_specs, const ray::rpc::ClientCallback &callback) = 0; @@ -401,7 +400,7 @@ class RayletClient : public RayletClientInterface { /// Implements PrepareBundleResourcesInterface. void PrepareBundleResources( - const BundleSpecification &bundle_spec, + const std::vector> &bundle_specs, const ray::rpc::ClientCallback &callback) override;