@@ -71,6 +71,8 @@ public class MlAutoscalingDeciderService implements AutoscalingDeciderService,
71
71
private static final Duration DEFAULT_MEMORY_REFRESH_RATE = Duration .ofMinutes (15 );
72
72
private static final String MEMORY_STALE = "unable to make scaling decision as job memory requirements are stale" ;
73
73
private static final long NO_SCALE_DOWN_POSSIBLE = -1L ;
74
+ // If ensureScaleDown changes the calculation by more than this much, log a warning
75
+ private static final long ACCEPTABLE_DIFFERENCE = ByteSizeValue .ofMb (1 ).getBytes ();
74
76
75
77
public static final String NAME = "ml" ;
76
78
public static final Setting <Integer > NUM_ANOMALY_JOBS_IN_QUEUE = Setting .intSetting ("num_anomaly_jobs_in_queue" , 0 , 0 );
@@ -359,6 +361,7 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider
359
361
360
362
final List <DiscoveryNode > nodes = getNodes (clusterState );
361
363
final NativeMemoryCapacity currentScale = currentScale (nodes );
364
+
362
365
final MlScalingReason .Builder reasonBuilder = MlScalingReason .builder ()
363
366
.setWaitingAnomalyJobs (waitingAnomalyJobs )
364
367
.setWaitingAnalyticsJobs (waitingAnalyticsJobs )
@@ -497,9 +500,21 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider
497
500
.build ()));
498
501
}
499
502
500
- final Optional <AutoscalingDeciderResult > scaleDownDecision = checkForScaleDown (nodeLoads , largestJob , currentScale , reasonBuilder );
503
+ final Optional <AutoscalingDeciderResult > maybeScaleDown = checkForScaleDown (nodeLoads , largestJob , currentScale , reasonBuilder )
504
+ // Due to weird rounding errors, it may be that a scale down result COULD cause a scale up
505
+ // Ensuring the scaleDown here forces the scale down result to always be lower than the current capacity.
506
+ // This is safe as we know that ALL jobs are assigned at the current capacity
507
+ .map (result -> {
508
+ AutoscalingCapacity capacity = ensureScaleDown (result .requiredCapacity (), context .currentCapacity ());
509
+ if (capacity == null ) {
510
+ return null ;
511
+ }
512
+ return new AutoscalingDeciderResult (capacity , result .reason ());
513
+ });
514
+
515
+ if (maybeScaleDown .isPresent ()) {
516
+ final AutoscalingDeciderResult scaleDownDecisionResult = maybeScaleDown .get ();
501
517
502
- if (scaleDownDecision .isPresent ()) {
503
518
// Given maxOpenJobs, could we scale down to just one node?
504
519
// We have no way of saying "we need X nodes"
505
520
if (nodeLoads .size () > 1 ) {
@@ -516,7 +531,7 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider
516
531
MAX_OPEN_JOBS_PER_NODE .getKey ());
517
532
logger .info (() -> new ParameterizedMessage ("{} Calculated potential scaled down capacity [{}] " ,
518
533
msg ,
519
- scaleDownDecision . get () .requiredCapacity ()));
534
+ scaleDownDecisionResult .requiredCapacity ()));
520
535
return new AutoscalingDeciderResult (context .currentCapacity (), reasonBuilder .setSimpleReason (msg ).build ());
521
536
}
522
537
}
@@ -528,14 +543,14 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider
528
543
TimeValue downScaleDelay = DOWN_SCALE_DELAY .get (configuration );
529
544
long msLeftToScale = downScaleDelay .millis () - (now - scaleDownDetected );
530
545
if (msLeftToScale <= 0 ) {
531
- return scaleDownDecision . get () ;
546
+ return scaleDownDecisionResult ;
532
547
}
533
548
logger .debug (() -> new ParameterizedMessage (
534
549
"not scaling down as the current scale down delay [{}] is not satisfied." +
535
550
" The last time scale down was detected [{}]. Calculated scaled down capacity [{}] " ,
536
551
downScaleDelay .getStringRep (),
537
552
XContentElasticsearchExtension .DEFAULT_DATE_PRINTER .print (scaleDownDetected ),
538
- scaleDownDecision . get () .requiredCapacity ()));
553
+ scaleDownDecisionResult .requiredCapacity ()));
539
554
return new AutoscalingDeciderResult (
540
555
context .currentCapacity (),
541
556
reasonBuilder
@@ -560,6 +575,31 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider
560
575
.build ()));
561
576
}
562
577
578
+ static AutoscalingCapacity ensureScaleDown (AutoscalingCapacity scaleDownResult , AutoscalingCapacity currentCapacity ) {
579
+ if (currentCapacity == null || scaleDownResult == null ) {
580
+ return null ;
581
+ }
582
+ AutoscalingCapacity newCapacity = new AutoscalingCapacity (
583
+ new AutoscalingCapacity .AutoscalingResources (
584
+ currentCapacity .total ().storage (),
585
+ ByteSizeValue .ofBytes (Math .min (scaleDownResult .total ().memory ().getBytes (), currentCapacity .total ().memory ().getBytes ()))
586
+ ),
587
+ new AutoscalingCapacity .AutoscalingResources (
588
+ currentCapacity .node ().storage (),
589
+ ByteSizeValue .ofBytes (Math .min (scaleDownResult .node ().memory ().getBytes (), currentCapacity .node ().memory ().getBytes ()))
590
+ )
591
+ );
592
+ if (scaleDownResult .node ().memory ().getBytes () - newCapacity .node ().memory ().getBytes () > ACCEPTABLE_DIFFERENCE
593
+ || scaleDownResult .total ().memory ().getBytes () - newCapacity .total ().memory ().getBytes () > ACCEPTABLE_DIFFERENCE ) {
594
+ logger .warn (
595
+ "scale down accidentally requested a scale up, auto-corrected; initial scaling [{}], corrected [{}]" ,
596
+ scaleDownResult ,
597
+ newCapacity
598
+ );
599
+ }
600
+ return newCapacity ;
601
+ }
602
+
563
603
AutoscalingDeciderResult noScaleResultOrRefresh (MlScalingReason .Builder reasonBuilder ,
564
604
boolean memoryTrackingStale ,
565
605
AutoscalingDeciderResult potentialResult ) {
@@ -816,8 +856,11 @@ Optional<AutoscalingDeciderResult> checkForScaleDown(List<NodeLoad> nodeLoads,
816
856
// Or our largest job could be on a smaller node (meaning the same size tier but smaller nodes are possible).
817
857
if (currentlyNecessaryTier < currentCapacity .getTier () || currentlyNecessaryNode < currentCapacity .getNode ()) {
818
858
NativeMemoryCapacity nativeMemoryCapacity = new NativeMemoryCapacity (
819
- currentlyNecessaryTier ,
820
- currentlyNecessaryNode ,
859
+ // Since we are in the `scaleDown` branch, we know jobs are running and we could be smaller
860
+ // If we have some weird rounding errors, it may be that the `currentlyNecessary` values are larger than
861
+ // current capacity. We never want to accidentally say "scale up" via a scale down.
862
+ Math .min (currentlyNecessaryTier , currentCapacity .getTier ()),
863
+ Math .min (currentlyNecessaryNode , currentCapacity .getNode ()),
821
864
// If our newly suggested native capacity is the same, we can use the previously stored jvm size
822
865
currentlyNecessaryNode == currentCapacity .getNode () ? currentCapacity .getJvmSize () : null );
823
866
AutoscalingCapacity requiredCapacity = nativeMemoryCapacity .autoscalingCapacity (maxMachineMemoryPercent , useAuto );
0 commit comments