Skip to content

Commit 5f231d3

Browse files
committed
HBASE-27185 Rewrite NettyRpcServer to decode rpc request with netty handler (#4624)
Signed-off-by: Xin Sun <ddupgs@gmail.com>
(cherry picked from commit 0c4263a)
Conflicts:
  hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClientHandler.java
  hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java
1 parent 6f3538b commit 5f231d3

19 files changed

+508
-466
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -223,9 +223,6 @@ private void saslNegotiate(final Channel ch) {
223223
public void operationComplete(Future<Boolean> future) throws Exception {
224224
if (future.isSuccess()) {
225225
ChannelPipeline p = ch.pipeline();
226-
p.remove(SaslChallengeDecoder.class);
227-
p.remove(NettyHBaseSaslRpcClientHandler.class);
228-
229226
// check if negotiate with server for connection header is necessary
230227
if (saslHandler.isNeedProcessConnectionHeader()) {
231228
Promise<Boolean> connectionHeaderPromise = ch.eventLoop().newPromise();

hbase-client/src/main/java/org/apache/hadoop/hbase/security/CryptoAESUnwrapHandler.java

Lines changed: 0 additions & 47 deletions
This file was deleted.

hbase-client/src/main/java/org/apache/hadoop/hbase/security/CryptoAESWrapHandler.java

Lines changed: 0 additions & 48 deletions
This file was deleted.

hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseRpcConnectionHeaderHandler.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
2626
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
2727
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
28-
import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
2928
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;
3029

3130
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
@@ -92,10 +91,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
9291
* Remove handlers for sasl encryption and add handlers for Crypto AES encryption
9392
*/
9493
private void setupCryptoAESHandler(ChannelPipeline p, CryptoAES cryptoAES) {
95-
p.remove(SaslWrapHandler.class);
96-
p.remove(SaslUnwrapHandler.class);
97-
String lengthDecoder = p.context(LengthFieldBasedFrameDecoder.class).name();
98-
p.addAfter(lengthDecoder, null, new CryptoAESUnwrapHandler(cryptoAES));
99-
p.addAfter(lengthDecoder, null, new CryptoAESWrapHandler(cryptoAES));
94+
p.replace(SaslWrapHandler.class, null, new SaslWrapHandler(cryptoAES::wrap));
95+
p.replace(SaslUnwrapHandler.class, null, new SaslUnwrapHandler(cryptoAES::unwrap));
10096
}
10197
}

hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,9 @@ public void setupSaslHandler(ChannelPipeline p) {
5252
return;
5353
}
5454
// add wrap and unwrap handlers to pipeline.
55-
p.addFirst(new SaslWrapHandler(saslClient),
55+
p.addFirst(new SaslWrapHandler(saslClient::wrap),
5656
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4),
57-
new SaslUnwrapHandler(saslClient));
57+
new SaslUnwrapHandler(saslClient::unwrap));
5858
}
5959

6060
public String getSaslQOP() {

hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClientHandler.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
2525
import org.apache.hadoop.hbase.ipc.FallbackDisallowedException;
2626
import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
27+
import org.apache.hadoop.hbase.util.NettyFutureUtils;
2728
import org.apache.hadoop.security.UserGroupInformation;
2829
import org.apache.hadoop.security.token.Token;
2930
import org.apache.hadoop.security.token.TokenIdentifier;
@@ -33,6 +34,7 @@
3334

3435
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
3536
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
37+
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
3638
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
3739
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;
3840

@@ -74,7 +76,7 @@ public NettyHBaseSaslRpcClientHandler(Promise<Boolean> saslPromise, UserGroupInf
7476

7577
private void writeResponse(ChannelHandlerContext ctx, byte[] response) {
7678
LOG.trace("Sending token size={} from initSASLContext.", response.length);
77-
ctx.writeAndFlush(
79+
NettyFutureUtils.safeWriteAndFlush(ctx,
7880
ctx.alloc().buffer(4 + response.length).writeInt(response.length).writeBytes(response));
7981
}
8082

@@ -83,7 +85,11 @@ private void tryComplete(ChannelHandlerContext ctx) {
8385
return;
8486
}
8587

86-
saslRpcClient.setupSaslHandler(ctx.pipeline());
88+
ChannelPipeline p = ctx.pipeline();
89+
saslRpcClient.setupSaslHandler(p);
90+
p.remove(SaslChallengeDecoder.class);
91+
p.remove(this);
92+
8793
setCryptoAESOption();
8894

8995
saslPromise.setSuccess(true);
@@ -102,6 +108,9 @@ public boolean isNeedProcessConnectionHeader() {
102108

103109
@Override
104110
public void handlerAdded(ChannelHandlerContext ctx) {
111+
// dispose the saslRpcClient when the channel is closed, since saslRpcClient is final, it is
112+
// safe to reference it in lambda expr.
113+
NettyFutureUtils.addListener(ctx.channel().closeFuture(), f -> saslRpcClient.dispose());
105114
try {
106115
byte[] initialResponse = ugi.doAs(new PrivilegedExceptionAction<byte[]>() {
107116

@@ -151,14 +160,12 @@ public byte[] run() throws Exception {
151160

152161
@Override
153162
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
154-
saslRpcClient.dispose();
155163
saslPromise.tryFailure(new ConnectionClosedException("Connection closed"));
156164
ctx.fireChannelInactive();
157165
}
158166

159167
@Override
160168
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
161-
saslRpcClient.dispose();
162169
saslPromise.tryFailure(cause);
163170
}
164171
}

hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUnwrapHandler.java

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
*/
1818
package org.apache.hadoop.hbase.security;
1919

20-
import javax.security.sasl.SaslClient;
20+
import javax.security.sasl.SaslException;
2121
import org.apache.yetus.audience.InterfaceAudience;
2222

2323
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
@@ -32,22 +32,20 @@
3232
@InterfaceAudience.Private
3333
public class SaslUnwrapHandler extends SimpleChannelInboundHandler<ByteBuf> {
3434

35-
private final SaslClient saslClient;
36-
37-
public SaslUnwrapHandler(SaslClient saslClient) {
38-
this.saslClient = saslClient;
35+
public interface Unwrapper {
36+
byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException;
3937
}
4038

41-
@Override
42-
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
43-
SaslUtil.safeDispose(saslClient);
44-
ctx.fireChannelInactive();
39+
private final Unwrapper unwrapper;
40+
41+
public SaslUnwrapHandler(Unwrapper unwrapper) {
42+
this.unwrapper = unwrapper;
4543
}
4644

4745
@Override
4846
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
4947
byte[] bytes = new byte[msg.readableBytes()];
5048
msg.readBytes(bytes);
51-
ctx.fireChannelRead(Unpooled.wrappedBuffer(saslClient.unwrap(bytes, 0, bytes.length)));
49+
ctx.fireChannelRead(Unpooled.wrappedBuffer(unwrapper.unwrap(bytes, 0, bytes.length)));
5250
}
5351
}

hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
*/
1818
package org.apache.hadoop.hbase.security;
1919

20-
import javax.security.sasl.SaslClient;
20+
import javax.security.sasl.SaslException;
2121
import org.apache.yetus.audience.InterfaceAudience;
2222

2323
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
@@ -30,17 +30,21 @@
3030
@InterfaceAudience.Private
3131
public class SaslWrapHandler extends MessageToByteEncoder<ByteBuf> {
3232

33-
private final SaslClient saslClient;
33+
public interface Wrapper {
34+
byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException;
35+
}
36+
37+
private final Wrapper wrapper;
3438

35-
public SaslWrapHandler(SaslClient saslClient) {
36-
this.saslClient = saslClient;
39+
public SaslWrapHandler(Wrapper wrapper) {
40+
this.wrapper = wrapper;
3741
}
3842

3943
@Override
4044
protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {
4145
byte[] bytes = new byte[msg.readableBytes()];
4246
msg.readBytes(bytes);
43-
byte[] wrapperBytes = saslClient.wrap(bytes, 0, bytes.length);
47+
byte[] wrapperBytes = wrapper.wrap(bytes, 0, bytes.length);
4448
out.ensureWritable(4 + wrapperBytes.length);
4549
out.writeInt(wrapperBytes.length);
4650
out.writeBytes(wrapperBytes);
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.ipc;
19+
20+
import java.io.IOException;
21+
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
22+
import org.apache.hadoop.hbase.security.SaslStatus;
23+
import org.apache.hadoop.hbase.security.SaslUnwrapHandler;
24+
import org.apache.hadoop.hbase.security.SaslWrapHandler;
25+
import org.apache.hadoop.hbase.util.NettyFutureUtils;
26+
import org.apache.hadoop.io.BytesWritable;
27+
import org.apache.hadoop.io.Writable;
28+
import org.apache.hadoop.io.WritableUtils;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
31+
32+
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
33+
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
34+
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
35+
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
36+
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
37+
import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
38+
39+
/**
40+
* Implement SASL negotiation logic for rpc server.
41+
*/
42+
class NettyHBaseSaslRpcServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
43+
44+
private static final Logger LOG = LoggerFactory.getLogger(NettyHBaseSaslRpcServerHandler.class);
45+
46+
static final String DECODER_NAME = "SaslNegotiationDecoder";
47+
48+
private final NettyRpcServer rpcServer;
49+
50+
private final NettyServerRpcConnection conn;
51+
52+
NettyHBaseSaslRpcServerHandler(NettyRpcServer rpcServer, NettyServerRpcConnection conn) {
53+
this.rpcServer = rpcServer;
54+
this.conn = conn;
55+
}
56+
57+
private void doResponse(ChannelHandlerContext ctx, SaslStatus status, Writable rv,
58+
String errorClass, String error) throws IOException {
59+
// In my testing, have noticed that sasl messages are usually
60+
// in the ballpark of 100-200. That's why the initial capacity is 256.
61+
ByteBuf resp = ctx.alloc().buffer(256);
62+
try (ByteBufOutputStream out = new ByteBufOutputStream(resp)) {
63+
out.writeInt(status.state); // write status
64+
if (status == SaslStatus.SUCCESS) {
65+
rv.write(out);
66+
} else {
67+
WritableUtils.writeString(out, errorClass);
68+
WritableUtils.writeString(out, error);
69+
}
70+
}
71+
NettyFutureUtils.safeWriteAndFlush(ctx, resp);
72+
}
73+
74+
@Override
75+
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
76+
LOG.debug("Read input token of size={} for processing by saslServer.evaluateResponse()",
77+
msg.readableBytes());
78+
HBaseSaslRpcServer saslServer = conn.getOrCreateSaslServer();
79+
byte[] saslToken = new byte[msg.readableBytes()];
80+
msg.readBytes(saslToken, 0, saslToken.length);
81+
byte[] replyToken = saslServer.evaluateResponse(saslToken);
82+
if (replyToken != null) {
83+
LOG.debug("Will send token of size {} from saslServer.", replyToken.length);
84+
doResponse(ctx, SaslStatus.SUCCESS, new BytesWritable(replyToken), null, null);
85+
}
86+
if (saslServer.isComplete()) {
87+
conn.finishSaslNegotiation();
88+
String qop = saslServer.getNegotiatedQop();
89+
boolean useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
90+
ChannelPipeline p = ctx.pipeline();
91+
if (useWrap) {
92+
p.addFirst(new SaslWrapHandler(saslServer::wrap));
93+
p.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4),
94+
new SaslUnwrapHandler(saslServer::unwrap));
95+
}
96+
conn.setupDecoder();
97+
p.remove(this);
98+
p.remove(DECODER_NAME);
99+
}
100+
}
101+
102+
@Override
103+
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
104+
LOG.error("Error when doing SASL handshade, provider={}", conn.provider, cause);
105+
Throwable sendToClient = HBaseSaslRpcServer.unwrap(cause);
106+
doResponse(ctx, SaslStatus.ERROR, null, sendToClient.getClass().getName(),
107+
sendToClient.getLocalizedMessage());
108+
rpcServer.metrics.authenticationFailure();
109+
String clientIP = this.toString();
110+
// attempting user could be null
111+
RpcServer.AUDITLOG.warn("{}{}: {}", RpcServer.AUTH_FAILED_FOR, clientIP,
112+
conn.saslServer != null ? conn.saslServer.getAttemptingUser() : "Unknown");
113+
NettyFutureUtils.safeClose(ctx);
114+
}
115+
}

0 commit comments

Comments
 (0)