Skip to content

Commit dff7fff

Browse files
authored
HBASE-22280 Separate read/write handler for priority request(especial… (#202)
Signed-off-by: Yu Li <liyu@apache.org>
1 parent 978546b commit dff7fff

File tree

5 files changed

+150
-7
lines changed

5 files changed

+150
-7
lines changed
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.hbase.ipc;
20+
21+
import org.apache.hadoop.conf.Configuration;
22+
import org.apache.hadoop.hbase.Abortable;
23+
import org.apache.yetus.audience.InterfaceAudience;
24+
import org.apache.yetus.audience.InterfaceStability;
25+
26+
/**
27+
* RPC Executor that uses different queues for reads and writes for meta.
28+
*/
29+
@InterfaceAudience.Private
30+
@InterfaceStability.Evolving
31+
public class MetaRWQueueRpcExecutor extends RWQueueRpcExecutor {
32+
public static final String META_CALL_QUEUE_READ_SHARE_CONF_KEY =
33+
"hbase.ipc.server.metacallqueue.read.ratio";
34+
public static final String META_CALL_QUEUE_SCAN_SHARE_CONF_KEY =
35+
"hbase.ipc.server.metacallqueue.scan.ratio";
36+
public static final float DEFAULT_META_CALL_QUEUE_READ_SHARE = 0.9f;
37+
38+
public MetaRWQueueRpcExecutor(final String name, final int handlerCount, final int maxQueueLength,
39+
final PriorityFunction priority, final Configuration conf, final Abortable abortable) {
40+
super(name, handlerCount, maxQueueLength, priority, conf, abortable);
41+
}
42+
43+
@Override
44+
protected float getReadShare(final Configuration conf) {
45+
return conf.getFloat(META_CALL_QUEUE_READ_SHARE_CONF_KEY, DEFAULT_META_CALL_QUEUE_READ_SHARE);
46+
}
47+
48+
@Override
49+
protected float getScanShare(final Configuration conf) {
50+
return conf.getFloat(META_CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0);
51+
}
52+
}

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,8 @@ public RWQueueRpcExecutor(final String name, final int handlerCount, final int m
7171
final PriorityFunction priority, final Configuration conf, final Abortable abortable) {
7272
super(name, handlerCount, maxQueueLength, priority, conf, abortable);
7373

74-
float callqReadShare = conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
75-
float callqScanShare = conf.getFloat(CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0);
74+
float callqReadShare = getReadShare(conf);
75+
float callqScanShare = getScanShare(conf);
7676

7777
numWriteQueues = calcNumWriters(this.numCallQueues, callqReadShare);
7878
writeHandlersCount = Math.max(numWriteQueues, calcNumWriters(handlerCount, callqReadShare));
@@ -195,7 +195,7 @@ public int getActiveScanHandlerCount() {
195195
return activeScanHandlerCount.get();
196196
}
197197

198-
private boolean isWriteRequest(final RequestHeader header, final Message param) {
198+
protected boolean isWriteRequest(final RequestHeader header, final Message param) {
199199
// TODO: Is there a better way to do this?
200200
if (param instanceof MultiRequest) {
201201
MultiRequest multi = (MultiRequest)param;
@@ -232,6 +232,14 @@ private boolean isScanRequest(final RequestHeader header, final Message param) {
232232
return param instanceof ScanRequest;
233233
}
234234

235+
protected float getReadShare(final Configuration conf) {
236+
return conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
237+
}
238+
239+
protected float getScanShare(final Configuration conf) {
240+
return conf.getFloat(CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0);
241+
}
242+
235243
/*
236244
* Calculate the number of writers based on the "total count" and the read share.
237245
* You'll get at least one writer.

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -100,10 +100,22 @@ public SimpleRpcScheduler(
100100
}
101101
}
102102

103-
// Create 2 queues to help priorityExecutor be more scalable.
104-
this.priorityExecutor = priorityHandlerCount > 0 ? new FastPathBalancedQueueRpcExecutor(
105-
"priority.FPBQ", priorityHandlerCount, RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE,
106-
maxPriorityQueueLength, priority, conf, abortable) : null;
103+
float metaCallqReadShare =
104+
conf.getFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_READ_SHARE_CONF_KEY,
105+
MetaRWQueueRpcExecutor.DEFAULT_META_CALL_QUEUE_READ_SHARE);
106+
if (metaCallqReadShare > 0) {
107+
// different read/write handler for meta, at least 1 read handler and 1 write handler
108+
this.priorityExecutor =
109+
new MetaRWQueueRpcExecutor("priority.RWQ", Math.max(2, priorityHandlerCount),
110+
maxPriorityQueueLength, priority, conf, server);
111+
} else {
112+
// Create 2 queues to help priorityExecutor be more scalable.
113+
this.priorityExecutor = priorityHandlerCount > 0 ?
114+
new FastPathBalancedQueueRpcExecutor("priority.FPBQ", priorityHandlerCount,
115+
RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxPriorityQueueLength, priority, conf,
116+
abortable) :
117+
null;
118+
}
107119
this.replicationExecutor =
108120
replicationHandlerCount > 0
109121
? new FastPathBalancedQueueRpcExecutor("replication.FPBQ", replicationHandlerCount,

hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -559,6 +559,75 @@ public void testCoDelScheduling() throws Exception {
559559
}
560560
}
561561

562+
@Test
563+
public void testMetaRWScanQueues() throws Exception {
564+
Configuration schedConf = HBaseConfiguration.create();
565+
schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
566+
schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f);
567+
schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f);
568+
569+
PriorityFunction priority = mock(PriorityFunction.class);
570+
when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.HIGH_QOS);
571+
572+
RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 3, 3, 1, priority,
573+
HConstants.QOS_THRESHOLD);
574+
try {
575+
scheduler.start();
576+
577+
CallRunner putCallTask = mock(CallRunner.class);
578+
ServerCall putCall = mock(ServerCall.class);
579+
putCall.param = RequestConverter.buildMutateRequest(
580+
Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
581+
RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
582+
when(putCallTask.getRpcCall()).thenReturn(putCall);
583+
when(putCall.getHeader()).thenReturn(putHead);
584+
when(putCall.getParam()).thenReturn(putCall.param);
585+
586+
CallRunner getCallTask = mock(CallRunner.class);
587+
ServerCall getCall = mock(ServerCall.class);
588+
RequestHeader getHead = RequestHeader.newBuilder().setMethodName("get").build();
589+
when(getCallTask.getRpcCall()).thenReturn(getCall);
590+
when(getCall.getHeader()).thenReturn(getHead);
591+
592+
CallRunner scanCallTask = mock(CallRunner.class);
593+
ServerCall scanCall = mock(ServerCall.class);
594+
scanCall.param = ScanRequest.newBuilder().build();
595+
RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build();
596+
when(scanCallTask.getRpcCall()).thenReturn(scanCall);
597+
when(scanCall.getHeader()).thenReturn(scanHead);
598+
when(scanCall.getParam()).thenReturn(scanCall.param);
599+
600+
ArrayList<Integer> work = new ArrayList<>();
601+
doAnswerTaskExecution(putCallTask, work, 1, 1000);
602+
doAnswerTaskExecution(getCallTask, work, 2, 1000);
603+
doAnswerTaskExecution(scanCallTask, work, 3, 1000);
604+
605+
// There are 3 queues: [puts], [gets], [scans]
606+
// so the calls will be interleaved
607+
scheduler.dispatch(putCallTask);
608+
scheduler.dispatch(putCallTask);
609+
scheduler.dispatch(putCallTask);
610+
scheduler.dispatch(getCallTask);
611+
scheduler.dispatch(getCallTask);
612+
scheduler.dispatch(getCallTask);
613+
scheduler.dispatch(scanCallTask);
614+
scheduler.dispatch(scanCallTask);
615+
scheduler.dispatch(scanCallTask);
616+
617+
while (work.size() < 6) {
618+
Thread.sleep(100);
619+
}
620+
621+
for (int i = 0; i < work.size() - 2; i += 3) {
622+
assertNotEquals(work.get(i + 0), work.get(i + 1));
623+
assertNotEquals(work.get(i + 0), work.get(i + 2));
624+
assertNotEquals(work.get(i + 1), work.get(i + 2));
625+
}
626+
} finally {
627+
scheduler.stop();
628+
}
629+
}
630+
562631
// Get mocked call that has the CallRunner sleep for a while so that the fast
563632
// path isn't hit.
564633
private CallRunner getMockedCallRunner(long timestamp, final long sleepTime) throws IOException {

hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestReportOnlineRegionsRace.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,8 @@ protected AssignmentManager createAssignmentManager(MasterServices master) {
126126
public static void setUp() throws Exception {
127127
UTIL.getConfiguration().setClass(HConstants.MASTER_IMPL, HMasterForTest.class, HMaster.class);
128128
UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 1000);
129+
UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT,
130+
HConstants.DEFAULT_REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT);
129131
UTIL.startMiniCluster(1);
130132
UTIL.createTable(NAME, CF);
131133
UTIL.waitTableAvailable(NAME);

0 commit comments

Comments
 (0)