
Commit 58cb1f4

HBASE-27686: Recovery of BucketCache and Prefetched data after RS Crash (#5080)
Signed-off-by: Wellington Ramos Chevreuil <wchevreuil@apache.org>
1 parent 22b0c3e commit 58cb1f4

6 files changed: +290 −19 lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java

Lines changed: 6 additions & 0 deletions
@@ -95,6 +95,12 @@ public class CacheConfig {
 
   public static final String PREFETCH_PERSISTENCE_PATH_KEY = "hbase.prefetch.file.list.path";
 
+  /**
+   * Configuration key to set interval for persisting bucket cache to disk.
+   */
+  public static final String BUCKETCACHE_PERSIST_INTERVAL_KEY =
+    "hbase.bucketcache.persist.intervalinmillis";
+
   // Defaults
   public static final boolean DEFAULT_CACHE_DATA_ON_READ = true;
   public static final boolean DEFAULT_CACHE_DATA_ON_WRITE = false;
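The new key defaults to 1000 ms (see the BucketCache constructor change below). As a minimal, illustrative sketch of how a deployment or test might override it programmatically — the class and main method here are hypothetical, only the key itself comes from this commit:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;

public class PersistIntervalExample {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Persist the bucket cache backing map every 5 seconds instead of the 1000 ms default.
    conf.setLong(CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY, 5000L);
    long interval = conf.getLong(CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY, 1000L);
    System.out.println("BucketCache persist interval: " + interval + " ms");
  }
}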

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java

Lines changed: 1 addition & 1 deletion
@@ -154,7 +154,7 @@ public static void persistToFile(String path) throws IOException {
       throw new IOException("Error persisting prefetched HFiles set!");
     }
     if (!prefetchCompleted.isEmpty()) {
-      try (FileOutputStream fos = new FileOutputStream(prefetchedFileListPath, true)) {
+      try (FileOutputStream fos = new FileOutputStream(prefetchedFileListPath, false)) {
         PrefetchProtoUtils.toPB(prefetchCompleted).writeDelimitedTo(fos);
       }
     }
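The only change here is the append flag: with periodic persistence the prefetched-file list is now rewritten on every persist call, so opening the stream with append=false keeps a single current snapshot rather than an ever-growing file of stale delimited records. A tiny standalone sketch of that truncate-on-open behaviour — path and payload are made up for illustration:

import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class OverwriteVsAppend {
  public static void main(String[] args) throws IOException {
    String path = "/tmp/prefetch.persistence"; // illustrative path, not the real config value
    // append=false truncates the existing file, so repeated persist calls always
    // leave exactly one up-to-date snapshot on disk.
    try (FileOutputStream fos = new FileOutputStream(path, false)) {
      fos.write("current snapshot".getBytes(StandardCharsets.UTF_8));
    }
  }
}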

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java

Lines changed: 27 additions & 2 deletions
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.io.hfile.bucket;
 
+import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;
 import static org.apache.hadoop.hbase.io.hfile.CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY;
 
 import java.io.File;
@@ -178,6 +179,7 @@ public class BucketCache implements BlockCache, HeapSize {
   private final BucketCacheStats cacheStats = new BucketCacheStats();
 
   private final String persistencePath;
+  static AtomicBoolean isCacheInconsistent = new AtomicBoolean(false);
   private final long cacheCapacity;
   /** Approximate block size */
   private final long blockSize;
@@ -237,6 +239,8 @@ public class BucketCache implements BlockCache, HeapSize {
 
   private String prefetchedFileListPath;
 
+  private long bucketcachePersistInterval;
+
   private static final String FILE_VERIFY_ALGORITHM =
     "hbase.bucketcache.persistent.file.integrity.check.algorithm";
   private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5";
@@ -288,6 +292,7 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck
     this.queueAdditionWaitTime =
       conf.getLong(QUEUE_ADDITION_WAIT_TIME, DEFAULT_QUEUE_ADDITION_WAIT_TIME);
     this.prefetchedFileListPath = conf.get(PREFETCH_PERSISTENCE_PATH_KEY);
+    this.bucketcachePersistInterval = conf.getLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, 1000);
 
     sanityCheckConfigs();
 
@@ -314,6 +319,7 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck
     this.backingMap = new ConcurrentHashMap<>((int) blockNumCapacity);
 
     if (ioEngine.isPersistent() && persistencePath != null) {
+      startBucketCachePersisterThread();
       try {
         retrieveFromFile(bucketSizes);
       } catch (IOException ioex) {
@@ -370,6 +376,12 @@ protected void startWriterThreads() {
     }
   }
 
+  void startBucketCachePersisterThread() {
+    BucketCachePersister cachePersister =
+      new BucketCachePersister(this, bucketcachePersistInterval);
+    cachePersister.start();
+  }
+
   boolean isCacheEnabled() {
     return this.cacheEnabled;
   }
@@ -597,6 +609,9 @@ void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decre
     if (evictedByEvictionProcess) {
       cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary());
     }
+    if (ioEngine.isPersistent()) {
+      setCacheInconsistent(true);
+    }
   }
 
   /**
@@ -721,6 +736,14 @@ protected boolean removeFromRamCache(BlockCacheKey cacheKey) {
     });
   }
 
+  public boolean isCacheInconsistent() {
+    return isCacheInconsistent.get();
+  }
+
+  public void setCacheInconsistent(boolean setCacheInconsistent) {
+    isCacheInconsistent.set(setCacheInconsistent);
+  }
+
   /*
    * Statistics thread. Periodically output cache statistics to the log.
    */
@@ -1167,6 +1190,9 @@ void doDrain(final List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws Inte
         // Only add if non-null entry.
         if (bucketEntries[i] != null) {
           putIntoBackingMap(key, bucketEntries[i]);
+          if (ioEngine.isPersistent()) {
+            setCacheInconsistent(true);
+          }
         }
         // Always remove from ramCache even if we failed adding it to the block cache above.
         boolean existed = ramCache.remove(key, re -> {
@@ -1216,8 +1242,7 @@ static List<RAMQueueEntry> getRAMQueueEntries(BlockingQueue<RAMQueueEntry> q,
    */
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "OBL_UNSATISFIED_OBLIGATION",
       justification = "false positive, try-with-resources ensures close is called.")
-  private void persistToFile() throws IOException {
-    assert !cacheEnabled;
+  void persistToFile() throws IOException {
     if (!ioEngine.isPersistent()) {
       throw new IOException("Attempt to persist non-persistent cache mappings!");
     }
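Taken together, the BucketCache changes implement a simple dirty-flag handshake: block insertion (doDrain) and eviction mark the cache inconsistent, and a background persister thread periodically writes the backing map to disk and clears the flag, so the persistence file is only rewritten when something actually changed. A self-contained sketch of that pattern, with hypothetical names — only AtomicBoolean and the check/persist/clear flow mirror the commit:

import java.util.concurrent.atomic.AtomicBoolean;

public class DirtyFlagPersister extends Thread {
  private final AtomicBoolean dirty = new AtomicBoolean(false);
  private final long intervalMillis;

  public DirtyFlagPersister(long intervalMillis) {
    super("dirty-flag-persister");
    this.intervalMillis = intervalMillis;
  }

  // Write and eviction paths call this, mirroring setCacheInconsistent(true) above.
  public void markDirty() {
    dirty.set(true);
  }

  @Override
  public void run() {
    while (!Thread.currentThread().isInterrupted()) {
      try {
        Thread.sleep(intervalMillis);
        if (dirty.get()) {
          persist();        // stands in for BucketCache#persistToFile()
          dirty.set(false); // same check-persist-clear ordering as the commit
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }

  private void persist() {
    System.out.println("persisting cache snapshot to disk");
  }
}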

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCachePersister.java

Lines changed: 52 additions & 0 deletions

@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import java.io.IOException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+public class BucketCachePersister extends Thread {
+  private final BucketCache cache;
+  private final long intervalMillis;
+  private static final Logger LOG = LoggerFactory.getLogger(BucketCachePersister.class);
+
+  public BucketCachePersister(BucketCache cache, long intervalMillis) {
+    super("bucket-cache-persister");
+    this.cache = cache;
+    this.intervalMillis = intervalMillis;
+    LOG.info("BucketCachePersister started with interval: " + intervalMillis);
+  }
+
+  public void run() {
+    while (true) {
+      try {
+        Thread.sleep(intervalMillis);
+        if (cache.isCacheInconsistent()) {
+          LOG.debug("Cache is inconsistent, persisting to disk");
+          cache.persistToFile();
+          cache.setCacheInconsistent(false);
+        }
+      } catch (IOException | InterruptedException e) {
+        LOG.warn("Exception in BucketCachePersister" + e.getMessage());
+      }
+    }
+  }
+}

hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchRSClose.java

Lines changed: 10 additions & 16 deletions
@@ -83,7 +83,8 @@ public void setup() throws Exception {
   }
 
   @Test
-  public void testRegionClosePrefetchPersistence() throws Exception {
+  public void testPrefetchPersistence() throws Exception {
+
     // Write to table and flush
     TableName tableName = TableName.valueOf("table1");
     byte[] row0 = Bytes.toBytes("row1");
@@ -107,8 +108,14 @@ public void testRegionClosePrefetchPersistence() throws Exception {
       table.put(put1);
       TEST_UTIL.flush(tableName);
     } finally {
-      Thread.sleep(1000);
+      Thread.sleep(1500);
     }
+
+    // Default interval for cache persistence is 1000ms. So after 1000ms, both the persistence files
+    // should exist.
+    assertTrue(new File(testDir + "/bucket.persistence").exists());
+    assertTrue(new File(testDir + "/prefetch.persistence").exists());
+
     // Stop the RS
     cluster.stopRegionServer(0);
     LOG.info("Stopped Region Server 0.");
@@ -118,27 +125,14 @@ public void testRegionClosePrefetchPersistence() throws Exception {
 
     // Start the RS and validate
     cluster.startRegionServer();
-    Thread.sleep(1000);
-    assertFalse(new File(testDir + "/prefetch.persistence").exists());
-    assertFalse(new File(testDir + "/bucket.persistence").exists());
-  }
-
-  @Test
-  public void testPrefetchPersistenceNegative() throws Exception {
-    cluster.stopRegionServer(0);
-    LOG.info("Stopped Region Server 0.");
-    Thread.sleep(1000);
-    assertFalse(new File(testDir + "/prefetch.persistence").exists());
-    assertTrue(new File(testDir + "/bucket.persistence").exists());
-    cluster.startRegionServer();
-    Thread.sleep(1000);
     assertFalse(new File(testDir + "/prefetch.persistence").exists());
     assertFalse(new File(testDir + "/bucket.persistence").exists());
   }
 
   @After
   public void tearDown() throws Exception {
     TEST_UTIL.shutdownMiniCluster();
+    TEST_UTIL.cleanupDataTestDirOnTestFS(String.valueOf(testDir));
     if (zkCluster != null) {
       zkCluster.shutdown();
     }
