[Enable gcs actor scheduler 2/n] Report pending actor info in gcs (ray-project#24595)

This PR fixes several parts of the gcs actor scheduler:

1. Report pending actor info in gcs so that `HandleGetAllResourceUsage` can export resource usage for the whole cluster (worker nodes as well as gcs itself). Otherwise, external components such as the autoscaler cannot work properly.
2. In `ClusterResourceScheduler`, actors for which the gcs scheduler cannot find an available node should stay in the pending queue of `ClusterTaskManager`.
3. When the gcs scheduler is used, the PG's wildcard resources have to be updated **incrementally** when committing bundles (see the naming sketch below).

In the next PR, we will fix the remaining trivial issues and enable the gcs scheduler to pass the entire testing pipeline.
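For context, here is a minimal sketch of the resource labels a committed placement group creates on a node; the group id `4f2a` and the capacities are made up for illustration. Item 3 is about keeping the shared wildcard label consistent while bundles are committed one by one.

```cpp
#include <iostream>
#include <map>
#include <string>

int main() {
  // Hypothetical labels after two bundles of placement group "4f2a"
  // (2 CPUs and 3 CPUs) have been committed on the same node.
  const std::map<std::string, double> pg_labels = {
      {"CPU_group_0_4f2a", 2},  // indexed label: usable by bundle 0 only
      {"CPU_group_1_4f2a", 3},  // indexed label: usable by bundle 1 only
      {"CPU_group_4f2a", 5},    // wildcard label: shared by the whole group
  };
  for (const auto &[name, capacity] : pg_labels) {
    std::cout << name << " = " << capacity << "\n";
  }
  return 0;
}
```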

Co-authored-by: Chong-Li <lc300133@antgroup.com>
Chong-Li and Chong-Li authored May 20, 2022
1 parent a357b7c commit 5bb46ca
Showing 19 changed files with 216 additions and 59 deletions.
14 changes: 14 additions & 0 deletions src/ray/common/bundle_spec.cc
@@ -62,6 +62,20 @@ const ResourceRequest &BundleSpecification::GetRequiredResources() const {
return *unit_resource_;
}

absl::flat_hash_map<std::string, double> BundleSpecification::GetWildcardResources()
const {
absl::flat_hash_map<std::string, double> wildcard_resources;
std::string pattern("_group_");
for (const auto &[name, capacity] : bundle_resource_labels_) {
auto idx = name.find(pattern);
if (idx != std::string::npos &&
name.find("_", idx + pattern.size()) == std::string::npos) {
wildcard_resources[name] = capacity;
}
}
return wildcard_resources;
}
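A standalone sketch of the filter above (same string logic, not Ray's API): a label counts as a wildcard placement-group resource when it contains `_group_` and no further `_` follows that marker, which is what separates `CPU_group_<pg_id>` from the per-bundle `CPU_group_<index>_<pg_id>`. The labels in `main` are hypothetical.

```cpp
#include <cassert>
#include <string>

// Simplified stand-in for the wildcard test in GetWildcardResources().
bool IsWildcardPlacementGroupResource(const std::string &name) {
  static const std::string pattern = "_group_";
  const auto idx = name.find(pattern);
  return idx != std::string::npos &&
         name.find('_', idx + pattern.size()) == std::string::npos;
}

int main() {
  assert(IsWildcardPlacementGroupResource("CPU_group_abc123"));     // wildcard label
  assert(!IsWildcardPlacementGroupResource("CPU_group_0_abc123"));  // bundle 0 label
  assert(!IsWildcardPlacementGroupResource("CPU"));                 // ordinary resource
  return 0;
}
```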

BundleID BundleSpecification::BundleId() const {
if (message_->bundle_id()
.placement_group_id()
4 changes: 4 additions & 0 deletions src/ray/common/bundle_spec.h
@@ -72,6 +72,10 @@ class BundleSpecification : public MessageWrapper<rpc::Bundle> {
return bundle_resource_labels_;
}

/// TODO(Chong-Li): This function is used for updating PG's wildcard resources
/// incrementally in gcs. It should be removed when PG scheduling is refactored.
absl::flat_hash_map<std::string, double> GetWildcardResources() const;

std::string DebugString() const;

private:
16 changes: 15 additions & 1 deletion src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc
@@ -298,8 +298,22 @@ void GcsPlacementGroupScheduler::CommitAllBundles(
schedule_success_handler](const Status &status) {
for (const auto &bundle : bundles_per_node) {
lease_status_tracker->MarkCommitRequestReturned(node_id, bundle, status);

auto resources = bundle->GetFormattedResources();
// For gcs actor scheduler, placement group's wildcard resources have to be
// updated incrementally.
// TODO(Chong-Li): This part should be removed when PG scheduling is refactored.
if (RayConfig::instance().gcs_actor_scheduling_enabled()) {
auto wildcard_resources = bundle->GetWildcardResources();
for (const auto &resource_entry : wildcard_resources) {
auto capacity = gcs_resource_manager_
.GetNodeResources(scheduling::NodeID(node_id.Binary()))
.total.Get(scheduling::ResourceID(resource_entry.first))
.Double();
resources[resource_entry.first] += capacity;
}
}
// Update the resource in gcs resource manager
gcs_resource_manager_.UpdateResources(node_id, resources);

if (ray_syncer_ != nullptr) {
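To make the incremental update above concrete, here is a simplified standalone model (plain maps instead of Ray's resource classes; names and numbers are illustrative): each commit must add the bundle's wildcard share on top of the node's current total for that label, otherwise the last committed bundle would overwrite the shares contributed by earlier ones.

```cpp
#include <cassert>
#include <map>
#include <string>

using ResourceMap = std::map<std::string, double>;

// Simplified model of the commit path: fold the bundle's wildcard share into
// the node's current total instead of overwriting it.
void CommitBundle(ResourceMap &node_total, ResourceMap update,
                  const ResourceMap &bundle_wildcard) {
  for (const auto &[name, capacity] : bundle_wildcard) {
    update[name] = node_total[name] + capacity;  // incremental update
  }
  for (const auto &[name, capacity] : update) {
    node_total[name] = capacity;  // analogous to UpdateResources()
  }
}

int main() {
  ResourceMap node_total;
  // Bundle 0 brings 2 CPUs; its formatted resources carry both label kinds.
  CommitBundle(node_total, {{"CPU_group_0_4f2a", 2}, {"CPU_group_4f2a", 2}},
               {{"CPU_group_4f2a", 2}});
  // Bundle 1 brings 3 CPUs.
  CommitBundle(node_total, {{"CPU_group_1_4f2a", 3}, {"CPU_group_4f2a", 3}},
               {{"CPU_group_4f2a", 3}});
  assert(node_total["CPU_group_4f2a"] == 5);  // 2 + 3, not just the last 3
  return 0;
}
```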
19 changes: 16 additions & 3 deletions src/ray/gcs/gcs_server/gcs_resource_manager.cc
@@ -24,11 +24,13 @@ GcsResourceManager::GcsResourceManager(
instrumented_io_context &io_context,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
ClusterResourceManager &cluster_resource_manager,
NodeID local_node_id,
std::shared_ptr<ClusterTaskManager> cluster_task_manager)
: io_context_(io_context),
gcs_table_storage_(gcs_table_storage),
cluster_resource_manager_(cluster_resource_manager),
local_node_id_(std::move(local_node_id)),
cluster_task_manager_(std::move(cluster_task_manager)) {}

void GcsResourceManager::ConsumeSyncMessage(
std::shared_ptr<const syncer::RaySyncMessage> message) {
@@ -152,8 +154,9 @@ void GcsResourceManager::HandleGetAllAvailableResources(
const rpc::GetAllAvailableResourcesRequest &request,
rpc::GetAllAvailableResourcesReply *reply,
rpc::SendReplyCallback send_reply_callback) {
auto local_scheduling_node_id = scheduling::NodeID(local_node_id_.Binary());
for (const auto &node_resources_entry : cluster_resource_manager_.GetResourceView()) {
if (node_resources_entry.first == local_scheduling_node_id) {
continue;
}
rpc::AvailableResources resource;
@@ -229,6 +232,11 @@ void GcsResourceManager::HandleGetAllResourceUsage(
const rpc::GetAllResourceUsageRequest &request,
rpc::GetAllResourceUsageReply *reply,
rpc::SendReplyCallback send_reply_callback) {
if (cluster_task_manager_ && RayConfig::instance().gcs_actor_scheduling_enabled()) {
rpc::ResourcesData resources_data;
cluster_task_manager_->FillPendingActorInfo(resources_data);
node_resource_usages_[local_node_id_].CopyFrom(resources_data);
}
if (!node_resource_usages_.empty()) {
auto batch = std::make_shared<rpc::ResourceUsageBatchData>();
std::unordered_map<google::protobuf::Map<std::string, double>, rpc::ResourceDemand>
@@ -385,5 +393,10 @@ std::string GcsResourceManager::ToString() const {
return ostr.str();
}

const NodeResources &GcsResourceManager::GetNodeResources(
scheduling::NodeID node_id) const {
return cluster_resource_manager_.GetNodeResources(node_id);
}

} // namespace gcs
} // namespace ray
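What the `FillPendingActorInfo` call in `HandleGetAllResourceUsage` achieves, sketched with plain containers (illustrative stand-ins, not Ray's protobuf types): demand from actors still queued in the gcs-side `ClusterTaskManager` is merged into the gcs's own entry of the usage map, so the batch handed to the autoscaler covers worker nodes and gcs alike.

```cpp
#include <cstdint>
#include <iostream>
#include <map>
#include <string>

// Simplified stand-ins for the per-node usage report and its demand-by-shape map.
using ResourceShape = std::map<std::string, double>;
struct NodeUsage {
  std::map<ResourceShape, int64_t> demand_by_shape;  // pending requests per shape
};

int main() {
  std::map<std::string, NodeUsage> usage_by_node;
  // Demand reported by a worker raylet: four pending requests of shape {CPU: 1}.
  usage_by_node["raylet-1"].demand_by_shape[{{"CPU", 1}}] = 4;
  // Actors still queued in the gcs scheduler: without this step, their demand
  // would never reach the autoscaler.
  usage_by_node["gcs"].demand_by_shape[{{"CPU", 2}, {"GPU", 1}}] = 3;

  for (const auto &[node, usage] : usage_by_node) {
    for (const auto &[shape, count] : usage.demand_by_shape) {
      std::cout << node << ": " << count << " pending request(s) of a "
                << shape.size() << "-resource shape\n";
    }
  }
  return 0;
}
```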
15 changes: 13 additions & 2 deletions src/ray/gcs/gcs_server/gcs_resource_manager.h
@@ -21,11 +21,13 @@
#include "ray/gcs/gcs_server/gcs_table_storage.h"
#include "ray/raylet/scheduling/cluster_resource_data.h"
#include "ray/raylet/scheduling/cluster_resource_manager.h"
#include "ray/raylet/scheduling/cluster_task_manager.h"
#include "ray/rpc/client_call.h"
#include "ray/rpc/gcs_server/gcs_rpc_server.h"
#include "src/ray/protobuf/gcs.pb.h"

namespace ray {
using raylet::ClusterTaskManager;
namespace gcs {
/// Ideally, the logic related to resource calculation should be moved from
/// `gcs_resource_manager` to `cluster_resource_manager`, and all logic related to
@@ -55,7 +57,8 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler,
instrumented_io_context &io_context,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
ClusterResourceManager &cluster_resource_manager,
NodeID local_node_id,
std::shared_ptr<ClusterTaskManager> cluster_task_manager = nullptr);

virtual ~GcsResourceManager() {}

@@ -145,6 +148,13 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler,
/// \param data The resource loads reported by raylet.
void UpdateResourceLoads(const rpc::ResourcesData &data);

/// Get the resources of a specified node.
/// TODO(Chong-Li): This function is only used for updating PG's wildcard resources
/// incrementally in gcs. It should be removed when PG scheduling is refactored.
///
/// \param node_id ID of the specified node.
const NodeResources &GetNodeResources(scheduling::NodeID node_id) const;

private:
/// io context. This is to ensure thread safety. Ideally, all public
/// function needs to post jobs to this io_context.
@@ -173,7 +183,8 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler,
uint64_t counts_[CountType::CountType_MAX] = {0};

ClusterResourceManager &cluster_resource_manager_;
NodeID local_node_id_;
std::shared_ptr<ClusterTaskManager> cluster_task_manager_;
};

} // namespace gcs
7 changes: 4 additions & 3 deletions src/ray/gcs/gcs_server/gcs_server.cc
@@ -253,12 +253,13 @@ void GcsServer::InitGcsHeartbeatManager(const GcsInitData &gcs_init_data) {
}

void GcsServer::InitGcsResourceManager(const GcsInitData &gcs_init_data) {
RAY_CHECK(gcs_table_storage_ && cluster_resource_scheduler_ && cluster_task_manager_);
gcs_resource_manager_ = std::make_shared<GcsResourceManager>(
main_service_,
gcs_table_storage_,
cluster_resource_scheduler_->GetClusterResourceManager(),
local_node_id_,
cluster_task_manager_);

// Initialize by gcs tables data.
gcs_resource_manager_->Initialize(gcs_init_data);
@@ -324,7 +325,7 @@ void GcsServer::InitClusterTaskManager() {
/*announce_infeasible_task=*/
nullptr,
/*local_task_manager=*/
std::make_shared<NoopLocalTaskManager>());
}

void GcsServer::InitGcsJobManager(const GcsInitData &gcs_init_data) {
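On the `NoopLocalTaskManager` change above and in `gcs_server.h`: the gcs reuses `ClusterTaskManager`, but it has no raylet of its own to dispatch to, so the local-dispatch hook becomes a do-nothing object rather than a null pointer that would have to be checked at every call site. A rough sketch of that null-object idea with hypothetical names (not Ray's actual interfaces):

```cpp
#include <memory>

// Hypothetical, simplified interfaces for illustration only.
struct Task {};

class LocalTaskManagerLike {
 public:
  virtual ~LocalTaskManagerLike() = default;
  virtual void QueueAndScheduleTask(const Task &task) = 0;
};

// Accepts work but never dispatches it locally: the gcs has no local raylet,
// so "local" scheduling is intentionally a no-op instead of a nullptr deref.
class NoopLocalTaskManagerLike : public LocalTaskManagerLike {
 public:
  void QueueAndScheduleTask(const Task & /*task*/) override {}
};

int main() {
  std::shared_ptr<LocalTaskManagerLike> local =
      std::make_shared<NoopLocalTaskManagerLike>();
  local->QueueAndScheduleTask(Task{});  // safe no-op, no null checks required
  return 0;
}
```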
1 change: 1 addition & 0 deletions src/ray/gcs/gcs_server/gcs_server.h
@@ -36,6 +36,7 @@

namespace ray {
using raylet::ClusterTaskManager;
using raylet::NoopLocalTaskManager;
namespace gcs {

struct GcsServerConfig {
3 changes: 2 additions & 1 deletion src/ray/gcs/gcs_server/test/gcs_actor_scheduler_mock_test.cc
@@ -26,6 +26,7 @@
using namespace ::testing;

namespace ray {
using raylet::NoopLocalTaskManager;
namespace gcs {
struct MockCallback {
MOCK_METHOD(void, Call, ((std::shared_ptr<GcsActor>)));
@@ -60,7 +61,7 @@ class GcsActorSchedulerMockTest : public Test {
/*announce_infeasible_task=*/
nullptr,
/*local_task_manager=*/
std::make_shared<NoopLocalTaskManager>());
actor_scheduler = std::make_unique<GcsActorScheduler>(
io_context,
*actor_table,
6 changes: 4 additions & 2 deletions src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc
@@ -24,6 +24,7 @@
// clang-format on

namespace ray {
using raylet::NoopLocalTaskManager;
namespace gcs {

class GcsActorSchedulerTest : public ::testing::Test {
@@ -59,11 +60,12 @@ class GcsActorSchedulerTest : public ::testing::Test {
/*announce_infeasible_task=*/
nullptr,
/*local_task_manager=*/
std::make_shared<NoopLocalTaskManager>());
auto gcs_resource_manager = std::make_shared<gcs::GcsResourceManager>(
io_service_,
gcs_table_storage_,
cluster_resource_scheduler->GetClusterResourceManager(),
local_node_id_);
gcs_actor_scheduler_ = std::make_shared<GcsServerMocker::MockedGcsActorScheduler>(
io_service_,
*gcs_actor_table_,
@@ -38,7 +38,7 @@ class GcsPlacementGroupManagerMockTest : public Test {
gcs_placement_group_scheduler_ =
std::make_shared<MockGcsPlacementGroupSchedulerInterface>();
resource_manager_ = std::make_shared<MockGcsResourceManager>(
io_context_, nullptr, cluster_resource_manager_, NodeID::FromRandom());

gcs_placement_group_manager_ =
std::make_unique<GcsPlacementGroupManager>(io_context_,
@@ -85,7 +85,7 @@ class GcsPlacementGroupManagerTest : public ::testing::Test {
std::make_shared<GcsPublisher>(std::make_unique<ray::pubsub::MockPublisher>());
gcs_table_storage_ = std::make_shared<gcs::InMemoryGcsTableStorage>(io_service_);
gcs_resource_manager_ = std::make_shared<gcs::GcsResourceManager>(
io_service_, nullptr, cluster_resource_manager_, NodeID::FromRandom());
gcs_placement_group_manager_.reset(new gcs::GcsPlacementGroupManager(
io_service_,
mock_placement_group_scheduler_,
@@ -46,16 +46,18 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test {
gcs_table_storage_ = std::make_shared<gcs::InMemoryGcsTableStorage>(io_service_);
gcs_publisher_ = std::make_shared<gcs::GcsPublisher>(
std::make_unique<ray::pubsub::MockPublisher>());
auto local_node_id = NodeID::FromRandom();
cluster_resource_scheduler_ = std::make_shared<ClusterResourceScheduler>(
scheduling::NodeID(local_node_id.Binary()),
NodeResources(),
/*is_node_available_fn=*/
[](auto) { return true; },
/*is_local_node_with_raylet=*/false);
gcs_resource_manager_ = std::make_shared<gcs::GcsResourceManager>(
io_service_,
gcs_table_storage_,
cluster_resource_scheduler_->GetClusterResourceManager(),
local_node_id);
ray_syncer_ = std::make_shared<ray::gcs_syncer::RaySyncer>(
io_service_, nullptr, *gcs_resource_manager_);
store_client_ = std::make_shared<gcs::InMemoryStoreClient>(io_service_);
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_server/test/gcs_resource_manager_test.cc
@@ -29,7 +29,7 @@ class GcsResourceManagerTest : public ::testing::Test {
public:
GcsResourceManagerTest() {
gcs_resource_manager_ = std::make_shared<gcs::GcsResourceManager>(
io_service_, nullptr, cluster_resource_manager_, NodeID::FromRandom());
}

instrumented_io_context io_service_;
18 changes: 13 additions & 5 deletions src/ray/raylet/scheduling/cluster_resource_scheduler.cc
@@ -237,14 +237,22 @@ scheduling::NodeID ClusterResourceScheduler::GetBestSchedulableNode(
&_unused,
is_infeasible);

// There are no other available nodes.
if (!best_node.IsNil() &&
!IsSchedulableOnNode(best_node,
task_spec.GetRequiredResources().GetResourceMap(),
requires_object_store_memory)) {
// Prefer waiting on the local node since the local node is chosen for a reason (e.g.
// spread).
if (prioritize_local_node) {
*is_infeasible = false;
return local_node_id_;
}
// If the task is being scheduled by gcs, return nil to make it stay in the
// `cluster_task_manager`'s queue.
if (!is_local_node_with_raylet_) {
return scheduling::NodeID::Nil();
}
}

return best_node;
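A standalone sketch of the fallback above (plain types, not the Ray scheduler API; node names are made up): when the picked node is not currently schedulable, a raylet-side scheduler prefers to wait on its local node, while the gcs-side scheduler (which has no raylet) returns Nil so the actor stays queued in `ClusterTaskManager`.

```cpp
#include <cassert>
#include <string>

// Simplified model of the fallback at the end of GetBestSchedulableNode.
// An empty string plays the role of scheduling::NodeID::Nil().
std::string PickFallback(bool best_node_schedulable, bool prioritize_local_node,
                         bool is_local_node_with_raylet,
                         const std::string &best_node, const std::string &local_node) {
  if (!best_node_schedulable) {
    if (prioritize_local_node) {
      return local_node;  // wait locally (e.g. the node was chosen for spread)
    }
    if (!is_local_node_with_raylet) {
      return "";  // gcs scheduler: Nil -> actor stays pending in ClusterTaskManager
    }
  }
  return best_node;
}

int main() {
  // Raylet-side scheduling: prefer waiting on the local node.
  assert(PickFallback(false, true, true, "node-A", "local") == "local");
  // Gcs-side scheduling: no raylet, report "no node" and keep the actor queued.
  assert(PickFallback(false, false, false, "node-A", "gcs") == "");
  // The chosen node is schedulable: just return it.
  assert(PickFallback(true, false, true, "node-A", "local") == "node-A");
  return 0;
}
```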