Skip to content

Commit 2c7776e

Browse files
charlesconnellbbeaudreault
authored andcommitted
HBASE-27496 Optionally limit the amount of plans executed in the Normalizer (#4888)
Signed-off-by: Bryan Beaudreault <bbeaudreault@apache.org>
1 parent 060ca84 commit 2c7776e

File tree

7 files changed

+145
-1
lines changed

7 files changed

+145
-1
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/MergeNormalizationPlan.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,15 @@ public List<NormalizationTarget> getNormalizationTargets() {
5858
return normalizationTargets;
5959
}
6060

61+
@Override
62+
public long getPlanSizeMb() {
63+
long total = 0;
64+
for (NormalizationTarget target : normalizationTargets) {
65+
total += target.getRegionSizeMb();
66+
}
67+
return total;
68+
}
69+
6170
@Override
6271
public String toString() {
6372
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)

hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/NormalizationPlan.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,6 @@ enum PlanType {
3434

3535
/** Returns the type of this plan */
3636
PlanType getType();
37+
38+
long getPlanSizeMb();
3739
}

hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorker.java

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@
1919

2020
import java.io.IOException;
2121
import java.time.Duration;
22+
import java.util.ArrayList;
2223
import java.util.Collections;
2324
import java.util.List;
25+
import java.util.concurrent.atomic.AtomicLong;
2426
import org.apache.hadoop.conf.Configuration;
2527
import org.apache.hadoop.hbase.HConstants;
2628
import org.apache.hadoop.hbase.TableName;
@@ -53,6 +55,9 @@ class RegionNormalizerWorker implements PropagatingConfigurationObserver, Runnab
5355
"hbase.normalizer.throughput.max_bytes_per_sec";
5456
private static final long RATE_UNLIMITED_BYTES = 1_000_000_000_000L; // 1TB/sec
5557

58+
static final String CUMULATIVE_SIZE_LIMIT_MB_KEY = "hbase.normalizer.plans_size_limit.mb";
59+
static final long DEFAULT_CUMULATIVE_SIZE_LIMIT_MB = Long.MAX_VALUE;
60+
5661
private final MasterServices masterServices;
5762
private final RegionNormalizer regionNormalizer;
5863
private final RegionNormalizerWorkQueue<TableName> workQueue;
@@ -62,6 +67,7 @@ class RegionNormalizerWorker implements PropagatingConfigurationObserver, Runnab
6267
private final boolean defaultNormalizerTableLevel;
6368
private long splitPlanCount;
6469
private long mergePlanCount;
70+
private final AtomicLong cumulativePlansSizeLimitMb;
6571

6672
RegionNormalizerWorker(final Configuration configuration, final MasterServices masterServices,
6773
final RegionNormalizer regionNormalizer, final RegionNormalizerWorkQueue<TableName> workQueue) {
@@ -73,6 +79,8 @@ class RegionNormalizerWorker implements PropagatingConfigurationObserver, Runnab
7379
this.mergePlanCount = 0;
7480
this.rateLimiter = loadRateLimiter(configuration);
7581
this.defaultNormalizerTableLevel = extractDefaultNormalizerValue(configuration);
82+
this.cumulativePlansSizeLimitMb = new AtomicLong(
83+
configuration.getLong(CUMULATIVE_SIZE_LIMIT_MB_KEY, DEFAULT_CUMULATIVE_SIZE_LIMIT_MB));
7684
}
7785

7886
private boolean extractDefaultNormalizerValue(final Configuration configuration) {
@@ -96,9 +104,20 @@ public void deregisterChildren(ConfigurationManager manager) {
96104
}
97105
}
98106

107+
private static long logLongConfigurationUpdated(final String key, final long oldValue,
108+
final long newValue) {
109+
if (oldValue != newValue) {
110+
LOG.info("Updated configuration for key '{}' from {} to {}", key, oldValue, newValue);
111+
}
112+
return newValue;
113+
}
114+
99115
@Override
100116
public void onConfigurationChange(Configuration conf) {
101117
rateLimiter.setRate(loadRateLimit(conf));
118+
cumulativePlansSizeLimitMb.set(
119+
logLongConfigurationUpdated(CUMULATIVE_SIZE_LIMIT_MB_KEY, cumulativePlansSizeLimitMb.get(),
120+
conf.getLong(CUMULATIVE_SIZE_LIMIT_MB_KEY, DEFAULT_CUMULATIVE_SIZE_LIMIT_MB)));
102121
}
103122

104123
private static RateLimiter loadRateLimiter(final Configuration configuration) {
@@ -207,14 +226,44 @@ private List<NormalizationPlan> calculatePlans(final TableName tableName) {
207226
return Collections.emptyList();
208227
}
209228

210-
final List<NormalizationPlan> plans = regionNormalizer.computePlansForTable(tblDesc);
229+
List<NormalizationPlan> plans = regionNormalizer.computePlansForTable(tblDesc);
230+
231+
plans = truncateForSize(plans);
232+
211233
if (CollectionUtils.isEmpty(plans)) {
212234
LOG.debug("No normalization required for table {}.", tableName);
213235
return Collections.emptyList();
214236
}
215237
return plans;
216238
}
217239

240+
private List<NormalizationPlan> truncateForSize(List<NormalizationPlan> plans) {
241+
if (cumulativePlansSizeLimitMb.get() != DEFAULT_CUMULATIVE_SIZE_LIMIT_MB) {
242+
List<NormalizationPlan> maybeTruncatedPlans = new ArrayList<>(plans.size());
243+
long totalCumulativeSizeMb = 0;
244+
long truncatedCumulativeSizeMb = 0;
245+
for (NormalizationPlan plan : plans) {
246+
totalCumulativeSizeMb += plan.getPlanSizeMb();
247+
if (totalCumulativeSizeMb <= cumulativePlansSizeLimitMb.get()) {
248+
truncatedCumulativeSizeMb += plan.getPlanSizeMb();
249+
maybeTruncatedPlans.add(plan);
250+
}
251+
}
252+
if (maybeTruncatedPlans.size() != plans.size()) {
253+
LOG.debug(
254+
"Truncating list of normalization plans that RegionNormalizerWorker will process "
255+
+ "because of {}. Original list had {} plan(s), new list has {} plan(s). "
256+
+ "Original list covered regions with cumulative size {} mb, "
257+
+ "new list covers regions with cumulative size {} mb.",
258+
CUMULATIVE_SIZE_LIMIT_MB_KEY, plans.size(), maybeTruncatedPlans.size(),
259+
totalCumulativeSizeMb, truncatedCumulativeSizeMb);
260+
}
261+
return maybeTruncatedPlans;
262+
} else {
263+
return plans;
264+
}
265+
}
266+
218267
private void submitPlans(final List<NormalizationPlan> plans) {
219268
// as of this writing, `plan.submit()` is non-blocking and uses Async Admin APIs to submit
220269
// task, so there's no artificial rate-limiting of merge/split requests due to this serial loop.

hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
*/
1818
package org.apache.hadoop.hbase.master.normalizer;
1919

20+
import static org.apache.hadoop.hbase.master.normalizer.RegionNormalizerWorker.CUMULATIVE_SIZE_LIMIT_MB_KEY;
21+
import static org.apache.hadoop.hbase.master.normalizer.RegionNormalizerWorker.DEFAULT_CUMULATIVE_SIZE_LIMIT_MB;
2022
import static org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils.isEmpty;
2123

2224
import java.time.Instant;
@@ -229,6 +231,14 @@ public List<NormalizationPlan> computePlansForTable(final TableDescriptor tableD
229231
plans.addAll(mergePlans);
230232
}
231233

234+
if (
235+
normalizerConfiguration.getCumulativePlansSizeLimitMb() != DEFAULT_CUMULATIVE_SIZE_LIMIT_MB
236+
) {
237+
// If we are going to truncate our list of plans, shuffle the split and merge plans together
238+
// so that the merge plans, which are listed last, are not starved out.
239+
shuffleNormalizationPlans(plans);
240+
}
241+
232242
LOG.debug("Computed normalization plans for table {}. Total plans: {}, split plans: {}, "
233243
+ "merge plans: {}", table, plans.size(), splitPlansCount, mergePlansCount);
234244
return plans;
@@ -464,6 +474,14 @@ private boolean isLargeEnoughForMerge(final NormalizerConfiguration normalizerCo
464474
return getRegionSizeMB(regionInfo) >= normalizerConfiguration.getMergeMinRegionSizeMb(ctx);
465475
}
466476

477+
/**
478+
* This very simple method exists so we can verify it was called in a unit test. Visible for
479+
* testing.
480+
*/
481+
void shuffleNormalizationPlans(List<NormalizationPlan> plans) {
482+
Collections.shuffle(plans);
483+
}
484+
467485
private static boolean logTraceReason(final BooleanSupplier predicate, final String fmtWhenTrue,
468486
final Object... args) {
469487
final boolean value = predicate.getAsBoolean();
@@ -484,6 +502,7 @@ private static final class NormalizerConfiguration {
484502
private final int mergeMinRegionCount;
485503
private final Period mergeMinRegionAge;
486504
private final long mergeMinRegionSizeMb;
505+
private final long cumulativePlansSizeLimitMb;
487506

488507
private NormalizerConfiguration() {
489508
conf = null;
@@ -492,6 +511,7 @@ private NormalizerConfiguration() {
492511
mergeMinRegionCount = DEFAULT_MERGE_MIN_REGION_COUNT;
493512
mergeMinRegionAge = Period.ofDays(DEFAULT_MERGE_MIN_REGION_AGE_DAYS);
494513
mergeMinRegionSizeMb = DEFAULT_MERGE_MIN_REGION_SIZE_MB;
514+
cumulativePlansSizeLimitMb = DEFAULT_CUMULATIVE_SIZE_LIMIT_MB;
495515
}
496516

497517
private NormalizerConfiguration(final Configuration conf,
@@ -502,6 +522,8 @@ private NormalizerConfiguration(final Configuration conf,
502522
mergeMinRegionCount = parseMergeMinRegionCount(conf);
503523
mergeMinRegionAge = parseMergeMinRegionAge(conf);
504524
mergeMinRegionSizeMb = parseMergeMinRegionSizeMb(conf);
525+
cumulativePlansSizeLimitMb =
526+
conf.getLong(CUMULATIVE_SIZE_LIMIT_MB_KEY, DEFAULT_CUMULATIVE_SIZE_LIMIT_MB);
505527
logConfigurationUpdated(SPLIT_ENABLED_KEY, currentConfiguration.isSplitEnabled(),
506528
splitEnabled);
507529
logConfigurationUpdated(MERGE_ENABLED_KEY, currentConfiguration.isMergeEnabled(),
@@ -574,6 +596,10 @@ public long getMergeMinRegionSizeMb(NormalizeContext context) {
574596
}
575597
return mergeMinRegionSizeMb;
576598
}
599+
600+
private long getCumulativePlansSizeLimitMb() {
601+
return cumulativePlansSizeLimitMb;
602+
}
577603
}
578604

579605
/**

hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SplitNormalizationPlan.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,11 @@ public NormalizationTarget getSplitTarget() {
4545
return splitTarget;
4646
}
4747

48+
@Override
49+
public long getPlanSizeMb() {
50+
return splitTarget.getRegionSizeMb();
51+
}
52+
4853
@Override
4954
public String toString() {
5055
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)

hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestRegionNormalizerWorker.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,36 @@ public void testRateLimit() throws Exception {
204204
Duration.ofNanos(endTime - startTime), greaterThanOrEqualTo(Duration.ofSeconds(5)));
205205
}
206206

207+
@Test
208+
public void testPlansSizeLimit() throws Exception {
209+
final TableName tn = tableName.getTableName();
210+
final TableDescriptor tnDescriptor =
211+
TableDescriptorBuilder.newBuilder(tn).setNormalizationEnabled(true).build();
212+
final RegionInfo splitRegionInfo = RegionInfoBuilder.newBuilder(tn).build();
213+
final RegionInfo mergeRegionInfo1 = RegionInfoBuilder.newBuilder(tn).build();
214+
final RegionInfo mergeRegionInfo2 = RegionInfoBuilder.newBuilder(tn).build();
215+
when(masterServices.getTableDescriptors().get(tn)).thenReturn(tnDescriptor);
216+
when(masterServices.splitRegion(any(), any(), anyLong(), anyLong())).thenReturn(1L);
217+
when(masterServices.mergeRegions(any(), anyBoolean(), anyLong(), anyLong())).thenReturn(1L);
218+
when(regionNormalizer.computePlansForTable(tnDescriptor)).thenReturn(Arrays.asList(
219+
new SplitNormalizationPlan(splitRegionInfo, 2), new MergeNormalizationPlan.Builder()
220+
.addTarget(mergeRegionInfo1, 1).addTarget(mergeRegionInfo2, 2).build(),
221+
new SplitNormalizationPlan(splitRegionInfo, 1)));
222+
223+
final Configuration conf = testingUtility.getConfiguration();
224+
conf.setLong(RegionNormalizerWorker.CUMULATIVE_SIZE_LIMIT_MB_KEY, 5);
225+
226+
final RegionNormalizerWorker worker = new RegionNormalizerWorker(
227+
testingUtility.getConfiguration(), masterServices, regionNormalizer, queue);
228+
workerPool.submit(worker);
229+
queue.put(tn);
230+
231+
assertThatEventually("worker should process first split plan, but not second",
232+
worker::getSplitPlanCount, comparesEqualTo(1L));
233+
assertThatEventually("worker should process merge plan", worker::getMergePlanCount,
234+
comparesEqualTo(1L));
235+
}
236+
207237
/**
208238
* Repeatedly evaluates {@code matcher} against the result of calling {@code actualSupplier} until
209239
* the matcher succeeds or the timeout period of 30 seconds is exhausted.

hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizer.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.hadoop.hbase.master.normalizer;
1919

2020
import static java.lang.String.format;
21+
import static org.apache.hadoop.hbase.master.normalizer.RegionNormalizerWorker.CUMULATIVE_SIZE_LIMIT_MB_KEY;
2122
import static org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer.DEFAULT_MERGE_MIN_REGION_AGE_DAYS;
2223
import static org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer.MERGE_ENABLED_KEY;
2324
import static org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer.MERGE_MIN_REGION_AGE_DAYS_KEY;
@@ -30,13 +31,18 @@
3031
import static org.hamcrest.Matchers.empty;
3132
import static org.hamcrest.Matchers.everyItem;
3233
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
34+
import static org.hamcrest.Matchers.hasSize;
3335
import static org.hamcrest.Matchers.instanceOf;
3436
import static org.hamcrest.Matchers.not;
3537
import static org.junit.Assert.assertEquals;
3638
import static org.junit.Assert.assertFalse;
3739
import static org.junit.Assert.assertTrue;
3840
import static org.mockito.ArgumentMatchers.any;
41+
import static org.mockito.ArgumentMatchers.anyList;
3942
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
43+
import static org.mockito.Mockito.spy;
44+
import static org.mockito.Mockito.times;
45+
import static org.mockito.Mockito.verify;
4046
import static org.mockito.Mockito.when;
4147

4248
import java.time.Instant;
@@ -607,6 +613,23 @@ public void testNormalizerCannotMergeNonAdjacentRegions() {
607613
assertThat(plans, empty());
608614
}
609615

616+
@Test
617+
public void testSizeLimitShufflesPlans() {
618+
conf.setLong(CUMULATIVE_SIZE_LIMIT_MB_KEY, 10);
619+
final TableName tableName = name.getTableName();
620+
final List<RegionInfo> regionInfos = createRegionInfos(tableName, 4);
621+
final Map<byte[], Integer> regionSizes = createRegionSizesMap(regionInfos, 3, 3, 3, 3);
622+
setupMocksForNormalizer(regionSizes, regionInfos);
623+
when(tableDescriptor.getNormalizerTargetRegionSize()).thenReturn(1L);
624+
normalizer = spy(normalizer);
625+
626+
assertTrue(normalizer.isSplitEnabled());
627+
assertTrue(normalizer.isMergeEnabled());
628+
List<NormalizationPlan> computedPlans = normalizer.computePlansForTable(tableDescriptor);
629+
assertThat(computedPlans, hasSize(4));
630+
verify(normalizer, times(1)).shuffleNormalizationPlans(anyList());
631+
}
632+
610633
@SuppressWarnings("MockitoCast")
611634
private void setupMocksForNormalizer(Map<byte[], Integer> regionSizes,
612635
List<RegionInfo> regionInfoList) {

0 commit comments

Comments
 (0)