Skip to content

[HBASE-22606] : BucketCache additional tests #332

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 7 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,7 @@ public static Iterable<Object[]> data() {
final long capacitySize = 32 * 1024 * 1024;
final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
String ioEngineName = "offheap";
String persistencePath = null;
private String ioEngineName = "offheap";

private static class MockedBucketCache extends BucketCache {

Expand All @@ -136,7 +135,7 @@ public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
@Before
public void setup() throws IOException {
cache = new MockedBucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, persistencePath);
constructedBlockSizes, writeThreads, writerQLen, null);
}

@After
Expand Down Expand Up @@ -258,17 +257,52 @@ public void testRetrieveFromFile() throws Exception {
HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
Path testDir = TEST_UTIL.getDataTestDir();
TEST_UTIL.getTestFileSystem().mkdirs(testDir);

String ioEngineName = "file:" + testDir + "/bucket.cache";
testRetrievalUtils(testDir, ioEngineName);
int[] smallBucketSizes = new int[]{3 * 1024, 5 * 1024};
String persistencePath = testDir + "/bucket.persistence";
BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
smallBucketSizes, writeThreads, writerQLen, persistencePath);
assertFalse(new File(persistencePath).exists());
assertEquals(0, bucketCache.getAllocator().getUsedSize());
assertEquals(0, bucketCache.backingMap.size());
TEST_UTIL.cleanupTestDir();
}

@Test
public void testRetrieveFromMMap() throws Exception {
HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
Path testDir = TEST_UTIL.getDataTestDir();
TEST_UTIL.getTestFileSystem().mkdirs(testDir);
final String ioEngineName = "mmap:" + testDir + "/bucket.cache";
testRetrievalUtils(testDir, ioEngineName);
}

@Test
public void testRetrieveFromPMem() throws Exception {
HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
Path testDir = TEST_UTIL.getDataTestDir();
TEST_UTIL.getTestFileSystem().mkdirs(testDir);
final String ioEngineName = "pmem:" + testDir + "/bucket.cache";
testRetrievalUtils(testDir, ioEngineName);
int[] smallBucketSizes = new int[]{3 * 1024, 5 * 1024};
String persistencePath = testDir + "/bucket.persistence";
BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
smallBucketSizes, writeThreads, writerQLen, persistencePath);
assertFalse(new File(persistencePath).exists());
assertEquals(0, bucketCache.getAllocator().getUsedSize());
assertEquals(0, bucketCache.backingMap.size());
TEST_UTIL.cleanupTestDir();
}

private void testRetrievalUtils(Path testDir, String ioEngineName)
throws IOException, InterruptedException {
final String persistencePath = testDir + "/bucket.persistence";
BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, persistencePath);
constructedBlockSizes, writeThreads, writerQLen, persistencePath);
long usedSize = bucketCache.getAllocator().getUsedSize();
assertEquals(0, usedSize);

HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
// Add blocks
for (HFileBlockPair block : blocks) {
bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
}
Expand All @@ -277,28 +311,49 @@ public void testRetrieveFromFile() throws Exception {
}
usedSize = bucketCache.getAllocator().getUsedSize();
assertNotEquals(0, usedSize);
// persist cache to file
bucketCache.shutdown();
assertTrue(new File(persistencePath).exists());

// restore cache from file
bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, persistencePath);
constructedBlockSizes, writeThreads, writerQLen, persistencePath);
assertFalse(new File(persistencePath).exists());
assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
// persist cache to file
bucketCache.shutdown();
assertTrue(new File(persistencePath).exists());
}

@Test(expected = IllegalArgumentException.class)
public void testRetrieveFailure() throws Exception {
HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
Path testDir = TEST_UTIL.getDataTestDir();
TEST_UTIL.getTestFileSystem().mkdirs(testDir);
final String ioEngineName = testDir + "/bucket.cache";
testRetrievalUtils(testDir, ioEngineName);
}

// reconfig buckets sizes, the biggest bucket is small than constructedBlockSize (8k or 16k)
// so it can't restore cache from file
int[] smallBucketSizes = new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024 };
@Test
public void testRetrieveFromFileWithoutPersistence() throws Exception {
HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
Path testDir = TEST_UTIL.getDataTestDir();
TEST_UTIL.getTestFileSystem().mkdirs(testDir);
String ioEngineName = "file:" + testDir + "/bucket.cache";
BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, null);
long usedSize = bucketCache.getAllocator().getUsedSize();
assertEquals(0, usedSize);
HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
for (HFileBlockPair block : blocks) {
bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
}
for (HFileBlockPair block : blocks) {
cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
}
usedSize = bucketCache.getAllocator().getUsedSize();
assertNotEquals(0, usedSize);
bucketCache.shutdown();
bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
smallBucketSizes, writeThreads, writerQLen, persistencePath);
assertFalse(new File(persistencePath).exists());
constructedBlockSizes, writeThreads, writerQLen, null);
assertEquals(0, bucketCache.getAllocator().getUsedSize());
assertEquals(0, bucketCache.backingMap.size());

bucketCache.shutdown();
TEST_UTIL.cleanupTestDir();
}

Expand All @@ -322,13 +377,32 @@ public void testGetPartitionSize() throws IOException {
conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f);

BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, persistencePath, 100, conf);
constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);

validateGetPartitionSize(cache, 0.1f, 0.5f);
validateGetPartitionSize(cache, 0.7f, 0.5f);
validateGetPartitionSize(cache, 0.2f, 0.5f);
}

@Test
public void testCacheSizeCapacity() throws IOException {
validateGetPartitionSize(cache, BucketCache.DEFAULT_SINGLE_FACTOR,
BucketCache.DEFAULT_MIN_FACTOR);
Configuration conf = HBaseConfiguration.create();
conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f);
conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f);
conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f);
conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f);
try {
new BucketCache(ioEngineName, Long.MAX_VALUE, 1, constructedBlockSizes, writeThreads,
writerQLen, null, 100, conf);
Assert.fail("Should have thrown IllegalArgumentException because of large cache capacity!");
} catch (IllegalArgumentException e) {
Assert.assertEquals("Cache capacity is too large, only support 32TB now", e.getMessage());
}
}


@Test
public void testValidBucketCacheConfigs() throws IOException {
Configuration conf = HBaseConfiguration.create();
Expand All @@ -340,7 +414,7 @@ public void testValidBucketCacheConfigs() throws IOException {
conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f);

BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, persistencePath, 100, conf);
constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);

assertEquals(BucketCache.ACCEPT_FACTOR_CONFIG_NAME + " failed to propagate.", 0.9f,
cache.getAcceptableFactor(), 0);
Expand Down Expand Up @@ -408,7 +482,7 @@ private void checkConfigValues(Configuration conf, Map<String, float[]> configMa
conf.setFloat(configName, configMap.get(configName)[i]);
}
BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, persistencePath, 100, conf);
constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);
assertTrue("Created BucketCache and expected it to succeed: " + expectSuccess[i] + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]);
} catch (IllegalArgumentException e) {
assertFalse("Created BucketCache and expected it to succeed: " + expectSuccess[i] + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]);
Expand Down