[core] Autoscaler v2 consistency fix [1/2] (ray-project#40369)
Signed-off-by: vitsai <vitsai@cs.stanford.edu>
vitsai authored Oct 20, 2023
1 parent e615eaf commit 17e6cc2
Showing 13 changed files with 339 additions and 154 deletions.
73 changes: 71 additions & 2 deletions python/ray/autoscaler/v2/tests/test_e2e.py
@@ -20,6 +20,76 @@ def is_head_node_from_resource_usage(usage: Dict[str, float]) -> bool:
return False


def test_autoscaler_no_churn():
    num_cpus_per_node = 4
    expected_nodes = 6
    cluster = AutoscalingCluster(
        head_resources={"CPU": num_cpus_per_node},
        worker_node_types={
            "type-1": {
                "resources": {"CPU": num_cpus_per_node},
                "node_config": {},
                "min_workers": 0,
                "max_workers": 2 * expected_nodes,
            },
        },
    )

    driver_script = f"""
import time
import ray
@ray.remote(num_cpus=1)
def foo():
    time.sleep(60)
    return True
ray.init("auto")
print("start")
assert(ray.get([foo.remote() for _ in range({num_cpus_per_node * expected_nodes})]))
print("end")
"""

    try:
        cluster.start()
        ray.init("auto")
        gcs_address = ray.get_runtime_context().gcs_address

        def tasks_run():
            tasks = list_tasks()
            # Wait until the driver started by run_string_as_driver_nonblocking
            # is running.
            assert len(tasks) > 0
            return True

        run_string_as_driver_nonblocking(driver_script)
        wait_for_condition(tasks_run)

        reached_threshold = False
        for _ in range(30):
            # Verify there are no pending tasks while resources are in use.
            status = get_cluster_status(gcs_address)
            has_task_demand = len(status.resource_demands.ray_task_actor_demand) > 0

            # Check that we don't over-scale.
            assert len(status.active_nodes) <= expected_nodes

            # Check there's no demand once we've reached the expected number of nodes.
            if reached_threshold:
                assert not has_task_demand

            # Load disappears in the next cycle after we've fully scaled up.
            if len(status.active_nodes) == expected_nodes:
                reached_threshold = True

            time.sleep(1)

        assert reached_threshold
    finally:
        # TODO(rickyx): refactor into a fixture for autoscaling cluster.
        ray.shutdown()
        cluster.shutdown()


# TODO(rickyx): We are NOT able to handle multi-node inconsistency yet. The problem is
# that right now, when node A (the head node) has an infeasible task,
# node B has just finished running its previous task.
@@ -31,8 +101,7 @@ def is_head_node_from_resource_usage(usage: Dict[str, float]) -> bool:
# node A: 1 pending task (infeasible)
# node B: 0 pending task, but **CPU used = 1**
#
# @pytest.mark.parametrize("mode", (["single_node", "multi_node"]))
@pytest.mark.parametrize("mode", (["single_node"]))
@pytest.mark.parametrize("mode", (["single_node", "multi_node"]))
def test_scheduled_task_no_pending_demand(mode):

    # So that the head node will need to dispatch tasks to the worker node.
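A note on the TODO above: after this change, a node's usage and load always come from the same report (the pair stored in the GCS-side map this commit introduces), so they cannot contradict each other within a single snapshot; what can still happen is that reports from different nodes are stale relative to one another. In the scenario the comment describes, node A (the head) still reports one pending infeasible task while the last report held for node B still shows its CPU in use even though B's previous task just finished. A toy illustration with made-up types and values (a sketch, not Ray code):

// Per-node view at the moment described in the TODO (illustrative values only).
struct NodeView {
  int num_pending_infeasible_tasks;  // load still reported for this node
  double cpu_in_use;                 // usage from the last report the GCS holds
};

// Node A (head): the infeasible task is still counted as pending demand.
constexpr NodeView kNodeA{/*num_pending_infeasible_tasks=*/1, /*cpu_in_use=*/0.0};
// Node B (worker): it just became idle, but its stale report still shows CPU in use.
constexpr NodeView kNodeB{/*num_pending_infeasible_tasks=*/0, /*cpu_in_use=*/1.0};

// Aggregated, this snapshot shows outstanding demand with no free CPU on node B,
// even though node B could actually run the task.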
8 changes: 0 additions & 8 deletions python/ray/serve/tests/test_max_replicas_per_node.py
@@ -31,10 +31,6 @@ def get_node_to_deployment_to_num_replicas():
return node_to_deployment_to_num_replicas


@pytest.mark.skipif(
sys.platform == "win32",
reason="Flaky on Windows due to https://github.com/ray-project/ray/issues/36926.",
)
@pytest.mark.parametrize(
"ray_autoscaling_cluster",
[
@@ -86,10 +82,6 @@ def __call__(self):
assert deployment_to_num_replicas["deploy2"] == 1


@pytest.mark.skipif(
sys.platform == "win32",
reason="Flaky on Windows due to https://github.com/ray-project/ray/issues/36926.",
)
@pytest.mark.parametrize(
"ray_autoscaling_cluster",
[
100 changes: 78 additions & 22 deletions src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.cc
@@ -16,24 +16,19 @@

#include "ray/gcs/gcs_server/gcs_node_manager.h"
#include "ray/gcs/gcs_server/gcs_placement_group_manager.h"
#include "ray/gcs/gcs_server/gcs_resource_manager.h"
#include "ray/gcs/gcs_server/state_util.h"
#include "ray/gcs/pb_util.h"
#include "ray/raylet/scheduling/cluster_resource_manager.h"

namespace ray {
namespace gcs {

GcsAutoscalerStateManager::GcsAutoscalerStateManager(
const std::string &session_name,
const ClusterResourceManager &cluster_resource_manager,
const GcsResourceManager &gcs_resource_manager,
const GcsNodeManager &gcs_node_manager,
const GcsPlacementGroupManager &gcs_placement_group_manager,
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool)
: session_name_(session_name),
cluster_resource_manager_(cluster_resource_manager),
gcs_node_manager_(gcs_node_manager),
gcs_resource_manager_(gcs_resource_manager),
gcs_placement_group_manager_(gcs_placement_group_manager),
raylet_client_pool_(std::move(raylet_client_pool)),
last_cluster_resource_state_version_(0),
@@ -184,13 +179,61 @@ void GcsAutoscalerStateManager::GetClusterResourceConstraints(
}
}

void GcsAutoscalerStateManager::OnNodeAdd(const rpc::GcsNodeInfo &node) {
  NodeID node_id = NodeID::FromBinary(node.node_id());
  auto node_info =
      node_resource_info_
          .emplace(node_id, std::make_pair(absl::Now(), rpc::ResourcesData()))
          .first;
  // Note: We populate the total and available resources, but not the load
  // (which is only received from autoscaler reports). Temporary under-reporting
  // when a node is added is fine.
  (*node_info->second.second.mutable_resources_total()) = node.resources_total();
  (*node_info->second.second.mutable_resources_available()) = node.resources_total();
}

void GcsAutoscalerStateManager::UpdateResourceLoadAndUsage(
const rpc::ResourcesData &data) {
NodeID node_id = NodeID::FromBinary(data.node_id());
auto iter = node_resource_info_.find(node_id);
if (iter == node_resource_info_.end()) {
RAY_LOG(WARNING) << "Ignoring resource usage for node that is not alive: "
<< node_id.Hex() << ".";
return;
}

auto &new_data = iter->second.second;

(*new_data.mutable_resource_load()) = data.resource_load();
(*new_data.mutable_resource_load_by_shape()) = data.resource_load_by_shape();

if (data.resources_total_size() > 0) {
(*new_data.mutable_resources_total()) = data.resources_total();
}

(*new_data.mutable_resources_available()) = data.resources_available();

new_data.set_object_pulls_queued(data.object_pulls_queued());
new_data.set_idle_duration_ms(data.idle_duration_ms());
new_data.set_is_draining(data.is_draining());

// Last update time
iter->second.first = absl::Now();
}

std::unordered_map<google::protobuf::Map<std::string, double>, rpc::ResourceDemand>
GcsAutoscalerStateManager::GetAggregatedResourceLoad() const {
std::unordered_map<google::protobuf::Map<std::string, double>, rpc::ResourceDemand>
aggregate_load;
for (const auto &info : node_resource_info_) {
// Aggregate the load reported by each raylet.
gcs::FillAggregateLoad(info.second.second, &aggregate_load);
}
return aggregate_load;
};

void GcsAutoscalerStateManager::GetPendingResourceRequests(
rpc::autoscaler::ClusterResourceState *state) {
// TODO(rickyx): We could actually get the load of each node from the cluster resource
// manager. Need refactoring on the GcsResourceManager.
// We could then do cluster_resource_manager_GetResourceLoad(), and decouple it
// from gcs_resource_manager_.
auto aggregate_load = gcs_resource_manager_.GetAggregatedResourceLoad();
auto aggregate_load = GetAggregatedResourceLoad();
for (const auto &[shape, demand] : aggregate_load) {
auto num_pending = demand.num_infeasible_requests_queued() + demand.backlog_size() +
demand.num_ready_requests_queued();
@@ -214,6 +257,15 @@ void GcsAutoscalerStateManager::GetNodeStates(
node_state_proto->set_node_ip_address(gcs_node_info.node_manager_address());
node_state_proto->set_instance_type_name(gcs_node_info.instance_type_name());

// The only node state we use from GcsNodeInfo is the dead state.
// All others are populated with the locally kept ResourcesData,
// which may be more stale than GcsNodeInfo but is more consistent between
// usage and load. GcsNodeInfo state contains only usage and is updated with
// Ray Syncer usage messages, which happen at a much higher cadence than
// autoscaler status polls, and so could be out of sync with load data,
// which is only sent in response to the poll.
//
// See (https://github.com/ray-project/ray/issues/36926) for examples.
if (gcs_node_info.state() == rpc::GcsNodeInfo::DEAD) {
node_state_proto->set_status(rpc::autoscaler::NodeStatus::DEAD);
    // We don't need to populate other info for a dead node.
@@ -224,37 +276,41 @@
gcs_node_info.state_snapshot().node_activity());

// The node is alive. We need to check if the node is idle.
auto const &node_resource_data = cluster_resource_manager_.GetNodeResources(
scheduling::NodeID(node_state_proto->node_id()));
if (node_resource_data.is_draining) {
auto const node_resource_iter =
node_resource_info_.find(NodeID::FromBinary(node_state_proto->node_id()));

RAY_CHECK(node_resource_iter != node_resource_info_.end());

auto const &node_resource_item = node_resource_iter->second;
auto const &node_resource_data = node_resource_item.second;
if (node_resource_data.is_draining()) {
node_state_proto->set_status(rpc::autoscaler::NodeStatus::DRAINING);
} else if (node_resource_data.idle_resource_duration_ms > 0) {
} else if (node_resource_data.idle_duration_ms() > 0) {
// The node was reported idle.
node_state_proto->set_status(rpc::autoscaler::NodeStatus::IDLE);

// We approximate the idle duration by the time since the last idle report
// plus the idle duration reported by the node:
// idle_dur = <idle-dur-reported-by-raylet> + <time-since-gcs-gets-last-report>
// idle_dur = <idle-dur-reported-by-raylet> +
// <time-since-autoscaler-state-manager-gets-last-report>
//
    // This is because, with lightweight resource updates, we don't keep reporting
    // the idle duration when there's no resource change. We also don't want to use
    // the raylet-reported idle timestamp, since there might be clock skew.
RAY_CHECK(node_resource_data.last_resource_update_time != absl::nullopt);
node_state_proto->set_idle_duration_ms(
node_resource_data.idle_resource_duration_ms +
absl::ToInt64Milliseconds(
absl::Now() - node_resource_data.last_resource_update_time.value()));
node_resource_data.idle_duration_ms() +
absl::ToInt64Milliseconds(absl::Now() - node_resource_item.first));
} else {
node_state_proto->set_status(rpc::autoscaler::NodeStatus::RUNNING);
}

// Copy resource available
const auto &available = node_resource_data.available.GetResourceMap();
const auto &available = node_resource_data.resources_available();
node_state_proto->mutable_available_resources()->insert(available.begin(),
available.end());

// Copy total resources
const auto &total = node_resource_data.total.GetResourceMap();
const auto &total = node_resource_data.resources_total();
node_state_proto->mutable_total_resources()->insert(total.begin(), total.end());

// Add dynamic PG labels.
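The idle-duration bookkeeping in GetNodeStates() above boils down to one formula: the idle duration the raylet last reported, plus the time elapsed since the state manager received that report, measured on the GCS clock so raylet/GCS clock skew does not matter. A minimal sketch of that computation, using only the absl time utilities already present in the diff (a sketch, not the actual GCS code):

#include <cstdint>

#include "absl/time/clock.h"
#include "absl/time/time.h"

// `reported_idle_ms` corresponds to ResourcesData::idle_duration_ms() from the
// last raylet report; `last_report_time` is the absl::Time stored next to that
// report in node_resource_info_.
int64_t ApproximateIdleDurationMs(int64_t reported_idle_ms,
                                  absl::Time last_report_time) {
  // idle_dur = <idle-dur-reported-by-raylet> +
  //            <time-since-the-state-manager-received-that-report>
  return reported_idle_ms +
         absl::ToInt64Milliseconds(absl::Now() - last_report_time);
}

Because lightweight resource updates stop repeating the idle duration while nothing changes, the elapsed-time term is what keeps the reported idle time growing between reports.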
32 changes: 23 additions & 9 deletions src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.h
@@ -19,7 +19,6 @@
#include "src/ray/protobuf/gcs.pb.h"

namespace ray {
class ClusterResourceManager;
namespace gcs {

class GcsResourceManager;
@@ -30,8 +29,6 @@ class GcsAutoscalerStateManager : public rpc::autoscaler::AutoscalerStateHandler
public:
GcsAutoscalerStateManager(
const std::string &session_name,
const ClusterResourceManager &cluster_resource_manager,
const GcsResourceManager &gcs_resource_manager,
const GcsNodeManager &gcs_node_manager,
const GcsPlacementGroupManager &gcs_placement_group_manager,
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool);
@@ -59,11 +56,26 @@ class GcsAutoscalerStateManager : public rpc::autoscaler::AutoscalerStateHandler
rpc::autoscaler::DrainNodeReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

void UpdateResourceLoadAndUsage(const rpc::ResourcesData &data);

void RecordMetrics() const { throw std::runtime_error("Unimplemented"); }

std::string DebugString() const { throw std::runtime_error("Unimplemented"); }

void OnNodeAdd(const rpc::GcsNodeInfo &node);

void OnNodeDead(const NodeID &node) { node_resource_info_.erase(node); }

const absl::flat_hash_map<ray::NodeID, std::pair<absl::Time, rpc::ResourcesData>>
&GetNodeResourceInfo() const {
return node_resource_info_;
}

private:
/// \brief Get the aggregated resource load from all nodes.
std::unordered_map<google::protobuf::Map<std::string, double>, rpc::ResourceDemand>
GetAggregatedResourceLoad() const;

/// \brief Internal method for populating the rpc::ClusterResourceState
/// protobuf.
/// \param state The state to be filled.
@@ -125,15 +137,9 @@ class GcsAutoscalerStateManager : public rpc::autoscaler::AutoscalerStateHandler
// Ray cluster session name.
const std::string session_name_ = "";

/// Cluster resources manager that provides cluster resources information.
const ClusterResourceManager &cluster_resource_manager_;

/// Gcs node manager that provides node status information.
const GcsNodeManager &gcs_node_manager_;

/// GCS resource manager that provides resource demand/load information.
const GcsResourceManager &gcs_resource_manager_;

/// GCS placement group manager reference.
const GcsPlacementGroupManager &gcs_placement_group_manager_;

@@ -163,6 +169,14 @@ class GcsAutoscalerStateManager : public rpc::autoscaler::AutoscalerStateHandler
/// Cached autoscaling state.
absl::optional<rpc::autoscaler::AutoscalingState> autoscaling_state_ = absl::nullopt;

/// Resource load and usage of all nodes.
/// Note: This is similar to the data structure in `gcs_resource_manager`
/// but we update load and usage together.
///
/// The absl::Time in the pair is the last time the item was updated.
absl::flat_hash_map<ray::NodeID, std::pair<absl::Time, rpc::ResourcesData>>
node_resource_info_;

FRIEND_TEST(GcsAutoscalerStateManagerTest, TestReportAutoscalingState);
};

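For orientation, the per-node bookkeeping declared above can be condensed into a small self-contained sketch: OnNodeAdd seeds the total and available resources, UpdateResourceLoadAndUsage overwrites usage and load together from a single report while stamping the update time, and OnNodeDead drops the entry. Types and names here are simplified stand-ins, not the real GcsAutoscalerStateManager:

#include <cstdint>
#include <string>
#include <utility>

#include "absl/container/flat_hash_map.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"

// Stand-in for rpc::ResourcesData: usage and load carried in one report.
struct NodeResourceReport {
  absl::flat_hash_map<std::string, double> total;
  absl::flat_hash_map<std::string, double> available;
  int64_t idle_duration_ms = 0;
  bool is_draining = false;
  // Pending/infeasible load fields omitted for brevity.
};

class NodeResourceInfoSketch {
 public:
  // Mirrors OnNodeAdd: seed totals and available resources from node info;
  // load stays empty until the first usage report arrives.
  void OnNodeAdd(const std::string &node_id,
                 absl::flat_hash_map<std::string, double> total) {
    auto &entry = info_[node_id];
    entry.first = absl::Now();
    entry.second.available = total;
    entry.second.total = std::move(total);
  }

  // Mirrors UpdateResourceLoadAndUsage: overwrite usage *and* load from the
  // same report and stamp the time, so idle duration can be extrapolated later.
  void Update(const std::string &node_id, NodeResourceReport report) {
    auto it = info_.find(node_id);
    if (it == info_.end()) {
      return;  // Ignore reports for nodes that are not alive.
    }
    it->second = {absl::Now(), std::move(report)};
  }

  // Mirrors OnNodeDead: forget the node entirely.
  void OnNodeDead(const std::string &node_id) { info_.erase(node_id); }

 private:
  // NodeID -> (time of last update, last report), as node_resource_info_ does.
  absl::flat_hash_map<std::string,
                      std::pair<absl::Time, NodeResourceReport>>
      info_;
};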
(Diffs for the remaining changed files are not shown.)
