Skip to content

HBASE-22905 Avoid temp ByteBuffer allocation in BlockingRpcConnection#writeRequest #538

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayDeque;
import java.util.Locale;
Expand Down Expand Up @@ -70,6 +69,8 @@
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.Message.Builder;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
Expand Down Expand Up @@ -599,37 +600,44 @@ private void tracedWriteRequest(Call call) throws IOException {
* @see #readResponse()
*/
private void writeRequest(Call call) throws IOException {
ByteBuffer cellBlock = this.rpcClient.cellBlockBuilder.buildCellBlock(this.codec,
this.compressor, call.cells);
CellBlockMeta cellBlockMeta;
if (cellBlock != null) {
cellBlockMeta = CellBlockMeta.newBuilder().setLength(cellBlock.limit()).build();
} else {
cellBlockMeta = null;
}
RequestHeader requestHeader = buildRequestHeader(call, cellBlockMeta);
ByteBuf cellBlock = null;
try {
cellBlock = this.rpcClient.cellBlockBuilder.buildCellBlock(this.codec, this.compressor,
call.cells, PooledByteBufAllocator.DEFAULT);
CellBlockMeta cellBlockMeta;
if (cellBlock != null) {
cellBlockMeta = CellBlockMeta.newBuilder().setLength(cellBlock.readableBytes()).build();
} else {
cellBlockMeta = null;
}
RequestHeader requestHeader = buildRequestHeader(call, cellBlockMeta);

setupIOstreams();
setupIOstreams();

// Now we're going to write the call. We take the lock, then check that the connection
// is still valid, and, if so we do the write to the socket. If the write fails, we don't
// know where we stand, we have to close the connection.
if (Thread.interrupted()) {
throw new InterruptedIOException();
}
// Now we're going to write the call. We take the lock, then check that the connection
// is still valid, and, if so we do the write to the socket. If the write fails, we don't
// know where we stand, we have to close the connection.
if (Thread.interrupted()) {
throw new InterruptedIOException();
}

calls.put(call.id, call); // We put first as we don't want the connection to become idle.
// from here, we do not throw any exception to upper layer as the call has been tracked in the
// pending calls map.
try {
call.callStats.setRequestSizeBytes(write(this.out, requestHeader, call.param, cellBlock));
} catch (Throwable t) {
if(LOG.isTraceEnabled()) {
LOG.trace("Error while writing call, call_id:" + call.id, t);
calls.put(call.id, call); // We put first as we don't want the connection to become idle.
// from here, we do not throw any exception to upper layer as the call has been tracked in
// the pending calls map.
try {
call.callStats.setRequestSizeBytes(write(this.out, requestHeader, call.param, cellBlock));
} catch (Throwable t) {
if(LOG.isTraceEnabled()) {
LOG.trace("Error while writing call, call_id:" + call.id, t);
}
IOException e = IPCUtil.toIOE(t);
closeConn(e);
return;
}
} finally {
if (cellBlock != null) {
cellBlock.release();
}
IOException e = IPCUtil.toIOE(t);
closeConn(e);
return;
}
notifyAll();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
Expand All @@ -41,7 +40,7 @@
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;

import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
Expand All @@ -62,19 +61,19 @@ class IPCUtil {
* @throws IOException if write action fails
*/
public static int write(final OutputStream dos, final Message header, final Message param,
final ByteBuffer cellBlock) throws IOException {
final ByteBuf cellBlock) throws IOException {
// Must calculate total size and write that first so other side can read it all in in one
// swoop. This is dictated by how the server is currently written. Server needs to change
// if we are to be able to write without the length prefixing.
int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, param);
if (cellBlock != null) {
totalSize += cellBlock.remaining();
totalSize += cellBlock.readableBytes();
}
return write(dos, header, param, cellBlock, totalSize);
}

private static int write(final OutputStream dos, final Message header, final Message param,
final ByteBuffer cellBlock, final int totalSize) throws IOException {
final ByteBuf cellBlock, final int totalSize) throws IOException {
// I confirmed toBytes does same as DataOutputStream#writeInt.
dos.write(Bytes.toBytes(totalSize));
// This allocates a buffer that is the size of the message internally.
Expand All @@ -83,7 +82,7 @@ private static int write(final OutputStream dos, final Message header, final Mes
param.writeDelimitedTo(dos);
}
if (cellBlock != null) {
dos.write(cellBlock.array(), 0, cellBlock.remaining());
cellBlock.readBytes(dos, cellBlock.readableBytes());
}
dos.flush();
return totalSize;
Expand Down