Skip to content

YARN-11371. [Federation] Refactor FederationInterceptorREST#createNewApplication\submitApplication Use FederationActionRetry. #5130

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Nov 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.Random;

import javax.cache.Cache;
import javax.cache.CacheManager;
Expand All @@ -38,6 +40,8 @@
import javax.cache.integration.CacheLoaderException;
import javax.cache.spi.CachingProvider;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.retry.RetryPolicies;
Expand All @@ -50,6 +54,8 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreRetriableException;
Expand Down Expand Up @@ -110,6 +116,8 @@ public final class FederationStateStoreFacade {
private static final FederationStateStoreFacade FACADE =
new FederationStateStoreFacade();

private static Random rand = new Random(System.currentTimeMillis());

private FederationStateStore stateStore;
private int cacheTimeToLive;
private Configuration conf;
Expand Down Expand Up @@ -496,6 +504,7 @@ public void deleteReservationHomeSubCluster(ReservationId reservationId) throws
* @param defaultValue the default implementation for fallback
* @param type the class for which a retry proxy is required
* @param retryPolicy the policy for retrying method call failures
* @param <T> The type of the instance
* @return a retry proxy for the specified interface
*/
public static <T> Object createRetryInstance(Configuration conf,
Expand Down Expand Up @@ -731,7 +740,7 @@ public FederationStateStore getStateStore() {
return stateStore;
}

/*
/**
* The Router Supports Store NewMasterKey (RouterMasterKey{@link RouterMasterKey}).
*
* @param newKey Key used for generating and verifying delegation tokens
Expand Down Expand Up @@ -849,4 +858,163 @@ public RouterRMTokenResponse getTokenByRouterStoreToken(RMDelegationTokenIdentif
RouterRMTokenRequest request = RouterRMTokenRequest.newInstance(storeToken);
return stateStore.getTokenByRouterStoreToken(request);
}

/**
* Get the number of active cluster nodes.
*
* @return number of active cluster nodes.
* @throws YarnException if the call to the state store is unsuccessful.
*/
public int getActiveSubClustersCount() throws YarnException {
Map<SubClusterId, SubClusterInfo> activeSubClusters = getSubClusters(true);
if (activeSubClusters == null || activeSubClusters.isEmpty()) {
return 0;
} else {
return activeSubClusters.size();
}
}

/**
* Randomly pick ActiveSubCluster.
* During the selection process, we will exclude SubClusters from the blacklist.
*
* @param activeSubClusters List of active subClusters.
* @param blackList blacklist.
* @return Active SubClusterId.
* @throws YarnException When there is no Active SubCluster,
* an exception will be thrown (No active SubCluster available to submit the request.)
*/
public static SubClusterId getRandomActiveSubCluster(
Map<SubClusterId, SubClusterInfo> activeSubClusters, List<SubClusterId> blackList)
throws YarnException {

// Check if activeSubClusters is empty, if it is empty, we need to throw an exception
if (MapUtils.isEmpty(activeSubClusters)) {
throw new FederationPolicyException(
FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE);
}

// Change activeSubClusters to List
List<SubClusterId> subClusterIds = new ArrayList<>(activeSubClusters.keySet());

// If the blacklist is not empty, we need to remove all the subClusters in the blacklist
if (CollectionUtils.isNotEmpty(blackList)) {
subClusterIds.removeAll(blackList);
}

// Check there are still active subcluster after removing the blacklist
if (CollectionUtils.isEmpty(subClusterIds)) {
throw new FederationPolicyException(
FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE);
}

// Randomly choose a SubCluster
return subClusterIds.get(rand.nextInt(subClusterIds.size()));
}

/**
* Get the number of retries.
*
* @param configRetries User-configured number of retries.
* @return number of retries.
* @throws YarnException yarn exception.
*/
public int getRetryNumbers(int configRetries) throws YarnException {
int activeSubClustersCount = getActiveSubClustersCount();
int actualRetryNums = Math.min(activeSubClustersCount, configRetries);
// Normally, we don't set a negative number for the number of retries,
// but if the user sets a negative number for the number of retries,
// we will return 0
if (actualRetryNums < 0) {
return 0;
}
return actualRetryNums;
}

/**
* Query SubClusterId By applicationId.
*
* If SubClusterId is not empty, it means it exists and returns true;
* if SubClusterId is empty, it means it does not exist and returns false.
*
* @param applicationId applicationId
* @return true, SubClusterId exists; false, SubClusterId not exists.
*/
public boolean existsApplicationHomeSubCluster(ApplicationId applicationId) {
try {
SubClusterId subClusterId = getApplicationHomeSubCluster(applicationId);
if (subClusterId != null) {
return true;
}
} catch (YarnException e) {
LOG.warn("get homeSubCluster by applicationId = {} error.", applicationId, e);
}
return false;
}

/**
* Add ApplicationHomeSubCluster to FederationStateStore.
*
* @param applicationId applicationId.
* @param homeSubCluster homeSubCluster, homeSubCluster selected according to policy.
* @throws YarnException yarn exception.
*/
public void addApplicationHomeSubCluster(ApplicationId applicationId,
ApplicationHomeSubCluster homeSubCluster) throws YarnException {
try {
addApplicationHomeSubCluster(homeSubCluster);
} catch (YarnException e) {
String msg = String.format(
"Unable to insert the ApplicationId %s into the FederationStateStore.", applicationId);
throw new YarnException(msg, e);
}
}

/**
* Update ApplicationHomeSubCluster to FederationStateStore.
*
* @param subClusterId homeSubClusterId
* @param applicationId applicationId.
* @param homeSubCluster homeSubCluster, homeSubCluster selected according to policy.
* @throws YarnException yarn exception.
*/
public void updateApplicationHomeSubCluster(SubClusterId subClusterId,
ApplicationId applicationId, ApplicationHomeSubCluster homeSubCluster) throws YarnException {
try {
updateApplicationHomeSubCluster(homeSubCluster);
} catch (YarnException e) {
SubClusterId subClusterIdInStateStore = getApplicationHomeSubCluster(applicationId);
if (subClusterId == subClusterIdInStateStore) {
LOG.info("Application {} already submitted on SubCluster {}.", applicationId, subClusterId);
} else {
String msg = String.format(
"Unable to update the ApplicationId %s into the FederationStateStore.", applicationId);
throw new YarnException(msg, e);
}
}
}

/**
* Add or Update ApplicationHomeSubCluster.
*
* @param applicationId applicationId, is the id of the application.
* @param subClusterId homeSubClusterId, this is selected by strategy.
* @param retryCount number of retries.
* @throws YarnException yarn exception.
*/
public void addOrUpdateApplicationHomeSubCluster(ApplicationId applicationId,
SubClusterId subClusterId, int retryCount) throws YarnException {
Boolean exists = existsApplicationHomeSubCluster(applicationId);
ApplicationHomeSubCluster appHomeSubCluster =
ApplicationHomeSubCluster.newInstance(applicationId, subClusterId);
if (!exists || retryCount == 0) {
// persist the mapping of applicationId and the subClusterId which has
// been selected as its home.
addApplicationHomeSubCluster(applicationId, appHomeSubCluster);
} else {
// update the mapping of applicationId and the home subClusterId to
// the new subClusterId we have selected.
updateApplicationHomeSubCluster(subClusterId, applicationId, appHomeSubCluster);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

package org.apache.hadoop.yarn.server.router;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
Expand All @@ -30,9 +28,6 @@
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -41,8 +36,6 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.io.IOException;

/**
Expand All @@ -61,8 +54,6 @@ public final class RouterServerUtil {

private static final String EPOCH_PREFIX = "e";

private static Random rand = new Random(System.currentTimeMillis());

/** Disable constructor. */
private RouterServerUtil() {
}
Expand Down Expand Up @@ -479,42 +470,6 @@ public static void validateContainerId(String containerId)
}
}

/**
* Randomly pick ActiveSubCluster.
* During the selection process, we will exclude SubClusters from the blacklist.
*
* @param activeSubClusters List of active subClusters.
* @param blackList blacklist.
* @return Active SubClusterId.
* @throws YarnException When there is no Active SubCluster,
* an exception will be thrown (No active SubCluster available to submit the request.)
*/
public static SubClusterId getRandomActiveSubCluster(
Map<SubClusterId, SubClusterInfo> activeSubClusters, List<SubClusterId> blackList)
throws YarnException {

// Check if activeSubClusters is empty, if it is empty, we need to throw an exception
if (MapUtils.isEmpty(activeSubClusters)) {
logAndThrowException(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, null);
}

// Change activeSubClusters to List
List<SubClusterId> subClusterIds = new ArrayList<>(activeSubClusters.keySet());

// If the blacklist is not empty, we need to remove all the subClusters in the blacklist
if (CollectionUtils.isNotEmpty(blackList)) {
subClusterIds.removeAll(blackList);
}

// Check there are still active subcluster after removing the blacklist
if (CollectionUtils.isEmpty(subClusterIds)) {
logAndThrowException(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, null);
}

// Randomly choose a SubCluster
return subClusterIds.get(rand.nextInt(subClusterIds.size()));
}

public static UserGroupInformation setupUser(final String userName) {
UserGroupInformation user = null;
try {
Expand Down
Loading