Skip to content

Commit 80b0d68

Browse files
committed
[Data] Optimize autoscaler to support configurable step size for actor pool scaling
Signed-off-by: dragongu <andrewgu@vip.qq.com>
1 parent fa8bc74 commit 80b0d68

File tree

3 files changed

+90
-15
lines changed

3 files changed

+90
-15
lines changed

python/ray/data/_internal/actor_autoscaler/default_actor_autoscaler.py

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ def __init__(
3131
self._actor_pool_scaling_down_threshold = (
3232
config.actor_pool_util_downscaling_threshold
3333
)
34+
self._actor_pool_max_upscaling_delta = config.actor_pool_max_upscaling_delta
3435

3536
self._validate_autoscaling_config()
3637

@@ -89,14 +90,36 @@ def _derive_target_scaling_config(
8990
reason="operator exceeding resource quota"
9091
)
9192
budget = self._resource_manager.get_budget(op)
92-
if _get_max_scale_up(actor_pool, budget) == 0:
93+
max_scale_up = _get_max_scale_up(actor_pool, budget)
94+
if max_scale_up == 0:
9395
return ActorPoolScalingRequest.no_op(reason="exceeded resource limits")
9496

97+
current_size = actor_pool.current_size()
98+
# Calculate desired delta based on utilization
99+
if current_size > 0 and util > 0:
100+
plan_delta = math.ceil(
101+
current_size * (util / self._actor_pool_scaling_up_threshold - 1)
102+
)
103+
else:
104+
plan_delta = 1
105+
106+
# Apply limits: resource budget, configured max delta, and max pool size
107+
# The resource budget already provides protection against resource contention.
108+
limits = []
109+
if max_scale_up is not None:
110+
limits.append(max_scale_up)
111+
limits.append(self._actor_pool_max_upscaling_delta)
112+
limits.append(actor_pool.max_size() - current_size)
113+
114+
delta = min(plan_delta, *limits)
115+
delta = max(1, delta) # At least scale up by 1
116+
95117
return ActorPoolScalingRequest.upscale(
96-
delta=1,
118+
delta=delta,
97119
reason=(
98-
f"utilization of {util} >= "
99-
f"{self._actor_pool_scaling_up_threshold}"
120+
f"utilization {util:.2f} >= threshold {self._actor_pool_scaling_up_threshold:.2f} "
121+
f"(plan_delta={plan_delta}, max_scale_up={max_scale_up}, "
122+
f"max_upscaling_delta={self._actor_pool_max_upscaling_delta}, final_delta={delta})"
100123
),
101124
)
102125
elif util <= self._actor_pool_scaling_down_threshold:
@@ -120,6 +143,13 @@ def _derive_target_scaling_config(
120143
)
121144

122145
def _validate_autoscaling_config(self):
146+
# Validate that scaling up threshold is positive to prevent ZeroDivisionError
147+
if self._actor_pool_scaling_up_threshold <= 0:
148+
raise ValueError(
149+
f"actor_pool_util_upscaling_threshold must be positive, "
150+
f"got {self._actor_pool_scaling_up_threshold}"
151+
)
152+
123153
for op, state in self._topology.items():
124154
for actor_pool in op.get_autoscaling_actor_pools():
125155
self._validate_actor_pool_autoscaling_config(actor_pool, op)

python/ray/data/context.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,11 @@ class ShuffleStrategy(str, enum.Enum):
240240
0.5,
241241
)
242242

243+
DEFAULT_ACTOR_POOL_MAX_UPSCALING_DELTA: int = env_integer(
244+
"RAY_DATA_DEFAULT_ACTOR_POOL_MAX_UPSCALING_DELTA",
245+
1,
246+
)
247+
243248

244249
DEFAULT_ENABLE_DYNAMIC_OUTPUT_QUEUE_SIZE_BACKPRESSURE: bool = env_bool(
245250
"RAY_DATA_ENABLE_DYNAMIC_OUTPUT_QUEUE_SIZE_BACKPRESSURE", False
@@ -265,6 +270,9 @@ class AutoscalingConfig:
265270
between autoscaling speed and resource efficiency (i.e.,
266271
making tasks wait instead of immediately triggering execution).
267272
actor_pool_util_downscaling_threshold: Actor Pool utilization threshold for downscaling.
273+
actor_pool_max_upscaling_delta: Maximum number of actors to scale up in a single scaling decision.
274+
This limits how many actors can be added at once to prevent resource contention
275+
and scheduling pressure. Defaults to 1 for conservative scaling.
268276
"""
269277

270278
actor_pool_util_upscaling_threshold: float = (
@@ -276,6 +284,9 @@ class AutoscalingConfig:
276284
DEFAULT_ACTOR_POOL_UTIL_DOWNSCALING_THRESHOLD
277285
)
278286

287+
# Maximum number of actors to scale up in a single scaling decision
288+
actor_pool_max_upscaling_delta: int = DEFAULT_ACTOR_POOL_MAX_UPSCALING_DELTA
289+
279290

280291
def _execution_options_factory() -> "ExecutionOptions":
281292
# Lazily import to avoid circular dependencies.

python/ray/data/tests/test_autoscaler.py

Lines changed: 45 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
1+
import math
12
import time
23
from contextlib import contextmanager
34
from types import MethodType
4-
from typing import Optional
5+
from typing import Callable, Union
56
from unittest.mock import MagicMock
67

78
import pytest
89

910
import ray
1011
from ray.data import ExecutionResources
1112
from ray.data._internal.actor_autoscaler import (
12-
ActorPoolScalingRequest,
1313
DefaultActorAutoscaler,
1414
)
1515
from ray.data._internal.cluster_autoscaler import DefaultClusterAutoscaler
@@ -24,7 +24,8 @@
2424
)
2525

2626

27-
def test_actor_pool_scaling():
27+
@pytest.mark.parametrize("max_upscaling_delta", [1, 2, 5, 10])
28+
def test_actor_pool_scaling(max_upscaling_delta):
2829
"""Test `_actor_pool_should_scale_up` and `_actor_pool_should_scale_down`
2930
in `DefaultAutoscaler`"""
3031

@@ -37,6 +38,7 @@ def test_actor_pool_scaling():
3738
config=AutoscalingConfig(
3839
actor_pool_util_upscaling_threshold=1.0,
3940
actor_pool_util_downscaling_threshold=0.5,
41+
actor_pool_max_upscaling_delta=max_upscaling_delta,
4042
),
4143
)
4244

@@ -82,22 +84,47 @@ def patch(mock, attr, value, is_method=True):
8284
yield
8385
setattr(mock, attr, original)
8486

87+
ExpectedReason = Union[str, Callable[[str], bool], None]
88+
8589
def assert_autoscaling_action(
86-
*, delta: int, expected_reason: Optional[str], force: bool = False
90+
*, delta: int, expected_reason: ExpectedReason, force: bool = False
8791
):
8892
nonlocal actor_pool, op, op_state
8993

90-
assert autoscaler._derive_target_scaling_config(
94+
request = autoscaler._derive_target_scaling_config(
9195
actor_pool=actor_pool,
9296
op=op,
9397
op_state=op_state,
94-
) == ActorPoolScalingRequest(delta=delta, force=force, reason=expected_reason)
98+
)
99+
100+
assert request.delta == delta
101+
assert request.force == force
102+
103+
if callable(expected_reason):
104+
assert expected_reason(
105+
request.reason
106+
), f"Unexpected reason: {request.reason}"
107+
else:
108+
assert request.reason == expected_reason
109+
110+
def calculate_plan_delta(util: float, current_size: int, threshold: float) -> int:
111+
"""Calculate plan_delta based on utilization formula."""
112+
return (
113+
math.ceil(current_size * (util / threshold - 1))
114+
if current_size > 0 and util > 0
115+
else 1
116+
)
95117

96118
# Should scale up since the util above the threshold.
97-
assert actor_pool.get_pool_util() == 1.5
119+
util = actor_pool.get_pool_util()
120+
assert util == 1.5
121+
threshold = autoscaler._actor_pool_scaling_up_threshold
122+
plan_delta = calculate_plan_delta(util, actor_pool.current_size(), threshold)
98123
assert_autoscaling_action(
99-
delta=1,
100-
expected_reason="utilization of 1.5 >= 1.0",
124+
delta=min(plan_delta, max_upscaling_delta),
125+
expected_reason=lambda reason: reason.startswith(
126+
f"utilization {util:.2f} >= threshold {threshold:.2f}"
127+
),
101128
)
102129

103130
# Should be no-op since the util is below the threshold.
@@ -161,9 +188,16 @@ def assert_autoscaling_action(
161188

162189
# If the input queue is empty but inputs did not complete,
163190
# allow to scale up still
191+
util = actor_pool.get_pool_util()
192+
threshold = autoscaler._actor_pool_scaling_up_threshold
193+
plan_delta = calculate_plan_delta(
194+
util, actor_pool.current_size(), threshold
195+
)
164196
assert_autoscaling_action(
165-
delta=1,
166-
expected_reason="utilization of 1.5 >= 1.0",
197+
delta=min(plan_delta, max_upscaling_delta),
198+
expected_reason=lambda reason: reason.startswith(
199+
f"utilization {util:.2f} >= threshold {threshold:.2f}"
200+
),
167201
)
168202

169203
# Should be no-op since the op doesn't have enough resources.

0 commit comments

Comments
 (0)