Skip to content

Commit 3a2333e

Browse files
authored
HBASE-27997 Enhance prefetch executor to record region prefetch infor… (#5339)
Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org> Reviewew-by: Kota-SH <shanmukhaharipriya@gmail.com>
1 parent cf81fd3 commit 3a2333e

File tree

6 files changed

+159
-22
lines changed

6 files changed

+159
-22
lines changed

hbase-protocol-shaded/src/main/protobuf/PrefetchPersistence.proto

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,5 +27,10 @@ option optimize_for = SPEED;
2727

2828

2929
message PrefetchedHfileName {
30-
map<string, bool> prefetched_files = 1;
30+
map<string, RegionFileSizeMap> prefetched_files = 1;
31+
}
32+
33+
message RegionFileSizeMap {
34+
required string region_name = 1;
35+
required uint64 region_prefetch_size = 2;
3136
}

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ public void run() {
7777
block.release();
7878
}
7979
}
80+
String regionName = getRegionName(path);
81+
PrefetchExecutor.complete(regionName, path, offset);
8082
} catch (IOException e) {
8183
// IOExceptions are probably due to region closes (relocation, etc.)
8284
if (LOG.isTraceEnabled()) {
@@ -93,13 +95,21 @@ public void run() {
9395
LOG.warn("Close prefetch stream reader failed, path: " + path, e);
9496
}
9597
}
96-
PrefetchExecutor.complete(path);
9798
}
9899
}
99100
});
100101
}
101102
}
102103

104+
/*
105+
* Get the region name for the given file path. A HFile is always kept under the <region>/<column
106+
* family>/<hfile>. To find the region for a given hFile, just find the name of the grandparent
107+
* directory.
108+
*/
109+
private static String getRegionName(Path path) {
110+
return path.getParent().getParent().getName();
111+
}
112+
103113
private static String getPathOffsetEndStr(final Path path, final long offset, final long end) {
104114
return "path=" + path.toString() + ", offset=" + offset + ", end=" + end;
105115
}

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java

Lines changed: 65 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,15 @@
1717
*/
1818
package org.apache.hadoop.hbase.io.hfile;
1919

20+
import com.google.errorprone.annotations.RestrictedApi;
2021
import java.io.File;
2122
import java.io.FileInputStream;
2223
import java.io.FileOutputStream;
2324
import java.io.IOException;
25+
import java.util.Collections;
2426
import java.util.HashMap;
2527
import java.util.Map;
28+
import java.util.concurrent.ConcurrentHashMap;
2629
import java.util.concurrent.ConcurrentSkipListMap;
2730
import java.util.concurrent.Future;
2831
import java.util.concurrent.RejectedExecutionException;
@@ -38,6 +41,7 @@
3841
import org.apache.hadoop.hbase.HConstants;
3942
import org.apache.hadoop.hbase.trace.TraceUtil;
4043
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
44+
import org.apache.hadoop.hbase.util.Pair;
4145
import org.apache.yetus.audience.InterfaceAudience;
4246
import org.slf4j.Logger;
4347
import org.slf4j.LoggerFactory;
@@ -53,7 +57,16 @@ public final class PrefetchExecutor {
5357
private static final Map<Path, Future<?>> prefetchFutures = new ConcurrentSkipListMap<>();
5458
/** Set of files for which prefetch is completed */
5559
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "MS_SHOULD_BE_FINAL")
56-
private static HashMap<String, Boolean> prefetchCompleted = new HashMap<>();
60+
/**
61+
* Map of region -> total size of the region prefetched on this region server. This is the total
62+
* size of hFiles for this region prefetched on this region server
63+
*/
64+
private static Map<String, Long> regionPrefetchSizeMap = new ConcurrentHashMap<>();
65+
/**
66+
* Map of hFile -> Region -> File size. This map is used by the prefetch executor while caching or
67+
* evicting individual hFiles.
68+
*/
69+
private static Map<String, Pair<String, Long>> prefetchCompleted = new HashMap<>();
5770
/** Executor pool shared among all HFiles for block prefetch */
5871
private static final ScheduledExecutorService prefetchExecutorPool;
5972
/** Delay before beginning prefetch */
@@ -120,9 +133,30 @@ public static void request(Path path, Runnable runnable) {
120133
}
121134
}
122135

123-
public static void complete(Path path) {
136+
private static void removeFileFromPrefetch(String hFileName) {
137+
// Update the regionPrefetchedSizeMap before removing the file from prefetchCompleted
138+
if (prefetchCompleted.containsKey(hFileName)) {
139+
Pair<String, Long> regionEntry = prefetchCompleted.get(hFileName);
140+
String regionEncodedName = regionEntry.getFirst();
141+
long filePrefetchedSize = regionEntry.getSecond();
142+
LOG.debug("Removing file {} for region {}", hFileName, regionEncodedName);
143+
regionPrefetchSizeMap.computeIfPresent(regionEncodedName,
144+
(rn, pf) -> pf - filePrefetchedSize);
145+
// If all the blocks for a region are evicted from the cache, remove the entry for that region
146+
if (
147+
regionPrefetchSizeMap.containsKey(regionEncodedName)
148+
&& regionPrefetchSizeMap.get(regionEncodedName) == 0
149+
) {
150+
regionPrefetchSizeMap.remove(regionEncodedName);
151+
}
152+
}
153+
prefetchCompleted.remove(hFileName);
154+
}
155+
156+
public static void complete(final String regionName, Path path, long size) {
124157
prefetchFutures.remove(path);
125-
prefetchCompleted.put(path.getName(), true);
158+
prefetchCompleted.put(path.getName(), new Pair<>(regionName, size));
159+
regionPrefetchSizeMap.merge(regionName, size, (oldpf, fileSize) -> oldpf + fileSize);
126160
LOG.debug("Prefetch completed for {}", path.getName());
127161
}
128162

@@ -173,11 +207,25 @@ public static void retrieveFromFile(String path) throws IOException {
173207
try (FileInputStream fis = deleteFileOnClose(prefetchPersistenceFile)) {
174208
PersistentPrefetchProtos.PrefetchedHfileName proto =
175209
PersistentPrefetchProtos.PrefetchedHfileName.parseDelimitedFrom(fis);
176-
Map<String, Boolean> protoPrefetchedFilesMap = proto.getPrefetchedFilesMap();
177-
prefetchCompleted.putAll(protoPrefetchedFilesMap);
210+
Map<String, PersistentPrefetchProtos.RegionFileSizeMap> protoPrefetchedFilesMap =
211+
proto.getPrefetchedFilesMap();
212+
prefetchCompleted.putAll(PrefetchProtoUtils.fromPB(protoPrefetchedFilesMap));
213+
updateRegionSizeMapWhileRetrievingFromFile();
178214
}
179215
}
180216

217+
private static void updateRegionSizeMapWhileRetrievingFromFile() {
218+
// Update the regionPrefetchedSizeMap with the region size while restarting the region server
219+
LOG.debug("Updating region size map after retrieving prefetch file list");
220+
prefetchCompleted.forEach((hFileName, hFileSize) -> {
221+
// Get the region name for each file
222+
String regionEncodedName = hFileSize.getFirst();
223+
long filePrefetchSize = hFileSize.getSecond();
224+
regionPrefetchSizeMap.merge(regionEncodedName, filePrefetchSize,
225+
(oldpf, fileSize) -> oldpf + fileSize);
226+
});
227+
}
228+
181229
private static FileInputStream deleteFileOnClose(final File file) throws IOException {
182230
return new FileInputStream(file) {
183231
private File myFile;
@@ -203,13 +251,24 @@ public void close() throws IOException {
203251
}
204252

205253
public static void removePrefetchedFileWhileEvict(String hfileName) {
206-
prefetchCompleted.remove(hfileName);
254+
removeFileFromPrefetch(hfileName);
207255
}
208256

209257
public static boolean isFilePrefetched(String hfileName) {
210258
return prefetchCompleted.containsKey(hfileName);
211259
}
212260

261+
public static Map<String, Long> getRegionPrefetchInfo() {
262+
return Collections.unmodifiableMap(regionPrefetchSizeMap);
263+
}
264+
265+
@RestrictedApi(explanation = "Should only be called in tests", link = "",
266+
allowedOnPath = ".*(/src/test/.*|PrefetchExecutor).java")
267+
public static void reset() {
268+
prefetchCompleted = new HashMap<>();
269+
regionPrefetchSizeMap = new ConcurrentHashMap<>();
270+
}
271+
213272
private PrefetchExecutor() {
214273
}
215274
}

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchProtoUtils.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717
*/
1818
package org.apache.hadoop.hbase.io.hfile;
1919

20+
import java.util.HashMap;
2021
import java.util.Map;
22+
import org.apache.hadoop.hbase.util.Pair;
2123

2224
import org.apache.hadoop.hbase.shaded.protobuf.generated.PersistentPrefetchProtos;
2325

@@ -26,8 +28,26 @@ private PrefetchProtoUtils() {
2628
}
2729

2830
static PersistentPrefetchProtos.PrefetchedHfileName
29-
toPB(Map<String, Boolean> prefetchedHfileNames) {
30-
return PersistentPrefetchProtos.PrefetchedHfileName.newBuilder()
31-
.putAllPrefetchedFiles(prefetchedHfileNames).build();
31+
toPB(Map<String, Pair<String, Long>> prefetchedHfileNames) {
32+
Map<String, PersistentPrefetchProtos.RegionFileSizeMap> tmpMap = new HashMap<>();
33+
prefetchedHfileNames.forEach((hFileName, regionPrefetchMap) -> {
34+
PersistentPrefetchProtos.RegionFileSizeMap tmpRegionFileSize =
35+
PersistentPrefetchProtos.RegionFileSizeMap.newBuilder()
36+
.setRegionName(regionPrefetchMap.getFirst())
37+
.setRegionPrefetchSize(regionPrefetchMap.getSecond()).build();
38+
tmpMap.put(hFileName, tmpRegionFileSize);
39+
});
40+
return PersistentPrefetchProtos.PrefetchedHfileName.newBuilder().putAllPrefetchedFiles(tmpMap)
41+
.build();
42+
}
43+
44+
static Map<String, Pair<String, Long>>
45+
fromPB(Map<String, PersistentPrefetchProtos.RegionFileSizeMap> prefetchHFileNames) {
46+
Map<String, Pair<String, Long>> hFileMap = new HashMap<>();
47+
prefetchHFileNames.forEach((hFileName, regionPrefetchMap) -> {
48+
hFileMap.put(hFileName,
49+
new Pair<>(regionPrefetchMap.getRegionName(), regionPrefetchMap.getRegionPrefetchSize()));
50+
});
51+
return hFileMap;
3252
}
3353
}

hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.hadoop.hbase.io.hfile.bucket;
1919

20+
import static org.apache.hadoop.hbase.regionserver.HRegionFileSystem.REGION_INFO_FILE;
2021
import static org.junit.Assert.assertFalse;
2122
import static org.junit.Assert.assertTrue;
2223

@@ -106,8 +107,8 @@ public void testPrefetchPersistenceCrash() throws Exception {
106107
CacheConfig cacheConf = new CacheConfig(conf, bucketCache);
107108
FileSystem fs = HFileSystem.get(conf);
108109
// Load Cache
109-
Path storeFile = writeStoreFile("TestPrefetch0", conf, cacheConf, fs);
110-
Path storeFile2 = writeStoreFile("TestPrefetch1", conf, cacheConf, fs);
110+
Path storeFile = writeStoreFile("Region0", "TestPrefetch0", conf, cacheConf, fs);
111+
Path storeFile2 = writeStoreFile("Region1", "TestPrefetch1", conf, cacheConf, fs);
111112
readStoreFile(storeFile, 0, fs, cacheConf, conf, bucketCache);
112113
readStoreFile(storeFile2, 0, fs, cacheConf, conf, bucketCache);
113114
Thread.sleep(bucketCachePersistInterval);
@@ -126,7 +127,7 @@ public void testPrefetchPersistenceCrashNegative() throws Exception {
126127
CacheConfig cacheConf = new CacheConfig(conf, bucketCache);
127128
FileSystem fs = HFileSystem.get(conf);
128129
// Load Cache
129-
Path storeFile = writeStoreFile("TestPrefetch2", conf, cacheConf, fs);
130+
Path storeFile = writeStoreFile("Region2", "TestPrefetch2", conf, cacheConf, fs);
130131
readStoreFile(storeFile, 0, fs, cacheConf, conf, bucketCache);
131132
assertFalse(new File(testDir + "/prefetch.persistence").exists());
132133
assertFalse(new File(testDir + "/bucket.persistence").exists());
@@ -140,14 +141,18 @@ public void testPrefetchListUponBlockEviction() throws Exception {
140141
CacheConfig cacheConf = new CacheConfig(conf, bucketCache1);
141142
FileSystem fs = HFileSystem.get(conf);
142143
// Load Blocks in cache
143-
Path storeFile = writeStoreFile("TestPrefetch3", conf, cacheConf, fs);
144+
Path storeFile = writeStoreFile("Region3", "TestPrefetch3", conf, cacheConf, fs);
144145
readStoreFile(storeFile, 0, fs, cacheConf, conf, bucketCache1);
145146
Thread.sleep(500);
146147
// Evict Blocks from cache
147148
BlockCacheKey bucketCacheKey = bucketCache1.backingMap.entrySet().iterator().next().getKey();
148149
assertTrue(PrefetchExecutor.isFilePrefetched(storeFile.getName()));
150+
int initialRegionPrefetchInfoSize = PrefetchExecutor.getRegionPrefetchInfo().size();
151+
assertTrue(initialRegionPrefetchInfoSize > 0);
149152
bucketCache1.evictBlock(bucketCacheKey);
150153
assertFalse(PrefetchExecutor.isFilePrefetched(storeFile.getName()));
154+
int newRegionPrefetchInfoSize = PrefetchExecutor.getRegionPrefetchInfo().size();
155+
assertTrue(initialRegionPrefetchInfoSize - newRegionPrefetchInfoSize == 1);
151156
}
152157

153158
public void readStoreFile(Path storeFilePath, long offset, FileSystem fs, CacheConfig cacheConf,
@@ -172,9 +177,12 @@ public void readStoreFile(Path storeFilePath, long offset, FileSystem fs, CacheC
172177
}
173178
}
174179

175-
public Path writeStoreFile(String fname, Configuration conf, CacheConfig cacheConf, FileSystem fs)
176-
throws IOException {
177-
Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname);
180+
public Path writeStoreFile(String regionName, String fname, Configuration conf,
181+
CacheConfig cacheConf, FileSystem fs) throws IOException {
182+
// Create store files as per the following directory structure
183+
// <region name>/<column family>/<hFile>
184+
Path regionDir = new Path(TEST_UTIL.getDataTestDir(), regionName);
185+
Path storeFileParentDir = new Path(regionDir, fname);
178186
HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
179187
StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs)
180188
.withOutputDir(storeFileParentDir).withFileContext(meta).build();
@@ -190,6 +198,18 @@ public Path writeStoreFile(String fname, Configuration conf, CacheConfig cacheCo
190198
}
191199

192200
sfw.close();
201+
202+
// Create a dummy .regioninfo file as the PrefetchExecutor needs it to find out the region
203+
// name to be added to the prefetch file list
204+
Path regionInfoFilePath = new Path(regionDir, REGION_INFO_FILE);
205+
File regionInfoFile = new File(regionInfoFilePath.toString());
206+
try {
207+
if (!regionInfoFile.createNewFile()) {
208+
assertFalse("Unable to create .regioninfo file", true);
209+
}
210+
} catch (IOException e) {
211+
e.printStackTrace();
212+
}
193213
return sfw.getPath();
194214
}
195215

hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.hadoop.hbase.io.hfile.bucket;
1919

20+
import static org.apache.hadoop.hbase.regionserver.HRegionFileSystem.REGION_INFO_FILE;
2021
import static org.junit.Assert.assertEquals;
2122
import static org.junit.Assert.assertFalse;
2223
import static org.junit.Assert.assertNotEquals;
@@ -123,14 +124,16 @@ public void testPrefetchPersistence() throws Exception {
123124
assertEquals(0, usedSize);
124125
assertTrue(new File(testDir + "/bucket.cache").exists());
125126
// Load Cache
126-
Path storeFile = writeStoreFile("TestPrefetch0");
127-
Path storeFile2 = writeStoreFile("TestPrefetch1");
127+
Path storeFile = writeStoreFile("Region0", "TestPrefetch0");
128+
Path storeFile2 = writeStoreFile("Region1", "TestPrefetch1");
128129
readStoreFile(storeFile, 0);
129130
readStoreFile(storeFile2, 0);
130131
usedSize = bucketCache.getAllocator().getUsedSize();
131132
assertNotEquals(0, usedSize);
132133

133134
bucketCache.shutdown();
135+
// Reset the info maintained in PrefetchExecutor
136+
PrefetchExecutor.reset();
134137
assertTrue(new File(testDir + "/bucket.persistence").exists());
135138
assertTrue(new File(testDir + "/prefetch.persistence").exists());
136139
bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
@@ -149,8 +152,12 @@ public void testPrefetchPersistence() throws Exception {
149152
public void closeStoreFile(Path path) throws Exception {
150153
HFile.Reader reader = HFile.createReader(fs, path, cacheConf, true, conf);
151154
assertTrue(PrefetchExecutor.isFilePrefetched(path.getName()));
155+
int initialRegionPrefetchInfoSize = PrefetchExecutor.getRegionPrefetchInfo().size();
156+
assertTrue(initialRegionPrefetchInfoSize > 0);
152157
reader.close(true);
153158
assertFalse(PrefetchExecutor.isFilePrefetched(path.getName()));
159+
int newRegionPrefetchInfoSize = PrefetchExecutor.getRegionPrefetchInfo().size();
160+
assertTrue(initialRegionPrefetchInfoSize - newRegionPrefetchInfoSize == 1);
154161
}
155162

156163
public void readStoreFile(Path storeFilePath, long offset) throws Exception {
@@ -174,8 +181,11 @@ public void readStoreFile(Path storeFilePath, long offset) throws Exception {
174181
}
175182
}
176183

177-
public Path writeStoreFile(String fname) throws IOException {
178-
Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname);
184+
public Path writeStoreFile(String regionName, String fname) throws IOException {
185+
// Create store files as per the following directory structure
186+
// <region name>/<column family>/<hFile>
187+
Path regionDir = new Path(TEST_UTIL.getDataTestDir(), regionName);
188+
Path storeFileParentDir = new Path(regionDir, fname);
179189
HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
180190
StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs)
181191
.withOutputDir(storeFileParentDir).withFileContext(meta).build();
@@ -191,6 +201,19 @@ public Path writeStoreFile(String fname) throws IOException {
191201
}
192202

193203
sfw.close();
204+
205+
// Create a dummy .regioninfo file as the PrefetchExecutor needs it to find out the region name
206+
// to be added to the prefetch file list
207+
Path regionInfoFilePath = new Path(regionDir, REGION_INFO_FILE);
208+
File regionInfoFile = new File(regionInfoFilePath.toString());
209+
LOG.info("Create file: {}", regionInfoFilePath);
210+
try {
211+
if (!regionInfoFile.exists() && !regionInfoFile.createNewFile()) {
212+
assertFalse("Unable to create .regioninfo file", true);
213+
}
214+
} catch (IOException e) {
215+
e.printStackTrace();
216+
}
194217
return sfw.getPath();
195218
}
196219

0 commit comments

Comments
 (0)