1+ import math
12import time
23from contextlib import contextmanager
34from types import MethodType
4- from typing import Optional
5+ from typing import Callable , Union
56from unittest .mock import MagicMock
67
78import pytest
89
910import ray
1011from ray .data import ExecutionResources
1112from ray .data ._internal .actor_autoscaler import (
12- ActorPoolScalingRequest ,
1313 DefaultActorAutoscaler ,
1414)
1515from ray .data ._internal .cluster_autoscaler import DefaultClusterAutoscaler
2424)
2525
2626
27- def test_actor_pool_scaling ():
27+ @pytest .mark .parametrize ("max_upscaling_delta" , [1 , 2 , 5 , 10 ])
28+ def test_actor_pool_scaling (max_upscaling_delta ):
2829 """Test `_actor_pool_should_scale_up` and `_actor_pool_should_scale_down`
2930 in `DefaultAutoscaler`"""
3031
@@ -37,6 +38,7 @@ def test_actor_pool_scaling():
3738 config = AutoscalingConfig (
3839 actor_pool_util_upscaling_threshold = 1.0 ,
3940 actor_pool_util_downscaling_threshold = 0.5 ,
41+ actor_pool_max_upscaling_delta = max_upscaling_delta ,
4042 ),
4143 )
4244
@@ -82,22 +84,47 @@ def patch(mock, attr, value, is_method=True):
8284 yield
8385 setattr (mock , attr , original )
8486
87+ ExpectedReason = Union [str , Callable [[str ], bool ], None ]
88+
def assert_autoscaling_action(
    *, delta: int, expected_reason: ExpectedReason, force: bool = False
):
    """Check the scaling request the autoscaler derives for the actor pool.

    Args:
        delta: Expected ``delta`` of the derived request.
        expected_reason: Expected ``reason`` of the derived request. A
            callable is treated as a predicate applied to the actual
            reason; any other value (including ``None``) is compared to
            the actual reason for equality.
        force: Expected ``force`` flag of the derived request.
    """
    # `autoscaler`, `actor_pool`, `op` and `op_state` come from the enclosing
    # scope. They are only read here (never rebound), so no `nonlocal`
    # declaration is required.
    request = autoscaler._derive_target_scaling_config(
        actor_pool=actor_pool,
        op=op,
        op_state=op_state,
    )

    assert request.delta == delta
    assert request.force == force

    if callable(expected_reason):
        # Predicate form: lets callers match on a prefix or pattern instead
        # of pinning the complete reason string.
        assert expected_reason(
            request.reason
        ), f"Unexpected reason: {request.reason}"
    else:
        assert request.reason == expected_reason
def calculate_plan_delta(util: float, current_size: int, threshold: float) -> int:
    """Calculate plan_delta based on utilization formula.

    Returns ``ceil(current_size * (util / threshold - 1))`` when both
    ``current_size`` and ``util`` are positive, and ``1`` otherwise.

    NOTE(review): presumably mirrors the formula used by the autoscaler
    under test -- confirm against the implementation.

    Args:
        util: Pool utilization.
        current_size: Current number of actors in the pool.
        threshold: Upscaling utilization threshold; must be non-zero
            whenever ``current_size`` and ``util`` are both positive.

    Returns:
        The computed delta as an ``int``.
    """
    # Written as a negated conjunction (not `<=` checks) so that any
    # non-comparable value such as NaN takes the fallback branch too.
    if not (current_size > 0 and util > 0):
        return 1
    return math.ceil(current_size * (util / threshold - 1))
95117
96118 # Should scale up since the util is above the threshold.
97- assert actor_pool .get_pool_util () == 1.5
119+ util = actor_pool .get_pool_util ()
120+ assert util == 1.5
121+ threshold = autoscaler ._actor_pool_scaling_up_threshold
122+ plan_delta = calculate_plan_delta (util , actor_pool .current_size (), threshold )
98123 assert_autoscaling_action (
99- delta = 1 ,
100- expected_reason = "utilization of 1.5 >= 1.0" ,
124+ delta = min (plan_delta , max_upscaling_delta ),
125+ expected_reason = lambda reason : reason .startswith (
126+ f"utilization { util :.2f} >= threshold { threshold :.2f} "
127+ ),
101128 )
102129
103130 # Should be no-op since the util is below the threshold.
@@ -161,9 +188,16 @@ def assert_autoscaling_action(
161188
162189 # If the input queue is empty but inputs did not complete,
163190 # allow to scale up still
191+ util = actor_pool .get_pool_util ()
192+ threshold = autoscaler ._actor_pool_scaling_up_threshold
193+ plan_delta = calculate_plan_delta (
194+ util , actor_pool .current_size (), threshold
195+ )
164196 assert_autoscaling_action (
165- delta = 1 ,
166- expected_reason = "utilization of 1.5 >= 1.0" ,
197+ delta = min (plan_delta , max_upscaling_delta ),
198+ expected_reason = lambda reason : reason .startswith (
199+ f"utilization { util :.2f} >= threshold { threshold :.2f} "
200+ ),
167201 )
168202
169203 # Should be no-op since the op doesn't have enough resources.
@@ -417,6 +451,57 @@ def __call__(self, row):
417451 assert expected_message not in wanr_log_args_str
418452
419453
def test_autoscaling_config_validation():
    """Test that building an autoscaler with a non-positive
    `actor_pool_max_upscaling_delta` raises `ValueError`, while a positive
    value is accepted."""
    from ray.data._internal.execution.resource_manager import ResourceManager

    resource_manager = MagicMock(spec=ResourceManager)
    topology = MagicMock()
    topology.items = MagicMock(return_value=[])

    def make_autoscaler(max_upscaling_delta):
        # Build the config and the autoscaler together so that the
        # `pytest.raises` blocks below cover both constructors.
        return DefaultActorAutoscaler(
            topology=topology,
            resource_manager=resource_manager,
            config=AutoscalingConfig(
                actor_pool_util_upscaling_threshold=1.0,
                actor_pool_util_downscaling_threshold=0.5,
                actor_pool_max_upscaling_delta=max_upscaling_delta,
            ),
        )

    # Zero and negative values must both be rejected.
    for invalid_delta in (0, -1):
        with pytest.raises(
            ValueError, match="actor_pool_max_upscaling_delta must be positive"
        ):
            make_autoscaler(invalid_delta)

    # A positive value is accepted and exposed on the autoscaler.
    autoscaler = make_autoscaler(5)
    assert autoscaler._actor_pool_max_upscaling_delta == 5
503+
504+
420505if __name__ == "__main__" :
421506 import sys
422507
0 commit comments