Skip to content

Commit fb529e2

Browse files
authored
HBASE-27271 BufferCallBeforeInitHandler should ignore the flush request (#4676)
Signed-off-by: Balazs Meszaros <meszibalu@apache.org>
1 parent d734acc commit fb529e2

File tree

2 files changed

+35
-19
lines changed

2 files changed

+35
-19
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
@InterfaceAudience.Private
3434
class BufferCallBeforeInitHandler extends ChannelDuplexHandler {
3535

36+
static final String NAME = "BufferCall";
37+
3638
private enum BufferCallAction {
3739
FLUSH,
3840
FAIL
@@ -77,6 +79,11 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
7779
}
7880
}
7981

82+
@Override
83+
public void flush(ChannelHandlerContext ctx) throws Exception {
84+
// do not flush anything out
85+
}
86+
8087
@Override
8188
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
8289
if (evt instanceof BufferCallEvent) {

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java

Lines changed: 28 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
5252
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
5353
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener;
54-
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler;
54+
import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
5555
import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
5656
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
5757
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
@@ -156,14 +156,14 @@ public void cleanupConnection() {
156156

157157
private void established(Channel ch) throws IOException {
158158
assert eventLoop.inEventLoop();
159-
ChannelPipeline p = ch.pipeline();
160-
String addBeforeHandler = p.context(BufferCallBeforeInitHandler.class).name();
161-
p.addBefore(addBeforeHandler, null,
162-
new IdleStateHandler(0, rpcClient.minIdleTimeBeforeClose, 0, TimeUnit.MILLISECONDS));
163-
p.addBefore(addBeforeHandler, null, new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4));
164-
p.addBefore(addBeforeHandler, null,
165-
new NettyRpcDuplexHandler(this, rpcClient.cellBlockBuilder, codec, compressor));
166-
p.fireUserEventTriggered(BufferCallEvent.success());
159+
ch.pipeline()
160+
.addBefore(BufferCallBeforeInitHandler.NAME, null,
161+
new IdleStateHandler(0, rpcClient.minIdleTimeBeforeClose, 0, TimeUnit.MILLISECONDS))
162+
.addBefore(BufferCallBeforeInitHandler.NAME, null,
163+
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4))
164+
.addBefore(BufferCallBeforeInitHandler.NAME, null,
165+
new NettyRpcDuplexHandler(this, rpcClient.cellBlockBuilder, codec, compressor))
166+
.fireUserEventTriggered(BufferCallEvent.success());
167167
}
168168

169169
private boolean reloginInProgress;
@@ -217,8 +217,8 @@ private void saslNegotiate(final Channel ch) {
217217
failInit(ch, e);
218218
return;
219219
}
220-
ch.pipeline().addFirst("SaslDecoder", new SaslChallengeDecoder()).addAfter("SaslDecoder",
221-
"SaslHandler", saslHandler);
220+
ch.pipeline().addBefore(BufferCallBeforeInitHandler.NAME, null, new SaslChallengeDecoder())
221+
.addBefore(BufferCallBeforeInitHandler.NAME, null, saslHandler);
222222
NettyFutureUtils.addListener(saslPromise, new FutureListener<Boolean>() {
223223

224224
@Override
@@ -229,20 +229,22 @@ public void operationComplete(Future<Boolean> future) throws Exception {
229229
if (saslHandler.isNeedProcessConnectionHeader()) {
230230
Promise<Boolean> connectionHeaderPromise = ch.eventLoop().newPromise();
231231
// create the handler to handle the connection header
232-
ChannelHandler chHandler = new NettyHBaseRpcConnectionHeaderHandler(
233-
connectionHeaderPromise, conf, connectionHeaderWithLength);
232+
NettyHBaseRpcConnectionHeaderHandler chHandler =
233+
new NettyHBaseRpcConnectionHeaderHandler(connectionHeaderPromise, conf,
234+
connectionHeaderWithLength);
234235

235236
// add ReadTimeoutHandler to deal with server doesn't response connection header
236237
// because of the different configuration in client side and server side
237-
p.addFirst(
238-
new ReadTimeoutHandler(RpcClient.DEFAULT_SOCKET_TIMEOUT_READ, TimeUnit.MILLISECONDS));
239-
p.addLast(chHandler);
238+
final String readTimeoutHandlerName = "ReadTimeout";
239+
p.addBefore(BufferCallBeforeInitHandler.NAME, readTimeoutHandlerName,
240+
new ReadTimeoutHandler(RpcClient.DEFAULT_SOCKET_TIMEOUT_READ, TimeUnit.MILLISECONDS))
241+
.addBefore(BufferCallBeforeInitHandler.NAME, null, chHandler);
240242
connectionHeaderPromise.addListener(new FutureListener<Boolean>() {
241243
@Override
242244
public void operationComplete(Future<Boolean> future) throws Exception {
243245
if (future.isSuccess()) {
244246
ChannelPipeline p = ch.pipeline();
245-
p.remove(ReadTimeoutHandler.class);
247+
p.remove(readTimeoutHandlerName);
246248
p.remove(NettyHBaseRpcConnectionHeaderHandler.class);
247249
// don't send connection header, NettyHbaseRpcConnectionHeaderHandler
248250
// sent it already
@@ -276,8 +278,15 @@ private void connect() throws UnknownHostException {
276278
.option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay())
277279
.option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive)
278280
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, rpcClient.connectTO)
279-
.handler(new BufferCallBeforeInitHandler()).localAddress(rpcClient.localAddr)
280-
.remoteAddress(remoteAddr).connect().addListener(new ChannelFutureListener() {
281+
.handler(new ChannelInitializer<Channel>() {
282+
283+
@Override
284+
protected void initChannel(Channel ch) throws Exception {
285+
ch.pipeline().addLast(BufferCallBeforeInitHandler.NAME,
286+
new BufferCallBeforeInitHandler());
287+
}
288+
}).localAddress(rpcClient.localAddr).remoteAddress(remoteAddr).connect()
289+
.addListener(new ChannelFutureListener() {
281290

282291
@Override
283292
public void operationComplete(ChannelFuture future) throws Exception {

0 commit comments

Comments
 (0)