
Commit e25b2a7

wchevreuil and Josh Elser committed

HBASE-27370 Avoid decompressing blocks when reading from bucket cache… (#4781)

Co-authored-by: Josh Elser <elserj@apache.com>
Signed-off-by: Peter Somogyi <psomogyi@apache.org>
Signed-off-by: Tak Lon (Stephen) Wu <taklwu@apache.org>
1 parent 393068c commit e25b2a7

File tree: 5 files changed (+118 / -23 lines changed)

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java

Lines changed: 4 additions & 0 deletions
@@ -373,6 +373,10 @@ public interface CachingBlockReader {
     HFileBlock readBlock(long offset, long onDiskBlockSize, boolean cacheBlock, final boolean pread,
       final boolean isCompaction, final boolean updateCacheMetrics, BlockType expectedBlockType,
       DataBlockEncoding expectedDataBlockEncoding) throws IOException;
+
+    HFileBlock readBlock(long offset, long onDiskBlockSize, boolean cacheBlock, final boolean pread,
+      final boolean isCompaction, final boolean updateCacheMetrics, BlockType expectedBlockType,
+      DataBlockEncoding expectedDataBlockEncoding, boolean cacheOnly) throws IOException;
   }

   /** An interface used by clients to open and iterate an {@link HFile}. */
hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java

Lines changed: 1 addition & 1 deletion
@@ -57,7 +57,7 @@ public void run() {
           // next header, will not have happened...so, pass in the onDiskSize gotten from the
           // cached block. This 'optimization' triggers extremely rarely I'd say.
           HFileBlock block = readBlock(offset, onDiskSizeOfNextBlock, /* cacheBlock= */true,
-            /* pread= */true, false, false, null, null);
+            /* pread= */true, false, false, null, null, true);
           try {
             onDiskSizeOfNextBlock = block.getNextBlockOnDiskSize();
             offset += block.getOnDiskSizeWithHeader();
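
The prefetch runnable now always passes cacheOnly=true, but decompression is only actually skipped when the cache is configured to store blocks compressed. A hedged sketch of the settings involved, assuming the usual CacheConfig constants (CACHE_DATA_BLOCKS_COMPRESSED_KEY appears in the TestPrefetch changes further down; PREFETCH_BLOCKS_ON_OPEN_KEY is assumed here and is not part of this patch):

// Sketch, not from the patch: enable block prefetch on open and compressed
// data-block caching so the cache-only read path can skip decompression.
Configuration conf = HBaseConfiguration.create();
conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true);
conf.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, true);
CacheConfig cacheConf = new CacheConfig(conf);
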

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java

Lines changed: 35 additions & 9 deletions
@@ -1082,7 +1082,7 @@ public void setConf(Configuration conf) {
    * and its encoding vs. {@code expectedDataBlockEncoding}. Unpacks the block as necessary.
    */
   private HFileBlock getCachedBlock(BlockCacheKey cacheKey, boolean cacheBlock, boolean useLock,
-    boolean isCompaction, boolean updateCacheMetrics, BlockType expectedBlockType,
+    boolean updateCacheMetrics, BlockType expectedBlockType,
     DataBlockEncoding expectedDataBlockEncoding) throws IOException {
     // Check cache for block. If found return.
     BlockCache cache = cacheConf.getBlockCache().orElse(null);

@@ -1187,7 +1187,7 @@ public HFileBlock getMetaBlock(String metaBlockName, boolean cacheBlock) throws

     cacheBlock &= cacheConf.shouldCacheBlockOnRead(BlockType.META.getCategory());
     HFileBlock cachedBlock =
-      getCachedBlock(cacheKey, cacheBlock, false, true, true, BlockType.META, null);
+      getCachedBlock(cacheKey, cacheBlock, false, true, BlockType.META, null);
     if (cachedBlock != null) {
       assert cachedBlock.isUnpacked() : "Packed block leak.";
       // Return a distinct 'shallow copy' of the block,

@@ -1234,6 +1234,15 @@ private boolean shouldUseHeap(BlockType expectedBlockType) {
   public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final boolean cacheBlock,
     boolean pread, final boolean isCompaction, boolean updateCacheMetrics,
     BlockType expectedBlockType, DataBlockEncoding expectedDataBlockEncoding) throws IOException {
+    return readBlock(dataBlockOffset, onDiskBlockSize, cacheBlock, pread, isCompaction,
+      updateCacheMetrics, expectedBlockType, expectedDataBlockEncoding, false);
+  }
+
+  @Override
+  public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final boolean cacheBlock,
+    boolean pread, final boolean isCompaction, boolean updateCacheMetrics,
+    BlockType expectedBlockType, DataBlockEncoding expectedDataBlockEncoding, boolean cacheOnly)
+    throws IOException {
     if (dataBlockIndexReader == null) {
       throw new IOException(path + " block index not loaded");
     }

@@ -1257,17 +1266,18 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo
     try (TraceScope traceScope = TraceUtil.createTrace("HFileReaderImpl.readBlock")) {
       while (true) {
         // Check cache for block. If found return.
-        if (cacheConf.shouldReadBlockFromCache(expectedBlockType)) {
+        if (cacheConf.shouldReadBlockFromCache(expectedBlockType) && !cacheOnly) {
           if (useLock) {
             lockEntry = offsetLock.getLockEntry(dataBlockOffset);
           }
           // Try and get the block from the block cache. If the useLock variable is true then this
           // is the second time through the loop and it should not be counted as a block cache miss.
-          HFileBlock cachedBlock = getCachedBlock(cacheKey, cacheBlock, useLock, isCompaction,
-            updateCacheMetrics, expectedBlockType, expectedDataBlockEncoding);
+          HFileBlock cachedBlock = getCachedBlock(cacheKey, cacheBlock, useLock, updateCacheMetrics,
+            expectedBlockType, expectedDataBlockEncoding);
           if (cachedBlock != null) {
             if (LOG.isTraceEnabled()) {
-              LOG.trace("From Cache " + cachedBlock);
+              LOG.trace("Block for file {} is coming from Cache {}",
+                Bytes.toString(cachedBlock.getHFileContext().getTableName()), cachedBlock);
             }
             TraceUtil.addTimelineAnnotation("blockCacheHit");
             assert cachedBlock.isUnpacked() : "Packed block leak.";

@@ -1304,14 +1314,30 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo
         HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, pread,
           !isCompaction, shouldUseHeap(expectedBlockType));
         validateBlockType(hfileBlock, expectedBlockType);
-        HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader);
         BlockType.BlockCategory category = hfileBlock.getBlockType().getCategory();
+        final boolean cacheCompressed = cacheConf.shouldCacheCompressed(category);
+        final boolean cacheOnRead = cacheConf.shouldCacheBlockOnRead(category);
+
+        // Don't need the unpacked block back and we're storing the block in the cache compressed
+        if (cacheOnly && cacheCompressed && cacheOnRead) {
+          LOG.debug("Skipping decompression of block in prefetch");
+          // Cache the block if necessary
+          cacheConf.getBlockCache().ifPresent(cache -> {
+            if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
+              cache.cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory());
+            }
+          });

+          if (updateCacheMetrics && hfileBlock.getBlockType().isData()) {
+            HFile.DATABLOCK_READ_COUNT.increment();
+          }
+          return hfileBlock;
+        }
+        HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader);
         // Cache the block if necessary
         cacheConf.getBlockCache().ifPresent(cache -> {
           if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
-            cache.cacheBlock(cacheKey,
-              cacheConf.shouldCacheCompressed(category) ? hfileBlock : unpacked,
+            cache.cacheBlock(cacheKey, cacheCompressed ? hfileBlock : unpacked,
               cacheConf.isInMemory());
           }
         });
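
Restating the new branch in isolation: the reader returns the still-packed block only when the read is cache-only and the block's category is both cached on read and cached in compressed form; otherwise it falls through to the existing unpack-and-cache path. A simplified sketch of that predicate follows; the canSkipUnpack helper name is hypothetical and not part of the patch.

// Sketch of the condition guarding the new fast path in readBlock(): only a
// cache-only read of a block that will be cached compressed may skip unpack().
private static boolean canSkipUnpack(boolean cacheOnly, CacheConfig cacheConf,
  BlockType.BlockCategory category) {
  return cacheOnly && cacheConf.shouldCacheCompressed(category)
    && cacheConf.shouldCacheBlockOnRead(category);
}
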

hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java

Lines changed: 8 additions & 0 deletions
@@ -176,6 +176,14 @@ public BlockReaderWrapper(HFileBlock.FSReader realReader) {
     public HFileBlock readBlock(long offset, long onDiskSize, boolean cacheBlock, boolean pread,
       boolean isCompaction, boolean updateCacheMetrics, BlockType expectedBlockType,
       DataBlockEncoding expectedDataBlockEncoding) throws IOException {
+      return readBlock(offset, onDiskSize, cacheBlock, pread, isCompaction, updateCacheMetrics,
+        expectedBlockType, expectedDataBlockEncoding, false);
+    }
+
+    @Override
+    public HFileBlock readBlock(long offset, long onDiskSize, boolean cacheBlock, boolean pread,
+      boolean isCompaction, boolean updateCacheMetrics, BlockType expectedBlockType,
+      DataBlockEncoding expectedDataBlockEncoding, boolean cacheOnly) throws IOException {
       if (offset == prevOffset && onDiskSize == prevOnDiskSize && pread == prevPread) {
         hitCount += 1;
         return prevBlock;

hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java

Lines changed: 70 additions & 13 deletions
@@ -17,12 +17,16 @@
  */
 package org.apache.hadoop.hbase.io.hfile;

+import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;

 import java.io.IOException;
 import java.util.Random;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;

@@ -34,6 +38,7 @@
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.ByteBuffAllocator;
+import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;

@@ -112,36 +117,88 @@ private void readStoreFileLikeScanner(Path storeFilePath) throws Exception {
   }

   private void readStoreFile(Path storeFilePath) throws Exception {
+    readStoreFile(storeFilePath, (r, o) -> {
+      HFileBlock block = null;
+      try {
+        block = r.readBlock(o, -1, false, true, false, true, null, null);
+      } catch (IOException e) {
+        fail(e.getMessage());
+      }
+      return block;
+    }, (key, block) -> {
+      boolean isCached = blockCache.getBlock(key, true, false, true) != null;
+      if (
+        block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX
+          || block.getBlockType() == BlockType.INTERMEDIATE_INDEX
+      ) {
+        assertTrue(isCached);
+      }
+    });
+  }
+
+  private void readStoreFileCacheOnly(Path storeFilePath) throws Exception {
+    readStoreFile(storeFilePath, (r, o) -> {
+      HFileBlock block = null;
+      try {
+        block = r.readBlock(o, -1, false, true, false, true, null, null, true);
+      } catch (IOException e) {
+        fail(e.getMessage());
+      }
+      return block;
+    }, (key, block) -> {
+      boolean isCached = blockCache.getBlock(key, true, false, true) != null;
+      if (block.getBlockType() == BlockType.DATA) {
+        assertFalse(block.isUnpacked());
+      } else if (
+        block.getBlockType() == BlockType.ROOT_INDEX
+          || block.getBlockType() == BlockType.INTERMEDIATE_INDEX
+      ) {
+        assertTrue(block.isUnpacked());
+      }
+      assertTrue(isCached);
+    });
+  }
+
+  private void readStoreFile(Path storeFilePath,
+    BiFunction<HFile.Reader, Long, HFileBlock> readFunction,
+    BiConsumer<BlockCacheKey, HFileBlock> validationFunction) throws Exception {
     // Open the file
     HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);

     while (!reader.prefetchComplete()) {
       // Sleep for a bit
       Thread.sleep(1000);
     }
-
-    // Check that all of the data blocks were preloaded
-    BlockCache blockCache = cacheConf.getBlockCache().get();
     long offset = 0;
     while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
-      HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null);
+      HFileBlock block = readFunction.apply(reader, offset);
       BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
-      boolean isCached = blockCache.getBlock(blockCacheKey, true, false, true) != null;
-      if (
-        block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX
-          || block.getBlockType() == BlockType.INTERMEDIATE_INDEX
-      ) {
-        assertTrue(isCached);
-      }
+      validationFunction.accept(blockCacheKey, block);
       offset += block.getOnDiskSizeWithHeader();
     }
   }

+  @Test
+  public void testPrefetchCompressed() throws Exception {
+    conf.setBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, true);
+    cacheConf = new CacheConfig(conf, blockCache);
+    HFileContext context = new HFileContextBuilder().withCompression(Compression.Algorithm.GZ)
+      .withBlockSize(DATA_BLOCK_SIZE).build();
+    Path storeFile = writeStoreFile("TestPrefetchCompressed", context);
+    readStoreFileCacheOnly(storeFile);
+    conf.setBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, false);
+
+  }
+
   private Path writeStoreFile(String fname) throws IOException {
-    Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname);
     HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
+    return writeStoreFile(fname, meta);
+  }
+
+  private Path writeStoreFile(String fname, HFileContext context) throws IOException {
+    Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname);
     StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs)
-      .withOutputDir(storeFileParentDir).withFileContext(meta).build();
+      .withOutputDir(storeFileParentDir).withFileContext(context).build();
     Random rand = ThreadLocalRandom.current();
     final int rowLen = 32;
     for (int i = 0; i < NUM_KV; ++i) {
