Skip to content

Commit 8147683

Browse files
authored
[Data] Optimize autoscaler to support configurable step size for actor pool scaling (#58726)
## Summary Add support for configurable upscaling step size in the actor pool autoscaler. This enables rapid scale-up and efficient resource utilization by allowing the autoscaler to scale up multiple actors at once, instead of scaling up one actor at a time. ## Description ### Background Currently, the actor pool autoscaler scales up actors one at a time, which can be slow in certain scenarios: 1. **Slow actor startup**: When actor initialization logic is complex, actors may remain in pending state for extended periods. The autoscaler skips scaling when it encounters pending actors, preventing further scaling. 2. **Elastic cluster with unstable resources**: In environments where available resources are uncertain, users often configure large concurrency ranges (e.g., (10,1000)) for `map_batches`. In these cases, rapid startup and scaling are critical to utilize available resources efficiently. ### Solution This PR adds support for configurable upscaling step size in the actor pool autoscaler. Instead of always scaling up by 1 actor at a time, the autoscaler can now scale up multiple actors based on utilization metrics, while respecting resource constraints. ## Related issues <!-- Add related issue numbers if applicable --> Signed-off-by: dragongu <andrewgu@vip.qq.com>
1 parent 96b07e5 commit 8147683

File tree

3 files changed

+227
-2
lines changed

3 files changed

+227
-2
lines changed

python/ray/data/_internal/actor_autoscaler/default_actor_autoscaler.py

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ def __init__(
3131
self._actor_pool_scaling_down_threshold = (
3232
config.actor_pool_util_downscaling_threshold
3333
)
34+
self._actor_pool_max_upscaling_delta = config.actor_pool_max_upscaling_delta
3435

3536
self._validate_autoscaling_config()
3637

@@ -89,11 +90,25 @@ def _derive_target_scaling_config(
8990
reason="operator exceeding resource quota"
9091
)
9192
budget = self._resource_manager.get_budget(op)
92-
if _get_max_scale_up(actor_pool, budget) == 0:
93+
max_scale_up = _get_max_scale_up(actor_pool, budget)
94+
if max_scale_up == 0:
9395
return ActorPoolScalingRequest.no_op(reason="exceeded resource limits")
9496

97+
# Calculate desired delta based on utilization
98+
plan_delta = math.ceil(
99+
actor_pool.current_size()
100+
* (util / self._actor_pool_scaling_up_threshold - 1)
101+
)
102+
103+
upscale_capacities = self._get_upscale_capacities(actor_pool, max_scale_up)
104+
delta = min(
105+
plan_delta,
106+
*upscale_capacities,
107+
)
108+
delta = max(1, delta) # At least scale up by 1
109+
95110
return ActorPoolScalingRequest.upscale(
96-
delta=1,
111+
delta=delta,
97112
reason=(
98113
f"utilization of {util} >= "
99114
f"{self._actor_pool_scaling_up_threshold}"
@@ -120,10 +135,36 @@ def _derive_target_scaling_config(
120135
)
121136

122137
def _validate_autoscaling_config(self):
138+
# Validate that max upscaling delta is positive to prevent override by safeguard
139+
if self._actor_pool_max_upscaling_delta <= 0:
140+
raise ValueError(
141+
f"actor_pool_max_upscaling_delta must be positive, "
142+
f"got {self._actor_pool_max_upscaling_delta}"
143+
)
144+
# Validate that upscaling threshold is positive to prevent division by zero
145+
# and incorrect scaling calculations
146+
if self._actor_pool_scaling_up_threshold <= 0:
147+
raise ValueError(
148+
f"actor_pool_util_upscaling_threshold must be positive, "
149+
f"got {self._actor_pool_scaling_up_threshold}"
150+
)
151+
123152
for op, state in self._topology.items():
124153
for actor_pool in op.get_autoscaling_actor_pools():
125154
self._validate_actor_pool_autoscaling_config(actor_pool, op)
126155

156+
def _get_upscale_capacities(
157+
self,
158+
actor_pool: "AutoscalingActorPool",
159+
max_scale_up: Optional[int],
160+
):
161+
limits = []
162+
if max_scale_up is not None:
163+
limits.append(max_scale_up)
164+
limits.append(self._actor_pool_max_upscaling_delta)
165+
limits.append(actor_pool.max_size() - actor_pool.current_size())
166+
return limits
167+
127168
def _validate_actor_pool_autoscaling_config(
128169
self,
129170
actor_pool: "AutoscalingActorPool",

python/ray/data/context.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,11 @@ class ShuffleStrategy(str, enum.Enum):
240240
0.5,
241241
)
242242

243+
# Default for `AutoscalingConfig.actor_pool_max_upscaling_delta`.
# NOTE(review): `env_integer` presumably reads the named environment variable
# and falls back to 1 when it is unset -- confirm against its definition.
DEFAULT_ACTOR_POOL_MAX_UPSCALING_DELTA: int = env_integer(
    "RAY_DATA_DEFAULT_ACTOR_POOL_MAX_UPSCALING_DELTA", 1
)
247+
243248

244249
DEFAULT_ENABLE_DYNAMIC_OUTPUT_QUEUE_SIZE_BACKPRESSURE: bool = env_bool(
245250
"RAY_DATA_ENABLE_DYNAMIC_OUTPUT_QUEUE_SIZE_BACKPRESSURE", False
@@ -265,6 +270,9 @@ class AutoscalingConfig:
265270
between autoscaling speed and resource efficiency (i.e.,
266271
making tasks wait instead of immediately triggering execution).
267272
actor_pool_util_downscaling_threshold: Actor Pool utilization threshold for downscaling.
273+
actor_pool_max_upscaling_delta: Maximum number of actors to scale up in a single scaling decision.
274+
This limits how many actors can be added at once to prevent resource contention
275+
and scheduling pressure. Defaults to 1 for conservative scaling.
268276
"""
269277

270278
actor_pool_util_upscaling_threshold: float = (
@@ -276,6 +284,9 @@ class AutoscalingConfig:
276284
DEFAULT_ACTOR_POOL_UTIL_DOWNSCALING_THRESHOLD
277285
)
278286

287+
# Maximum number of actors to scale up in a single scaling decision
288+
actor_pool_max_upscaling_delta: int = DEFAULT_ACTOR_POOL_MAX_UPSCALING_DELTA
289+
279290

280291
def _execution_options_factory() -> "ExecutionOptions":
281292
# Lazily import to avoid circular dependencies.

python/ray/data/tests/test_autoscaler.py

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,84 @@ def assert_autoscaling_action(
209209
)
210210

211211

212+
@pytest.fixture
def autoscaler_max_upscaling_delta_setup():
    """Build the mocks shared by the max-upscaling-delta tests.

    Returns:
        A ``(resource_manager, actor_pool, op, op_state)`` tuple of mocks:
        the resource manager reports a budget of ``None``; the actor pool
        reports size 10 (min 5, max 20), no pending actors and a
        utilization of 2.0; the operator is not completed; the operator
        state reports one enqueued input block and is under resource limits.
    """
    # Return values for the mocked actor-pool methods.
    pool_return_values = {
        "min_size": 5,
        "max_size": 20,
        "current_size": 10,
        "get_current_size": 10,
        "num_pending_actors": 0,
        "get_pool_util": 2.0,
    }
    pool_methods = {
        name: MagicMock(return_value=value)
        for name, value in pool_return_values.items()
    }
    actor_pool = MagicMock(spec=_ActorPool, **pool_methods)

    resource_manager = MagicMock(
        spec=ResourceManager,
        get_budget=MagicMock(return_value=None),
    )

    op = MagicMock(
        spec=InternalQueueOperatorMixin,
        completed=MagicMock(return_value=False),
        _inputs_complete=False,
    )

    op_state = MagicMock(
        spec=OpState,
        total_enqueued_input_blocks=MagicMock(return_value=1),
    )
    op_state._scheduling_status = MagicMock(under_resource_limits=True)

    return resource_manager, actor_pool, op, op_state
239+
240+
241+
def test_actor_pool_scaling_respects_small_max_upscaling_delta(
    autoscaler_max_upscaling_delta_setup,
):
    """A small ``actor_pool_max_upscaling_delta`` caps the upscale delta."""
    resource_manager, actor_pool, op, op_state = autoscaler_max_upscaling_delta_setup

    config = AutoscalingConfig(
        actor_pool_util_upscaling_threshold=1.0,
        actor_pool_util_downscaling_threshold=0.5,
        actor_pool_max_upscaling_delta=3,
    )
    autoscaler = DefaultActorAutoscaler(
        topology=MagicMock(),
        resource_manager=resource_manager,
        config=config,
    )

    request = autoscaler._derive_target_scaling_config(
        actor_pool=actor_pool,
        op=op,
        op_state=op_state,
    )

    # current_size=10, util=2.0, threshold=1.0
    #   => plan_delta = ceil(10 * (2.0 / 1.0 - 1)) = 10
    # The configured cap of 3 is smaller, so delta = min(10, 3) = 3.
    assert request.delta == 3
263+
264+
265+
def test_actor_pool_scaling_respects_large_max_upscaling_delta(
    autoscaler_max_upscaling_delta_setup,
):
    """With a generous cap, the pool's remaining headroom bounds the delta."""
    resource_manager, actor_pool, op, op_state = autoscaler_max_upscaling_delta_setup

    config = AutoscalingConfig(
        actor_pool_util_upscaling_threshold=1.0,
        actor_pool_util_downscaling_threshold=0.5,
        actor_pool_max_upscaling_delta=100,
    )
    autoscaler = DefaultActorAutoscaler(
        topology=MagicMock(),
        resource_manager=resource_manager,
        config=config,
    )

    request = autoscaler._derive_target_scaling_config(
        actor_pool=actor_pool,
        op=op,
        op_state=op_state,
    )

    # current_size=10, util=2.0, threshold=1.0
    #   => plan_delta = ceil(10 * (2.0 / 1.0 - 1)) = 10
    # The cap of 100 does not bind; headroom is max_size(20) - current_size(10)
    # = 10, so delta = min(10, 100, 10) = 10.
    assert request.delta == 10
288+
289+
212290
def test_cluster_scaling():
213291
"""Test `_try_scale_up_cluster` in `DefaultAutoscaler`"""
214292
op1 = MagicMock(
@@ -417,6 +495,101 @@ def __call__(self, row):
417495
assert expected_message not in wanr_log_args_str
418496

419497

498+
@pytest.fixture
def autoscaler_config_mocks():
    """Provide mocks for the config-validation tests.

    Returns:
        A ``(resource_manager, topology)`` tuple where ``topology.items()``
        returns an empty list.
    """
    topology = MagicMock()
    topology.items = MagicMock(return_value=[])
    return MagicMock(spec=ResourceManager), topology
504+
505+
506+
def test_autoscaling_config_validation_zero_delta(autoscaler_config_mocks):
    """Constructing the autoscaler with a max upscaling delta of 0 fails."""
    resource_manager, topology = autoscaler_config_mocks
    expected_error = "actor_pool_max_upscaling_delta must be positive"

    with pytest.raises(ValueError, match=expected_error):
        # The config is built inside the context so any error raised while
        # constructing it is observed by ``pytest.raises`` as well.
        DefaultActorAutoscaler(
            topology=topology,
            resource_manager=resource_manager,
            config=AutoscalingConfig(
                actor_pool_util_upscaling_threshold=1.0,
                actor_pool_util_downscaling_threshold=0.5,
                actor_pool_max_upscaling_delta=0,
            ),
        )
521+
522+
523+
def test_autoscaling_config_validation_negative_delta(autoscaler_config_mocks):
    """Constructing the autoscaler with a negative max upscaling delta fails."""
    resource_manager, topology = autoscaler_config_mocks
    expected_error = "actor_pool_max_upscaling_delta must be positive"

    with pytest.raises(ValueError, match=expected_error):
        # The config is built inside the context so any error raised while
        # constructing it is observed by ``pytest.raises`` as well.
        DefaultActorAutoscaler(
            topology=topology,
            resource_manager=resource_manager,
            config=AutoscalingConfig(
                actor_pool_util_upscaling_threshold=1.0,
                actor_pool_util_downscaling_threshold=0.5,
                actor_pool_max_upscaling_delta=-1,
            ),
        )
538+
539+
540+
def test_autoscaling_config_validation_positive_delta(autoscaler_config_mocks):
    """A positive max upscaling delta is accepted and stored on the autoscaler."""
    resource_manager, topology = autoscaler_config_mocks

    config = AutoscalingConfig(
        actor_pool_util_upscaling_threshold=1.0,
        actor_pool_util_downscaling_threshold=0.5,
        actor_pool_max_upscaling_delta=5,
    )
    autoscaler = DefaultActorAutoscaler(
        topology=topology,
        resource_manager=resource_manager,
        config=config,
    )

    assert autoscaler._actor_pool_max_upscaling_delta == 5
553+
554+
555+
def test_autoscaling_config_validation_zero_upscaling_threshold(
    autoscaler_config_mocks,
):
    """Constructing the autoscaler with an upscaling threshold of 0 fails."""
    resource_manager, topology = autoscaler_config_mocks
    expected_error = "actor_pool_util_upscaling_threshold must be positive"

    with pytest.raises(ValueError, match=expected_error):
        # The config is built inside the context so any error raised while
        # constructing it is observed by ``pytest.raises`` as well.
        DefaultActorAutoscaler(
            topology=topology,
            resource_manager=resource_manager,
            config=AutoscalingConfig(
                actor_pool_util_upscaling_threshold=0,
                actor_pool_util_downscaling_threshold=0.5,
                actor_pool_max_upscaling_delta=5,
            ),
        )
572+
573+
574+
def test_autoscaling_config_validation_negative_upscaling_threshold(
    autoscaler_config_mocks,
):
    """Constructing the autoscaler with a negative upscaling threshold fails."""
    resource_manager, topology = autoscaler_config_mocks
    expected_error = "actor_pool_util_upscaling_threshold must be positive"

    with pytest.raises(ValueError, match=expected_error):
        # The config is built inside the context so any error raised while
        # constructing it is observed by ``pytest.raises`` as well.
        DefaultActorAutoscaler(
            topology=topology,
            resource_manager=resource_manager,
            config=AutoscalingConfig(
                actor_pool_util_upscaling_threshold=-1.0,
                actor_pool_util_downscaling_threshold=0.5,
                actor_pool_max_upscaling_delta=5,
            ),
        )
591+
592+
420593
if __name__ == "__main__":
421594
import sys
422595

0 commit comments

Comments
 (0)