Skip to content

Commit 0ad40cd

Browse files
committed
HBASE-24428 : Update compaction priority for recently split daughter regions (#1784)
Signed-off-by: Andrew Purtell <apurtell@apache.org>
1 parent 6e1d7f0 commit 0ad40cd

File tree

6 files changed

+126
-8
lines changed

6 files changed

+126
-8
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,12 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
142142
public static final int DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER = 1000;
143143
public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 16;
144144

145+
// HBASE-24428 : Update compaction priority for recently split daughter regions
146+
// so as to prioritize their compaction.
147+
// Any compaction candidate with higher priority than compaction of newly split daugher regions
148+
// should have priority value < (Integer.MIN_VALUE + 1000)
149+
private static final int SPLIT_REGION_COMPACTION_PRIORITY = Integer.MIN_VALUE + 1000;
150+
145151
private static final Logger LOG = LoggerFactory.getLogger(HStore.class);
146152

147153
protected final MemStore memstore;
@@ -1856,7 +1862,22 @@ public Optional<CompactionContext> requestCompaction(int priority,
18561862

18571863
// Set common request properties.
18581864
// Set priority, either override value supplied by caller or from store.
1859-
request.setPriority((priority != Store.NO_PRIORITY) ? priority : getCompactPriority());
1865+
final int compactionPriority =
1866+
(priority != Store.NO_PRIORITY) ? priority : getCompactPriority();
1867+
request.setPriority(compactionPriority);
1868+
1869+
if (request.isAfterSplit()) {
1870+
// If the store belongs to recently splitted daughter regions, better we consider
1871+
// them with the higher priority in the compaction queue.
1872+
// Override priority if it is lower (higher int value) than
1873+
// SPLIT_REGION_COMPACTION_PRIORITY
1874+
final int splitHousekeepingPriority =
1875+
Math.min(compactionPriority, SPLIT_REGION_COMPACTION_PRIORITY);
1876+
request.setPriority(splitHousekeepingPriority);
1877+
LOG.info("Keeping/Overriding Compaction request priority to {} for CF {} since it"
1878+
+ " belongs to recently split daughter region {}", splitHousekeepingPriority,
1879+
this.getColumnFamilyName(), getRegionInfo().getRegionNameAsString());
1880+
}
18601881
request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName());
18611882
request.setTracker(tracker);
18621883
}

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public static OptionalInt getDeterministicRandomSeed(Collection<HStoreFile> file
5252
*/
5353
public static boolean hasReferences(Collection<HStoreFile> files) {
5454
// TODO: make sure that we won't pass null here in the future.
55-
return files != null ? files.stream().anyMatch(HStoreFile::isReference) : false;
55+
return files != null && files.stream().anyMatch(HStoreFile::isReference);
5656
}
5757

5858
/**

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequestImpl.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ private enum DisplayCompactionType { MINOR, ALL_FILES, MAJOR }
4343
private DisplayCompactionType isMajor = DisplayCompactionType.MINOR;
4444
private int priority = NO_PRIORITY;
4545
private Collection<HStoreFile> filesToCompact;
46+
private boolean isAfterSplit = false;
4647

4748
// CompactRequest object creation time.
4849
private long selectionTime;
@@ -136,6 +137,14 @@ public CompactionLifeCycleTracker getTracker() {
136137
return tracker;
137138
}
138139

140+
public boolean isAfterSplit() {
141+
return isAfterSplit;
142+
}
143+
144+
public void setAfterSplit(boolean afterSplit) {
145+
isAfterSplit = afterSplit;
146+
}
147+
139148
@Override
140149
public int hashCode() {
141150
final int prime = 31;
@@ -149,6 +158,7 @@ public int hashCode() {
149158
result = prime * result + ((storeName == null) ? 0 : storeName.hashCode());
150159
result = prime * result + (int) (totalSize ^ (totalSize >>> 32));
151160
result = prime * result + ((tracker == null) ? 0 : tracker.hashCode());
161+
result = prime * result + (isAfterSplit ? 1231 : 1237);
152162
return result;
153163
}
154164

@@ -200,6 +210,9 @@ public boolean equals(Object obj) {
200210
if (totalSize != other.totalSize) {
201211
return false;
202212
}
213+
if (isAfterSplit != other.isAfterSplit) {
214+
return false;
215+
}
203216
if (tracker == null) {
204217
if (other.tracker != null) {
205218
return false;

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/SortedCompactionPolicy.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ public CompactionRequestImpl selectCompaction(Collection<HStoreFile> candidateFi
8484

8585
CompactionRequestImpl result = createCompactionRequest(candidateSelection,
8686
isTryingMajor || isAfterSplit, mayUseOffPeak, mayBeStuck);
87+
result.setAfterSplit(isAfterSplit);
8788

8889
ArrayList<HStoreFile> filesToCompact = Lists.newArrayList(result.getFiles());
8990
removeExcessFiles(filesToCompact, isUserCompaction, isTryingMajor);

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ public StripeCompactionRequest selectCompaction(StripeInformationProvider si,
122122
SplitStripeCompactionRequest request = new SplitStripeCompactionRequest(
123123
allFiles, OPEN_KEY, OPEN_KEY, targetKvs);
124124
request.setMajorRangeFull();
125+
request.getRequest().setAfterSplit(true);
125126
return request;
126127
}
127128

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java

Lines changed: 88 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,16 @@
1919

2020
import static org.junit.Assert.assertEquals;
2121
import static org.junit.Assert.assertFalse;
22+
import static org.junit.Assert.assertNotEquals;
2223
import static org.junit.Assert.assertNotNull;
2324
import static org.junit.Assert.assertNotSame;
2425
import static org.junit.Assert.assertNull;
2526
import static org.junit.Assert.assertTrue;
2627
import static org.junit.Assert.fail;
2728

2829
import java.io.IOException;
30+
import java.lang.reflect.Field;
31+
import java.util.ArrayList;
2932
import java.util.Collection;
3033
import java.util.List;
3134
import java.util.Map;
@@ -73,6 +76,7 @@
7376
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
7477
import org.apache.hadoop.hbase.coprocessor.MasterObserver;
7578
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
79+
import org.apache.hadoop.hbase.io.Reference;
7680
import org.apache.hadoop.hbase.master.HMaster;
7781
import org.apache.hadoop.hbase.master.LoadBalancer;
7882
import org.apache.hadoop.hbase.master.MasterRpcServices;
@@ -84,6 +88,7 @@
8488
import org.apache.hadoop.hbase.master.assignment.RegionStates;
8589
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
8690
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
91+
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
8792
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
8893
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
8994
import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -106,6 +111,7 @@
106111
import org.junit.Test;
107112
import org.junit.experimental.categories.Category;
108113
import org.junit.rules.TestName;
114+
import org.mockito.Mockito;
109115
import org.slf4j.Logger;
110116
import org.slf4j.LoggerFactory;
111117

@@ -275,6 +281,79 @@ public void testSplitFailedCompactionAndSplit() throws Exception {
275281
assertEquals(2, cluster.getRegions(tableName).size());
276282
}
277283

284+
@Test
285+
public void testSplitCompactWithPriority() throws Exception {
286+
final TableName tableName = TableName.valueOf(name.getMethodName());
287+
// Create table then get the single region for our new table.
288+
byte[] cf = Bytes.toBytes("cf");
289+
TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName)
290+
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(cf)).build();
291+
admin.createTable(htd);
292+
293+
assertNotEquals("Unable to retrieve regions of the table", -1,
294+
TESTING_UTIL.waitFor(10000, () -> cluster.getRegions(tableName).size() == 1));
295+
296+
HRegion region = cluster.getRegions(tableName).get(0);
297+
HStore store = region.getStore(cf);
298+
int regionServerIndex = cluster.getServerWith(region.getRegionInfo().getRegionName());
299+
HRegionServer regionServer = cluster.getRegionServer(regionServerIndex);
300+
301+
Table table = TESTING_UTIL.getConnection().getTable(tableName);
302+
// insert data
303+
insertData(tableName, admin, table);
304+
insertData(tableName, admin, table, 20);
305+
insertData(tableName, admin, table, 40);
306+
307+
// Compaction Request
308+
store.triggerMajorCompaction();
309+
Optional<CompactionContext> compactionContext = store.requestCompaction();
310+
assertTrue(compactionContext.isPresent());
311+
assertFalse(compactionContext.get().getRequest().isAfterSplit());
312+
assertEquals(compactionContext.get().getRequest().getPriority(), 13);
313+
314+
// Split
315+
long procId =
316+
cluster.getMaster().splitRegion(region.getRegionInfo(), Bytes.toBytes("row4"), 0, 0);
317+
318+
// wait for the split to complete or get interrupted. If the split completes successfully,
319+
// the procedure will return true; if the split fails, the procedure would throw exception.
320+
ProcedureTestingUtility.waitProcedure(cluster.getMaster().getMasterProcedureExecutor(),
321+
procId);
322+
323+
assertEquals(2, cluster.getRegions(tableName).size());
324+
// we have 2 daughter regions
325+
HRegion hRegion1 = cluster.getRegions(tableName).get(0);
326+
HRegion hRegion2 = cluster.getRegions(tableName).get(1);
327+
HStore hStore1 = hRegion1.getStore(cf);
328+
HStore hStore2 = hRegion2.getStore(cf);
329+
330+
// For hStore1 && hStore2, set mock reference to one of the storeFiles
331+
StoreFileInfo storeFileInfo1 = new ArrayList<>(hStore1.getStorefiles()).get(0).getFileInfo();
332+
StoreFileInfo storeFileInfo2 = new ArrayList<>(hStore2.getStorefiles()).get(0).getFileInfo();
333+
Field field = StoreFileInfo.class.getDeclaredField("reference");
334+
field.setAccessible(true);
335+
field.set(storeFileInfo1, Mockito.mock(Reference.class));
336+
field.set(storeFileInfo2, Mockito.mock(Reference.class));
337+
hStore1.triggerMajorCompaction();
338+
hStore2.triggerMajorCompaction();
339+
340+
compactionContext = hStore1.requestCompaction();
341+
assertTrue(compactionContext.isPresent());
342+
// since we set mock reference to one of the storeFiles, we will get isAfterSplit=true &&
343+
// highest priority for hStore1's compactionContext
344+
assertTrue(compactionContext.get().getRequest().isAfterSplit());
345+
assertEquals(compactionContext.get().getRequest().getPriority(), Integer.MIN_VALUE + 1000);
346+
347+
compactionContext =
348+
hStore2.requestCompaction(Integer.MIN_VALUE + 10, CompactionLifeCycleTracker.DUMMY, null);
349+
assertTrue(compactionContext.isPresent());
350+
// compaction request contains higher priority than default priority of daughter region
351+
// compaction (Integer.MIN_VALUE + 1000), hence we are expecting request priority to
352+
// be accepted.
353+
assertTrue(compactionContext.get().getRequest().isAfterSplit());
354+
assertEquals(compactionContext.get().getRequest().getPriority(), Integer.MIN_VALUE + 10);
355+
}
356+
278357
public static class FailingSplitMasterObserver implements MasterCoprocessor, MasterObserver {
279358
volatile CountDownLatch latch;
280359

@@ -634,18 +713,21 @@ public void testSplitWithRegionReplicas() throws Exception {
634713
}
635714
}
636715

637-
private void insertData(final TableName tableName, Admin admin, Table t) throws IOException,
638-
InterruptedException {
639-
Put p = new Put(Bytes.toBytes("row1"));
716+
private void insertData(final TableName tableName, Admin admin, Table t) throws IOException {
717+
insertData(tableName, admin, t, 1);
718+
}
719+
720+
private void insertData(TableName tableName, Admin admin, Table t, int i) throws IOException {
721+
Put p = new Put(Bytes.toBytes("row" + i));
640722
p.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("1"));
641723
t.put(p);
642-
p = new Put(Bytes.toBytes("row2"));
724+
p = new Put(Bytes.toBytes("row" + (i + 1)));
643725
p.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("2"));
644726
t.put(p);
645-
p = new Put(Bytes.toBytes("row3"));
727+
p = new Put(Bytes.toBytes("row" + (i + 2)));
646728
p.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("3"));
647729
t.put(p);
648-
p = new Put(Bytes.toBytes("row4"));
730+
p = new Put(Bytes.toBytes("row" + (i + 3)));
649731
p.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("4"));
650732
t.put(p);
651733
admin.flush(tableName);

0 commit comments

Comments
 (0)