Skip to content

Commit

Permalink
YARN-10946. Moved QueueInfo creation to separate class
Browse files Browse the repository at this point in the history
Change-Id: If4da3cd24e9b2a94689ef8e1ea5b69dc1b84288b
  • Loading branch information
p-szucs committed Nov 29, 2022
1 parent 3c37a01 commit 844f6e8
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,8 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueConfigurations;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueStatistics;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.exceptions.YarnException;
Expand Down Expand Up @@ -200,16 +198,28 @@ public float getCapacity() {
return queueCapacities.getCapacity();
}

public float getCapacityByNodeLabel(String nodeLabel) {
return queueCapacities.getCapacity(nodeLabel);
}

@Override
public float getAbsoluteCapacity() {
return queueCapacities.getAbsoluteCapacity();
}

public float getAbsoluteCapacityByNodeLabel(String nodeLabel) {
return queueCapacities.getAbsoluteCapacity(nodeLabel);
}

@Override
public float getAbsoluteMaximumCapacity() {
return queueCapacities.getAbsoluteMaximumCapacity();
}

public float getAbsoluteMaximumCapacityByNodeLabel(String nodeLabel) {
return queueCapacities.getAbsoluteMaximumCapacity(nodeLabel);
}

@Override
public float getAbsoluteUsedCapacity() {
return queueCapacities.getAbsoluteUsedCapacity();
Expand All @@ -220,11 +230,23 @@ public float getMaximumCapacity() {
return queueCapacities.getMaximumCapacity();
}

public float getMaximumCapacityByNodeLabel(String nodeLabel) {
return queueCapacities.getMaximumCapacity(nodeLabel);
}

public float getMaxAMResourcePercentageByNodeLabel(String nodeLabel) {
return queueCapacities.getMaxAMResourcePercentage(nodeLabel);
}

@Override
public float getUsedCapacity() {
return queueCapacities.getUsedCapacity();
}

public float getWeight() {
return queueCapacities.getWeight();
}

@Override
public Resource getUsedResources() {
return usageTracker.getQueueUsage().getUsed();
Expand Down Expand Up @@ -580,87 +602,7 @@ public QueueCapacityVector getConfiguredCapacityVector(
}

protected QueueInfo getQueueInfo() {
// Deliberately doesn't use lock here, because this method will be invoked
// from schedulerApplicationAttempt, to avoid deadlock, sacrifice
// consistency here.
// TODO, improve this
QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
queueInfo.setQueueName(queuePath.getLeafName());
queueInfo.setQueuePath(queuePath.getFullPath());
queueInfo.setAccessibleNodeLabels(queueNodeLabelsSettings.getAccessibleNodeLabels());
queueInfo.setCapacity(queueCapacities.getCapacity());
queueInfo.setMaximumCapacity(queueCapacities.getMaximumCapacity());
queueInfo.setQueueState(getState());
queueInfo.setDefaultNodeLabelExpression(queueNodeLabelsSettings.getDefaultLabelExpression());
queueInfo.setCurrentCapacity(getUsedCapacity());
queueInfo.setQueueStatistics(getQueueStatistics());
queueInfo.setPreemptionDisabled(preemptionSettings.isPreemptionDisabled());
queueInfo.setIntraQueuePreemptionDisabled(
getIntraQueuePreemptionDisabled());
queueInfo.setQueueConfigurations(getQueueConfigurations());
queueInfo.setWeight(queueCapacities.getWeight());
queueInfo.setMaxParallelApps(queueAppLifetimeSettings.getMaxParallelApps());
return queueInfo;
}

public QueueStatistics getQueueStatistics() {
// Deliberately doesn't use lock here, because this method will be invoked
// from schedulerApplicationAttempt, to avoid deadlock, sacrifice
// consistency here.
// TODO, improve this
QueueStatistics stats = recordFactory.newRecordInstance(
QueueStatistics.class);
stats.setNumAppsSubmitted(getMetrics().getAppsSubmitted());
stats.setNumAppsRunning(getMetrics().getAppsRunning());
stats.setNumAppsPending(getMetrics().getAppsPending());
stats.setNumAppsCompleted(getMetrics().getAppsCompleted());
stats.setNumAppsKilled(getMetrics().getAppsKilled());
stats.setNumAppsFailed(getMetrics().getAppsFailed());
stats.setNumActiveUsers(getMetrics().getActiveUsers());
stats.setAvailableMemoryMB(getMetrics().getAvailableMB());
stats.setAllocatedMemoryMB(getMetrics().getAllocatedMB());
stats.setPendingMemoryMB(getMetrics().getPendingMB());
stats.setReservedMemoryMB(getMetrics().getReservedMB());
stats.setAvailableVCores(getMetrics().getAvailableVirtualCores());
stats.setAllocatedVCores(getMetrics().getAllocatedVirtualCores());
stats.setPendingVCores(getMetrics().getPendingVirtualCores());
stats.setReservedVCores(getMetrics().getReservedVirtualCores());
stats.setPendingContainers(getMetrics().getPendingContainers());
stats.setAllocatedContainers(getMetrics().getAllocatedContainers());
stats.setReservedContainers(getMetrics().getReservedContainers());
return stats;
}

public Map<String, QueueConfigurations> getQueueConfigurations() {
Map<String, QueueConfigurations> queueConfigurations = new HashMap<>();
Set<String> nodeLabels = getNodeLabelsForQueue();
QueueResourceQuotas queueResourceQuotas = usageTracker.getQueueResourceQuotas();
for (String nodeLabel : nodeLabels) {
QueueConfigurations queueConfiguration =
recordFactory.newRecordInstance(QueueConfigurations.class);
float capacity = queueCapacities.getCapacity(nodeLabel);
float absoluteCapacity = queueCapacities.getAbsoluteCapacity(nodeLabel);
float maxCapacity = queueCapacities.getMaximumCapacity(nodeLabel);
float absMaxCapacity =
queueCapacities.getAbsoluteMaximumCapacity(nodeLabel);
float maxAMPercentage =
queueCapacities.getMaxAMResourcePercentage(nodeLabel);
queueConfiguration.setCapacity(capacity);
queueConfiguration.setAbsoluteCapacity(absoluteCapacity);
queueConfiguration.setMaxCapacity(maxCapacity);
queueConfiguration.setAbsoluteMaxCapacity(absMaxCapacity);
queueConfiguration.setMaxAMPercentage(maxAMPercentage);
queueConfiguration.setConfiguredMinCapacity(
queueResourceQuotas.getConfiguredMinResource(nodeLabel));
queueConfiguration.setConfiguredMaxCapacity(
queueResourceQuotas.getConfiguredMaxResource(nodeLabel));
queueConfiguration.setEffectiveMinCapacity(
queueResourceQuotas.getEffectiveMinResource(nodeLabel));
queueConfiguration.setEffectiveMaxCapacity(
queueResourceQuotas.getEffectiveMaxResource(nodeLabel));
queueConfigurations.put(nodeLabel, queueConfiguration);
}
return queueConfigurations;
return CSQueueInfoProvider.getQueueInfo(this);
}

@Private
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import org.apache.hadoop.yarn.api.records.QueueConfigurations;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueStatistics;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public final class CSQueueInfoProvider {

private static final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);

public CSQueueInfoProvider() {
}

public static QueueInfo getQueueInfo(AbstractCSQueue csQueue) {
// Deliberately doesn't use lock here, because this method will be invoked
// from schedulerApplicationAttempt, to avoid deadlock, sacrifice
// consistency here.
// TODO, improve this
QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
queueInfo.setQueueName(csQueue.getQueuePathObject().getLeafName());
queueInfo.setQueuePath(csQueue.getQueuePathObject().getFullPath());
queueInfo.setAccessibleNodeLabels(csQueue.getAccessibleNodeLabels());
queueInfo.setCapacity(csQueue.getCapacity());
queueInfo.setMaximumCapacity(csQueue.getMaximumCapacity());
queueInfo.setQueueState(csQueue.getState());
queueInfo.setDefaultNodeLabelExpression(csQueue.getDefaultNodeLabelExpression());
queueInfo.setCurrentCapacity(csQueue.getUsedCapacity());
queueInfo.setQueueStatistics(getQueueStatistics(csQueue));
queueInfo.setPreemptionDisabled(csQueue.getPreemptionDisabled());
queueInfo.setIntraQueuePreemptionDisabled(
csQueue.getIntraQueuePreemptionDisabled());
queueInfo.setQueueConfigurations(getQueueConfigurations(csQueue));
queueInfo.setWeight(csQueue.getWeight());
queueInfo.setMaxParallelApps(csQueue.getMaxParallelApps());
return queueInfo;
}

private static QueueStatistics getQueueStatistics(AbstractCSQueue csQueue) {
// Deliberately doesn't use lock here, because this method will be invoked
// from schedulerApplicationAttempt, to avoid deadlock, sacrifice
// consistency here.
// TODO, improve this
QueueStatistics stats = recordFactory.newRecordInstance(
QueueStatistics.class);
CSQueueMetrics queueMetrics = csQueue.getMetrics();
stats.setNumAppsSubmitted(queueMetrics.getAppsSubmitted());
stats.setNumAppsRunning(queueMetrics.getAppsRunning());
stats.setNumAppsPending(queueMetrics.getAppsPending());
stats.setNumAppsCompleted(queueMetrics.getAppsCompleted());
stats.setNumAppsKilled(queueMetrics.getAppsKilled());
stats.setNumAppsFailed(queueMetrics.getAppsFailed());
stats.setNumActiveUsers(queueMetrics.getActiveUsers());
stats.setAvailableMemoryMB(queueMetrics.getAvailableMB());
stats.setAllocatedMemoryMB(queueMetrics.getAllocatedMB());
stats.setPendingMemoryMB(queueMetrics.getPendingMB());
stats.setReservedMemoryMB(queueMetrics.getReservedMB());
stats.setAvailableVCores(queueMetrics.getAvailableVirtualCores());
stats.setAllocatedVCores(queueMetrics.getAllocatedVirtualCores());
stats.setPendingVCores(queueMetrics.getPendingVirtualCores());
stats.setReservedVCores(queueMetrics.getReservedVirtualCores());
stats.setPendingContainers(queueMetrics.getPendingContainers());
stats.setAllocatedContainers(queueMetrics.getAllocatedContainers());
stats.setReservedContainers(queueMetrics.getReservedContainers());
return stats;
}

private static Map<String, QueueConfigurations> getQueueConfigurations(AbstractCSQueue csQueue) {
Map<String, QueueConfigurations> queueConfigurations = new HashMap<>();
Set<String> nodeLabels = csQueue.getNodeLabelsForQueue();
QueueResourceQuotas queueResourceQuotas = csQueue.getQueueResourceQuotas();
for (String nodeLabel : nodeLabels) {
QueueConfigurations queueConfiguration =
recordFactory.newRecordInstance(QueueConfigurations.class);
float capacity = csQueue.getCapacityByNodeLabel(nodeLabel);
float absoluteCapacity = csQueue.getAbsoluteCapacityByNodeLabel(nodeLabel);
float maxCapacity = csQueue.getMaximumCapacityByNodeLabel(nodeLabel);
float absMaxCapacity =
csQueue.getAbsoluteMaximumCapacityByNodeLabel(nodeLabel);
float maxAMPercentage =
csQueue.getMaxAMResourcePercentageByNodeLabel(nodeLabel);
queueConfiguration.setCapacity(capacity);
queueConfiguration.setAbsoluteCapacity(absoluteCapacity);
queueConfiguration.setMaxCapacity(maxCapacity);
queueConfiguration.setAbsoluteMaxCapacity(absMaxCapacity);
queueConfiguration.setMaxAMPercentage(maxAMPercentage);
queueConfiguration.setConfiguredMinCapacity(
queueResourceQuotas.getConfiguredMinResource(nodeLabel));
queueConfiguration.setConfiguredMaxCapacity(
queueResourceQuotas.getConfiguredMaxResource(nodeLabel));
queueConfiguration.setEffectiveMinCapacity(
queueResourceQuotas.getEffectiveMinResource(nodeLabel));
queueConfiguration.setEffectiveMaxCapacity(
queueResourceQuotas.getEffectiveMaxResource(nodeLabel));
queueConfigurations.put(nodeLabel, queueConfiguration);
}
return queueConfigurations;
}
}

0 comments on commit 844f6e8

Please sign in to comment.