Remove the requirement of having to configure path to default spark configuration file.
Victsm committed Sep 2, 2016
1 parent 85d36d2 commit c04cc32
Showing 3 changed files with 26 additions and 18 deletions.
plugins/jobtype/src/azkaban/jobtype/HadoopSecureSparkWrapper.java
@@ -213,7 +213,8 @@ protected static String[] handleNodeLabeling(String[] argArray) {
// feature for Yarn. This config inside Spark job type is to enforce node labeling feature for all
// Spark applications submitted via Azkaban Spark job type.
Configuration conf = new Configuration();
-    String sparkPropertyFile = System.getenv(HadoopSparkJob.SPARK_PROPERTY_FILE_PATH_ENV_VAR);
+    String sparkPropertyFile = HadoopSecureSparkWrapper.class.getClassLoader()
+        .getResource("spark-defaults.conf").getPath();
boolean nodeLabelingYarn = conf.getBoolean(YARN_CONF_NODE_LABELING_ENABLED, false);
String nodeLabelingProp = System.getenv(HadoopSparkJob.SPARK_NODE_LABELING_ENV_VAR);
boolean nodeLabelingPolicy = nodeLabelingProp != null && nodeLabelingProp.equals(Boolean.TRUE.toString());
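The added lines in this hunk resolve spark-defaults.conf from the job type's classpath instead of reading a path from an environment variable. A minimal sketch of that lookup pattern, assuming the file is packaged on the classpath (the class name and the null guard are illustrative additions; the committed line calls getPath() on the result directly):

  import java.net.URL;

  public class SparkDefaultsLocator {
    // Resolve spark-defaults.conf from the classpath. getResource() returns
    // null when the resource is absent, so fail fast with a clear message
    // instead of a NullPointerException.
    static String locateSparkDefaults() {
      URL url = SparkDefaultsLocator.class.getClassLoader()
          .getResource("spark-defaults.conf");
      if (url == null) {
        throw new IllegalStateException(
            "spark-defaults.conf not found on the classpath");
      }
      return url.getPath();
    }
  }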
@@ -299,6 +300,24 @@ protected static String[] handleNodeLabeling(String[] argArray) {
return argArray;
}

+  /**
+   * Calculate the memory-to-vcore ratio for the executors requested by the user. The logic is
+   * as follows:
+   * 1) Transform the requested memory String into a number representing the amount of MBs requested.
+   * 2a) If memory overhead is not set by the user, use the default logic to calculate it,
+   * which is to add max(requestedMemInMB * 10%, 384) to the requested memory size.
+   * 2b) If memory overhead is set by the user, add it directly.
+   * 3) Use the logic inside YARN to round up the container size according to the defined minimum
+   * allocation for memory size.
+   * 4) Transform the MB number into a GB number and divide it by the number of vCores to get the
+   * ratio.
+   * @param mem requested executor memory size, in a format such as 2G or 1024M
+   * @param memOverhead user-defined memory overhead
+   * @param vcore requested executor vCore number
+   * @param sparkConf SparkConf object
+   * @param config Hadoop Configuration object
+   * @return the calculated ratio between the allocated executor's memory and vcore resources
+   */
private static double calculateMemVcoreRatio(String mem, String memOverhead, String vcore,
SparkConf sparkConf, Configuration config) {
int memoryMb = (int) JavaUtils.byteStringAsMb(mem);
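The javadoc above spells out the ratio computation step by step. A hypothetical, self-contained re-implementation of those steps (all names here are illustrative; the committed method instead uses Spark's JavaUtils.byteStringAsMb for parsing and YARN's own rounding logic for step 3):

  public class MemVcoreRatioSketch {

    // Step 1: parse a memory string such as "2G" or "1024M" into MBs.
    static long parseToMb(String s) {
      s = s.trim().toUpperCase();
      if (s.endsWith("G")) {
        return Long.parseLong(s.substring(0, s.length() - 1)) * 1024L;
      }
      if (s.endsWith("M")) {
        return Long.parseLong(s.substring(0, s.length() - 1));
      }
      return Long.parseLong(s); // assume plain MBs when no suffix is given
    }

    static double memVcoreRatio(String mem, String memOverhead, int vcores,
        long yarnMinAllocMb) {
      long memMb = parseToMb(mem);
      // Steps 2a/2b: overhead defaults to max(10% of requested memory, 384 MB)
      // unless the user supplied an explicit overhead in MBs.
      long overheadMb = (memOverhead != null)
          ? Long.parseLong(memOverhead)
          : Math.max((long) (memMb * 0.10), 384L);
      long totalMb = memMb + overheadMb;
      // Step 3: round up to a multiple of YARN's minimum memory allocation.
      long allocatedMb =
          ((totalMb + yarnMinAllocMb - 1) / yarnMinAllocMb) * yarnMinAllocMb;
      // Step 4: convert MBs to GBs and divide by the requested vcores.
      return (allocatedMb / 1024.0) / vcores;
    }
  }

For example, with mem = "2G", no user overhead, 1 vcore, and a 1024 MB minimum allocation: overhead = max(204, 384) = 384 MB, total = 2432 MB, rounded up to 3072 MB, giving a ratio of 3.0 GB per vcore.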
17 changes: 6 additions & 11 deletions plugins/jobtype/src/azkaban/jobtype/HadoopSparkJob.java
@@ -130,10 +130,6 @@ public class HadoopSparkJob extends JavaProcessJob {
// Env var to be passed to {@HadoopSecureSparkWrapper} for the value of minimum
// mem/vcore ratio
public static final String SPARK_MIN_MEM_VCORE_RATIO_ENV_VAR = "SPARK_MIN_MEM_VCORE_RATIO";
-  // Jobtype property to define where default spark-defaults.conf file is
-  public static final String SPARK_PROEPRTY_FILE_PATH_JOBTYPE_PROPERTY = "spark.property.file";
-  // Env var to be passed to {@HadoopSecureSparkWrapper} for the path to spark-defaults.conf
-  public static final String SPARK_PROPERTY_FILE_PATH_ENV_VAR = "SPARK_PROERTY_FILE_PATH";

// security variables
private String userToProxy = null;
@@ -196,13 +192,6 @@ public void run() throws Exception {
getJobProps().put("env." + SPARK_NODE_LABELING_ENV_VAR, Boolean.TRUE.toString());
}

-    String sparkPropertyFile = getSysProps().get(SPARK_PROEPRTY_FILE_PATH_JOBTYPE_PROPERTY);
-    File propertyFile = new File(sparkPropertyFile);
-    if (sparkPropertyFile == null || !propertyFile.exists()) {
-      throw new RuntimeException(SPARK_PROEPRTY_FILE_PATH_JOBTYPE_PROPERTY + " must be configured.");
-    }
-    getJobProps().put("env." + SPARK_PROPERTY_FILE_PATH_ENV_VAR, sparkPropertyFile);

if (getSysProps().getBoolean(SPARK_AUTO_NODE_LABELING_JOBTYPE_PROPERTY, Boolean.FALSE)) {
String desiredNodeLabel = getSysProps().get(SPARK_DESIRED_NODE_LABEL_JOBTYPE_PROPERTY);
String minMemVcoreRatio = getSysProps().get(SPARK_MIN_MEM_VCORE_RATIO_JOBTYPE_PROPERTY);
@@ -513,6 +502,12 @@ private String[] getSparkHomeConf() {
File confDir = new File(sparkConf);
if (!confDir.exists()) {
error("SPARK conf dir does not exist. Will use SPARK_HOME/conf as default.");
sparkConf = sparkHome + "/conf";
}
+    File defaultSparkConf = new File(sparkConf + "/spark-defaults.conf");
+    if (!defaultSparkConf.exists()) {
+      throw new RuntimeException("Default Spark config file spark-defaults.conf cannot"
+          + " be found at " + defaultSparkConf);
+    }
}

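The lines added to getSparkHomeConf() above are what replace the removed jobtype property: the resolved conf directory (falling back to SPARK_HOME/conf) must now contain spark-defaults.conf, or the job fails early. A standalone sketch of that validation, with illustrative names:

  import java.io.File;

  final class SparkConfDirValidator {
    // Resolve the Spark conf directory, falling back to SPARK_HOME/conf,
    // and require that spark-defaults.conf exists inside it.
    static String resolveConfDir(String sparkConfDir, String sparkHome) {
      if (!new File(sparkConfDir).exists()) {
        sparkConfDir = sparkHome + "/conf";
      }
      File defaults = new File(sparkConfDir, "spark-defaults.conf");
      if (!defaults.exists()) {
        throw new RuntimeException(
            "spark-defaults.conf cannot be found at " + defaults);
      }
      return sparkConfDir;
    }
  }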
@@ -42,8 +42,6 @@ public void testAutoLabeling() {
envs.put(HadoopSparkJob.SPARK_AUTO_NODE_LABELING_ENV_VAR, Boolean.TRUE.toString());
envs.put(HadoopSparkJob.SPARK_DESIRED_NODE_LABEL_ENV_VAR, "test2");
envs.put(HadoopSparkJob.SPARK_MIN_MEM_VCORE_RATIO_ENV_VAR, "3");
-    URL url = getClass().getClassLoader().getResource("spark-defaults.conf");
-    envs.put(HadoopSparkJob.SPARK_PROPERTY_FILE_PATH_ENV_VAR, url.getPath());
setEnv(envs);
Configuration.addDefaultResource("yarn-site.xml");
String[] argArray = new String[] {
@@ -70,8 +68,6 @@ public void testDisableAutoLabeling() {
envs.put(HadoopSparkJob.SPARK_NODE_LABELING_ENV_VAR, Boolean.TRUE.toString());
envs.put(HadoopSparkJob.SPARK_DESIRED_NODE_LABEL_ENV_VAR, "test2");
envs.put(HadoopSparkJob.SPARK_MIN_MEM_VCORE_RATIO_ENV_VAR, "3");
-    URL url = getClass().getClassLoader().getResource("spark-defaults.conf");
-    envs.put(HadoopSparkJob.SPARK_PROPERTY_FILE_PATH_ENV_VAR, url.getPath());
setEnv(envs);
Configuration.addDefaultResource("yarn-site.xml");
String[] argArray = new String[] {
@@ -99,8 +95,6 @@ public void testLoadConfigFromPropertyFile() {
envs.put(HadoopSparkJob.SPARK_AUTO_NODE_LABELING_ENV_VAR, Boolean.TRUE.toString());
envs.put(HadoopSparkJob.SPARK_DESIRED_NODE_LABEL_ENV_VAR, "test2");
envs.put(HadoopSparkJob.SPARK_MIN_MEM_VCORE_RATIO_ENV_VAR, "3");
-    URL url = getClass().getClassLoader().getResource("spark-defaults.conf");
-    envs.put(HadoopSparkJob.SPARK_PROPERTY_FILE_PATH_ENV_VAR, url.getPath());
setEnv(envs);
Configuration.addDefaultResource("yarn-site.xml");
String[] argArray = new String[] {
