Skip to content

HADOOP-15980. Securing Hadoop RPC using SSL #4638

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions hadoop-client-modules/hadoop-client-minicluster/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@
to process initial sasl response token included in the INITIATE
-->
<Match>
<Class name="org.apache.hadoop.ipc.Server$Connection" />
<Class name="org.apache.hadoop.ipc.netty.server.Connection" />
<Method name="processSaslMessage" />
<Bug pattern="SF_SWITCH_FALLTHROUGH" />
</Match>
Expand Down
6 changes: 6 additions & 0 deletions hadoop-common-project/hadoop-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,12 @@
<artifactId>lz4-java</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.27.Final</version>
<scope>compile</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,35 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
"ipc.client.rpc-timeout.ms";
/** Default value for IPC_CLIENT_RPC_TIMEOUT_KEY. */
public static final int IPC_CLIENT_RPC_TIMEOUT_DEFAULT = 120000;
/** Enable SSL. */
public static final String IPC_SSL_KEY =
"ipc.ssl.enable";
/** Default value for IPC_SSL_KEY */
public static final boolean IPC_SSL_DEFAULT = false;
/**
* Enable the use of SSL self-signed certificates.
*
* NOTE:
*
* THIS IS INSECURE AND IS PURELY PRESENT FOR TESTING. THIS SHALL BE DISABLED
* IN FUTURE REVISIONS.
*
* **/
public static final String IPC_SSL_SELF_SIGNED_CERTIFICATE_TEST =
"test.ipc.ssl.self-signed-cert";
/**
* Default value for using SSL self-signed certificates
*
*
* NOTE:
*
* THIS IS INSECURE AND IS PURELY PRESENT FOR TESTING. THIS SHALL BE DISABLED
* IN FUTURE REVISIONS. FOR NOW THE DEFAULT IS false AND WILL NEED TO BE
* EXPLICITLY ENABLED FOR USE.
*
**/
public static final boolean IPC_SSL_SELF_SIGNED_CERTIFICATE_TEST_DEFAULT =
false;
/** Responses larger than this will be logged */
public static final String IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY =
"ipc.server.max.response.size";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ void addResponseTime(String name, Schedulable e, ProcessingDetails details) {
}

// This should be only called once per call and cached in the call object
int getPriorityLevel(Schedulable e) {
public int getPriorityLevel(Schedulable e) {
return scheduler.getPriorityLevel(e);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
import org.apache.hadoop.ipc.netty.client.IpcStreams;
import org.apache.hadoop.ipc.RPC.RpcKind;
import org.apache.hadoop.ipc.Server.AuthProtocol;
import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
Expand Down Expand Up @@ -62,13 +62,34 @@
import javax.net.SocketFactory;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslException;
import java.io.*;
import java.net.*;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.*;
import java.util.Arrays;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.*;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -77,6 +98,7 @@

import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID;
import static org.apache.hadoop.ipc.RpcConstants.PING_CALL_ID;
import static org.apache.hadoop.ipc.netty.client.IpcStreams.*;

/** A client for an IPC service. IPC calls take a single {@link Writable} as a
* parameter, and return a {@link Writable} as their value. A service runs on
Expand Down Expand Up @@ -150,13 +172,7 @@ public static void setCallIdAndRetryCount(int cid, int rc,
private final int maxAsyncCalls;
private final AtomicInteger asyncCallCounter = new AtomicInteger(0);

/**
* Executor on which IPC calls' parameters are sent.
* Deferring the sending of parameters to a separate
* thread isolates them from thread interruptions in the
* calling code.
*/
private final ExecutorService sendParamsExecutor;

private final static ClientExecutorServiceFactory clientExcecutorFactory =
new ClientExecutorServiceFactory();

Expand Down Expand Up @@ -437,6 +453,7 @@ private class Connection extends Thread {
private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
private final boolean tcpLowLatency; // if T then use low-delay QoS
private final boolean doPing; //do we need to send ping message
private final boolean useNettySSL; // do we need SSL on in the implementation
private final int pingInterval; // how often sends ping to the server
private final int soTimeout; // used by ipc ping and rpc timeout
private byte[] pingRequest; // ping message
Expand All @@ -460,6 +477,9 @@ private class Connection extends Thread {
this.maxResponseLength = remoteId.conf.getInt(
CommonConfigurationKeys.IPC_MAXIMUM_RESPONSE_LENGTH,
CommonConfigurationKeys.IPC_MAXIMUM_RESPONSE_LENGTH_DEFAULT);
this.useNettySSL = remoteId.conf.getBoolean(
CommonConfigurationKeys.IPC_SSL_KEY,
CommonConfigurationKeys.IPC_SSL_DEFAULT);
this.rpcTimeout = remoteId.getRpcTimeout();
this.maxIdleTime = remoteId.getMaxIdleTime();
this.connectionRetryPolicy = remoteId.connectionRetryPolicy;
Expand Down Expand Up @@ -837,7 +857,8 @@ private synchronized void setupIOstreams(
Random rand = null;
while (true) {
setupConnection(ticket);
ipcStreams = new IpcStreams(socket, maxResponseLength);
ipcStreams = newInstance(socket, maxResponseLength,
remoteId.conf);
writeConnectionHeader(ipcStreams);
if (authProtocol == AuthProtocol.SASL) {
try {
Expand Down Expand Up @@ -944,14 +965,17 @@ private void closeConnection() {
return;
}
// close the current connection
try {
socket.close();
} catch (IOException e) {
LOG.warn("Not able to close a socket", e);
}
IOUtils.cleanupWithLogger(LOG, ipcStreams, socket);
// set socket to null so that the next call to setupIOstreams
// can start the process of connect all over again.
socket = null;
// TODO: This change causes TestApplicationClientProtocolOnHA to throw
// a NullPointerException on synchronized(ipcStreams.out). This
// happens because closeConnection is called before the thread that
// handles client.submit finishes. This change was introduced as
// part of the SSL changes. Revisit later for further investigation
// and possibly better handling.
// ipcStreams = null;
}

/* Handle connection failures due to timeout on connect
Expand Down Expand Up @@ -1165,11 +1189,8 @@ public void sendRpcRequest(final Call call)
return;
}

// Serialize the call to be sent. This is done from the actual
// caller thread, rather than the sendParamsExecutor thread,

// so that if the serialization throws an error, it is reported
// properly. This also parallelizes the serialization.
// Serialize the call to be sent so that if the serialization throws an
// error, it is reported properly.
//
// Format of a call on the wire:
// 0) Length of rest below (1 + 2)
Expand All @@ -1186,7 +1207,7 @@ public void sendRpcRequest(final Call call)
RpcWritable.wrap(call.rpcRequest).writeTo(buf);

synchronized (sendRpcRequestLock) {
Future<?> senderFuture = sendParamsExecutor.submit(new Runnable() {
Future<?> senderFuture = ipcStreams.submit(new Runnable() {
@Override
public void run() {
try {
Expand Down Expand Up @@ -1377,7 +1398,11 @@ public Client(Class<? extends Writable> valueClass, Configuration conf,
CommonConfigurationKeys.IPC_CLIENT_BIND_WILDCARD_ADDR_DEFAULT);

this.clientId = ClientId.getClientId();
this.sendParamsExecutor = clientExcecutorFactory.refAndGetInstance();
// TODO: This call can be moved to the place where getClientExecutor is
// invoked. However this move will change the general behaviour of
// the client, which initializes the factory in the constructor. This
// should be done with additional testing.
clientExcecutorFactory.refAndGetInstance();
this.maxAsyncCalls = conf.getInt(
CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT);
Expand Down Expand Up @@ -1900,79 +1925,4 @@ public static int nextCallId() {
public void close() throws Exception {
stop();
}

/** Manages the input and output streams for an IPC connection.
* Only exposed for use by SaslRpcClient.
*/
@InterfaceAudience.Private
public static class IpcStreams implements Closeable, Flushable {
private DataInputStream in;
public DataOutputStream out;
private int maxResponseLength;
private boolean firstResponse = true;

IpcStreams(Socket socket, int maxResponseLength) throws IOException {
this.maxResponseLength = maxResponseLength;
setInputStream(
new BufferedInputStream(NetUtils.getInputStream(socket)));
setOutputStream(
new BufferedOutputStream(NetUtils.getOutputStream(socket)));
}

void setSaslClient(SaslRpcClient client) throws IOException {
// Wrap the input stream in a BufferedInputStream to fill the buffer
// before reading its length (HADOOP-14062).
setInputStream(new BufferedInputStream(client.getInputStream(in)));
setOutputStream(client.getOutputStream(out));
}

private void setInputStream(InputStream is) {
this.in = (is instanceof DataInputStream)
? (DataInputStream)is : new DataInputStream(is);
}

private void setOutputStream(OutputStream os) {
this.out = (os instanceof DataOutputStream)
? (DataOutputStream)os : new DataOutputStream(os);
}

public ByteBuffer readResponse() throws IOException {
int length = in.readInt();
if (firstResponse) {
firstResponse = false;
// pre-rpcv9 exception, almost certainly a version mismatch.
if (length == -1) {
in.readInt(); // ignore fatal/error status, it's fatal for us.
throw new RemoteException(WritableUtils.readString(in),
WritableUtils.readString(in));
}
}
if (length <= 0) {
throw new RpcException(String.format("RPC response has " +
"invalid length of %d", length));
}
if (maxResponseLength > 0 && length > maxResponseLength) {
throw new RpcException(String.format("RPC response has a " +
"length of %d exceeds maximum data length", length));
}
ByteBuffer bb = ByteBuffer.allocate(length);
in.readFully(bb.array());
return bb;
}

public void sendRequest(byte[] buf) throws IOException {
out.write(buf);
}

@Override
public void flush() throws IOException {
out.flush();
}

@Override
public void close() {
IOUtils.closeStream(out);
IOUtils.closeStream(in);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,7 @@ private RpcWritable call(RPC.Server server,
// htrace in the ipc layer creates the span name based on toString()
// which uses the rpc header. in the normal case we want to defer decoding
// the rpc header until needed by the rpc engine.
static class RpcProtobufRequest extends RpcWritable.Buffer {
public static class RpcProtobufRequest extends RpcWritable.Buffer {
private volatile RequestHeaderProto requestHeader;
private Message payload;

Expand All @@ -656,7 +656,7 @@ static class RpcProtobufRequest extends RpcWritable.Buffer {
this.payload = payload;
}

RequestHeaderProto getRequestHeader() throws IOException {
public RequestHeaderProto getRequestHeader() throws IOException {
if (getByteBuffer() != null && requestHeader == null) {
requestHeader = getValue(RequestHeaderProto.getDefaultInstance());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class RpcException extends IOException {
*
* @param messages detailed message.
*/
RpcException(final String message) {
public RpcException(final String message) {
super(message);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
@InterfaceAudience.Private
public abstract class RpcWritable implements Writable {

static RpcWritable wrap(Object o) {
public static RpcWritable wrap(Object o) {
if (o instanceof RpcWritable) {
return (RpcWritable)o;
} else if (o instanceof Message) {
Expand Down
Loading