Skip to content

Commit 1cee26a

Browse files
authored
Merge pull request #1 from HubSpot/HBASE-26576
2 parents 8e6589a + b0007c9 commit 1cee26a

File tree

3 files changed

+49
-5
lines changed

3 files changed

+49
-5
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@ public abstract class RpcExecutor {
8484

8585
public static final String PLUGGABLE_CALL_QUEUE_CLASS_NAME =
8686
"hbase.ipc.server.callqueue.pluggable.queue.class.name";
87+
public static final String PLUGGABLE_CALL_QUEUE_WITH_FAST_PATH_ENABLED =
88+
"hbase.ipc.server.callqueue.pluggable.queue.fast.path.enabled";
8789

8890
private LongAdder numGeneralCallsDropped = new LongAdder();
8991
private LongAdder numLifoModeSwitches = new LongAdder();
@@ -465,6 +467,11 @@ public static boolean isPluggableQueueType(String callQueueType) {
465467
return callQueueType.equals(CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE);
466468
}
467469

470+
public static boolean isPluggableQueueWithFastPath(String callQueueType, Configuration conf) {
471+
return isPluggableQueueType(callQueueType) &&
472+
conf.getBoolean(PLUGGABLE_CALL_QUEUE_WITH_FAST_PATH_ENABLED, false);
473+
}
474+
468475
private Optional<Class<? extends BlockingQueue<CallRunner>>> getPluggableQueueClass() {
469476
String queueClassName = conf.get(PLUGGABLE_CALL_QUEUE_CLASS_NAME);
470477

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,9 @@ public SimpleRpcScheduler(
8888
callExecutor = new RWQueueRpcExecutor("default.RWQ", Math.max(2, handlerCount),
8989
maxQueueLength, priority, conf, server);
9090
} else {
91-
if (RpcExecutor.isFifoQueueType(callQueueType) || RpcExecutor.isCodelQueueType(callQueueType)) {
91+
if (RpcExecutor.isFifoQueueType(callQueueType) ||
92+
RpcExecutor.isCodelQueueType(callQueueType) ||
93+
RpcExecutor.isPluggableQueueWithFastPath(callQueueType, conf)) {
9294
callExecutor = new FastPathBalancedQueueRpcExecutor("default.FPBQ", handlerCount,
9395
maxQueueLength, priority, conf, server);
9496
} else {

hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.apache.hadoop.hbase.util.Bytes;
5353
import org.apache.hadoop.hbase.util.EnvironmentEdge;
5454
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
55+
import org.apache.hadoop.hbase.util.ReflectionUtils;
5556
import org.apache.hadoop.hbase.util.Threads;
5657
import org.junit.Before;
5758
import org.junit.ClassRule;
@@ -274,6 +275,41 @@ public void testPluggableRpcQueue() throws Exception {
274275
}
275276
}
276277

278+
@Test
279+
public void testPluggableRpcQueueWireUpWithFastPathExecutor() throws Exception {
280+
String queueType = RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE;
281+
Configuration schedConf = HBaseConfiguration.create();
282+
schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType);
283+
schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME, "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl");
284+
schedConf.setBoolean(RpcExecutor.PLUGGABLE_CALL_QUEUE_WITH_FAST_PATH_ENABLED, true);
285+
286+
PriorityFunction priority = mock(PriorityFunction.class);
287+
when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
288+
SimpleRpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 0, 0, 0, priority,
289+
HConstants.QOS_THRESHOLD);
290+
291+
Field f = scheduler.getClass().getDeclaredField("callExecutor");
292+
f.setAccessible(true);
293+
assertTrue(f.get(scheduler) instanceof FastPathBalancedQueueRpcExecutor);
294+
}
295+
296+
@Test
297+
public void testPluggableRpcQueueWireUpWithoutFastPathExecutor() throws Exception {
298+
String queueType = RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE;
299+
Configuration schedConf = HBaseConfiguration.create();
300+
schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType);
301+
schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME, "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl");
302+
303+
PriorityFunction priority = mock(PriorityFunction.class);
304+
when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
305+
SimpleRpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 0, 0, 0, priority,
306+
HConstants.QOS_THRESHOLD);
307+
308+
Field f = scheduler.getClass().getDeclaredField("callExecutor");
309+
f.setAccessible(true);
310+
assertTrue(f.get(scheduler) instanceof BalancedQueueRpcExecutor);
311+
}
312+
277313
@Test
278314
public void testPluggableRpcQueueCanListenToConfigurationChanges() throws Exception {
279315

@@ -316,9 +352,7 @@ private void testRpcScheduler(final String queueType) throws Exception {
316352
testRpcScheduler(queueType, null);
317353
}
318354

319-
private void testRpcScheduler(final String queueType, final String pluggableQueueClass)
320-
throws Exception {
321-
355+
private void testRpcScheduler(final String queueType, final String pluggableQueueClass) throws Exception {
322356
Configuration schedConf = HBaseConfiguration.create();
323357
schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType);
324358

@@ -388,7 +422,8 @@ private void testRpcScheduler(final String queueType, final String pluggableQueu
388422
// -> WITH REORDER [10 10 10 10 10 10 50 100] -> 530 (Deadline Queue)
389423
if (queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
390424
assertEquals(530, totalTime);
391-
} else if (queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE)) {
425+
} else if (queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE) ||
426+
queueType.equals(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE)) {
392427
assertEquals(930, totalTime);
393428
}
394429
} finally {

0 commit comments

Comments
 (0)