Skip to content

HBASE-26782 Minor code cleanup in and around RpcExecutor #4144

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand Down Expand Up @@ -52,7 +52,7 @@ public BalancedQueueRpcExecutor(final String name, final int handlerCount,
}

@Override
public boolean dispatch(final CallRunner callTask) throws InterruptedException {
public boolean dispatch(final CallRunner callTask) {
int queueIndex = balancer.getNextQueue(callTask);
BlockingQueue<CallRunner> queue = queues.get(queueIndex);
// that means we can overflow by at most <num reader> size (5), that's ok
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand All @@ -20,9 +20,7 @@
import java.util.Deque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -65,7 +63,7 @@ protected RpcHandler getHandler(final String name, final double handlerFailureTh
}

@Override
public boolean dispatch(CallRunner callTask) throws InterruptedException {
public boolean dispatch(CallRunner callTask) {
//FastPathHandlers don't check queue limits, so if we're completely shut down
//we have to prevent ourselves from using the handler in the first place
if (currentQueueLimit == 0){
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
/**

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand Down Expand Up @@ -27,8 +26,6 @@
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* RPC Executor that extends {@link RWQueueRpcExecutor} with fast-path feature, used in
Expand All @@ -37,7 +34,6 @@
@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
@InterfaceStability.Evolving
public class FastPathRWQueueRpcExecutor extends RWQueueRpcExecutor {
private static final Logger LOG = LoggerFactory.getLogger(RWQueueRpcExecutor.class);

private final Deque<FastPathRpcHandler> readHandlerStack = new ConcurrentLinkedDeque<>();
private final Deque<FastPathRpcHandler> writeHandlerStack = new ConcurrentLinkedDeque<>();
Expand All @@ -60,7 +56,7 @@ protected RpcHandler getHandler(final String name, final double handlerFailureTh
}

@Override
public boolean dispatch(final CallRunner callTask) throws InterruptedException {
public boolean dispatch(final CallRunner callTask) {
RpcCall call = callTask.getRpcCall();
boolean shouldDispatchToWriteQueue = isWriteRequest(call.getHeader(), call.getParam());
boolean shouldDispatchToScanQueue = shouldDispatchToScanQueue(callTask);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ protected void startHandlers(final int port) {
}

@Override
public boolean dispatch(final CallRunner callTask) throws InterruptedException {
public boolean dispatch(final CallRunner callTask) {
RpcCall call = callTask.getRpcCall();
return dispatchTo(isWriteRequest(call.getHeader(), call.getParam()),
shouldDispatchToScanQueue(callTask), callTask);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand All @@ -19,8 +19,8 @@
package org.apache.hadoop.hbase.ipc;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand All @@ -29,20 +29,21 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Strings;
import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;

/**
* Runs the CallRunners passed here via {@link #dispatch(CallRunner)}. Subclass and add particular
Expand All @@ -53,14 +54,16 @@ public abstract class RpcExecutor {
private static final Logger LOG = LoggerFactory.getLogger(RpcExecutor.class);

protected static final int DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT = 250;
public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY = "hbase.ipc.server.callqueue.handler.factor";
public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY =
"hbase.ipc.server.callqueue.handler.factor";

/** max delay in msec used to bound the deprioritized requests */
public static final String QUEUE_MAX_CALL_DELAY_CONF_KEY = "hbase.ipc.server.queue.max.call.delay";
/** max delay in msec used to bound the de-prioritized requests */
public static final String QUEUE_MAX_CALL_DELAY_CONF_KEY =
"hbase.ipc.server.queue.max.call.delay";

/**
* The default, 'fifo', has the least friction but is dumb. If set to 'deadline', uses a priority
* queue and deprioritizes long-running scans. Sorting by priority comes at a cost, reduced
* queue and de-prioritizes long-running scans. Sorting by priority comes at a cost, reduced
* throughput.
*/
public static final String CALL_QUEUE_TYPE_CODEL_CONF_VALUE = "codel";
Expand All @@ -70,14 +73,18 @@ public abstract class RpcExecutor {
public static final String CALL_QUEUE_TYPE_CONF_KEY = "hbase.ipc.server.callqueue.type";
public static final String CALL_QUEUE_TYPE_CONF_DEFAULT = CALL_QUEUE_TYPE_FIFO_CONF_VALUE;

public static final String CALL_QUEUE_QUEUE_BALANCER_CLASS = "hbase.ipc.server.callqueue.balancer.class";
public static final String CALL_QUEUE_QUEUE_BALANCER_CLASS =
"hbase.ipc.server.callqueue.balancer.class";
public static final Class<?> CALL_QUEUE_QUEUE_BALANCER_CLASS_DEFAULT = RandomQueueBalancer.class;


// These 3 are only used by Codel executor
public static final String CALL_QUEUE_CODEL_TARGET_DELAY = "hbase.ipc.server.callqueue.codel.target.delay";
public static final String CALL_QUEUE_CODEL_INTERVAL = "hbase.ipc.server.callqueue.codel.interval";
public static final String CALL_QUEUE_CODEL_LIFO_THRESHOLD = "hbase.ipc.server.callqueue.codel.lifo.threshold";
public static final String CALL_QUEUE_CODEL_TARGET_DELAY =
"hbase.ipc.server.callqueue.codel.target.delay";
public static final String CALL_QUEUE_CODEL_INTERVAL =
"hbase.ipc.server.callqueue.codel.interval";
public static final String CALL_QUEUE_CODEL_LIFO_THRESHOLD =
"hbase.ipc.server.callqueue.codel.lifo.threshold";

public static final int CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY = 100;
public static final int CALL_QUEUE_CODEL_DEFAULT_INTERVAL = 100;
Expand All @@ -88,16 +95,14 @@ public abstract class RpcExecutor {
public static final String PLUGGABLE_CALL_QUEUE_WITH_FAST_PATH_ENABLED =
"hbase.ipc.server.callqueue.pluggable.queue.fast.path.enabled";

private LongAdder numGeneralCallsDropped = new LongAdder();
private LongAdder numLifoModeSwitches = new LongAdder();
private final LongAdder numGeneralCallsDropped = new LongAdder();
private final LongAdder numLifoModeSwitches = new LongAdder();

protected final int numCallQueues;
protected final List<BlockingQueue<CallRunner>> queues;
private final Class<? extends BlockingQueue> queueClass;
private final Object[] queueInitArgs;

private final PriorityFunction priority;

protected volatile int currentQueueLimit;

private final AtomicInteger activeHandlerCount = new AtomicInteger(0);
Expand All @@ -107,8 +112,8 @@ public abstract class RpcExecutor {

private String name;

private Configuration conf = null;
private Abortable abortable = null;
private final Configuration conf;
private final Abortable abortable;

public RpcExecutor(final String name, final int handlerCount, final int maxQueueLength,
final PriorityFunction priority, final Configuration conf, final Abortable abortable) {
Expand Down Expand Up @@ -144,12 +149,10 @@ public RpcExecutor(final String name, final int handlerCount, final String callQ
this.handlerCount = Math.max(handlerCount, this.numCallQueues);
this.handlers = new ArrayList<>(this.handlerCount);

this.priority = priority;

if (isDeadlineQueueType(callQueueType)) {
this.name += ".Deadline";
this.queueInitArgs = new Object[] { maxQueueLength,
new CallPriorityComparator(conf, this.priority) };
new CallPriorityComparator(conf, priority) };
this.queueClass = BoundedPriorityBlockingQueue.class;
} else if (isCodelQueueType(callQueueType)) {
this.name += ".Codel";
Expand All @@ -159,16 +162,17 @@ public RpcExecutor(final String name, final int handlerCount, final String callQ
double codelLifoThreshold = conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD,
CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD);
this.queueInitArgs = new Object[] { maxQueueLength, codelTargetDelay, codelInterval,
codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches };
codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches };
this.queueClass = AdaptiveLifoCoDelCallQueue.class;
} else if (isPluggableQueueType(callQueueType)) {
Optional<Class<? extends BlockingQueue<CallRunner>>> pluggableQueueClass = getPluggableQueueClass();
Optional<Class<? extends BlockingQueue<CallRunner>>> pluggableQueueClass =
getPluggableQueueClass();

if (!pluggableQueueClass.isPresent()) {
throw new PluggableRpcQueueNotFound("Pluggable call queue failed to load and selected call"
+ " queue type required");
} else {
this.queueInitArgs = new Object[] { maxQueueLength, this.priority, conf };
this.queueInitArgs = new Object[] { maxQueueLength, priority, conf };
this.queueClass = pluggableQueueClass.get();
}
} else {
Expand All @@ -186,50 +190,41 @@ protected int computeNumCallQueues(final int handlerCount, final float callQueue
return Math.max(1, Math.round(handlerCount * callQueuesHandlersFactor));
}

public Map<String, Long> getCallQueueCountsSummary() {
HashMap<String, Long> callQueueMethodTotalCount = new HashMap<>();

for(BlockingQueue<CallRunner> queue: queues) {
for (CallRunner cr:queue) {
RpcCall rpcCall = cr.getRpcCall();

String method;

if (null==rpcCall.getMethod() ||
StringUtil.isNullOrEmpty(method = rpcCall.getMethod().getName())) {
method = "Unknown";
}
/**
* Return the {@link Descriptors.MethodDescriptor#getName()} from {@code callRunner} or "Unknown".
*/
private static String getMethodName(final CallRunner callRunner) {
return Optional.ofNullable(callRunner)
.map(CallRunner::getRpcCall)
.map(RpcCall::getMethod)
.map(Descriptors.MethodDescriptor::getName)
.orElse("Unknown");
}

callQueueMethodTotalCount.put(method, 1+callQueueMethodTotalCount.getOrDefault(method, 0L));
}
}
/**
* Return the {@link RpcCall#getSize()} from {@code callRunner} or 0L.
*/
private static long getRpcCallSize(final CallRunner callRunner) {
return Optional.ofNullable(callRunner)
.map(CallRunner::getRpcCall)
.map(RpcCall::getSize)
.orElse(0L);
}

return callQueueMethodTotalCount;
public Map<String, Long> getCallQueueCountsSummary() {
return queues.stream()
.flatMap(Collection::stream)
.map(RpcExecutor::getMethodName)
.collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
}

public Map<String, Long> getCallQueueSizeSummary() {
HashMap<String, Long> callQueueMethodTotalSize = new HashMap<>();

for(BlockingQueue<CallRunner> queue: queues) {
for (CallRunner cr:queue) {
RpcCall rpcCall = cr.getRpcCall();
String method;

if (null==rpcCall.getMethod() ||
StringUtil.isNullOrEmpty(method = rpcCall.getMethod().getName())) {
method = "Unknown";
}

long size = rpcCall.getSize();

callQueueMethodTotalSize.put(method, size+callQueueMethodTotalSize.getOrDefault(method, 0L));
}
}

return callQueueMethodTotalSize;
return queues.stream()
.flatMap(Collection::stream)
.map(callRunner -> new Pair<>(getMethodName(callRunner), getRpcCallSize(callRunner)))
.collect(Collectors.groupingBy(Pair::getFirst, Collectors.summingLong(Pair::getSecond)));
}


protected void initializeQueues(final int numQueues) {
if (queueInitArgs.length > 0) {
currentQueueLimit = (int) queueInitArgs[0];
Expand All @@ -252,7 +247,7 @@ public void stop() {
}

/** Add the request to the executor queue */
public abstract boolean dispatch(final CallRunner callTask) throws InterruptedException;
public abstract boolean dispatch(final CallRunner callTask);

/** Returns the list of request queues */
protected List<BlockingQueue<CallRunner>> getQueues() {
Expand Down Expand Up @@ -298,26 +293,26 @@ protected void startHandlers(final String nameSuffix, final int numHandlers,
handlers.size(), threadPrefix, qsize, port);
}

public static QueueBalancer getBalancer(String executorName, Configuration conf, List<BlockingQueue<CallRunner>> queues) {
/**
* All requests go to the first queue, at index 0
*/
private static final QueueBalancer ONE_QUEUE = val -> 0;

public static QueueBalancer getBalancer(
final String executorName,
final Configuration conf,
final List<BlockingQueue<CallRunner>> queues
) {
Preconditions.checkArgument(queues.size() > 0, "Queue size is <= 0, must be at least 1");
if (queues.size() == 1) {
return ONE_QUEUE;
} else {
Class<?> balancerClass = conf.getClass(CALL_QUEUE_QUEUE_BALANCER_CLASS, CALL_QUEUE_QUEUE_BALANCER_CLASS_DEFAULT);
Class<?> balancerClass = conf.getClass(
CALL_QUEUE_QUEUE_BALANCER_CLASS, CALL_QUEUE_QUEUE_BALANCER_CLASS_DEFAULT);
return (QueueBalancer) ReflectionUtils.newInstance(balancerClass, conf, executorName, queues);
}
}

/**
* All requests go to the first queue, at index 0
*/
private static QueueBalancer ONE_QUEUE = new QueueBalancer() {
@Override
public int getNextQueue(CallRunner callRunner) {
return 0;
}
};

/**
* Comparator used by the "normal callQueue" if DEADLINE_CALL_QUEUE_CONF_KEY is set to true. It
* uses the calculated "deadline" e.g. to deprioritize long-running job If multiple requests have
Expand Down Expand Up @@ -455,7 +450,8 @@ public void resizeQueues(Configuration conf) {
configKey = RpcScheduler.IPC_SERVER_REPLICATION_MAX_CALLQUEUE_LENGTH;
}
}
currentQueueLimit = conf.getInt(configKey, currentQueueLimit);
final int queueLimit = currentQueueLimit;
currentQueueLimit = conf.getInt(configKey, queueLimit);
}

public void onConfigurationChange(Configuration conf) {
Expand Down