Skip to content

YARN-11350. [Federation] Router Support DelegationToken With ZK. #5131

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 23 commits into from
Dec 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,39 @@ RouterRMTokenResponse removeStoredToken(RouterRMTokenRequest request)
*/
RouterRMTokenResponse getTokenByRouterStoreToken(RouterRMTokenRequest request)
throws YarnException, IOException;

/**
* The Router Supports incrementDelegationTokenSeqNum.
*
* @return DelegationTokenSeqNum.
*/
int incrementDelegationTokenSeqNum();

/**
* The Router Supports getDelegationTokenSeqNum.
*
* @return DelegationTokenSeqNum.
*/
int getDelegationTokenSeqNum();

/**
* The Router Supports setDelegationTokenSeqNum.
*
* @param seqNum DelegationTokenSeqNum.
*/
void setDelegationTokenSeqNum(int seqNum);

/**
* The Router Supports getCurrentKeyId.
*
* @return CurrentKeyId.
*/
int getCurrentKeyId();

/**
* The Router Supports incrementCurrentKeyId.
*
* @return CurrentKeyId.
*/
int incrementCurrentKeyId();
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.Comparator;

Expand Down Expand Up @@ -110,6 +111,8 @@ public class MemoryFederationStateStore implements FederationStateStore {
private Map<String, SubClusterPolicyConfiguration> policies;
private RouterRMDTSecretManagerState routerRMSecretManagerState;
private int maxAppsInStateStore;
private AtomicInteger sequenceNum;
private AtomicInteger masterKeyId;

private final MonotonicClock clock = new MonotonicClock();

Expand All @@ -126,6 +129,8 @@ public void init(Configuration conf) {
maxAppsInStateStore = conf.getInt(
YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS,
YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_MAX_APPLICATIONS);
sequenceNum = new AtomicInteger();
masterKeyId = new AtomicInteger();
}

@Override
Expand Down Expand Up @@ -534,6 +539,31 @@ public RouterRMTokenResponse getTokenByRouterStoreToken(RouterRMTokenRequest req
return RouterRMTokenResponse.newInstance(resultToken);
}

@Override
public int incrementDelegationTokenSeqNum() {
return sequenceNum.incrementAndGet();
}

@Override
public int getDelegationTokenSeqNum() {
return sequenceNum.get();
}

@Override
public void setDelegationTokenSeqNum(int seqNum) {
sequenceNum.set(seqNum);
}

@Override
public int getCurrentKeyId() {
return masterKeyId.get();
}

@Override
public int incrementCurrentKeyId() {
return masterKeyId.incrementAndGet();
}

private void storeOrUpdateRouterRMDT(RMDelegationTokenIdentifier rmDTIdentifier,
Long renewDate, boolean isUpdate) throws IOException {
Map<RMDelegationTokenIdentifier, Long> rmDTState = routerRMSecretManagerState.getTokenState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1394,4 +1394,29 @@ public RouterRMTokenResponse getTokenByRouterStoreToken(RouterRMTokenRequest req
throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");
}

@Override
public int incrementDelegationTokenSeqNum() {
return 0;
}

@Override
public int getDelegationTokenSeqNum() {
return 0;
}

@Override
public void setDelegationTokenSeqNum(int seqNum) {
return;
}

@Override
public int getCurrentKeyId() {
return 0;
}

@Override
public int incrementCurrentKeyId() {
return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,27 @@ public final class ZKFederationStateStoreOpDurations implements MetricsSource {
@Metric("Duration for a update reservation homeSubCluster call")
private MutableRate updateReservationHomeSubCluster;

@Metric("Duration for a store new master key call")
private MutableRate storeNewMasterKey;

@Metric("Duration for a remove new master key call")
private MutableRate removeStoredMasterKey;

@Metric("Duration for a get master key by delegation key call")
private MutableRate getMasterKeyByDelegationKey;

@Metric("Duration for a store new token call")
private MutableRate storeNewToken;

@Metric("Duration for a update stored token call")
private MutableRate updateStoredToken;

@Metric("Duration for a remove stored token call")
private MutableRate removeStoredToken;

@Metric("Duration for a get token by router store token call")
private MutableRate getTokenByRouterStoreToken;

protected static final MetricsInfo RECORD_INFO =
info("ZKFederationStateStoreOpDurations", "Durations of ZKFederationStateStore calls");

Expand Down Expand Up @@ -187,4 +208,32 @@ public void addDeleteReservationHomeSubClusterDuration(long startTime, long endT
public void addUpdateReservationHomeSubClusterDuration(long startTime, long endTime) {
updateReservationHomeSubCluster.add(endTime - startTime);
}

public void addStoreNewMasterKeyDuration(long startTime, long endTime) {
storeNewMasterKey.add(endTime - startTime);
}

public void removeStoredMasterKeyDuration(long startTime, long endTime) {
removeStoredMasterKey.add(endTime - startTime);
}

public void getMasterKeyByDelegationKeyDuration(long startTime, long endTime) {
getMasterKeyByDelegationKey.add(endTime - startTime);
}

public void getStoreNewTokenDuration(long startTime, long endTime) {
storeNewToken.add(endTime - startTime);
}

public void updateStoredTokenDuration(long startTime, long endTime) {
updateStoredToken.add(endTime - startTime);
}

public void removeStoredTokenDuration(long startTime, long endTime) {
removeStoredToken.add(endTime - startTime);
}

public void getTokenByRouterStoreTokenDuration(long startTime, long endTime) {
getTokenByRouterStoreToken.add(endTime - startTime);
}
}
Loading