Skip to content

Commit 38b5207

Browse files
committed
[Data] Optimize autoscaler to support configurable step size for actor pool scaling
Signed-off-by: dragongu <andrewgu@vip.qq.com>
1 parent ff4a546 commit 38b5207

File tree

3 files changed

+174
-3
lines changed

3 files changed

+174
-3
lines changed

python/ray/data/_internal/actor_autoscaler/default_actor_autoscaler.py

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ def __init__(
3131
self._actor_pool_scaling_down_threshold = (
3232
config.actor_pool_util_downscaling_threshold
3333
)
34+
self._actor_pool_max_upscaling_delta = config.actor_pool_max_upscaling_delta
3435

3536
self._validate_autoscaling_config()
3637

@@ -89,11 +90,23 @@ def _derive_target_scaling_config(
8990
reason="operator exceeding resource quota"
9091
)
9192
budget = self._resource_manager.get_budget(op)
92-
if _get_max_scale_up(actor_pool, budget) == 0:
93+
max_scale_up = _get_max_scale_up(actor_pool, budget)
94+
if max_scale_up == 0:
9395
return ActorPoolScalingRequest.no_op(reason="exceeded resource limits")
9496

97+
# Calculate desired delta based on utilization
98+
plan_delta = math.ceil(
99+
actor_pool.current_size() * (util / self._actor_pool_scaling_up_threshold - 1)
100+
)
101+
102+
delta = min(
103+
plan_delta,
104+
*self._get_upscale_capacities(actor_pool, max_scale_up),
105+
)
106+
delta = max(1, delta) # At least scale up by 1
107+
95108
return ActorPoolScalingRequest.upscale(
96-
delta=1,
109+
delta=delta,
97110
reason=(
98111
f"utilization of {util} >= "
99112
f"{self._actor_pool_scaling_up_threshold}"
@@ -120,10 +133,29 @@ def _derive_target_scaling_config(
120133
)
121134

122135
def _validate_autoscaling_config(self):
    """Validate the autoscaler-wide settings and every autoscaling actor pool.

    Raises:
        ValueError: If ``actor_pool_max_upscaling_delta`` is zero or negative.
    """
    max_delta = self._actor_pool_max_upscaling_delta
    # Reject a non-positive cap up front so that a later "scale up by at
    # least one actor" safeguard cannot silently override it.
    if max_delta <= 0:
        raise ValueError(
            f"actor_pool_max_upscaling_delta must be positive, got {max_delta}"
        )

    # Only the operator is needed here; the per-operator state is ignored.
    for op, _op_state in self._topology.items():
        pools = op.get_autoscaling_actor_pools()
        for pool in pools:
            self._validate_actor_pool_autoscaling_config(pool, op)
126146

147+
def _get_upscale_capacities(
    self,
    actor_pool: "AutoscalingActorPool",
    max_scale_up: Optional[int],
) -> list:
    """Collect the upper bounds that apply to one upscaling step.

    Args:
        actor_pool: Pool being scaled; its ``max_size()`` and
            ``current_size()`` determine the remaining headroom.
        max_scale_up: Optional externally computed limit. ``None`` means
            this limit does not apply and is left out of the result.

    Returns:
        A list containing, in order: ``max_scale_up`` (only when it is not
        ``None``), the configured ``actor_pool_max_upscaling_delta``, and
        the pool's headroom (``max_size() - current_size()``).
    """
    external_limit = [] if max_scale_up is None else [max_scale_up]
    return [
        *external_limit,
        self._actor_pool_max_upscaling_delta,
        actor_pool.max_size() - actor_pool.current_size(),
    ]
158+
127159
def _validate_actor_pool_autoscaling_config(
128160
self,
129161
actor_pool: "AutoscalingActorPool",

python/ray/data/context.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,11 @@ class ShuffleStrategy(str, enum.Enum):
240240
0.5,
241241
)
242242

243+
# Default value for ``AutoscalingConfig.actor_pool_max_upscaling_delta``: the
# maximum number of actors a single scaling decision may add to a pool.
# NOTE(review): env_integer presumably reads the named environment variable
# and falls back to 1 when it is unset -- confirm against its definition.
DEFAULT_ACTOR_POOL_MAX_UPSCALING_DELTA: int = env_integer(
244+
"RAY_DATA_DEFAULT_ACTOR_POOL_MAX_UPSCALING_DELTA",
245+
1,
246+
)
247+
243248

244249
DEFAULT_ENABLE_DYNAMIC_OUTPUT_QUEUE_SIZE_BACKPRESSURE: bool = env_bool(
245250
"RAY_DATA_ENABLE_DYNAMIC_OUTPUT_QUEUE_SIZE_BACKPRESSURE", False
@@ -265,6 +270,9 @@ class AutoscalingConfig:
265270
between autoscaling speed and resource efficiency (i.e.,
266271
making tasks wait instead of immediately triggering execution).
267272
actor_pool_util_downscaling_threshold: Actor Pool utilization threshold for downscaling.
273+
actor_pool_max_upscaling_delta: Maximum number of actors to scale up in a single scaling decision.
274+
This limits how many actors can be added at once to prevent resource contention
275+
and scheduling pressure. Defaults to 1 for conservative scaling.
268276
"""
269277

270278
actor_pool_util_upscaling_threshold: float = (
@@ -276,6 +284,9 @@ class AutoscalingConfig:
276284
DEFAULT_ACTOR_POOL_UTIL_DOWNSCALING_THRESHOLD
277285
)
278286

287+
# Maximum number of actors to scale up in a single scaling decision
288+
actor_pool_max_upscaling_delta: int = DEFAULT_ACTOR_POOL_MAX_UPSCALING_DELTA
289+
279290

280291
def _execution_options_factory() -> "ExecutionOptions":
281292
# Lazily import to avoid circular dependencies.

python/ray/data/tests/test_autoscaler.py

Lines changed: 129 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,10 @@ def assert_autoscaling_action(
9797
assert actor_pool.get_pool_util() == 1.5
9898
assert_autoscaling_action(
9999
delta=1,
100-
expected_reason="utilization of 1.5 >= 1.0",
100+
expected_reason=(
101+
"utilization 1.50 >= threshold 1.00 "
102+
"(plan_delta=5, max_scale_up=None, max_upscaling_delta=1, final_delta=1)"
103+
),
101104
)
102105

103106
# Should be no-op since the util is below the threshold.
@@ -209,6 +212,77 @@ def assert_autoscaling_action(
209212
)
210213

211214

215+
@pytest.fixture
def autoscaler_max_upscaling_delta_setup():
    """Build the mocks shared by the max-upscaling-delta tests.

    Returns:
        A ``(resource_manager, actor_pool, op, op_state)`` tuple where the
        resource manager reports no budget, the pool holds 10 actors
        (min 5, max 20, none pending) at utilization 2.0, and the operator
        is not completed, has one enqueued input block and is under its
        resource limits.
    """
    pool_readings = {
        "min_size": 5,
        "max_size": 20,
        "current_size": 10,
        "get_current_size": 10,
        "num_pending_actors": 0,
        "get_pool_util": 2.0,
    }
    # Keep these as constructor kwargs: with ``spec=`` an attribute missing
    # from the spec can still be set this way, but could not be read first.
    actor_pool = MagicMock(
        spec=_ActorPool,
        **{
            name: MagicMock(return_value=value)
            for name, value in pool_readings.items()
        },
    )

    no_budget = MagicMock(return_value=None)
    resource_manager = MagicMock(spec=ResourceManager, get_budget=no_budget)

    not_completed = MagicMock(return_value=False)
    op = MagicMock(
        spec=InternalQueueOperatorMixin,
        completed=not_completed,
        _inputs_complete=False,
    )

    one_enqueued_block = MagicMock(return_value=1)
    op_state = MagicMock(
        spec=OpState,
        total_enqueued_input_blocks=one_enqueued_block,
    )
    op_state._scheduling_status = MagicMock(under_resource_limits=True)

    return resource_manager, actor_pool, op, op_state
242+
243+
def test_actor_pool_scaling_respects_small_max_upscaling_delta(
    autoscaler_max_upscaling_delta_setup,
):
    """A cap of 3 limits the upscale request to 3 actors."""
    resource_manager, actor_pool, op, op_state = autoscaler_max_upscaling_delta_setup

    capped_config = AutoscalingConfig(
        actor_pool_util_upscaling_threshold=1.0,
        actor_pool_util_downscaling_threshold=0.5,
        actor_pool_max_upscaling_delta=3,
    )
    autoscaler = DefaultActorAutoscaler(
        topology=MagicMock(),
        resource_manager=resource_manager,
        config=capped_config,
    )

    request = autoscaler._derive_target_scaling_config(
        actor_pool=actor_pool, op=op, op_state=op_state
    )

    assert request.delta == 3
    assert "max_upscaling_delta=3" in request.reason
263+
264+
def test_actor_pool_scaling_respects_large_max_upscaling_delta(
    autoscaler_max_upscaling_delta_setup,
):
    """A cap of 100 is not the binding limit; the request is for 10 actors."""
    resource_manager, actor_pool, op, op_state = autoscaler_max_upscaling_delta_setup
    autoscaler = DefaultActorAutoscaler(
        topology=MagicMock(),
        resource_manager=resource_manager,
        config=AutoscalingConfig(
            actor_pool_util_upscaling_threshold=1.0,
            actor_pool_util_downscaling_threshold=0.5,
            actor_pool_max_upscaling_delta=100,
        ),
    )
    request = autoscaler._derive_target_scaling_config(
        actor_pool=actor_pool,
        op=op,
        op_state=op_state,
    )
    assert request.delta == 10
    # Assert the configured cap itself. The previous check for
    # "max_upscaling_delta=10" only passed because it is a prefix of
    # "max_upscaling_delta=100", so it verified nothing about the cap.
    assert "max_upscaling_delta=100" in request.reason
284+
285+
212286
def test_cluster_scaling():
213287
"""Test `_try_scale_up_cluster` in `DefaultAutoscaler`"""
214288
op1 = MagicMock(
@@ -416,6 +490,60 @@ def __call__(self, row):
416490

417491
assert expected_message not in wanr_log_args_str
418492

493+
@pytest.fixture
def autoscaler_config_mocks():
    """Provide a mocked resource manager and a topology with no operators.

    Returns:
        A ``(resource_manager, topology)`` tuple; ``topology.items()``
        returns an empty list.
    """
    empty_items = MagicMock(return_value=[])
    topology = MagicMock(items=empty_items)
    return MagicMock(spec=ResourceManager), topology
499+
500+
def test_autoscaling_config_validation_zero_delta(autoscaler_config_mocks):
    """Building the autoscaler with a max upscaling delta of 0 must fail."""
    resource_manager, topology = autoscaler_config_mocks
    expected_error = "actor_pool_max_upscaling_delta must be positive"

    with pytest.raises(ValueError, match=expected_error):
        zero_delta_config = AutoscalingConfig(
            actor_pool_util_upscaling_threshold=1.0,
            actor_pool_util_downscaling_threshold=0.5,
            actor_pool_max_upscaling_delta=0,
        )
        DefaultActorAutoscaler(
            topology=topology,
            resource_manager=resource_manager,
            config=zero_delta_config,
        )
515+
516+
def test_autoscaling_config_validation_negative_delta(autoscaler_config_mocks):
    """Building the autoscaler with a max upscaling delta of -1 must fail."""
    resource_manager, topology = autoscaler_config_mocks
    expected_error = "actor_pool_max_upscaling_delta must be positive"

    with pytest.raises(ValueError, match=expected_error):
        negative_delta_config = AutoscalingConfig(
            actor_pool_util_upscaling_threshold=1.0,
            actor_pool_util_downscaling_threshold=0.5,
            actor_pool_max_upscaling_delta=-1,
        )
        DefaultActorAutoscaler(
            topology=topology,
            resource_manager=resource_manager,
            config=negative_delta_config,
        )
531+
532+
533+
def test_autoscaling_config_validation_positive_delta(autoscaler_config_mocks):
    """A positive max upscaling delta is accepted and stored on the autoscaler."""
    resource_manager, topology = autoscaler_config_mocks

    valid_config = AutoscalingConfig(
        actor_pool_util_upscaling_threshold=1.0,
        actor_pool_util_downscaling_threshold=0.5,
        actor_pool_max_upscaling_delta=5,
    )
    autoscaler = DefaultActorAutoscaler(
        topology=topology,
        resource_manager=resource_manager,
        config=valid_config,
    )

    assert autoscaler._actor_pool_max_upscaling_delta == 5
546+
419547

420548
if __name__ == "__main__":
421549
import sys

0 commit comments

Comments
 (0)