
Commit acb6ed2

jinbum-kim authored and rueian committed
[core][autoscaler][v1] use cluster_resource_state for node state info to fix over-provisioning (#57130)
This PR is related to [https://github.com/ray-project/ray/issues/52864](https://github.com/ray-project/ray/issues/52864). The v1 autoscaler monitor currently pulls metrics from two different modules in GCS:

- **`GcsResourceManager`** (v1, legacy): manages `node_resource_usages_` and updates it at two different intervals (`UpdateNodeResourceUsage` every 0.1s, `UpdateResourceLoads` every 1s).
- **`GcsAutoscalerStateManager`** (v2): manages `node_resource_info_` and updates it via `UpdateResourceLoadAndUsage`. This module is already the source for the v2 autoscaler.

| Data | Source | Update method | Update interval | Reference |
| -------------------------- | -------------------------- | -------------------------- | -------------------------- | -------------------------- |
| Current cluster resources | RaySyncer | `GcsResourceManager::UpdateNodeResourceUsage` | 100ms (`raylet_report_resources_period_milliseconds`) | [gcs_resource_manager.cc#L170](https://github.com/ray-project/ray/blob/main/src/ray/gcs/gcs_resource_manager.cc#L170) |
| Current pending resources | GcsServer | `GcsResourceManager::UpdateResourceLoads` | 1s (`gcs_pull_resource_loads_period_milliseconds`) | [gcs_server.cc#L422](https://github.com/ray-project/ray/blob/main/src/ray/gcs/gcs_server.cc#L422) |

Because these two modules update asynchronously, the autoscaler can end up seeing inconsistent resource states. That causes a race condition where extra nodes may be launched before the updated availability actually shows up. In practice, clusters can become over-provisioned even though the demand was already satisfied.

In the long run, the right fix is to fully switch the v1 autoscaler over to `GcsAutoscalerStateManager::HandleGetClusterResourceState`, just as the v2 autoscaler already does. But since v1 will eventually be deprecated, this PR takes a practical interim step: it merges the necessary info from both `GcsResourceManager::HandleGetAllResourceUsage` and `GcsAutoscalerStateManager::HandleGetClusterResourceState` in a hybrid approach. This keeps v1 correct without big changes, while leaving the path open for a clean migration to v2 later on.

## Details

This PR follows the fix suggested by @rueian in #52864 by switching the v1 autoscaler's node state source from `GcsResourceManager::HandleGetAllResourceUsage` to `GcsAutoscalerStateManager::HandleGetClusterResourceState`.

Root cause: the v1 autoscaler previously pulled data from two asynchronous update cycles:

- Node resources: updated every ~100ms via `UpdateNodeResourceUsage`
- Resource demands: updated every ~1s via `UpdateResourceLoads`

This created a race condition where newly allocated resources became visible before the demand metrics updated, causing the autoscaler to incorrectly perceive unmet demand and launch extra nodes.

The fix: by using v2's `HandleGetClusterResourceState` for node iteration, both current resources and pending demands now come from the same consistent snapshot (same tick), so the extra-node race condition goes away.
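The resulting read path in `update_load_metrics()` can be summarized roughly as below. This is a condensed sketch, not the PR's verbatim code: the `collect_cluster_metrics` helper is illustrative only, `get_cluster_resource_state` is the v2 helper the monitor imports (the same name the new test monkeypatches), and the `LoadMetrics` bookkeeping is elided.

```python
# Condensed sketch of the hybrid read path (illustrative, not the PR's exact code).
from ray.autoscaler._private.monitor import get_cluster_resource_state


def collect_cluster_metrics(gcs_client):
    # Legacy v1 reply: still the source of resource_load_by_shape,
    # placement_group_load, and the cluster_full flags.
    usage_reply = gcs_client.get_all_resource_usage(timeout=60)
    resources_batch_data = usage_reply.resource_usage_data

    # v2 snapshot: node IDs, IPs, total/available resources, and idle durations
    # all come from the same tick, so availability and demand cannot drift apart.
    cluster_resource_state = get_cluster_resource_state(gcs_client)

    nodes = []
    for node_state in cluster_resource_state.node_states:
        nodes.append(
            {
                "node_id": node_state.node_id.hex(),
                "ip": node_state.node_ip_address,
                "total": dict(node_state.total_resources),
                "available": dict(node_state.available_resources),
                "idle_s": node_state.idle_duration_ms / 1000.0,
            }
        )
    # Node data (from v2) and the legacy reply (for demand parsing) are then
    # fed into LoadMetrics.update() together.
    return nodes, resources_batch_data
```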
## Proposed Changes in update_load_metrics()

This PR updates how the v1 autoscaler collects cluster metrics. Most node state information is now taken from **v2 (`GcsAutoscalerStateManager::HandleGetClusterResourceState`)**, while certain fields still rely on **v1 (`GcsResourceManager::HandleGetAllResourceUsage`)** because v2 does not have an equivalent yet (a sketch of those legacy reads follows the notes below).

| Field | Source (before, v1) | Source (after) | Change? | Notes |
| -------------------------- | -------------------------- | -------------------------- | -------------------------- | -------------------------- |
| Node states (id, ip, resources, idle duration) | [gcs.proto#L526-L527](https://github.com/ray-project/ray/blob/3b1de771d5bb0e5289c4f13e9819bc3e8a0ad99e/src/ray/protobuf/gcs.proto#L526-L527) (`resources_batch_data.batch`) | [autoscaler.proto#L206-L212](https://github.com/ray-project/ray/blob/3b1de771d5bb0e5289c4f13e9819bc3e8a0ad99e/src/ray/protobuf/autoscaler.proto#L206-L212) (`cluster_resource_state.node_states`) | Yes | Now aligned with v2. Verified no regressions in tests. |
| waiting_bundles / infeasible_bundles | `resource_load_by_shape` | Same as before | No | v2 does not separate ready vs. infeasible requests. Still needed for metrics/debugging. |
| pending_placement_groups | `placement_group_load` | Same as before | No | No validated equivalent in v2 yet. May migrate later. |
| cluster_full | Response flag (`cluster_full_of_actors_detected`) | Same as before | No | No replacement in v2 fields, so kept as is. |

### Additional Notes

- This hybrid approach addresses the race condition while still using legacy fields where v2 has no equivalent.
- All existing autoscaler monitor tests still pass, which shows that the change is backward-compatible and does not break existing behavior.
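For the fields that stay on v1 (the "No" rows above), the monitor keeps reading them from the legacy `GetAllResourceUsage` reply. The sketch below illustrates those reads; the `collect_legacy_fields` helper is illustrative only, field names are assumed to follow `gcs.proto`, and the `LoadMetrics` update itself is elided.

```python
# Illustrative sketch of the legacy fields still read from the v1 reply.
from ray.autoscaler._private.monitor import parse_resource_demands


def collect_legacy_fields(usage_reply):
    # usage_reply: a GetAllResourceUsageReply from gcs_client.get_all_resource_usage().
    data = usage_reply.resource_usage_data

    # waiting_bundles / infeasible_bundles: v1 still separates ready vs. infeasible demand.
    waiting_bundles, infeasible_bundles = parse_resource_demands(
        data.resource_load_by_shape
    )

    # pending_placement_groups: no validated v2 equivalent yet.
    pending_placement_groups = list(data.placement_group_load.placement_group_data)

    # cluster_full: per-node flags combined with the GCS-level flag.
    cluster_full = any(
        getattr(entry, "cluster_full_of_actors_detected", False)
        for entry in data.batch
    ) or getattr(usage_reply, "cluster_full_of_actors_detected_by_gcs", False)

    return waiting_bundles, infeasible_bundles, pending_placement_groups, cluster_full
```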
## Changed Behavior (Observed)

(The autoscaler config and serving code are the same as in [https://github.com/ray-project/ray/issues/52864](https://github.com/ray-project/ray/issues/52864).)

After switching to the v2 cluster resource state, the issue no longer occurs:

- Even with `gcs_pull_resource_loads_period_milliseconds=20000`, the Node Provider launches only a single `ray.worker.4090.standard` node. (No extra requests for additional nodes are observed.)

[debug.log](https://github.com/user-attachments/files/22659163/debug.log)

## Related issue number

Closes #52864

Signed-off-by: jinbum-kim <jinbum9958@gmail.com>
Co-authored-by: Rueian <rueiancsie@gmail.com>

1 parent 5ecf9a5 commit acb6ed2

2 files changed (+98, -14 lines)


python/ray/autoscaler/_private/monitor.py

Lines changed: 17 additions & 14 deletions
@@ -243,6 +243,9 @@ def get_latest_readonly_config():
     def update_load_metrics(self):
         """Fetches resource usage data from GCS and updates load metrics."""

+        # TODO(jinbum-kim): Still needed since some fields aren't in cluster_resource_state.
+        # Remove after v1 autoscaler fully migrates to get_cluster_resource_state().
+        # ref: https://github.com/ray-project/ray/pull/57130
         response = self.gcs_client.get_all_resource_usage(timeout=60)
         resources_batch_data = response.resource_usage_data
         log_resource_batch_data_if_desired(resources_batch_data)
@@ -259,41 +262,41 @@ def update_load_metrics(self):
         # Tell the readonly node provider what nodes to report.
         if self.readonly_config:
             new_nodes = []
-            for msg in list(resources_batch_data.batch):
+            for msg in list(cluster_resource_state.node_states):
                 node_id = msg.node_id.hex()
-                new_nodes.append((node_id, msg.node_manager_address))
+                new_nodes.append((node_id, msg.node_ip_address))
             self.autoscaler.provider._set_nodes(new_nodes)

         mirror_node_types = {}
-        cluster_full = False
+        legacy_cluster_full_detected = any(
+            getattr(entry, "cluster_full_of_actors_detected", False)
+            for entry in resources_batch_data.batch
+        )
+        cluster_full = legacy_cluster_full_detected or getattr(
+            response, "cluster_full_of_actors_detected_by_gcs", False
+        )
         if (
             hasattr(response, "cluster_full_of_actors_detected_by_gcs")
             and response.cluster_full_of_actors_detected_by_gcs
         ):
             # GCS has detected the cluster full of actors.
             cluster_full = True
-        for resource_message in resources_batch_data.batch:
+        for resource_message in cluster_resource_state.node_states:
             node_id = resource_message.node_id
             # Generate node type config based on GCS reported node list.
             if self.readonly_config:
                 # Keep prefix in sync with ReadonlyNodeProvider.
                 node_type = format_readonly_node_type(node_id.hex())
                 resources = {}
-                for k, v in resource_message.resources_total.items():
+                for k, v in resource_message.total_resources.items():
                     resources[k] = v
                 mirror_node_types[node_type] = {
                     "resources": resources,
                     "node_config": {},
                     "max_workers": 1,
                 }
-            if (
-                hasattr(resource_message, "cluster_full_of_actors_detected")
-                and resource_message.cluster_full_of_actors_detected
-            ):
-                # A worker node has detected the cluster full of actors.
-                cluster_full = True
-            total_resources = dict(resource_message.resources_total)
-            available_resources = dict(resource_message.resources_available)
+            total_resources = dict(resource_message.total_resources)
+            available_resources = dict(resource_message.available_resources)

             waiting_bundles, infeasible_bundles = parse_resource_demands(
                 resources_batch_data.resource_load_by_shape
@@ -319,7 +322,7 @@ def update_load_metrics(self):
                 else:
                     ip = node_id.hex()
             else:
-                ip = resource_message.node_manager_address
+                ip = resource_message.node_ip_address

             idle_duration_s = 0.0
             if node_id in ray_nodes_idle_duration_ms_by_id:

python/ray/tests/test_monitor.py

Lines changed: 81 additions & 0 deletions
@@ -1,10 +1,17 @@
 import sys
+import types

 import pytest

 import ray
 import ray._private.gcs_utils as gcs_utils
+from ray.autoscaler._private import (
+    load_metrics as load_metrics_module,
+    monitor as monitor_module,
+)
+from ray.autoscaler._private.load_metrics import LoadMetrics
 from ray.autoscaler._private.monitor import parse_resource_demands
+from ray.core.generated import autoscaler_pb2, gcs_service_pb2

 ray.experimental.internal_kv.redis = False

@@ -51,5 +58,79 @@ def test_parse_resource_demands():
     assert len(waiting + infeasible) == 10


+def test_update_load_metrics_uses_cluster_state(monkeypatch):
+    """Ensure cluster_resource_state fields flow into LoadMetrics.
+
+    Verify node data comes from cluster_resource_state while demand parsing
+    still uses resource_load_by_shape.
+    """
+
+    monitor = monitor_module.Monitor.__new__(monitor_module.Monitor)
+    monitor.gcs_client = types.SimpleNamespace()
+    monitor.load_metrics = LoadMetrics()
+    monitor.autoscaler = types.SimpleNamespace(config={"provider": {}})
+    monitor.autoscaling_config = None
+    monitor.readonly_config = None
+    monitor.prom_metrics = None
+    monitor.event_summarizer = None
+
+    usage_reply = gcs_service_pb2.GetAllResourceUsageReply()
+    demand = (
+        usage_reply.resource_usage_data.resource_load_by_shape.resource_demands.add()
+    )
+    demand.shape["CPU"] = 1.0
+    demand.num_ready_requests_queued = 2
+    demand.backlog_size = 1
+
+    monitor.gcs_client.get_all_resource_usage = lambda timeout: usage_reply
+
+    cluster_state = autoscaler_pb2.ClusterResourceState()
+    node_state = cluster_state.node_states.add()
+    node_state.node_id = bytes.fromhex("ab" * 20)
+    node_state.node_ip_address = "1.2.3.4"
+    node_state.total_resources["CPU"] = 4.0
+    node_state.available_resources["CPU"] = 1.5
+    node_state.idle_duration_ms = 1500
+
+    monkeypatch.setattr(
+        monitor_module, "get_cluster_resource_state", lambda gcs_client: cluster_state
+    )
+
+    seen = {}
+    orig_parse = monitor_module.parse_resource_demands
+
+    def spy_parse(arg):
+        # Spy on the legacy parser to ensure resource_load_by_shape still feeds it.
+        seen["arg"] = arg
+        return orig_parse(arg)
+
+    monkeypatch.setattr(monitor_module, "parse_resource_demands", spy_parse)
+
+    fixed_time = 1000.0
+    monkeypatch.setattr(
+        load_metrics_module, "time", types.SimpleNamespace(time=lambda: fixed_time)
+    )
+
+    monitor.update_load_metrics()
+
+    resources = monitor.load_metrics.static_resources_by_ip
+    assert resources["1.2.3.4"]["CPU"] == pytest.approx(4.0)
+
+    usage = monitor.load_metrics.dynamic_resources_by_ip
+    assert usage["1.2.3.4"]["CPU"] == pytest.approx(1.5)
+
+    assert seen["arg"] is usage_reply.resource_usage_data.resource_load_by_shape
+
+    assert monitor.load_metrics.pending_placement_groups == []
+
+    waiting = monitor.load_metrics.waiting_bundles
+    infeasible = monitor.load_metrics.infeasible_bundles
+    assert waiting.count({"CPU": 1.0}) == 3
+    assert not infeasible
+
+    last_used = monitor.load_metrics.ray_nodes_last_used_time_by_ip["1.2.3.4"]
+    assert last_used == pytest.approx(fixed_time - 1.5)
+
+
 if __name__ == "__main__":
     sys.exit(pytest.main(["-sv", __file__]))
