Skip to content

Commit 191de57

Browse files
committed
[Data] Optimize autoscaler to support configurable step size for actor pool scaling
Signed-off-by: dragongu <andrewgu@vip.qq.com>
1 parent fa8bc74 commit 191de57

File tree

3 files changed

+175
-4
lines changed

3 files changed

+175
-4
lines changed

python/ray/data/_internal/actor_autoscaler/default_actor_autoscaler.py

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ def __init__(
3131
self._actor_pool_scaling_down_threshold = (
3232
config.actor_pool_util_downscaling_threshold
3333
)
34+
self._actor_pool_max_upscaling_delta = config.actor_pool_max_upscaling_delta
3435

3536
self._validate_autoscaling_config()
3637

@@ -89,14 +90,27 @@ def _derive_target_scaling_config(
8990
reason="operator exceeding resource quota"
9091
)
9192
budget = self._resource_manager.get_budget(op)
92-
if _get_max_scale_up(actor_pool, budget) == 0:
93+
max_scale_up = _get_max_scale_up(actor_pool, budget)
94+
if max_scale_up == 0:
9395
return ActorPoolScalingRequest.no_op(reason="exceeded resource limits")
9496

97+
# Calculate desired delta based on utilization
98+
plan_delta = math.ceil(
99+
actor_pool.current_size() * (util / self._actor_pool_scaling_up_threshold - 1)
100+
)
101+
102+
delta = min(
103+
plan_delta,
104+
*self._get_upscale_capacities(actor_pool, max_scale_up),
105+
)
106+
delta = max(1, delta) # At least scale up by 1
107+
95108
return ActorPoolScalingRequest.upscale(
96-
delta=1,
109+
delta=delta,
97110
reason=(
98-
f"utilization of {util} >= "
99-
f"{self._actor_pool_scaling_up_threshold}"
111+
f"utilization {util:.2f} >= threshold {self._actor_pool_scaling_up_threshold:.2f} "
112+
f"(plan_delta={plan_delta}, max_scale_up={max_scale_up}, "
113+
f"max_upscaling_delta={self._actor_pool_max_upscaling_delta}, final_delta={delta})"
100114
),
101115
)
102116
elif util <= self._actor_pool_scaling_down_threshold:
@@ -120,10 +134,29 @@ def _derive_target_scaling_config(
120134
)
121135

122136
def _validate_autoscaling_config(self):
137+
# Validate that max upscaling delta is positive to prevent override by safeguard
138+
if self._actor_pool_max_upscaling_delta <= 0:
139+
raise ValueError(
140+
f"actor_pool_max_upscaling_delta must be positive, "
141+
f"got {self._actor_pool_max_upscaling_delta}"
142+
)
143+
123144
for op, state in self._topology.items():
124145
for actor_pool in op.get_autoscaling_actor_pools():
125146
self._validate_actor_pool_autoscaling_config(actor_pool, op)
126147

148+
def _get_upscale_capacities(
149+
self,
150+
actor_pool: "AutoscalingActorPool",
151+
max_scale_up: Optional[int],
152+
):
153+
limits = []
154+
if max_scale_up is not None:
155+
limits.append(max_scale_up)
156+
limits.append(self._actor_pool_max_upscaling_delta)
157+
limits.append(actor_pool.max_size() - actor_pool.get_current_size())
158+
return limits
159+
127160
def _validate_actor_pool_autoscaling_config(
128161
self,
129162
actor_pool: "AutoscalingActorPool",

python/ray/data/context.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,11 @@ class ShuffleStrategy(str, enum.Enum):
240240
0.5,
241241
)
242242

243+
DEFAULT_ACTOR_POOL_MAX_UPSCALING_DELTA: int = env_integer(
244+
"RAY_DATA_DEFAULT_ACTOR_POOL_MAX_UPSCALING_DELTA",
245+
1,
246+
)
247+
243248

244249
DEFAULT_ENABLE_DYNAMIC_OUTPUT_QUEUE_SIZE_BACKPRESSURE: bool = env_bool(
245250
"RAY_DATA_ENABLE_DYNAMIC_OUTPUT_QUEUE_SIZE_BACKPRESSURE", False
@@ -265,6 +270,9 @@ class AutoscalingConfig:
265270
between autoscaling speed and resource efficiency (i.e.,
266271
making tasks wait instead of immediately triggering execution).
267272
actor_pool_util_downscaling_threshold: Actor Pool utilization threshold for downscaling.
273+
actor_pool_max_upscaling_delta: Maximum number of actors to scale up in a single scaling decision.
274+
This limits how many actors can be added at once to prevent resource contention
275+
and scheduling pressure. Defaults to 1 for conservative scaling.
268276
"""
269277

270278
actor_pool_util_upscaling_threshold: float = (
@@ -276,6 +284,9 @@ class AutoscalingConfig:
276284
DEFAULT_ACTOR_POOL_UTIL_DOWNSCALING_THRESHOLD
277285
)
278286

287+
# Maximum number of actors to scale up in a single scaling decision
288+
actor_pool_max_upscaling_delta: int = DEFAULT_ACTOR_POOL_MAX_UPSCALING_DELTA
289+
279290

280291
def _execution_options_factory() -> "ExecutionOptions":
281292
# Lazily import to avoid circular dependencies.

python/ray/data/tests/test_autoscaler.py

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,77 @@ def assert_autoscaling_action(
209209
)
210210

211211

212+
@pytest.fixture
213+
def autoscaler_max_upscaling_delta_setup():
214+
resource_manager = MagicMock(
215+
spec=ResourceManager, get_budget=MagicMock(return_value=None)
216+
)
217+
218+
actor_pool = MagicMock(
219+
spec=_ActorPool,
220+
min_size=MagicMock(return_value=5),
221+
max_size=MagicMock(return_value=20),
222+
current_size=MagicMock(return_value=10),
223+
get_current_size=MagicMock(return_value=10),
224+
num_pending_actors=MagicMock(return_value=0),
225+
get_pool_util=MagicMock(return_value=2.0),
226+
)
227+
228+
op = MagicMock(
229+
spec=InternalQueueOperatorMixin,
230+
completed=MagicMock(return_value=False),
231+
_inputs_complete=False,
232+
)
233+
op_state = MagicMock(
234+
spec=OpState,
235+
total_enqueued_input_blocks=MagicMock(return_value=1),
236+
)
237+
op_state._scheduling_status = MagicMock(under_resource_limits=True)
238+
return resource_manager, actor_pool, op, op_state
239+
240+
def test_actor_pool_scaling_respects_small_max_upscaling_delta(
241+
autoscaler_max_upscaling_delta_setup,
242+
):
243+
resource_manager, actor_pool, op, op_state = autoscaler_max_upscaling_delta_setup
244+
autoscaler = DefaultActorAutoscaler(
245+
topology=MagicMock(),
246+
resource_manager=resource_manager,
247+
config=AutoscalingConfig(
248+
actor_pool_util_upscaling_threshold=1.0,
249+
actor_pool_util_downscaling_threshold=0.5,
250+
actor_pool_max_upscaling_delta=3,
251+
),
252+
)
253+
request = autoscaler._derive_target_scaling_config(
254+
actor_pool=actor_pool,
255+
op=op,
256+
op_state=op_state,
257+
)
258+
assert request.delta == 3
259+
assert "max_upscaling_delta=3" in request.reason
260+
261+
def test_actor_pool_scaling_respects_large_max_upscaling_delta(
262+
autoscaler_max_upscaling_delta_setup,
263+
):
264+
resource_manager, actor_pool, op, op_state = autoscaler_max_upscaling_delta_setup
265+
autoscaler = DefaultActorAutoscaler(
266+
topology=MagicMock(),
267+
resource_manager=resource_manager,
268+
config=AutoscalingConfig(
269+
actor_pool_util_upscaling_threshold=1.0,
270+
actor_pool_util_downscaling_threshold=0.5,
271+
actor_pool_max_upscaling_delta=100,
272+
),
273+
)
274+
request = autoscaler._derive_target_scaling_config(
275+
actor_pool=actor_pool,
276+
op=op,
277+
op_state=op_state,
278+
)
279+
assert request.delta == 10
280+
assert "max_upscaling_delta=10" in request.reason
281+
282+
212283
def test_cluster_scaling():
213284
"""Test `_try_scale_up_cluster` in `DefaultAutoscaler`"""
214285
op1 = MagicMock(
@@ -417,6 +488,62 @@ def __call__(self, row):
417488
assert expected_message not in wanr_log_args_str
418489

419490

491+
@pytest.fixture
492+
def autoscaler_config_mocks():
493+
resource_manager = MagicMock(spec=ResourceManager)
494+
topology = MagicMock()
495+
topology.items = MagicMock(return_value=[])
496+
return resource_manager, topology
497+
498+
499+
def test_autoscaling_config_validation_zero_delta(autoscaler_config_mocks):
500+
resource_manager, topology = autoscaler_config_mocks
501+
502+
with pytest.raises(
503+
ValueError, match="actor_pool_max_upscaling_delta must be positive"
504+
):
505+
DefaultActorAutoscaler(
506+
topology=topology,
507+
resource_manager=resource_manager,
508+
config=AutoscalingConfig(
509+
actor_pool_util_upscaling_threshold=1.0,
510+
actor_pool_util_downscaling_threshold=0.5,
511+
actor_pool_max_upscaling_delta=0,
512+
),
513+
)
514+
515+
def test_autoscaling_config_validation_negative_delta(autoscaler_config_mocks):
516+
resource_manager, topology = autoscaler_config_mocks
517+
518+
with pytest.raises(
519+
ValueError, match="actor_pool_max_upscaling_delta must be positive"
520+
):
521+
DefaultActorAutoscaler(
522+
topology=topology,
523+
resource_manager=resource_manager,
524+
config=AutoscalingConfig(
525+
actor_pool_util_upscaling_threshold=1.0,
526+
actor_pool_util_downscaling_threshold=0.5,
527+
actor_pool_max_upscaling_delta=-1,
528+
),
529+
)
530+
531+
532+
def test_autoscaling_config_validation_positive_delta(autoscaler_config_mocks):
533+
resource_manager, topology = autoscaler_config_mocks
534+
535+
autoscaler = DefaultActorAutoscaler(
536+
topology=topology,
537+
resource_manager=resource_manager,
538+
config=AutoscalingConfig(
539+
actor_pool_util_upscaling_threshold=1.0,
540+
actor_pool_util_downscaling_threshold=0.5,
541+
actor_pool_max_upscaling_delta=5,
542+
),
543+
)
544+
assert autoscaler._actor_pool_max_upscaling_delta == 5
545+
546+
420547
if __name__ == "__main__":
421548
import sys
422549

0 commit comments

Comments
 (0)