Skip to content

Commit 96badc4

Browse files
committed
HBASE-27766 Support steal job queue mode for read RPC queues of RWQueueRpcExecutor
1 parent 79c985f commit 96badc4

File tree

11 files changed

+301
-21
lines changed

11 files changed

+301
-21
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,9 @@ public class RWQueueRpcExecutor extends RpcExecutor {
5858
private final int writeHandlersCount;
5959
private final int readHandlersCount;
6060
private final int scanHandlersCount;
61-
private final int numWriteQueues;
62-
private final int numReadQueues;
63-
private final int numScanQueues;
61+
protected final int numWriteQueues;
62+
protected final int numReadQueues;
63+
protected final int numScanQueues;
6464

6565
private final AtomicInteger activeWriteHandlerCount = new AtomicInteger(0);
6666
private final AtomicInteger activeReadHandlerCount = new AtomicInteger(0);
@@ -97,9 +97,7 @@ public RWQueueRpcExecutor(final String name, final int handlerCount, final int m
9797
numScanQueues = scanQueues;
9898
scanHandlersCount = scanHandlers;
9999

100-
initializeQueues(numWriteQueues);
101-
initializeQueues(numReadQueues);
102-
initializeQueues(numScanQueues);
100+
initQueues();
103101

104102
this.writeBalancer = getBalancer(name, conf, queues.subList(0, numWriteQueues));
105103
this.readBalancer =
@@ -115,6 +113,12 @@ public RWQueueRpcExecutor(final String name, final int handlerCount, final int m
115113
+ numScanQueues + " scanHandlers=" + scanHandlersCount);
116114
}
117115

116+
protected void initQueues() {
117+
initializeQueues(numWriteQueues);
118+
initializeQueues(numReadQueues);
119+
initializeQueues(numScanQueues);
120+
}
121+
118122
@Override
119123
protected int computeNumCallQueues(final int handlerCount, final float callQueuesHandlersFactor) {
120124
// at least 1 read queue and 1 write queue

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ public abstract class RpcExecutor {
7272
public static final String CALL_QUEUE_TYPE_CODEL_CONF_VALUE = "codel";
7373
public static final String CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE = "deadline";
7474
public static final String CALL_QUEUE_TYPE_FIFO_CONF_VALUE = "fifo";
75+
public static final String CALL_QUEUE_TYPE_READ_STEAL_CONF_VALUE = "readSteal";
7576
public static final String CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE = "pluggable";
7677
public static final String CALL_QUEUE_TYPE_CONF_KEY = "hbase.ipc.server.callqueue.type";
7778
public static final String CALL_QUEUE_TYPE_CONF_DEFAULT = CALL_QUEUE_TYPE_FIFO_CONF_VALUE;
@@ -101,7 +102,7 @@ public abstract class RpcExecutor {
101102
private final LongAdder numLifoModeSwitches = new LongAdder();
102103

103104
protected final int numCallQueues;
104-
protected final List<BlockingQueue<CallRunner>> queues;
105+
protected List<BlockingQueue<CallRunner>> queues;
105106
private final Class<? extends BlockingQueue> queueClass;
106107
private final Object[] queueInitArgs;
107108

@@ -117,6 +118,8 @@ public abstract class RpcExecutor {
117118
private final Configuration conf;
118119
private final Abortable abortable;
119120

121+
protected int maxQueueLength;
122+
120123
public RpcExecutor(final String name, final int handlerCount, final int maxQueueLength,
121124
final PriorityFunction priority, final Configuration conf, final Abortable abortable) {
122125
this(name, handlerCount, conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_CONF_DEFAULT),
@@ -129,6 +132,7 @@ public RpcExecutor(final String name, final int handlerCount, final String callQ
129132
this.name = Strings.nullToEmpty(name);
130133
this.conf = conf;
131134
this.abortable = abortable;
135+
this.maxQueueLength = maxQueueLength;
132136

133137
float callQueuesHandlersFactor = this.conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0.1f);
134138
if (

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
*/
1818
package org.apache.hadoop.hbase.ipc;
1919

20+
import static org.apache.hadoop.hbase.ipc.RpcExecutor.CALL_QUEUE_TYPE_READ_STEAL_CONF_VALUE;
21+
2022
import org.apache.hadoop.conf.Configuration;
2123
import org.apache.hadoop.hbase.Abortable;
2224
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -86,9 +88,14 @@ public SimpleRpcScheduler(Configuration conf, int handlerCount, int priorityHand
8688
float callqReadShare = conf.getFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
8789

8890
if (callqReadShare > 0) {
89-
// at least 1 read handler and 1 write handler
90-
callExecutor = new FastPathRWQueueRpcExecutor("default.FPRWQ", Math.max(2, handlerCount),
91-
maxQueueLength, priority, conf, server);
91+
if (callQueueType.equals(CALL_QUEUE_TYPE_READ_STEAL_CONF_VALUE)) {
92+
callExecutor = new StealReadJobRWQueueRpcExecutor("default.SRRWQ",
93+
Math.max(2, handlerCount), maxQueueLength, priority, conf, server);
94+
} else {
95+
// at least 1 read handler and 1 write handler
96+
callExecutor = new FastPathRWQueueRpcExecutor("default.FPRWQ", Math.max(2, handlerCount),
97+
maxQueueLength, priority, conf, server);
98+
}
9299
} else {
93100
if (
94101
RpcExecutor.isFifoQueueType(callQueueType) || RpcExecutor.isCodelQueueType(callQueueType)
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.ipc;
19+
20+
import java.util.ArrayList;
21+
import java.util.List;
22+
import java.util.concurrent.BlockingQueue;
23+
import org.apache.hadoop.conf.Configuration;
24+
import org.apache.hadoop.hbase.Abortable;
25+
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
26+
import org.apache.yetus.audience.InterfaceAudience;
27+
import org.apache.yetus.audience.InterfaceStability;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
31+
@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX })
32+
@InterfaceStability.Evolving
33+
public class StealReadJobRWQueueRpcExecutor extends RWQueueRpcExecutor {
34+
private static final Logger LOG = LoggerFactory.getLogger(StealReadJobRWQueueRpcExecutor.class);
35+
36+
public StealReadJobRWQueueRpcExecutor(String name, int handlerCount, int maxQueueLength,
37+
PriorityFunction priority, Configuration conf, Abortable abortable) {
38+
super(name, handlerCount, maxQueueLength, priority, conf, abortable);
39+
}
40+
41+
@Override
42+
public void initQueues() {
43+
queues = new ArrayList<>(this.numWriteQueues + this.numReadQueues + numScanQueues);
44+
initializeQueues(numWriteQueues);
45+
if (numReadQueues > 0 && numScanQueues > 0) {
46+
int stealQueueCount = Math.min(numReadQueues, numScanQueues);
47+
List<BlockingQueue<CallRunner>> stealScanQueues = new ArrayList<>(stealQueueCount);
48+
for (int i = 0; i < stealQueueCount; i++) {
49+
StealRpcJobQueue scanQueue = new StealRpcJobQueue(maxQueueLength, maxQueueLength);
50+
BlockingQueue<CallRunner> readQueue = scanQueue.getStealFromQueue();
51+
queues.add(readQueue);
52+
stealScanQueues.add(scanQueue);
53+
}
54+
if (numReadQueues > numScanQueues) {
55+
initializeQueues(numReadQueues - numScanQueues);
56+
}
57+
queues.addAll(stealScanQueues);
58+
if (numScanQueues > numReadQueues) {
59+
initializeQueues(numScanQueues - numReadQueues);
60+
}
61+
} else {
62+
initializeQueues(Math.max(numReadQueues, numScanQueues));
63+
}
64+
}
65+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.ipc;
19+
20+
import java.util.Comparator;
21+
import org.apache.hadoop.hbase.util.StealJobQueue;
22+
import org.apache.yetus.audience.InterfaceAudience;
23+
24+
@InterfaceAudience.Private
25+
public class StealRpcJobQueue extends StealJobQueue<CallRunner> {
26+
27+
public final static RpcComparator RPCCOMPARATOR = new RpcComparator();
28+
29+
public StealRpcJobQueue(int initCapacity, int stealFromQueueInitCapacity) {
30+
super(initCapacity, stealFromQueueInitCapacity, RPCCOMPARATOR);
31+
}
32+
33+
public static class RpcComparator implements Comparator<CallRunner> {
34+
public RpcComparator() {
35+
super();
36+
}
37+
38+
@Override
39+
public int compare(CallRunner o1, CallRunner o2) {
40+
long diff = o1.getRpcCall().getReceiveTime() - o2.getRpcCall().getReceiveTime();
41+
if (diff > 0) {
42+
return 1;
43+
} else if (diff < 0) {
44+
return -1;
45+
} else {
46+
return 0;
47+
}
48+
}
49+
}
50+
}

hbase-server/src/main/java/org/apache/hadoop/hbase/util/StealJobQueue.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ public StealJobQueue(Comparator<? super T> comparator) {
4949
this(11, 11, comparator);
5050
}
5151

52+
public StealJobQueue(int initCapacity, int stealFromQueueInitCapacity) {
53+
this(initCapacity, stealFromQueueInitCapacity, null);
54+
}
55+
5256
public StealJobQueue(int initCapacity, int stealFromQueueInitCapacity,
5357
Comparator<? super T> comparator) {
5458
super(initCapacity, comparator);

hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -65,17 +65,17 @@ public class TestMultiParallel {
6565

6666
private static final Logger LOG = LoggerFactory.getLogger(TestMultiParallel.class);
6767

68-
private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
68+
protected static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
6969
private static final byte[] VALUE = Bytes.toBytes("value");
7070
private static final byte[] QUALIFIER = Bytes.toBytes("qual");
71-
private static final String FAMILY = "family";
72-
private static final TableName TEST_TABLE = TableName.valueOf("multi_test_table");
71+
protected static final String FAMILY = "family";
72+
protected static final TableName TEST_TABLE = TableName.valueOf("multi_test_table");
7373
private static final byte[] BYTES_FAMILY = Bytes.toBytes(FAMILY);
7474
private static final byte[] ONE_ROW = Bytes.toBytes("xxx");
7575
private static final byte[][] KEYS = makeKeys();
7676

77-
private static final int slaves = 5; // also used for testing HTable pool size
78-
private static Connection CONNECTION;
77+
protected static final int slaves = 5; // also used for testing HTable pool size
78+
protected static Connection CONNECTION;
7979

8080
@BeforeClass
8181
public static void beforeClass() throws Exception {
@@ -662,7 +662,7 @@ private void validateSizeAndEmpty(Object[] results, int expectedSize) {
662662

663663
public static class MyMasterObserver implements MasterObserver, MasterCoprocessor {
664664
private static final AtomicInteger postBalanceCount = new AtomicInteger(0);
665-
private static final AtomicBoolean start = new AtomicBoolean(false);
665+
protected static final AtomicBoolean start = new AtomicBoolean(false);
666666

667667
@Override
668668
public void start(CoprocessorEnvironment env) throws IOException {
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.client;
19+
20+
import static org.junit.Assert.assertTrue;
21+
22+
import org.apache.hadoop.hbase.HConstants;
23+
import org.apache.hadoop.hbase.codec.KeyValueCodec;
24+
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
25+
import org.apache.hadoop.hbase.ipc.RWQueueRpcExecutor;
26+
import org.apache.hadoop.hbase.ipc.RpcExecutor;
27+
import org.apache.hadoop.hbase.testclassification.FlakeyTests;
28+
import org.apache.hadoop.hbase.testclassification.MediumTests;
29+
import org.apache.hadoop.hbase.util.Bytes;
30+
import org.junit.Test;
31+
import org.junit.experimental.categories.Category;
32+
33+
@Category({ MediumTests.class })
34+
public class TestMultiParallel2 extends TestMultiParallel {
35+
36+
public static void beforeClass() throws Exception {
37+
// Uncomment the following lines if more verbosity is needed for
38+
// debugging (see HBASE-12285 for details).
39+
// ((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL);
40+
// ((Log4JLogger)RpcClient.LOG).getLogger().setLevel(Level.ALL);
41+
// ((Log4JLogger)ScannerCallable.LOG).getLogger().setLevel(Level.ALL);
42+
UTIL.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
43+
KeyValueCodec.class.getCanonicalName());
44+
// Disable table on master for now as the feature is broken
45+
// UTIL.getConfiguration().setBoolean(LoadBalancer.TABLES_ON_MASTER, true);
46+
// We used to ask for system tables on Master exclusively but not needed by test and doesn't
47+
// work anyways -- so commented out.
48+
// UTIL.getConfiguration().setBoolean(LoadBalancer.SYSTEM_TABLES_ON_MASTER, true);
49+
UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
50+
MyMasterObserver.class.getName());
51+
String queueType = RpcExecutor.CALL_QUEUE_TYPE_READ_STEAL_CONF_VALUE;
52+
UTIL.getConfiguration().set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType);
53+
UTIL.getConfiguration().setFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.5f);
54+
UTIL.getConfiguration().setFloat(RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 1);
55+
UTIL.startMiniCluster(slaves);
56+
Table t = UTIL.createMultiRegionTable(TEST_TABLE, Bytes.toBytes(FAMILY));
57+
UTIL.waitTableEnabled(TEST_TABLE);
58+
t.close();
59+
CONNECTION = ConnectionFactory.createConnection(UTIL.getConfiguration());
60+
assertTrue(MyMasterObserver.start.get());
61+
}
62+
63+
@Test
64+
public void test() throws Exception {
65+
testBatchWithGet();
66+
}
67+
}

hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide2.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,15 +52,15 @@ public class TestScannersFromClientSide2 {
5252
public static final HBaseClassTestRule CLASS_RULE =
5353
HBaseClassTestRule.forClass(TestScannersFromClientSide2.class);
5454

55-
private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
55+
protected static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
5656

57-
private static TableName TABLE_NAME = TableName.valueOf("scan");
57+
protected static TableName TABLE_NAME = TableName.valueOf("scan");
5858

59-
private static byte[] FAMILY = Bytes.toBytes("cf");
59+
protected static byte[] FAMILY = Bytes.toBytes("cf");
6060

61-
private static byte[] CQ1 = Bytes.toBytes("cq1");
61+
protected static byte[] CQ1 = Bytes.toBytes("cq1");
6262

63-
private static byte[] CQ2 = Bytes.toBytes("cq2");
63+
protected static byte[] CQ2 = Bytes.toBytes("cq2");
6464

6565
@Parameter(0)
6666
public boolean batch;

0 commit comments

Comments
 (0)