Skip to content

Commit d2c9eb6

Browse files
jchenjc and Jian Chen authored
YARN-11073. Avoid unnecessary preemption for tiny queues under certain corner cases (#4110)
Co-authored-by: Jian Chen <jian.chen@airbnb.com> Signed-off-by: Akira Ajisaka <aajisaka@apache.org>
1 parent eced5be commit d2c9eb6

File tree

2 files changed

+156
-21
lines changed

2 files changed

+156
-21
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java

Lines changed: 152 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,16 @@
3131
import java.util.Comparator;
3232
import java.util.Iterator;
3333
import java.util.PriorityQueue;
34+
import org.slf4j.Logger;
35+
import org.slf4j.LoggerFactory;
3436

3537
/**
3638
* Calculate how much resources need to be preempted for each queue,
3739
* will be used by {@link PreemptionCandidatesSelector}.
3840
*/
3941
public class AbstractPreemptableResourceCalculator {
42+
private static final Logger LOG = LoggerFactory.getLogger(
43+
AbstractPreemptableResourceCalculator.class);
4044

4145
protected final CapacitySchedulerPreemptionContext context;
4246
protected final ResourceCalculator rc;
@@ -76,6 +80,34 @@ private double getIdealPctOfGuaranteed(TempQueuePerPartition q) {
7680
}
7781
}
7882

83+
private static class NormalizationTuple {
84+
private Resource numerator;
85+
private Resource denominator;
86+
87+
NormalizationTuple(Resource numer, Resource denom) {
88+
this.numerator = numer;
89+
this.denominator = denom;
90+
}
91+
92+
long getNumeratorValue(int i) {
93+
return numerator.getResourceInformation(i).getValue();
94+
}
95+
96+
long getDenominatorValue(int i) {
97+
String nUnits = numerator.getResourceInformation(i).getUnits();
98+
ResourceInformation dResourceInformation = denominator
99+
.getResourceInformation(i);
100+
return UnitsConversionUtil.convert(
101+
dResourceInformation.getUnits(), nUnits, dResourceInformation.getValue());
102+
}
103+
104+
float getNormalizedValue(int i) {
105+
long nValue = getNumeratorValue(i);
106+
long dValue = getDenominatorValue(i);
107+
return dValue == 0 ? 0.0f : (float) nValue / dValue;
108+
}
109+
}
110+
79111
/**
80112
* PreemptableResourceCalculator constructor.
81113
*
@@ -175,7 +207,7 @@ protected void computeFixpointAllocation(Resource totGuarant,
175207
unassigned, Resources.none())) {
176208
// we compute normalizedGuarantees capacity based on currently active
177209
// queues
178-
resetCapacity(unassigned, orderedByNeed, ignoreGuarantee);
210+
resetCapacity(orderedByNeed, ignoreGuarantee);
179211

180212
// For each underserved queue (or set of queues if multiple are equally
181213
// underserved), offer its share of the unassigned resources based on its
@@ -252,47 +284,146 @@ protected void initIdealAssignment(Resource totGuarant,
252284
/**
253285
* Computes a normalizedGuaranteed capacity based on active queues.
254286
*
255-
* @param clusterResource
256-
* the total amount of resources in the cluster
257287
* @param queues
258288
* the list of queues to consider
259289
* @param ignoreGuar
260290
* ignore guarantee.
261291
*/
262-
private void resetCapacity(Resource clusterResource,
263-
Collection<TempQueuePerPartition> queues, boolean ignoreGuar) {
292+
private void resetCapacity(Collection<TempQueuePerPartition> queues,
293+
boolean ignoreGuar) {
264294
Resource activeCap = Resource.newInstance(0, 0);
295+
float activeTotalAbsCap = 0.0f;
265296
int maxLength = ResourceUtils.getNumberOfCountableResourceTypes();
266297

267298
if (ignoreGuar) {
268-
for (TempQueuePerPartition q : queues) {
269-
for (int i = 0; i < maxLength; i++) {
270-
q.normalizedGuarantee[i] = 1.0f / queues.size();
299+
for (int i = 0; i < maxLength; i++) {
300+
for (TempQueuePerPartition q : queues) {
301+
computeNormGuarEvenly(q, queues.size(), i);
271302
}
272303
}
273304
} else {
274305
for (TempQueuePerPartition q : queues) {
275306
Resources.addTo(activeCap, q.getGuaranteed());
307+
activeTotalAbsCap += q.getAbsCapacity();
276308
}
277-
for (TempQueuePerPartition q : queues) {
278-
for (int i = 0; i < maxLength; i++) {
279-
ResourceInformation nResourceInformation = q.getGuaranteed()
280-
.getResourceInformation(i);
281-
ResourceInformation dResourceInformation = activeCap
282-
.getResourceInformation(i);
283-
284-
long nValue = nResourceInformation.getValue();
285-
long dValue = UnitsConversionUtil.convert(
286-
dResourceInformation.getUnits(), nResourceInformation.getUnits(),
287-
dResourceInformation.getValue());
288-
if (dValue != 0) {
289-
q.normalizedGuarantee[i] = (float) nValue / dValue;
309+
310+
// loop through all resource types and normalize guaranteed capacity for all queues
311+
for (int i = 0; i < maxLength; i++) {
312+
boolean useAbsCapBasedNorm = false;
313+
// if the sum of absolute capacity of all queues involved is 0,
314+
// we should normalize evenly
315+
boolean useEvenlyDistNorm = activeTotalAbsCap == 0;
316+
317+
// loop through all the queues once to determine the
318+
// right normalization strategy for current processing resource type
319+
for (TempQueuePerPartition q : queues) {
320+
NormalizationTuple normTuple = new NormalizationTuple(
321+
q.getGuaranteed(), activeCap);
322+
long queueGuaranValue = normTuple.getNumeratorValue(i);
323+
long totalActiveGuaranValue = normTuple.getDenominatorValue(i);
324+
325+
if (queueGuaranValue == 0 && q.getAbsCapacity() != 0 && totalActiveGuaranValue != 0) {
326+
// when the rounded value of a resource type is 0 but its absolute capacity is not 0,
327+
// we should consider taking the normalized guarantee based on absolute capacity
328+
useAbsCapBasedNorm = true;
329+
break;
330+
}
331+
332+
if (totalActiveGuaranValue == 0) {
333+
// If totalActiveGuaranValue from activeCap is zero, that means the guaranteed capacity
334+
// of this resource dimension for all active queues is tiny (close to 0).
335+
// For example, if a queue has 1% of minCapacity on a cluster with a totalVcores of 48,
336+
// then the idealAssigned Vcores for this queue is (48 * 0.01)=0.48 which then
337+
// get rounded/casted into 0 (double -> long)
338+
// In this scenario where the denominator is 0, we can just spread resources across
339+
// all tiny queues evenly since their absoluteCapacity are roughly the same
340+
useEvenlyDistNorm = true;
341+
}
342+
}
343+
344+
if (LOG.isDebugEnabled()) {
345+
LOG.debug("Queue normalization strategy: " +
346+
"absoluteCapacityBasedNormalization(" + useAbsCapBasedNorm +
347+
"), evenlyDistributedNormalization(" + useEvenlyDistNorm +
348+
"), defaultNormalization(" + !(useAbsCapBasedNorm || useEvenlyDistNorm) + ")");
349+
}
350+
351+
// loop through all the queues again to apply normalization strategy
352+
for (TempQueuePerPartition q : queues) {
353+
if (useAbsCapBasedNorm) {
354+
computeNormGuarFromAbsCapacity(q, activeTotalAbsCap, i);
355+
} else if (useEvenlyDistNorm) {
356+
computeNormGuarEvenly(q, queues.size(), i);
357+
} else {
358+
computeDefaultNormGuar(q, activeCap, i);
290359
}
291360
}
292361
}
293362
}
294363
}
295364

365+
/**
366+
* Computes the normalized guaranteed capacity based on the weight of a queue's abs capacity.
367+
*
368+
* Example:
369+
* There are two active queues: queueA & queueB, and
370+
* their configured absolute minimum capacity is 1% and 3% respectively.
371+
*
372+
* Then their normalized guaranteed capacity are:
373+
* normalized_guar_queueA = 0.01 / (0.01 + 0.03) = 0.25
374+
* normalized_guar_queueB = 0.03 / (0.01 + 0.03) = 0.75
375+
*
376+
* @param q
377+
* the queue to consider
378+
* @param activeTotalAbsCap
379+
* the sum of absolute capacity of all active queues
380+
* @param resourceTypeIdx
381+
* index of the processing resource type
382+
*/
383+
private static void computeNormGuarFromAbsCapacity(TempQueuePerPartition q,
384+
float activeTotalAbsCap,
385+
int resourceTypeIdx) {
386+
if (activeTotalAbsCap != 0) {
387+
q.normalizedGuarantee[resourceTypeIdx] = q.getAbsCapacity() / activeTotalAbsCap;
388+
}
389+
}
390+
391+
/**
392+
* Computes the normalized guaranteed capacity evenly based on num of active queues.
393+
*
394+
* @param q
395+
* the queue to consider
396+
* @param numOfActiveQueues
397+
* number of active queues
398+
* @param resourceTypeIdx
399+
* index of the processing resource type
400+
*/
401+
private static void computeNormGuarEvenly(TempQueuePerPartition q,
402+
int numOfActiveQueues,
403+
int resourceTypeIdx) {
404+
q.normalizedGuarantee[resourceTypeIdx] = 1.0f / numOfActiveQueues;
405+
}
406+
407+
/**
408+
* The default way to compute a queue's normalized guaranteed capacity.
409+
*
410+
* For each resource type, divide a queue's configured guaranteed amount (MBs/Vcores) by
411+
* the total amount of guaranteed resource of all active queues
412+
*
413+
* @param q
414+
* the queue to consider
415+
* @param activeCap
416+
* total guaranteed resources of all active queues
417+
* @param resourceTypeIdx
418+
* index of the processing resource type
419+
*/
420+
private static void computeDefaultNormGuar(TempQueuePerPartition q,
421+
Resource activeCap,
422+
int resourceTypeIdx) {
423+
NormalizationTuple normTuple = new NormalizationTuple(q.getGuaranteed(), activeCap);
424+
q.normalizedGuarantee[resourceTypeIdx] = normTuple.getNormalizedValue(resourceTypeIdx);
425+
}
426+
296427
// Take the most underserved TempQueue (the one on the head). Collect and
297428
// return the list of all queues that have the same idealAssigned
298429
// percentage of guaranteed.

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,10 @@ Resource offer(Resource avail, ResourceCalculator rc,
201201
return remain;
202202
}
203203

204+
public float getAbsCapacity() {
205+
return absCapacity;
206+
}
207+
204208
public Resource getGuaranteed() {
205209
if(!effMinRes.equals(Resources.none())) {
206210
return Resources.clone(effMinRes);

0 commit comments

Comments
 (0)