Skip to content

YARN-11416. FS2CS should use CapacitySchedulerConfiguration in FSQueueConverterBuilder #5320

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Aug 4, 2023
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,10 @@ public int getMaximumSystemApplications() {
return maxApplications;
}

public void setMaximumApplicationMasterResourcePercent(float percent) {
setFloat(PREFIX + MAXIMUM_AM_RESOURCE_SUFFIX, percent);
}

public float getMaximumApplicationMasterResourcePercent() {
return getFloat(MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT,
DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT);
Expand Down Expand Up @@ -1222,6 +1226,16 @@ public void reinitializeConfigurationProperties() {
configurationProperties = new ConfigurationProperties(props);
}

public void setQueueMaximumAllocationMb(String queue, int value) {
String queuePrefix = getQueuePrefix(queue);
setInt(queuePrefix + MAXIMUM_ALLOCATION_MB, value);
}

public void setQueueMaximumAllocationVcores(String queue, int value) {
String queuePrefix = getQueuePrefix(queue);
setInt(queuePrefix + MAXIMUM_ALLOCATION_VCORES, value);
}

public long getQueueMaximumAllocationMb(String queue) {
String queuePrefix = getQueuePrefix(queue);
return getInt(queuePrefix + MAXIMUM_ALLOCATION_MB, (int)UNDEFINED);
Expand Down Expand Up @@ -1496,6 +1510,14 @@ public List<MappingRule> parseJSONMappingRules() throws IOException {
return new ArrayList<>();
}

public void setMappingRuleFormat(String format) {
set(MAPPING_RULE_FORMAT, format);
}

public void setMappingRuleJson(String json) {
set(MAPPING_RULE_JSON, json);
}

public List<MappingRule> getMappingRules() throws IOException {
String mappingFormat =
get(MAPPING_RULE_FORMAT, MAPPING_RULE_FORMAT_DEFAULT);
Expand Down Expand Up @@ -1712,6 +1734,14 @@ public boolean getIntraQueuePreemptionDisabled(String queue,
+ QUEUE_PREEMPTION_DISABLED, defaultVal);
}

public void setPreemptionObserveOnly(boolean value) {
setBoolean(PREEMPTION_OBSERVE_ONLY, value);
}

public boolean getPreemptionObserveOnly() {
return getBoolean(PREEMPTION_OBSERVE_ONLY, DEFAULT_PREEMPTION_OBSERVE_ONLY);
}

/**
* Get configured node labels in a given queuePath.
*
Expand Down Expand Up @@ -1816,29 +1846,48 @@ public static boolean shouldAppFailFast(Configuration conf) {
return conf.getBoolean(APP_FAIL_FAST, DEFAULT_APP_FAIL_FAST);
}

public Integer getMaxParallelAppsForQueue(String queue) {
int defaultMaxParallelAppsForQueue =
getInt(PREFIX + MAX_PARALLEL_APPLICATIONS,
public void setDefaultMaxParallelApps(int value) {
setInt(PREFIX + MAX_PARALLEL_APPLICATIONS, value);
}

public Integer getDefaultMaxParallelApps() {
return getInt(PREFIX + MAX_PARALLEL_APPLICATIONS,
DEFAULT_MAX_PARALLEL_APPLICATIONS);
}

String maxParallelAppsForQueue = get(getQueuePrefix(queue)
+ MAX_PARALLEL_APPLICATIONS);
public void setDefaultMaxParallelAppsPerUser(int value) {
setInt(PREFIX + "user." + MAX_PARALLEL_APPLICATIONS, value);
}

return (maxParallelAppsForQueue != null) ?
Integer.parseInt(maxParallelAppsForQueue)
: defaultMaxParallelAppsForQueue;
public Integer getDefaultMaxParallelAppsPerUser() {
return getInt(PREFIX + "user." + MAX_PARALLEL_APPLICATIONS,
DEFAULT_MAX_PARALLEL_APPLICATIONS);
}

public void setMaxParallelAppsForUser(String user, int value) {
setInt(getUserPrefix(user) + MAX_PARALLEL_APPLICATIONS, value);
}

public Integer getMaxParallelAppsForUser(String user) {
int defaultMaxParallelAppsForUser =
getInt(PREFIX + "user." + MAX_PARALLEL_APPLICATIONS,
DEFAULT_MAX_PARALLEL_APPLICATIONS);
String maxParallelAppsForUser = get(getUserPrefix(user)
+ MAX_PARALLEL_APPLICATIONS);

return (maxParallelAppsForUser != null) ?
Integer.parseInt(maxParallelAppsForUser)
: defaultMaxParallelAppsForUser;
Integer.valueOf(maxParallelAppsForUser)
: getDefaultMaxParallelAppsPerUser();
}

public void setMaxParallelAppsForQueue(String queue, String value) {
set(getQueuePrefix(queue) + MAX_PARALLEL_APPLICATIONS, value);
}

public Integer getMaxParallelAppsForQueue(String queue) {
String maxParallelAppsForQueue = get(getQueuePrefix(queue)
+ MAX_PARALLEL_APPLICATIONS);

return (maxParallelAppsForQueue != null) ?
Integer.valueOf(maxParallelAppsForQueue)
: getDefaultMaxParallelApps();
}

public boolean getAllowZeroCapacitySum(String queue) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;

import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAPPING_RULE_FORMAT;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAPPING_RULE_JSON;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAPPING_RULE_FORMAT_JSON;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSQueueConverter.QUEUE_MAX_AM_SHARE_DISABLED;

Expand All @@ -35,6 +32,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.AccessType;
Expand Down Expand Up @@ -342,14 +340,13 @@ private void performRuleConversion(FairScheduler fs)
}
writer.writeValue(mappingRulesOutputStream, desc);

capacitySchedulerConfig.set(MAPPING_RULE_FORMAT,
MAPPING_RULE_FORMAT_JSON);
capacitySchedulerConfig.setMappingRuleFormat(MAPPING_RULE_FORMAT_JSON);
capacitySchedulerConfig.setOverrideWithQueueMappings(true);
if (!rulesToFile) {
String json =
((ByteArrayOutputStream)mappingRulesOutputStream)
.toString(StandardCharsets.UTF_8.displayName());
capacitySchedulerConfig.set(MAPPING_RULE_JSON, json);
capacitySchedulerConfig.setMappingRuleJson(json);
}
} else {
LOG.info("No rules to convert");
Expand Down Expand Up @@ -377,47 +374,39 @@ private OutputStream getOutputStreamForJson() throws FileNotFoundException {

private void emitDefaultQueueMaxParallelApplications() {
if (queueMaxAppsDefault != Integer.MAX_VALUE) {
capacitySchedulerConfig.set(
PREFIX + "max-parallel-apps",
String.valueOf(queueMaxAppsDefault));
capacitySchedulerConfig.setDefaultMaxParallelApps(
queueMaxAppsDefault);
}
}

private void emitDefaultUserMaxParallelApplications() {
if (userMaxAppsDefault != Integer.MAX_VALUE) {
capacitySchedulerConfig.set(
PREFIX + "user.max-parallel-apps",
String.valueOf(userMaxAppsDefault));
capacitySchedulerConfig.setDefaultMaxParallelAppsPerUser(
userMaxAppsDefault);
}
}

private void emitUserMaxParallelApplications() {
userMaxApps
.forEach((user, apps) -> {
capacitySchedulerConfig.setInt(
PREFIX + "user." + user + ".max-parallel-apps", apps);
capacitySchedulerConfig.setMaxParallelAppsForUser(user, apps);
});
}

private void emitDefaultMaxAMShare() {
if (queueMaxAMShareDefault == QUEUE_MAX_AM_SHARE_DISABLED) {
capacitySchedulerConfig.setFloat(
CapacitySchedulerConfiguration.
MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT,
capacitySchedulerConfig.setMaximumApplicationMasterResourcePercent(
1.0f);
} else {
capacitySchedulerConfig.setFloat(
CapacitySchedulerConfiguration.
MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT,
capacitySchedulerConfig.setMaximumApplicationMasterResourcePercent(
queueMaxAMShareDefault);
}
}
private void emitDisablePreemptionForObserveOnlyMode() {
if (preemptionMode == FSConfigToCSConfigConverterParams
.PreemptionMode.OBSERVE_ONLY) {
capacitySchedulerConfig.
setBoolean(CapacitySchedulerConfiguration.
PREEMPTION_OBSERVE_ONLY, true);
setPreemptionObserveOnly(true);
}
}

Expand All @@ -433,13 +422,13 @@ private void generateQueueAcl(String queue,

if (!submitAcls.getGroups().isEmpty() ||
!submitAcls.getUsers().isEmpty() || submitAcls.isAllAllowed()) {
capacitySchedulerConfig.set(PREFIX + queue + ".acl_submit_applications",
capacitySchedulerConfig.setAcl(queue, QueueACL.SUBMIT_APPLICATIONS,
submitAcls.getAclString());
}

if (!adminAcls.getGroups().isEmpty() ||
!adminAcls.getUsers().isEmpty() || adminAcls.isAllAllowed()) {
capacitySchedulerConfig.set(PREFIX + queue + ".acl_administer_queue",
capacitySchedulerConfig.setAcl(queue, QueueACL.ADMINISTER_QUEUE,
adminAcls.getAclString());
}
}
Expand Down Expand Up @@ -501,7 +490,7 @@ Configuration getYarnSiteConfig() {
}

@VisibleForTesting
Configuration getCapacitySchedulerConfig() {
CapacitySchedulerConfiguration getCapacitySchedulerConfig() {
return capacitySchedulerConfig;
}

Expand Down
Loading