Skip to content

Commit 3a14cfc

Browse files
authored
HBASE-26551 Add FastPath feature to HBase RWQueueRpcExecutor (#3929)
Signed-off-by: Reid Chan <reidchan@apache.org>
1 parent 1388ca3 commit 3a14cfc

File tree

8 files changed

+316
-155
lines changed

8 files changed

+316
-155
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java

Lines changed: 9 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public class FastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor {
4141
/*
4242
* Stack of Handlers waiting for work.
4343
*/
44-
private final Deque<FastPathHandler> fastPathHandlerStack = new ConcurrentLinkedDeque<>();
44+
private final Deque<FastPathRpcHandler> fastPathHandlerStack = new ConcurrentLinkedDeque<>();
4545

4646
public FastPathBalancedQueueRpcExecutor(final String name, final int handlerCount,
4747
final int maxQueueLength, final PriorityFunction priority, final Configuration conf,
@@ -56,10 +56,12 @@ public FastPathBalancedQueueRpcExecutor(final String name, final int handlerCoun
5656
}
5757

5858
@Override
59-
protected Handler getHandler(String name, double handlerFailureThreshhold,
60-
BlockingQueue<CallRunner> q, AtomicInteger activeHandlerCount) {
61-
return new FastPathHandler(name, handlerFailureThreshhold, q, activeHandlerCount,
62-
fastPathHandlerStack);
59+
protected RpcHandler getHandler(final String name, final double handlerFailureThreshhold,
60+
final int handlerCount, final BlockingQueue<CallRunner> q,
61+
final AtomicInteger activeHandlerCount, final AtomicInteger failedHandlerCount,
62+
final Abortable abortable) {
63+
return new FastPathRpcHandler(name, handlerFailureThreshhold, handlerCount, q,
64+
activeHandlerCount, failedHandlerCount, abortable, fastPathHandlerStack);
6365
}
6466

6567
@Override
@@ -69,62 +71,14 @@ public boolean dispatch(CallRunner callTask) throws InterruptedException {
6971
if (currentQueueLimit == 0){
7072
return false;
7173
}
72-
FastPathHandler handler = popReadyHandler();
74+
FastPathRpcHandler handler = popReadyHandler();
7375
return handler != null? handler.loadCallRunner(callTask): super.dispatch(callTask);
7476
}
7577

7678
/**
7779
* @return Pop a Handler instance if one available ready-to-go or else return null.
7880
*/
79-
private FastPathHandler popReadyHandler() {
81+
private FastPathRpcHandler popReadyHandler() {
8082
return this.fastPathHandlerStack.poll();
8183
}
82-
83-
class FastPathHandler extends Handler {
84-
// Below are for fast-path support. Push this Handler on to the fastPathHandlerStack Deque
85-
// if an empty queue of CallRunners so we are available for direct handoff when one comes in.
86-
final Deque<FastPathHandler> fastPathHandlerStack;
87-
// Semaphore to coordinate loading of fastpathed loadedTask and our running it.
88-
// UNFAIR synchronization.
89-
private Semaphore semaphore = new Semaphore(0);
90-
// The task we get when fast-pathing.
91-
private CallRunner loadedCallRunner;
92-
93-
FastPathHandler(String name, double handlerFailureThreshhold, BlockingQueue<CallRunner> q,
94-
final AtomicInteger activeHandlerCount,
95-
final Deque<FastPathHandler> fastPathHandlerStack) {
96-
super(name, handlerFailureThreshhold, q, activeHandlerCount);
97-
this.fastPathHandlerStack = fastPathHandlerStack;
98-
}
99-
100-
@Override
101-
protected CallRunner getCallRunner() throws InterruptedException {
102-
// Get a callrunner if one in the Q.
103-
CallRunner cr = this.q.poll();
104-
if (cr == null) {
105-
// Else, if a fastPathHandlerStack present and no callrunner in Q, register ourselves for
106-
// the fastpath handoff done via fastPathHandlerStack.
107-
if (this.fastPathHandlerStack != null) {
108-
this.fastPathHandlerStack.push(this);
109-
this.semaphore.acquire();
110-
cr = this.loadedCallRunner;
111-
this.loadedCallRunner = null;
112-
} else {
113-
// No fastpath available. Block until a task comes available.
114-
cr = super.getCallRunner();
115-
}
116-
}
117-
return cr;
118-
}
119-
120-
/**
121-
* @param cr Task gotten via fastpath.
122-
* @return True if we successfully loaded our task
123-
*/
124-
boolean loadCallRunner(final CallRunner cr) {
125-
this.loadedCallRunner = cr;
126-
this.semaphore.release();
127-
return true;
128-
}
129-
}
13084
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/**
2+
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.apache.hadoop.hbase.ipc;
20+
21+
import java.util.Deque;
22+
import java.util.concurrent.BlockingQueue;
23+
import java.util.concurrent.ConcurrentLinkedDeque;
24+
import java.util.concurrent.atomic.AtomicInteger;
25+
import org.apache.hadoop.conf.Configuration;
26+
import org.apache.hadoop.hbase.Abortable;
27+
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
28+
import org.apache.yetus.audience.InterfaceAudience;
29+
import org.apache.yetus.audience.InterfaceStability;
30+
import org.slf4j.Logger;
31+
import org.slf4j.LoggerFactory;
32+
33+
/**
34+
* RPC Executor that extends {@link RWQueueRpcExecutor} with fast-path feature, used in
35+
* {@link FastPathBalancedQueueRpcExecutor}.
36+
*/
37+
@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
38+
@InterfaceStability.Evolving
39+
public class FastPathRWQueueRpcExecutor extends RWQueueRpcExecutor {
40+
private static final Logger LOG = LoggerFactory.getLogger(RWQueueRpcExecutor.class);
41+
42+
private final Deque<FastPathRpcHandler> readHandlerStack = new ConcurrentLinkedDeque<>();
43+
private final Deque<FastPathRpcHandler> writeHandlerStack = new ConcurrentLinkedDeque<>();
44+
private final Deque<FastPathRpcHandler> scanHandlerStack = new ConcurrentLinkedDeque<>();
45+
46+
public FastPathRWQueueRpcExecutor(String name, int handlerCount, int maxQueueLength,
47+
PriorityFunction priority, Configuration conf, Abortable abortable) {
48+
super(name, handlerCount, maxQueueLength, priority, conf, abortable);
49+
}
50+
51+
@Override
52+
protected RpcHandler getHandler(final String name, final double handlerFailureThreshhold,
53+
final int handlerCount, final BlockingQueue<CallRunner> q,
54+
final AtomicInteger activeHandlerCount, final AtomicInteger failedHandlerCount,
55+
final Abortable abortable) {
56+
Deque<FastPathRpcHandler> handlerStack = name.contains("read") ? readHandlerStack :
57+
name.contains("write") ? writeHandlerStack : scanHandlerStack;
58+
return new FastPathRpcHandler(name, handlerFailureThreshhold, handlerCount, q,
59+
activeHandlerCount, failedHandlerCount, abortable, handlerStack);
60+
}
61+
62+
@Override
63+
public boolean dispatch(final CallRunner callTask) throws InterruptedException {
64+
RpcCall call = callTask.getRpcCall();
65+
boolean shouldDispatchToWriteQueue = isWriteRequest(call.getHeader(), call.getParam());
66+
boolean shouldDispatchToScanQueue = shouldDispatchToScanQueue(callTask);
67+
FastPathRpcHandler handler = shouldDispatchToWriteQueue ? writeHandlerStack.poll() :
68+
shouldDispatchToScanQueue ? scanHandlerStack.poll() : readHandlerStack.poll();
69+
return handler != null ? handler.loadCallRunner(callTask) :
70+
dispatchTo(shouldDispatchToWriteQueue, shouldDispatchToScanQueue, callTask);
71+
}
72+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.ipc;
19+
20+
import java.util.Deque;
21+
import java.util.concurrent.BlockingQueue;
22+
import java.util.concurrent.Semaphore;
23+
import java.util.concurrent.atomic.AtomicInteger;
24+
import org.apache.hadoop.hbase.Abortable;
25+
import org.apache.yetus.audience.InterfaceAudience;
26+
27+
@InterfaceAudience.Private
28+
public class FastPathRpcHandler extends RpcHandler {
29+
// Below are for fast-path support. Push this Handler on to the fastPathHandlerStack Deque
30+
// if an empty queue of CallRunners so we are available for direct handoff when one comes in.
31+
final Deque<FastPathRpcHandler> fastPathHandlerStack;
32+
// Semaphore to coordinate loading of fastpathed loadedTask and our running it.
33+
// UNFAIR synchronization.
34+
private Semaphore semaphore = new Semaphore(0);
35+
// The task we get when fast-pathing.
36+
private CallRunner loadedCallRunner;
37+
38+
FastPathRpcHandler(String name, double handlerFailureThreshhold, int handlerCount,
39+
BlockingQueue<CallRunner> q, AtomicInteger activeHandlerCount,
40+
AtomicInteger failedHandlerCount, final Abortable abortable,
41+
final Deque<FastPathRpcHandler> fastPathHandlerStack) {
42+
super(name, handlerFailureThreshhold, handlerCount, q, activeHandlerCount, failedHandlerCount,
43+
abortable);
44+
this.fastPathHandlerStack = fastPathHandlerStack;
45+
}
46+
47+
@Override
48+
protected CallRunner getCallRunner() throws InterruptedException {
49+
// Get a callrunner if one in the Q.
50+
CallRunner cr = this.q.poll();
51+
if (cr == null) {
52+
// Else, if a fastPathHandlerStack present and no callrunner in Q, register ourselves for
53+
// the fastpath handoff done via fastPathHandlerStack.
54+
if (this.fastPathHandlerStack != null) {
55+
this.fastPathHandlerStack.push(this);
56+
this.semaphore.acquire();
57+
cr = this.loadedCallRunner;
58+
this.loadedCallRunner = null;
59+
} else {
60+
// No fastpath available. Block until a task comes available.
61+
cr = super.getCallRunner();
62+
}
63+
}
64+
return cr;
65+
}
66+
67+
/**
68+
* @param cr Task gotten via fastpath.
69+
* @return True if we successfully loaded our task
70+
*/
71+
boolean loadCallRunner(final CallRunner cr) {
72+
this.loadedCallRunner = cr;
73+
this.semaphore.release();
74+
return true;
75+
}
76+
}

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.hadoop.hbase.ipc;
2121

22+
import java.util.Queue;
2223
import java.util.concurrent.BlockingQueue;
2324
import java.util.concurrent.atomic.AtomicInteger;
2425

@@ -130,16 +131,21 @@ protected void startHandlers(final int port) {
130131
@Override
131132
public boolean dispatch(final CallRunner callTask) throws InterruptedException {
132133
RpcCall call = callTask.getRpcCall();
134+
return dispatchTo(isWriteRequest(call.getHeader(), call.getParam()),
135+
shouldDispatchToScanQueue(callTask), callTask);
136+
}
137+
138+
protected boolean dispatchTo(boolean toWriteQueue, boolean toScanQueue,
139+
final CallRunner callTask) {
133140
int queueIndex;
134-
if (isWriteRequest(call.getHeader(), call.getParam())) {
141+
if (toWriteQueue) {
135142
queueIndex = writeBalancer.getNextQueue();
136-
} else if (numScanQueues > 0 && isScanRequest(call.getHeader(), call.getParam())) {
143+
} else if (toScanQueue) {
137144
queueIndex = numWriteQueues + numReadQueues + scanBalancer.getNextQueue();
138145
} else {
139146
queueIndex = numWriteQueues + readBalancer.getNextQueue();
140147
}
141-
142-
BlockingQueue<CallRunner> queue = queues.get(queueIndex);
148+
Queue<CallRunner> queue = queues.get(queueIndex);
143149
if (queue.size() >= currentQueueLimit) {
144150
return false;
145151
}
@@ -232,6 +238,11 @@ private boolean isScanRequest(final RequestHeader header, final Message param) {
232238
return param instanceof ScanRequest;
233239
}
234240

241+
protected boolean shouldDispatchToScanQueue(final CallRunner task) {
242+
RpcCall call = task.getRpcCall();
243+
return numScanQueues > 0 && isScanRequest(call.getHeader(), call.getParam());
244+
}
245+
235246
protected float getReadShare(final Configuration conf) {
236247
return conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
237248
}

0 commit comments

Comments
 (0)