Skip to content

YARN-11445. [Federation] Add getClusterInfo, getClusterUserInfo REST APIs for Router. #5472

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Mar 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ public class ClusterInfo {
// NOTE(review): presumably the build date of the Hadoop version; the code that
// populates it is not visible here - confirm.
protected String hadoopVersionBuiltOn;
// Value returned by getHAZookeeperConnectionState().
protected String haZooKeeperConnectionState;

// Sub-cluster identifier; read and written through
// getSubClusterId() / setSubClusterId(String).
private String subClusterId;

/**
 * No-argument constructor. JAXB needs this.
 */
public ClusterInfo() {
  // Intentionally empty.
}

Expand Down Expand Up @@ -113,4 +115,12 @@ public long getStartedOn() {
/**
 * Returns the HA ZooKeeper connection state held by this object.
 *
 * @return the current value of {@code haZooKeeperConnectionState}
 */
public String getHAZookeeperConnectionState() {
  return haZooKeeperConnectionState;
}

/**
 * Returns the sub-cluster identifier attached to this cluster information.
 *
 * @return the value last passed to {@code setSubClusterId}, or {@code null}
 *         if it was never set
 */
public String getSubClusterId() {
  return this.subClusterId;
}

/**
 * Attaches a sub-cluster identifier to this cluster information.
 *
 * @param subClusterId the identifier to store
 */
public void setSubClusterId(String subClusterId) {
  this.subClusterId = subClusterId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public class ClusterUserInfo {
// User who has placed the request; returned by getRequestedUser().
protected String requestedUser;

// Sub-cluster identifier; read and written through
// getSubClusterId() / setSubClusterId(String).
private String subClusterId;

/**
 * No-argument constructor; performs no initialization of its own.
 */
public ClusterUserInfo() {
  // Intentionally empty.
}

Expand All @@ -61,4 +63,12 @@ public String getRmLoginUser() {
/**
 * Returns the user who has placed the request.
 *
 * @return the current value of {@code requestedUser}
 */
public String getRequestedUser() {
  return this.requestedUser;
}

/**
 * Returns the sub-cluster identifier attached to this user information.
 *
 * @return the value last passed to {@code setSubClusterId}, or {@code null}
 *         if it was never set
 */
public String getSubClusterId() {
  return this.subClusterId;
}

/**
 * Attaches a sub-cluster identifier to this user information.
 *
 * @param subClusterId the identifier to store
 */
public void setSubClusterId(String subClusterId) {
  this.subClusterId = subClusterId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,10 @@ public final class RouterMetrics {
private MutableGaugeInt numAddToClusterNodeLabelsFailedRetrieved;
// Failure gauges. Each one is bumped by the matching
// incr<Operation>FailedRetrieved() method of this class.
@Metric("# of removeFromClusterNodeLabels failed to be retrieved")
private MutableGaugeInt numRemoveFromClusterNodeLabelsFailedRetrieved;
// Read through getClusterInfoFailedRetrieved().
@Metric("# of getClusterInfo failed to be retrieved")
private MutableGaugeInt numGetClusterInfoFailedRetrieved;
// Read through getClusterUserInfoFailedRetrieved().
@Metric("# of getClusterUserInfo failed to be retrieved")
private MutableGaugeInt numGetClusterUserInfoFailedRetrieved;

// Aggregate metrics are shared, and don't have to be looked up per call
@Metric("Total number of successful Submitted apps and latency(ms)")
Expand Down Expand Up @@ -279,6 +283,10 @@ public final class RouterMetrics {
private MutableRate totalSucceededAddToClusterNodeLabelsRetrieved;
@Metric("Total number of successful Retrieved RemoveFromClusterNodeLabels and latency(ms)")
private MutableRate totalSucceededRemoveFromClusterNodeLabelsRetrieved;
// Descriptions follow the "successful Retrieved <Operation>" pattern used by the
// sibling metrics; the operation name itself carries no "Retrieved" suffix.
// Fed by succeededGetClusterInfoRetrieved(long).
@Metric("Total number of successful Retrieved GetClusterInfo and latency(ms)")
private MutableRate totalSucceededGetClusterInfoRetrieved;
// Fed by succeededGetClusterUserInfoRetrieved(long).
@Metric("Total number of successful Retrieved GetClusterUserInfo and latency(ms)")
private MutableRate totalSucceededGetClusterUserInfoRetrieved;

/**
* Provide quantile counters for all latencies.
Expand Down Expand Up @@ -342,6 +350,8 @@ public final class RouterMetrics {
// Latency quantiles, one per operation.
// NOTE(review): the first two are presumably created and fed like the three
// below; their usage is not visible in this excerpt - confirm.
private MutableQuantiles replaceLabelsOnNodeLatency;
private MutableQuantiles addToClusterNodeLabelsLatency;
// Created in the constructor via registry.newQuantiles(...) and fed by the
// matching succeeded<Operation>Retrieved(long) method.
private MutableQuantiles removeFromClusterNodeLabelsLatency;
private MutableQuantiles getClusterInfoLatency;
private MutableQuantiles getClusterUserInfoLatency;

// Shared instance reference, declared volatile.
// NOTE(review): presumably assigned lazily by getMetrics() - confirm.
private static volatile RouterMetrics instance = null;
// Registry the constructor uses to create the quantile metrics
// (registry.newQuantiles(...)).
private static MetricsRegistry registry;
Expand Down Expand Up @@ -551,6 +561,12 @@ private RouterMetrics() {

removeFromClusterNodeLabelsLatency = registry.newQuantiles("removeFromClusterNodeLabelsLatency",
"latency of remove cluster nodelabels timeouts", "ops", "latency", 10);

getClusterInfoLatency = registry.newQuantiles("getClusterInfoLatency",
"latency of get cluster info timeouts", "ops", "latency", 10);

getClusterUserInfoLatency = registry.newQuantiles("getClusterUserInfoLatency",
"latency of get cluster user info timeouts", "ops", "latency", 10);
}

public static RouterMetrics getMetrics() {
Expand Down Expand Up @@ -847,6 +863,16 @@ public long getNumSucceededRemoveFromClusterNodeLabelsRetrieved() {
return totalSucceededRemoveFromClusterNodeLabelsRetrieved.lastStat().numSamples();
}

/**
 * Reports the sample count of the last stat recorded for successful
 * getClusterInfo calls.
 *
 * @return {@code numSamples()} of the rate metric's {@code lastStat()}
 */
@VisibleForTesting
public long getNumSucceededGetClusterInfoRetrieved() {
  return this.totalSucceededGetClusterInfoRetrieved.lastStat().numSamples();
}

/**
 * Reports the sample count of the last stat recorded for successful
 * getClusterUserInfo calls.
 *
 * @return {@code numSamples()} of the rate metric's {@code lastStat()}
 */
@VisibleForTesting
public long getNumSucceededGetClusterUserInfoRetrieved() {
  return this.totalSucceededGetClusterUserInfoRetrieved.lastStat().numSamples();
}

@VisibleForTesting
public long getNumSucceededRefreshSuperUserGroupsConfigurationRetrieved() {
return totalSucceededRefreshSuperUserGroupsConfigurationRetrieved.lastStat().numSamples();
Expand Down Expand Up @@ -1137,6 +1163,16 @@ public double getLatencySucceededRemoveFromClusterNodeLabelsRetrieved() {
return totalSucceededRemoveFromClusterNodeLabelsRetrieved.lastStat().mean();
}

/**
 * Reports the mean of the last stat recorded for successful
 * getClusterInfo calls.
 *
 * @return {@code mean()} of the rate metric's {@code lastStat()}
 */
@VisibleForTesting
public double getLatencySucceededGetClusterInfoRetrieved() {
  return this.totalSucceededGetClusterInfoRetrieved.lastStat().mean();
}

/**
 * Reports the mean of the last stat recorded for successful
 * getClusterUserInfo calls.
 *
 * @return {@code mean()} of the rate metric's {@code lastStat()}
 */
@VisibleForTesting
public double getLatencySucceededGetClusterUserInfoRetrieved() {
  return this.totalSucceededGetClusterUserInfoRetrieved.lastStat().mean();
}

@VisibleForTesting
public double getLatencySucceededRefreshSuperUserGroupsConfigurationRetrieved() {
return totalSucceededRefreshSuperUserGroupsConfigurationRetrieved.lastStat().mean();
Expand Down Expand Up @@ -1382,6 +1418,14 @@ public int getNumRemoveFromClusterNodeLabelsFailedRetrieved() {
return numRemoveFromClusterNodeLabelsFailedRetrieved.value();
}

/**
 * Reads the failure gauge for getClusterInfo.
 *
 * @return the current value of {@code numGetClusterInfoFailedRetrieved}
 */
public int getClusterInfoFailedRetrieved() {
  return this.numGetClusterInfoFailedRetrieved.value();
}

/**
 * Reads the failure gauge for getClusterUserInfo.
 *
 * @return the current value of {@code numGetClusterUserInfoFailedRetrieved}
 */
public int getClusterUserInfoFailedRetrieved() {
  return this.numGetClusterUserInfoFailedRetrieved.value();
}

/**
 * Reads the failure gauge for getDelegationToken.
 *
 * @return the current value of {@code numGetDelegationTokenFailedRetrieved}
 */
public int getDelegationTokenFailedRetrieved() {
  return this.numGetDelegationTokenFailedRetrieved.value();
}
Expand Down Expand Up @@ -1685,6 +1729,16 @@ public void succeededRemoveFromClusterNodeLabelsRetrieved(long duration) {
removeFromClusterNodeLabelsLatency.add(duration);
}

/**
 * Records one successful getClusterInfo call by adding its duration to both
 * the aggregate rate metric and the latency quantiles.
 *
 * @param duration elapsed time of the call
 */
public void succeededGetClusterInfoRetrieved(long duration) {
  this.totalSucceededGetClusterInfoRetrieved.add(duration);
  this.getClusterInfoLatency.add(duration);
}

/**
 * Records one successful getClusterUserInfo call by adding its duration to
 * both the aggregate rate metric and the latency quantiles.
 *
 * @param duration elapsed time of the call
 */
public void succeededGetClusterUserInfoRetrieved(long duration) {
  this.totalSucceededGetClusterUserInfoRetrieved.add(duration);
  this.getClusterUserInfoLatency.add(duration);
}

public void succeededRefreshSuperUserGroupsConfRetrieved(long duration) {
totalSucceededRefreshSuperUserGroupsConfigurationRetrieved.add(duration);
refreshSuperUserGroupsConfLatency.add(duration);
Expand Down Expand Up @@ -1905,6 +1959,14 @@ public void incrRemoveFromClusterNodeLabelsFailedRetrieved() {
numRemoveFromClusterNodeLabelsFailedRetrieved.incr();
}

/**
 * Increments the failure gauge for getClusterInfo.
 */
public void incrGetClusterInfoFailedRetrieved() {
  this.numGetClusterInfoFailedRetrieved.incr();
}

/**
 * Increments the failure gauge for getClusterUserInfo.
 */
public void incrGetClusterUserInfoFailedRetrieved() {
  this.numGetClusterUserInfoFailedRetrieved.incr();
}

/**
 * Increments the failure gauge for getDelegationToken.
 */
public void incrGetDelegationTokenFailedRetrieved() {
  this.numGetDelegationTokenFailedRetrieved.incr();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationRMQueueAclInfo;
import org.apache.hadoop.yarn.server.router.webapp.dao.SubClusterResult;
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationSchedulerTypeInfo;
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationClusterUserInfo;
import org.apache.hadoop.yarn.server.router.webapp.dao.FederationClusterInfo;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
Expand Down Expand Up @@ -1137,14 +1139,84 @@ public ClusterInfo get() {
return getClusterInfo();
}

/**
 * This method retrieves the cluster information, and it is reachable by using
 * {@link RMWSConsts#INFO}.
 *
 * In Federation mode, we will return a FederationClusterInfo object,
 * which contains a list of ClusterInfo, one per answering active sub-cluster.
 * Each entry is tagged with the id of the sub-cluster it came from.
 *
 * On success the elapsed time is reported to the router metrics; on any
 * failure the getClusterInfo failure gauge is incremented first.
 *
 * @return the cluster information.
 * @throws RuntimeException if the active sub-clusters cannot be listed or the
 *         concurrent invocation fails.
 */
@Override
public ClusterInfo getClusterInfo() {
  try {
    long startTime = Time.now();
    Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters();

    // getClusterInfo takes no arguments on the remote side.
    Class[] argsClasses = new Class[]{};
    Object[] args = new Object[]{};
    ClientMethod remoteMethod = new ClientMethod("getClusterInfo", argsClasses, args);

    // Ask every active sub-cluster at once.
    Map<SubClusterInfo, ClusterInfo> subClusterInfoMap =
        invokeConcurrent(subClustersActive.values(), remoteMethod, ClusterInfo.class);

    // Tag each answer with its origin and collect it into the federated view.
    FederationClusterInfo federationClusterInfo = new FederationClusterInfo();
    subClusterInfoMap.forEach((subClusterInfo, clusterInfo) -> {
      SubClusterId subClusterId = subClusterInfo.getSubClusterId();
      clusterInfo.setSubClusterId(subClusterId.getId());
      federationClusterInfo.getList().add(clusterInfo);
    });

    long stopTime = Time.now();
    routerMetrics.succeededGetClusterInfoRetrieved(stopTime - startTime);
    return federationClusterInfo;
  } catch (NotFoundException e) {
    routerMetrics.incrGetClusterInfoFailedRetrieved();
    RouterServerUtil.logAndThrowRunTimeException("Get all active sub cluster(s) error.", e);
  } catch (YarnException | IOException e) {
    routerMetrics.incrGetClusterInfoFailedRetrieved();
    RouterServerUtil.logAndThrowRunTimeException("getClusterInfo error.", e);
  }
  // Only reached if logAndThrowRunTimeException returns normally; it also
  // gives the compiler the terminating statement this method needs.
  routerMetrics.incrGetClusterInfoFailedRetrieved();
  throw new RuntimeException("getClusterInfo error.");
}

/**
 * This method retrieves the cluster user information, and it is reachable by using
 * {@link RMWSConsts#CLUSTER_USER_INFO}.
 *
 * In Federation mode, we will return a FederationClusterUserInfo object,
 * which contains a list of ClusterUserInfo, one per answering active sub-cluster.
 * Each entry is tagged with the id of the sub-cluster it came from.
 *
 * On success the elapsed time is reported to the router metrics; on any
 * failure the getClusterUserInfo failure gauge is incremented first.
 *
 * @param hsr the servlet request
 * @return the cluster user information
 * @throws RuntimeException if the active sub-clusters cannot be listed or the
 *         concurrent invocation fails.
 */
@Override
public ClusterUserInfo getClusterUserInfo(HttpServletRequest hsr) {
  try {
    long startTime = Time.now();
    Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters();

    // A copy of the request, not the original, is handed to the remote calls.
    final HttpServletRequest hsrCopy = clone(hsr);
    Class[] argsClasses = new Class[]{HttpServletRequest.class};
    Object[] args = new Object[]{hsrCopy};
    ClientMethod remoteMethod = new ClientMethod("getClusterUserInfo", argsClasses, args);

    // Ask every active sub-cluster at once.
    Map<SubClusterInfo, ClusterUserInfo> subClusterInfoMap =
        invokeConcurrent(subClustersActive.values(), remoteMethod, ClusterUserInfo.class);

    // Tag each answer with its origin and collect it into the federated view.
    FederationClusterUserInfo federationClusterUserInfo = new FederationClusterUserInfo();
    subClusterInfoMap.forEach((subClusterInfo, clusterUserInfo) -> {
      SubClusterId subClusterId = subClusterInfo.getSubClusterId();
      clusterUserInfo.setSubClusterId(subClusterId.getId());
      federationClusterUserInfo.getList().add(clusterUserInfo);
    });

    long stopTime = Time.now();
    routerMetrics.succeededGetClusterUserInfoRetrieved(stopTime - startTime);
    return federationClusterUserInfo;
  } catch (NotFoundException e) {
    routerMetrics.incrGetClusterUserInfoFailedRetrieved();
    RouterServerUtil.logAndThrowRunTimeException("Get all active sub cluster(s) error.", e);
  } catch (YarnException | IOException e) {
    routerMetrics.incrGetClusterUserInfoFailedRetrieved();
    RouterServerUtil.logAndThrowRunTimeException("getClusterUserInfo error.", e);
  }
  // Only reached if logAndThrowRunTimeException returns normally; it also
  // gives the compiler the terminating statement this method needs.
  routerMetrics.incrGetClusterUserInfoFailedRetrieved();
  throw new RuntimeException("getClusterUserInfo error.");
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.webapp.dao;

import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterInfo;

import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
import java.util.ArrayList;
import java.util.List;

/**
 * A {@link ClusterInfo} that additionally carries a list of
 * {@link ClusterInfo} entries, bound to the element name {@code subCluster}.
 */
@XmlRootElement
@XmlAccessorType(XmlAccessType.FIELD)
public class FederationClusterInfo extends ClusterInfo {

  /** Collected entries; starts out as an empty, mutable list. */
  @XmlElement(name = "subCluster")
  private List<ClusterInfo> list = new ArrayList<>();

  /**
   * No-argument constructor. JAXB needs this.
   */
  public FederationClusterInfo() {
    // Intentionally empty.
  }

  /**
   * Creates an instance backed by the given list (stored as-is, not copied).
   *
   * @param list the entries to expose
   */
  public FederationClusterInfo(ArrayList<ClusterInfo> list) {
    this.list = list;
  }

  /**
   * @return the backing list itself, so callers may add to it directly
   */
  public List<ClusterInfo> getList() {
    return this.list;
  }

  /**
   * Replaces the backing list (stored as-is, not copied).
   *
   * @param list the new entries
   */
  public void setList(List<ClusterInfo> list) {
    this.list = list;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.webapp.dao;

import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterUserInfo;

import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
import java.util.ArrayList;
import java.util.List;

/**
 * A {@link ClusterUserInfo} that additionally carries a list of
 * {@link ClusterUserInfo} entries, bound to the element name {@code subCluster}.
 */
@XmlRootElement
@XmlAccessorType(XmlAccessType.FIELD)
public class FederationClusterUserInfo extends ClusterUserInfo {

  /** Collected entries; starts out as an empty, mutable list. */
  @XmlElement(name = "subCluster")
  private List<ClusterUserInfo> list = new ArrayList<>();

  /**
   * No-argument constructor. JAXB needs this.
   */
  public FederationClusterUserInfo() {
    // Intentionally empty.
  }

  /**
   * Creates an instance backed by the given list (stored as-is, not copied).
   *
   * @param list the entries to expose
   */
  public FederationClusterUserInfo(ArrayList<ClusterUserInfo> list) {
    this.list = list;
  }

  /**
   * @return the backing list itself, so callers may add to it directly
   */
  public List<ClusterUserInfo> getList() {
    return this.list;
  }

  /**
   * Replaces the backing list (stored as-is, not copied).
   *
   * @param list the new entries
   */
  public void setList(List<ClusterUserInfo> list) {
    this.list = list;
  }
}
Loading