17
17
*/
18
18
package org .apache .hadoop .hbase .master .balancer ;
19
19
20
+ import com .google .common .base .Suppliers ;
20
21
import com .google .errorprone .annotations .RestrictedApi ;
21
22
import java .lang .reflect .Constructor ;
22
23
import java .util .ArrayDeque ;
23
24
import java .util .ArrayList ;
24
25
import java .util .Arrays ;
26
+ import java .util .Collections ;
25
27
import java .util .Deque ;
26
28
import java .util .HashMap ;
27
29
import java .util .List ;
28
30
import java .util .Map ;
29
31
import java .util .Optional ;
30
32
import java .util .concurrent .ThreadLocalRandom ;
33
+ import java .util .concurrent .TimeUnit ;
31
34
import java .util .function .Supplier ;
32
35
import org .apache .hadoop .conf .Configuration ;
33
36
import org .apache .hadoop .hbase .ClusterMetrics ;
@@ -159,6 +162,13 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
159
162
160
163
protected Map <Class <? extends CandidateGenerator >, CandidateGenerator > candidateGenerators ;
161
164
private Map <Class <? extends CandidateGenerator >, Double > weightsOfGenerators ;
165
+ private final Supplier <List <Class <? extends CandidateGenerator >>> shuffledGeneratorClasses =
166
+ Suppliers .memoizeWithExpiration (() -> {
167
+ List <Class <? extends CandidateGenerator >> shuffled =
168
+ new ArrayList <>(candidateGenerators .keySet ());
169
+ Collections .shuffle (shuffled );
170
+ return shuffled ;
171
+ }, 5 , TimeUnit .SECONDS );
162
172
163
173
private final BalancerConditionals balancerConditionals = BalancerConditionals .INSTANCE ;
164
174
@@ -435,47 +445,50 @@ Pair<CandidateGenerator, BalanceAction> nextAction(BalancerClusterState cluster)
435
445
* all cost functions that benefit from it.
436
446
*/
437
447
protected CandidateGenerator getRandomGenerator (BalancerClusterState cluster ) {
438
- double sum = 0 ;
439
- for (Class <? extends CandidateGenerator > clazz : candidateGenerators .keySet ()) {
440
- sum += weightsOfGenerators .getOrDefault (clazz , 0.0 );
441
- }
442
- if (sum == 0 ) {
443
- return candidateGenerators .values ().stream ().findAny ().orElseThrow ();
448
+ // Prefer conditional generators if they have moves to make
449
+ if (balancerConditionals .isConditionalBalancingEnabled ()) {
450
+ for (RegionPlanConditional conditional : balancerConditionals .getConditionals ()) {
451
+ Optional <RegionPlanConditionalCandidateGenerator > generator =
452
+ conditional .getCandidateGenerator ();
453
+ if (generator .isPresent () && generator .get ().getWeight (cluster ) > 0 ) {
454
+ return generator .get ();
455
+ }
456
+ }
444
457
}
445
458
446
- for (Class <? extends CandidateGenerator > clazz : candidateGenerators .keySet ()) {
447
- weightsOfGenerators .put (clazz ,
448
- Math .min (CandidateGenerator .MAX_WEIGHT , weightsOfGenerators .get (clazz ) / sum ));
459
+ List <Class <? extends CandidateGenerator >> generatorClasses = shuffledGeneratorClasses .get ();
460
+ List <Double > partialSums = new ArrayList <>(generatorClasses .size ());
461
+ double sum = 0.0 ;
462
+ for (Class <? extends CandidateGenerator > clazz : generatorClasses ) {
463
+ double weight = weightsOfGenerators .getOrDefault (clazz , 0.0 );
464
+ sum += weight ;
465
+ partialSums .add (sum );
449
466
}
450
467
451
- for ( RegionPlanConditional conditional : balancerConditionals . getConditionals ()) {
452
- Optional < RegionPlanConditionalCandidateGenerator > generator =
453
- conditional . getCandidateGenerator ();
454
- if (generator . isPresent () && generator . get (). getWeight ( cluster ) > 0 ) {
455
- return generator . get ( );
468
+ // If the sum of all weights is zero, fall back to the first generator in the shuffled list
469
+ if ( sum == 0.0 ) {
470
+ // If there are no generators at all, fail fast
471
+ if (generatorClasses . isEmpty () ) {
472
+ throw new IllegalStateException ( "No candidate generators available" );
456
473
}
474
+ return candidateGenerators .get (generatorClasses .get (0 ));
475
+ }
476
+
477
+ // Normalize partial sums so that the last one is 1.0
478
+ for (int i = 0 ; i < partialSums .size (); i ++) {
479
+ partialSums .set (i , partialSums .get (i ) / sum );
457
480
}
458
481
482
+ // Generate a random number and pick the first generator whose partial sum is >= rand
459
483
double rand = ThreadLocalRandom .current ().nextDouble ();
460
- CandidateGenerator greatestWeightGenerator = null ;
461
- double greatestWeight = 0 ;
462
- for (Map .Entry <Class <? extends CandidateGenerator >,
463
- CandidateGenerator > entry : candidateGenerators .entrySet ()) {
464
- Class <? extends CandidateGenerator > clazz = entry .getKey ();
465
- double generatorWeight = weightsOfGenerators .get (clazz );
466
- if (generatorWeight > greatestWeight ) {
467
- greatestWeight = generatorWeight ;
468
- greatestWeightGenerator = entry .getValue ();
469
- }
470
- if (rand <= generatorWeight ) {
471
- return entry .getValue ();
484
+ for (int i = 0 ; i < partialSums .size (); i ++) {
485
+ if (rand <= partialSums .get (i )) {
486
+ return candidateGenerators .get (generatorClasses .get (i ));
472
487
}
473
488
}
474
489
475
- if (greatestWeightGenerator != null ) {
476
- return greatestWeightGenerator ;
477
- }
478
- return candidateGenerators .values ().stream ().findAny ().orElseThrow ();
490
+ // Fallback: if for some reason we didn't return above, return the last generator
491
+ return candidateGenerators .get (generatorClasses .get (generatorClasses .size () - 1 ));
479
492
}
480
493
481
494
@ RestrictedApi (explanation = "Should only be called in tests" , link = "" ,
@@ -565,7 +578,7 @@ protected List<RegionPlan> balanceTable(TableName tableName,
565
578
// Perform a stochastic walk to see if we can get a good fit.
566
579
long step ;
567
580
568
- boolean improvedConditionals = false ;
581
+ boolean planImprovedConditionals = false ;
569
582
Map <Class <? extends CandidateGenerator >, Long > generatorToStepCount = new HashMap <>();
570
583
Map <Class <? extends CandidateGenerator >, Long > generatorToApprovedActionCount = new HashMap <>();
571
584
for (step = 0 ; step < computedMaxSteps ; step ++) {
@@ -576,41 +589,50 @@ protected List<RegionPlan> balanceTable(TableName tableName,
576
589
if (action .getType () == BalanceAction .Type .NULL ) {
577
590
continue ;
578
591
}
592
+
579
593
generatorToStepCount .merge (generator .getClass (), action .getStepCount (), Long ::sum );
580
- step += action .getStepCount () - 1 ;
581
-
582
- // Always accept a conditional generator output. Sometimes conditional generators
583
- // may need to make controversial moves in order to break what would otherwise
584
- // be a deadlocked situation.
585
- // Otherwise, for normal moves, evaluate the action.
586
- int conditionalViolationsChange ;
587
- boolean isViolating = false ;
588
- if (RegionPlanConditionalCandidateGenerator .class .isAssignableFrom (generator .getClass ())) {
589
- conditionalViolationsChange = -1 ;
590
- } else {
591
- conditionalViolationsChange = balancerConditionals .getViolationCountChange (cluster , action );
592
- isViolating = balancerConditionals .isViolating (cluster , action );
594
+ long additionalSteps = action .getStepCount () - 1 ;
595
+ if (additionalSteps > 0 ) {
596
+ step += additionalSteps ;
597
+ }
598
+
599
+ int conditionalViolationsChange = 0 ;
600
+ boolean isViolatingConditionals = false ;
601
+ boolean moveImprovedConditionals = false ;
602
+ // Only check conditionals if they are enabled
603
+ if (balancerConditionals .isConditionalBalancingEnabled ()) {
604
+ // Always accept a conditional generator output. Sometimes conditional generators
605
+ // may need to make controversial moves in order to break what would otherwise
606
+ // be a deadlocked situation.
607
+ // Otherwise, for normal moves, evaluate the action.
608
+ if (RegionPlanConditionalCandidateGenerator .class .isAssignableFrom (generator .getClass ())) {
609
+ conditionalViolationsChange = -1 ;
610
+ } else {
611
+ conditionalViolationsChange =
612
+ balancerConditionals .getViolationCountChange (cluster , action );
613
+ isViolatingConditionals = balancerConditionals .isViolating (cluster , action );
614
+ }
615
+ moveImprovedConditionals = conditionalViolationsChange < 0 ;
616
+ if (moveImprovedConditionals ) {
617
+ planImprovedConditionals = true ;
618
+ }
593
619
}
594
620
595
621
// Change state and evaluate costs
596
622
cluster .doAction (action );
623
+ updateCostsAndWeightsWithAction (cluster , action );
597
624
newCost = computeCost (cluster , currentCost );
598
625
599
- boolean conditionalsImproved = conditionalViolationsChange < 0 ;
600
- if (conditionalsImproved ) {
601
- improvedConditionals = true ;
602
- }
603
626
boolean conditionalsSimilarCostsImproved =
604
- (newCost < currentCost && conditionalViolationsChange == 0 && !isViolating );
627
+ (newCost < currentCost && conditionalViolationsChange == 0 && !isViolatingConditionals );
605
628
// Our first priority is to reduce conditional violations
606
629
// Our second priority is to reduce balancer cost
607
630
// change, regardless of cost change
608
- if (conditionalsImproved || conditionalsSimilarCostsImproved ) {
631
+ if (moveImprovedConditionals || conditionalsSimilarCostsImproved ) {
609
632
currentCost = newCost ;
610
633
generatorToApprovedActionCount .merge (generator .getClass (), action .getStepCount (),
611
634
Long ::sum );
612
635
balancerConditionals .loadClusterState (cluster );
613
- updateCostsAndWeightsWithAction (cluster , action );
614
636
615
637
// save for JMX
616
638
curOverallCost = currentCost ;
@@ -641,7 +663,7 @@ protected List<RegionPlan> balanceTable(TableName tableName,
641
663
642
664
metricsBalancer .balanceCluster (endTime - startTime );
643
665
644
- if (improvedConditionals || initCost > currentCost ) {
666
+ if (planImprovedConditionals || initCost > currentCost ) {
645
667
updateStochasticCosts (tableName , curOverallCost , curFunctionCosts );
646
668
List <RegionPlan > plans = createRegionPlans (cluster );
647
669
LOG .info (
0 commit comments