@@ -18,7 +18,10 @@
 package org.apache.hadoop.hbase.io.hfile.bucket;
 
 import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;
+import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.ACCEPT_FACTOR_CONFIG_NAME;
 import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
+import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME;
+import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MIN_FACTOR_CONFIG_NAME;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -199,6 +202,76 @@ public void testValidateCacheInitialization() throws Exception {
     TEST_UTIL.cleanupTestDir();
   }
 
+  @Test
+  public void testBucketCacheRecoveryWithAllocationInconsistencies() throws Exception {
+    HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+    Path testDir = TEST_UTIL.getDataTestDir();
+    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
+    Configuration conf = HBaseConfiguration.create();
+    // Disables the persister thread by setting its interval to MAX_VALUE
+    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
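+    // Raise the eviction thresholds so eviction only kicks in when the cache is completely
+    // full and then frees almost nothing, keeping all four test blocks in place.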
+    conf.setDouble(MIN_FACTOR_CONFIG_NAME, 0.99);
+    conf.setDouble(ACCEPT_FACTOR_CONFIG_NAME, 1);
+    conf.setDouble(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.01);
+    int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
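+    // With a 36 KB cache and a single 9 KB slot size, the allocator can hold exactly four
+    // 8 KB blocks, so a fifth block cannot be written through the normal cache path.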
+    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 36 * 1024, 8192,
+      bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
+      DEFAULT_ERROR_TOLERATION_DURATION, conf);
+    assertTrue(bucketCache.waitForCacheInitialization(1000));
+    assertTrue(
+      bucketCache.isCacheInitialized("testBucketCacheRecovery") && bucketCache.isCacheEnabled());
+
+    CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 5);
+
+    // Add four blocks
+    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock());
+    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock());
+    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock());
+    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock());
+
+    // Create an entry for a 5th block with the same cache offset as the 1st block. Add it
+    // straight to the backingMap, bypassing caching, in order to fabricate an inconsistency.
+    BucketEntry bucketEntry =
+      new BucketEntry(bucketCache.backingMap.get(blocks[0].getBlockName()).offset(),
+        blocks[4].getBlock().getSerializedLength(), blocks[4].getBlock().getOnDiskSizeWithHeader(),
+        0, false, bucketCache::createRecycler, blocks[4].getBlock().getByteBuffAllocator());
+    bucketEntry.setDeserializerReference(blocks[4].getBlock().getDeserializer());
+    bucketCache.getBackingMap().put(blocks[4].getBlockName(), bucketEntry);
+
+    // Save the current state of the cache: 5 blocks in the map, but only 4 have actually been
+    // cached. The 5th block has the same cache offset as the first.
+    bucketCache.persistToFile();
+
+    BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 36 * 1024,
+      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
+      DEFAULT_ERROR_TOLERATION_DURATION, conf);
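+    // Wait for the recovered cache to finish validating the persisted backing map against
+    // the contents of the cache file.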
+    while (!newBucketCache.getBackingMapValidated().get()) {
+      Thread.sleep(10);
+    }
+
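+    // Validation should discard the fabricated 5th entry, whose offset clashes with the
+    // 1st block, so looking it up must miss.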
+    assertNull(newBucketCache.getBlock(blocks[4].getBlockName(), false, false, false));
+    // The backing map entry keyed by blocks[0].getBlockName() may point to a valid entry or to
+    // null, depending on the ordering of the keys in the backing map during validation, so we
+    // skip the check for that key.
+    assertEquals(blocks[1].getBlock(),
+      newBucketCache.getBlock(blocks[1].getBlockName(), false, false, false));
+    assertEquals(blocks[2].getBlock(),
+      newBucketCache.getBlock(blocks[2].getBlockName(), false, false, false));
+    assertEquals(blocks[3].getBlock(),
+      newBucketCache.getBlock(blocks[3].getBlockName(), false, false, false));
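+    // Only the four blocks that were genuinely written to the cache file survive recovery.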
+    assertEquals(4, newBucketCache.backingMap.size());
+    TEST_UTIL.cleanupTestDir();
+  }
+
   private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
     throws InterruptedException {
     Waiter.waitFor(HBaseConfiguration.create(), 12000,