Skip to content

Commit

Permalink
HADOOP-6299. Reimplement the UserGroupInformation to use the OS
Browse files Browse the repository at this point in the history
specific and Kerberos JAAS login. (omalley)


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@903560 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
omalley committed Jan 27, 2010
1 parent d70b663 commit ce7292a
Show file tree
Hide file tree
Showing 30 changed files with 1,059 additions and 1,656 deletions.
3 changes: 2 additions & 1 deletion .eclipse.templates/.classpath
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@
<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Core/test/ftplet-api-1.0.0.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Core/test/ftpserver-core-1.0.0.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Core/test/ftpserver-deprecated-1.0.0-M2.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Core/test/mina-core-2.0.0-M5.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Core/test/mina-core-2.0.0-M5.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Core/test/mockito-all-1.8.0.jar"/>
<classpathentry kind="lib" path="build/test/core/classes"/>
<classpathentry kind="lib" path="build/classes"/>
<classpathentry kind="lib" path="conf"/>
Expand Down
3 changes: 3 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ Trunk (unreleased changes)

INCOMPATIBLE CHANGES

HADOOP-6299. Reimplement the UserGroupInformation to use the OS
specific and Kerberos JAAS login. (omalley)

NEW FEATURES

HADOOP-6284. Add a new parameter, HADOOP_JAVA_PLATFORM_OPTS, to
Expand Down
5 changes: 5 additions & 0 deletions ivy.xml
Original file line number Diff line number Diff line change
Expand Up @@ -294,5 +294,10 @@
rev="${aspectj.version}"
conf="common->default">
</dependency>
<dependency org="org.mockito"
name="mockito-all"
rev="${mockito-all.version}"
conf="common->default">
</dependency>
</dependencies>
</ivy-module>
3 changes: 3 additions & 0 deletions ivy/libraries.properties
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,6 @@ xmlenc.version=0.52
xerces.version=1.4.4

aspectj.version=1.6.5

mockito-all.version=1.8.0

1 change: 1 addition & 0 deletions src/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
Original file line number Diff line number Diff line change
Expand Up @@ -128,5 +128,6 @@ public class CommonConfigurationKeys {
public static final String HADOOP_UTIL_HASH_TYPE_DEFAULT = "murmur";
public static final String HADOOP_SECURITY_GROUP_MAPPING = "hadoop.security.group.mapping";
public static final String HADOOP_SECURITY_GROUPS_CACHE_SECS = "hadoop.security.groups.cache.secs";
public static final String HADOOP_SECURITY_AUTHENTICATION = "hadoop.security.authentication";
}

17 changes: 3 additions & 14 deletions src/java/org/apache/hadoop/fs/FileSystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;

import javax.security.auth.login.LoginException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -1318,9 +1316,6 @@ private static class GlobFilter implements PathFilter {
/** Default pattern character: Character set close. */
private static final char PAT_SET_CLOSE = ']';

GlobFilter() {
}

GlobFilter(String filePattern) throws IOException {
setRegex(filePattern);
}
Expand Down Expand Up @@ -1870,15 +1865,9 @@ static class Key {
scheme = uri.getScheme()==null?"":uri.getScheme().toLowerCase();
authority = uri.getAuthority()==null?"":uri.getAuthority().toLowerCase();
this.unique = unique;
UserGroupInformation ugi = UserGroupInformation.readFrom(conf);
if (ugi == null) {
try {
ugi = UserGroupInformation.login(conf);
} catch(LoginException e) {
LOG.warn("uri=" + uri, e);
}
}
username = ugi == null? null: ugi.getUserName();

UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
username = ugi.getUserName();
}

/** {@inheritDoc} */
Expand Down
12 changes: 6 additions & 6 deletions src/java/org/apache/hadoop/ipc/ConnectionHeader.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.security.UnixUserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation;

/**
Expand All @@ -36,7 +35,7 @@ class ConnectionHeader implements Writable {
public static final Log LOG = LogFactory.getLog(ConnectionHeader.class);

private String protocol;
private UserGroupInformation ugi = new UnixUserGroupInformation();
private UserGroupInformation ugi = null;

public ConnectionHeader() {}

Expand All @@ -60,9 +59,10 @@ public void readFields(DataInput in) throws IOException {
protocol = null;
}

boolean ugiPresent = in.readBoolean();
if (ugiPresent) {
ugi.readFields(in);
boolean ugiUsernamePresent = in.readBoolean();
if (ugiUsernamePresent) {
String username = in.readUTF();
ugi = UserGroupInformation.createRemoteUser(username);
} else {
ugi = null;
}
Expand All @@ -73,7 +73,7 @@ public void write(DataOutput out) throws IOException {
Text.writeString(out, (protocol == null) ? "" : protocol);
if (ugi != null) {
out.writeBoolean(true);
ugi.write(out);
out.writeUTF(ugi.getUserName());
} else {
out.writeBoolean(false);
}
Expand Down
8 changes: 1 addition & 7 deletions src/java/org/apache/hadoop/ipc/RPC.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.HashMap;

import javax.net.SocketFactory;
import javax.security.auth.login.LoginException;

import org.apache.commons.logging.*;

Expand Down Expand Up @@ -205,12 +204,7 @@ public static Object waitForProxy(Class protocol, long clientVersion,
public static Object getProxy(Class protocol, long clientVersion,
InetSocketAddress addr, Configuration conf,
SocketFactory factory) throws IOException {
UserGroupInformation ugi = null;
try {
ugi = UserGroupInformation.login(conf);
} catch (LoginException le) {
throw new RuntimeException("Couldn't login!");
}
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
return getProxy(protocol, clientVersion, addr, ugi, conf, factory);
}

Expand Down
101 changes: 52 additions & 49 deletions src/java/org/apache/hadoop/ipc/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,18 @@

package org.apache.hadoop.ipc;

import java.io.IOException;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
Expand All @@ -33,41 +39,30 @@
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;

import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.UnknownHostException;

import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

import javax.security.auth.Subject;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.ipc.metrics.RpcMetrics;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;

/** An abstract IPC service. IPC calls take a single {@link Writable} as a
* parameter, and return a {@link Writable} as their value. A service runs on
Expand All @@ -76,6 +71,7 @@
* @see Client
*/
public abstract class Server {
private final boolean authorize;

/**
* The first four bytes of Hadoop RPC connections
Expand Down Expand Up @@ -728,7 +724,7 @@ private class Connection {
ConnectionHeader header = new ConnectionHeader();
Class<?> protocol;

Subject user = null;
UserGroupInformation user = null;

// Fake 'call' for failed authorization response
private static final int AUTHROIZATION_FAILED_CALLID = -1;
Expand Down Expand Up @@ -899,14 +895,7 @@ private void processHeader() throws IOException {
throw new IOException("Unknown protocol: " + header.getProtocol());
}

// TODO: Get the user name from the GSS API for Kerberbos-based security
// Create the user subject; however use the groups as defined on the
// server-side, don't trust the user groups provided by the client
UserGroupInformation ugi = header.getUgi();
user = null;
if(ugi != null) {
user = SecurityUtil.getSubject(conf, header.getUgi().getUserName());
}
user = header.getUgi();
}

private void processData() throws IOException, InterruptedException {
Expand Down Expand Up @@ -968,24 +957,23 @@ public void run() {
try {
// Make the call as the user via Subject.doAs, thus associating
// the call with the Subject
value =
Subject.doAs(call.connection.user,
new PrivilegedExceptionAction<Writable>() {
@Override
public Writable run() throws Exception {
// make the call
return call(call.connection.protocol,
call.param, call.timestamp);

}
}
);

} catch (PrivilegedActionException pae) {
Exception e = pae.getException();
LOG.info(getName()+", call "+call+": error: " + e, e);
errorClass = e.getClass().getName();
error = StringUtils.stringifyException(e);
if (call.connection.user == null) {
value = call(call.connection.protocol, call.param,
call.timestamp);
} else {
value =
call.connection.user.doAs
(new PrivilegedExceptionAction<Writable>() {
@Override
public Writable run() throws Exception {
// make the call
return call(call.connection.protocol,
call.param, call.timestamp);

}
}
);
}
} catch (Throwable e) {
LOG.info(getName()+", call "+call+": error: " + e, e);
errorClass = e.getClass().getName();
Expand Down Expand Up @@ -1045,6 +1033,9 @@ protected Server(String bindAddress, int port,
this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 1000);
this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);
this.authorize =
conf.getBoolean(ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG,
false);

// Start the listener here and let it bind to the port
listener = new Listener();
Expand Down Expand Up @@ -1176,8 +1167,20 @@ public abstract Writable call(Class<?> protocol,
* @param connection incoming connection
* @throws AuthorizationException when the client isn't authorized to talk the protocol
*/
public void authorize(Subject user, ConnectionHeader connection)
throws AuthorizationException {}
public void authorize(UserGroupInformation user,
ConnectionHeader connection
) throws AuthorizationException {
if (authorize) {
Class<?> protocol = null;
try {
protocol = getProtocolClass(connection.getProtocol(), getConf());
} catch (ClassNotFoundException cfne) {
throw new AuthorizationException("Unknown protocol: " +
connection.getProtocol());
}
ServiceAuthorizationManager.authorize(user, protocol);
}
}

/**
* The number of open RPC conections
Expand Down
25 changes: 0 additions & 25 deletions src/java/org/apache/hadoop/ipc/WritableRpcEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,17 @@
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;

import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.io.*;
import java.util.Map;
import java.util.HashMap;

import javax.net.SocketFactory;
import javax.security.auth.Subject;
import javax.security.auth.login.LoginException;

import org.apache.commons.logging.*;

import org.apache.hadoop.io.*;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
Expand Down Expand Up @@ -291,7 +285,6 @@ public Server getServer(Class protocol,
public static class Server extends RPC.Server {
private Object instance;
private boolean verbose;
private boolean authorize = false;

/** Construct an RPC server.
* @param instance the instance whose methods will be called
Expand Down Expand Up @@ -325,9 +318,6 @@ public Server(Object instance, Configuration conf, String bindAddress, int port
super(bindAddress, port, Invocation.class, numHandlers, conf, classNameBase(instance.getClass().getName()));
this.instance = instance;
this.verbose = verbose;
this.authorize =
conf.getBoolean(ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG,
false);
}

public Writable call(Class<?> protocol, Writable param, long receivedTime)
Expand Down Expand Up @@ -390,21 +380,6 @@ public Writable call(Class<?> protocol, Writable param, long receivedTime)
throw ioe;
}
}

@Override
public void authorize(Subject user, ConnectionHeader connection)
throws AuthorizationException {
if (authorize) {
Class<?> protocol = null;
try {
protocol = getProtocolClass(connection.getProtocol(), getConf());
} catch (ClassNotFoundException cfne) {
throw new AuthorizationException("Unknown protocol: " +
connection.getProtocol());
}
ServiceAuthorizationManager.authorize(user, protocol);
}
}
}

private static void log(String value) {
Expand Down
Loading

0 comments on commit ce7292a

Please sign in to comment.