Skip to content

Commit c6a54a0

Browse files
committed
HBASE-23968 Periodically check whether a system stop is requested in compaction by time.
1 parent d019fd2 commit c6a54a0

File tree

8 files changed

+278
-69
lines changed

8 files changed

+278
-69
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
5050
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
5151
import org.apache.hadoop.hbase.regionserver.StoreScanner;
52+
import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker;
5253
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
5354
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
5455
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
@@ -260,7 +261,6 @@ private void calculateMobLengthMap(List<Path> mobFiles) throws IOException {
260261
protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
261262
long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
262263
boolean major, int numofFilesToCompact) throws IOException {
263-
long bytesWrittenProgressForCloseCheck = 0;
264264
long bytesWrittenProgressForLog = 0;
265265
long bytesWrittenProgressForShippedCall = 0;
266266
// Clear old mob references
@@ -284,11 +284,12 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
284284
// we have to use a do/while loop.
285285
List<Cell> cells = new ArrayList<>();
286286
// Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
287-
int closeCheckSizeLimit = HStore.getCloseCheckInterval();
287+
long currentTime = EnvironmentEdgeManager.currentTime();
288288
long lastMillis = 0;
289289
if (LOG.isDebugEnabled()) {
290-
lastMillis = EnvironmentEdgeManager.currentTime();
290+
lastMillis = currentTime;
291291
}
292+
CloseChecker closeChecker = new CloseChecker(conf, currentTime);
292293
String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction");
293294
long now = 0;
294295
boolean hasMore;
@@ -317,7 +318,14 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
317318

318319
do {
319320
hasMore = scanner.next(cells, scannerContext);
320-
now = EnvironmentEdgeManager.currentTime();
321+
currentTime = EnvironmentEdgeManager.currentTime();
322+
if (LOG.isDebugEnabled()) {
323+
now = currentTime;
324+
}
325+
if (closeChecker.isClosedByTimeLimit(store, currentTime)) {
326+
progress.cancel();
327+
return false;
328+
}
321329
for (Cell c : cells) {
322330
if (compactMOBs) {
323331
if (MobUtils.isMobReferenceCell(c)) {
@@ -481,16 +489,9 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
481489
bytesWrittenProgressForLog += len;
482490
}
483491
throughputController.control(compactionName, len);
484-
// check periodically to see if a system stop is requested
485-
if (closeCheckSizeLimit > 0) {
486-
bytesWrittenProgressForCloseCheck += len;
487-
if (bytesWrittenProgressForCloseCheck > closeCheckSizeLimit) {
488-
bytesWrittenProgressForCloseCheck = 0;
489-
if (!store.areWritesEnabled()) {
490-
progress.cancel();
491-
return false;
492-
}
493-
}
492+
if (closeChecker.isClosedBySizeLimit(store, len)) {
493+
progress.cancel();
494+
return false;
494495
}
495496
if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
496497
((ShipperListener) writer).beforeShipped();

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2203,15 +2203,13 @@ private boolean shouldForbidMajorCompaction() {
22032203
* }
22042204
* Also in compactor.performCompaction():
22052205
* check periodically to see if a system stop is requested
2206-
* if (closeCheckInterval > 0) {
2207-
* bytesWritten += len;
2208-
* if (bytesWritten > closeCheckInterval) {
2209-
* bytesWritten = 0;
2210-
* if (!store.areWritesEnabled()) {
2211-
* progress.cancel();
2212-
* return false;
2213-
* }
2214-
* }
2206+
* if (closeChecker != null && closeChecker.isClosedByTimeLimit(store, now)) {
2207+
* progress.cancel();
2208+
* return false;
2209+
* }
2210+
* if (closeChecker != null && closeChecker.isClosedBySizeLimit(store, len)) {
2211+
* progress.cancel();
2212+
* return false;
22152213
* }
22162214
*/
22172215
public boolean compact(CompactionContext compaction, HStore store,

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -154,8 +154,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
154154
protected CacheConfig cacheConf;
155155
private long lastCompactSize = 0;
156156
volatile boolean forceMajor = false;
157-
/* how many bytes to write between status checks */
158-
static int closeCheckInterval = 0;
159157
private AtomicLong storeSize = new AtomicLong();
160158
private AtomicLong totalUncompressedBytes = new AtomicLong();
161159

@@ -297,11 +295,6 @@ protected HStore(final HRegion region, final ColumnFamilyDescriptor family,
297295
this.compactionCheckMultiplier = DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER;
298296
}
299297

300-
if (HStore.closeCheckInterval == 0) {
301-
HStore.closeCheckInterval = conf.getInt(
302-
"hbase.hstore.close.check.interval", 10*1000*1000 /* 10 MB */);
303-
}
304-
305298
this.storeEngine = createStoreEngine(this, this.conf, this.comparator);
306299
List<HStoreFile> hStoreFiles = loadStoreFiles(warmup);
307300
// Move the storeSize calculation out of loadStoreFiles() method, because the secondary read
@@ -490,13 +483,6 @@ public static ChecksumType getChecksumType(Configuration conf) {
490483
}
491484
}
492485

493-
/**
494-
* @return how many bytes to write between status checks
495-
*/
496-
public static int getCloseCheckInterval() {
497-
return closeCheckInterval;
498-
}
499-
500486
@Override
501487
public ColumnFamilyDescriptor getColumnFamilyDescriptor() {
502488
return this.family;
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.apache.hadoop.hbase.regionserver.compactions;
17+
18+
import org.apache.hadoop.conf.Configuration;
19+
import org.apache.hadoop.hbase.regionserver.Store;
20+
import org.apache.yetus.audience.InterfaceAudience;
21+
22+
/**
23+
* Check periodically to see if a system stop is requested
24+
*/
25+
@InterfaceAudience.Private
26+
public class CloseChecker {
27+
public static final String SIZE_LIMIT_KEY = "hbase.hstore.close.check.interval";
28+
public static final String TIME_LIMIT_KEY = "hbase.hstore.close.check.time.interval";
29+
30+
private final int closeCheckSizeLimit;
31+
private final long closeCheckTimeLimit;
32+
33+
private long bytesWrittenProgressForCloseCheck;
34+
private long lastCloseCheckMillis;
35+
36+
public CloseChecker(Configuration conf, long currentTime) {
37+
this.closeCheckSizeLimit = conf.getInt(SIZE_LIMIT_KEY, 10 * 1000 * 1000 /* 10 MB */);
38+
this.closeCheckTimeLimit = conf.getLong(TIME_LIMIT_KEY, 10 * 1000L /* 10 s */);
39+
this.bytesWrittenProgressForCloseCheck = 0;
40+
this.lastCloseCheckMillis = currentTime;
41+
}
42+
43+
public boolean isClosedBySizeLimit(Store store, long bytesWritten) {
44+
if (closeCheckSizeLimit <= 0) {
45+
return false;
46+
}
47+
48+
bytesWrittenProgressForCloseCheck += bytesWritten;
49+
if (bytesWrittenProgressForCloseCheck <= closeCheckSizeLimit) {
50+
return false;
51+
}
52+
53+
bytesWrittenProgressForCloseCheck = 0;
54+
return !store.areWritesEnabled();
55+
}
56+
57+
public boolean isClosedByTimeLimit(Store store, long now) {
58+
if (closeCheckTimeLimit <= 0) {
59+
return false;
60+
}
61+
62+
final long elapsedMillis = now - lastCloseCheckMillis;
63+
if (elapsedMillis <= closeCheckTimeLimit) {
64+
return false;
65+
}
66+
67+
lastCloseCheckMillis = now;
68+
return !store.areWritesEnabled();
69+
}
70+
}

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -368,17 +368,17 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
368368
long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
369369
boolean major, int numofFilesToCompact) throws IOException {
370370
assert writer instanceof ShipperListener;
371-
long bytesWrittenProgressForCloseCheck = 0;
372371
long bytesWrittenProgressForLog = 0;
373372
long bytesWrittenProgressForShippedCall = 0;
374373
// Since scanner.next() can return 'false' but still be delivering data,
375374
// we have to use a do/while loop.
376375
List<Cell> cells = new ArrayList<>();
377-
long closeCheckSizeLimit = HStore.getCloseCheckInterval();
376+
long currentTime = EnvironmentEdgeManager.currentTime();
378377
long lastMillis = 0;
379378
if (LOG.isDebugEnabled()) {
380-
lastMillis = EnvironmentEdgeManager.currentTime();
379+
lastMillis = currentTime;
381380
}
381+
CloseChecker closeChecker = new CloseChecker(conf, currentTime);
382382
String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction");
383383
long now = 0;
384384
boolean hasMore;
@@ -392,8 +392,13 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
392392
try {
393393
do {
394394
hasMore = scanner.next(cells, scannerContext);
395+
currentTime = EnvironmentEdgeManager.currentTime();
395396
if (LOG.isDebugEnabled()) {
396-
now = EnvironmentEdgeManager.currentTime();
397+
now = currentTime;
398+
}
399+
if (closeChecker.isClosedByTimeLimit(store, currentTime)) {
400+
progress.cancel();
401+
return false;
397402
}
398403
// output to writer:
399404
Cell lastCleanCell = null;
@@ -416,16 +421,9 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
416421
bytesWrittenProgressForLog += len;
417422
}
418423
throughputController.control(compactionName, len);
419-
// check periodically to see if a system stop is requested
420-
if (closeCheckSizeLimit > 0) {
421-
bytesWrittenProgressForCloseCheck += len;
422-
if (bytesWrittenProgressForCloseCheck > closeCheckSizeLimit) {
423-
bytesWrittenProgressForCloseCheck = 0;
424-
if (!store.areWritesEnabled()) {
425-
progress.cancel();
426-
return false;
427-
}
428-
}
424+
if (closeChecker.isClosedBySizeLimit(store, len)) {
425+
progress.cancel();
426+
return false;
429427
}
430428
if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
431429
if (lastCleanCell != null) {

hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.hadoop.hbase.regionserver.ScannerContext;
4242
import org.apache.hadoop.hbase.regionserver.ShipperListener;
4343
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
44+
import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker;
4445
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
4546
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
4647
import org.apache.hadoop.hbase.util.Bytes;
@@ -93,7 +94,6 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
9394
if (major) {
9495
totalMajorCompactions.incrementAndGet();
9596
}
96-
long bytesWrittenProgressForCloseCheck = 0;
9797
long bytesWrittenProgressForLog = 0;
9898
long bytesWrittenProgressForShippedCall = 0;
9999
// Clear old mob references
@@ -119,11 +119,12 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
119119
// we have to use a do/while loop.
120120
List<Cell> cells = new ArrayList<>();
121121
// Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
122-
int closeCheckSizeLimit = HStore.getCloseCheckInterval();
122+
long currentTime = EnvironmentEdgeManager.currentTime();
123123
long lastMillis = 0;
124124
if (LOG.isDebugEnabled()) {
125-
lastMillis = EnvironmentEdgeManager.currentTime();
125+
lastMillis = currentTime;
126126
}
127+
CloseChecker closeChecker = new CloseChecker(conf, currentTime);
127128
String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction");
128129
long now = 0;
129130
boolean hasMore;
@@ -168,8 +169,13 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
168169
}
169170
do {
170171
hasMore = scanner.next(cells, scannerContext);
172+
currentTime = EnvironmentEdgeManager.currentTime();
171173
if (LOG.isDebugEnabled()) {
172-
now = EnvironmentEdgeManager.currentTime();
174+
now = currentTime;
175+
}
176+
if (closeChecker.isClosedByTimeLimit(store, currentTime)) {
177+
progress.cancel();
178+
return false;
173179
}
174180
for (Cell c : cells) {
175181
counter++;
@@ -291,16 +297,9 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
291297
bytesWrittenProgressForLog += len;
292298
}
293299
throughputController.control(compactionName, len);
294-
// check periodically to see if a system stop is requested
295-
if (closeCheckSizeLimit > 0) {
296-
bytesWrittenProgressForCloseCheck += len;
297-
if (bytesWrittenProgressForCloseCheck > closeCheckSizeLimit) {
298-
bytesWrittenProgressForCloseCheck = 0;
299-
if (!store.areWritesEnabled()) {
300-
progress.cancel();
301-
return false;
302-
}
303-
}
300+
if (closeChecker.isClosedBySizeLimit(store, len)) {
301+
progress.cancel();
302+
return false;
304303
}
305304
if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
306305
((ShipperListener) writer).beforeShipped();

0 commit comments

Comments
 (0)