diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/FederationDelegationTokenStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/FederationDelegationTokenStateStore.java index 5e553eee04437..9dc0f09d19778 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/FederationDelegationTokenStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/FederationDelegationTokenStateStore.java @@ -29,20 +29,83 @@ import org.apache.hadoop.yarn.server.federation.store.records.RouterRemoveStoredTokenRequest; import org.apache.hadoop.yarn.server.federation.store.records.RouterRemoveStoredTokenResponse; +/** + * FederationDelegationTokenStateStore maintains the state of all + * DelegationToken that have been submitted to the federated cluster. + * + *

+ * It mainly includes the following operations: + *

+ * + * + */ @InterfaceAudience.Private @InterfaceStability.Unstable public interface FederationDelegationTokenStateStore { + /** + * The Router Supports Store the new master key. + * + * @param request DelegationKey. + * @return StoreNewMasterKeyResponse. + * @throws Exception exception occurred. + */ StoreNewMasterKeyResponse storeNewMasterKey(StoreNewMasterKeyRequest request) throws Exception; + /** + * The Router Supports Remove the master key. + * + * @param request DelegationKey. + * @return RemoveStoredMasterKeyResponse. + * @throws Exception exception occurred. + */ RemoveStoredMasterKeyResponse removeStoredMasterKey(RemoveStoredMasterKeyRequest request) throws Exception; + /** + * The Router Supports Store new Token. + * + * @param request DelegationKey. + * @return RouterStoreNewTokenResponse. + * @throws Exception exception occurred. + */ RouterStoreNewTokenResponse storeNewToken(RouterStoreNewTokenRequest request) throws Exception; + /** + * The Router Supports Update Token. + * + * @param request DelegationKey. + * @return RouterUpdateStoredTokenResponse. + * @throws Exception exception occurred. + */ RouterUpdateStoredTokenResponse updateStoredToken(RouterUpdateStoredTokenRequest request) throws Exception; + /** + * The Router Supports Remove Token. + * + * @param request DelegationKey. + * @return RouterRemoveStoredTokenResponse. + * @throws Exception exception occurred. + */ RouterRemoveStoredTokenResponse removeStoredToken(RouterRemoveStoredTokenRequest request) throws Exception; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java index 9df4e533e50a7..591e3b5ce6509 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.federation.utils; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.List; @@ -446,27 +447,50 @@ public void removeStoredMasterKey(DelegationKey newKey) throws Exception { stateStore.removeStoredMasterKey(keyRequest); } - public void storeNewToken(RMDelegationTokenIdentifier id, + /** + * The Router Supports Store new Token. + * + * @param identifier Delegation Token + * @param renewDate renewDate + * @throws IOException IO exception occurred. + */ + public void storeNewToken(RMDelegationTokenIdentifier identifier, long renewDate) throws Exception { - LOG.info("storing RMDelegation token with sequence number: {}.", id.getSequenceNumber()); - RouterStoreToken storeToken = RouterStoreToken.newInstance(id, renewDate); + LOG.info("storing RMDelegation token with sequence number: {}.", + identifier.getSequenceNumber()); + RouterStoreToken storeToken = RouterStoreToken.newInstance(identifier, renewDate); RouterStoreNewTokenRequest request = RouterStoreNewTokenRequest.newInstance(storeToken); stateStore.storeNewToken(request); } - public void updateStoredToken(RMDelegationTokenIdentifier id, + /** + * The Router Supports Update Token. + * + * @param identifier Delegation Token + * @param renewDate renewDate + * @throws IOException IO exception occurred. + */ + public void updateStoredToken(RMDelegationTokenIdentifier identifier, long renewDate) throws Exception { - LOG.info("updating RMDelegation token with sequence number: {}.", id.getSequenceNumber()); - RouterStoreToken storeToken = RouterStoreToken.newInstance(id, renewDate); + LOG.info("updating RMDelegation token with sequence number: {}.", + identifier.getSequenceNumber()); + RouterStoreToken storeToken = RouterStoreToken.newInstance(identifier, renewDate); RouterUpdateStoredTokenRequest request = RouterUpdateStoredTokenRequest.newInstance(storeToken); stateStore.updateStoredToken(request); } - public void removeStoredToken(RMDelegationTokenIdentifier id) + /** + * The Router Supports Remove Token. + * + * @param identifier Delegation Token + * @throws IOException IO exception occurred. + */ + public void removeStoredToken(RMDelegationTokenIdentifier identifier) throws Exception{ - LOG.info("removing RMDelegation token with sequence number: {}", id.getSequenceNumber()); - RouterStoreToken storeToken = RouterStoreToken.newInstance(id, 0L); + LOG.info("removing RMDelegation token with sequence number: {}", + identifier.getSequenceNumber()); + RouterStoreToken storeToken = RouterStoreToken.newInstance(identifier, 0L); RouterRemoveStoredTokenRequest request = RouterRemoveStoredTokenRequest.newInstance(storeToken); stateStore.removeStoredToken(request); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java index 961026d014611..c71be3c46f755 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java @@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.router.security.RouterDelegationTokenSecretManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,6 +45,8 @@ public abstract class AbstractClientRequestInterceptor @SuppressWarnings("checkstyle:visibilitymodifier") protected UserGroupInformation user = null; + private RouterDelegationTokenSecretManager tokenSecretManager = null; + /** * Sets the {@link ClientRequestInterceptor} in the chain. */ @@ -125,4 +128,13 @@ private void setupUser(String userName) { } } + @Override + public RouterDelegationTokenSecretManager getTokenSecretManager() { + return tokenSecretManager; + } + + @Override + public void setTokenSecretManager(RouterDelegationTokenSecretManager tokenSecretManager) { + this.tokenSecretManager = tokenSecretManager; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/ClientRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/ClientRequestInterceptor.java index 3e3ffce5f4b3f..6e19cbadf9d38 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/ClientRequestInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/ClientRequestInterceptor.java @@ -20,6 +20,7 @@ import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.server.router.security.RouterDelegationTokenSecretManager; /** * Defines the contract to be implemented by the request interceptor classes, @@ -62,4 +63,18 @@ public interface ClientRequestInterceptor */ ClientRequestInterceptor getNextInterceptor(); + /** + * Set RouterDelegationTokenSecretManager for specific interceptor to support Token operations, + * including create Token, update Token, and delete Token. + * + * @param tokenSecretManager Router DelegationTokenSecretManager + */ + void setTokenSecretManager(RouterDelegationTokenSecretManager tokenSecretManager); + + /** + * Get RouterDelegationTokenSecretManager. + * + * @return Router DelegationTokenSecretManager. + */ + RouterDelegationTokenSecretManager getTokenSecretManager(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java index b60a267746e4f..1c8550ef26a08 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java @@ -105,6 +105,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.server.router.RouterServerUtil; +import org.apache.hadoop.yarn.server.router.security.RouterDelegationTokenSecretManager; import org.apache.hadoop.yarn.server.router.security.authorize.RouterPolicyProvider; import org.apache.hadoop.yarn.util.LRUCacheHashMap; import org.slf4j.Logger; @@ -136,6 +137,8 @@ public class RouterClientRMService extends AbstractService // and remove the oldest used ones. private Map userPipelineMap; + private RouterDelegationTokenSecretManager routerDTSecretManager; + public RouterClientRMService() { super(RouterClientRMService.class.getName()); } @@ -164,8 +167,11 @@ protected void serviceStart() throws Exception { serverConf.getInt(YarnConfiguration.RM_CLIENT_THREAD_COUNT, YarnConfiguration.DEFAULT_RM_CLIENT_THREAD_COUNT); + // Initialize RouterRMDelegationTokenSecretManager. + routerDTSecretManager = createRouterRMDelegationTokenSecretManager(conf); + this.server = rpc.getServer(ApplicationClientProtocol.class, this, - listenerEndpoint, serverConf, null, numWorkerThreads); + listenerEndpoint, serverConf, routerDTSecretManager, numWorkerThreads); // Enable service authorization? if (conf.getBoolean( @@ -508,6 +514,12 @@ private RequestInterceptorChainWrapper initializePipeline(String user) { ClientRequestInterceptor interceptorChain = this.createRequestInterceptorChain(); interceptorChain.init(user); + + // + if (routerDTSecretManager != null) { + interceptorChain.setTokenSecretManager(routerDTSecretManager); + } + chainWrapper.init(interceptorChain); } catch (Exception e) { LOG.error("Init ClientRequestInterceptor error for user: {}.", user, e); @@ -558,4 +570,27 @@ protected void finalize() { public Map getUserPipelineMap() { return userPipelineMap; } + + /** + * Create RouterRMDelegationTokenSecretManager. + * In the YARN federation, the Router will replace the RM to + * manage the RMDelegationToken (generate, update, cancel), + * so the relevant configuration parameters still obtain the configuration parameters of the RM. + * + * @param conf Configuration + * @return RouterDelegationTokenSecretManager. + */ + protected RouterDelegationTokenSecretManager createRouterRMDelegationTokenSecretManager( + Configuration conf) { + + long secretKeyInterval = conf.getLong(YarnConfiguration.RM_DELEGATION_KEY_UPDATE_INTERVAL_KEY, + YarnConfiguration.RM_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT); + long tokenMaxLifetime = conf.getLong(YarnConfiguration.RM_DELEGATION_TOKEN_MAX_LIFETIME_KEY, + YarnConfiguration.RM_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT); + long tokenRenewInterval = conf.getLong(YarnConfiguration.RM_DELEGATION_TOKEN_RENEW_INTERVAL_KEY, + YarnConfiguration.RM_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT); + + return new RouterDelegationTokenSecretManager(secretKeyInterval, + tokenMaxLifetime, tokenRenewInterval, 3600000); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/security/RouterDelegationTokenSecretManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/security/RouterDelegationTokenSecretManager.java index 4396e6c28bfe1..3f934f6cd916b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/security/RouterDelegationTokenSecretManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/security/RouterDelegationTokenSecretManager.java @@ -43,7 +43,7 @@ public class RouterDelegationTokenSecretManager private FederationStateStoreFacade federationFacade; /** - * Create a secret manager. + * Create a Router Secret manager. * * @param delegationKeyUpdateInterval the number of milliseconds for rolling * new secret keys. @@ -59,7 +59,6 @@ public RouterDelegationTokenSecretManager(long delegationKeyUpdateInterval, long delegationTokenRemoverScanInterval) { super(delegationKeyUpdateInterval, delegationTokenMaxLifetime, delegationTokenRenewInterval, delegationTokenRemoverScanInterval); - federationFacade = FederationStateStoreFacade.getInstance(); } @Override @@ -71,6 +70,11 @@ private boolean shouldIgnoreException(Exception e) { return !running && e.getCause() instanceof InterruptedException; } + /** + * The Router Supports Store the new master key. + * + * @param newKey DelegationKey + */ @Override protected void storeNewMasterKey(DelegationKey newKey) { try { @@ -83,18 +87,30 @@ protected void storeNewMasterKey(DelegationKey newKey) { } } + /** + * The Router Supports Remove the master key. + * + * @param delegationKey DelegationKey + */ @Override - protected void removeStoredMasterKey(DelegationKey key) { + protected void removeStoredMasterKey(DelegationKey delegationKey) { try { - federationFacade.removeStoredMasterKey(key); + federationFacade.removeStoredMasterKey(delegationKey); } catch (Exception e) { if (!shouldIgnoreException(e)) { - LOG.error("Error in removing master key with KeyID: {}.", key.getKeyId()); + LOG.error("Error in removing master key with KeyID: {}.", delegationKey.getKeyId()); ExitUtil.terminate(1, e); } } } + /** + * The Router Supports Store new Token. + * + * @param identifier Delegation Token + * @param renewDate renewDate + * @throws IOException IO exception occurred. + */ @Override protected void storeNewToken(RMDelegationTokenIdentifier identifier, long renewDate) throws IOException { @@ -109,8 +125,16 @@ protected void storeNewToken(RMDelegationTokenIdentifier identifier, } } + /** + * The Router Supports Update Token. + * + * @param id Delegation Token + * @param renewDate renewDate + * @throws IOException IO exception occurred. + */ @Override - protected void updateStoredToken(RMDelegationTokenIdentifier id, long renewDate) { + protected void updateStoredToken(RMDelegationTokenIdentifier id, long renewDate) + throws IOException { try { federationFacade.updateStoredToken(id, renewDate); } catch (Exception e) { @@ -122,6 +146,12 @@ protected void updateStoredToken(RMDelegationTokenIdentifier id, long renewDate) } } + /** + * The Router Supports Remove Token. + * + * @param identifier Delegation Token + * @throws IOException IO exception occurred. + */ @Override protected void removeStoredToken(RMDelegationTokenIdentifier identifier) throws IOException {