Skip to content

Commit

Permalink
[Placement Group] Make placement group prepare resource rpc request b…
Browse files Browse the repository at this point in the history
…atched (#20897)

This is one part of this refactor,  #20715 , make the prepare resource RPC requests batched per node.
  • Loading branch information
clay4megtr authored Dec 16, 2021
1 parent 1939070 commit 1edf4ab
Show file tree
Hide file tree
Showing 15 changed files with 286 additions and 97 deletions.
4 changes: 2 additions & 2 deletions src/mock/ray/raylet_client/raylet_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class MockResourceReserveInterface : public ResourceReserveInterface {
public:
MOCK_METHOD(
void, PrepareBundleResources,
(const BundleSpecification &bundle_spec,
(const std::vector<std::shared_ptr<const BundleSpecification>> &bundle_specs,
const ray::rpc::ClientCallback<ray::rpc::PrepareBundleResourcesReply> &callback),
(override));
MOCK_METHOD(
Expand Down Expand Up @@ -146,7 +146,7 @@ class MockRayletClientInterface : public RayletClientInterface {
(override));
MOCK_METHOD(
void, PrepareBundleResources,
(const BundleSpecification &bundle_spec,
(const std::vector<std::shared_ptr<const BundleSpecification>> &bundle_specs,
const ray::rpc::ClientCallback<ray::rpc::PrepareBundleResourcesReply> &callback),
(override));
MOCK_METHOD(
Expand Down
9 changes: 9 additions & 0 deletions src/ray/common/bundle_spec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,4 +126,13 @@ std::string GetOriginalResourceName(const std::string &resource) {
return resource.substr(0, idx);
}

std::string GetDebugStringForBundles(
const std::vector<std::shared_ptr<const BundleSpecification>> &bundles) {
std::ostringstream debug_info;
for (const auto &bundle : bundles) {
debug_info << "{" << bundle->DebugString() << "},";
}
return debug_info.str();
};

} // namespace ray
4 changes: 4 additions & 0 deletions src/ray/common/bundle_spec.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,4 +108,8 @@ bool IsBundleIndex(const std::string &resource, const PlacementGroupID &group_id
/// Return the original resource name of the placement group resource.
std::string GetOriginalResourceName(const std::string &resource);

/// Generate debug information of given bundles.
std::string GetDebugStringForBundles(
const std::vector<std::shared_ptr<const BundleSpecification>> &bundles);

} // namespace ray
62 changes: 47 additions & 15 deletions src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,31 @@
#include "ray/gcs/gcs_server/gcs_placement_group_manager.h"
#include "src/ray/protobuf/gcs.pb.h"

namespace {

using ray::BundleSpecification;
using ray::NodeID;

// Get a set of bundle specifications grouped by the node.
std::unordered_map<NodeID, std::vector<std::shared_ptr<const BundleSpecification>>>
GetUnplacedBundlesPerNode(
const std::vector<std::shared_ptr<const BundleSpecification>> &bundles,
const ray::gcs::ScheduleMap &selected_nodes) {
std::unordered_map<NodeID, std::vector<std::shared_ptr<const BundleSpecification>>>
node_to_bundles;
for (const auto &bundle : bundles) {
const auto &bundle_id = bundle->BundleId();
const auto &iter = selected_nodes.find(bundle_id);
RAY_CHECK(iter != selected_nodes.end());
if (node_to_bundles.find(iter->second) == node_to_bundles.end()) {
node_to_bundles[iter->second] = {};
}
node_to_bundles[iter->second].push_back(bundle);
}
return node_to_bundles;
}
} // namespace

namespace ray {
namespace gcs {

Expand Down Expand Up @@ -169,18 +194,24 @@ void GcsPlacementGroupScheduler::ScheduleUnplacedBundles(
.emplace(placement_group->GetPlacementGroupID(), lease_status_tracker)
.second);

/// TODO(AlisaWu): Change the strategy when reserve resource failed.
for (const auto &bundle : bundles) {
const auto &bundle_id = bundle->BundleId();
const auto &node_id = selected_nodes[bundle_id];
lease_status_tracker->MarkPreparePhaseStarted(node_id, bundle);
const auto &pending_bundles = GetUnplacedBundlesPerNode(bundles, selected_nodes);
for (const auto &node_to_bundles : pending_bundles) {
const auto &node_id = node_to_bundles.first;
const auto &bundles_per_node = node_to_bundles.second;
for (const auto &bundle : bundles_per_node) {
lease_status_tracker->MarkPreparePhaseStarted(node_id, bundle);
}

// TODO(sang): The callback might not be called at all if nodes are dead. We should
// handle this case properly.
PrepareResources(bundle, gcs_node_manager_.GetAliveNode(node_id),
[this, bundle, node_id, lease_status_tracker, failure_callback,
success_callback](const Status &status) {
lease_status_tracker->MarkPrepareRequestReturned(node_id, bundle,
status);
PrepareResources(bundles_per_node, gcs_node_manager_.GetAliveNode(node_id),
[this, bundles_per_node, node_id, lease_status_tracker,
failure_callback, success_callback](const Status &status) {
for (const auto &bundle : bundles_per_node) {
lease_status_tracker->MarkPrepareRequestReturned(node_id, bundle,
status);
}

if (lease_status_tracker->AllPrepareRequestsReturned()) {
OnAllBundlePrepareRequestReturned(
lease_status_tracker, failure_callback, success_callback);
Expand Down Expand Up @@ -213,7 +244,7 @@ void GcsPlacementGroupScheduler::MarkScheduleCancelled(
}

void GcsPlacementGroupScheduler::PrepareResources(
const std::shared_ptr<const BundleSpecification> &bundle,
const std::vector<std::shared_ptr<const BundleSpecification>> &bundles,
const absl::optional<std::shared_ptr<ray::rpc::GcsNodeInfo>> &node,
const StatusCallback &callback) {
if (!node.has_value()) {
Expand All @@ -224,18 +255,19 @@ void GcsPlacementGroupScheduler::PrepareResources(
const auto lease_client = GetLeaseClientFromNode(node.value());
const auto node_id = NodeID::FromBinary(node.value()->node_id());
RAY_LOG(DEBUG) << "Preparing resource from node " << node_id
<< " for a bundle: " << bundle->DebugString();
<< " for bundles: " << GetDebugStringForBundles(bundles);

lease_client->PrepareBundleResources(
*bundle, [node_id, bundle, callback](
bundles, [node_id, bundles, callback](
const Status &status, const rpc::PrepareBundleResourcesReply &reply) {
auto result = reply.success() ? Status::OK()
: Status::IOError("Failed to reserve resource");
if (result.ok()) {
RAY_LOG(DEBUG) << "Finished leasing resource from " << node_id
<< " for bundle: " << bundle->DebugString();
<< " for bundles: " << GetDebugStringForBundles(bundles);
} else {
RAY_LOG(DEBUG) << "Failed to lease resource from " << node_id
<< " for bundle: " << bundle->DebugString();
<< " for bundles: " << GetDebugStringForBundles(bundles);
}
callback(result);
});
Expand Down
8 changes: 5 additions & 3 deletions src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -469,14 +469,16 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface {
&group_to_bundles) override;

protected:
/// Send a bundle PREPARE request to a node. The PREPARE request will lock resources
/// Send bundles PREPARE requests to a node. The PREPARE requests will lock resources
/// on a node until COMMIT or CANCEL requests are sent to a node.
/// NOTE: All of given bundles will be prepared on the same node. It is guaranteed that
/// all of bundles are atomically prepared on a given node.
///
/// \param bundle A bundle to schedule on a node.
/// \param bundles Bundles to be scheduled on a node.
/// \param node A node to prepare resources for a given bundle.
/// \param callback
void PrepareResources(
const std::shared_ptr<const BundleSpecification> &bundle,
const std::vector<std::shared_ptr<const BundleSpecification>> &bundles,
const absl::optional<std::shared_ptr<ray::rpc::GcsNodeInfo>> &node,
const StatusCallback &callback);

Expand Down
72 changes: 34 additions & 38 deletions src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,11 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test {
success_placement_groups_.emplace_back(std::move(placement_group));
});

ASSERT_EQ(2, raylet_clients_[0]->num_lease_requested);
ASSERT_EQ(2, raylet_clients_[0]->lease_callbacks.size());
ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources());
ASSERT_EQ(1, raylet_clients_[0]->num_lease_requested);
ASSERT_EQ(1, raylet_clients_[0]->lease_callbacks.size());
ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources());
// TODO(@clay4444): It should be updated to 1 after we make the commit request
// batched.
WaitPendingDone(raylet_clients_[0]->commit_callbacks, 2);
ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources());
ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources());
Expand Down Expand Up @@ -197,7 +198,6 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test {
scheduler_->ScheduleUnplacedBundles(placement_group, failure_handler,
success_handler);
ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources());
ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources());
WaitPendingDone(raylet_clients_[0]->commit_callbacks, 2);
ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources());
ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources());
Expand Down Expand Up @@ -276,12 +276,11 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestSchedulePlacementGroupReplyFailure) {
success_placement_groups_.emplace_back(std::move(placement_group));
});

ASSERT_EQ(2, raylet_clients_[0]->num_lease_requested);
ASSERT_EQ(2, raylet_clients_[0]->lease_callbacks.size());
ASSERT_EQ(1, raylet_clients_[0]->num_lease_requested);
ASSERT_EQ(1, raylet_clients_[0]->lease_callbacks.size());

// Reply failure, so the placement group scheduling failed.
ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources(false));
ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources(false));

WaitPlacementGroupPendingDone(1, GcsPlacementGroupStatus::FAILURE);
WaitPlacementGroupPendingDone(0, GcsPlacementGroupStatus::SUCCESS);
Expand Down Expand Up @@ -336,12 +335,11 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestSchedulePlacementGroupReturnResource)
success_placement_groups_.emplace_back(std::move(placement_group));
});

ASSERT_EQ(2, raylet_clients_[0]->num_lease_requested);
ASSERT_EQ(2, raylet_clients_[0]->lease_callbacks.size());
// One bundle success and the other failed.
ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources());
ASSERT_EQ(1, raylet_clients_[0]->num_lease_requested);
ASSERT_EQ(1, raylet_clients_[0]->lease_callbacks.size());
// Failed to create these two bundles.
ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources(false));
ASSERT_EQ(1, raylet_clients_[0]->num_return_requested);
ASSERT_EQ(0, raylet_clients_[0]->num_return_requested);
// Reply the placement_group creation request, then the placement_group should be
// scheduled successfully.
WaitPlacementGroupPendingDone(1, GcsPlacementGroupStatus::FAILURE);
Expand Down Expand Up @@ -377,7 +375,6 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestStrictPackStrategyBalancedScheduling)
++node_select_count[node_index];
node_commit_count[node_index] += 2;
ASSERT_TRUE(raylet_clients_[node_index]->GrantPrepareBundleResources());
ASSERT_TRUE(raylet_clients_[node_index]->GrantPrepareBundleResources());
WaitPendingDone(raylet_clients_[node_index]->commit_callbacks, 2);
ASSERT_TRUE(raylet_clients_[node_index]->GrantCommitBundleResources());
ASSERT_TRUE(raylet_clients_[node_index]->GrantCommitBundleResources());
Expand Down Expand Up @@ -414,7 +411,6 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestStrictPackStrategyResourceCheck) {
auto placement_group = std::make_shared<gcs::GcsPlacementGroup>(request, "");
scheduler_->ScheduleUnplacedBundles(placement_group, failure_handler, success_handler);
ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources());
ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources());
WaitPendingDone(raylet_clients_[0]->commit_callbacks, 2);
ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources());
ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources());
Expand All @@ -430,7 +426,6 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestStrictPackStrategyResourceCheck) {
std::make_shared<gcs::GcsPlacementGroup>(create_placement_group_request2, "");
scheduler_->ScheduleUnplacedBundles(placement_group2, failure_handler, success_handler);
ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources());
ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources());
WaitPendingDone(raylet_clients_[0]->commit_callbacks, 2);
ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources());
ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources());
Expand Down Expand Up @@ -460,7 +455,6 @@ TEST_F(GcsPlacementGroupSchedulerTest, DestroyPlacementGroup) {
success_placement_groups_.emplace_back(std::move(placement_group));
});
ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources());
ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources());
WaitPendingDone(raylet_clients_[0]->commit_callbacks, 2);
ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources());
ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources());
Expand All @@ -477,9 +471,11 @@ TEST_F(GcsPlacementGroupSchedulerTest, DestroyPlacementGroup) {
}

TEST_F(GcsPlacementGroupSchedulerTest, DestroyCancelledPlacementGroup) {
auto node = Mocker::GenNodeInfo();
AddNode(node);
ASSERT_EQ(1, gcs_node_manager_->GetAllAliveNodes().size());
auto node0 = Mocker::GenNodeInfo(0);
auto node1 = Mocker::GenNodeInfo(1);
AddNode(node0);
AddNode(node1);
ASSERT_EQ(2, gcs_node_manager_->GetAllAliveNodes().size());

auto create_placement_group_request = Mocker::GenCreatePlacementGroupRequest();
auto placement_group =
Expand All @@ -503,16 +499,18 @@ TEST_F(GcsPlacementGroupSchedulerTest, DestroyCancelledPlacementGroup) {
// Now, cancel the schedule request.
ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources());
scheduler_->MarkScheduleCancelled(placement_group_id);
ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources());
ASSERT_TRUE(raylet_clients_[0]->GrantCancelResourceReserve());
ASSERT_TRUE(raylet_clients_[1]->GrantPrepareBundleResources());
ASSERT_TRUE(raylet_clients_[0]->GrantCancelResourceReserve());
ASSERT_TRUE(raylet_clients_[1]->GrantCancelResourceReserve());
WaitPlacementGroupPendingDone(1, GcsPlacementGroupStatus::FAILURE);
}

TEST_F(GcsPlacementGroupSchedulerTest, PlacementGroupCancelledDuringCommit) {
auto node = Mocker::GenNodeInfo();
AddNode(node);
ASSERT_EQ(1, gcs_node_manager_->GetAllAliveNodes().size());
auto node0 = Mocker::GenNodeInfo(0);
auto node1 = Mocker::GenNodeInfo(1);
AddNode(node0);
AddNode(node1);
ASSERT_EQ(2, gcs_node_manager_->GetAllAliveNodes().size());

auto create_placement_group_request = Mocker::GenCreatePlacementGroupRequest();
auto placement_group =
Expand All @@ -535,13 +533,14 @@ TEST_F(GcsPlacementGroupSchedulerTest, PlacementGroupCancelledDuringCommit) {

// Now, cancel the schedule request.
ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources());
ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources());
ASSERT_TRUE(raylet_clients_[1]->GrantPrepareBundleResources());
scheduler_->MarkScheduleCancelled(placement_group_id);
WaitPendingDone(raylet_clients_[0]->commit_callbacks, 2);
ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources());
WaitPendingDone(raylet_clients_[0]->commit_callbacks, 1);
WaitPendingDone(raylet_clients_[1]->commit_callbacks, 1);
ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources());
ASSERT_TRUE(raylet_clients_[1]->GrantCommitBundleResources());
ASSERT_TRUE(raylet_clients_[0]->GrantCancelResourceReserve());
ASSERT_TRUE(raylet_clients_[0]->GrantCancelResourceReserve());
ASSERT_TRUE(raylet_clients_[1]->GrantCancelResourceReserve());
WaitPlacementGroupPendingDone(1, GcsPlacementGroupStatus::FAILURE);
}

Expand All @@ -568,19 +567,16 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestPackStrategyLargeBundlesScheduling) {
Mocker::GenCreatePlacementGroupRequest("", rpc::PlacementStrategy::PACK, 15);
auto placement_group = std::make_shared<gcs::GcsPlacementGroup>(request, "");
scheduler_->ScheduleUnplacedBundles(placement_group, failure_handler, success_handler);
RAY_CHECK(raylet_clients_[0]->num_lease_requested > 0);
RAY_CHECK(raylet_clients_[1]->num_lease_requested > 0);
for (int index = 0; index < raylet_clients_[0]->num_lease_requested; ++index) {
ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources());
}
for (int index = 0; index < raylet_clients_[1]->num_lease_requested; ++index) {
ASSERT_TRUE(raylet_clients_[1]->GrantPrepareBundleResources());
}
// Prepared resource is batched!
ASSERT_TRUE(raylet_clients_[0]->num_lease_requested == 1);
ASSERT_TRUE(raylet_clients_[1]->num_lease_requested == 1);
ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources());
ASSERT_TRUE(raylet_clients_[1]->GrantPrepareBundleResources());
// Wait until all resources are prepared.
WaitPendingDone(raylet_clients_[0]->commit_callbacks,
raylet_clients_[0]->num_lease_requested);
raylet_clients_[0]->num_prepared_bundle);
WaitPendingDone(raylet_clients_[1]->commit_callbacks,
raylet_clients_[1]->num_lease_requested);
raylet_clients_[1]->num_prepared_bundle);
for (int index = 0; index < raylet_clients_[0]->num_commit_requested; ++index) {
ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources());
}
Expand Down
5 changes: 4 additions & 1 deletion src/ray/gcs/gcs_server/test/gcs_server_test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,11 @@ struct GcsServerMocker {

/// ResourceReserveInterface
void PrepareBundleResources(
const BundleSpecification &bundle_spec,
const std::vector<std::shared_ptr<const BundleSpecification>> &bundle_specs,
const ray::rpc::ClientCallback<ray::rpc::PrepareBundleResourcesReply> &callback)
override {
num_lease_requested += 1;
num_prepared_bundle = bundle_specs.size();
lease_callbacks.push_back(callback);
}

Expand Down Expand Up @@ -299,6 +300,8 @@ struct GcsServerMocker {
int num_lease_requested = 0;
int num_return_requested = 0;
int num_commit_requested = 0;
// TODO(@clay4444): Remove this once we make the commit rpc request batched!
int num_prepared_bundle = 0;

int num_release_unused_bundles_requested = 0;
std::list<rpc::ClientCallback<rpc::PrepareBundleResourcesReply>> lease_callbacks = {};
Expand Down
20 changes: 20 additions & 0 deletions src/ray/gcs/test/gcs_test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,26 @@ struct Mocker {
return request;
}

static std::vector<std::shared_ptr<const BundleSpecification>> GenBundleSpecifications(
const PlacementGroupID &placement_group_id,
absl::flat_hash_map<std::string, double> &unit_resource, int bundles_size = 1) {
std::vector<std::shared_ptr<const BundleSpecification>> bundle_specs;
for (int i = 0; i < bundles_size; i++) {
rpc::Bundle bundle;
auto mutable_bundle_id = bundle.mutable_bundle_id();
// The bundle index is start from 1.
mutable_bundle_id->set_bundle_index(i + 1);
mutable_bundle_id->set_placement_group_id(placement_group_id.Binary());
auto mutable_unit_resources = bundle.mutable_unit_resources();
for (auto &resource : unit_resource) {
mutable_unit_resources->insert({resource.first, resource.second});
}
bundle_specs.emplace_back(std::make_shared<BundleSpecification>(bundle));
}
return bundle_specs;
}

// TODO(@clay4444): Remove this once we did the batch rpc request refactor.
static BundleSpecification GenBundleCreation(
const PlacementGroupID &placement_group_id, const int bundle_index,
absl::flat_hash_map<std::string, double> &unit_resource) {
Expand Down
4 changes: 2 additions & 2 deletions src/ray/protobuf/node_manager.proto
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ message RequestWorkerLeaseReply {
}

message PrepareBundleResourcesRequest {
// Bundle containing the requested resources.
Bundle bundle_spec = 1;
// Bundles that containing the requested resources.
repeated Bundle bundle_specs = 1;
}

message PrepareBundleResourcesReply {
Expand Down
Loading

0 comments on commit 1edf4ab

Please sign in to comment.