Merged

Changes from all commits
@@ -100,6 +100,14 @@ default Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat
*/
int evictBlocksByHfileName(String hfileName);

/**
* Evicts all blocks for the given HFile by path.
* @return the number of blocks evicted
*/
default int evictBlocksByHfilePath(Path hfilePath) {
return evictBlocksByHfileName(hfilePath.getName());
}

/**
* Get the statistics for this block cache.
*/
@@ -32,6 +32,7 @@ public class BlockCacheKey implements HeapSize, java.io.Serializable {
private final long offset;
private BlockType blockType;
private final boolean isPrimaryReplicaBlock;

private Path filePath;

/**
@@ -116,4 +117,9 @@ public void setBlockType(BlockType blockType) {
public Path getFilePath() {
return filePath;
}

public void setFilePath(Path filePath) {
this.filePath = filePath;
}

}
@@ -159,6 +159,11 @@ public int evictBlocksByHfileName(String hfileName) {
return l1Cache.evictBlocksByHfileName(hfileName) + l2Cache.evictBlocksByHfileName(hfileName);
}

@Override
public int evictBlocksByHfilePath(Path hfilePath) {
return l1Cache.evictBlocksByHfilePath(hfilePath) + l2Cache.evictBlocksByHfilePath(hfilePath);
}

@Override
public CacheStats getStats() {
return this.combinedCacheStats;
@@ -185,7 +185,7 @@ public void close(boolean evictOnClose) throws IOException {
// Deallocate data blocks
cacheConf.getBlockCache().ifPresent(cache -> {
if (evictOnClose) {
int numEvicted = cache.evictBlocksByHfileName(name);
int numEvicted = cache.evictBlocksByHfilePath(path);
if (LOG.isTraceEnabled()) {
LOG.trace("On close, file= {} evicted= {} block(s)", name, numEvicted);
}
@@ -88,6 +88,7 @@
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.IdReadWriteLock;
import org.apache.hadoop.hbase.util.IdReadWriteLockStrongRef;
import org.apache.hadoop.hbase.util.IdReadWriteLockWithObjectPool;
@@ -722,7 +723,7 @@ public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
// the cache map state might differ from the actual cache. If we reach this block,
// we should remove the cache key entry from the backing map
backingMap.remove(key);
fullyCachedFiles.remove(key.getHfileName());
fileNotFullyCached(key, bucketEntry);
LOG.debug("Failed to fetch block for cache key: {}.", key, hioex);
} catch (IOException ioex) {
LOG.error("Failed reading block " + key + " from bucket cache", ioex);
@@ -747,7 +748,7 @@ void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decre
if (decrementBlockNumber) {
this.blockNumber.decrement();
if (ioEngine.isPersistent()) {
fileNotFullyCached(cacheKey.getHfileName());
fileNotFullyCached(cacheKey, bucketEntry);
}
}
if (evictedByEvictionProcess) {
@@ -758,23 +759,11 @@ void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decre
}
}

private void fileNotFullyCached(String hfileName) {
// Update the regionPrefetchedSizeMap before removing the file from prefetchCompleted
if (fullyCachedFiles.containsKey(hfileName)) {
Pair<String, Long> regionEntry = fullyCachedFiles.get(hfileName);
String regionEncodedName = regionEntry.getFirst();
long filePrefetchSize = regionEntry.getSecond();
LOG.debug("Removing file {} for region {}", hfileName, regionEncodedName);
regionCachedSize.computeIfPresent(regionEncodedName, (rn, pf) -> pf - filePrefetchSize);
// If all the blocks for a region are evicted from the cache, remove the entry for that region
if (
regionCachedSize.containsKey(regionEncodedName)
&& regionCachedSize.get(regionEncodedName) == 0
) {
regionCachedSize.remove(regionEncodedName);
}
}
fullyCachedFiles.remove(hfileName);
private void fileNotFullyCached(BlockCacheKey key, BucketEntry entry) {
// Update the region cached size via updateRegionCachedSize before removing the file from fullyCachedFiles.
// This computation should happen even if the file is not in fullyCachedFiles map.
updateRegionCachedSize(key.getFilePath(), (entry.getLength() * -1));
fullyCachedFiles.remove(key.getHfileName());
}

public void fileCacheCompleted(Path filePath, long size) {
@@ -788,9 +777,19 @@ public void fileCacheCompleted(Path filePath, long size) {

private void updateRegionCachedSize(Path filePath, long cachedSize) {
if (filePath != null) {
String regionName = filePath.getParent().getParent().getName();
regionCachedSize.merge(regionName, cachedSize,
(previousSize, newBlockSize) -> previousSize + newBlockSize);
if (HFileArchiveUtil.isHFileArchived(filePath)) {
LOG.trace("Skipping region cached size update for archived file: {}", filePath);
} else {
String regionName = filePath.getParent().getParent().getName();
regionCachedSize.merge(regionName, cachedSize,
(previousSize, newBlockSize) -> previousSize + newBlockSize);
LOG.trace("Updating region cached size for region: {}", regionName);
// If all the blocks for a region are evicted from the cache,
// remove the entry for that region from regionCachedSize map.
if (regionCachedSize.get(regionName) <= 0) {
regionCachedSize.remove(regionName);
}
}
}
}

@@ -1698,7 +1697,7 @@ private void verifyFileIntegrity(BucketCacheProtos.BucketCacheEntry proto) {
} catch (IOException e1) {
LOG.debug("Check for key {} failed. Evicting.", keyEntry.getKey());
evictBlock(keyEntry.getKey());
fileNotFullyCached(keyEntry.getKey().getHfileName());
fileNotFullyCached(keyEntry.getKey(), keyEntry.getValue());
}
}
backingMapValidated.set(true);
Expand Down Expand Up @@ -1928,20 +1927,32 @@ public int evictBlocksByHfileName(String hfileName) {
}

@Override
public int evictBlocksRangeByHfileName(String hfileName, long initOffset, long endOffset) {
fileNotFullyCached(hfileName);
public int evictBlocksByHfilePath(Path hfilePath) {
return evictBlocksRangeByHfileName(hfilePath.getName(), hfilePath, 0, Long.MAX_VALUE);
}

public int evictBlocksRangeByHfileName(String hfileName, Path filePath, long initOffset,
long endOffset) {
Set<BlockCacheKey> keySet = getAllCacheKeysForFile(hfileName, initOffset, endOffset);
LOG.debug("found {} blocks for file {}, starting offset: {}, end offset: {}", keySet.size(),
hfileName, initOffset, endOffset);
int numEvicted = 0;
for (BlockCacheKey key : keySet) {
if (filePath != null) {
key.setFilePath(filePath);
}
if (evictBlock(key)) {
++numEvicted;
}
}
return numEvicted;
}

@Override
public int evictBlocksRangeByHfileName(String hfileName, long initOffset, long endOffset) {
return evictBlocksRangeByHfileName(hfileName, null, initOffset, endOffset);
}

private Set<BlockCacheKey> getAllCacheKeysForFile(String hfileName, long init, long end) {
return blocksByHFile.subSet(new BlockCacheKey(hfileName, init), true,
new BlockCacheKey(hfileName, end), true);
@@ -200,4 +200,16 @@ public static TableName getTableName(Path archivePath) {
if (p == null) return null;
return TableName.valueOf(p.getName(), tbl);
}

public static boolean isHFileArchived(Path path) {
Path currentDir = path;
for (int i = 0; i < 6; i++) {
Contributor: Why does this always iterate 6 times? My assumption is it's because that's the max depth of an archived file. Can that ever be more than 6?

Contributor Author: I don't think so.

currentDir = currentDir.getParent();
if (currentDir == null) {
return false;
}
}
return HConstants.HFILE_ARCHIVE_DIRECTORY.equals(currentDir.getName());
}

}
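For context on the fixed bound discussed in the review comment above: a minimal sketch, assuming the standard archived-HFile layout {rootdir}/archive/data/{namespace}/{table}/{region}/{cf}/{hfile}, of why six getParent() calls from the file are enough to reach the archive directory. The path below is hypothetical and only illustrates the check; it is not part of the patch.

import org.apache.hadoop.fs.Path;

// Hypothetical archived HFile path, assuming the usual layout under the HBase root dir.
Path hfile = new Path("/hbase/archive/data/default/t1/0123456789abcdef0123456789abcdef/cf/f1");
Path dir = hfile;
for (int i = 0; i < 6; i++) {
  dir = dir.getParent(); // cf -> region -> table -> namespace -> data -> archive
}
// dir.getName() is now "archive" (HConstants.HFILE_ARCHIVE_DIRECTORY), so
// HFileArchiveUtil.isHFileArchived(hfile) would return true for a path at this depth.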
@@ -42,6 +42,7 @@
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
@@ -66,11 +67,16 @@ public static void setUp() throws Exception {
UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
UTIL.getConfiguration().setBoolean(CACHE_BLOCKS_ON_WRITE_KEY, true);
UTIL.getConfiguration().setBoolean(PREFETCH_BLOCKS_ON_OPEN_KEY, true);
UTIL.getConfiguration().set(BUCKET_CACHE_IOENGINE_KEY, "offheap");
UTIL.getConfiguration().setInt(BUCKET_CACHE_SIZE_KEY, 200);
UTIL.getConfiguration().set(StoreFileTrackerFactory.TRACKER_IMPL, "FILE");
}

@Before
public void testSetup() {
UTIL.getConfiguration().set(BUCKET_CACHE_IOENGINE_KEY,
"file:" + UTIL.getDataTestDir() + "/bucketcache");
}

@Test
public void testEvictOnSplit() throws Exception {
doTestEvictOnSplit("testEvictOnSplit", true,
@@ -51,6 +51,7 @@
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
@@ -71,12 +72,14 @@
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
@@ -363,7 +366,7 @@ public void testPrefetchMetricProgress() throws Exception {
BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
MutableLong regionCachedSize = new MutableLong(0);
// Our file should have 6 DATA blocks. We should wait for all of them to be cached
long waitedTime = Waiter.waitFor(conf, 300, () -> {
Waiter.waitFor(conf, 300, () -> {
if (bc.getBackingMap().size() > 0) {
long currentSize = bc.getRegionCachedInfo().get().get(regionName);
assertTrue(regionCachedSize.getValue() <= currentSize);
@@ -374,6 +377,132 @@
});
}

@Test
public void testPrefetchMetricProgressForLinks() throws Exception {
conf.setLong(BUCKET_CACHE_SIZE_KEY, 200);
blockCache = BlockCacheFactory.createBlockCache(conf);
cacheConf = new CacheConfig(conf, blockCache);
final RegionInfo hri =
RegionInfoBuilder.newBuilder(TableName.valueOf(name.getMethodName())).build();
// force temp data in hbase/target/test-data instead of /tmp/hbase-xxxx/
Configuration testConf = new Configuration(this.conf);
Path testDir = TEST_UTIL.getDataTestDir(name.getMethodName());
CommonFSUtils.setRootDir(testConf, testDir);
Path tableDir = CommonFSUtils.getTableDir(testDir, hri.getTable());
RegionInfo region = RegionInfoBuilder.newBuilder(TableName.valueOf(tableDir.getName())).build();
Path regionDir = new Path(tableDir, region.getEncodedName());
Path cfDir = new Path(regionDir, "cf");
HRegionFileSystem regionFS =
HRegionFileSystem.createRegionOnFileSystem(testConf, fs, tableDir, region);
Path storeFile = writeStoreFile(100, cfDir);
// Prefetches the file blocks
LOG.debug("First read should prefetch the blocks.");
readStoreFile(storeFile);
BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
// Our file should have 6 DATA blocks. We should wait for all of them to be cached
Waiter.waitFor(testConf, 300, () -> bc.getBackingMap().size() == 6);
long cachedSize = bc.getRegionCachedInfo().get().get(region.getEncodedName());

final RegionInfo dstHri =
RegionInfoBuilder.newBuilder(TableName.valueOf(name.getMethodName())).build();
HRegionFileSystem dstRegionFs = HRegionFileSystem.createRegionOnFileSystem(testConf, fs,
CommonFSUtils.getTableDir(testDir, dstHri.getTable()), dstHri);

Path dstPath = new Path(regionFS.getTableDir(), new Path(dstHri.getRegionNameAsString(), "cf"));

Path linkFilePath =
new Path(dstPath, HFileLink.createHFileLinkName(region, storeFile.getName()));

StoreFileTracker sft = StoreFileTrackerFactory.create(testConf, false,
StoreContext.getBuilder().withFamilyStoreDirectoryPath(dstPath)
.withColumnFamilyDescriptor(ColumnFamilyDescriptorBuilder.of("cf"))
.withRegionFileSystem(dstRegionFs).build());
sft.createHFileLink(hri.getTable(), hri.getEncodedName(), storeFile.getName(), true);
StoreFileInfo sfi = sft.getStoreFileInfo(linkFilePath, true);

HStoreFile hsf = new HStoreFile(sfi, BloomType.NONE, cacheConf);
assertTrue(sfi.isLink());
hsf.initReader();
HFile.Reader reader = hsf.getReader().getHFileReader();
while (!reader.prefetchComplete()) {
// Sleep for a bit
Thread.sleep(1000);
}
// HFileLink uses the path of the target file to create a reader, so it should resolve to the
// already cached blocks and not insert new blocks in the cache.
Waiter.waitFor(testConf, 300, () -> bc.getBackingMap().size() == 6);

assertEquals(cachedSize, (long) bc.getRegionCachedInfo().get().get(region.getEncodedName()));
}

@Test
public void testPrefetchMetricProgressForLinksToArchived() throws Exception {
conf.setLong(BUCKET_CACHE_SIZE_KEY, 200);
blockCache = BlockCacheFactory.createBlockCache(conf);
cacheConf = new CacheConfig(conf, blockCache);

// force temp data in hbase/target/test-data instead of /tmp/hbase-xxxx/
Configuration testConf = new Configuration(this.conf);
Path testDir = TEST_UTIL.getDataTestDir(name.getMethodName());
CommonFSUtils.setRootDir(testConf, testDir);

final RegionInfo hri =
RegionInfoBuilder.newBuilder(TableName.valueOf(name.getMethodName())).build();
Path tableDir = CommonFSUtils.getTableDir(testDir, hri.getTable());
RegionInfo region = RegionInfoBuilder.newBuilder(TableName.valueOf(tableDir.getName())).build();
Path regionDir = new Path(tableDir, region.getEncodedName());
Path cfDir = new Path(regionDir, "cf");

Path storeFile = writeStoreFile(100, cfDir);
// Prefetches the file blocks
LOG.debug("First read should prefetch the blocks.");
readStoreFile(storeFile);
BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
// Our file should have 6 DATA blocks. We should wait for all of them to be cached
Waiter.waitFor(testConf, 300, () -> bc.getBackingMap().size() == 6);
long cachedSize = bc.getRegionCachedInfo().get().get(region.getEncodedName());

// create another file, but in the archive dir, hence it won't be cached
Path archiveRoot = new Path(testDir, "archive");
Path archiveTableDir = CommonFSUtils.getTableDir(archiveRoot, hri.getTable());
Path archiveRegionDir = new Path(archiveTableDir, region.getEncodedName());
Path archiveCfDir = new Path(archiveRegionDir, "cf");
Path archivedFile = writeStoreFile(100, archiveCfDir);

final RegionInfo testRegion =
RegionInfoBuilder.newBuilder(TableName.valueOf(tableDir.getName())).build();
final HRegionFileSystem testRegionFs = HRegionFileSystem.createRegionOnFileSystem(testConf, fs,
CommonFSUtils.getTableDir(testDir, testRegion.getTable()), testRegion);
// Just create a link to the archived file
Path dstPath = new Path(tableDir, new Path(testRegion.getEncodedName(), "cf"));

Path linkFilePath =
new Path(dstPath, HFileLink.createHFileLinkName(region, archivedFile.getName()));

StoreFileTracker sft = StoreFileTrackerFactory.create(testConf, false,
StoreContext.getBuilder().withFamilyStoreDirectoryPath(dstPath)
.withColumnFamilyDescriptor(ColumnFamilyDescriptorBuilder.of("cf"))
.withRegionFileSystem(testRegionFs).build());
sft.createHFileLink(hri.getTable(), hri.getEncodedName(), storeFile.getName(), true);
StoreFileInfo sfi = sft.getStoreFileInfo(linkFilePath, true);

HStoreFile hsf = new HStoreFile(sfi, BloomType.NONE, cacheConf);
assertTrue(sfi.isLink());
hsf.initReader();
HFile.Reader reader = hsf.getReader().getHFileReader();
while (!reader.prefetchComplete()) {
// Sleep for a bit
Thread.sleep(1000);
}
// HFileLink uses the path of the target file to create a reader, but the target file is in the
// archive, so it wasn't cached previously and should be cached when we open the link.
Waiter.waitFor(testConf, 300, () -> bc.getBackingMap().size() == 12);
// cached size for the region of target file shouldn't change
assertEquals(cachedSize, (long) bc.getRegionCachedInfo().get().get(region.getEncodedName()));
// cached size for the region with link pointing to archive dir shouldn't be updated
assertNull(bc.getRegionCachedInfo().get().get(testRegion.getEncodedName()));
}

private void readStoreFile(Path storeFilePath) throws Exception {
readStoreFile(storeFilePath, (r, o) -> {
HFileBlock block = null;