Skip to content

Commit 16e9aff

Browse files
authored
HBASE-28292 Make Delay prefetch property to be dynamically configured (apache#5605)
Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org> Signed-off-by: Peter Somogyi <psomogyi@apache.org>
1 parent 4c29c5d commit 16e9aff

File tree

6 files changed

+230
-8
lines changed

6 files changed

+230
-8
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -457,6 +457,8 @@ HFileScanner getScanner(Configuration conf, boolean cacheBlocks, boolean pread,
457457

458458
boolean prefetchComplete();
459459

460+
boolean prefetchStarted();
461+
460462
/**
461463
* To close the stream's socket. Note: This can be concurrently called from multiple threads and
462464
* implementation should take care of thread safety.

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1658,6 +1658,15 @@ public boolean prefetchComplete() {
16581658
return PrefetchExecutor.isCompleted(path);
16591659
}
16601660

1661+
/**
1662+
* Returns true if block prefetching was started after waiting for specified delay, false
1663+
* otherwise
1664+
*/
1665+
@Override
1666+
public boolean prefetchStarted() {
1667+
return PrefetchExecutor.isPrefetchStarted();
1668+
}
1669+
16611670
/**
16621671
* Create a Scanner on this file. No seeks or reads are done on creation. Call
16631672
* {@link HFileScanner#seekTo(Cell)} to position an start the read. There is nothing to clean up

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java

Lines changed: 82 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,18 @@
1717
*/
1818
package org.apache.hadoop.hbase.io.hfile;
1919

20+
import com.google.errorprone.annotations.RestrictedApi;
2021
import java.util.Map;
2122
import java.util.concurrent.ConcurrentSkipListMap;
2223
import java.util.concurrent.Future;
2324
import java.util.concurrent.RejectedExecutionException;
2425
import java.util.concurrent.ScheduledExecutorService;
26+
import java.util.concurrent.ScheduledFuture;
2527
import java.util.concurrent.ScheduledThreadPoolExecutor;
2628
import java.util.concurrent.ThreadFactory;
2729
import java.util.concurrent.ThreadLocalRandom;
2830
import java.util.concurrent.TimeUnit;
31+
import java.util.concurrent.atomic.AtomicBoolean;
2932
import java.util.regex.Pattern;
3033
import org.apache.hadoop.conf.Configuration;
3134
import org.apache.hadoop.fs.Path;
@@ -41,23 +44,30 @@
4144
public final class PrefetchExecutor {
4245

4346
private static final Logger LOG = LoggerFactory.getLogger(PrefetchExecutor.class);
47+
/** Wait time in miliseconds before executing prefetch */
48+
public static final String PREFETCH_DELAY = "hbase.hfile.prefetch.delay";
49+
public static final String PREFETCH_DELAY_VARIATION = "hbase.hfile.prefetch.delay.variation";
50+
public static final float PREFETCH_DELAY_VARIATION_DEFAULT_VALUE = 0.2f;
4451

4552
/** Futures for tracking block prefetch activity */
4653
private static final Map<Path, Future<?>> prefetchFutures = new ConcurrentSkipListMap<>();
54+
/** Runnables for resetting the prefetch activity */
55+
private static final Map<Path, Runnable> prefetchRunnable = new ConcurrentSkipListMap<>();
4756
/** Executor pool shared among all HFiles for block prefetch */
4857
private static final ScheduledExecutorService prefetchExecutorPool;
4958
/** Delay before beginning prefetch */
50-
private static final int prefetchDelayMillis;
59+
private static int prefetchDelayMillis;
5160
/** Variation in prefetch delay times, to mitigate stampedes */
52-
private static final float prefetchDelayVariation;
61+
private static float prefetchDelayVariation;
5362
static {
5463
// Consider doing this on demand with a configuration passed in rather
5564
// than in a static initializer.
5665
Configuration conf = HBaseConfiguration.create();
5766
// 1s here for tests, consider 30s in hbase-default.xml
5867
// Set to 0 for no delay
59-
prefetchDelayMillis = conf.getInt("hbase.hfile.prefetch.delay", 1000);
60-
prefetchDelayVariation = conf.getFloat("hbase.hfile.prefetch.delay.variation", 0.2f);
68+
prefetchDelayMillis = conf.getInt(PREFETCH_DELAY, 1000);
69+
prefetchDelayVariation =
70+
conf.getFloat(PREFETCH_DELAY_VARIATION, PREFETCH_DELAY_VARIATION_DEFAULT_VALUE);
6171
int prefetchThreads = conf.getInt("hbase.hfile.thread.prefetch", 4);
6272
prefetchExecutorPool = new ScheduledThreadPoolExecutor(prefetchThreads, new ThreadFactory() {
6373
@Override
@@ -95,15 +105,18 @@ public static void request(Path path, Runnable runnable) {
95105
final Future<?> future =
96106
prefetchExecutorPool.schedule(tracedRunnable, delay, TimeUnit.MILLISECONDS);
97107
prefetchFutures.put(path, future);
108+
prefetchRunnable.put(path, runnable);
98109
} catch (RejectedExecutionException e) {
99110
prefetchFutures.remove(path);
111+
prefetchRunnable.remove(path);
100112
LOG.warn("Prefetch request rejected for {}", path);
101113
}
102114
}
103115
}
104116

105117
public static void complete(Path path) {
106118
prefetchFutures.remove(path);
119+
prefetchRunnable.remove(path);
107120
if (LOG.isDebugEnabled()) {
108121
LOG.debug("Prefetch completed for {}", path.getName());
109122
}
@@ -115,23 +128,85 @@ public static void cancel(Path path) {
115128
// ok to race with other cancellation attempts
116129
future.cancel(true);
117130
prefetchFutures.remove(path);
131+
prefetchRunnable.remove(path);
118132
LOG.debug("Prefetch cancelled for {}", path);
119133
}
120134
}
121135

122-
public static boolean isCompleted(Path path) {
136+
public static void interrupt(Path path) {
123137
Future<?> future = prefetchFutures.get(path);
124138
if (future != null) {
125-
return future.isDone();
139+
prefetchFutures.remove(path);
140+
// ok to race with other cancellation attempts
141+
future.cancel(true);
142+
LOG.debug("Prefetch cancelled for {}", path);
126143
}
127-
return true;
128144
}
129145

130146
private PrefetchExecutor() {
131147
}
132148

149+
public static boolean isCompleted(Path path) {
150+
Future<?> future = prefetchFutures.get(path);
151+
if (future != null) {
152+
return future.isDone();
153+
}
154+
return true;
155+
}
156+
133157
/* Visible for testing only */
158+
@RestrictedApi(explanation = "Should only be called in tests", link = "",
159+
allowedOnPath = ".*/src/test/.*")
134160
static ScheduledExecutorService getExecutorPool() {
135161
return prefetchExecutorPool;
136162
}
163+
164+
@RestrictedApi(explanation = "Should only be called in tests", link = "",
165+
allowedOnPath = ".*/src/test/.*")
166+
static Map<Path, Future<?>> getPrefetchFutures() {
167+
return prefetchFutures;
168+
}
169+
170+
@RestrictedApi(explanation = "Should only be called in tests", link = "",
171+
allowedOnPath = ".*/src/test/.*")
172+
static Map<Path, Runnable> getPrefetchRunnable() {
173+
return prefetchRunnable;
174+
}
175+
176+
static boolean isPrefetchStarted() {
177+
AtomicBoolean prefetchStarted = new AtomicBoolean(false);
178+
for (Map.Entry<Path, Future<?>> entry : prefetchFutures.entrySet()) {
179+
Path k = entry.getKey();
180+
Future<?> v = entry.getValue();
181+
ScheduledFuture sf = (ScheduledFuture) prefetchFutures.get(k);
182+
long waitTime = sf.getDelay(TimeUnit.MILLISECONDS);
183+
if (waitTime < 0) {
184+
// At this point prefetch is started
185+
prefetchStarted.set(true);
186+
break;
187+
}
188+
}
189+
return prefetchStarted.get();
190+
}
191+
192+
public static int getPrefetchDelay() {
193+
return prefetchDelayMillis;
194+
}
195+
196+
public static void loadConfiguration(Configuration conf) {
197+
prefetchDelayMillis = conf.getInt(PREFETCH_DELAY, 1000);
198+
prefetchDelayVariation =
199+
conf.getFloat(PREFETCH_DELAY_VARIATION, PREFETCH_DELAY_VARIATION_DEFAULT_VALUE);
200+
prefetchFutures.forEach((k, v) -> {
201+
ScheduledFuture sf = (ScheduledFuture) prefetchFutures.get(k);
202+
if (!(sf.getDelay(TimeUnit.MILLISECONDS) > 0)) {
203+
// the thread is still pending delay expiration and has not started to run yet, so can be
204+
// re-scheduled at no cost.
205+
interrupt(k);
206+
request(k, prefetchRunnable.get(k));
207+
}
208+
LOG.debug("Reset called on Prefetch of file {} with delay {}, delay variation {}", k,
209+
prefetchDelayMillis, prefetchDelayVariation);
210+
});
211+
}
137212
}

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -495,6 +495,9 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
495495
*/
496496
private ReplicationMarkerChore replicationMarkerChore;
497497

498+
// A timer submit requests to the PrefetchExecutor
499+
private PrefetchExecutorNotifier prefetchExecutorNotifier;
500+
498501
/**
499502
* Starts a HRegionServer at the default location.
500503
* <p/>
@@ -2039,6 +2042,9 @@ private void initializeThreads() {
20392042
// Compaction thread
20402043
this.compactSplitThread = new CompactSplit(this);
20412044

2045+
// Prefetch Notifier
2046+
this.prefetchExecutorNotifier = new PrefetchExecutorNotifier(conf);
2047+
20422048
// Background thread to check for compactions; needed if region has not gotten updates
20432049
// in a while. It will take care of not checking too frequently on store-by-store basis.
20442050
this.compactionChecker = new CompactionChecker(this, this.compactionCheckFrequency, this);
@@ -2128,6 +2134,7 @@ private void registerConfigurationObservers() {
21282134
configurationManager.registerObserver(this.compactSplitThread);
21292135
configurationManager.registerObserver(this.cacheFlusher);
21302136
configurationManager.registerObserver(this.rpcServices);
2137+
configurationManager.registerObserver(this.prefetchExecutorNotifier);
21312138
configurationManager.registerObserver(this);
21322139
}
21332140

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.regionserver;
19+
20+
import org.apache.hadoop.conf.Configuration;
21+
import org.apache.hadoop.hbase.conf.ConfigurationManager;
22+
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
23+
import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor;
24+
import org.apache.yetus.audience.InterfaceAudience;
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
27+
28+
/**
29+
* Class to submit requests for PrefetchExecutor depending on configuration change
30+
*/
31+
@InterfaceAudience.Private
32+
public final class PrefetchExecutorNotifier implements PropagatingConfigurationObserver {
33+
private static final Logger LOG = LoggerFactory.getLogger(PrefetchExecutorNotifier.class);
34+
35+
/** Wait time in miliseconds before executing prefetch */
36+
public static final String PREFETCH_DELAY = "hbase.hfile.prefetch.delay";
37+
private final Configuration conf;
38+
39+
// only for test
40+
public PrefetchExecutorNotifier(Configuration conf) {
41+
this.conf = conf;
42+
}
43+
44+
/**
45+
* {@inheritDoc}
46+
*/
47+
@Override
48+
public void onConfigurationChange(Configuration newConf) {
49+
// Update prefetch delay in the prefetch executor class
50+
// interrupt and restart threads which have not started executing
51+
PrefetchExecutor.loadConfiguration(conf);
52+
LOG.info("Config hbase.hfile.prefetch.delay is changed to {}",
53+
conf.getInt(PREFETCH_DELAY, 1000));
54+
}
55+
56+
/**
57+
* {@inheritDoc}
58+
*/
59+
@Override
60+
public void registerChildren(ConfigurationManager manager) {
61+
// No children to register.
62+
}
63+
64+
/**
65+
* {@inheritDoc}
66+
*/
67+
@Override
68+
public void deregisterChildren(ConfigurationManager manager) {
69+
// No children to register
70+
}
71+
72+
public int getPrefetchDelay() {
73+
return PrefetchExecutor.getPrefetchDelay();
74+
}
75+
}

hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
2121
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId;
2222
import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY;
23+
import static org.apache.hadoop.hbase.io.hfile.PrefetchExecutor.PREFETCH_DELAY;
24+
import static org.apache.hadoop.hbase.io.hfile.PrefetchExecutor.PREFETCH_DELAY_VARIATION;
25+
import static org.apache.hadoop.hbase.io.hfile.PrefetchExecutor.PREFETCH_DELAY_VARIATION_DEFAULT_VALUE;
2326
import static org.apache.hadoop.hbase.regionserver.CompactSplit.HBASE_REGION_SERVER_ENABLE_COMPACTION;
2427
import static org.hamcrest.MatcherAssert.assertThat;
2528
import static org.hamcrest.Matchers.allOf;
@@ -65,6 +68,7 @@
6568
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
6669
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
6770
import org.apache.hadoop.hbase.regionserver.HStoreFile;
71+
import org.apache.hadoop.hbase.regionserver.PrefetchExecutorNotifier;
6872
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
6973
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
7074
import org.apache.hadoop.hbase.regionserver.TestHStoreFile;
@@ -95,7 +99,6 @@ public class TestPrefetch {
9599
private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2;
96100
private static final int DATA_BLOCK_SIZE = 2048;
97101
private static final int NUM_KV = 1000;
98-
99102
private Configuration conf;
100103
private CacheConfig cacheConf;
101104
private FileSystem fs;
@@ -336,6 +339,54 @@ public void testPrefetchDoesntSkipRefs() throws Exception {
336339
});
337340
}
338341

342+
@Test
343+
public void testOnConfigurationChange() {
344+
PrefetchExecutorNotifier prefetchExecutorNotifier = new PrefetchExecutorNotifier(conf);
345+
conf.setInt(PREFETCH_DELAY, 40000);
346+
prefetchExecutorNotifier.onConfigurationChange(conf);
347+
assertEquals(prefetchExecutorNotifier.getPrefetchDelay(), 40000);
348+
349+
// restore
350+
conf.setInt(PREFETCH_DELAY, 30000);
351+
prefetchExecutorNotifier.onConfigurationChange(conf);
352+
assertEquals(prefetchExecutorNotifier.getPrefetchDelay(), 30000);
353+
354+
conf.setInt(PREFETCH_DELAY, 1000);
355+
prefetchExecutorNotifier.onConfigurationChange(conf);
356+
}
357+
358+
@Test
359+
public void testPrefetchWithDelay() throws Exception {
360+
// Configure custom delay
361+
PrefetchExecutorNotifier prefetchExecutorNotifier = new PrefetchExecutorNotifier(conf);
362+
conf.setInt(PREFETCH_DELAY, 25000);
363+
conf.setFloat(PREFETCH_DELAY_VARIATION, 0.0f);
364+
prefetchExecutorNotifier.onConfigurationChange(conf);
365+
366+
HFileContext context = new HFileContextBuilder().withCompression(Compression.Algorithm.GZ)
367+
.withBlockSize(DATA_BLOCK_SIZE).build();
368+
Path storeFile = writeStoreFile("TestPrefetchWithDelay", context);
369+
HFile.Reader reader = HFile.createReader(fs, storeFile, cacheConf, true, conf);
370+
long startTime = System.currentTimeMillis();
371+
372+
// Wait for 20 seconds, no thread should start prefetch
373+
Thread.sleep(20000);
374+
assertFalse("Prefetch threads should not be running at this point", reader.prefetchStarted());
375+
while (!reader.prefetchStarted()) {
376+
assertTrue("Prefetch delay has not been expired yet",
377+
getElapsedTime(startTime) < PrefetchExecutor.getPrefetchDelay());
378+
}
379+
if (reader.prefetchStarted()) {
380+
// Added some delay as we have started the timer a bit late.
381+
Thread.sleep(500);
382+
assertTrue("Prefetch should start post configured delay",
383+
getElapsedTime(startTime) > PrefetchExecutor.getPrefetchDelay());
384+
}
385+
conf.setInt(PREFETCH_DELAY, 1000);
386+
conf.setFloat(PREFETCH_DELAY_VARIATION, PREFETCH_DELAY_VARIATION_DEFAULT_VALUE);
387+
prefetchExecutorNotifier.onConfigurationChange(conf);
388+
}
389+
339390
@Test
340391
public void testPrefetchDoesntSkipHFileLink() throws Exception {
341392
testPrefetchWhenHFileLink(c -> {
@@ -490,4 +541,7 @@ public static KeyValue.Type generateKeyType(Random rand) {
490541
}
491542
}
492543

544+
private long getElapsedTime(long startTime) {
545+
return System.currentTimeMillis() - startTime;
546+
}
493547
}

0 commit comments

Comments
 (0)