Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@

import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException;
import static org.apache.hadoop.hbase.ipc.NettyRpcClient.CLIENT_BUFFER_LOW_WATERMARK;
import static org.apache.hadoop.hbase.ipc.NettyRpcClient.CLIENT_BUFFER_HIGH_WATERMARK;
import static org.apache.hadoop.hbase.ipc.NettyRpcClient.CLIENT_CONNECT_MAX_RETRIES;
import static org.apache.hadoop.hbase.ipc.NettyRpcClient.CLIENT_TCP_KEEPALIVE;
import static org.apache.hadoop.hbase.ipc.NettyRpcClient.CLIENT_TCP_NODELAY;
import static org.apache.hadoop.hbase.ipc.NettyRpcClient.DEFAULT_CLIENT_BUFFER_LOW_WATERMARK;
import static org.apache.hadoop.hbase.ipc.NettyRpcClient.DEFAULT_CLIENT_BUFFER_HIGH_WATERMARK;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
Expand Down Expand Up @@ -128,6 +135,8 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
protected final long failureSleep; // Time to sleep before retry on failure.
protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
protected final boolean tcpKeepAlive; // if T then use keepalives
protected final int bufferLowWatermark;
protected final int bufferHighWatermark;
protected final Codec codec;
protected final CompressionCodec compressor;
protected final boolean fallbackAllowed;
Expand Down Expand Up @@ -165,12 +174,15 @@ public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress loc
MetricsConnection metrics) {
this.userProvider = UserProvider.instantiate(conf);
this.localAddr = localAddr;
this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true);
this.clusterId = clusterId != null ? clusterId : HConstants.CLUSTER_ID_DEFAULT;
this.failureSleep = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0);
this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", true);
this.maxRetries = conf.getInt(CLIENT_CONNECT_MAX_RETRIES, 0);
this.tcpNoDelay = conf.getBoolean(CLIENT_TCP_NODELAY, true);
this.tcpKeepAlive = conf.getBoolean(CLIENT_TCP_KEEPALIVE, true);
this.bufferLowWatermark = conf.getInt(CLIENT_BUFFER_LOW_WATERMARK, DEFAULT_CLIENT_BUFFER_LOW_WATERMARK);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is only used for NettyRpcClient so better put this into the NettyRpcClient?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is only used for NettyRpcClient so better put this into the NettyRpcClient?

Yeah,I referred to other constant of RpcClient like SOCKET_TIMEOUT_WRITE so that I put this into RpcClient.I will put these constants into NettyRpcClient.

this.bufferHighWatermark = conf.getInt(CLIENT_BUFFER_HIGH_WATERMARK, DEFAULT_CLIENT_BUFFER_HIGH_WATERMARK);

this.cellBlockBuilder = new CellBlockBuilder(conf);

this.minIdleTimeBeforeClose = conf.getInt(IDLE_TIME, 120000); // 2 minutes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.WriteBufferWaterMark;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.DefaultThreadFactory;
Expand Down Expand Up @@ -46,9 +47,21 @@ public class NettyRpcClient extends AbstractRpcClient<NettyRpcConnection> {

private final boolean shutdownGroupWhenClose;

protected final WriteBufferWaterMark writeBufferWaterMark;

protected static final String CLIENT_CONNECT_MAX_RETRIES = "hbase.ipc.client.connect.max.retries";
protected static final String CLIENT_TCP_NODELAY = "hbase.ipc.client.tcpnodelay";
protected static final String CLIENT_TCP_KEEPALIVE = "hbase.ipc.client.tcpkeepalive";
protected static final String CLIENT_BUFFER_LOW_WATERMARK = "hbase.ipc.client.bufferlowwatermark";
protected static final String CLIENT_BUFFER_HIGH_WATERMARK = "hbase.ipc.client.bufferhighwatermark";

protected static final int DEFAULT_CLIENT_BUFFER_LOW_WATERMARK = 1024;
protected static final int DEFAULT_CLIENT_BUFFER_HIGH_WATERMARK = 64 * 1024;

public NettyRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress,
MetricsConnection metrics) {
super(configuration, clusterId, localAddress, metrics);
this.writeBufferWaterMark = new WriteBufferWaterMark(bufferLowWatermark, bufferHighWatermark);
Pair<EventLoopGroup, Class<? extends Channel>> groupAndChannelClass = NettyRpcClientConfigHelper
.getEventLoopConfig(conf);
if (groupAndChannelClass == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
import org.apache.hbase.thirdparty.io.netty.channel.WriteBufferWaterMark;
import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
import org.apache.hbase.thirdparty.io.netty.util.ReferenceCountUtil;
Expand Down Expand Up @@ -257,6 +258,7 @@ private void connect() {
.option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay())
.option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, rpcClient.connectTO)
.option(ChannelOption.WRITE_BUFFER_WATER_MARK, rpcClient.writeBufferWaterMark)
.handler(new BufferCallBeforeInitHandler()).localAddress(rpcClient.localAddr)
.remoteAddress(remoteId.address).connect().addListener(new ChannelFutureListener() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.ServerChannel;
import org.apache.hbase.thirdparty.io.netty.channel.WriteBufferWaterMark;
import org.apache.hbase.thirdparty.io.netty.channel.group.ChannelGroup;
import org.apache.hbase.thirdparty.io.netty.channel.group.DefaultChannelGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
Expand Down Expand Up @@ -89,8 +90,11 @@ public NettyRpcServer(Server server, String name, List<BlockingServiceAndInterfa
channelClass = NioServerSocketChannel.class;
}
ServerBootstrap bootstrap = new ServerBootstrap().group(eventLoopGroup).channel(channelClass)
.option(ChannelOption.SO_BACKLOG, tcpBacklog)
.option(ChannelOption.SO_REUSEADDR, tcpReuseAddr)
.childOption(ChannelOption.TCP_NODELAY, tcpNoDelay)
.childOption(ChannelOption.SO_KEEPALIVE, tcpKeepAlive)
.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(bufferLowWatermark, bufferHighWatermark))
.childHandler(new ChannelInitializer<Channel>() {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,14 @@ public abstract class RpcServer implements RpcServerInterface,
*/
protected final LongAdder callQueueSizeInBytes = new LongAdder();

protected final boolean tcpReuseAddr; // if T then reuse address
protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
protected final boolean tcpKeepAlive; // if T then use keepalives

protected final int tcpBacklog;
protected final int bufferLowWatermark;
protected final int bufferHighWatermark;

/**
* This flag is used to indicate to sub threads when they should go down. When we call
* {@link #start()}, all threads started will consult this flag on whether they should
Expand All @@ -174,6 +179,14 @@ public abstract class RpcServer implements RpcServerInterface,
protected static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time";
protected static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size";

protected static final String SERVER_TCP_BACKLOG = "hbase.ipc.server.tcpbacklog";
protected static final String SERVER_TCP_REUSEADDR = "hbase.ipc.server.tcpreuseaddr";
protected static final String SERVER_TCP_NODELAY = "hbase.ipc.server.tcpnodelay";
protected static final String SERVER_TCP_KEEPALIVE = "hbase.ipc.server.tcpkeepalive";

protected static final String SERVER_BUFFER_LOW_WATERMARK = "hbase.ipc.server.bufferlowwatermark";
protected static final String SERVER_BUFFER_HIGH_WATERMARK = "hbase.ipc.server.bufferhighwatermark";

/**
* Minimum allowable timeout (in milliseconds) in rpc request's header. This
* configuration exists to prevent the rpc service regarding this request as timeout immediately.
Expand All @@ -186,6 +199,10 @@ public abstract class RpcServer implements RpcServerInterface,
protected static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
protected static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;

protected static final int DEFAULT_SERVER_TCP_BACKLOG = 1024;
protected static final int DEFAULT_SERVER_BUFFER_LOW_WATERMARK = 1024;
protected static final int DEFAULT_SERVER_BUFFER_HIGH_WATERMARK = 64 * 1024;

protected static final int DEFAULT_TRACE_LOG_MAX_LENGTH = 1000;
protected static final String TRACE_LOG_MAX_LENGTH = "hbase.ipc.trace.log.max.length";
protected static final String KEY_WORD_TRUNCATED = " <TRUNCATED>";
Expand Down Expand Up @@ -273,8 +290,12 @@ public RpcServer(final Server server, final String name,
this.maxRequestSize = conf.getInt(MAX_REQUEST_SIZE, DEFAULT_MAX_REQUEST_SIZE);

this.metrics = new MetricsHBaseServer(name, new MetricsHBaseServerWrapperImpl(this));
this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true);
this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive", true);
this.tcpBacklog = conf.getInt(SERVER_TCP_BACKLOG, DEFAULT_SERVER_TCP_BACKLOG);
this.tcpReuseAddr = conf.getBoolean(SERVER_TCP_REUSEADDR, true);
this.tcpNoDelay = conf.getBoolean(SERVER_TCP_NODELAY, true);
this.tcpKeepAlive = conf.getBoolean(SERVER_TCP_KEEPALIVE, true);
this.bufferLowWatermark = conf.getInt(SERVER_BUFFER_LOW_WATERMARK, DEFAULT_SERVER_BUFFER_LOW_WATERMARK);
this.bufferHighWatermark = conf.getInt(SERVER_BUFFER_HIGH_WATERMARK, DEFAULT_SERVER_BUFFER_HIGH_WATERMARK);

this.cellBlockBuilder = new CellBlockBuilder(conf);

Expand Down