Skip to content

Commit c2ede4b

Browse files
committed
HBASE-27227 Long running heavily filtered scans hold up too many ByteBuffAllocator buffers
1 parent 7ef63b6 commit c2ede4b

File tree

14 files changed

+811
-55
lines changed

14 files changed

+811
-55
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,12 @@ public void close() {
277277
public void shipped() throws IOException {
278278
this.delegate.shipped();
279279
}
280+
281+
@Override
282+
public void checkpoint(State state) {
283+
this.delegate.checkpoint(state);
284+
}
285+
280286
};
281287
}
282288

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2070,7 +2070,8 @@ private static HFileBlock shallowClone(HFileBlock blk, ByteBuff newBuf) {
20702070
return createBuilder(blk, newBuf).build();
20712071
}
20722072

2073-
static HFileBlock deepCloneOnHeap(HFileBlock blk) {
2073+
// Publicly visible for access in tests
2074+
public static HFileBlock deepCloneOnHeap(HFileBlock blk) {
20742075
ByteBuff deepCloned = ByteBuff.wrap(ByteBuffer.wrap(blk.buf.toBytes(0, blk.buf.limit())));
20752076
return createBuilder(blk, deepCloned).build();
20762077
}

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,13 @@ protected static class HFileScannerImpl implements HFileScanner {
336336
// RegionScannerImpl#handleException). Call the releaseIfNotCurBlock() to release the
337337
// unreferenced block please.
338338
protected HFileBlock curBlock;
339-
// Previous blocks that were used in the course of the read
339+
340+
// Updated to the current prevBlocks size when checkpoint is called. Used to eagerly release
341+
// any blocks accumulated in the fetching of a row, if that row is thrown away due to filterRow.
342+
private int lastCheckpointIndex = -1;
343+
344+
// Previous blocks that were used in the course of the read, to be released at close,
345+
// checkpoint, or shipped
340346
protected final ArrayList<HFileBlock> prevBlocks = new ArrayList<>();
341347

342348
public HFileScannerImpl(final HFile.Reader reader, final boolean cacheBlocks,
@@ -366,8 +372,15 @@ void reset() {
366372
}
367373

368374
private void returnBlocks(boolean returnAll) {
369-
this.prevBlocks.forEach(HFileBlock::release);
375+
this.prevBlocks.forEach((block) -> {
376+
if (block != null) {
377+
block.release();
378+
}
379+
});
370380
this.prevBlocks.clear();
381+
if (lastCheckpointIndex > 0) {
382+
this.lastCheckpointIndex = 0;
383+
}
371384
if (returnAll && this.curBlock != null) {
372385
this.curBlock.release();
373386
this.curBlock = null;
@@ -1047,6 +1060,23 @@ public int compareKey(CellComparator comparator, Cell key) {
10471060
public void shipped() throws IOException {
10481061
this.returnBlocks(false);
10491062
}
1063+
1064+
/**
1065+
* Sets the last checkpoint index to the current prevBlocks size. If called with
1066+
* {@link State#FILTERED}, releases and nulls out any prevBlocks entries which were added since
1067+
* the last checkpoint. Nulls out instead of removing to avoid unnecessary resizing of the list.
1068+
*/
1069+
@Override
1070+
public void checkpoint(State state) {
1071+
if (state == State.FILTERED) {
1072+
assert lastCheckpointIndex >= 0;
1073+
for (int i = lastCheckpointIndex; i < prevBlocks.size(); i++) {
1074+
prevBlocks.get(i).release();
1075+
prevBlocks.set(i, null);
1076+
}
1077+
}
1078+
lastCheckpointIndex = prevBlocks.size();
1079+
}
10501080
}
10511081

10521082
@Override

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -420,4 +420,24 @@ public void shipped() throws IOException {
420420
}
421421
}
422422
}
423+
424+
@Override
425+
public void checkpoint(State state) {
426+
if (current != null) {
427+
current.checkpoint(state);
428+
}
429+
if (this.heap != null) {
430+
for (KeyValueScanner scanner : this.heap) {
431+
scanner.checkpoint(state);
432+
}
433+
}
434+
// Also checkpoint any scanners for delayed close. These would be exhausted scanners,
435+
// which may contain blocks that were totally filtered during a request. If so, the checkpoint
436+
// will release them.
437+
if (scannersForDelayedClose != null) {
438+
for (KeyValueScanner scanner : scannersForDelayedClose) {
439+
scanner.checkpoint(state);
440+
}
441+
}
442+
}
423443
}

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,4 +78,9 @@ public Cell getNextIndexedKey() {
7878
public void shipped() throws IOException {
7979
// do nothing
8080
}
81+
82+
@Override
83+
public void checkpoint(State state) {
84+
// do nothing
85+
}
8186
}

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -426,6 +426,8 @@ private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
426426
// Used to check time limit
427427
LimitScope limitScope = LimitScope.BETWEEN_CELLS;
428428

429+
checkpoint(State.START);
430+
429431
// The loop here is used only when at some point during the next we determine
430432
// that due to effects of filters or otherwise, we have an empty row in the result.
431433
// Then we loop and try again. Otherwise, we must get out on the first iteration via return,
@@ -501,6 +503,7 @@ private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
501503
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
502504
}
503505
results.clear();
506+
checkpoint(State.FILTERED);
504507

505508
// Read nothing as the rowkey was filtered, but still need to check time limit
506509
if (scannerContext.checkTimeLimit(limitScope)) {
@@ -553,6 +556,7 @@ private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
553556
if (isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE || filterRow()) {
554557
incrementCountOfRowsFilteredMetric(scannerContext);
555558
results.clear();
559+
checkpoint(State.FILTERED);
556560
boolean moreRows = nextRow(scannerContext, current);
557561
if (!moreRows) {
558562
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
@@ -602,6 +606,7 @@ private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
602606
// Double check to prevent empty rows from appearing in result. It could be
603607
// the case when SingleColumnValueExcludeFilter is used.
604608
if (results.isEmpty()) {
609+
checkpoint(State.FILTERED);
605610
incrementCountOfRowsFilteredMetric(scannerContext);
606611
boolean moreRows = nextRow(scannerContext, current);
607612
if (!moreRows) {
@@ -783,6 +788,16 @@ public void shipped() throws IOException {
783788
}
784789
}
785790

791+
@Override
792+
public void checkpoint(State state) {
793+
if (storeHeap != null) {
794+
storeHeap.checkpoint(state);
795+
}
796+
if (joinedHeap != null) {
797+
joinedHeap.checkpoint(state);
798+
}
799+
}
800+
786801
@Override
787802
public void run() throws IOException {
788803
// This is the RPC callback method executed. We do the close in of the scanner in this

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,11 @@ public void shipped() throws IOException {
309309
// do nothing
310310
}
311311

312+
@Override
313+
public void checkpoint(State state) {
314+
// do nothing
315+
}
316+
312317
// debug method
313318
@Override
314319
public String toString() {

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Shipper.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,11 @@
2323
/**
2424
* This interface denotes a scanner as one which can ship cells. Scan operation do many RPC requests
2525
* to server and fetch N rows/RPC. These are then shipped to client. At the end of every such batch
26-
* {@link #shipped()} will get called.
26+
* {@link #shipped()} will get called. <br>
27+
* Scans of large numbers of fully filtered blocks (due to Filter, or sparse columns, etc) can cause
28+
* excess memory to be held while waiting for {@link #shipped()} to be called. Therefore, there's a
29+
* checkpoint mechanism via {@link #checkpoint(State)}. This enables fully filtered blocks to be
30+
* eagerly released, since they are not referenced by cells being returned to clients.
2731
*/
2832
@InterfaceAudience.Private
2933
public interface Shipper {
@@ -33,4 +37,19 @@ public interface Shipper {
3337
* can be done here.
3438
*/
3539
void shipped() throws IOException;
40+
41+
enum State {
42+
START,
43+
FILTERED
44+
}
45+
46+
/**
47+
* Called during processing of a batch of scanned rows, before returning to the client. Allows
48+
* releasing of blocks which have been totally skipped in the result set due to filters. <br>
49+
* Should be called with {@link State#START} at the beginning of a request for a row. This will
50+
* set state necessary to handle {@link State#FILTERED}. Calling with {@link State#FILTERED} will
51+
* release any blocks which have been fully processed since the last call to
52+
* {@link #checkpoint(State)}. Calling again with {@link State#START} will reset the pointers.
53+
*/
54+
void checkpoint(State state);
3655
}

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -558,4 +558,9 @@ public Cell getNextIndexedKey() {
558558
public void shipped() throws IOException {
559559
this.hfs.shipped();
560560
}
561+
562+
@Override
563+
public void checkpoint(State state) {
564+
this.hfs.checkpoint(state);
565+
}
561566
}

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java

Lines changed: 57 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,12 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
159159
protected final long readPt;
160160
private boolean topChanged = false;
161161

162+
// when creating new scanners, i.e. in flush or switching to stream read, we want
163+
// to checkpoint the new scanners iff we've received a checkpoint call ourselves.
164+
// this keeps the new scanners in sync with the old in terms of enabling eager release
165+
// of unneeded blocks.
166+
private boolean checkpointed = false;
167+
162168
/** An internal constructor. */
163169
private StoreScanner(HStore store, Scan scan, ScanInfo scanInfo, int numColumns, long readPt,
164170
boolean cacheBlocks, ScanType scanType) {
@@ -250,8 +256,8 @@ public StoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet<byt
250256
// key does not exist, then to the start of the next matching Row).
251257
// Always check bloom filter to optimize the top row seek for delete
252258
// family marker.
253-
seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery && lazySeekEnabledGlobally,
254-
parallelSeekEnabled);
259+
seekScannersWithCheckpoint(scanners, matcher.getStartKey(),
260+
explicitColumnQuery && lazySeekEnabledGlobally, parallelSeekEnabled);
255261

256262
// set storeLimit
257263
this.storeLimit = scan.getMaxResultsPerColumnFamily();
@@ -317,7 +323,7 @@ private StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueSca
317323
scanners = selectScannersFrom(store, scanners);
318324

319325
// Seek all scanners to the initial key
320-
seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
326+
seekScannersWithCheckpoint(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
321327
addCurrentScanners(scanners);
322328
// Combine all seeked scanners with a heap
323329
resetKVHeap(scanners, comparator);
@@ -326,7 +332,7 @@ private StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueSca
326332
private void seekAllScanner(ScanInfo scanInfo, List<? extends KeyValueScanner> scanners)
327333
throws IOException {
328334
// Seek all scanners to the initial key
329-
seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
335+
seekScannersWithCheckpoint(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
330336
addCurrentScanners(scanners);
331337
resetKVHeap(scanners, comparator);
332338
}
@@ -359,9 +365,15 @@ public StoreScanner(ScanInfo scanInfo, ScanType scanType,
359365

360366
// Used to instantiate a scanner for user scan in test
361367
StoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
368+
List<? extends KeyValueScanner> scanners) throws IOException {
369+
this(null, scan, scanInfo, columns, scanners);
370+
}
371+
372+
// Used to instantiate a scanner for user scan in test
373+
StoreScanner(HStore store, Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
362374
List<? extends KeyValueScanner> scanners) throws IOException {
363375
// 0 is passed as readpoint because the test bypasses Store
364-
this(null, scan, scanInfo, columns != null ? columns.size() : 0, 0L, scan.getCacheBlocks(),
376+
this(store, scan, scanInfo, columns != null ? columns.size() : 0, 0L, scan.getCacheBlocks(),
365377
ScanType.USER_SCAN);
366378
this.matcher =
367379
UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, null);
@@ -383,6 +395,25 @@ boolean isScanUsePread() {
383395
return this.scanUsePread;
384396
}
385397

398+
/**
399+
* Seek the specified scanners with the given key. Delegates to
400+
* {@link #seekScanners(List, Cell, boolean, boolean)}, but also checkpoints the scanners
401+
* afterward if this StoreScanner has been checkpointed yet.
402+
* @param scanners the scanners to seek
403+
* @param seekKey the key to seek to
404+
* @param isLazy true if lazy seek
405+
* @param isParallelSeek true if using parallel seek
406+
*/
407+
private void seekScannersWithCheckpoint(List<? extends KeyValueScanner> scanners, Cell seekKey,
408+
boolean isLazy, boolean isParallelSeek) throws IOException {
409+
seekScanners(scanners, seekKey, isLazy, isParallelSeek);
410+
if (checkpointed) {
411+
for (KeyValueScanner scanner : scanners) {
412+
scanner.checkpoint(State.START);
413+
}
414+
}
415+
}
416+
386417
/**
387418
* Seek the specified scanners with the given key
388419
* @param isLazy true if using lazy seek
@@ -764,6 +795,11 @@ public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws
764795
if (count > 0 && matcher.isUserScan()) {
765796
// if true increment memstore metrics, if not the mixed one
766797
updateMetricsStore(onlyFromMemstore);
798+
} else if (count == 0 && checkpointed) {
799+
// If we returned nothing, it means the row has been filtered for this store. If we've
800+
// previously checkpointed, we can call checkpoint again here to release any blocks we may
801+
// have scanned in reaching this point.
802+
checkpoint(State.FILTERED);
767803
}
768804
}
769805
}
@@ -1011,7 +1047,7 @@ protected final boolean reopenAfterFlush() throws IOException {
10111047
}
10121048

10131049
// Seek the new scanners to the last key
1014-
seekScanners(scanners, lastTop, false, parallelSeekEnabled);
1050+
seekScannersWithCheckpoint(scanners, lastTop, false, parallelSeekEnabled);
10151051
// remove the older memstore scanner
10161052
for (int i = currentScanners.size() - 1; i >= 0; i--) {
10171053
if (!currentScanners.get(i).isFileScanner()) {
@@ -1117,7 +1153,7 @@ void trySwitchToStreamRead() {
11171153
if (fileScanners == null) {
11181154
return;
11191155
}
1120-
seekScanners(fileScanners, lastTop, false, parallelSeekEnabled);
1156+
seekScannersWithCheckpoint(fileScanners, lastTop, false, parallelSeekEnabled);
11211157
newCurrentScanners = new ArrayList<>(fileScanners.size() + memstoreScanners.size());
11221158
newCurrentScanners.addAll(fileScanners);
11231159
newCurrentScanners.addAll(memstoreScanners);
@@ -1241,4 +1277,18 @@ public void shipped() throws IOException {
12411277
trySwitchToStreamRead();
12421278
}
12431279
}
1280+
1281+
@Override
1282+
public void checkpoint(State state) {
1283+
this.checkpointed = true;
1284+
if (this.heap != null) {
1285+
this.heap.checkpoint(state);
1286+
}
1287+
// Also checkpoint any scanners for delayed close. These would be exhausted scanners,
1288+
// which may contain blocks that were totally filtered during a request. If so, the checkpoint
1289+
// will release them.
1290+
for (KeyValueScanner scanner : scannersForDelayedClose) {
1291+
scanner.checkpoint(state);
1292+
}
1293+
}
12441294
}

0 commit comments

Comments (0)