Skip to content

Commit

Permalink
[GCS]GCS resource manager remove cluster_resources_ (ray-project#12972)
Browse files Browse the repository at this point in the history
  • Loading branch information
ffbin authored Dec 21, 2020
1 parent b2bcab7 commit 4caa6c6
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 39 deletions.
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_server/gcs_node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ void GcsNodeManager::AddNode(std::shared_ptr<rpc::GcsNodeInfo> node) {
for (auto &listener : node_added_listeners_) {
listener(node);
}
gcs_resource_manager_->OnNodeAdd(node_id);
gcs_resource_manager_->OnNodeAdd(*node);
}
}

Expand Down
98 changes: 64 additions & 34 deletions src/ray/gcs/gcs_server/gcs_resource_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,13 @@ void GcsResourceManager::HandleGetResources(const rpc::GetResourcesRequest &requ
rpc::GetResourcesReply *reply,
rpc::SendReplyCallback send_reply_callback) {
NodeID node_id = NodeID::FromBinary(request.node_id());
auto iter = cluster_resources_.find(node_id);
if (iter != cluster_resources_.end()) {
for (const auto &resource : iter->second.items()) {
(*reply->mutable_resources())[resource.first] = resource.second;
auto iter = cluster_scheduling_resources_.find(node_id);
if (iter != cluster_scheduling_resources_.end()) {
const auto &resource_map = iter->second.GetTotalResources().GetResourceMap();
rpc::ResourceTableData resource_table_data;
for (const auto &resource : resource_map) {
resource_table_data.set_resource_capacity(resource.second);
(*reply->mutable_resources())[resource.first] = resource_table_data;
}
}
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
Expand All @@ -41,29 +44,35 @@ void GcsResourceManager::HandleUpdateResources(
rpc::SendReplyCallback send_reply_callback) {
NodeID node_id = NodeID::FromBinary(request.node_id());
RAY_LOG(DEBUG) << "Updating resources, node id = " << node_id;
auto iter = cluster_resources_.find(node_id);
std::unordered_map<std::string, double> to_be_updated_resources;
auto changed_resources = std::make_shared<std::unordered_map<std::string, double>>();
for (const auto &entry : request.resources()) {
to_be_updated_resources.emplace(entry.first, entry.second.resource_capacity());
changed_resources->emplace(entry.first, entry.second.resource_capacity());
}

if (iter != cluster_resources_.end()) {
for (const auto &entry : request.resources()) {
(*iter->second.mutable_items())[entry.first] = entry.second;
auto iter = cluster_scheduling_resources_.find(node_id);
if (iter != cluster_scheduling_resources_.end()) {
// Update `cluster_scheduling_resources_`.
SchedulingResources &scheduling_resources = iter->second;
for (const auto &entry : *changed_resources) {
scheduling_resources.UpdateResourceCapacity(entry.first, entry.second);
}

// Update gcs storage.
rpc::ResourceMap resource_map;
for (const auto &entry : iter->second.GetTotalResources().GetResourceMap()) {
(*resource_map.mutable_items())[entry.first].set_resource_capacity(entry.second);
}
for (const auto &entry : *changed_resources) {
(*resource_map.mutable_items())[entry.first].set_resource_capacity(entry.second);
}
UpdateResourceCapacity(node_id, to_be_updated_resources);
auto on_done = [this, node_id, to_be_updated_resources, reply,

auto on_done = [this, node_id, changed_resources, reply,
send_reply_callback](const Status &status) {
RAY_CHECK_OK(status);
rpc::NodeResourceChange node_resource_change;
node_resource_change.set_node_id(node_id.Binary());
for (const auto &it : to_be_updated_resources) {
const auto &resource_name = it.first;
const auto &resource_capacity = it.second;
auto &node_updated_resources =
(*node_resource_change.mutable_updated_resources());
node_updated_resources[resource_name] = resource_capacity;
}
node_resource_change.mutable_updated_resources()->insert(changed_resources->begin(),
changed_resources->end());
RAY_CHECK_OK(gcs_pub_sub_->Publish(NODE_RESOURCE_CHANNEL, node_id.Hex(),
node_resource_change.SerializeAsString(),
nullptr));
Expand All @@ -73,7 +82,7 @@ void GcsResourceManager::HandleUpdateResources(
};

RAY_CHECK_OK(
gcs_table_storage_->NodeResourceTable().Put(node_id, iter->second, on_done));
gcs_table_storage_->NodeResourceTable().Put(node_id, resource_map, on_done));
} else {
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::Invalid("Node is not exist."));
RAY_LOG(ERROR) << "Failed to update resources as node " << node_id
Expand All @@ -88,13 +97,23 @@ void GcsResourceManager::HandleDeleteResources(
NodeID node_id = NodeID::FromBinary(request.node_id());
RAY_LOG(DEBUG) << "Deleting node resources, node id = " << node_id;
auto resource_names = VectorFromProtobuf(request.resource_name_list());
auto iter = cluster_resources_.find(node_id);
if (iter != cluster_resources_.end()) {
DeleteResources(node_id, resource_names);
auto iter = cluster_scheduling_resources_.find(node_id);
if (iter != cluster_scheduling_resources_.end()) {
// Update `cluster_scheduling_resources_`.
for (const auto &resource_name : resource_names) {
iter->second.DeleteResource(resource_name);
}

// Update gcs storage.
rpc::ResourceMap resource_map;
auto resources = iter->second.GetTotalResources().GetResourceMap();
for (const auto &resource_name : resource_names) {
RAY_IGNORE_EXPR(iter->second.mutable_items()->erase(resource_name));
resources.erase(resource_name);
}
for (const auto &entry : resources) {
(*resource_map.mutable_items())[entry.first].set_resource_capacity(entry.second);
}

auto on_done = [this, node_id, resource_names, reply,
send_reply_callback](const Status &status) {
RAY_CHECK_OK(status);
Expand All @@ -110,7 +129,7 @@ void GcsResourceManager::HandleDeleteResources(
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
RAY_CHECK_OK(
gcs_table_storage_->NodeResourceTable().Put(node_id, iter->second, on_done));
gcs_table_storage_->NodeResourceTable().Put(node_id, resource_map, on_done));
} else {
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
RAY_LOG(DEBUG) << "Finished deleting node resources, node id = " << node_id;
Expand All @@ -136,10 +155,20 @@ void GcsResourceManager::HandleGetAllAvailableResources(

void GcsResourceManager::Initialize(const GcsInitData &gcs_init_data) {
const auto &nodes = gcs_init_data.Nodes();
for (auto &entry : gcs_init_data.ClusterResources()) {
const auto &iter = nodes.find(entry.first);
if (iter->second.state() == rpc::GcsNodeInfo::ALIVE) {
cluster_resources_[entry.first] = entry.second;
for (const auto &entry : nodes) {
if (entry.second.state() == rpc::GcsNodeInfo::ALIVE) {
OnNodeAdd(entry.second);
}
}

const auto &cluster_resources = gcs_init_data.ClusterResources();
for (const auto &entry : cluster_resources) {
const auto &iter = cluster_scheduling_resources_.find(entry.first);
if (iter != cluster_scheduling_resources_.end()) {
for (const auto &resource : entry.second.items()) {
iter->second.UpdateResourceCapacity(resource.first,
resource.second.resource_capacity());
}
}
}
}
Expand Down Expand Up @@ -173,19 +202,20 @@ void GcsResourceManager::DeleteResources(
const NodeID &node_id, const std::vector<std::string> &deleted_resources) {
auto iter = cluster_scheduling_resources_.find(node_id);
if (iter != cluster_scheduling_resources_.end()) {
for (auto &resource_name : deleted_resources) {
for (const auto &resource_name : deleted_resources) {
iter->second.DeleteResource(resource_name);
}
}
}

void GcsResourceManager::OnNodeAdd(const NodeID &node_id) {
// Add an empty resources for this node.
cluster_resources_.emplace(node_id, rpc::ResourceMap());
void GcsResourceManager::OnNodeAdd(const rpc::GcsNodeInfo &node) {
auto node_id = NodeID::FromBinary(node.node_id());
if (!cluster_scheduling_resources_.contains(node_id)) {
cluster_scheduling_resources_.emplace(node_id, SchedulingResources());
}
}

void GcsResourceManager::OnNodeDead(const NodeID &node_id) {
cluster_resources_.erase(node_id);
cluster_scheduling_resources_.erase(node_id);
}

Expand Down
6 changes: 2 additions & 4 deletions src/ray/gcs/gcs_server/gcs_resource_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler {

/// Handle a node registration.
///
/// \param node_id The specified node id.
void OnNodeAdd(const NodeID &node_id);
/// \param node The specified node to add.
void OnNodeAdd(const rpc::GcsNodeInfo &node);

/// Handle a node death.
///
Expand Down Expand Up @@ -130,8 +130,6 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler {
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub_;
/// Storage for GCS tables.
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
/// Cluster resources.
absl::flat_hash_map<NodeID, rpc::ResourceMap> cluster_resources_;
/// Map from node id to the scheduling resources of the node.
absl::flat_hash_map<NodeID, SchedulingResources> cluster_scheduling_resources_;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test {
rpc::ResourcesData resource;
resource.set_node_id(node->node_id());
(*resource.mutable_resources_available())["CPU"] = cpu_num;
resource.set_resources_available_changed(true);
gcs_node_manager_->UpdateNodeRealtimeResources(NodeID::FromBinary(node->node_id()),
resource);
}
Expand Down

0 comments on commit 4caa6c6

Please sign in to comment.