CDPD-60490 Make the delay prefetch property dynamically configurable #5449
Draft · wants to merge 1 commit into base: master
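
The diff below adds a new hbase.hfile.prefetch.execute.delay property that, when positive, overrides the randomized prefetch delay and can be re-read at runtime. A minimal usage sketch, mirroring the tests added in this change; the direct PrefetchExecutor.loadConfiguration call stands in for whatever dynamic-reload path ultimately invokes it, and the standalone class name here is purely illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor;

public class PrefetchExecuteDelaySketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();

    // Positive value: every prefetch request is scheduled with this fixed delay.
    conf.setInt(PrefetchExecutor.PREFETCH_EXECUTE_DELAY, 2000);
    PrefetchExecutor.loadConfiguration(conf);
    System.out.println(PrefetchExecutor.getPrefetchExecuteDelay()); // 2000

    // Back to the default of 0: the original randomized delay logic applies again.
    conf.setInt(PrefetchExecutor.PREFETCH_EXECUTE_DELAY, 0);
    PrefetchExecutor.loadConfiguration(conf);
    System.out.println(PrefetchExecutor.getPrefetchExecuteDelay()); // 0
  }
}
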
@@ -31,16 +31,21 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
public final class PrefetchExecutor {
public final class PrefetchExecutor implements PropagatingConfigurationObserver {

private static final Logger LOG = LoggerFactory.getLogger(PrefetchExecutor.class);
public static final String PREFETCH_EXECUTE_DELAY = "hbase.hfile.prefetch.execute.delay";
/** Wait time in milliseconds before executing prefetch */
private static int prefetchExecuteDelay;

/** Futures for tracking block prefetch activity */
private static final Map<Path, Future<?>> prefetchFutures = new ConcurrentSkipListMap<>();
@@ -59,6 +64,7 @@ public final class PrefetchExecutor {
prefetchDelayMillis = conf.getInt("hbase.hfile.prefetch.delay", 1000);
prefetchDelayVariation = conf.getFloat("hbase.hfile.prefetch.delay.variation", 0.2f);
int prefetchThreads = conf.getInt("hbase.hfile.thread.prefetch", 4);
prefetchExecuteDelay = conf.getInt(PREFETCH_EXECUTE_DELAY, 0);
prefetchExecutorPool = new ScheduledThreadPoolExecutor(prefetchThreads, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
@@ -73,15 +79,17 @@ public Thread newThread(Runnable r) {
// TODO: We want HFile, which is where the blockcache lives, to handle
// prefetching of file blocks but the Store level is where path convention
// knowledge should be contained
private static final Pattern prefetchPathExclude =
Pattern.compile("(" + Path.SEPARATOR_CHAR + HConstants.HBASE_TEMP_DIRECTORY.replace(".", "\\.")
+ Path.SEPARATOR_CHAR + ")|(" + Path.SEPARATOR_CHAR
+ HConstants.HREGION_COMPACTIONDIR_NAME.replace(".", "\\.") + Path.SEPARATOR_CHAR + ")");

public static void request(Path path, Runnable runnable) {
if (!prefetchPathExclude.matcher(path.toString()).find()) {
long delay;
if (prefetchDelayMillis > 0) {
if (prefetchExecuteDelay > 0) {
delay = prefetchExecuteDelay;
} else if (prefetchDelayMillis > 0) {
delay = (long) ((prefetchDelayMillis * (1.0f - (prefetchDelayVariation / 2)))
+ (prefetchDelayMillis * (prefetchDelayVariation / 2)
* ThreadLocalRandom.current().nextFloat()));
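
With the defaults used by this class (hbase.hfile.prefetch.delay of 1000 ms and a variation of 0.2), the randomized branch above yields a delay of roughly 900 to 1000 ms, while a positive hbase.hfile.prefetch.execute.delay short-circuits it with a fixed value. A small standalone sketch of that selection logic, where the local variables simply mirror the fields above and the final else branch is an assumption about the code hidden below this hunk:

// Sketch of the delay selection in request(), using the defaults from this diff.
int prefetchExecuteDelay = 0;        // hbase.hfile.prefetch.execute.delay (new key, default 0)
int prefetchDelayMillis = 1000;      // hbase.hfile.prefetch.delay
float prefetchDelayVariation = 0.2f; // hbase.hfile.prefetch.delay.variation
float rand = java.util.concurrent.ThreadLocalRandom.current().nextFloat();

long delay;
if (prefetchExecuteDelay > 0) {
  delay = prefetchExecuteDelay;      // fixed, dynamically reloadable delay wins
} else if (prefetchDelayMillis > 0) {
  // 1000 * 0.9 + 1000 * 0.1 * rand  =>  between 900 and 1000 ms
  delay = (long) ((prefetchDelayMillis * (1.0f - (prefetchDelayVariation / 2)))
    + (prefetchDelayMillis * (prefetchDelayVariation / 2) * rand));
} else {
  delay = 0;                         // assumption: no delay when both knobs are unset/zero
}
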
@@ -127,6 +135,28 @@ public static boolean isCompleted(Path path) {
return true;
}

private PrefetchExecutor() {
@Override
public void onConfigurationChange(Configuration conf) {
LOG.debug("PrefetchExecutor.onConfigurationChange");
PrefetchExecutor.loadConfiguration(conf);
}

@Override
public void registerChildren(ConfigurationManager manager) {
LOG.debug("PrefetchExecutor.registerChildren");
}

@Override
public void deregisterChildren(ConfigurationManager manager) {
LOG.debug("PrefetchExecutor.deregisterChildren");
}

public static int getPrefetchExecuteDelay() {
return prefetchExecuteDelay;
}

public static void loadConfiguration(Configuration conf) {
LOG.debug("PrefetchExecutor.loadConfiguration");
prefetchExecuteDelay = conf.getInt(PREFETCH_EXECUTE_DELAY, 0);
}
}
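
Making PrefetchExecutor a PropagatingConfigurationObserver only becomes dynamic once an instance is registered with a ConfigurationManager, and that wiring is not part of this diff. A hedged sketch of how the callback path is expected to work, assuming an accessible PrefetchExecutor instance can be constructed and registered (both the construction and the registration site are assumptions made for illustration):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor;

public class PrefetchObserverWiringSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    ConfigurationManager manager = new ConfigurationManager();

    // Assumption: some component holds a PrefetchExecutor instance and registers it;
    // the constructor call shown here is illustrative only.
    PrefetchExecutor observer = new PrefetchExecutor();
    manager.registerObserver(observer);

    // When the configuration is updated online (for example after the HBase shell's
    // update_config command triggers a reload), the manager notifies its observers,
    // which drives onConfigurationChange -> loadConfiguration.
    conf.setInt(PrefetchExecutor.PREFETCH_EXECUTE_DELAY, 3000);
    manager.notifyAllObservers(conf);

    System.out.println(PrefetchExecutor.getPrefetchExecuteDelay()); // 3000
  }
}
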
@@ -20,6 +20,7 @@
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId;
import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY;
import static org.apache.hadoop.hbase.io.hfile.PrefetchExecutor.*;
import static org.apache.hadoop.hbase.regionserver.CompactSplit.HBASE_REGION_SERVER_ENABLE_COMPACTION;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
@@ -264,6 +265,42 @@ public void testPrefetchDoesntSkipHFileLink() throws Exception {
});
}

@Test
public void testOnConfigurationChange() {
// change PREFETCH_EXECUTE_DELAY from its default of 0 to 2000
conf.setInt(PREFETCH_EXECUTE_DELAY, 2000);
PrefetchExecutor.loadConfiguration(conf);
assertTrue(getPrefetchExecuteDelay() == 2000);

// restore
conf.setInt(PREFETCH_EXECUTE_DELAY, 0);
PrefetchExecutor.loadConfiguration(conf);
assertTrue(getPrefetchExecuteDelay() == 0);
}

@Test
public void testPrefetchWithDelay() throws Exception {
conf.setInt(PREFETCH_EXECUTE_DELAY, 2000);
PrefetchExecutor.loadConfiguration(conf);

HFileContext context = new HFileContextBuilder().withCompression(Compression.Algorithm.GZ)
.withBlockSize(DATA_BLOCK_SIZE).build();
Path storeFile = writeStoreFile("TestPrefetchWithDelay", context);
readStoreFile(storeFile);
conf.setInt(PREFETCH_EXECUTE_DELAY, 0);
// reload so the restored default actually takes effect for subsequent tests
PrefetchExecutor.loadConfiguration(conf);
}

@Test
public void testPrefetchWithDefaultDelay() throws Exception {
conf.setInt(PREFETCH_EXECUTE_DELAY, 0);
PrefetchExecutor.loadConfiguration(conf);

HFileContext context = new HFileContextBuilder().withCompression(Compression.Algorithm.GZ)
.withBlockSize(DATA_BLOCK_SIZE).build();
Path storeFile = writeStoreFile("TestPrefetchWithDelay", context);
readStoreFile(storeFile);
}

private void testPrefetchWhenRefs(boolean compactionEnabled, Consumer<Cacheable> test)
throws Exception {
cacheConf = new CacheConfig(conf, blockCache);