Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GCS]Add gcs actor schedule strategy #13156

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions src/ray/gcs/gcs_server/gcs_actor_schedule_strategy.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "ray/gcs/gcs_server/gcs_actor_schedule_strategy.h"

#include <chrono>
#include <cstddef>
#include <iterator>
#include <random>

#include "ray/gcs/gcs_server/gcs_actor_manager.h"

namespace ray {
namespace gcs {

/// Pick the node on which the given actor should be scheduled.
///
/// \param actor The actor to be scheduled.
/// \return The chosen node, or nullptr when no node could be selected.
std::shared_ptr<rpc::GcsNodeInfo> GcsRandomActorScheduleStrategy::Schedule(
    std::shared_ptr<GcsActor> actor) {
  const auto &creation_spec = actor->GetCreationTaskSpecification();

  // An actor without resource requirements is simply placed on a random node.
  if (creation_spec.GetRequiredResources().IsEmpty()) {
    return SelectNodeRandomly();
  }

  // An actor with resource requirements is preferably placed on its owner's
  // node; if that lookup yields no node, fall back to a random pick.
  auto owner_node = gcs_node_manager_->GetAliveNode(actor->GetOwnerNodeID());
  if (owner_node.has_value()) {
    return owner_node.value();
  }
  return SelectNodeRandomly();
}

/// Pick one node at random from the currently alive nodes.
///
/// \return The selected node, or nullptr if there are no alive nodes.
std::shared_ptr<rpc::GcsNodeInfo> GcsRandomActorScheduleStrategy::SelectNodeRandomly()
    const {
  const auto &alive_nodes = gcs_node_manager_->GetAllAliveNodes();
  if (alive_nodes.empty()) {
    return nullptr;
  }

  // The generator is a function-local static: it is seeded from the clock the
  // first time this function runs and reused by every later call.
  static std::mt19937_64 gen_(
      std::chrono::high_resolution_clock::now().time_since_epoch().count());
  // Draw the index as size_t so the container size is never narrowed to int.
  std::uniform_int_distribution<std::size_t> distribution(0, alive_nodes.size() - 1);
  const auto offset = static_cast<std::ptrdiff_t>(distribution(gen_));
  // The drawn index is always in [0, size - 1], so the advanced iterator is
  // guaranteed to be dereferenceable.
  return std::next(alive_nodes.begin(), offset)->second;
}

} // namespace gcs
} // namespace ray
71 changes: 71 additions & 0 deletions src/ray/gcs/gcs_server/gcs_actor_schedule_strategy.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include "ray/common/id.h"
#include "ray/gcs/gcs_server/gcs_node_manager.h"

namespace ray {
namespace gcs {

class GcsActor;

/// \class GcsActorScheduleStrategyInterface
///
/// Abstract interface implemented by the different actor scheduling strategies.
class GcsActorScheduleStrategyInterface {
 public:
  /// Pick the node on which the given actor should be scheduled.
  ///
  /// \param actor The actor to be scheduled.
  /// \return The selected node, or nullptr if the scheduling fails.
  virtual std::shared_ptr<rpc::GcsNodeInfo> Schedule(std::shared_ptr<GcsActor> actor) = 0;

  /// Virtual destructor so implementations can be destroyed through this interface.
  virtual ~GcsActorScheduleStrategyInterface() = default;
};

/// \class GcsRandomActorScheduleStrategy
///
/// This strategy will select node randomly from the node pool to schedule the actor.
class GcsRandomActorScheduleStrategy : public GcsActorScheduleStrategyInterface {
public:
/// Create a GcsRandomActorScheduleStrategy
///
/// \param gcs_node_manager Node management of the cluster, which provides interfaces
/// to access the node information.
explicit GcsRandomActorScheduleStrategy(
std::shared_ptr<GcsNodeManager> gcs_node_manager)
: gcs_node_manager_(std::move(gcs_node_manager)) {}

virtual ~GcsRandomActorScheduleStrategy() = default;

/// Select a node to schedule the actor.
///
/// \param actor The actor to be scheduled.
/// \return The selected node. If the scheduling fails, nullptr is returned.
std::shared_ptr<rpc::GcsNodeInfo> Schedule(std::shared_ptr<GcsActor> actor) override;

private:
/// Select a node from alive nodes randomly.
///
/// \return The selected node. If the scheduling fails, `nullptr` is returned.
std::shared_ptr<rpc::GcsNodeInfo> SelectNodeRandomly() const;

/// The node manager.
std::shared_ptr<GcsNodeManager> gcs_node_manager_;
};

} // namespace gcs
} // namespace ray
14 changes: 3 additions & 11 deletions src/ray/gcs/gcs_server/gcs_actor_scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ GcsActorScheduler::GcsActorScheduler(
std::function<void(std::shared_ptr<GcsActor>)> schedule_failure_handler,
std::function<void(std::shared_ptr<GcsActor>)> schedule_success_handler,
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool,
std::shared_ptr<GcsActorScheduleStrategyInterface> actor_schedule_strategy,
rpc::ClientFactoryFn client_factory)
: io_context_(io_context),
gcs_actor_table_(gcs_actor_table),
Expand All @@ -38,6 +39,7 @@ GcsActorScheduler::GcsActorScheduler(
schedule_success_handler_(std::move(schedule_success_handler)),
report_worker_backlog_(RayConfig::instance().report_worker_backlog()),
raylet_client_pool_(raylet_client_pool),
actor_schedule_strategy_(actor_schedule_strategy),
core_worker_clients_(client_factory) {
RAY_CHECK(schedule_failure_handler_ != nullptr && schedule_success_handler_ != nullptr);
}
Expand All @@ -46,17 +48,7 @@ void GcsActorScheduler::Schedule(std::shared_ptr<GcsActor> actor) {
RAY_CHECK(actor->GetNodeID().IsNil() && actor->GetWorkerID().IsNil());

// Select a node to lease worker for the actor.
std::shared_ptr<rpc::GcsNodeInfo> node;

// If an actor has resource requirements, we will try to schedule it on the same node as
// the owner if possible.
const auto &task_spec = actor->GetCreationTaskSpecification();
if (!task_spec.GetRequiredResources().IsEmpty()) {
auto maybe_node = gcs_node_manager_.GetAliveNode(actor->GetOwnerNodeID());
node = maybe_node.has_value() ? maybe_node.value() : SelectNodeRandomly();
} else {
node = SelectNodeRandomly();
}
const auto &node = actor_schedule_strategy_->Schedule(actor);

if (node == nullptr) {
// There are no available nodes to schedule the actor, so just trigger the failed
Expand Down
5 changes: 5 additions & 0 deletions src/ray/gcs/gcs_server/gcs_actor_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "ray/common/task/task_execution_spec.h"
#include "ray/common/task/task_spec.h"
#include "ray/gcs/accessor.h"
#include "ray/gcs/gcs_server/gcs_actor_schedule_strategy.h"
#include "ray/gcs/gcs_server/gcs_node_manager.h"
#include "ray/gcs/gcs_server/gcs_table_storage.h"
#include "ray/raylet_client/raylet_client.h"
Expand Down Expand Up @@ -90,6 +91,7 @@ class GcsActorScheduler : public GcsActorSchedulerInterface {
/// \param schedule_success_handler Invoked when actors are created on the worker
/// successfully.
/// \param raylet_client_pool Raylet client pool to construct connections to raylets.
/// \param actor_schedule_strategy Actor schedule strategy.
/// \param client_factory Factory to create remote core worker client, default factory
/// will be used if not set.
explicit GcsActorScheduler(
Expand All @@ -98,6 +100,7 @@ class GcsActorScheduler : public GcsActorSchedulerInterface {
std::function<void(std::shared_ptr<GcsActor>)> schedule_failure_handler,
std::function<void(std::shared_ptr<GcsActor>)> schedule_success_handler,
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool,
std::shared_ptr<GcsActorScheduleStrategyInterface> actor_schedule_strategy,
rpc::ClientFactoryFn client_factory = nullptr);
virtual ~GcsActorScheduler() = default;

Expand Down Expand Up @@ -286,6 +289,8 @@ class GcsActorScheduler : public GcsActorSchedulerInterface {
absl::flat_hash_set<NodeID> nodes_of_releasing_unused_workers_;
/// The cached raylet clients used to communicate with raylet.
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool_;
/// The actor schedule strategy.
std::shared_ptr<GcsActorScheduleStrategyInterface> actor_schedule_strategy_;
/// The cached core worker clients which are used to communicate with leased worker.
rpc::CoreWorkerClientPool core_worker_clients_;
};
Expand Down
4 changes: 3 additions & 1 deletion src/ray/gcs/gcs_server/gcs_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,8 @@ void GcsServer::InitGcsJobManager() {

void GcsServer::InitGcsActorManager(const GcsInitData &gcs_init_data) {
RAY_CHECK(gcs_table_storage_ && gcs_pub_sub_ && gcs_node_manager_);
auto actor_schedule_strategy =
std::make_shared<GcsRandomActorScheduleStrategy>(gcs_node_manager_);
auto scheduler = std::make_shared<GcsActorScheduler>(
main_service_, gcs_table_storage_->ActorTable(), *gcs_node_manager_, gcs_pub_sub_,
/*schedule_failure_handler=*/
Expand All @@ -193,7 +195,7 @@ void GcsServer::InitGcsActorManager(const GcsInitData &gcs_init_data) {
[this](std::shared_ptr<GcsActor> actor) {
gcs_actor_manager_->OnActorCreationSuccess(std::move(actor));
},
raylet_client_pool_,
raylet_client_pool_, actor_schedule_strategy,
/*client_factory=*/
[this](const rpc::Address &address) {
return std::make_shared<rpc::CoreWorkerClient>(address, client_call_manager_);
Expand Down
32 changes: 16 additions & 16 deletions src/ray/gcs/gcs_server/gcs_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ class GcsServer {
/// Print debug info periodically.
void PrintDebugInfo();

/// Gcs server configuration
/// Gcs server configuration.
GcsServerConfig config_;
/// The main io service to drive event posted from grpc threads.
boost::asio::io_context &main_service_;
Expand All @@ -135,7 +135,7 @@ class GcsServer {
rpc::GrpcServer rpc_server_;
/// The `ClientCallManager` object that is shared by all `NodeManagerWorkerClient`s.
rpc::ClientCallManager client_call_manager_;
/// Node manager client pool
/// Node manager client pool.
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool_;
/// The gcs resource manager.
std::shared_ptr<GcsResourceManager> gcs_resource_manager_;
Expand All @@ -145,37 +145,37 @@ class GcsServer {
std::shared_ptr<GcsHeartbeatManager> gcs_heartbeat_manager_;
/// The gcs redis failure detector.
std::shared_ptr<GcsRedisFailureDetector> gcs_redis_failure_detector_;
/// The gcs actor manager
/// The gcs actor manager.
std::shared_ptr<GcsActorManager> gcs_actor_manager_;
/// The gcs placement group manager
/// The gcs placement group manager.
std::shared_ptr<GcsPlacementGroupManager> gcs_placement_group_manager_;
/// Job info handler and service
/// Job info handler and service.
std::unique_ptr<GcsJobManager> gcs_job_manager_;
std::unique_ptr<rpc::JobInfoGrpcService> job_info_service_;
/// Actor info service
/// Actor info service.
std::unique_ptr<rpc::ActorInfoGrpcService> actor_info_service_;
/// Node info handler and service
/// Node info handler and service.
std::unique_ptr<rpc::NodeInfoGrpcService> node_info_service_;
/// Node resource info handler and service
/// Node resource info handler and service.
std::unique_ptr<rpc::NodeResourceInfoGrpcService> node_resource_info_service_;
/// Heartbeat info handler and service
/// Heartbeat info handler and service.
std::unique_ptr<rpc::HeartbeatInfoGrpcService> heartbeat_info_service_;
/// Object info handler and service
/// Object info handler and service.
std::unique_ptr<gcs::GcsObjectManager> gcs_object_manager_;
std::unique_ptr<rpc::ObjectInfoGrpcService> object_info_service_;
/// Task info handler and service
/// Task info handler and service.
std::unique_ptr<rpc::TaskInfoHandler> task_info_handler_;
std::unique_ptr<rpc::TaskInfoGrpcService> task_info_service_;
/// Stats handler and service
/// Stats handler and service.
std::unique_ptr<rpc::StatsHandler> stats_handler_;
std::unique_ptr<rpc::StatsGrpcService> stats_service_;
/// The gcs worker manager
/// The gcs worker manager.
std::unique_ptr<GcsWorkerManager> gcs_worker_manager_;
/// Worker info service
/// Worker info service.
std::unique_ptr<rpc::WorkerInfoGrpcService> worker_info_service_;
/// Placement Group info handler and service
/// Placement Group info handler and service.
std::unique_ptr<rpc::PlacementGroupInfoGrpcService> placement_group_info_service_;
/// Backend client
/// Backend client.
std::shared_ptr<RedisClient> redis_client_;
/// A publisher for publishing gcs messages.
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub_;
Expand Down
5 changes: 4 additions & 1 deletion src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ class GcsActorSchedulerTest : public ::testing::Test {
gcs_table_storage_ = std::make_shared<gcs::RedisGcsTableStorage>(redis_client_);
gcs_node_manager_ =
std::make_shared<gcs::GcsNodeManager>(gcs_pub_sub_, gcs_table_storage_);
gcs_actor_schedule_strategy_ =
std::make_shared<gcs::GcsRandomActorScheduleStrategy>(gcs_node_manager_);
store_client_ = std::make_shared<gcs::InMemoryStoreClient>(io_service_);
gcs_actor_table_ =
std::make_shared<GcsServerMocker::MockedGcsActorTable>(store_client_);
Expand All @@ -43,7 +45,7 @@ class GcsActorSchedulerTest : public ::testing::Test {
[this](std::shared_ptr<gcs::GcsActor> actor) {
success_actors_.emplace_back(std::move(actor));
},
raylet_client_pool_,
raylet_client_pool_, gcs_actor_schedule_strategy_,
/*client_factory=*/
[this](const rpc::Address &address) { return worker_client_; });
}
Expand All @@ -55,6 +57,7 @@ class GcsActorSchedulerTest : public ::testing::Test {
std::shared_ptr<GcsServerMocker::MockRayletClient> raylet_client_;
std::shared_ptr<GcsServerMocker::MockWorkerClient> worker_client_;
std::shared_ptr<gcs::GcsNodeManager> gcs_node_manager_;
std::shared_ptr<gcs::GcsActorScheduleStrategyInterface> gcs_actor_schedule_strategy_;
std::shared_ptr<GcsServerMocker::MockedGcsActorScheduler> gcs_actor_scheduler_;
std::vector<std::shared_ptr<gcs::GcsActor>> success_actors_;
std::vector<std::shared_ptr<gcs::GcsActor>> failure_actors_;
Expand Down
1 change: 1 addition & 0 deletions src/ray/gcs/gcs_server/test/gcs_server_test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "ray/common/task/task_util.h"
#include "ray/common/test_util.h"
#include "ray/gcs/gcs_server/gcs_actor_manager.h"
#include "ray/gcs/gcs_server/gcs_actor_schedule_strategy.h"
#include "ray/gcs/gcs_server/gcs_actor_scheduler.h"
#include "ray/gcs/gcs_server/gcs_node_manager.h"
#include "ray/gcs/gcs_server/gcs_placement_group_manager.h"
Expand Down