@@ -97,7 +97,10 @@ def assert_autoscaling_action(
9797 assert actor_pool .get_pool_util () == 1.5
9898 assert_autoscaling_action (
9999 delta = 1 ,
100- expected_reason = "utilization of 1.5 >= 1.0" ,
100+ expected_reason = (
101+ "utilization 1.50 >= threshold 1.00 "
102+ "(plan_delta=5, max_scale_up=None, max_upscaling_delta=1, final_delta=1)"
103+ ),
101104 )
102105
103106 # Should be no-op since the util is below the threshold.
@@ -209,6 +212,77 @@ def assert_autoscaling_action(
209212 )
210213
211214
215+ @pytest .fixture
216+ def autoscaler_max_upscaling_delta_setup ():
217+ resource_manager = MagicMock (
218+ spec = ResourceManager , get_budget = MagicMock (return_value = None )
219+ )
220+
221+ actor_pool = MagicMock (
222+ spec = _ActorPool ,
223+ min_size = MagicMock (return_value = 5 ),
224+ max_size = MagicMock (return_value = 20 ),
225+ current_size = MagicMock (return_value = 10 ),
226+ get_current_size = MagicMock (return_value = 10 ),
227+ num_pending_actors = MagicMock (return_value = 0 ),
228+ get_pool_util = MagicMock (return_value = 2.0 ),
229+ )
230+
231+ op = MagicMock (
232+ spec = InternalQueueOperatorMixin ,
233+ completed = MagicMock (return_value = False ),
234+ _inputs_complete = False ,
235+ )
236+ op_state = MagicMock (
237+ spec = OpState ,
238+ total_enqueued_input_blocks = MagicMock (return_value = 1 ),
239+ )
240+ op_state ._scheduling_status = MagicMock (under_resource_limits = True )
241+ return resource_manager , actor_pool , op , op_state
242+
243+ def test_actor_pool_scaling_respects_small_max_upscaling_delta (
244+ autoscaler_max_upscaling_delta_setup ,
245+ ):
246+ resource_manager , actor_pool , op , op_state = autoscaler_max_upscaling_delta_setup
247+ autoscaler = DefaultActorAutoscaler (
248+ topology = MagicMock (),
249+ resource_manager = resource_manager ,
250+ config = AutoscalingConfig (
251+ actor_pool_util_upscaling_threshold = 1.0 ,
252+ actor_pool_util_downscaling_threshold = 0.5 ,
253+ actor_pool_max_upscaling_delta = 3 ,
254+ ),
255+ )
256+ request = autoscaler ._derive_target_scaling_config (
257+ actor_pool = actor_pool ,
258+ op = op ,
259+ op_state = op_state ,
260+ )
261+ assert request .delta == 3
262+ assert "max_upscaling_delta=3" in request .reason
263+
264+ def test_actor_pool_scaling_respects_large_max_upscaling_delta (
265+ autoscaler_max_upscaling_delta_setup ,
266+ ):
267+ resource_manager , actor_pool , op , op_state = autoscaler_max_upscaling_delta_setup
268+ autoscaler = DefaultActorAutoscaler (
269+ topology = MagicMock (),
270+ resource_manager = resource_manager ,
271+ config = AutoscalingConfig (
272+ actor_pool_util_upscaling_threshold = 1.0 ,
273+ actor_pool_util_downscaling_threshold = 0.5 ,
274+ actor_pool_max_upscaling_delta = 100 ,
275+ ),
276+ )
277+ request = autoscaler ._derive_target_scaling_config (
278+ actor_pool = actor_pool ,
279+ op = op ,
280+ op_state = op_state ,
281+ )
282+ assert request .delta == 10
283+ assert "max_upscaling_delta=10" in request .reason
284+
285+
212286def test_cluster_scaling ():
213287 """Test `_try_scale_up_cluster` in `DefaultAutoscaler`"""
214288 op1 = MagicMock (
@@ -417,6 +491,62 @@ def __call__(self, row):
417491 assert expected_message not in wanr_log_args_str
418492
419493
494+ @pytest .fixture
495+ def autoscaler_config_mocks ():
496+ resource_manager = MagicMock (spec = ResourceManager )
497+ topology = MagicMock ()
498+ topology .items = MagicMock (return_value = [])
499+ return resource_manager , topology
500+
501+
502+ def test_autoscaling_config_validation_zero_delta (autoscaler_config_mocks ):
503+ resource_manager , topology = autoscaler_config_mocks
504+
505+ with pytest .raises (
506+ ValueError , match = "actor_pool_max_upscaling_delta must be positive"
507+ ):
508+ DefaultActorAutoscaler (
509+ topology = topology ,
510+ resource_manager = resource_manager ,
511+ config = AutoscalingConfig (
512+ actor_pool_util_upscaling_threshold = 1.0 ,
513+ actor_pool_util_downscaling_threshold = 0.5 ,
514+ actor_pool_max_upscaling_delta = 0 ,
515+ ),
516+ )
517+
518+ def test_autoscaling_config_validation_negative_delta (autoscaler_config_mocks ):
519+ resource_manager , topology = autoscaler_config_mocks
520+
521+ with pytest .raises (
522+ ValueError , match = "actor_pool_max_upscaling_delta must be positive"
523+ ):
524+ DefaultActorAutoscaler (
525+ topology = topology ,
526+ resource_manager = resource_manager ,
527+ config = AutoscalingConfig (
528+ actor_pool_util_upscaling_threshold = 1.0 ,
529+ actor_pool_util_downscaling_threshold = 0.5 ,
530+ actor_pool_max_upscaling_delta = - 1 ,
531+ ),
532+ )
533+
534+
535+ def test_autoscaling_config_validation_positive_delta (autoscaler_config_mocks ):
536+ resource_manager , topology = autoscaler_config_mocks
537+
538+ autoscaler = DefaultActorAutoscaler (
539+ topology = topology ,
540+ resource_manager = resource_manager ,
541+ config = AutoscalingConfig (
542+ actor_pool_util_upscaling_threshold = 1.0 ,
543+ actor_pool_util_downscaling_threshold = 0.5 ,
544+ actor_pool_max_upscaling_delta = 5 ,
545+ ),
546+ )
547+ assert autoscaler ._actor_pool_max_upscaling_delta == 5
548+
549+
420550if __name__ == "__main__" :
421551 import sys
422552
0 commit comments