Skip to content

Commit fb6d79f

Browse files
author
huiruan
committed
HBASE-27355 Separate meta read requests from master and client
1 parent 748cad6 commit fb6d79f

File tree

10 files changed

+101
-37
lines changed

10 files changed

+101
-37
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,8 @@ class AsyncBatchRpcRetryingCaller<T> {
116116

117117
private final long startNs;
118118

119+
private final boolean internal;
120+
119121
// we can not use HRegionLocation as the map key because the hashCode and equals method of
120122
// HRegionLocation only consider serverName.
121123
private static final class RegionRequest {
@@ -182,6 +184,7 @@ public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
182184
}
183185
this.action2Errors = new IdentityHashMap<>();
184186
this.startNs = System.nanoTime();
187+
this.internal = conn.getConfiguration().getBoolean(HConstants.HBASE_SERVER_SERVICES, false);
185188
}
186189

187190
private static boolean hasIncrementOrAppend(Row action) {
@@ -397,7 +400,7 @@ private void sendToServer(ServerName serverName, ServerRequest serverReq, int tr
397400
}
398401
HBaseRpcController controller = conn.rpcControllerFactory.newController();
399402
resetController(controller, Math.min(rpcTimeoutNs, remainingNs),
400-
calcPriority(serverReq.getPriority(), tableName));
403+
calcPriority(serverReq.getPriority(), tableName, internal));
401404
if (!cells.isEmpty()) {
402405
controller.setCellScanner(createCellScanner(cells));
403406
}

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.List;
2727
import java.util.concurrent.CompletableFuture;
2828
import java.util.concurrent.TimeUnit;
29+
import org.apache.hadoop.hbase.HConstants;
2930
import org.apache.hadoop.hbase.HRegionLocation;
3031
import org.apache.hadoop.hbase.ServerName;
3132
import org.apache.hadoop.hbase.TableName;
@@ -63,6 +64,9 @@ private abstract class BuilderBase {
6364
protected int maxAttempts = retries2Attempts(conn.connConf.getMaxRetries());
6465

6566
protected int startLogErrorsCnt = conn.connConf.getStartLogErrorsCnt();
67+
68+
protected boolean internal =
69+
conn.getConfiguration().getBoolean(HConstants.HBASE_SERVER_SERVICES, false);
6670
}
6771

6872
public class SingleRequestCallerBuilder<T> extends BuilderBase {
@@ -140,7 +144,7 @@ public SingleRequestCallerBuilder<T> replicaId(int replicaId) {
140144
}
141145

142146
public SingleRequestCallerBuilder<T> priority(int priority) {
143-
this.priority = priority;
147+
this.priority = calcPriority(priority, tableName, internal);
144148
return this;
145149
}
146150

@@ -150,7 +154,6 @@ private void preCheck() {
150154
checkNotNull(row, "row is null");
151155
checkNotNull(locateType, "locateType is null");
152156
checkNotNull(callable, "action is null");
153-
this.priority = calcPriority(priority, tableName);
154157
}
155158

156159
public AsyncSingleRequestRpcRetryingCaller<T> build() {
@@ -285,7 +288,7 @@ private void preCheck() {
285288
checkNotNull(consumer, "consumer is null");
286289
checkNotNull(stub, "stub is null");
287290
checkNotNull(loc, "location is null");
288-
this.priority = calcPriority(priority, loc.getRegion().getTable());
291+
this.priority = calcPriority(priority, loc.getRegion().getTable(), internal);
289292
}
290293

291294
public AsyncScanSingleRegionRpcRetryingCaller build() {

hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -503,14 +503,18 @@ static void validatePutsInRowMutations(RowMutations rowMutations, int maxKeyValu
503503
* <p/>
504504
* The rules are:
505505
* <ol>
506+
* <li>For internal table request, use {@link HConstants#INTERNAL_TABLE_QOS}.</li>
506507
* <li>If user set a priority explicitly, then just use it.</li>
507508
* <li>For system table, use {@link HConstants#SYSTEMTABLE_QOS}.</li>
508509
* <li>For other tables, use {@link HConstants#NORMAL_QOS}.</li>
509510
* </ol>
510511
* @param priority the priority set by user, can be {@link HConstants#PRIORITY_UNSET}.
511512
* @param tableName the table we operate on
512513
*/
513-
static int calcPriority(int priority, TableName tableName) {
514+
static int calcPriority(int priority, TableName tableName, boolean internal) {
515+
if (internal) {
516+
return HConstants.INTERNAL_TABLE_QOS;
517+
}
514518
if (priority != HConstants.PRIORITY_UNSET) {
515519
return priority;
516520
} else {

hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRpcPriority.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.hadoop.hbase.client;
1919

2020
import static org.apache.hadoop.hbase.HConstants.HIGH_QOS;
21+
import static org.apache.hadoop.hbase.HConstants.INTERNAL_TABLE_QOS;
2122
import static org.apache.hadoop.hbase.HConstants.NORMAL_QOS;
2223
import static org.apache.hadoop.hbase.HConstants.SYSTEMTABLE_QOS;
2324
import static org.apache.hadoop.hbase.NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR;
@@ -49,6 +50,7 @@
4950
import org.apache.hadoop.hbase.CellBuilderType;
5051
import org.apache.hadoop.hbase.HBaseClassTestRule;
5152
import org.apache.hadoop.hbase.HBaseConfiguration;
53+
import org.apache.hadoop.hbase.HConstants;
5254
import org.apache.hadoop.hbase.HRegionLocation;
5355
import org.apache.hadoop.hbase.ServerName;
5456
import org.apache.hadoop.hbase.TableName;
@@ -601,4 +603,21 @@ public void testBatchMetaTable() {
601603
verify(stub, times(1)).multi(assertPriority(SYSTEMTABLE_QOS),
602604
any(ClientProtos.MultiRequest.class), any());
603605
}
606+
607+
@Test
608+
public void testInternalGet() {
609+
CONF.setBoolean(HConstants.HBASE_SERVER_SERVICES, true);
610+
conn.getTable(TableName.valueOf(name.getMethodName())).get(new Get(Bytes.toBytes(0))).join();
611+
verify(stub, times(1)).get(assertPriority(INTERNAL_TABLE_QOS), any(GetRequest.class), any());
612+
CONF.unset(HConstants.HBASE_SERVER_SERVICES);
613+
}
614+
615+
@Test
616+
public void testInternalScan() throws Exception {
617+
CONF.setBoolean(HConstants.HBASE_SERVER_SERVICES, true);
618+
CompletableFuture<Void> renewFuture = mockScanReturnRenewFuture(INTERNAL_TABLE_QOS);
619+
testForTable(TableName.valueOf(name.getMethodName()), renewFuture,
620+
Optional.of(INTERNAL_TABLE_QOS));
621+
CONF.unset(HConstants.HBASE_SERVER_SERVICES);
622+
}
604623
}

hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1154,6 +1154,10 @@ public enum OperationStatusCode {
11541154
public static final int ADMIN_QOS = 100;
11551155
public static final int HIGH_QOS = 200;
11561156
public static final int SYSTEMTABLE_QOS = HIGH_QOS;
1157+
@InterfaceAudience.Private
1158+
public static final int INTERNAL_TABLE_QOS = 250;
1159+
@InterfaceAudience.Private
1160+
public static final String HBASE_SERVER_SERVICES = "hbase.server.service";
11571161

11581162
/** Directory under /hbase where archived hfiles are stored */
11591163
public static final String HFILE_ARCHIVE_DIRECTORY = "archive";

hbase-server/src/main/java/org/apache/hadoop/hbase/HBaseServerBase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,7 @@ public HBaseServerBase(Configuration conf, String name) throws IOException {
242242
final Span span = TraceUtil.createSpan("HBaseServerBase.cxtor");
243243
try (Scope ignored = span.makeCurrent()) {
244244
this.conf = conf;
245+
this.conf.setBoolean(HConstants.HBASE_SERVER_SERVICES, true);
245246
this.eventLoopGroupConfig =
246247
NettyEventLoopGroupConfig.setup(conf, getClass().getSimpleName() + "-EventLoopGroup");
247248
this.startcode = EnvironmentEdgeManager.currentTime();

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetaRWQueueRpcExecutor.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.hadoop.conf.Configuration;
2121
import org.apache.hadoop.hbase.Abortable;
22+
import org.apache.hadoop.hbase.HConstants;
2223
import org.apache.yetus.audience.InterfaceAudience;
2324
import org.apache.yetus.audience.InterfaceStability;
2425

@@ -32,7 +33,10 @@ public class MetaRWQueueRpcExecutor extends RWQueueRpcExecutor {
3233
"hbase.ipc.server.metacallqueue.read.ratio";
3334
public static final String META_CALL_QUEUE_SCAN_SHARE_CONF_KEY =
3435
"hbase.ipc.server.metacallqueue.scan.ratio";
35-
public static final float DEFAULT_META_CALL_QUEUE_READ_SHARE = 0.9f;
36+
public static final String META_CALL_QUEUE_HANDLER_FACTOR_CONF_KEY =
37+
"hbase.ipc.server.metacallqueue.handler.factor";
38+
public static final float DEFAULT_META_CALL_QUEUE_READ_SHARE = 0.8f;
39+
private static final float DEFAULT_META_CALL_QUEUE_SCAN_SHARE = 0.2f;
3640

3741
public MetaRWQueueRpcExecutor(final String name, final int handlerCount, final int maxQueueLength,
3842
final PriorityFunction priority, final Configuration conf, final Abortable abortable) {
@@ -46,6 +50,22 @@ protected float getReadShare(final Configuration conf) {
4650

4751
@Override
4852
protected float getScanShare(final Configuration conf) {
49-
return conf.getFloat(META_CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0);
53+
return conf.getFloat(META_CALL_QUEUE_SCAN_SHARE_CONF_KEY, DEFAULT_META_CALL_QUEUE_SCAN_SHARE);
54+
}
55+
56+
@Override
57+
public boolean dispatch(CallRunner callTask) {
58+
RpcCall call = callTask.getRpcCall();
59+
int level = call.getHeader().getPriority();
60+
final boolean toWriteQueue = isWriteRequest(call.getHeader(), call.getParam());
61+
// dispatch client system read request to read handlers
62+
// dispatch internal system read request to scan handlers
63+
final boolean toScanQueue = getNumScanQueues() > 0 && level == HConstants.INTERNAL_TABLE_QOS;
64+
return dispatchTo(toWriteQueue, toScanQueue, callTask);
65+
}
66+
67+
@Override
68+
protected float getCallQueueHandlerFactor(Configuration conf) {
69+
return conf.getFloat(META_CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0.5f);
5070
}
5171
}

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,4 +295,8 @@ private void propagateBalancerConfigChange(QueueBalancer balancer, Configuration
295295
((ConfigurationObserver) balancer).onConfigurationChange(conf);
296296
}
297297
}
298+
299+
protected int getNumScanQueues() {
300+
return numScanQueues;
301+
}
298302
}

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ public RpcExecutor(final String name, final int handlerCount, final String callQ
130130
this.conf = conf;
131131
this.abortable = abortable;
132132

133-
float callQueuesHandlersFactor = this.conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0.1f);
133+
float callQueuesHandlersFactor = getCallQueueHandlerFactor(conf);
134134
if (
135135
Float.compare(callQueuesHandlersFactor, 1.0f) > 0
136136
|| Float.compare(0.0f, callQueuesHandlersFactor) > 0
@@ -468,4 +468,8 @@ public void onConfigurationChange(Configuration conf) {
468468
}
469469
}
470470
}
471+
472+
protected float getCallQueueHandlerFactor(Configuration conf) {
473+
return conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0.1f);
474+
}
471475
}

hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java

Lines changed: 31 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -103,12 +103,11 @@ public void setUp() {
103103

104104
@Test
105105
public void testBasic() throws IOException, InterruptedException {
106-
107106
PriorityFunction qosFunction = mock(PriorityFunction.class);
108107
RpcScheduler scheduler = new SimpleRpcScheduler(conf, 10, 0, 0, qosFunction, 0);
109108
scheduler.init(CONTEXT);
110109
scheduler.start();
111-
CallRunner task = createMockTask();
110+
CallRunner task = createMockTask(HConstants.NORMAL_QOS);
112111
task.setStatus(new MonitoredRPCHandlerImpl());
113112
scheduler.dispatch(task);
114113
verify(task, timeout(10000)).run();
@@ -163,7 +162,7 @@ public void testCallQueueInfo() throws IOException, InterruptedException {
163162

164163
int totalCallMethods = 10;
165164
for (int i = totalCallMethods; i > 0; i--) {
166-
CallRunner task = createMockTask();
165+
CallRunner task = createMockTask(HConstants.NORMAL_QOS);
167166
task.setStatus(new MonitoredRPCHandlerImpl());
168167
scheduler.dispatch(task);
169168
}
@@ -185,9 +184,9 @@ public void testCallQueueInfo() throws IOException, InterruptedException {
185184

186185
@Test
187186
public void testHandlerIsolation() throws IOException, InterruptedException {
188-
CallRunner generalTask = createMockTask();
189-
CallRunner priorityTask = createMockTask();
190-
CallRunner replicationTask = createMockTask();
187+
CallRunner generalTask = createMockTask(HConstants.NORMAL_QOS);
188+
CallRunner priorityTask = createMockTask(HConstants.HIGH_QOS + 1);
189+
CallRunner replicationTask = createMockTask(HConstants.REPLICATION_QOS);
191190
List<CallRunner> tasks = ImmutableList.of(generalTask, priorityTask, replicationTask);
192191
Map<CallRunner, Integer> qos = ImmutableMap.of(generalTask, 0, priorityTask,
193192
HConstants.HIGH_QOS + 1, replicationTask, HConstants.REPLICATION_QOS);
@@ -227,10 +226,12 @@ public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
227226
assertEquals(3, ImmutableSet.copyOf(handlerThreads.values()).size());
228227
}
229228

230-
private CallRunner createMockTask() {
229+
private CallRunner createMockTask(int priority) {
231230
ServerCall call = mock(ServerCall.class);
232231
CallRunner task = mock(CallRunner.class);
232+
RequestHeader header = RequestHeader.newBuilder().setPriority(priority).build();
233233
when(task.getRpcCall()).thenReturn(call);
234+
when(call.getHeader()).thenReturn(header);
234235
return task;
235236
}
236237

@@ -707,7 +708,7 @@ public void testFastPathBalancedQueueRpcExecutorWithQueueLength0() throws Except
707708
@Test
708709
public void testMetaRWScanQueues() throws Exception {
709710
Configuration schedConf = HBaseConfiguration.create();
710-
schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
711+
schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
711712
schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f);
712713
schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f);
713714

@@ -728,36 +729,37 @@ public void testMetaRWScanQueues() throws Exception {
728729
when(putCall.getHeader()).thenReturn(putHead);
729730
when(putCall.getParam()).thenReturn(putCall.param);
730731

731-
CallRunner getCallTask = mock(CallRunner.class);
732-
ServerCall getCall = mock(ServerCall.class);
733-
RequestHeader getHead = RequestHeader.newBuilder().setMethodName("get").build();
734-
when(getCallTask.getRpcCall()).thenReturn(getCall);
735-
when(getCall.getHeader()).thenReturn(getHead);
736-
737-
CallRunner scanCallTask = mock(CallRunner.class);
738-
ServerCall scanCall = mock(ServerCall.class);
739-
scanCall.param = ScanRequest.newBuilder().build();
740-
RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build();
741-
when(scanCallTask.getRpcCall()).thenReturn(scanCall);
742-
when(scanCall.getHeader()).thenReturn(scanHead);
743-
when(scanCall.getParam()).thenReturn(scanCall.param);
732+
CallRunner clientReadCallTask = mock(CallRunner.class);
733+
ServerCall clientReadCall = mock(ServerCall.class);
734+
RequestHeader clientReadHead = RequestHeader.newBuilder().setMethodName("get").build();
735+
when(clientReadCallTask.getRpcCall()).thenReturn(clientReadCall);
736+
when(clientReadCall.getHeader()).thenReturn(clientReadHead);
737+
738+
CallRunner internalReadCallTask = mock(CallRunner.class);
739+
ServerCall internalReadCall = mock(ServerCall.class);
740+
internalReadCall.param = ScanRequest.newBuilder().build();
741+
RequestHeader masterReadHead = RequestHeader.newBuilder().setMethodName("get")
742+
.setPriority(HConstants.INTERNAL_TABLE_QOS).build();
743+
when(internalReadCallTask.getRpcCall()).thenReturn(internalReadCall);
744+
when(internalReadCall.getHeader()).thenReturn(masterReadHead);
745+
when(internalReadCall.getParam()).thenReturn(internalReadCall.param);
744746

745747
ArrayList<Integer> work = new ArrayList<>();
746748
doAnswerTaskExecution(putCallTask, work, 1, 1000);
747-
doAnswerTaskExecution(getCallTask, work, 2, 1000);
748-
doAnswerTaskExecution(scanCallTask, work, 3, 1000);
749+
doAnswerTaskExecution(clientReadCallTask, work, 2, 1000);
750+
doAnswerTaskExecution(internalReadCallTask, work, 3, 1000);
749751

750752
// There are 3 queues: [puts], [gets], [scans]
751753
// so the calls will be interleaved
752754
scheduler.dispatch(putCallTask);
753755
scheduler.dispatch(putCallTask);
754756
scheduler.dispatch(putCallTask);
755-
scheduler.dispatch(getCallTask);
756-
scheduler.dispatch(getCallTask);
757-
scheduler.dispatch(getCallTask);
758-
scheduler.dispatch(scanCallTask);
759-
scheduler.dispatch(scanCallTask);
760-
scheduler.dispatch(scanCallTask);
757+
scheduler.dispatch(clientReadCallTask);
758+
scheduler.dispatch(clientReadCallTask);
759+
scheduler.dispatch(clientReadCallTask);
760+
scheduler.dispatch(internalReadCallTask);
761+
scheduler.dispatch(internalReadCallTask);
762+
scheduler.dispatch(internalReadCallTask);
761763

762764
while (work.size() < 6) {
763765
Thread.sleep(100);

0 commit comments

Comments
 (0)