Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,18 @@ public final class SubjectUtil {
HAS_CALL_AS ? null : lookupDoAsThrowException();
private static final MethodHandle CURRENT = lookupCurrent();

// copied from org.apache.hadoop.util.Shell to break circular dependency
// "1.8"->8, "9"->9, "10"->10
private static final int JAVA_SPEC_VER = Math.max(8,
Integer.parseInt(System.getProperty("java.specification.version").split("\\.")[0]));

public static final boolean THREAD_INHERITS_SUBJECT = checkThreadInheritsSubject();

/**
* Try to return the method handle for Subject#callAs()
*
* @return the method handle, or null if the Java version does not have it
*/
private static MethodHandle lookupCallAs() {
MethodHandles.Lookup lookup = MethodHandles.lookup();
try {
Expand All @@ -71,6 +83,38 @@ private static MethodHandle lookupCallAs() {
}
}

/**
* Determine whether we need to explicitly propagate the Subject into new Threads.
*
* @return true if new Threads inherit the Subject from the parent
*/
private static boolean checkThreadInheritsSubject() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, javadoc

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added javadoc


boolean securityManagerEnabled = true;
try {
// TODO this needs SecurityManager to compile, use reflection to look it up instead
SecurityManager sm = System.getSecurityManager();
System.setSecurityManager(sm);
} catch (UnsupportedOperationException e) {
// JDK24+ unconditionally throws this, so we don't need to check for JDK24+
// explicitly
securityManagerEnabled = false;
} catch (Throwable t) {
// don't care
}

return JAVA_SPEC_VER < 22 || securityManagerEnabled;
}

/**
* Look up the method handle for Subject#doAs(PrivilegedAction)
*
* This is only called if Subject#callAs() does not exist.
* If we can't fall back to doAs(), that's a hard error.
*
* @return the method handle
* @throws ExceptionInInitializerError if unable to get the method handle
*/
private static MethodHandle lookupDoAs() {
MethodHandles.Lookup lookup = MethodHandles.lookup();
try {
Expand All @@ -82,6 +126,15 @@ private static MethodHandle lookupDoAs() {
}
}

/**
* Look up the method handle for Subject#doAs(PrivilegedExceptionAction)
*
* This is only called if Subject#callAs() does not exist.
* If we can't fall back to doAs(), that's a hard error.
*
* @return the method handle
* @throws ExceptionInInitializerError if unable to get the method handle
*/
private static MethodHandle lookupDoAsThrowException() {
MethodHandles.Lookup lookup = MethodHandles.lookup();
try {
Expand All @@ -93,6 +146,15 @@ private static MethodHandle lookupDoAsThrowException() {
}
}

/**
* Look up the method handle for Subject#current().
*
* If Subject#current() is not present, fall back to returning
* a method handle for Subject.getSubject(AccessController.getContext())
*
* @return the method handle or null if it does not exist
* @throws ExceptionInInitializerError if neither current() nor the fallback is found
*/
private static MethodHandle lookupCurrent() {
MethodHandles.Lookup lookup = MethodHandles.lookup();
try {
Expand All @@ -112,6 +174,15 @@ private static MethodHandle lookupCurrent() {
}
}

/**
* Look up the method handle for Subject#getSubject(AccessControlContext)
*
* This is only called if Subject#current() does not exist.
* If we can't fall back to getSubject(), that's a hard error.
*
* @return the method handle
* @throws ExceptionInInitializerError if cannot get the handle
*/
private static MethodHandle lookupGetSubject() {
MethodHandles.Lookup lookup = MethodHandles.lookup();
try {
Expand All @@ -124,6 +195,15 @@ private static MethodHandle lookupGetSubject() {
}
}

/**
* Look up the method handle for AccessController.getAccessControlContext()
*
* This is only called if Subject#current() does not exist.
* If we can't find this method, then we can't fall back which is hard error.
*
* @return the method handle
* @throws ExceptionInInitializerError if cannot get the handle
*/
private static MethodHandle lookupGetContext() {
try {
// Use reflection to work with Java versions that have and don't have
Expand Down Expand Up @@ -264,6 +344,13 @@ public static Subject current() {
}
}

/**
* Convert a Callable into a PrivilegedAction
*
* @param <T> return type
* @param callable to be converted
* @return PrivilegedAction wrapping the callable
*/
private static <T> PrivilegedAction<T> callableToPrivilegedAction(
Callable<T> callable) {
return () -> {
Expand All @@ -275,11 +362,25 @@ private static <T> PrivilegedAction<T> callableToPrivilegedAction(
};
}

/**
* Convert a PrivilegedExceptionAction into a Callable
*
* @param <T> return type
* @param action to be wrapped
* @return Callable wrapping the action
*/
private static <T> Callable<T> privilegedExceptionActionToCallable(
PrivilegedExceptionAction<T> action) {
return action::run;
}

/**
* Convert a PrivilegedAction into a Callable
*
* @param <T> return type
* @param action to be wrapped
* @return Callable wrapping the action
*/
private static <T> Callable<T> privilegedActionToCallable(
PrivilegedAction<T> action) {
return action::run;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.SubjectInheritingThread;
import org.apache.hadoop.conf.ReconfigurationUtil.PropertyChange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -105,15 +106,16 @@ public Collection<PropertyChange> getChangedProperties(
/**
* A background thread to apply configuration changes.
*/
private static class ReconfigurationThread extends Thread {
private static class ReconfigurationThread extends SubjectInheritingThread {
private ReconfigurableBase parent;

ReconfigurationThread(ReconfigurableBase base) {
super();
this.parent = base;
}

// See {@link ReconfigurationServlet#applyChanges}
public void run() {
public void work() {
LOG.info("Starting reconfiguration task.");
final Configuration oldConf = parent.getConf();
final Configuration newConf = parent.getNewConf();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.concurrent.SubjectInheritingThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -107,7 +108,7 @@ void init() {
*/
private void initRefreshThread(boolean runImmediately) {
if (refreshInterval > 0) {
refreshUsed = new Thread(new RefreshThread(this, runImmediately),
refreshUsed = new SubjectInheritingThread(new RefreshThread(this, runImmediately),
"refreshUsed-" + dirPath);
refreshUsed.setDaemon(true);
refreshUsed.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.SubjectInheritingThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -38,7 +39,7 @@
*/
@InterfaceAudience.Private
public class DelegationTokenRenewer
extends Thread {
extends SubjectInheritingThread {
private static final Logger LOG = LoggerFactory
.getLogger(DelegationTokenRenewer.class);

Expand Down Expand Up @@ -263,7 +264,7 @@ public <T extends FileSystem & Renewable> void removeRenewAction(
}

@Override
public void run() {
public void work() {
for(;;) {
RenewAction<?> action = null;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.concurrent.SubjectInheritingThread;
import org.apache.hadoop.tracing.Tracer;
import org.apache.hadoop.tracing.TraceScope;
import org.apache.hadoop.util.Preconditions;
Expand Down Expand Up @@ -4087,7 +4088,7 @@ private interface StatisticsAggregator<T> {
static {
STATS_DATA_REF_QUEUE = new ReferenceQueue<>();
// start a single daemon cleaner thread
STATS_DATA_CLEANER = new Thread(new StatisticsDataReferenceCleaner());
STATS_DATA_CLEANER = new SubjectInheritingThread(new StatisticsDataReferenceCleaner());
STATS_DATA_CLEANER.
setName(StatisticsDataReferenceCleaner.class.getName());
STATS_DATA_CLEANER.setDaemon(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ public void uncaughtException(Thread t, Throwable e) {
}

@Override
public void run() {
public void work() {
while (shouldRun) {
try {
loopUntilConnected();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.ha;

import org.apache.hadoop.util.concurrent.SubjectInheritingThread;
import org.slf4j.Logger;

import java.io.BufferedReader;
Expand Down Expand Up @@ -50,7 +51,7 @@ enum StreamType {
this.stream = stream;
this.type = type;

thread = new Thread(new Runnable() {
thread = new SubjectInheritingThread(new Runnable() {
@Override
public void run() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ void tryStart() {
if (running.compareAndSet(null, current)) {
final Daemon daemon = new Daemon() {
@Override
public void run() {
public void work() {
for (; isRunning(this);) {
final long waitTime = checkCalls();
tryStop(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.AsyncGet;
import org.apache.hadoop.util.concurrent.SubjectInheritingThread;
import org.apache.hadoop.tracing.Span;
import org.apache.hadoop.tracing.Tracer;
import org.slf4j.Logger;
Expand Down Expand Up @@ -407,7 +408,7 @@ public synchronized void setRpcResponse(Writable rpcResponse) {
/** Thread that reads responses and notifies callers. Each connection owns a
* socket connected to a remote address. Calls are multiplexed through this
* socket: responses may be delivered out of order. */
private class Connection extends Thread {
private class Connection extends SubjectInheritingThread {
private InetSocketAddress server; // server ip:port
private final ConnectionId remoteId; // connection id
private AuthMethod authMethod; // authentication method
Expand Down Expand Up @@ -448,7 +449,7 @@ private class Connection extends Thread {
Consumer<Connection> removeMethod) {
this.remoteId = remoteId;
this.server = remoteId.getAddress();
this.rpcRequestThread = new Thread(new RpcRequestSender(),
this.rpcRequestThread = new SubjectInheritingThread(new RpcRequestSender(),
"IPC Parameter Sending Thread for " + remoteId);
this.rpcRequestThread.setDaemon(true);

Expand Down Expand Up @@ -1126,7 +1127,7 @@ private synchronized void sendPing() throws IOException {
}

@Override
public void run() {
public void work() {
try {
// Don't start the ipc parameter sending thread until we start this
// thread, because the shutdown logic only gets triggered if this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@
import org.apache.hadoop.util.ProtoUtil;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.SubjectInheritingThread;

import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.tracing.Span;
import org.apache.hadoop.tracing.SpanContext;
Expand Down Expand Up @@ -1471,7 +1473,7 @@ public String toString() {
}

/** Listens on the socket. Creates jobs for the handler threads*/
private class Listener extends Thread {
private class Listener extends SubjectInheritingThread {

private ServerSocketChannel acceptChannel = null; //the accept channel
private Selector selector = null; //the selector that we use for the server
Expand Down Expand Up @@ -1520,7 +1522,7 @@ void setIsAuxiliary() {
this.isOnAuxiliaryPort = true;
}

private class Reader extends Thread {
private class Reader extends SubjectInheritingThread {
final private BlockingQueue<Connection> pendingConnections;
private final Selector readSelector;

Expand All @@ -1533,7 +1535,7 @@ private class Reader extends Thread {
}

@Override
public void run() {
public void work() {
LOG.info("Starting " + Thread.currentThread().getName());
try {
doRunLoop();
Expand Down Expand Up @@ -1612,7 +1614,7 @@ void shutdown() {
}

@Override
public void run() {
public void work() {
LOG.info(Thread.currentThread().getName() + ": starting");
SERVER.set(Server.this);
connectionManager.startIdleScan();
Expand Down Expand Up @@ -1760,7 +1762,7 @@ Reader getReader() {
}

// Sends responses of RPC back to clients.
private class Responder extends Thread {
private class Responder extends SubjectInheritingThread {
private final Selector writeSelector;
private int pending; // connections waiting to register

Expand All @@ -1772,7 +1774,7 @@ private class Responder extends Thread {
}

@Override
public void run() {
public void work() {
LOG.info(Thread.currentThread().getName() + ": starting");
SERVER.set(Server.this);
try {
Expand Down Expand Up @@ -3219,15 +3221,15 @@ private void internalQueueCall(Call call, boolean blocking)
}

/** Handles queued calls . */
private class Handler extends Thread {
private class Handler extends SubjectInheritingThread {
public Handler(int instanceNumber) {
this.setDaemon(true);
this.setName("IPC Server handler "+ instanceNumber +
" on default port " + port);
}

@Override
public void run() {
public void work() {
LOG.debug("{}: starting", Thread.currentThread().getName());
SERVER.set(Server.this);
while (running) {
Expand Down
Loading
Loading