From 844f6e8313ca38e238fce8e15aa07bdd3c4c90c5 Mon Sep 17 00:00:00 2001 From: Peter Szucs Date: Tue, 29 Nov 2022 11:10:08 +0100 Subject: [PATCH] YARN-10946. Moved QueueInfo creation to separate class Change-Id: If4da3cd24e9b2a94689ef8e1ea5b69dc1b84288b --- .../scheduler/capacity/AbstractCSQueue.java | 108 ++++-------------- .../capacity/CSQueueInfoProvider.java | 106 +++++++++++++++++ 2 files changed, 131 insertions(+), 83 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueInfoProvider.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 1a5a1ce0fd461..9912237060ac9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -29,10 +29,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; -import org.apache.hadoop.yarn.api.records.QueueConfigurations; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueState; -import org.apache.hadoop.yarn.api.records.QueueStatistics; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -200,16 +198,28 @@ public float getCapacity() { return queueCapacities.getCapacity(); } + public float getCapacityByNodeLabel(String nodeLabel) { + return queueCapacities.getCapacity(nodeLabel); + } + @Override public float getAbsoluteCapacity() { return queueCapacities.getAbsoluteCapacity(); } + public float getAbsoluteCapacityByNodeLabel(String nodeLabel) { + return queueCapacities.getAbsoluteCapacity(nodeLabel); + } + @Override public float getAbsoluteMaximumCapacity() { return queueCapacities.getAbsoluteMaximumCapacity(); } + public float getAbsoluteMaximumCapacityByNodeLabel(String nodeLabel) { + return queueCapacities.getAbsoluteMaximumCapacity(nodeLabel); + } + @Override public float getAbsoluteUsedCapacity() { return queueCapacities.getAbsoluteUsedCapacity(); @@ -220,11 +230,23 @@ public float getMaximumCapacity() { return queueCapacities.getMaximumCapacity(); } + public float getMaximumCapacityByNodeLabel(String nodeLabel) { + return queueCapacities.getMaximumCapacity(nodeLabel); + } + + public float getMaxAMResourcePercentageByNodeLabel(String nodeLabel) { + return queueCapacities.getMaxAMResourcePercentage(nodeLabel); + } + @Override public float getUsedCapacity() { return queueCapacities.getUsedCapacity(); } + public float getWeight() { + return queueCapacities.getWeight(); + } + @Override public Resource getUsedResources() { return usageTracker.getQueueUsage().getUsed(); @@ -580,87 +602,7 @@ public QueueCapacityVector getConfiguredCapacityVector( } protected QueueInfo getQueueInfo() { - // Deliberately doesn't use lock here, because this method will be invoked - // from schedulerApplicationAttempt, to avoid deadlock, sacrifice - // consistency here. - // TODO, improve this - QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class); - queueInfo.setQueueName(queuePath.getLeafName()); - queueInfo.setQueuePath(queuePath.getFullPath()); - queueInfo.setAccessibleNodeLabels(queueNodeLabelsSettings.getAccessibleNodeLabels()); - queueInfo.setCapacity(queueCapacities.getCapacity()); - queueInfo.setMaximumCapacity(queueCapacities.getMaximumCapacity()); - queueInfo.setQueueState(getState()); - queueInfo.setDefaultNodeLabelExpression(queueNodeLabelsSettings.getDefaultLabelExpression()); - queueInfo.setCurrentCapacity(getUsedCapacity()); - queueInfo.setQueueStatistics(getQueueStatistics()); - queueInfo.setPreemptionDisabled(preemptionSettings.isPreemptionDisabled()); - queueInfo.setIntraQueuePreemptionDisabled( - getIntraQueuePreemptionDisabled()); - queueInfo.setQueueConfigurations(getQueueConfigurations()); - queueInfo.setWeight(queueCapacities.getWeight()); - queueInfo.setMaxParallelApps(queueAppLifetimeSettings.getMaxParallelApps()); - return queueInfo; - } - - public QueueStatistics getQueueStatistics() { - // Deliberately doesn't use lock here, because this method will be invoked - // from schedulerApplicationAttempt, to avoid deadlock, sacrifice - // consistency here. - // TODO, improve this - QueueStatistics stats = recordFactory.newRecordInstance( - QueueStatistics.class); - stats.setNumAppsSubmitted(getMetrics().getAppsSubmitted()); - stats.setNumAppsRunning(getMetrics().getAppsRunning()); - stats.setNumAppsPending(getMetrics().getAppsPending()); - stats.setNumAppsCompleted(getMetrics().getAppsCompleted()); - stats.setNumAppsKilled(getMetrics().getAppsKilled()); - stats.setNumAppsFailed(getMetrics().getAppsFailed()); - stats.setNumActiveUsers(getMetrics().getActiveUsers()); - stats.setAvailableMemoryMB(getMetrics().getAvailableMB()); - stats.setAllocatedMemoryMB(getMetrics().getAllocatedMB()); - stats.setPendingMemoryMB(getMetrics().getPendingMB()); - stats.setReservedMemoryMB(getMetrics().getReservedMB()); - stats.setAvailableVCores(getMetrics().getAvailableVirtualCores()); - stats.setAllocatedVCores(getMetrics().getAllocatedVirtualCores()); - stats.setPendingVCores(getMetrics().getPendingVirtualCores()); - stats.setReservedVCores(getMetrics().getReservedVirtualCores()); - stats.setPendingContainers(getMetrics().getPendingContainers()); - stats.setAllocatedContainers(getMetrics().getAllocatedContainers()); - stats.setReservedContainers(getMetrics().getReservedContainers()); - return stats; - } - - public Map getQueueConfigurations() { - Map queueConfigurations = new HashMap<>(); - Set nodeLabels = getNodeLabelsForQueue(); - QueueResourceQuotas queueResourceQuotas = usageTracker.getQueueResourceQuotas(); - for (String nodeLabel : nodeLabels) { - QueueConfigurations queueConfiguration = - recordFactory.newRecordInstance(QueueConfigurations.class); - float capacity = queueCapacities.getCapacity(nodeLabel); - float absoluteCapacity = queueCapacities.getAbsoluteCapacity(nodeLabel); - float maxCapacity = queueCapacities.getMaximumCapacity(nodeLabel); - float absMaxCapacity = - queueCapacities.getAbsoluteMaximumCapacity(nodeLabel); - float maxAMPercentage = - queueCapacities.getMaxAMResourcePercentage(nodeLabel); - queueConfiguration.setCapacity(capacity); - queueConfiguration.setAbsoluteCapacity(absoluteCapacity); - queueConfiguration.setMaxCapacity(maxCapacity); - queueConfiguration.setAbsoluteMaxCapacity(absMaxCapacity); - queueConfiguration.setMaxAMPercentage(maxAMPercentage); - queueConfiguration.setConfiguredMinCapacity( - queueResourceQuotas.getConfiguredMinResource(nodeLabel)); - queueConfiguration.setConfiguredMaxCapacity( - queueResourceQuotas.getConfiguredMaxResource(nodeLabel)); - queueConfiguration.setEffectiveMinCapacity( - queueResourceQuotas.getEffectiveMinResource(nodeLabel)); - queueConfiguration.setEffectiveMaxCapacity( - queueResourceQuotas.getEffectiveMaxResource(nodeLabel)); - queueConfigurations.put(nodeLabel, queueConfiguration); - } - return queueConfigurations; + return CSQueueInfoProvider.getQueueInfo(this); } @Private diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueInfoProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueInfoProvider.java new file mode 100644 index 0000000000000..008ee19cfe7be --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueInfoProvider.java @@ -0,0 +1,106 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.hadoop.yarn.api.records.QueueConfigurations; +import org.apache.hadoop.yarn.api.records.QueueInfo; +import org.apache.hadoop.yarn.api.records.QueueStatistics; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +public final class CSQueueInfoProvider { + + private static final RecordFactory recordFactory = + RecordFactoryProvider.getRecordFactory(null); + + public CSQueueInfoProvider() { + } + + public static QueueInfo getQueueInfo(AbstractCSQueue csQueue) { + // Deliberately doesn't use lock here, because this method will be invoked + // from schedulerApplicationAttempt, to avoid deadlock, sacrifice + // consistency here. + // TODO, improve this + QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class); + queueInfo.setQueueName(csQueue.getQueuePathObject().getLeafName()); + queueInfo.setQueuePath(csQueue.getQueuePathObject().getFullPath()); + queueInfo.setAccessibleNodeLabels(csQueue.getAccessibleNodeLabels()); + queueInfo.setCapacity(csQueue.getCapacity()); + queueInfo.setMaximumCapacity(csQueue.getMaximumCapacity()); + queueInfo.setQueueState(csQueue.getState()); + queueInfo.setDefaultNodeLabelExpression(csQueue.getDefaultNodeLabelExpression()); + queueInfo.setCurrentCapacity(csQueue.getUsedCapacity()); + queueInfo.setQueueStatistics(getQueueStatistics(csQueue)); + queueInfo.setPreemptionDisabled(csQueue.getPreemptionDisabled()); + queueInfo.setIntraQueuePreemptionDisabled( + csQueue.getIntraQueuePreemptionDisabled()); + queueInfo.setQueueConfigurations(getQueueConfigurations(csQueue)); + queueInfo.setWeight(csQueue.getWeight()); + queueInfo.setMaxParallelApps(csQueue.getMaxParallelApps()); + return queueInfo; + } + + private static QueueStatistics getQueueStatistics(AbstractCSQueue csQueue) { + // Deliberately doesn't use lock here, because this method will be invoked + // from schedulerApplicationAttempt, to avoid deadlock, sacrifice + // consistency here. + // TODO, improve this + QueueStatistics stats = recordFactory.newRecordInstance( + QueueStatistics.class); + CSQueueMetrics queueMetrics = csQueue.getMetrics(); + stats.setNumAppsSubmitted(queueMetrics.getAppsSubmitted()); + stats.setNumAppsRunning(queueMetrics.getAppsRunning()); + stats.setNumAppsPending(queueMetrics.getAppsPending()); + stats.setNumAppsCompleted(queueMetrics.getAppsCompleted()); + stats.setNumAppsKilled(queueMetrics.getAppsKilled()); + stats.setNumAppsFailed(queueMetrics.getAppsFailed()); + stats.setNumActiveUsers(queueMetrics.getActiveUsers()); + stats.setAvailableMemoryMB(queueMetrics.getAvailableMB()); + stats.setAllocatedMemoryMB(queueMetrics.getAllocatedMB()); + stats.setPendingMemoryMB(queueMetrics.getPendingMB()); + stats.setReservedMemoryMB(queueMetrics.getReservedMB()); + stats.setAvailableVCores(queueMetrics.getAvailableVirtualCores()); + stats.setAllocatedVCores(queueMetrics.getAllocatedVirtualCores()); + stats.setPendingVCores(queueMetrics.getPendingVirtualCores()); + stats.setReservedVCores(queueMetrics.getReservedVirtualCores()); + stats.setPendingContainers(queueMetrics.getPendingContainers()); + stats.setAllocatedContainers(queueMetrics.getAllocatedContainers()); + stats.setReservedContainers(queueMetrics.getReservedContainers()); + return stats; + } + + private static Map getQueueConfigurations(AbstractCSQueue csQueue) { + Map queueConfigurations = new HashMap<>(); + Set nodeLabels = csQueue.getNodeLabelsForQueue(); + QueueResourceQuotas queueResourceQuotas = csQueue.getQueueResourceQuotas(); + for (String nodeLabel : nodeLabels) { + QueueConfigurations queueConfiguration = + recordFactory.newRecordInstance(QueueConfigurations.class); + float capacity = csQueue.getCapacityByNodeLabel(nodeLabel); + float absoluteCapacity = csQueue.getAbsoluteCapacityByNodeLabel(nodeLabel); + float maxCapacity = csQueue.getMaximumCapacityByNodeLabel(nodeLabel); + float absMaxCapacity = + csQueue.getAbsoluteMaximumCapacityByNodeLabel(nodeLabel); + float maxAMPercentage = + csQueue.getMaxAMResourcePercentageByNodeLabel(nodeLabel); + queueConfiguration.setCapacity(capacity); + queueConfiguration.setAbsoluteCapacity(absoluteCapacity); + queueConfiguration.setMaxCapacity(maxCapacity); + queueConfiguration.setAbsoluteMaxCapacity(absMaxCapacity); + queueConfiguration.setMaxAMPercentage(maxAMPercentage); + queueConfiguration.setConfiguredMinCapacity( + queueResourceQuotas.getConfiguredMinResource(nodeLabel)); + queueConfiguration.setConfiguredMaxCapacity( + queueResourceQuotas.getConfiguredMaxResource(nodeLabel)); + queueConfiguration.setEffectiveMinCapacity( + queueResourceQuotas.getEffectiveMinResource(nodeLabel)); + queueConfiguration.setEffectiveMaxCapacity( + queueResourceQuotas.getEffectiveMaxResource(nodeLabel)); + queueConfigurations.put(nodeLabel, queueConfiguration); + } + return queueConfigurations; + } +}