
Commit c1bebbb

jhungund authored and wchevreuil committed
HBASE-28468: Integrate the data-tiering logic into cache evictions. (#5829)
Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
1 parent 84b3f19 commit c1bebbb

File tree

4 files changed: +263 −6 lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java

Lines changed: 43 additions & 2 deletions
@@ -974,6 +974,7 @@ void freeSpace(final String why) {
     long bytesToFreeWithExtra =
       (long) Math.floor(bytesToFreeWithoutExtra * (1 + extraFreeFactor));

+    long bytesFreed = 0;
     // Instantiate priority buckets
     BucketEntryGroup bucketSingle =
       new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(singleFactor));
@@ -982,9 +983,36 @@ void freeSpace(final String why) {
     BucketEntryGroup bucketMemory =
       new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(memoryFactor));

+    // Check the list of files to determine the cold files which can be readily evicted.
+    Map<String, String> coldFiles = null;
+    try {
+      DataTieringManager dataTieringManager = DataTieringManager.getInstance();
+      coldFiles = dataTieringManager.getColdFilesList();
+    } catch (IllegalStateException e) {
+      LOG.warn("Data Tiering Manager is not set. Ignore time-based block evictions.");
+    }
     // Scan entire map putting bucket entry into appropriate bucket entry
     // group
     for (Map.Entry<BlockCacheKey, BucketEntry> bucketEntryWithKey : backingMap.entrySet()) {
+      if (
+        coldFiles != null && coldFiles.containsKey(bucketEntryWithKey.getKey().getHfileName())
+      ) {
+        int freedBlockSize = bucketEntryWithKey.getValue().getLength();
+        if (evictBlockIfNoRpcReferenced(bucketEntryWithKey.getKey())) {
+          bytesFreed += freedBlockSize;
+        }
+        if (bytesFreed >= bytesToFreeWithExtra) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(
+              "Bucket cache free space completed; required: {} freed: {} from cold data blocks.",
+              bytesToFreeWithExtra, StringUtils.byteDesc(bytesFreed));
+          }
+          // Sufficient bytes have been freed.
+          return;
+        }
+        continue;
+      }
       switch (bucketEntryWithKey.getValue().getPriority()) {
         case SINGLE: {
           bucketSingle.add(bucketEntryWithKey);
@@ -1001,6 +1029,21 @@ void freeSpace(final String why) {
       }
     }

+    // Check if the cold file eviction is sufficient to create enough space.
+    bytesToFreeWithExtra -= bytesFreed;
+    if (bytesToFreeWithExtra <= 0) {
+      LOG.debug("Bucket cache free space completed; freed space : {} bytes of cold data blocks.",
+        StringUtils.byteDesc(bytesFreed));
+      return;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(
+        "Bucket cache free space completed; freed space : {} "
+          + "bytes of cold data blocks. {} more bytes required to be freed.",
+        StringUtils.byteDesc(bytesFreed), bytesToFreeWithExtra);
+    }
+
     PriorityQueue<BucketEntryGroup> bucketQueue =
       new PriorityQueue<>(3, Comparator.comparingLong(BucketEntryGroup::overflow));

@@ -1009,8 +1052,6 @@ void freeSpace(final String why) {
     bucketQueue.add(bucketMemory);

     int remainingBuckets = bucketQueue.size();
-    long bytesFreed = 0;
-
     BucketEntryGroup bucketGroup;
     while ((bucketGroup = bucketQueue.poll()) != null) {
       long overflow = bucketGroup.overflow();

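In outline, the change makes freeSpace() a two-phase pass: blocks belonging to files that DataTieringManager reports as cold are evicted first, and the existing single/multi/memory priority buckets only have to reclaim whatever is still outstanding. The following is a minimal, self-contained sketch of that control flow using plain collections; the file names, sizes, and class name are hypothetical, and this illustrates the idea rather than the actual BucketCache code.

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Toy model of the two-phase eviction: cold-file blocks are evicted first,
// and only the remainder is left to the normal priority-bucket pass.
public class ColdFirstEvictionSketch {
  public static void main(String[] args) {
    // Hypothetical cached blocks: cache key -> block size in bytes.
    Map<String, Integer> cache = new LinkedHashMap<>();
    cache.put("coldFile/block@0", 8192);
    cache.put("coldFile/block@8192", 8192);
    cache.put("hotFile/block@0", 8192);

    // Stand-in for DataTieringManager.getColdFilesList().
    Set<String> coldFiles = Collections.singleton("coldFile");
    long bytesToFree = 10000;
    long bytesFreed = 0;

    // Phase 1: evict blocks whose file is cold until enough space is freed.
    for (Map.Entry<String, Integer> entry : new LinkedHashMap<>(cache).entrySet()) {
      String fileName = entry.getKey().substring(0, entry.getKey().indexOf('/'));
      if (coldFiles.contains(fileName)) {
        cache.remove(entry.getKey());
        bytesFreed += entry.getValue();
        if (bytesFreed >= bytesToFree) {
          System.out.println("Done after cold evictions; freed " + bytesFreed + " bytes");
          return;
        }
      }
    }

    // Phase 2 (not modelled): the remaining bytes are reclaimed by the
    // existing single/multi/memory BucketEntryGroup pass shown in the diff.
    System.out.println("Cold evictions freed " + bytesFreed + " bytes; "
      + (bytesToFree - bytesFreed) + " bytes still needed");
  }
}
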
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringManager.java

Lines changed: 40 additions & 2 deletions
@@ -20,6 +20,7 @@
 import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;

 import java.io.IOException;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.OptionalLong;
@@ -173,12 +174,12 @@ private boolean hotDataValidator(long maxTimestamp, long hotDataAge) {
   private long getMaxTimestamp(Path hFilePath) throws DataTieringException {
     HStoreFile hStoreFile = getHStoreFile(hFilePath);
     if (hStoreFile == null) {
-      LOG.error("HStoreFile corresponding to " + hFilePath + " doesn't exist");
+      LOG.error("HStoreFile corresponding to {} doesn't exist", hFilePath);
       return Long.MAX_VALUE;
     }
     OptionalLong maxTimestamp = hStoreFile.getMaximumTimestamp();
     if (!maxTimestamp.isPresent()) {
-      LOG.error("Maximum timestamp not present for " + hFilePath);
+      LOG.error("Maximum timestamp not present for {}", hFilePath);
       return Long.MAX_VALUE;
     }
     return maxTimestamp.getAsLong();
@@ -270,4 +271,41 @@ private long getDataTieringHotDataAge(Configuration conf) {
     return Long.parseLong(
       conf.get(DATATIERING_HOT_DATA_AGE_KEY, String.valueOf(DEFAULT_DATATIERING_HOT_DATA_AGE)));
   }
+
+  /*
+   * This API traverses through the list of online regions and returns a subset of these
+   * files-names that are cold.
+   * @return List of names of files with cold data as per data-tiering logic.
+   */
+  public Map<String, String> getColdFilesList() {
+    Map<String, String> coldFiles = new HashMap<>();
+    for (HRegion r : this.onlineRegions.values()) {
+      for (HStore hStore : r.getStores()) {
+        Configuration conf = hStore.getReadOnlyConfiguration();
+        if (getDataTieringType(conf) != DataTieringType.TIME_RANGE) {
+          // Data-Tiering not enabled for the store. Just skip it.
+          continue;
+        }
+        Long hotDataAge = getDataTieringHotDataAge(conf);
+
+        for (HStoreFile hStoreFile : hStore.getStorefiles()) {
+          String hFileName =
+            hStoreFile.getFileInfo().getHFileInfo().getHFileContext().getHFileName();
+          OptionalLong maxTimestamp = hStoreFile.getMaximumTimestamp();
+          if (!maxTimestamp.isPresent()) {
+            LOG.warn("maxTimestamp missing for file: {}",
+              hStoreFile.getFileInfo().getActiveFileName());
+            continue;
+          }
+          long currentTimestamp = EnvironmentEdgeManager.getDelegate().currentTime();
+          long fileAge = currentTimestamp - maxTimestamp.getAsLong();
+          if (fileAge > hotDataAge) {
+            // Values do not matter.
+            coldFiles.put(hFileName, null);
+          }
+        }
+      }
+    }
+    return coldFiles;
+  }
 }

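The cold-file decision in getColdFilesList() reduces to a single age comparison: a store file is cold when the age of its newest cell (now minus the file's maximum timestamp) exceeds the configured hot-data age. Below is a minimal sketch of that check with hypothetical values; the real code reads the age from DATATIERING_HOT_DATA_AGE_KEY and the timestamp from the store-file metadata.

import java.util.concurrent.TimeUnit;

// Hedged sketch of the age test behind getColdFilesList(); all values are made up.
public class ColdFileCheckSketch {
  static boolean isCold(long maxTimestampMs, long nowMs, long hotDataAgeMs) {
    long fileAge = nowMs - maxTimestampMs; // age of the newest cell in the file
    return fileAge > hotDataAgeMs;         // older than the hot window -> cold
  }

  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    long hotDataAge = TimeUnit.DAYS.toMillis(7);          // hypothetical hot-data age
    long maxTimestamp = now - TimeUnit.DAYS.toMillis(10); // newest cell is 10 days old
    System.out.println(isCold(maxTimestamp, now, hotDataAge)); // true: eligible for early eviction
  }
}
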
hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java

Lines changed: 2 additions & 2 deletions
@@ -373,8 +373,8 @@ public void testPrefetchWithDelay() throws Exception {
     Thread.sleep(20000);
     assertFalse("Prefetch threads should not be running at this point", reader.prefetchStarted());
     while (!reader.prefetchStarted()) {
-      assertTrue("Prefetch delay has not been expired yet",
-        getElapsedTime(startTime) < PrefetchExecutor.getPrefetchDelay());
+      // Wait until the prefetch is triggered.
+      Thread.sleep(500);
     }
     if (reader.prefetchStarted()) {
       // Added some delay as we have started the timer a bit late.

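The loop above now simply sleeps until prefetch starts instead of asserting on elapsed time, presumably to avoid failing when the delay expires just before prefetch is actually observed. As a hedged sketch only, the same bounded wait could also be written with the Waiter utility that the new tests in this commit use; conf and reader stand in for the test's existing fields.

import org.apache.hadoop.hbase.Waiter;

// Hypothetical alternative: poll every 500 ms and fail the test if prefetch
// has not started within 30 seconds. 'conf' and 'reader' are placeholders
// for the test's existing Configuration and HFile reader fields.
Waiter.waitFor(conf, 30000, 500, () -> reader.prefetchStarted());
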
hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java

Lines changed: 178 additions & 0 deletions
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.regionserver;

 import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
+import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -51,7 +52,9 @@
 import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
 import org.apache.hadoop.hbase.io.hfile.BlockType;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -245,6 +248,181 @@ public void testColdDataFiles() {
     }
   }

+  @Test
+  public void testPickColdDataFiles() {
+    Map<String, String> coldDataFiles = dataTieringManager.getColdFilesList();
+    assertEquals(1, coldDataFiles.size());
+    // hStoreFiles[3] is the cold file.
+    assert (coldDataFiles.containsKey(hStoreFiles.get(3).getFileInfo().getActiveFileName()));
+  }
+
+  /*
+   * Verify that two cold blocks(both) are evicted when bucket reaches its capacity. The hot file
+   * remains in the cache.
+   */
+  @Test
+  public void testBlockEvictions() throws Exception {
+    long capacitySize = 40 * 1024;
+    int writeThreads = 3;
+    int writerQLen = 64;
+    int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
+
+    // Setup: Create a bucket cache with lower capacity
+    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
+      DEFAULT_ERROR_TOLERATION_DURATION, defaultConf);
+
+    // Create three Cache keys with cold data files and a block with hot data.
+    // hStoreFiles.get(3) is a cold data file, while hStoreFiles.get(0) is a hot file.
+    Set<BlockCacheKey> cacheKeys = new HashSet<>();
+    cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 0, true, BlockType.DATA));
+    cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 8192, true, BlockType.DATA));
+    cacheKeys.add(new BlockCacheKey(hStoreFiles.get(0).getPath(), 0, true, BlockType.DATA));
+
+    // Create dummy data to be cached and fill the cache completely.
+    CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 3);
+
+    int blocksIter = 0;
+    for (BlockCacheKey key : cacheKeys) {
+      bucketCache.cacheBlock(key, blocks[blocksIter++].getBlock());
+      // Ensure that the block is persisted to the file.
+      Waiter.waitFor(defaultConf, 10000, 100, () -> (bucketCache.getBackingMap().containsKey(key)));
+    }
+
+    // Verify that the bucket cache contains 3 blocks.
+    assertEquals(3, bucketCache.getBackingMap().keySet().size());
+
+    // Add an additional block into cache with hot data which should trigger the eviction
+    BlockCacheKey newKey = new BlockCacheKey(hStoreFiles.get(2).getPath(), 0, true, BlockType.DATA);
+    CacheTestUtils.HFileBlockPair[] newBlock = CacheTestUtils.generateHFileBlocks(8192, 1);
+
+    bucketCache.cacheBlock(newKey, newBlock[0].getBlock());
+    Waiter.waitFor(defaultConf, 10000, 100,
+      () -> (bucketCache.getBackingMap().containsKey(newKey)));
+
+    // Verify that the bucket cache now contains 2 hot blocks blocks only.
+    // Both cold blocks of 8KB will be evicted to make room for 1 block of 8KB + an additional
+    // space.
+    validateBlocks(bucketCache.getBackingMap().keySet(), 2, 2, 0);
+  }
+
+  /*
+   * Verify that two cold blocks(both) are evicted when bucket reaches its capacity, but one cold
+   * block remains in the cache since the required space is freed.
+   */
+  @Test
+  public void testBlockEvictionsAllColdBlocks() throws Exception {
+    long capacitySize = 40 * 1024;
+    int writeThreads = 3;
+    int writerQLen = 64;
+    int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
+
+    // Setup: Create a bucket cache with lower capacity
+    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
+      DEFAULT_ERROR_TOLERATION_DURATION, defaultConf);
+
+    // Create three Cache keys with three cold data blocks.
+    // hStoreFiles.get(3) is a cold data file.
+    Set<BlockCacheKey> cacheKeys = new HashSet<>();
+    cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 0, true, BlockType.DATA));
+    cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 8192, true, BlockType.DATA));
+    cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 16384, true, BlockType.DATA));
+
+    // Create dummy data to be cached and fill the cache completely.
+    CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 3);
+
+    int blocksIter = 0;
+    for (BlockCacheKey key : cacheKeys) {
+      bucketCache.cacheBlock(key, blocks[blocksIter++].getBlock());
+      // Ensure that the block is persisted to the file.
+      Waiter.waitFor(defaultConf, 10000, 100, () -> (bucketCache.getBackingMap().containsKey(key)));
+    }
+
+    // Verify that the bucket cache contains 3 blocks.
+    assertEquals(3, bucketCache.getBackingMap().keySet().size());
+
+    // Add an additional block into cache with hot data which should trigger the eviction
+    BlockCacheKey newKey = new BlockCacheKey(hStoreFiles.get(2).getPath(), 0, true, BlockType.DATA);
+    CacheTestUtils.HFileBlockPair[] newBlock = CacheTestUtils.generateHFileBlocks(8192, 1);
+
+    bucketCache.cacheBlock(newKey, newBlock[0].getBlock());
+    Waiter.waitFor(defaultConf, 10000, 100,
+      () -> (bucketCache.getBackingMap().containsKey(newKey)));
+
+    // Verify that the bucket cache now contains 1 cold block and a newly added hot block.
+    validateBlocks(bucketCache.getBackingMap().keySet(), 2, 1, 1);
+  }
+
+  /*
+   * Verify that a hot block evicted along with a cold block when bucket reaches its capacity.
+   */
+  @Test
+  public void testBlockEvictionsHotBlocks() throws Exception {
+    long capacitySize = 40 * 1024;
+    int writeThreads = 3;
+    int writerQLen = 64;
+    int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
+
+    // Setup: Create a bucket cache with lower capacity
+    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
+      DEFAULT_ERROR_TOLERATION_DURATION, defaultConf);
+
+    // Create three Cache keys with two hot data blocks and one cold data block
+    // hStoreFiles.get(0) is a hot data file and hStoreFiles.get(3) is a cold data file.
+    Set<BlockCacheKey> cacheKeys = new HashSet<>();
+    cacheKeys.add(new BlockCacheKey(hStoreFiles.get(0).getPath(), 0, true, BlockType.DATA));
+    cacheKeys.add(new BlockCacheKey(hStoreFiles.get(0).getPath(), 8192, true, BlockType.DATA));
+    cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 0, true, BlockType.DATA));
+
+    // Create dummy data to be cached and fill the cache completely.
+    CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 3);
+
+    int blocksIter = 0;
+    for (BlockCacheKey key : cacheKeys) {
+      bucketCache.cacheBlock(key, blocks[blocksIter++].getBlock());
+      // Ensure that the block is persisted to the file.
+      Waiter.waitFor(defaultConf, 10000, 100, () -> (bucketCache.getBackingMap().containsKey(key)));
+    }
+
+    // Verify that the bucket cache contains 3 blocks.
+    assertEquals(3, bucketCache.getBackingMap().keySet().size());
+
+    // Add an additional block which should evict the only cold block with an additional hot block.
+    BlockCacheKey newKey = new BlockCacheKey(hStoreFiles.get(2).getPath(), 0, true, BlockType.DATA);
+    CacheTestUtils.HFileBlockPair[] newBlock = CacheTestUtils.generateHFileBlocks(8192, 1);
+
+    bucketCache.cacheBlock(newKey, newBlock[0].getBlock());
+    Waiter.waitFor(defaultConf, 10000, 100,
+      () -> (bucketCache.getBackingMap().containsKey(newKey)));
+
+    // Verify that the bucket cache now contains 2 hot blocks.
+    // Only one of the older hot blocks is retained and other one is the newly added hot block.
+    validateBlocks(bucketCache.getBackingMap().keySet(), 2, 2, 0);
+  }
+
+  private void validateBlocks(Set<BlockCacheKey> keys, int expectedTotalKeys, int expectedHotBlocks,
+    int expectedColdBlocks) {
+    int numHotBlocks = 0, numColdBlocks = 0;
+
+    assertEquals(expectedTotalKeys, keys.size());
+    int iter = 0;
+    for (BlockCacheKey key : keys) {
+      try {
+        if (dataTieringManager.isHotData(key)) {
+          numHotBlocks++;
+        } else {
+          numColdBlocks++;
+        }
+      } catch (Exception e) {
+        fail("Unexpected exception!");
+      }
+    }
+    assertEquals(expectedHotBlocks, numHotBlocks);
+    assertEquals(expectedColdBlocks, numColdBlocks);
+  }
+
   private void testDataTieringMethodWithPath(DataTieringMethodCallerWithPath caller, Path path,
     boolean expectedResult, DataTieringException exception) {
     try {

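These eviction tests rely on the fixture (not part of this diff) already classifying hStoreFiles.get(3) as cold, i.e. its maximum timestamp is older than the configured hot-data age. As a hedged sketch only, one common way to arrange that in HBase tests is to inject a manual clock through EnvironmentEdgeManager, which is the same source the production code consults via getDelegate().currentTime():

import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;

// Hypothetical fixture sketch: control "now" so one store file ends up older
// than the hot-data age while the others stay hot.
ManualEnvironmentEdge clock = new ManualEnvironmentEdge();
clock.setValue(System.currentTimeMillis());
EnvironmentEdgeManager.injectEdge(clock);

// ... write the store files, then advance the clock so the newest cell of the
// chosen file falls outside the hot-data window ...
clock.incValue(8L * 24 * 60 * 60 * 1000); // e.g. move "now" forward by 8 days

// Always restore the real clock afterwards.
EnvironmentEdgeManager.reset();
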