Skip to content

Commit

Permalink
YARN-9708. Yarn Router Support DelegationToken.
Browse files Browse the repository at this point in the history
  • Loading branch information
slfan1989 committed Aug 14, 2022
1 parent 2522887 commit 606a2c9
Show file tree
Hide file tree
Showing 6 changed files with 195 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,83 @@
import org.apache.hadoop.yarn.server.federation.store.records.RouterRemoveStoredTokenRequest;
import org.apache.hadoop.yarn.server.federation.store.records.RouterRemoveStoredTokenResponse;

/**
* FederationDelegationTokenStateStore maintains the state of all
* <em>DelegationToken</em> that have been submitted to the federated cluster.
*
* <p>
* It mainly includes the following operations:
* </p>
*
* <ul>
* <li>
* storeNewMasterKey <br>
* Store the new MasterKey
* </li>
*
* <li>
* removeStoredMasterKey <br>
* Remove MasterKey
* </li>
*
* <li>
* storeNewToken <br>
* Store New delegationToken.
* </li>
*
* updateStoredToken
* removeStoredToken
* </ul>
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public interface FederationDelegationTokenStateStore {

/**
* The Router Supports Store the new master key.
*
* @param request DelegationKey.
* @return StoreNewMasterKeyResponse.
* @throws Exception exception occurred.
*/
StoreNewMasterKeyResponse storeNewMasterKey(StoreNewMasterKeyRequest request) throws Exception;

/**
* The Router Supports Remove the master key.
*
* @param request DelegationKey.
* @return RemoveStoredMasterKeyResponse.
* @throws Exception exception occurred.
*/
RemoveStoredMasterKeyResponse removeStoredMasterKey(RemoveStoredMasterKeyRequest request)
throws Exception;

/**
* The Router Supports Store new Token.
*
* @param request DelegationKey.
* @return RouterStoreNewTokenResponse.
* @throws Exception exception occurred.
*/
RouterStoreNewTokenResponse storeNewToken(RouterStoreNewTokenRequest request) throws Exception;

/**
* The Router Supports Update Token.
*
* @param request DelegationKey.
* @return RouterUpdateStoredTokenResponse.
* @throws Exception exception occurred.
*/
RouterUpdateStoredTokenResponse updateStoredToken(RouterUpdateStoredTokenRequest request)
throws Exception;

/**
* The Router Supports Remove Token.
*
* @param request DelegationKey.
* @return RouterRemoveStoredTokenResponse.
* @throws Exception exception occurred.
*/
RouterRemoveStoredTokenResponse removeStoredToken(RouterRemoveStoredTokenRequest request)
throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.yarn.server.federation.utils;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -446,27 +447,50 @@ public void removeStoredMasterKey(DelegationKey newKey) throws Exception {
stateStore.removeStoredMasterKey(keyRequest);
}

public void storeNewToken(RMDelegationTokenIdentifier id,
/**
* The Router Supports Store new Token.
*
* @param identifier Delegation Token
* @param renewDate renewDate
* @throws IOException IO exception occurred.
*/
public void storeNewToken(RMDelegationTokenIdentifier identifier,
long renewDate) throws Exception {
LOG.info("storing RMDelegation token with sequence number: {}.", id.getSequenceNumber());
RouterStoreToken storeToken = RouterStoreToken.newInstance(id, renewDate);
LOG.info("storing RMDelegation token with sequence number: {}.",
identifier.getSequenceNumber());
RouterStoreToken storeToken = RouterStoreToken.newInstance(identifier, renewDate);
RouterStoreNewTokenRequest request = RouterStoreNewTokenRequest.newInstance(storeToken);
stateStore.storeNewToken(request);
}

public void updateStoredToken(RMDelegationTokenIdentifier id,
/**
* The Router Supports Update Token.
*
* @param identifier Delegation Token
* @param renewDate renewDate
* @throws IOException IO exception occurred.
*/
public void updateStoredToken(RMDelegationTokenIdentifier identifier,
long renewDate) throws Exception {
LOG.info("updating RMDelegation token with sequence number: {}.", id.getSequenceNumber());
RouterStoreToken storeToken = RouterStoreToken.newInstance(id, renewDate);
LOG.info("updating RMDelegation token with sequence number: {}.",
identifier.getSequenceNumber());
RouterStoreToken storeToken = RouterStoreToken.newInstance(identifier, renewDate);
RouterUpdateStoredTokenRequest request =
RouterUpdateStoredTokenRequest.newInstance(storeToken);
stateStore.updateStoredToken(request);
}

public void removeStoredToken(RMDelegationTokenIdentifier id)
/**
* The Router Supports Remove Token.
*
* @param identifier Delegation Token
* @throws IOException IO exception occurred.
*/
public void removeStoredToken(RMDelegationTokenIdentifier identifier)
throws Exception{
LOG.info("removing RMDelegation token with sequence number: {}", id.getSequenceNumber());
RouterStoreToken storeToken = RouterStoreToken.newInstance(id, 0L);
LOG.info("removing RMDelegation token with sequence number: {}",
identifier.getSequenceNumber());
RouterStoreToken storeToken = RouterStoreToken.newInstance(identifier, 0L);
RouterRemoveStoredTokenRequest request =
RouterRemoveStoredTokenRequest.newInstance(storeToken);
stateStore.removeStoredToken(request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.router.security.RouterDelegationTokenSecretManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -44,6 +45,8 @@ public abstract class AbstractClientRequestInterceptor
@SuppressWarnings("checkstyle:visibilitymodifier")
protected UserGroupInformation user = null;

private RouterDelegationTokenSecretManager tokenSecretManager = null;

/**
* Sets the {@link ClientRequestInterceptor} in the chain.
*/
Expand Down Expand Up @@ -125,4 +128,13 @@ private void setupUser(String userName) {
}
}

@Override
public RouterDelegationTokenSecretManager getTokenSecretManager() {
return tokenSecretManager;
}

@Override
public void setTokenSecretManager(RouterDelegationTokenSecretManager tokenSecretManager) {
this.tokenSecretManager = tokenSecretManager;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.server.router.security.RouterDelegationTokenSecretManager;

/**
* Defines the contract to be implemented by the request interceptor classes,
Expand Down Expand Up @@ -62,4 +63,18 @@ public interface ClientRequestInterceptor
*/
ClientRequestInterceptor getNextInterceptor();

/**
* Set RouterDelegationTokenSecretManager for specific interceptor to support Token operations,
* including create Token, update Token, and delete Token.
*
* @param tokenSecretManager Router DelegationTokenSecretManager
*/
void setTokenSecretManager(RouterDelegationTokenSecretManager tokenSecretManager);

/**
* Get RouterDelegationTokenSecretManager.
*
* @return Router DelegationTokenSecretManager.
*/
RouterDelegationTokenSecretManager getTokenSecretManager();
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.router.RouterServerUtil;
import org.apache.hadoop.yarn.server.router.security.RouterDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.router.security.authorize.RouterPolicyProvider;
import org.apache.hadoop.yarn.util.LRUCacheHashMap;
import org.slf4j.Logger;
Expand Down Expand Up @@ -136,6 +137,8 @@ public class RouterClientRMService extends AbstractService
// and remove the oldest used ones.
private Map<String, RequestInterceptorChainWrapper> userPipelineMap;

private RouterDelegationTokenSecretManager routerDTSecretManager;

public RouterClientRMService() {
super(RouterClientRMService.class.getName());
}
Expand Down Expand Up @@ -164,8 +167,11 @@ protected void serviceStart() throws Exception {
serverConf.getInt(YarnConfiguration.RM_CLIENT_THREAD_COUNT,
YarnConfiguration.DEFAULT_RM_CLIENT_THREAD_COUNT);

// Initialize RouterRMDelegationTokenSecretManager.
routerDTSecretManager = createRouterRMDelegationTokenSecretManager(conf);

this.server = rpc.getServer(ApplicationClientProtocol.class, this,
listenerEndpoint, serverConf, null, numWorkerThreads);
listenerEndpoint, serverConf, routerDTSecretManager, numWorkerThreads);

// Enable service authorization?
if (conf.getBoolean(
Expand Down Expand Up @@ -508,6 +514,12 @@ private RequestInterceptorChainWrapper initializePipeline(String user) {
ClientRequestInterceptor interceptorChain =
this.createRequestInterceptorChain();
interceptorChain.init(user);

//
if (routerDTSecretManager != null) {
interceptorChain.setTokenSecretManager(routerDTSecretManager);
}

chainWrapper.init(interceptorChain);
} catch (Exception e) {
LOG.error("Init ClientRequestInterceptor error for user: {}.", user, e);
Expand Down Expand Up @@ -558,4 +570,27 @@ protected void finalize() {
public Map<String, RequestInterceptorChainWrapper> getUserPipelineMap() {
return userPipelineMap;
}

/**
* Create RouterRMDelegationTokenSecretManager.
* In the YARN federation, the Router will replace the RM to
* manage the RMDelegationToken (generate, update, cancel),
* so the relevant configuration parameters still obtain the configuration parameters of the RM.
*
* @param conf Configuration
* @return RouterDelegationTokenSecretManager.
*/
protected RouterDelegationTokenSecretManager createRouterRMDelegationTokenSecretManager(
Configuration conf) {

long secretKeyInterval = conf.getLong(YarnConfiguration.RM_DELEGATION_KEY_UPDATE_INTERVAL_KEY,
YarnConfiguration.RM_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT);
long tokenMaxLifetime = conf.getLong(YarnConfiguration.RM_DELEGATION_TOKEN_MAX_LIFETIME_KEY,
YarnConfiguration.RM_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT);
long tokenRenewInterval = conf.getLong(YarnConfiguration.RM_DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
YarnConfiguration.RM_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);

return new RouterDelegationTokenSecretManager(secretKeyInterval,
tokenMaxLifetime, tokenRenewInterval, 3600000);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class RouterDelegationTokenSecretManager
private FederationStateStoreFacade federationFacade;

/**
* Create a secret manager.
* Create a Router Secret manager.
*
* @param delegationKeyUpdateInterval the number of milliseconds for rolling
* new secret keys.
Expand All @@ -59,7 +59,6 @@ public RouterDelegationTokenSecretManager(long delegationKeyUpdateInterval,
long delegationTokenRemoverScanInterval) {
super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
federationFacade = FederationStateStoreFacade.getInstance();
}

@Override
Expand All @@ -71,6 +70,11 @@ private boolean shouldIgnoreException(Exception e) {
return !running && e.getCause() instanceof InterruptedException;
}

/**
* The Router Supports Store the new master key.
*
* @param newKey DelegationKey
*/
@Override
protected void storeNewMasterKey(DelegationKey newKey) {
try {
Expand All @@ -83,18 +87,30 @@ protected void storeNewMasterKey(DelegationKey newKey) {
}
}

/**
* The Router Supports Remove the master key.
*
* @param delegationKey DelegationKey
*/
@Override
protected void removeStoredMasterKey(DelegationKey key) {
protected void removeStoredMasterKey(DelegationKey delegationKey) {
try {
federationFacade.removeStoredMasterKey(key);
federationFacade.removeStoredMasterKey(delegationKey);
} catch (Exception e) {
if (!shouldIgnoreException(e)) {
LOG.error("Error in removing master key with KeyID: {}.", key.getKeyId());
LOG.error("Error in removing master key with KeyID: {}.", delegationKey.getKeyId());
ExitUtil.terminate(1, e);
}
}
}

/**
* The Router Supports Store new Token.
*
* @param identifier Delegation Token
* @param renewDate renewDate
* @throws IOException IO exception occurred.
*/
@Override
protected void storeNewToken(RMDelegationTokenIdentifier identifier,
long renewDate) throws IOException {
Expand All @@ -109,8 +125,16 @@ protected void storeNewToken(RMDelegationTokenIdentifier identifier,
}
}

/**
* The Router Supports Update Token.
*
* @param id Delegation Token
* @param renewDate renewDate
* @throws IOException IO exception occurred.
*/
@Override
protected void updateStoredToken(RMDelegationTokenIdentifier id, long renewDate) {
protected void updateStoredToken(RMDelegationTokenIdentifier id, long renewDate)
throws IOException {
try {
federationFacade.updateStoredToken(id, renewDate);
} catch (Exception e) {
Expand All @@ -122,6 +146,12 @@ protected void updateStoredToken(RMDelegationTokenIdentifier id, long renewDate)
}
}

/**
* The Router Supports Remove Token.
*
* @param identifier Delegation Token
* @throws IOException IO exception occurred.
*/
@Override
protected void removeStoredToken(RMDelegationTokenIdentifier
identifier) throws IOException {
Expand Down

0 comments on commit 606a2c9

Please sign in to comment.