26
26
import java .util .HashMap ;
27
27
import java .util .List ;
28
28
import java .util .Map ;
29
+ import java .util .Set ;
30
+ import java .util .concurrent .TimeUnit ;
29
31
import org .agrona .collections .Hashing ;
30
32
import org .agrona .collections .Int2IntCounterMap ;
31
33
import org .apache .hadoop .hbase .HDFSBlocksDistribution ;
32
34
import org .apache .hadoop .hbase .ServerName ;
33
35
import org .apache .hadoop .hbase .client .RegionInfo ;
34
36
import org .apache .hadoop .hbase .client .RegionReplicaUtil ;
35
37
import org .apache .hadoop .hbase .master .RackManager ;
38
+ import org .apache .hadoop .hbase .master .RegionPlan ;
36
39
import org .apache .hadoop .hbase .net .Address ;
37
40
import org .apache .hadoop .hbase .util .Pair ;
38
41
import org .apache .yetus .audience .InterfaceAudience ;
39
42
import org .slf4j .Logger ;
40
43
import org .slf4j .LoggerFactory ;
41
44
45
+ import org .apache .hbase .thirdparty .com .google .common .base .Supplier ;
46
+ import org .apache .hbase .thirdparty .com .google .common .base .Suppliers ;
47
+ import org .apache .hbase .thirdparty .com .google .common .collect .ImmutableList ;
48
+
42
49
/**
43
50
* An efficient array based implementation similar to ClusterState for keeping the status of the
44
51
* cluster in terms of region assignment and distribution. LoadBalancers, such as
@@ -122,6 +129,14 @@ class BalancerClusterState {
122
129
// Maps regionName -> oldServerName -> cache ratio of the region on the old server
123
130
Map <String , Pair <ServerName , Float >> regionCacheRatioOnOldServerMap ;
124
131
132
+ private Supplier <List <Integer >> shuffledServerIndicesSupplier =
133
+ Suppliers .memoizeWithExpiration (() -> {
134
+ Collection <Integer > serverIndices = serversToIndex .values ();
135
+ List <Integer > shuffledServerIndices = new ArrayList <>(serverIndices );
136
+ Collections .shuffle (shuffledServerIndices );
137
+ return shuffledServerIndices ;
138
+ }, 5 , TimeUnit .SECONDS );
139
+
125
140
static class DefaultRackManager extends RackManager {
126
141
@ Override
127
142
public String getRack (ServerName server ) {
@@ -705,7 +720,41 @@ enum LocalityType {
705
720
RACK
706
721
}
707
722
708
- public void doAction (BalanceAction action ) {
723
+ public List <RegionPlan > convertActionToPlans (BalanceAction action ) {
724
+ switch (action .getType ()) {
725
+ case NULL :
726
+ break ;
727
+ case ASSIGN_REGION :
728
+ // FindBugs: Having the assert quietens FB BC_UNCONFIRMED_CAST warnings
729
+ assert action instanceof AssignRegionAction : action .getClass ();
730
+ AssignRegionAction ar = (AssignRegionAction ) action ;
731
+ return ImmutableList .of (regionMoved (ar .getRegion (), -1 , ar .getServer ()));
732
+ case MOVE_REGION :
733
+ assert action instanceof MoveRegionAction : action .getClass ();
734
+ MoveRegionAction mra = (MoveRegionAction ) action ;
735
+ return ImmutableList
736
+ .of (regionMoved (mra .getRegion (), mra .getFromServer (), mra .getToServer ()));
737
+ case SWAP_REGIONS :
738
+ assert action instanceof SwapRegionsAction : action .getClass ();
739
+ SwapRegionsAction a = (SwapRegionsAction ) action ;
740
+ return ImmutableList .of (regionMoved (a .getFromRegion (), a .getFromServer (), a .getToServer ()),
741
+ regionMoved (a .getToRegion (), a .getToServer (), a .getFromServer ()));
742
+ case MOVE_BATCH :
743
+ assert action instanceof MoveBatchAction : action .getClass ();
744
+ MoveBatchAction mba = (MoveBatchAction ) action ;
745
+ List <RegionPlan > mbRegionPlans = new ArrayList <>();
746
+ for (MoveRegionAction moveRegionAction : mba .getMoveActions ()) {
747
+ mbRegionPlans .add (regionMoved (moveRegionAction .getRegion (),
748
+ moveRegionAction .getFromServer (), moveRegionAction .getToServer ()));
749
+ }
750
+ return mbRegionPlans ;
751
+ default :
752
+ throw new RuntimeException ("Unknown action:" + action .getType ());
753
+ }
754
+ return Collections .emptyList ();
755
+ }
756
+
757
+ public List <RegionPlan > doAction (BalanceAction action ) {
709
758
switch (action .getType ()) {
710
759
case NULL :
711
760
break ;
@@ -715,30 +764,47 @@ public void doAction(BalanceAction action) {
715
764
AssignRegionAction ar = (AssignRegionAction ) action ;
716
765
regionsPerServer [ar .getServer ()] =
717
766
addRegion (regionsPerServer [ar .getServer ()], ar .getRegion ());
718
- regionMoved (ar .getRegion (), -1 , ar .getServer ());
719
- break ;
767
+ return ImmutableList .of (regionMoved (ar .getRegion (), -1 , ar .getServer ()));
720
768
case MOVE_REGION :
721
769
assert action instanceof MoveRegionAction : action .getClass ();
722
770
MoveRegionAction mra = (MoveRegionAction ) action ;
723
771
regionsPerServer [mra .getFromServer ()] =
724
772
removeRegion (regionsPerServer [mra .getFromServer ()], mra .getRegion ());
725
773
regionsPerServer [mra .getToServer ()] =
726
774
addRegion (regionsPerServer [mra .getToServer ()], mra .getRegion ());
727
- regionMoved ( mra . getRegion (), mra . getFromServer (), mra . getToServer ());
728
- break ;
775
+ return ImmutableList
776
+ . of ( regionMoved ( mra . getRegion (), mra . getFromServer (), mra . getToServer ())) ;
729
777
case SWAP_REGIONS :
730
778
assert action instanceof SwapRegionsAction : action .getClass ();
731
779
SwapRegionsAction a = (SwapRegionsAction ) action ;
732
780
regionsPerServer [a .getFromServer ()] =
733
781
replaceRegion (regionsPerServer [a .getFromServer ()], a .getFromRegion (), a .getToRegion ());
734
782
regionsPerServer [a .getToServer ()] =
735
783
replaceRegion (regionsPerServer [a .getToServer ()], a .getToRegion (), a .getFromRegion ());
736
- regionMoved (a .getFromRegion (), a .getFromServer (), a .getToServer ());
737
- regionMoved (a .getToRegion (), a .getToServer (), a .getFromServer ());
738
- break ;
784
+ return ImmutableList .of (regionMoved (a .getFromRegion (), a .getFromServer (), a .getToServer ()),
785
+ regionMoved (a .getToRegion (), a .getToServer (), a .getFromServer ()));
786
+ case MOVE_BATCH :
787
+ assert action instanceof MoveBatchAction : action .getClass ();
788
+ MoveBatchAction mba = (MoveBatchAction ) action ;
789
+ List <RegionPlan > mbRegionPlans = new ArrayList <>();
790
+ for (int serverIndex : mba .getServerToRegionsToRemove ().keySet ()) {
791
+ Set <Integer > regionsToRemove = mba .getServerToRegionsToRemove ().get (serverIndex );
792
+ regionsPerServer [serverIndex ] =
793
+ removeRegions (regionsPerServer [serverIndex ], regionsToRemove );
794
+ }
795
+ for (int serverIndex : mba .getServerToRegionsToAdd ().keySet ()) {
796
+ Set <Integer > regionsToAdd = mba .getServerToRegionsToAdd ().get (serverIndex );
797
+ regionsPerServer [serverIndex ] = addRegions (regionsPerServer [serverIndex ], regionsToAdd );
798
+ }
799
+ for (MoveRegionAction moveRegionAction : mba .getMoveActions ()) {
800
+ mbRegionPlans .add (regionMoved (moveRegionAction .getRegion (),
801
+ moveRegionAction .getFromServer (), moveRegionAction .getToServer ()));
802
+ }
803
+ return mbRegionPlans ;
739
804
default :
740
- throw new RuntimeException ("Uknown action:" + action .getType ());
805
+ throw new RuntimeException ("Unknown action:" + action .getType ());
741
806
}
807
+ return Collections .emptyList ();
742
808
}
743
809
744
810
/**
@@ -822,7 +888,7 @@ void doAssignRegion(RegionInfo regionInfo, ServerName serverName) {
822
888
doAction (new AssignRegionAction (region , server ));
823
889
}
824
890
825
- void regionMoved (int region , int oldServer , int newServer ) {
891
+ RegionPlan regionMoved (int region , int oldServer , int newServer ) {
826
892
regionIndexToServerIndex [region ] = newServer ;
827
893
if (initialRegionIndexToServerIndex [region ] == newServer ) {
828
894
numMovedRegions --; // region moved back to original location
@@ -853,6 +919,11 @@ void regionMoved(int region, int oldServer, int newServer) {
853
919
updateForLocation (serverIndexToRackIndex , regionsPerRack , colocatedReplicaCountsPerRack ,
854
920
oldServer , newServer , primary , region );
855
921
}
922
+
923
+ // old server name can be null
924
+ ServerName oldServerName = oldServer == -1 ? null : servers [oldServer ];
925
+
926
+ return new RegionPlan (regions [region ], oldServerName , servers [newServer ]);
856
927
}
857
928
858
929
/**
@@ -899,6 +970,48 @@ int[] addRegion(int[] regions, int regionIndex) {
899
970
return newRegions ;
900
971
}
901
972
973
+ int [] removeRegions (int [] regions , Set <Integer > regionIndicesToRemove ) {
974
+ // Calculate the size of the new regions array
975
+ int newSize = regions .length - regionIndicesToRemove .size ();
976
+ if (newSize < 0 ) {
977
+ throw new IllegalStateException (
978
+ "Region indices mismatch: more regions to remove than in the regions array" );
979
+ }
980
+
981
+ int [] newRegions = new int [newSize ];
982
+ int newIndex = 0 ;
983
+
984
+ // Copy only the regions not in the removal set
985
+ for (int region : regions ) {
986
+ if (!regionIndicesToRemove .contains (region )) {
987
+ newRegions [newIndex ++] = region ;
988
+ }
989
+ }
990
+
991
+ // If the newIndex is smaller than newSize, some regions were missing from the input array
992
+ if (newIndex != newSize ) {
993
+ throw new IllegalStateException ("Region indices mismatch: some regions in the removal "
994
+ + "set were not found in the regions array" );
995
+ }
996
+
997
+ return newRegions ;
998
+ }
999
+
1000
+ int [] addRegions (int [] regions , Set <Integer > regionIndicesToAdd ) {
1001
+ int [] newRegions = new int [regions .length + regionIndicesToAdd .size ()];
1002
+
1003
+ // Copy the existing regions to the new array
1004
+ System .arraycopy (regions , 0 , newRegions , 0 , regions .length );
1005
+
1006
+ // Add the new regions at the end of the array
1007
+ int newIndex = regions .length ;
1008
+ for (int regionIndex : regionIndicesToAdd ) {
1009
+ newRegions [newIndex ++] = regionIndex ;
1010
+ }
1011
+
1012
+ return newRegions ;
1013
+ }
1014
+
902
1015
int [] addRegionSorted (int [] regions , int regionIndex ) {
903
1016
int [] newRegions = new int [regions .length + 1 ];
904
1017
int i = 0 ;
@@ -998,6 +1111,10 @@ void setNumMovedRegions(int numMovedRegions) {
998
1111
this .numMovedRegions = numMovedRegions ;
999
1112
}
1000
1113
1114
+ List <Integer > getShuffledServerIndices () {
1115
+ return shuffledServerIndicesSupplier .get ();
1116
+ }
1117
+
1001
1118
@ Override
1002
1119
public String toString () {
1003
1120
StringBuilder desc = new StringBuilder ("Cluster={servers=[" );
0 commit comments