Skip to content

YARN-11446. [Federation] Add updateSchedulerConfiguration, getSchedulerConfiguration REST APIs for Router. #5476

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Mar 28, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public class ConfInfo {

private ArrayList<ConfItem> property = new ArrayList<>();

private String subClusterId;

public ConfInfo() {
} // JAXB needs this

Expand Down Expand Up @@ -74,5 +76,14 @@ public String getKey() {
public String getValue() {
return value;
}

}

public String getSubClusterId() {
return subClusterId;
}

public void setSubClusterId(String subClusterId) {
this.subClusterId = subClusterId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ public class SchedConfUpdateInfo {
@XmlElement(name = "update-queue")
private ArrayList<QueueConfigInfo> updateQueueInfo = new ArrayList<>();

@XmlElement(name = "subClusterId")
private String subClusterId = "";

private HashMap<String, String> global = new HashMap<>();

public SchedConfUpdateInfo() {
Expand Down Expand Up @@ -82,4 +85,12 @@ public HashMap<String, String> getGlobalParams() {
public void setGlobalParams(HashMap<String, String> globalInfo) {
this.global = globalInfo;
}

public String getSubClusterId() {
return subClusterId;
}

public void setSubClusterId(String subClusterId) {
this.subClusterId = subClusterId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,10 @@ public final class RouterMetrics {
private MutableGaugeInt numAddToClusterNodeLabelsFailedRetrieved;
@Metric("# of removeFromClusterNodeLabels failed to be retrieved")
private MutableGaugeInt numRemoveFromClusterNodeLabelsFailedRetrieved;
@Metric("# of numUpdateSchedulerConfiguration failed to be retrieved")
private MutableGaugeInt numUpdateSchedulerConfigurationFailedRetrieved;
@Metric("# of numGetSchedulerConfiguration failed to be retrieved")
private MutableGaugeInt numGetSchedulerConfigurationFailedRetrieved;
@Metric("# of getClusterInfo failed to be retrieved")
private MutableGaugeInt numGetClusterInfoFailedRetrieved;
@Metric("# of getClusterUserInfo failed to be retrieved")
Expand Down Expand Up @@ -287,6 +291,10 @@ public final class RouterMetrics {
private MutableRate totalSucceededAddToClusterNodeLabelsRetrieved;
@Metric("Total number of successful Retrieved RemoveFromClusterNodeLabels and latency(ms)")
private MutableRate totalSucceededRemoveFromClusterNodeLabelsRetrieved;
@Metric("Total number of successful Retrieved updateSchedulerConfiguration and latency(ms)")
private MutableRate totalSucceededUpdateSchedulerConfigurationRetrieved;
@Metric("Total number of successful Retrieved getSchedulerConfiguration and latency(ms)")
private MutableRate totalSucceededGetSchedulerConfigurationRetrieved;
@Metric("Total number of successful Retrieved GetClusterInfoRetrieved and latency(ms)")
private MutableRate totalSucceededGetClusterInfoRetrieved;
@Metric("Total number of successful Retrieved GetClusterUserInfoRetrieved and latency(ms)")
Expand Down Expand Up @@ -358,6 +366,8 @@ public final class RouterMetrics {
private MutableQuantiles replaceLabelsOnNodeLatency;
private MutableQuantiles addToClusterNodeLabelsLatency;
private MutableQuantiles removeFromClusterNodeLabelsLatency;
private MutableQuantiles updateSchedulerConfigLatency;
private MutableQuantiles getSchedulerConfigurationLatency;
private MutableQuantiles getClusterInfoLatency;
private MutableQuantiles getClusterUserInfoLatency;
private MutableQuantiles updateNodeResourceLatency;
Expand Down Expand Up @@ -572,6 +582,12 @@ private RouterMetrics() {
removeFromClusterNodeLabelsLatency = registry.newQuantiles("removeFromClusterNodeLabelsLatency",
"latency of remove cluster nodelabels timeouts", "ops", "latency", 10);

updateSchedulerConfigLatency = registry.newQuantiles("updateSchedulerConfigurationLatency",
"latency of update scheduler configuration timeouts", "ops", "latency", 10);

getSchedulerConfigurationLatency = registry.newQuantiles("getSchedulerConfigurationLatency",
"latency of get scheduler configuration timeouts", "ops", "latency", 10);

getClusterInfoLatency = registry.newQuantiles("getClusterInfoLatency",
"latency of get cluster info timeouts", "ops", "latency", 10);

Expand Down Expand Up @@ -879,6 +895,16 @@ public long getNumSucceededRemoveFromClusterNodeLabelsRetrieved() {
return totalSucceededRemoveFromClusterNodeLabelsRetrieved.lastStat().numSamples();
}

@VisibleForTesting
public long getNumSucceededUpdateSchedulerConfigurationRetrieved() {
return totalSucceededUpdateSchedulerConfigurationRetrieved.lastStat().numSamples();
}

@VisibleForTesting
public long getNumSucceededGetSchedulerConfigurationRetrieved() {
return totalSucceededGetSchedulerConfigurationRetrieved.lastStat().numSamples();
}

@VisibleForTesting
public long getNumSucceededGetClusterInfoRetrieved() {
return totalSucceededGetClusterInfoRetrieved.lastStat().numSamples();
Expand Down Expand Up @@ -1189,6 +1215,16 @@ public double getLatencySucceededRemoveFromClusterNodeLabelsRetrieved() {
return totalSucceededRemoveFromClusterNodeLabelsRetrieved.lastStat().mean();
}

@VisibleForTesting
public double getLatencySucceededUpdateSchedulerConfigurationRetrieved() {
return totalSucceededUpdateSchedulerConfigurationRetrieved.lastStat().mean();
}

@VisibleForTesting
public double getLatencySucceededGetSchedulerConfigurationRetrieved() {
return totalSucceededGetSchedulerConfigurationRetrieved.lastStat().mean();
}

@VisibleForTesting
public double getLatencySucceededGetClusterInfoRetrieved() {
return totalSucceededGetClusterInfoRetrieved.lastStat().mean();
Expand Down Expand Up @@ -1454,6 +1490,14 @@ public int getNumRemoveFromClusterNodeLabelsFailedRetrieved() {
return numRemoveFromClusterNodeLabelsFailedRetrieved.value();
}

public int getUpdateSchedulerConfigurationFailedRetrieved() {
return numUpdateSchedulerConfigurationFailedRetrieved.value();
}

public int getSchedulerConfigurationFailedRetrieved() {
return numGetSchedulerConfigurationFailedRetrieved.value();
}

public int getClusterInfoFailedRetrieved() {
return numGetClusterInfoFailedRetrieved.value();
}
Expand Down Expand Up @@ -1773,6 +1817,16 @@ public void succeededRemoveFromClusterNodeLabelsRetrieved(long duration) {
removeFromClusterNodeLabelsLatency.add(duration);
}

public void succeededUpdateSchedulerConfigurationRetrieved(long duration) {
totalSucceededUpdateSchedulerConfigurationRetrieved.add(duration);
updateSchedulerConfigLatency.add(duration);
}

public void succeededGetSchedulerConfigurationRetrieved(long duration) {
totalSucceededGetSchedulerConfigurationRetrieved.add(duration);
getSchedulerConfigurationLatency.add(duration);
}

public void succeededGetClusterInfoRetrieved(long duration) {
totalSucceededGetClusterInfoRetrieved.add(duration);
getClusterInfoLatency.add(duration);
Expand Down Expand Up @@ -2013,6 +2067,14 @@ public void incrRemoveFromClusterNodeLabelsFailedRetrieved() {
numRemoveFromClusterNodeLabelsFailedRetrieved.incr();
}

public void incrUpdateSchedulerConfigurationFailedRetrieved() {
numUpdateSchedulerConfigurationFailedRetrieved.incr();
}

public void incrGetSchedulerConfigurationFailedRetrieved() {
numGetSchedulerConfigurationFailedRetrieved.incr();
}

public void incrGetClusterInfoFailedRetrieved() {
numGetClusterInfoFailedRetrieved.incr();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import javax.ws.rs.core.Response.Status;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.impl.prefetch.Validate;
Expand Down Expand Up @@ -129,13 +128,15 @@
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationRMQueueAclInfo;
import org.apache.hadoop.yarn.server.router.webapp.dao.SubClusterResult;
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationSchedulerTypeInfo;
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationConfInfo;
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationClusterUserInfo;
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationClusterInfo;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
import org.apache.hadoop.yarn.util.LRUCacheHashMap;
import org.apache.hadoop.yarn.webapp.dao.ConfInfo;
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.MonotonicClock;
Expand Down Expand Up @@ -848,6 +849,29 @@ private Map<SubClusterId, SubClusterInfo> getActiveSubclusters()
}
}

/**
* Get the active subcluster in the federation.
*
* @param subClusterId subClusterId.
* @return subClusterInfo.
* @throws NotFoundException If the subclusters cannot be found.
*/
private SubClusterInfo getActiveSubCluster(String subClusterId)
throws NotFoundException {
try {
SubClusterId pSubClusterId = SubClusterId.newInstance(subClusterId);
Map<SubClusterId, SubClusterInfo> subClusterInfoMap =
federationFacade.getSubClusters(true);
SubClusterInfo subClusterInfo = subClusterInfoMap.get(pSubClusterId);
if (subClusterInfo == null) {
throw new NotFoundException(subClusterId + " not found.");
}
return subClusterInfo;
} catch (YarnException e) {
throw new NotFoundException(e.getMessage());
}
}

/**
* The YARN Router will forward to the request to all the SubClusters to find
* where the node is running.
Expand Down Expand Up @@ -2906,17 +2930,117 @@ public ContainerInfo getContainer(HttpServletRequest req,
throw new RuntimeException("getContainer Failed.");
}

/**
* This method updates the Scheduler configuration, and it is reachable by
* using {@link RMWSConsts#SCHEDULER_CONF}.
*
* @param mutationInfo th information for making scheduler configuration
* changes (supports adding, removing, or updating a queue, as well
* as global scheduler conf changes)
* @param hsr the servlet request
* @return Response containing the status code
* @throws AuthorizationException if the user is not authorized to invoke this
* method
* @throws InterruptedException if interrupted
*/
@Override
public Response updateSchedulerConfiguration(SchedConfUpdateInfo mutationInfo,
HttpServletRequest hsr)
throws AuthorizationException, InterruptedException {
throw new NotImplementedException("Code is not implemented");
HttpServletRequest hsr) throws AuthorizationException, InterruptedException {

// Make Sure mutationInfo is not null.
if (mutationInfo == null) {
routerMetrics.incrUpdateSchedulerConfigurationFailedRetrieved();
throw new IllegalArgumentException(
"Parameter error, the schedConfUpdateInfo is empty or null.");
}

// In federated mode, we may have a mix of multiple schedulers.
// In order to ensure accurate update scheduler configuration,
// we need users to explicitly set subClusterId.
String pSubClusterId = mutationInfo.getSubClusterId();
if (StringUtils.isBlank(pSubClusterId)) {
routerMetrics.incrUpdateSchedulerConfigurationFailedRetrieved();
throw new IllegalArgumentException("Parameter error, " +
"the subClusterId is empty or null.");
}

// Get the subClusterInfo , then update the scheduler configuration.
try {
long startTime = clock.getTime();
SubClusterInfo subClusterInfo = getActiveSubCluster(pSubClusterId);
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
Response response = interceptor.updateSchedulerConfiguration(mutationInfo, hsr);
if (response != null) {
long endTime = clock.getTime();
routerMetrics.succeededUpdateSchedulerConfigurationRetrieved(endTime - startTime);
return Response.status(response.getStatus()).entity(response.getEntity()).build();
}
} catch (NotFoundException e) {
routerMetrics.incrUpdateSchedulerConfigurationFailedRetrieved();
RouterServerUtil.logAndThrowRunTimeException(e,
"Get subCluster error. subClusterId = %s", pSubClusterId);
} catch (Exception e) {
routerMetrics.incrUpdateSchedulerConfigurationFailedRetrieved();
RouterServerUtil.logAndThrowRunTimeException(e,
"UpdateSchedulerConfiguration error. subClusterId = %s", pSubClusterId);
}

routerMetrics.incrUpdateSchedulerConfigurationFailedRetrieved();
throw new RuntimeException("UpdateSchedulerConfiguration error. subClusterId = "
+ pSubClusterId);
}

/**
* This method retrieves all the Scheduler configuration, and it is reachable
* by using {@link RMWSConsts#SCHEDULER_CONF}.
*
* @param hsr the servlet request
* @return Response containing the status code
* @throws AuthorizationException if the user is not authorized to invoke this
* method.
*/
@Override
public Response getSchedulerConfiguration(HttpServletRequest hsr)
throws AuthorizationException {
throw new NotImplementedException("Code is not implemented");
try {
long startTime = clock.getTime();
FederationConfInfo federationConfInfo = new FederationConfInfo();
Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters();
final HttpServletRequest hsrCopy = clone(hsr);
Class[] argsClasses = new Class[]{HttpServletRequest.class};
Object[] args = new Object[]{hsrCopy};
ClientMethod remoteMethod = new ClientMethod("getSchedulerConfiguration", argsClasses, args);
Map<SubClusterInfo, Response> responseMap =
invokeConcurrent(subClustersActive.values(), remoteMethod, Response.class);
responseMap.forEach((subClusterInfo, response) -> {
SubClusterId subClusterId = subClusterInfo.getSubClusterId();
if (response == null) {
String errorMsg = subClusterId + " Can't getSchedulerConfiguration.";
federationConfInfo.getErrorMsgs().add(errorMsg);
} else if (response.getStatus() == Status.BAD_REQUEST.getStatusCode()) {
String errorMsg = String.valueOf(response.getEntity());
federationConfInfo.getErrorMsgs().add(errorMsg);
} else if (response.getStatus() == Status.OK.getStatusCode()) {
ConfInfo fedConfInfo = ConfInfo.class.cast(response.getEntity());
fedConfInfo.setSubClusterId(subClusterId.getId());
federationConfInfo.getList().add(fedConfInfo);
}
});
long endTime = clock.getTime();
routerMetrics.succeededGetSchedulerConfigurationRetrieved(endTime - startTime);
return Response.status(Status.OK).entity(federationConfInfo).build();
} catch (NotFoundException e) {
RouterServerUtil.logAndThrowRunTimeException("get all active sub cluster(s) error.", e);
routerMetrics.incrGetSchedulerConfigurationFailedRetrieved();
} catch (Exception e) {
routerMetrics.incrGetSchedulerConfigurationFailedRetrieved();
RouterServerUtil.logAndThrowRunTimeException("getSchedulerConfiguration error.", e);
return Response.status(Status.BAD_REQUEST).entity("getSchedulerConfiguration error.").build();
}

routerMetrics.incrGetSchedulerConfigurationFailedRetrieved();
throw new RuntimeException("getSchedulerConfiguration error.");
}

@Override
Expand Down
Loading