Skip to content

Commit 092f38a

Browse files
committed
HBASE-23073 Add an optional costFunction to balance regions according to a capacity rule
1 parent 95c9911 commit 092f38a

File tree

5 files changed

+807
-202
lines changed

5 files changed

+807
-202
lines changed
Lines changed: 284 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,284 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
* <p>
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
* <p>
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.master.balancer;
19+
20+
import java.io.BufferedReader;
21+
import java.io.IOException;
22+
import java.io.InputStreamReader;
23+
import java.util.ArrayList;
24+
import java.util.HashMap;
25+
import java.util.List;
26+
import java.util.Map;
27+
import java.util.regex.Pattern;
28+
import java.util.regex.PatternSyntaxException;
29+
30+
import org.apache.hadoop.conf.Configuration;
31+
import org.apache.hadoop.fs.FileSystem;
32+
import org.apache.hadoop.fs.Path;
33+
import org.apache.hadoop.hbase.ServerName;
34+
import org.apache.yetus.audience.InterfaceAudience;
35+
import org.slf4j.Logger;
36+
import org.slf4j.LoggerFactory;
37+
38+
/**
39+
* This is an optional Cost function designed to allow region count skew across RegionServers.
40+
* A rule file is loaded from the local FS or HDFS before balancing. It contains lines of rules.
41+
* A rule is composed of a regexp for hostname, and a limit. For example, we could have:
42+
* <p>
43+
* * rs[0-9] 200
44+
* * rs1[0-9] 50
45+
* </p>
46+
* RegionServers with hostname matching the first rules will have a limit of 200, and the others 50.
47+
* If there's no match, a default is set.
48+
* The costFunction is trying to fill all RegionServers linearly, meaning that if the global usage
49+
* is at 50%, then all RegionServers should hold half of their capacity in terms of regions.
50+
* In order to use this CostFunction, you need to set the following options:
51+
* <ul>
52+
* <li>hbase.master.balancer.stochastic.additionalCostFunctions</li>
53+
* <li>hbase.master.balancer.stochastic.heterogeneousRegionCountRulesFile</li>
54+
* <li>hbase.master.balancer.stochastic.heterogeneousRegionCountDefault</li>
55+
* </ul>
56+
*/
57+
@InterfaceAudience.Private
58+
public class HeterogeneousRegionCountCostFunction extends StochasticLoadBalancer.CostFunction {
59+
60+
/**
61+
* configuration used for the path where the rule file is stored.
62+
*/
63+
static final String HBASE_MASTER_BALANCER_HETEROGENEOUS_RULES_FILE =
64+
"hbase.master.balancer.heterogeneousRegionCountRulesFile";
65+
private static final Logger LOG = LoggerFactory.getLogger(
66+
HeterogeneousRegionCountCostFunction.class);
67+
/**
68+
* Default rule to apply when the rule file is not found. Default to 200.
69+
*/
70+
private static final String HBASE_MASTER_BALANCER_HETEROGENEOUS_RULES_DEFAULT =
71+
"hbase.master.balancer.heterogeneousRegionCountDefault";
72+
/**
73+
* Cost for the function. Default to 500, can be changed.
74+
*/
75+
private static final String REGION_COUNT_SKEW_COST_KEY =
76+
"hbase.master.balancer.stochastic.heterogeneousRegionCountCost";
77+
private static final float DEFAULT_REGION_COUNT_SKEW_COST = 5000;
78+
private final String rulesPath;
79+
80+
/**
81+
* Contains the rules, key is the regexp for ServerName, value is the limit
82+
*/
83+
private final Map<Pattern, Integer> limitPerRule;
84+
85+
/**
86+
* This is a cache, used to not go through all the limitPerRule map when searching for limit
87+
*/
88+
private final Map<ServerName, Integer> limitPerRS;
89+
private int defaultNumberOfRegions;
90+
91+
/**
92+
* Total capacity of regions for the cluster, based on the online RS and their associated rules
93+
*/
94+
private int totalCapacity = 0;
95+
96+
97+
public HeterogeneousRegionCountCostFunction(final Configuration conf) {
98+
super(conf);
99+
this.limitPerRS = new HashMap<>();
100+
this.limitPerRule = new HashMap<>();
101+
this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST));
102+
103+
this.rulesPath = conf.get(HBASE_MASTER_BALANCER_HETEROGENEOUS_RULES_FILE);
104+
105+
this.defaultNumberOfRegions = conf.getInt(
106+
HBASE_MASTER_BALANCER_HETEROGENEOUS_RULES_DEFAULT, 200);
107+
if (this.defaultNumberOfRegions < 0) {
108+
LOG.warn("invalid configuration '" + HBASE_MASTER_BALANCER_HETEROGENEOUS_RULES_DEFAULT
109+
+"'. Setting default to 200");
110+
this.defaultNumberOfRegions = 200;
111+
}
112+
113+
if (conf.getFloat(
114+
StochasticLoadBalancer.RegionCountSkewCostFunction
115+
.REGION_COUNT_SKEW_COST_KEY,
116+
StochasticLoadBalancer.RegionCountSkewCostFunction
117+
.DEFAULT_REGION_COUNT_SKEW_COST) > 0) {
118+
LOG.warn("regionCountCost is not set to 0, "
119+
+ " this will interfere with the HeterogeneousRegionCountCostFunction!");
120+
}
121+
}
122+
123+
/**
124+
* Called once per LB invocation to give the cost function
125+
* to initialize it's state, and perform any costly calculation.
126+
*/
127+
@Override
128+
void init(final BaseLoadBalancer.Cluster cluster) {
129+
this.cluster = cluster;
130+
this.loadRules();
131+
}
132+
133+
@Override
134+
protected double cost() {
135+
136+
double cost = 0;
137+
138+
final double targetUsage = ((double) this.cluster.numRegions / (double) this.totalCapacity);
139+
140+
for (int i = 0; i < this.cluster.numServers; i++) {
141+
142+
// retrieve capacity for each RS
143+
final ServerName sn = this.cluster.servers[i];
144+
final double limit = this.limitPerRS.getOrDefault(sn, defaultNumberOfRegions);
145+
final double nbrRegions = this.cluster.regionsPerServer[i].length;
146+
147+
final double usage = nbrRegions / limit;
148+
if (usage > targetUsage) {
149+
// cost is the number of regions above the local limit
150+
final double localCost = (nbrRegions - Math.round(limit * targetUsage)) / limit;
151+
cost += localCost;
152+
}
153+
}
154+
return cost / (double) this.cluster.numServers;
155+
156+
}
157+
158+
/**
159+
* used to load the rule files.
160+
*/
161+
void loadRules() {
162+
final List<String> lines = HeterogeneousRegionCountCostFunction.readFile(this.rulesPath);
163+
if (lines.size() == 0) {
164+
return;
165+
}
166+
167+
LOG.info("loading rules file '" + this.rulesPath + "'");
168+
this.limitPerRule.clear();
169+
for (final String line : lines) {
170+
try {
171+
if (line.length() == 0) {
172+
continue;
173+
}
174+
175+
if (line.startsWith("#")) {
176+
continue;
177+
}
178+
179+
final String[] splits = line.split(" ");
180+
if (splits.length != 2) {
181+
throw new IOException("line '" + line + "' is malformated, " +
182+
"expected [regexp] [limit]. Skipping line");
183+
}
184+
185+
final Pattern pattern = Pattern.compile(splits[0]);
186+
final Integer limit = Integer.parseInt(splits[1]);
187+
this.limitPerRule.put(pattern, limit);
188+
} catch (final IOException | NumberFormatException | PatternSyntaxException e) {
189+
LOG.error("error on line: " + e);
190+
}
191+
}
192+
193+
this.rebuildCache();
194+
}
195+
196+
/**
197+
* used to read the rule files from either HDFS or local FS
198+
*/
199+
private static List<String> readFile(final String filename) {
200+
final List<String> records = new ArrayList<String>();
201+
202+
if (null == filename) {
203+
return records;
204+
}
205+
206+
final Configuration conf = new Configuration();
207+
final Path path = new Path(filename);
208+
try {
209+
final FileSystem fs = FileSystem.get(conf);
210+
final BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(path)));
211+
212+
String line;
213+
while ((line = reader.readLine()) != null) {
214+
records.add(line);
215+
}
216+
reader.close();
217+
return records;
218+
} catch (final IOException e) {
219+
LOG.error("cannot read rules file located at ' " + filename + " ':" + e.getMessage());
220+
}
221+
return records;
222+
}
223+
224+
/**
225+
* Rebuild cache matching ServerNames and their capacity.
226+
*/
227+
private void rebuildCache() {
228+
229+
LOG.debug("Rebuilding cache of capacity for each RS");
230+
231+
this.limitPerRS.clear();
232+
this.totalCapacity = 0;
233+
234+
if (null == this.cluster) {
235+
return;
236+
}
237+
238+
for (int i = 0; i < this.cluster.numServers; i++) {
239+
final ServerName sn = this.cluster.servers[i];
240+
final int capacity = this.findLimitForRS(sn);
241+
this.totalCapacity += capacity;
242+
}
243+
244+
/**
245+
* Overal usage of the cluster
246+
*/
247+
double overallUsage = (double) this.cluster.numRegions / (double) this.totalCapacity;
248+
249+
LOG.info("Cluster can hold " + this.cluster.numRegions + "/" + this.totalCapacity + " regions ("
250+
+ Math.round(overallUsage * 100) + "%)");
251+
}
252+
253+
/**
254+
* Find the limit for a ServerName. If not found then return the default value
255+
*
256+
* @param serverName the server we are looking for
257+
* @return the limit
258+
*/
259+
protected int findLimitForRS(final ServerName serverName) {
260+
261+
boolean matched = false;
262+
int limit = -1;
263+
for (final Map.Entry<Pattern, Integer> entry : this.limitPerRule.entrySet()) {
264+
if (entry.getKey().matcher(serverName.getHostname()).matches()) {
265+
matched = true;
266+
limit = entry.getValue();
267+
break;
268+
}
269+
}
270+
271+
if (!matched) {
272+
limit = this.defaultNumberOfRegions;
273+
}
274+
275+
// Feeding cache
276+
this.limitPerRS.put(serverName, limit);
277+
278+
return limit;
279+
}
280+
281+
int getNumberOfRulesLoaded() {
282+
return this.limitPerRule.size();
283+
}
284+
}

0 commit comments

Comments
 (0)