Skip to content

YARN-11350. [Federation] Router Support DelegationToken With ZK.(Old Version) #5111

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 15 commits into from
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,11 @@ RouterRMTokenResponse removeStoredToken(RouterRMTokenRequest request)
*/
RouterRMTokenResponse getTokenByRouterStoreToken(RouterRMTokenRequest request)
throws YarnException, IOException;

/**
* The Router Supports incrementDelegationTokenSeqNum.
*
* @return DelegationTokenSeqNum.
*/
int incrementDelegationTokenSeqNum();
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.Comparator;

Expand Down Expand Up @@ -110,6 +111,7 @@ public class MemoryFederationStateStore implements FederationStateStore {
private Map<String, SubClusterPolicyConfiguration> policies;
private RouterRMDTSecretManagerState routerRMSecretManagerState;
private int maxAppsInStateStore;
private AtomicInteger sequenceNum;

private final MonotonicClock clock = new MonotonicClock();

Expand All @@ -126,6 +128,7 @@ public void init(Configuration conf) {
maxAppsInStateStore = conf.getInt(
YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS,
YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_MAX_APPLICATIONS);
sequenceNum = new AtomicInteger();
}

@Override
Expand Down Expand Up @@ -534,6 +537,11 @@ public RouterRMTokenResponse getTokenByRouterStoreToken(RouterRMTokenRequest req
return RouterRMTokenResponse.newInstance(resultToken);
}

@Override
public synchronized int incrementDelegationTokenSeqNum() {
return sequenceNum.incrementAndGet();
}

private void storeOrUpdateRouterRMDT(RMDelegationTokenIdentifier rmDTIdentifier,
Long renewDate, boolean isUpdate) throws IOException {
Map<RMDelegationTokenIdentifier, Long> rmDTState = routerRMSecretManagerState.getTokenState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1394,4 +1394,9 @@ public RouterRMTokenResponse getTokenByRouterStoreToken(RouterRMTokenRequest req
throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");
}

@Override
public int incrementDelegationTokenSeqNum() {
return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,27 @@ public final class ZKFederationStateStoreOpDurations implements MetricsSource {
@Metric("Duration for a update reservation homeSubCluster call")
private MutableRate updateReservationHomeSubCluster;

@Metric("Duration for a store new master key call")
private MutableRate storeNewMasterKey;

@Metric("Duration for a remove new master key call")
private MutableRate removeStoredMasterKey;

@Metric("Duration for a get master key by delegation key call")
private MutableRate getMasterKeyByDelegationKey;

@Metric("Duration for a store new token call")
private MutableRate storeNewToken;

@Metric("Duration for a update stored token call")
private MutableRate updateStoredToken;

@Metric("Duration for a remove stored token call")
private MutableRate removeStoredToken;

@Metric("Duration for a get token by router store token call")
private MutableRate getTokenByRouterStoreToken;

protected static final MetricsInfo RECORD_INFO =
info("ZKFederationStateStoreOpDurations", "Durations of ZKFederationStateStore calls");

Expand Down Expand Up @@ -187,4 +208,32 @@ public void addDeleteReservationHomeSubClusterDuration(long startTime, long endT
public void addUpdateReservationHomeSubClusterDuration(long startTime, long endTime) {
updateReservationHomeSubCluster.add(endTime - startTime);
}

public void addStoreNewMasterKeyDuration(long startTime, long endTime) {
storeNewMasterKey.add(endTime - startTime);
}

public void removeStoredMasterKeyDuration(long startTime, long endTime) {
removeStoredMasterKey.add(endTime - startTime);
}

public void getMasterKeyByDelegationKeyDuration(long startTime, long endTime) {
getMasterKeyByDelegationKey.add(endTime - startTime);
}

public void getStoreNewTokenDuration(long startTime, long endTime) {
storeNewToken.add(endTime - startTime);
}

public void updateStoredTokenDuration(long startTime, long endTime) {
updateStoredToken.add(endTime - startTime);
}

public void removeStoredTokenDuration(long startTime, long endTime) {
removeStoredToken.add(endTime - startTime);
}

public void getTokenByRouterStoreTokenDuration(long startTime, long endTime) {
getTokenByRouterStoreToken.add(endTime - startTime);
}
}
Loading