Skip to content

Commit d0bf233

Browse files
committed
HBASE-27271 BufferCallBeforeInitHandler should ignore the flush request (apache#4676)
Signed-off-by: Balazs Meszaros <meszibalu@apache.org> (cherry picked from commit fb529e2) Conflicts: hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java (cherry picked from commit 1094b15) Change-Id: Iff314c026f3842cceedb5a11505dcc586747835c
1 parent b0a083c commit d0bf233

File tree

2 files changed

+35
-18
lines changed

2 files changed

+35
-18
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
@InterfaceAudience.Private
3434
class BufferCallBeforeInitHandler extends ChannelDuplexHandler {
3535

36+
static final String NAME = "BufferCall";
37+
3638
private enum BufferCallAction {
3739
FLUSH,
3840
FAIL
@@ -77,6 +79,11 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
7779
}
7880
}
7981

82+
@Override
83+
public void flush(ChannelHandlerContext ctx) throws Exception {
84+
// do not flush anything out
85+
}
86+
8087
@Override
8188
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
8289
if (evt instanceof BufferCallEvent) {

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
4949
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
5050
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener;
51-
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler;
51+
import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
5252
import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
5353
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
5454
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
@@ -152,14 +152,14 @@ public void cleanupConnection() {
152152

153153
private void established(Channel ch) throws IOException {
154154
assert eventLoop.inEventLoop();
155-
ChannelPipeline p = ch.pipeline();
156-
String addBeforeHandler = p.context(BufferCallBeforeInitHandler.class).name();
157-
p.addBefore(addBeforeHandler, null,
158-
new IdleStateHandler(0, rpcClient.minIdleTimeBeforeClose, 0, TimeUnit.MILLISECONDS));
159-
p.addBefore(addBeforeHandler, null, new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4));
160-
p.addBefore(addBeforeHandler, null,
161-
new NettyRpcDuplexHandler(this, rpcClient.cellBlockBuilder, codec, compressor));
162-
p.fireUserEventTriggered(BufferCallEvent.success());
155+
ch.pipeline()
156+
.addBefore(BufferCallBeforeInitHandler.NAME, null,
157+
new IdleStateHandler(0, rpcClient.minIdleTimeBeforeClose, 0, TimeUnit.MILLISECONDS))
158+
.addBefore(BufferCallBeforeInitHandler.NAME, null,
159+
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4))
160+
.addBefore(BufferCallBeforeInitHandler.NAME, null,
161+
new NettyRpcDuplexHandler(this, rpcClient.cellBlockBuilder, codec, compressor))
162+
.fireUserEventTriggered(BufferCallEvent.success());
163163
}
164164

165165
private boolean reloginInProgress;
@@ -212,7 +212,8 @@ private void saslNegotiate(final Channel ch) {
212212
failInit(ch, e);
213213
return;
214214
}
215-
ch.pipeline().addFirst(new SaslChallengeDecoder(), saslHandler);
215+
ch.pipeline().addBefore(BufferCallBeforeInitHandler.NAME, null, new SaslChallengeDecoder())
216+
.addBefore(BufferCallBeforeInitHandler.NAME, null, saslHandler);
216217
saslPromise.addListener(new FutureListener<Boolean>() {
217218

218219
@Override
@@ -226,20 +227,22 @@ public void operationComplete(Future<Boolean> future) throws Exception {
226227
if (saslHandler.isNeedProcessConnectionHeader()) {
227228
Promise<Boolean> connectionHeaderPromise = ch.eventLoop().newPromise();
228229
// create the handler to handle the connection header
229-
ChannelHandler chHandler = new NettyHBaseRpcConnectionHeaderHandler(
230-
connectionHeaderPromise, conf, connectionHeaderWithLength);
230+
NettyHBaseRpcConnectionHeaderHandler chHandler =
231+
new NettyHBaseRpcConnectionHeaderHandler(connectionHeaderPromise, conf,
232+
connectionHeaderWithLength);
231233

232234
// add ReadTimeoutHandler to deal with server doesn't response connection header
233235
// because of the different configuration in client side and server side
234-
p.addFirst(
235-
new ReadTimeoutHandler(RpcClient.DEFAULT_SOCKET_TIMEOUT_READ, TimeUnit.MILLISECONDS));
236-
p.addLast(chHandler);
236+
final String readTimeoutHandlerName = "ReadTimeout";
237+
p.addBefore(BufferCallBeforeInitHandler.NAME, readTimeoutHandlerName,
238+
new ReadTimeoutHandler(RpcClient.DEFAULT_SOCKET_TIMEOUT_READ, TimeUnit.MILLISECONDS))
239+
.addBefore(BufferCallBeforeInitHandler.NAME, null, chHandler);
237240
connectionHeaderPromise.addListener(new FutureListener<Boolean>() {
238241
@Override
239242
public void operationComplete(Future<Boolean> future) throws Exception {
240243
if (future.isSuccess()) {
241244
ChannelPipeline p = ch.pipeline();
242-
p.remove(ReadTimeoutHandler.class);
245+
p.remove(readTimeoutHandlerName);
243246
p.remove(NettyHBaseRpcConnectionHeaderHandler.class);
244247
// don't send connection header, NettyHbaseRpcConnectionHeaderHandler
245248
// sent it already
@@ -273,8 +276,15 @@ private void connect() {
273276
.option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay())
274277
.option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive)
275278
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, rpcClient.connectTO)
276-
.handler(new BufferCallBeforeInitHandler()).localAddress(rpcClient.localAddr)
277-
.remoteAddress(remoteId.address).connect().addListener(new ChannelFutureListener() {
279+
.handler(new ChannelInitializer<Channel>() {
280+
281+
@Override
282+
protected void initChannel(Channel ch) throws Exception {
283+
ch.pipeline().addLast(BufferCallBeforeInitHandler.NAME,
284+
new BufferCallBeforeInitHandler());
285+
}
286+
}).localAddress(rpcClient.localAddr).remoteAddress(remoteId.address).connect()
287+
.addListener(new ChannelFutureListener() {
278288

279289
@Override
280290
public void operationComplete(ChannelFuture future) throws Exception {

0 commit comments

Comments
 (0)