Skip to content

Commit 8df6327

Browse files
openinx (huzheng) authored and committed
HBASE-22491 Separate the heap HFileBlock and offheap HFileBlock because the heap block won't need refCnt and save into prevBlocks list before shipping (apache#268)
1 parent 0dc6cf4 commit 8df6327

19 files changed

+706
-305
lines changed

hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,12 @@ public void clean() {
297297
}
298298
}
299299
}
300+
this.usedBufCount.set(0);
301+
this.maxPoolSizeInfoLevelLogged = false;
302+
this.poolAllocationBytes.reset();
303+
this.heapAllocationBytes.reset();
304+
this.lastPoolAllocationBytes = 0;
305+
this.lastHeapAllocationBytes = 0;
300306
}
301307

302308
/**
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.io.hfile;
19+
20+
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
21+
import org.apache.hadoop.hbase.nio.ByteBuff;
22+
import org.apache.yetus.audience.InterfaceAudience;
23+
24+
/**
25+
* The {@link ByteBuffAllocator} won't allocate pooled heap {@link ByteBuff} now; at the same time,
26+
 * if we allocate an off-heap {@link ByteBuff} from the allocator, then it must be a pooled one. That
 * is to say, an exclusive memory HFileBlock must be a heap block and a shared memory HFileBlock
 * must be an off-heap block.
29+
* <p>
30+
* The exclusive memory HFileBlock will do nothing when calling retain or release methods, because
31+
* its memory will be garbage collected by JVM, even if its reference count decrease to zero, we can
32+
* do nothing for the de-allocating.
33+
* <p>
34+
* @see org.apache.hadoop.hbase.io.hfile.SharedMemHFileBlock
35+
*/
36+
@InterfaceAudience.Private
37+
public class ExclusiveMemHFileBlock extends HFileBlock {
38+
39+
ExclusiveMemHFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
40+
int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuff buf, boolean fillHeader,
41+
long offset, int nextBlockOnDiskSize, int onDiskDataSizeWithHeader,
42+
HFileContext fileContext) {
43+
super(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset, buf,
44+
fillHeader, offset, nextBlockOnDiskSize, onDiskDataSizeWithHeader, fileContext,
45+
ByteBuffAllocator.HEAP);
46+
}
47+
48+
@Override
49+
public int refCnt() {
50+
return 0;
51+
}
52+
53+
@Override
54+
public ExclusiveMemHFileBlock retain() {
55+
// do nothing
56+
return this;
57+
}
58+
59+
@Override
60+
public boolean release() {
61+
// do nothing
62+
return false;
63+
}
64+
65+
@Override
66+
public boolean isSharedMem() {
67+
return false;
68+
}
69+
}

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java

Lines changed: 90 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ public HFileBlock deserialize(ByteBuff buf, ByteBuffAllocator alloc)
285285
boolean usesChecksum = buf.get() == (byte) 1;
286286
long offset = buf.getLong();
287287
int nextBlockOnDiskSize = buf.getInt();
288-
return new HFileBlock(newByteBuff, usesChecksum, offset, nextBlockOnDiskSize, null, alloc);
288+
return createFromBuff(newByteBuff, usesChecksum, offset, nextBlockOnDiskSize, null, alloc);
289289
}
290290

291291
@Override
@@ -300,28 +300,6 @@ public int getDeserializerIdentifier() {
300300
CacheableDeserializerIdManager.registerDeserializer(BLOCK_DESERIALIZER);
301301
}
302302

303-
/**
304-
* Copy constructor. Creates a shallow copy of {@code that}'s buffer.
305-
*/
306-
private HFileBlock(HFileBlock that) {
307-
this(that, false);
308-
}
309-
310-
/**
311-
* Copy constructor. Creates a shallow/deep copy of {@code that}'s buffer as per the boolean
312-
* param.
313-
*/
314-
private HFileBlock(HFileBlock that, boolean bufCopy) {
315-
init(that.blockType, that.onDiskSizeWithoutHeader, that.uncompressedSizeWithoutHeader,
316-
that.prevBlockOffset, that.offset, that.onDiskDataSizeWithHeader, that.nextBlockOnDiskSize,
317-
that.fileContext, that.allocator);
318-
if (bufCopy) {
319-
this.buf = ByteBuff.wrap(ByteBuffer.wrap(that.buf.toBytes(0, that.buf.limit())));
320-
} else {
321-
this.buf = that.buf.duplicate();
322-
}
323-
}
324-
325303
/**
326304
* Creates a new {@link HFile} block from the given fields. This constructor
327305
* is used only while writing blocks and caching,
@@ -337,20 +315,27 @@ private HFileBlock(HFileBlock that, boolean bufCopy) {
337315
* @param onDiskSizeWithoutHeader see {@link #onDiskSizeWithoutHeader}
338316
* @param uncompressedSizeWithoutHeader see {@link #uncompressedSizeWithoutHeader}
339317
* @param prevBlockOffset see {@link #prevBlockOffset}
340-
* @param b block header ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes)
318+
* @param buf block buffer with header ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes)
341319
* @param fillHeader when true, write the first 4 header fields into passed buffer.
342320
* @param offset the file offset the block was read from
343321
* @param onDiskDataSizeWithHeader see {@link #onDiskDataSizeWithHeader}
344322
* @param fileContext HFile meta data
345323
*/
346324
@VisibleForTesting
347325
public HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
348-
int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuffer b, boolean fillHeader,
349-
long offset, final int nextBlockOnDiskSize, int onDiskDataSizeWithHeader,
350-
HFileContext fileContext, ByteBuffAllocator allocator) {
351-
init(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset, offset,
352-
onDiskDataSizeWithHeader, nextBlockOnDiskSize, fileContext, allocator);
353-
this.buf = new SingleByteBuff(b);
326+
int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuff buf, boolean fillHeader,
327+
long offset, int nextBlockOnDiskSize, int onDiskDataSizeWithHeader, HFileContext fileContext,
328+
ByteBuffAllocator allocator) {
329+
this.blockType = blockType;
330+
this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
331+
this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
332+
this.prevBlockOffset = prevBlockOffset;
333+
this.offset = offset;
334+
this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader;
335+
this.nextBlockOnDiskSize = nextBlockOnDiskSize;
336+
this.fileContext = fileContext;
337+
this.allocator = allocator;
338+
this.buf = buf;
354339
if (fillHeader) {
355340
overwriteHeader();
356341
}
@@ -364,7 +349,7 @@ public HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
364349
* to that point.
365350
* @param buf Has header, content, and trailing checksums if present.
366351
*/
367-
HFileBlock(ByteBuff buf, boolean usesHBaseChecksum, final long offset,
352+
static HFileBlock createFromBuff(ByteBuff buf, boolean usesHBaseChecksum, final long offset,
368353
final int nextBlockOnDiskSize, HFileContext fileContext, ByteBuffAllocator allocator)
369354
throws IOException {
370355
buf.rewind();
@@ -375,15 +360,15 @@ public HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
375360
final long prevBlockOffset = buf.getLong(Header.PREV_BLOCK_OFFSET_INDEX);
376361
// This constructor is called when we deserialize a block from cache and when we read a block in
377362
// from the fs. fileCache is null when deserialized from cache so need to make up one.
378-
HFileContextBuilder fileContextBuilder = fileContext != null?
379-
new HFileContextBuilder(fileContext): new HFileContextBuilder();
363+
HFileContextBuilder fileContextBuilder =
364+
fileContext != null ? new HFileContextBuilder(fileContext) : new HFileContextBuilder();
380365
fileContextBuilder.withHBaseCheckSum(usesHBaseChecksum);
381366
int onDiskDataSizeWithHeader;
382367
if (usesHBaseChecksum) {
383368
byte checksumType = buf.get(Header.CHECKSUM_TYPE_INDEX);
384369
int bytesPerChecksum = buf.getInt(Header.BYTES_PER_CHECKSUM_INDEX);
385370
onDiskDataSizeWithHeader = buf.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX);
386-
// Use the checksum type and bytes per checksum from header, not from filecontext.
371+
// Use the checksum type and bytes per checksum from header, not from fileContext.
387372
fileContextBuilder.withChecksumType(ChecksumType.codeToType(checksumType));
388373
fileContextBuilder.withBytesPerCheckSum(bytesPerChecksum);
389374
} else {
@@ -394,29 +379,19 @@ public HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
394379
}
395380
fileContext = fileContextBuilder.build();
396381
assert usesHBaseChecksum == fileContext.isUseHBaseChecksum();
397-
init(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset, offset,
398-
onDiskDataSizeWithHeader, nextBlockOnDiskSize, fileContext, allocator);
399-
this.offset = offset;
400-
this.buf = buf;
401-
this.buf.rewind();
402-
}
403-
404-
/**
405-
* Called from constructors.
406-
*/
407-
private void init(BlockType blockType, int onDiskSizeWithoutHeader,
408-
int uncompressedSizeWithoutHeader, long prevBlockOffset, long offset,
409-
int onDiskDataSizeWithHeader, final int nextBlockOnDiskSize, HFileContext fileContext,
410-
ByteBuffAllocator allocator) {
411-
this.blockType = blockType;
412-
this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
413-
this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader;
414-
this.prevBlockOffset = prevBlockOffset;
415-
this.offset = offset;
416-
this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader;
417-
this.nextBlockOnDiskSize = nextBlockOnDiskSize;
418-
this.fileContext = fileContext;
419-
this.allocator = allocator;
382+
return new HFileBlockBuilder()
383+
.withBlockType(blockType)
384+
.withOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader)
385+
.withUncompressedSizeWithoutHeader(uncompressedSizeWithoutHeader)
386+
.withPrevBlockOffset(prevBlockOffset)
387+
.withOffset(offset)
388+
.withOnDiskDataSizeWithHeader(onDiskDataSizeWithHeader)
389+
.withNextBlockOnDiskSize(nextBlockOnDiskSize)
390+
.withHFileContext(fileContext)
391+
.withByteBuffAllocator(allocator)
392+
.withByteBuff(buf.rewind())
393+
.withShared(!buf.hasArray())
394+
.build();
420395
}
421396

422397
/**
@@ -640,7 +615,7 @@ public String toString() {
640615
.append("(").append(onDiskSizeWithoutHeader)
641616
.append("+").append(HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM).append(")");
642617
}
643-
String dataBegin = null;
618+
String dataBegin;
644619
if (buf.hasArray()) {
645620
dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(),
646621
Math.min(32, buf.limit() - buf.arrayOffset() - headerSize()));
@@ -674,7 +649,7 @@ HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException
674649
return this;
675650
}
676651

677-
HFileBlock unpacked = new HFileBlock(this);
652+
HFileBlock unpacked = shallowClone(this);
678653
unpacked.allocateBuffer(); // allocates space for the decompressed block
679654
boolean succ = false;
680655
try {
@@ -762,10 +737,16 @@ public long heapSize() {
762737
}
763738

764739
/**
765-
* @return true to indicate the block is allocated from JVM heap, otherwise from off-heap.
740+
* Will be override by {@link SharedMemHFileBlock} or {@link ExclusiveMemHFileBlock}. Return true
741+
* by default.
766742
*/
767-
boolean isOnHeap() {
768-
return buf.hasArray();
743+
public boolean isSharedMem() {
744+
if (this instanceof SharedMemHFileBlock) {
745+
return true;
746+
} else if (this instanceof ExclusiveMemHFileBlock) {
747+
return false;
748+
}
749+
return true;
769750
}
770751

771752
/**
@@ -1040,8 +1021,7 @@ void writeHeaderAndData(FSDataOutputStream out) throws IOException {
10401021
+ offset);
10411022
}
10421023
startOffset = offset;
1043-
1044-
finishBlockAndWriteHeaderAndData((DataOutputStream) out);
1024+
finishBlockAndWriteHeaderAndData(out);
10451025
}
10461026

10471027
/**
@@ -1252,13 +1232,27 @@ HFileBlock getBlockForCaching(CacheConfig cacheConf) {
12521232
.withIncludesMvcc(fileContext.isIncludesMvcc())
12531233
.withIncludesTags(fileContext.isIncludesTags())
12541234
.build();
1255-
return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(),
1256-
getUncompressedSizeWithoutHeader(), prevOffset,
1257-
cacheConf.shouldCacheCompressed(blockType.getCategory()) ? cloneOnDiskBufferWithHeader()
1258-
: cloneUncompressedBufferWithHeader(),
1259-
FILL_HEADER, startOffset, UNSET,
1260-
onDiskBlockBytesWithHeader.size() + onDiskChecksum.length, newContext,
1261-
cacheConf.getByteBuffAllocator());
1235+
// Build the HFileBlock.
1236+
HFileBlockBuilder builder = new HFileBlockBuilder();
1237+
ByteBuffer buffer;
1238+
if (cacheConf.shouldCacheCompressed(blockType.getCategory())) {
1239+
buffer = cloneOnDiskBufferWithHeader();
1240+
} else {
1241+
buffer = cloneUncompressedBufferWithHeader();
1242+
}
1243+
return builder.withBlockType(blockType)
1244+
.withOnDiskSizeWithoutHeader(getOnDiskSizeWithoutHeader())
1245+
.withUncompressedSizeWithoutHeader(getUncompressedSizeWithoutHeader())
1246+
.withPrevBlockOffset(prevOffset)
1247+
.withByteBuff(ByteBuff.wrap(buffer))
1248+
.withFillHeader(FILL_HEADER)
1249+
.withOffset(startOffset)
1250+
.withNextBlockOnDiskSize(UNSET)
1251+
.withOnDiskDataSizeWithHeader(onDiskBlockBytesWithHeader.size() + onDiskChecksum.length)
1252+
.withHFileContext(newContext)
1253+
.withByteBuffAllocator(cacheConf.getByteBuffAllocator())
1254+
.withShared(!buffer.hasArray())
1255+
.build();
12621256
}
12631257
}
12641258

@@ -1782,8 +1776,8 @@ protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
17821776
// The onDiskBlock will become the headerAndDataBuffer for this block.
17831777
// If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already
17841778
// contains the header of next block, so no need to set next block's header in it.
1785-
HFileBlock hFileBlock = new HFileBlock(curBlock, checksumSupport, offset,
1786-
nextBlockOnDiskSize, fileContext, intoHeap ? HEAP : allocator);
1779+
HFileBlock hFileBlock = createFromBuff(curBlock, checksumSupport, offset,
1780+
nextBlockOnDiskSize, fileContext, intoHeap ? HEAP : allocator);
17871781
// Run check on uncompressed sizings.
17881782
if (!fileContext.isCompressedOrEncrypted()) {
17891783
hFileBlock.sanityCheckUncompressed();
@@ -1948,7 +1942,7 @@ public boolean equals(Object comparison) {
19481942
if (comparison == null) {
19491943
return false;
19501944
}
1951-
if (comparison.getClass() != this.getClass()) {
1945+
if (!(comparison instanceof HFileBlock)) {
19521946
return false;
19531947
}
19541948

@@ -2085,7 +2079,27 @@ static String toStringHeader(ByteBuff buf) throws IOException {
20852079
" onDiskDataSizeWithHeader " + onDiskDataSizeWithHeader;
20862080
}
20872081

2088-
public HFileBlock deepCloneOnHeap() {
2089-
return new HFileBlock(this, true);
2082+
/**
 * Seeds a builder with every metadata field of {@code blk} so clone helpers start from an
 * identical block description. The buffer is taken via {@code blk.buf.duplicate()} — presumably
 * an independent view over the same underlying bytes (TODO confirm ByteBuff.duplicate
 * semantics) — and the shared-memory flag is carried over from {@link #isSharedMem()}.
 */
private static HFileBlockBuilder createBuilder(HFileBlock blk) {
  return new HFileBlockBuilder()
      .withBlockType(blk.blockType)
      .withOnDiskSizeWithoutHeader(blk.onDiskSizeWithoutHeader)
      .withUncompressedSizeWithoutHeader(blk.uncompressedSizeWithoutHeader)
      .withPrevBlockOffset(blk.prevBlockOffset)
      .withByteBuff(blk.buf.duplicate()) // Duplicate the buffer.
      .withOffset(blk.offset)
      .withOnDiskDataSizeWithHeader(blk.onDiskDataSizeWithHeader)
      .withNextBlockOnDiskSize(blk.nextBlockOnDiskSize)
      .withHFileContext(blk.fileContext)
      .withByteBuffAllocator(blk.allocator)
      .withShared(blk.isSharedMem());
}

/**
 * Shallow clone: same metadata, a duplicated view of the same buffer bytes, and the same
 * shared-memory classification as {@code blk}.
 */
static HFileBlock shallowClone(HFileBlock blk) {
  return createBuilder(blk).build();
}

/**
 * Deep clone onto the JVM heap: copies bytes {@code [0, limit)} of {@code blk}'s buffer into a
 * fresh heap array, so the result is marked non-shared regardless of the source block.
 */
static HFileBlock deepCloneOnHeap(HFileBlock blk) {
  ByteBuff deepCloned = ByteBuff.wrap(ByteBuffer.wrap(blk.buf.toBytes(0, blk.buf.limit())));
  return createBuilder(blk).withByteBuff(deepCloned).withShared(false).build();
}
20912105
}

0 commit comments

Comments
 (0)