Skip to content

Commit b8eb4dd

Browse files
committed
[Data] Optimize autoscaler to support configurable step size for actor pool scaling
Signed-off-by: dragongu <andrewgu@vip.qq.com>
1 parent ff4a546 commit b8eb4dd

File tree

3 files changed

+236
-4
lines changed

3 files changed

+236
-4
lines changed

python/ray/data/_internal/actor_autoscaler/default_actor_autoscaler.py

Lines changed: 52 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ def __init__(
3131
self._actor_pool_scaling_down_threshold = (
3232
config.actor_pool_util_downscaling_threshold
3333
)
34+
self._actor_pool_max_upscaling_delta = config.actor_pool_max_upscaling_delta
3435

3536
self._validate_autoscaling_config()
3637

@@ -89,14 +90,35 @@ def _derive_target_scaling_config(
8990
reason="operator exceeding resource quota"
9091
)
9192
budget = self._resource_manager.get_budget(op)
92-
if _get_max_scale_up(actor_pool, budget) == 0:
93+
max_scale_up = _get_max_scale_up(actor_pool, budget)
94+
if max_scale_up == 0:
9395
return ActorPoolScalingRequest.no_op(reason="exceeded resource limits")
9496

97+
# Calculate desired delta based on utilization
98+
plan_delta = math.ceil(
99+
actor_pool.current_size()
100+
* (util / self._actor_pool_scaling_up_threshold - 1)
101+
)
102+
103+
upscale_capacities = self._get_upscale_capacities(actor_pool, max_scale_up)
104+
max_upscaling_delta_limit = (
105+
min(upscale_capacities) if upscale_capacities else float("inf")
106+
)
107+
108+
delta = min(
109+
plan_delta,
110+
*upscale_capacities,
111+
)
112+
delta = max(1, delta) # At least scale up by 1
113+
95114
return ActorPoolScalingRequest.upscale(
96-
delta=1,
115+
delta=delta,
97116
reason=(
98-
f"utilization of {util} >= "
99-
f"{self._actor_pool_scaling_up_threshold}"
117+
f"utilization {util:.2f} >= threshold "
118+
f"{self._actor_pool_scaling_up_threshold:.2f} "
119+
f"(plan_delta={plan_delta}, max_scale_up={max_scale_up}, "
120+
f"max_upscaling_delta={max_upscaling_delta_limit}, "
121+
f"final_delta={delta})"
100122
),
101123
)
102124
elif util <= self._actor_pool_scaling_down_threshold:
@@ -120,10 +142,36 @@ def _derive_target_scaling_config(
120142
)
121143

122144
def _validate_autoscaling_config(self):
145+
# Validate that max upscaling delta is positive to prevent override by safeguard
146+
if self._actor_pool_max_upscaling_delta <= 0:
147+
raise ValueError(
148+
f"actor_pool_max_upscaling_delta must be positive, "
149+
f"got {self._actor_pool_max_upscaling_delta}"
150+
)
151+
# Validate that upscaling threshold is positive to prevent division by zero
152+
# and incorrect scaling calculations
153+
if self._actor_pool_scaling_up_threshold <= 0:
154+
raise ValueError(
155+
f"actor_pool_util_upscaling_threshold must be positive, "
156+
f"got {self._actor_pool_scaling_up_threshold}"
157+
)
158+
123159
for op, state in self._topology.items():
124160
for actor_pool in op.get_autoscaling_actor_pools():
125161
self._validate_actor_pool_autoscaling_config(actor_pool, op)
126162

163+
def _get_upscale_capacities(
164+
self,
165+
actor_pool: "AutoscalingActorPool",
166+
max_scale_up: Optional[int],
167+
):
168+
limits = []
169+
if max_scale_up is not None:
170+
limits.append(max_scale_up)
171+
limits.append(self._actor_pool_max_upscaling_delta)
172+
limits.append(actor_pool.max_size() - actor_pool.current_size())
173+
return limits
174+
127175
def _validate_actor_pool_autoscaling_config(
128176
self,
129177
actor_pool: "AutoscalingActorPool",

python/ray/data/context.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,11 @@ class ShuffleStrategy(str, enum.Enum):
240240
0.5,
241241
)
242242

243+
DEFAULT_ACTOR_POOL_MAX_UPSCALING_DELTA: int = env_integer(
244+
"RAY_DATA_DEFAULT_ACTOR_POOL_MAX_UPSCALING_DELTA",
245+
1,
246+
)
247+
243248

244249
DEFAULT_ENABLE_DYNAMIC_OUTPUT_QUEUE_SIZE_BACKPRESSURE: bool = env_bool(
245250
"RAY_DATA_ENABLE_DYNAMIC_OUTPUT_QUEUE_SIZE_BACKPRESSURE", False
@@ -265,6 +270,9 @@ class AutoscalingConfig:
265270
between autoscaling speed and resource efficiency (i.e.,
266271
making tasks wait instead of immediately triggering execution).
267272
actor_pool_util_downscaling_threshold: Actor Pool utilization threshold for downscaling.
273+
actor_pool_max_upscaling_delta: Maximum number of actors to scale up in a single scaling decision.
274+
This limits how many actors can be added at once to prevent resource contention
275+
and scheduling pressure. Defaults to 1 for conservative scaling.
268276
"""
269277

270278
actor_pool_util_upscaling_threshold: float = (
@@ -276,6 +284,9 @@ class AutoscalingConfig:
276284
DEFAULT_ACTOR_POOL_UTIL_DOWNSCALING_THRESHOLD
277285
)
278286

287+
# Maximum number of actors to scale up in a single scaling decision
288+
actor_pool_max_upscaling_delta: int = DEFAULT_ACTOR_POOL_MAX_UPSCALING_DELTA
289+
279290

280291
def _execution_options_factory() -> "ExecutionOptions":
281292
# Lazily import to avoid circular dependencies.

python/ray/data/tests/test_autoscaler.py

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,84 @@ def assert_autoscaling_action(
209209
)
210210

211211

212+
@pytest.fixture
213+
def autoscaler_max_upscaling_delta_setup():
214+
resource_manager = MagicMock(
215+
spec=ResourceManager, get_budget=MagicMock(return_value=None)
216+
)
217+
218+
actor_pool = MagicMock(
219+
spec=_ActorPool,
220+
min_size=MagicMock(return_value=5),
221+
max_size=MagicMock(return_value=20),
222+
current_size=MagicMock(return_value=10),
223+
get_current_size=MagicMock(return_value=10),
224+
num_pending_actors=MagicMock(return_value=0),
225+
get_pool_util=MagicMock(return_value=2.0),
226+
)
227+
228+
op = MagicMock(
229+
spec=InternalQueueOperatorMixin,
230+
completed=MagicMock(return_value=False),
231+
_inputs_complete=False,
232+
)
233+
op_state = MagicMock(
234+
spec=OpState,
235+
total_enqueued_input_blocks=MagicMock(return_value=1),
236+
)
237+
op_state._scheduling_status = MagicMock(under_resource_limits=True)
238+
return resource_manager, actor_pool, op, op_state
239+
240+
241+
def test_actor_pool_scaling_respects_small_max_upscaling_delta(
242+
autoscaler_max_upscaling_delta_setup,
243+
):
244+
resource_manager, actor_pool, op, op_state = autoscaler_max_upscaling_delta_setup
245+
autoscaler = DefaultActorAutoscaler(
246+
topology=MagicMock(),
247+
resource_manager=resource_manager,
248+
config=AutoscalingConfig(
249+
actor_pool_util_upscaling_threshold=1.0,
250+
actor_pool_util_downscaling_threshold=0.5,
251+
actor_pool_max_upscaling_delta=3,
252+
),
253+
)
254+
request = autoscaler._derive_target_scaling_config(
255+
actor_pool=actor_pool,
256+
op=op,
257+
op_state=op_state,
258+
)
259+
# With current_size=10, util=2.0, threshold=1.0:
260+
# plan_delta = ceil(10 * (2.0/1.0 - 1)) = ceil(10) = 10
261+
# However, delta is limited by max_upscaling_delta=3, so delta = min(10, 3) = 3
262+
assert request.delta == 3
263+
264+
265+
def test_actor_pool_scaling_respects_large_max_upscaling_delta(
266+
autoscaler_max_upscaling_delta_setup,
267+
):
268+
resource_manager, actor_pool, op, op_state = autoscaler_max_upscaling_delta_setup
269+
autoscaler = DefaultActorAutoscaler(
270+
topology=MagicMock(),
271+
resource_manager=resource_manager,
272+
config=AutoscalingConfig(
273+
actor_pool_util_upscaling_threshold=1.0,
274+
actor_pool_util_downscaling_threshold=0.5,
275+
actor_pool_max_upscaling_delta=100,
276+
),
277+
)
278+
request = autoscaler._derive_target_scaling_config(
279+
actor_pool=actor_pool,
280+
op=op,
281+
op_state=op_state,
282+
)
283+
# With current_size=10, util=2.0, threshold=1.0:
284+
# plan_delta = ceil(10 * (2.0/1.0 - 1)) = ceil(10) = 10
285+
# max_upscaling_delta=100 is large enough, but delta is limited by max_size:
286+
# max_size(20) - current_size(10) = 10, so delta = min(10, 100, 10) = 10
287+
assert request.delta == 10
288+
289+
212290
def test_cluster_scaling():
213291
"""Test `_try_scale_up_cluster` in `DefaultAutoscaler`"""
214292
op1 = MagicMock(
@@ -417,6 +495,101 @@ def __call__(self, row):
417495
assert expected_message not in wanr_log_args_str
418496

419497

498+
@pytest.fixture
499+
def autoscaler_config_mocks():
500+
resource_manager = MagicMock(spec=ResourceManager)
501+
topology = MagicMock()
502+
topology.items = MagicMock(return_value=[])
503+
return resource_manager, topology
504+
505+
506+
def test_autoscaling_config_validation_zero_delta(autoscaler_config_mocks):
507+
resource_manager, topology = autoscaler_config_mocks
508+
509+
with pytest.raises(
510+
ValueError, match="actor_pool_max_upscaling_delta must be positive"
511+
):
512+
DefaultActorAutoscaler(
513+
topology=topology,
514+
resource_manager=resource_manager,
515+
config=AutoscalingConfig(
516+
actor_pool_util_upscaling_threshold=1.0,
517+
actor_pool_util_downscaling_threshold=0.5,
518+
actor_pool_max_upscaling_delta=0,
519+
),
520+
)
521+
522+
523+
def test_autoscaling_config_validation_negative_delta(autoscaler_config_mocks):
524+
resource_manager, topology = autoscaler_config_mocks
525+
526+
with pytest.raises(
527+
ValueError, match="actor_pool_max_upscaling_delta must be positive"
528+
):
529+
DefaultActorAutoscaler(
530+
topology=topology,
531+
resource_manager=resource_manager,
532+
config=AutoscalingConfig(
533+
actor_pool_util_upscaling_threshold=1.0,
534+
actor_pool_util_downscaling_threshold=0.5,
535+
actor_pool_max_upscaling_delta=-1,
536+
),
537+
)
538+
539+
540+
def test_autoscaling_config_validation_positive_delta(autoscaler_config_mocks):
541+
resource_manager, topology = autoscaler_config_mocks
542+
543+
autoscaler = DefaultActorAutoscaler(
544+
topology=topology,
545+
resource_manager=resource_manager,
546+
config=AutoscalingConfig(
547+
actor_pool_util_upscaling_threshold=1.0,
548+
actor_pool_util_downscaling_threshold=0.5,
549+
actor_pool_max_upscaling_delta=5,
550+
),
551+
)
552+
assert autoscaler._actor_pool_max_upscaling_delta == 5
553+
554+
555+
def test_autoscaling_config_validation_zero_upscaling_threshold(
556+
autoscaler_config_mocks,
557+
):
558+
resource_manager, topology = autoscaler_config_mocks
559+
560+
with pytest.raises(
561+
ValueError, match="actor_pool_util_upscaling_threshold must be positive"
562+
):
563+
DefaultActorAutoscaler(
564+
topology=topology,
565+
resource_manager=resource_manager,
566+
config=AutoscalingConfig(
567+
actor_pool_util_upscaling_threshold=0,
568+
actor_pool_util_downscaling_threshold=0.5,
569+
actor_pool_max_upscaling_delta=5,
570+
),
571+
)
572+
573+
574+
def test_autoscaling_config_validation_negative_upscaling_threshold(
575+
autoscaler_config_mocks,
576+
):
577+
resource_manager, topology = autoscaler_config_mocks
578+
579+
with pytest.raises(
580+
ValueError, match="actor_pool_util_upscaling_threshold must be positive"
581+
):
582+
DefaultActorAutoscaler(
583+
topology=topology,
584+
resource_manager=resource_manager,
585+
config=AutoscalingConfig(
586+
actor_pool_util_upscaling_threshold=-1.0,
587+
actor_pool_util_downscaling_threshold=0.5,
588+
actor_pool_max_upscaling_delta=5,
589+
),
590+
)
591+
592+
420593
if __name__ == "__main__":
421594
import sys
422595

0 commit comments

Comments
 (0)