
Commit

Spark Azkaban Jobtype changes to enable version-aware queue enforcement (#262)

* Spark Azkaban Jobtype changes to enable version-aware queue enforcement
jakhani authored and Victsm committed Mar 14, 2017
1 parent fd91cf7 commit 7f510d0
Showing 5 changed files with 346 additions and 120 deletions.
279 changes: 197 additions & 82 deletions plugins/jobtype/src/azkaban/jobtype/HadoopSecureSparkWrapper.java
@@ -16,10 +16,9 @@

package azkaban.jobtype;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.lang.reflect.Field;
import azkaban.utils.Props;
import com.google.common.collect.Maps;

import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
@@ -38,8 +37,6 @@
import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.util.Utils;

import azkaban.security.commons.HadoopSecurityManager;
import azkaban.utils.Props;
import static azkaban.flow.CommonJobProperties.ATTEMPT_LINK;
import static azkaban.flow.CommonJobProperties.EXECUTION_LINK;
import static azkaban.flow.CommonJobProperties.JOB_LINK;
@@ -79,6 +76,7 @@ public class HadoopSecureSparkWrapper {

//YARN CONF PARAM
private static final String YARN_CONF_NODE_LABELING_ENABLED = "yarn.node-labels.enabled";
public static final String DEFAULT_QUEUE = "default";

/**
* Entry point: a Java wrapper to the spark-submit command
@@ -137,7 +135,7 @@ private static void runSpark(String[] args) {
handleDriverJavaOpts(newArgs);

// If dynamic allocation policy for this jobtype is turned on, adjust related param
handleDynamicResourceAllocation(newArgs);
newArgs = handleDynamicResourceAllocation(newArgs);

// If yarn cluster enables node labeling, adjust related param
newArgs = handleNodeLabeling(newArgs);
@@ -174,7 +172,7 @@ private static void handleDriverJavaOpts(String[] argArray) {
argArray[1] = driverJavaOptions.toString();
}

private static void handleDynamicResourceAllocation(String[] argArray) {
private static String[] handleDynamicResourceAllocation(String[] argArray) {
// HadoopSparkJob will set env var on this process if we enforce dynamic allocation policy for spark jobtype.
// This policy can be enabled through spark jobtype plugin's conf property.
// Enabling dynamic allocation policy for azkaban spark jobtype is different from enabling dynamic allocation
@@ -184,23 +182,118 @@ private static void handleDynamicResourceAllocation(String[] argArray) {
boolean dynamicAllocEnabled = dynamicAllocProp != null && dynamicAllocProp.equals(Boolean.TRUE.toString());
if (dynamicAllocEnabled) {
for (int i = 0; i < argArray.length; i++) {
if (argArray[i] == null) continue;
if (argArray[i] == null) {
continue;
}

// If the user tries to disable dynamic allocation for their application
// by setting some conf params to false, we need to ignore these settings so that
// the application still uses dynamic allocation for Spark
if (argArray[i].equals(SparkJobArg.SPARK_CONF_PREFIX.sparkParamName) // --conf
&& (argArray[i+1].startsWith(SPARK_CONF_SHUFFLE_SERVICE_ENABLED) // spark.shuffle.service.enabled
|| argArray[i+1].startsWith(SPARK_CONF_DYNAMIC_ALLOC_ENABLED)) // spark.dynamicAllocation.enabled
) {
&& (argArray[i + 1].startsWith(SPARK_CONF_SHUFFLE_SERVICE_ENABLED) // spark.shuffle.service.enabled
|| argArray[i + 1].startsWith(SPARK_CONF_DYNAMIC_ALLOC_ENABLED)) // spark.dynamicAllocation.enabled
) {

logger.info("Azbakan enforces dynamic resource allocation. Ignore user param: "
+ argArray[i] + " " + argArray[i+1]);
logger.info(
"Azbakan enforces dynamic resource allocation. Ignore user param: " + argArray[i] + " " + argArray[i
+ 1]);
argArray[i] = null;
argArray[++i] = null;
}
}
// If dynamic allocation is enabled, make sure the application is scheduled in the right queue
argArray = handleQueueEnforcement(argArray);
}
return argArray;
}
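
For illustration, here is a minimal, hypothetical sketch of the enforcement above; the argument values and application class are invented, and only the --conf flags and the handleDynamicResourceAllocation call mirror code in this diff.

// Hypothetical example (not part of this commit): the user tries to switch
// dynamic allocation off, but the jobtype policy env var is set to true.
String[] userArgs = {
    "--conf", "spark.dynamicAllocation.enabled=false",
    "--conf", "spark.shuffle.service.enabled=false",
    "--class", "com.example.WordCount"   // hypothetical application class
};
String[] effectiveArgs = handleDynamicResourceAllocation(userArgs);
// Both "--conf" pairs are nulled out so they are ignored downstream, and
// handleQueueEnforcement may additionally prepend "--conf spark.yarn.queue=default".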

/**
* This method enforces the YARN queue for a Spark application. The rules are:
* a) If dynamic resource allocation is enabled for the selected Spark version and the application requires a large
* container, schedule it into the default queue via the default conf (spark.yarn.queue) in spark-defaults.conf.
* b) If dynamic resource allocation is enabled for the selected Spark version and the application requires a small
* container, schedule it into the org-specific queue.
* c) If dynamic resource allocation is disabled for the selected Spark version, schedule the application into the
* default queue via the default conf (spark.yarn.queue) in spark-defaults.conf.
* @param argArray spark-submit argument array
* @return the argument array with queue parameters adjusted as described above
*/
protected static String[] handleQueueEnforcement(String[] argArray) {
SparkConf sparkConf = getSparkProperties();
Configuration conf = new Configuration();

int queueParameterIndex = getUserSpecifiedQueueParameterIndex(argArray);
boolean requiredSparkDefaultQueue = false;
if (sparkConf.getBoolean(SPARK_CONF_DYNAMIC_ALLOC_ENABLED, false)) {
if (isLargeContainerRequired(argArray, conf, sparkConf)) {
// Case A
requiredSparkDefaultQueue = true;
logger.info("Spark application requires Large containers. Scheduling this application into default queue by a "
+ "default conf(spark.yarn.queue) in spark-defaults.conf.");
} else {
// Case B
logger.info(
"Dynamic allocation is enabled for the selected Spark version and the application requires a small container. "
+ "Hence, scheduling this application into the org-specific queue");
if (queueParameterIndex == -1) {
LinkedList<String> argList = new LinkedList<>(Arrays.asList(argArray));
argList.addFirst(SPARK_CONF_QUEUE + "=" + DEFAULT_QUEUE);
argList.addFirst(SparkJobArg.SPARK_CONF_PREFIX.sparkParamName);
argArray = argList.toArray(new String[argList.size()]);
}
}
} else {
// Case C
logger.info("Spark version, selected for this application, doesn't support dynamic allocation. Scheduling this "
+ "application into default queue by a default conf(spark.yarn.queue) in spark-defaults.conf.");
requiredSparkDefaultQueue = true;
}

if (queueParameterIndex != -1 && requiredSparkDefaultQueue) {
logger.info("Azbakan enforces spark.yarn.queue queue. Ignore user param: " + argArray[queueParameterIndex] + " "
+ argArray[queueParameterIndex + 1]);
argArray[queueParameterIndex] = null;
argArray[queueParameterIndex + 1] = null;
}
return argArray;
}
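
A short, hypothetical walk-through of case (a) above; the queue name, executor sizes, and thresholds are illustrative only.

// Hypothetical case (a), not part of this commit: dynamic allocation is enabled in
// spark-defaults.conf and the requested executors are large for the configured
// thresholds, so the user's queue choice is dropped and the spark.yarn.queue value
// from spark-defaults.conf takes effect.
String[] userArgs = {
    "--queue", "myteam_queue",       // hypothetical user-specified queue
    "--executor-memory", "32g",
    "--executor-cores", "2"
};
String[] effectiveArgs = handleQueueEnforcement(userArgs);
// "--queue myteam_queue" is nulled out (case a). For small executors (case b) the
// user's queue would be kept, and "--conf spark.yarn.queue=default" is prepended
// only when the user supplied no queue parameter at all.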

/**
* This method checks whether the application requires a large container. The decision uses:
* user job parameters (or their defaults): spark.executor.cores, spark.executor.memory, spark.yarn.executor.memoryOverhead
* jobtype plugin parameters: spark.min.mem.vore.ratio, spark.min.memory-gb.size
* If rounded memory / spark.executor.cores >= spark.min.mem.vore.ratio, or rounded memory >= spark.min.memory-gb.size,
* then a large container is required to schedule this application.
* @param argArray spark-submit argument array
* @param conf Hadoop configuration
* @param sparkConf Spark properties loaded from spark-defaults.conf
* @return true if a large container is required for this application
*/
private static boolean isLargeContainerRequired(String[] argArray, Configuration conf, SparkConf sparkConf) {
Map<String, String> executorParameters = getUserSpecifiedExecutorParameters(argArray);
String executorVcore = executorParameters.get(SPARK_EXECUTOR_CORES);
String executorMem = executorParameters.get(SPARK_EXECUTOR_MEMORY);
String executorMemOverhead = executorParameters.get(SPARK_EXECUTOR_MEMORY_OVERHEAD);
if (executorVcore == null) {
executorVcore = sparkConf.get(SPARK_EXECUTOR_CORES, SPARK_EXECUTOR_DEFAULT_CORES);
}
if (executorMem == null) {
executorMem = sparkConf.get(SPARK_EXECUTOR_MEMORY, SPARK_EXECUTOR_DEFAULT_MEMORY);
}
if (executorMemOverhead == null) {
executorMemOverhead = sparkConf.get(SPARK_EXECUTOR_MEMORY_OVERHEAD, null);
}

double roundedMemoryGbSize = getRoundedMemoryGb(executorMem, executorMemOverhead, conf);

double minRatio = Double.parseDouble(System.getenv(HadoopSparkJob.SPARK_MIN_MEM_VCORE_RATIO_ENV_VAR));
double minMemSize = Double.parseDouble(System.getenv(HadoopSparkJob.SPARK_MIN_MEM_SIZE_ENV_VAR));

logger.info(
"RoundedMemoryGbSize: " + roundedMemoryGbSize + ", ExecutorVcore: " + executorVcore + ", MinRatio: " + minRatio
+ ", MinMemSize: " + minMemSize);
return roundedMemoryGbSize / (double) Integer.parseInt(executorVcore) >= minRatio
|| roundedMemoryGbSize >= minMemSize;
}
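
A worked numeric example may help; every value below is hypothetical and depends on the plugin's env vars and on the YARN increment applied by getRoundedMemoryGb.

// Hypothetical numbers, not taken from this commit:
//   --executor-memory 16g, no memoryOverhead given  -> roundedMemoryGbSize ~ 18
//   --executor-cores 2
//   SPARK_MIN_MEM_VCORE_RATIO_ENV_VAR = "6", SPARK_MIN_MEM_SIZE_ENV_VAR = "40"
// Check: 18 / 2 = 9 >= 6, so the ratio test passes and the method returns true,
// even though 18 < 40; either condition alone is sufficient.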

protected static String[] handleNodeLabeling(String[] argArray) {
@@ -210,86 +303,22 @@ protected static String[] handleNodeLabeling(String[] argArray) {
// feature for Yarn. This config inside Spark job type is to enforce node labeling feature for all
// Spark applications submitted via Azkaban Spark job type.
Configuration conf = new Configuration();
String sparkPropertyFile = HadoopSecureSparkWrapper.class.getClassLoader()
.getResource("spark-defaults.conf").getPath();
boolean nodeLabelingYarn = conf.getBoolean(YARN_CONF_NODE_LABELING_ENABLED, false);
String nodeLabelingProp = System.getenv(HadoopSparkJob.SPARK_NODE_LABELING_ENV_VAR);
boolean nodeLabelingPolicy = nodeLabelingProp != null && nodeLabelingProp.equals(Boolean.TRUE.toString());
String autoNodeLabelProp = System.getenv(HadoopSparkJob.SPARK_AUTO_NODE_LABELING_ENV_VAR);
boolean autoNodeLabeling = autoNodeLabelProp != null && autoNodeLabelProp.equals(Boolean.TRUE.toString());
String desiredNodeLabel = System.getenv(HadoopSparkJob.SPARK_DESIRED_NODE_LABEL_ENV_VAR);
String minMemVcoreRatio = System.getenv(HadoopSparkJob.SPARK_MIN_MEM_VCORE_RATIO_ENV_VAR);
String minMemGBSize = System.getenv(HadoopSparkJob.SPARK_MIN_MEM_SIZE_ENV_VAR);
String executorMem = null;
String executorVcore = null;
String executorMemOverhead = null;

SparkConf sparkConf = new SparkConf(false);
sparkConf.setAll(Utils.getPropertiesFromFile(sparkPropertyFile));
SparkConf sparkConf = getSparkProperties();

if (nodeLabelingYarn && nodeLabelingPolicy) {
for (int i = 0; i < argArray.length; i++) {
if (argArray[i] == null) {
continue;
}
if (nodeLabelingPolicy) {
// If yarn cluster enables node labeling, applications should be submitted to a default
// queue by a default conf(spark.yarn.queue) in spark-defaults.conf
// We should ignore user-specified queue param to enforce the node labeling
// (--queue test or --conf spark.yarn.queue=test)
if ((argArray[i].equals(SparkJobArg.SPARK_CONF_PREFIX.sparkParamName) &&
argArray[i+1].startsWith(SPARK_CONF_QUEUE))
|| (argArray[i].equals(SparkJobArg.QUEUE.sparkParamName))) {

logger.info("Azbakan enforces node labeling. Ignore user param: "
+ argArray[i] + " " + argArray[i+1]);
argArray[i] = null;
argArray[++i] = null;
continue;
}
if (autoNodeLabeling) {
// If auto node labeling is enabled, job type should ignore user supplied
// node label expression for Spark executors. This config will be automatically
// set by the job type based on the mem-to-vcore resource ratio requested by
// the user application.
if (argArray[i].equals(SparkJobArg.SPARK_CONF_PREFIX.sparkParamName) &&
argArray[i+1].startsWith(SPARK_EXECUTOR_NODE_LABEL_EXP)) {
logger.info("Azbakan auto-sets node label expression. Ignore user param: "
+ argArray[i] + " " + argArray[i+1]);
argArray[i] = null;
argArray[++i] = null;
continue;
}
if (argArray[i].equals(SparkJobArg.EXECUTOR_CORES.sparkParamName)) {
executorVcore = argArray[++i];
}
if (argArray[i].equals(SparkJobArg.EXECUTOR_MEMORY.sparkParamName)) {
executorMem = argArray[++i];
}
if (argArray[i].equals(SparkJobArg.SPARK_CONF_PREFIX.sparkParamName) &&
argArray[i+1].startsWith(SPARK_EXECUTOR_MEMORY_OVERHEAD)) {
executorMemOverhead = argArray[i+1].split("=")[1].trim();
}
}
}
}
ignoreUserSpecifiedNodeLabelParameter(argArray, autoNodeLabeling);

// If auto node labeling is enabled, automatically sets spark.yarn.executor.nodeLabelExpression
// config based on user requested resources.
if (autoNodeLabeling) {
double minRatio = Double.parseDouble(minMemVcoreRatio);
double minMemSize = Double.parseDouble(minMemGBSize);
if (executorVcore == null) {
executorVcore = sparkConf.get(SPARK_EXECUTOR_CORES, SPARK_EXECUTOR_DEFAULT_CORES);
}
if (executorMem == null) {
executorMem = sparkConf.get(SPARK_EXECUTOR_MEMORY, SPARK_EXECUTOR_DEFAULT_MEMORY);
}
if (executorMemOverhead == null) {
executorMemOverhead = sparkConf.get(SPARK_EXECUTOR_MEMORY_OVERHEAD, null);
}
double roundedMemoryGbSize = getRoundedMemoryGb(executorMem, executorMemOverhead, conf);
if (roundedMemoryGbSize / Integer.parseInt(executorVcore) >= minRatio ||
roundedMemoryGbSize >= minMemSize) {
if (isLargeContainerRequired(argArray, conf, sparkConf)) {
LinkedList<String> argList = new LinkedList<String>(Arrays.asList(argArray));
argList.addFirst(SPARK_EXECUTOR_NODE_LABEL_EXP + "=" + desiredNodeLabel);
argList.addFirst(SparkJobArg.SPARK_CONF_PREFIX.sparkParamName);
@@ -300,6 +329,92 @@ protected static String[] handleNodeLabeling(String[] argArray) {
return argArray;
}
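
For orientation, a hypothetical sketch of how node labeling enforcement composes with the large-container check; the label name, flag values, and thresholds are invented.

// Hypothetical, not part of this commit: yarn.node-labels.enabled=true on the cluster,
// the jobtype's node-labeling and auto-node-labeling env vars are true, and
// SPARK_DESIRED_NODE_LABEL_ENV_VAR is, say, "bigcontainer".
String[] userArgs = {
    "--executor-memory", "48g",
    "--executor-cores", "4",
    "--conf", "spark.yarn.executor.nodeLabelExpression=smallnodes"   // will be ignored
};
String[] effectiveArgs = handleNodeLabeling(userArgs);
// The user's nodeLabelExpression pair is nulled out, and because the executors are
// large by the configured ratio/size thresholds, the wrapper prepends
// "--conf spark.yarn.executor.nodeLabelExpression=bigcontainer".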

/**
* This method ignores the user-specified node label parameter. When auto node labeling is enabled,
* the job type should ignore the user-supplied node label expression for Spark executors.
* @param argArray spark-submit argument array
* @param autoNodeLabeling whether auto node labeling is enabled for this jobtype
*/
private static void ignoreUserSpecifiedNodeLabelParameter(String[] argArray, boolean autoNodeLabeling) {
for (int i = 0; i < argArray.length; i++) {
if (argArray[i] == null) {
continue;
}
if (autoNodeLabeling) {
// This config will be automatically set by the job type based on the mem-to-vcore resource ratio requested by
// the user application.
if (argArray[i].equals(SparkJobArg.SPARK_CONF_PREFIX.sparkParamName) && argArray[i + 1]
.startsWith(SPARK_EXECUTOR_NODE_LABEL_EXP)) {
logger.info(
"Azbakan auto-sets node label expression. Ignore user param: " + argArray[i] + " " + argArray[i + 1]);
argArray[i] = null;
argArray[++i] = null;
continue;
}
}
}
}

/**
* This method collects the user-specified executor parameters: executor-memory, executor-cores and
* spark.yarn.executor.memoryOverhead.
* @param argArray spark-submit argument array
* @return map from executor parameter name to the user-specified value
*/
private static Map<String, String> getUserSpecifiedExecutorParameters(String[] argArray) {
Map<String, String> executorParameters = Maps.newHashMap();
for (int i = 0; i < argArray.length; i++) {
if (argArray[i] == null) {
continue;
}
if (argArray[i].equals(SparkJobArg.EXECUTOR_CORES.sparkParamName)) {
executorParameters.put(SPARK_EXECUTOR_CORES, argArray[++i]);
}
if (argArray[i].equals(SparkJobArg.EXECUTOR_MEMORY.sparkParamName)) {
executorParameters.put(SPARK_EXECUTOR_MEMORY, argArray[++i]);
}
if (argArray[i].equals(SparkJobArg.SPARK_CONF_PREFIX.sparkParamName) && argArray[i + 1]
.startsWith(SPARK_EXECUTOR_MEMORY_OVERHEAD)) {
executorParameters.put(SPARK_EXECUTOR_MEMORY_OVERHEAD, argArray[i + 1].split("=")[1].trim());
}
}
return executorParameters;
}
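
As a small, hypothetical illustration of the mapping this helper produces (the values are invented):

// Hypothetical input, not part of this commit:
//   --executor-cores 4 --executor-memory 8g --conf spark.yarn.executor.memoryOverhead=1024
// getUserSpecifiedExecutorParameters(argArray) then returns roughly:
//   {spark.executor.cores=4, spark.executor.memory=8g, spark.yarn.executor.memoryOverhead=1024}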

/**
* This method retrieves the index of the queue parameter passed by the user, or -1 if none was passed.
* @param argArray spark-submit argument array
* @return index of the user-specified queue parameter, or -1 if absent
*/
private static int getUserSpecifiedQueueParameterIndex(String[] argArray) {
int queueParameterIndex = -1;
for (int i = 0; i < argArray.length; i++) {
if (argArray[i] == null) {
continue;
}
// Fetch index of queue parameter passed by User.
// (--queue test or --conf spark.yarn.queue=test)
if ((argArray[i].equals(SparkJobArg.SPARK_CONF_PREFIX.sparkParamName) && argArray[i + 1]
.startsWith(SPARK_CONF_QUEUE)) || (argArray[i].equals(SparkJobArg.QUEUE.sparkParamName))) {
queueParameterIndex = i++;
break;
}
}
return queueParameterIndex;
}

/**
* This method loads Spark properties from the spark-defaults.conf file bundled with the jobtype.
* @return SparkConf populated from spark-defaults.conf
*/
private static SparkConf getSparkProperties() {
String sparkPropertyFile = HadoopSecureSparkWrapper.class.getClassLoader()
.getResource("spark-defaults.conf").getPath();
SparkConf sparkConf = new SparkConf(false);
sparkConf.setAll(Utils.getPropertiesFromFile(sparkPropertyFile));
return sparkConf;
}

/**
* Get the memory GB size of Spark executor containers. The logic is as follows:
* 1) Transforms requested memory String into a number representing amount of MBs requested.