HBASE-28900: Avoid resetting the bucket cache during recovery from persistence. (#6360) #6365
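In brief: when a persisted bucket cache is recovered, every entry in the saved backing map is re-registered with the allocator. Previously, a single stale or duplicate entry made Bucket.addAllocation throw a BucketAllocatorException, discarding the whole persisted cache. With this change (spanning BucketAllocator, BucketCache, and the bucket cache persistence test), addAllocation instead logs a warning and returns false for such entries, so recovery skips only the offending block and keeps the rest.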

Merged 1 commit on Oct 15, 2024
@@ -126,7 +126,7 @@ public long allocate() {
return offset;
}

- public void addAllocation(long offset) throws BucketAllocatorException {
+ public boolean addAllocation(long offset) throws BucketAllocatorException {
offset -= baseOffset;
if (offset < 0 || offset % itemAllocationSize != 0)
throw new BucketAllocatorException("Attempt to add allocation for bad offset: " + offset
@@ -137,10 +137,14 @@ public void addAllocation(long offset) throws BucketAllocatorException {
if (matchFound) freeList[i - 1] = freeList[i];
else if (freeList[i] == idx) matchFound = true;
}
- if (!matchFound) throw new BucketAllocatorException(
- "Couldn't find match for index " + idx + " in free list");
+ if (!matchFound) {
+ LOG.warn("We found more entries for bucket starting at offset {} for blocks of {} size. "
+ + "Skipping entry at cache offset {}", baseOffset, itemAllocationSize, offset);
+ return false;
+ }
++usedCount;
--freeCount;
+ return true;
}

private void free(long offset) {
@@ -402,10 +406,11 @@ public BucketSizeInfo roundUpToBucketSizeInfo(int blockSize) {
bsi.instantiateBucket(b);
reconfigured[bucketNo] = true;
}
- realCacheSize.add(foundLen);
- buckets[bucketNo].addAllocation(foundOffset);
- usedSize += buckets[bucketNo].getItemAllocationSize();
- bucketSizeInfos[bucketSizeIndex].blockAllocated(b);
+ if (buckets[bucketNo].addAllocation(foundOffset)) {
+ realCacheSize.add(foundLen);
+ usedSize += buckets[bucketNo].getItemAllocationSize();
+ bucketSizeInfos[bucketSizeIndex].blockAllocated(b);
+ }
}

if (sizeNotMatchedCount > 0) {
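To make the new allocator contract concrete, here is a minimal, self-contained sketch of the idea; it is not the HBase implementation, and names such as BucketSketch and the ArrayDeque free list are illustrative only:

import java.util.ArrayDeque;
import java.util.Deque;

// Sketch: a fixed-slot bucket whose recovery path skips duplicate offsets
// instead of throwing and forcing a full cache reset.
public class BucketSketch {
  private final long baseOffset; // where this bucket starts in the cache file
  private final int itemSize;    // fixed slot size served by this bucket
  private final Deque<Integer> freeSlots = new ArrayDeque<>();
  private int usedCount = 0;

  BucketSketch(long baseOffset, int itemSize, int slots) {
    this.baseOffset = baseOffset;
    this.itemSize = itemSize;
    for (int i = 0; i < slots; i++) {
      freeSlots.add(i);
    }
  }

  // Re-registers an allocation read back from a persisted backing map.
  // Returns false instead of throwing when the slot is no longer free
  // (i.e. already claimed by an earlier entry), mirroring the change above.
  boolean addAllocation(long offset) {
    long rel = offset - baseOffset;
    if (rel < 0 || rel % itemSize != 0) {
      throw new IllegalArgumentException("Attempt to add allocation for bad offset: " + offset);
    }
    int slot = (int) (rel / itemSize);
    if (!freeSlots.remove(slot)) { // duplicate or unknown slot: skip, don't abort
      System.out.println("Skipping entry at cache offset " + offset);
      return false;
    }
    usedCount++;
    return true;
  }

  public static void main(String[] args) {
    BucketSketch b = new BucketSketch(0L, 8192, 4);
    System.out.println(b.addAllocation(8192)); // true: first claim of slot 1
    System.out.println(b.addAllocation(8192)); // false: duplicate offset, skipped
  }
}

As in the patched allocation-rebuild loop above, the caller only updates its accounting (real cache size, used size, allocated-block counters) when addAllocation returns true.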
@@ -840,7 +840,7 @@ private boolean doEvictBlock(BlockCacheKey cacheKey, BucketEntry bucketEntry,
* it is {@link ByteBuffAllocator#putbackBuffer}.
* </pre>
*/
- private Recycler createRecycler(final BucketEntry bucketEntry) {
+ public Recycler createRecycler(final BucketEntry bucketEntry) {
return () -> {
freeBucketEntry(bucketEntry);
return;
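This visibility change (private to public on createRecycler in BucketCache) lets the new test below construct a BucketEntry directly, passing bucketCache::createRecycler as the entry's recycler factory.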
@@ -18,7 +18,10 @@
package org.apache.hadoop.hbase.io.hfile.bucket;

import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;
+ import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.ACCEPT_FACTOR_CONFIG_NAME;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
+ import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME;
+ import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MIN_FACTOR_CONFIG_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -199,6 +202,67 @@ public void testValidateCacheInitialization() throws Exception {
TEST_UTIL.cleanupTestDir();
}

+ @Test
+ public void testBucketCacheRecoveryWithAllocationInconsistencies() throws Exception {
+ HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ Path testDir = TEST_UTIL.getDataTestDir();
+ TEST_UTIL.getTestFileSystem().mkdirs(testDir);
+ Configuration conf = HBaseConfiguration.create();
+ // Disables the persister thread by setting its interval to MAX_VALUE
+ conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
+ conf.setDouble(MIN_FACTOR_CONFIG_NAME, 0.99);
+ conf.setDouble(ACCEPT_FACTOR_CONFIG_NAME, 1);
+ conf.setDouble(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.01);
+ int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
+ BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 36 * 1024, 8192,
+ bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
+ DEFAULT_ERROR_TOLERATION_DURATION, conf);
+ assertTrue(bucketCache.waitForCacheInitialization(1000));
+ assertTrue(
+ bucketCache.isCacheInitialized("testBucketCacheRecovery") && bucketCache.isCacheEnabled());
+
+ CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 5);
+
+ // Add four blocks
+ cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock());
+ cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock());
+ cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock());
+ cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock());
+
+ // creates an entry for a 5th block with the same cache offset as the 1st block. Just add it
+ // straight to the backingMap, bypassing caching, in order to fabricate an inconsistency
+ BucketEntry bucketEntry =
+ new BucketEntry(bucketCache.backingMap.get(blocks[0].getBlockName()).offset(),
+ blocks[4].getBlock().getSerializedLength(), blocks[4].getBlock().getOnDiskSizeWithHeader(),
+ 0, false, bucketCache::createRecycler, blocks[4].getBlock().getByteBuffAllocator());
+ bucketEntry.setDeserializerReference(blocks[4].getBlock().getDeserializer());
+ bucketCache.getBackingMap().put(blocks[4].getBlockName(), bucketEntry);
+
+ // saves the current state of the cache: 5 blocks in the map, but only 4 were actually cached.
+ // The 5th block has the same cache offset as the first
+ bucketCache.persistToFile();
+
+ BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 36 * 1024,
+ 8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
+ DEFAULT_ERROR_TOLERATION_DURATION, conf);
+ while (!newBucketCache.getBackingMapValidated().get()) {
+ Thread.sleep(10);
+ }
+
+ assertNull(newBucketCache.getBlock(blocks[4].getBlockName(), false, false, false));
+ // The backing map entry with key blocks[0].getBlockName() may point to a valid entry or
+ // to null, depending on the ordering of the keys in the backing map.
+ // Hence, skipping the check for that key.
+ assertEquals(blocks[1].getBlock(),
+ newBucketCache.getBlock(blocks[1].getBlockName(), false, false, false));
+ assertEquals(blocks[2].getBlock(),
+ newBucketCache.getBlock(blocks[2].getBlockName(), false, false, false));
+ assertEquals(blocks[3].getBlock(),
+ newBucketCache.getBlock(blocks[3].getBlockName(), false, false, false));
+ assertEquals(4, newBucketCache.backingMap.size());
+ TEST_UTIL.cleanupTestDir();
+ }

private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
throws InterruptedException {
Waiter.waitFor(HBaseConfiguration.create(), 12000,
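Net effect, per the assertions above: after recovery, validation drops the fabricated 5th entry that reused the 1st block's offset, blocks[1] through blocks[3] are still served from the recovered cache, and the backing map ends up with 4 entries rather than the cache being reset.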