Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ public class CacheConfig {
*/
public static final String PREFETCH_BLOCKS_ON_OPEN_KEY = "hbase.rs.prefetchblocksonopen";

/**
* Configuration key to cache blocks when a compacted file is written, predicated on prefetching
* being enabled for the column family.
*/
public static final String PREFETCH_COMPACTED_BLOCKS_ON_WRITE_KEY =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A bit confusing.. Are we doing the prefetch of the new compacted file once it is written?
Dont think so.. When we write the file, that time itself the caching happens. So it is cache on write. Why its called prefetch then? There is no extra fetch op happening right?

"hbase.rs.prefetchcompactedblocksonwrite";

public static final String DROP_BEHIND_CACHE_COMPACTION_KEY =
"hbase.hfile.drop.behind.compaction";

Expand All @@ -93,6 +100,7 @@ public class CacheConfig {
public static final boolean DEFAULT_EVICT_ON_CLOSE = false;
public static final boolean DEFAULT_CACHE_DATA_COMPRESSED = false;
public static final boolean DEFAULT_PREFETCH_ON_OPEN = false;
public static final boolean DEFAULT_PREFETCH_COMPACTED_BLOCKS_ON_WRITE = false;
public static final boolean DROP_BEHIND_CACHE_COMPACTION_DEFAULT = true;

/**
Expand Down Expand Up @@ -124,6 +132,12 @@ public class CacheConfig {
/** Whether data blocks should be prefetched into the cache */
private final boolean prefetchOnOpen;

/**
* Whether data blocks should be cached when compacted file is written for column families with
* prefetching
*/
private final boolean prefetchCompactedDataOnWrite;

private final boolean dropBehindCompaction;

// Local reference to the block cache
Expand Down Expand Up @@ -174,6 +188,8 @@ public CacheConfig(Configuration conf, ColumnFamilyDescriptor family, BlockCache
(family == null ? false : family.isEvictBlocksOnClose());
this.prefetchOnOpen = conf.getBoolean(PREFETCH_BLOCKS_ON_OPEN_KEY, DEFAULT_PREFETCH_ON_OPEN) ||
(family == null ? false : family.isPrefetchBlocksOnOpen());
this.prefetchCompactedDataOnWrite = conf.getBoolean(PREFETCH_COMPACTED_BLOCKS_ON_WRITE_KEY,
DEFAULT_PREFETCH_COMPACTED_BLOCKS_ON_WRITE);
this.blockCache = blockCache;
this.byteBuffAllocator = byteBuffAllocator;
LOG.info("Created cacheConfig: " + this + (family == null ? "" : " for family " + family) +
Expand All @@ -193,6 +209,7 @@ public CacheConfig(CacheConfig cacheConf) {
this.evictOnClose = cacheConf.evictOnClose;
this.cacheDataCompressed = cacheConf.cacheDataCompressed;
this.prefetchOnOpen = cacheConf.prefetchOnOpen;
this.prefetchCompactedDataOnWrite = cacheConf.prefetchCompactedDataOnWrite;
this.dropBehindCompaction = cacheConf.dropBehindCompaction;
this.blockCache = cacheConf.blockCache;
this.byteBuffAllocator = cacheConf.byteBuffAllocator;
Expand All @@ -207,6 +224,7 @@ private CacheConfig() {
this.evictOnClose = false;
this.cacheDataCompressed = false;
this.prefetchOnOpen = false;
this.prefetchCompactedDataOnWrite = false;
this.dropBehindCompaction = false;
this.blockCache = null;
this.byteBuffAllocator = ByteBuffAllocator.HEAP;
Expand Down Expand Up @@ -319,6 +337,13 @@ public boolean shouldPrefetchOnOpen() {
return this.prefetchOnOpen;
}

/**
* @return true if blocks should be cached while writing during compaction, false if not
*/
public boolean shouldCacheCompactedBlocksOnWrite() {
return this.prefetchCompactedDataOnWrite && this.prefetchOnOpen;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh.. So the cache on write (at compaction) happens iff prefetch config is ON ! Anyways in ur case the prefetch which is another config, is ON right? I think this is the reason why the new config you have named that way. But some how I feel that config name is bit misleading.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually the cache size should be much bigger than the hot data set size if u want to do cache on compact. Because the compacted away data might be already in cache (Those are flused files or a result of another compaction). Those are recently been accessed also (by the compaction thread). This feature should be very carefully used.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for looking at this. My understanding is that in cases where prefetch is enabled, the new file is going to be read into the cache after compaction completes anyway. So the cache size requirements are the same when this new setting is enabled. This is why I wanted to limit the scope of the cache on write to only apply where prefetching is enabled: it simply is a way to do the cache loading more efficiently while we are writing the data out rather than having to read it back after compaction is done which I've found is very expensive when data is in S3.

As far as the name goes, I struggled to come up with something intuitive - how do I explain in the name alone that this only applies when prefetching is on? I tried to convey "when prefetching, do the prefetch of compacted data on write." I'm not in love with the name and I'm open to suggestions. I didn't want to give the false impression that all compacted data is going to be cached on write. Maybe "cacheCompactedDataOnWriteIfPrefetching"? Is that too wordy?

}

/**
* Return true if we may find this type of block in block cache.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1118,9 +1118,9 @@ public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm
boolean shouldDropBehind) throws IOException {
final CacheConfig writerCacheConf;
if (isCompaction) {
// Don't cache data on write on compactions.
// Don't cache data on write on compactions, unless specifically configured to do so
writerCacheConf = new CacheConfig(cacheConf);
writerCacheConf.setCacheDataOnWrite(false);
writerCacheConf.setCacheDataOnWrite(cacheConf.shouldCacheCompactedBlocksOnWrite());
} else {
writerCacheConf = cacheConf;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
Expand Down Expand Up @@ -405,8 +407,14 @@ private void writeStoreFile(boolean useTags) throws IOException {
storeFilePath = sfw.getPath();
}

private void testNotCachingDataBlocksDuringCompactionInternals(boolean useTags)
private void testCachingDataBlocksDuringCompactionInternals(boolean useTags,
boolean prefetchCompactedBlocksOnWrite)
throws IOException, InterruptedException {

// Set the conf if testing caching compacted blocks on write
conf.setBoolean(CacheConfig.PREFETCH_COMPACTED_BLOCKS_ON_WRITE_KEY,
prefetchCompactedBlocksOnWrite);

// TODO: need to change this test if we add a cache size threshold for
// compactions, or if we implement some other kind of intelligent logic for
// deciding what blocks to cache-on-write on compaction.
Expand All @@ -415,7 +423,8 @@ private void testNotCachingDataBlocksDuringCompactionInternals(boolean useTags)
final byte[] cfBytes = Bytes.toBytes(cf);
final int maxVersions = 3;
ColumnFamilyDescriptor cfd =
ColumnFamilyDescriptorBuilder.newBuilder(cfBytes).setCompressionType(compress)
ColumnFamilyDescriptorBuilder.newBuilder(cfBytes)
.setPrefetchBlocksOnOpen(prefetchCompactedBlocksOnWrite).setCompressionType(compress)
.setBloomFilterType(BLOOM_TYPE).setMaxVersions(maxVersions)
.setDataBlockEncoding(NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding()).build();
HRegion region = TEST_UTIL.createTestRegion(table, cfd, blockCache);
Expand Down Expand Up @@ -448,16 +457,42 @@ private void testNotCachingDataBlocksDuringCompactionInternals(boolean useTags)
}
region.flush(true);
}

// If prefetching is enabled, cancel all prefetches before clearing block cache
if (prefetchCompactedBlocksOnWrite) {
for (HStore store : region.getStores()) {
for (HStoreFile storeFile : store.getStorefiles()) {
PrefetchExecutor.cancel(storeFile.getPath());
}
}
}
clearBlockCache(blockCache);
assertEquals(0, blockCache.getBlockCount());

region.compact(false);
LOG.debug("compactStores() returned");

boolean dataBlockCached = false;
for (CachedBlock block: blockCache) {
assertNotEquals(BlockType.ENCODED_DATA, block.getBlockType());
assertNotEquals(BlockType.DATA, block.getBlockType());
if (BlockType.ENCODED_DATA.equals(block.getBlockType())
|| BlockType.DATA.equals(block.getBlockType())) {
dataBlockCached = true;
break;
}
}

// Data blocks should be cached in instances where we are caching blocks on write. In the case
// of testing
// BucketCache, we cannot verify block type as it is not stored in the cache.
assertTrue(
"\nTest description: " + testDescription + "\nprefetchCompactedBlocksOnWrite: "
+ prefetchCompactedBlocksOnWrite + "\n",
(prefetchCompactedBlocksOnWrite && !(blockCache instanceof BucketCache)) == dataBlockCached);

region.close();

// Clear the cache on write setting
conf.setBoolean(CacheConfig.PREFETCH_COMPACTED_BLOCKS_ON_WRITE_KEY, false);
}

@Test
Expand All @@ -467,8 +502,9 @@ public void testStoreFileCacheOnWrite() throws IOException {
}

@Test
public void testNotCachingDataBlocksDuringCompaction() throws IOException, InterruptedException {
testNotCachingDataBlocksDuringCompactionInternals(false);
testNotCachingDataBlocksDuringCompactionInternals(true);
public void testCachingDataBlocksDuringCompaction() throws IOException, InterruptedException {
testCachingDataBlocksDuringCompactionInternals(false, false);
testCachingDataBlocksDuringCompactionInternals(true, false);
testCachingDataBlocksDuringCompactionInternals(true, true);
}
}