Commit 67fbc60

Add support for SparkR shell to use spark-submit
This ensures that SparkConf options are read in both batch and interactive modes.
1 parent 050390b commit 67fbc60
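
In practice this means spark-submit options now reach the interactive SparkR shell as well as batch jobs. A minimal usage sketch (the flag values and the script name my_script.R are illustrative, not from this commit):

# Batch mode: options were already forwarded to spark-submit
./sparkR-submit --master yarn-client --driver-memory 2g my_script.R

# Interactive mode (new in this commit): with no R file given, sparkR-submit
# captures the same options in SPARKR_SUBMIT_ARGS and starts an R shell whose
# SparkContext is created through spark-submit
./sparkR-submit --master yarn-client --driver-memory 2g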

File tree: 5 files changed, +96 −15 lines

pkg/R/sparkR.R

Lines changed: 16 additions & 5 deletions
@@ -74,7 +74,7 @@ sparkR.stop <- function(env) {
 #'}

 sparkR.init <- function(
-  master = "local",
+  master = "",
   appName = "SparkR",
   sparkHome = Sys.getenv("SPARK_HOME"),
   sparkEnvir = list(),
@@ -101,10 +101,21 @@ sparkR.init <- function(
   if (sparkRExistingPort != "") {
     sparkRBackendPort <- sparkRExistingPort
   } else {
-    launchBackend(classPath = cp,
-                  mainClass = "edu.berkeley.cs.amplab.sparkr.SparkRBackend",
-                  args = as.character(sparkRBackendPort),
-                  javaOpts = paste("-Xmx", sparkMem, sep = ""))
+    if (Sys.getenv("SPARKR_USE_SPARK_SUBMIT", "") == "") {
+      launchBackend(classPath = cp,
+                    mainClass = "edu.berkeley.cs.amplab.sparkr.SparkRBackend",
+                    args = as.character(sparkRBackendPort),
+                    javaOpts = paste("-Xmx", sparkMem, sep = ""))
+    } else {
+      # TODO: We should deprecate sparkJars and ask users to add it to the
+      # command line (using --jars) which is picked up by SparkSubmit
+      launchBackendSparkSubmit(
+        mainClass = "edu.berkeley.cs.amplab.sparkr.SparkRBackend",
+        args = as.character(sparkRBackendPort),
+        appJar = .sparkREnv$assemblyJarPath,
+        sparkHome = sparkHome,
+        sparkSubmitOpts = Sys.getenv("SPARKR_SUBMIT_ARGS", ""))
+    }
     Sys.sleep(2) # Wait for backend to come up
   }
   .sparkREnv$sparkRBackendPort <- sparkRBackendPort
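
The new branch is driven entirely by environment variables, so the two paths can be sketched from the shell. A hedged sketch: in this commit the variables are exported by sparkR-submit, and setting them by hand around the plain sparkR launcher is assumed, not shown, to work:

# Default path: backend JVM launched directly via launchBackend()
./sparkR

# spark-submit path: sparkR.init() defers the backend launch to spark-submit,
# so SparkConf options in SPARKR_SUBMIT_ARGS take effect
SPARKR_USE_SPARK_SUBMIT=1 SPARKR_SUBMIT_ARGS="--driver-memory 2g" ./sparkR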

pkg/R/sparkRClient.R

Lines changed: 27 additions & 0 deletions
@@ -37,3 +37,30 @@ launchBackend <- function(
   cat("Launching java with command ", java_bin, " ", combinedArgs, "\n")
   invisible(system2(java_bin, combinedArgs, wait = F))
 }
+
+launchBackendSparkSubmit <- function(
+    mainClass,
+    args,
+    appJar,
+    sparkHome,
+    sparkSubmitOpts) {
+  if (.Platform$OS.type == "unix") {
+    spark_submit_bin_name = "spark-submit"
+  } else {
+    spark_submit_bin_name = "spark-submit.cmd"
+  }
+
+  if (sparkHome != "") {
+    spark_submit_bin <- file.path(sparkHome, "bin", spark_submit_bin_name)
+  } else {
+    spark_submit_bin <- spark_submit_bin_name
+  }
+
+  # Since this function is only used while launching R shell using spark-submit,
+  # the format we need to construct is
+  # spark-submit --class <mainClass> <sparkSubmitOpts> <jarFile> <appOpts>
+
+  combinedArgs <- paste("--class", mainClass, sparkSubmitOpts, appJar, args, sep = " ")
+  cat("Launching java with spark-submit command ", spark_submit_bin, " ", combinedArgs, "\n")
+  invisible(system2(spark_submit_bin, combinedArgs, wait = F))
+}
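
For concreteness, this is roughly the command launchBackendSparkSubmit() assembles, with made-up values (sparkHome = "/opt/spark", sparkSubmitOpts = "--driver-memory 2g", appJar = "sparkr-assembly.jar", args = "12345"):

/opt/spark/bin/spark-submit --class edu.berkeley.cs.amplab.sparkr.SparkRBackend \
  --driver-memory 2g sparkr-assembly.jar 12345

Note that system2() passes the single option string through a shell, which re-splits it on whitespace; that is why sparkR-submit re-quotes arguments before exporting SPARKR_SUBMIT_ARGS (see the loop further below).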

pkg/src/src/main/scala/edu/berkeley/cs/amplab/sparkr/RRDD.scala

Lines changed: 11 additions & 2 deletions
@@ -357,10 +357,19 @@ object RRDD {
     sparkEnvirMap: JMap[Object, Object],
     sparkExecutorEnvMap: JMap[Object, Object]): JavaSparkContext = {

-    val sparkConf = new SparkConf().setMaster(master)
-      .setAppName(appName)
+    val sparkConf = new SparkConf().setAppName(appName)
       .setSparkHome(sparkHome)
       .setJars(jars)
+
+    // Override `master` if we have a user-specified value
+    if (master != "") {
+      sparkConf.setMaster(master)
+    } else {
+      // If conf has no master set it to "local" to maintain
+      // backwards compatibility
+      sparkConf.setIfMissing("spark.master", "local")
+    }
+
     for ((name, value) <- sparkEnvirMap) {
       sparkConf.set(name.asInstanceOf[String], value.asInstanceOf[String])
     }
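
The net effect is a three-level precedence for the master URL. An illustrative sketch (URLs made up):

# 1. An explicit master (e.g. from the MASTER env var) still wins
MASTER=spark://host:7077 ./sparkR
# 2. Otherwise a spark.master already set by spark-submit (--master or
#    spark-defaults.conf) is respected, since setIfMissing won't override it
./sparkR-submit --master yarn-client
# 3. With neither present, the old default "local" is restored
./sparkR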

sparkR

Lines changed: 1 addition & 1 deletion
@@ -28,7 +28,7 @@ cat > /tmp/sparkR.profile << EOF
   Sys.setenv(NOAWT=1)
   .libPaths(c(paste(projecHome,"/lib", sep=""), .libPaths()))
   require(SparkR)
-  sc <- sparkR.init(Sys.getenv("MASTER", unset = "local"))
+  sc <- sparkR.init(Sys.getenv("MASTER", unset = ""))
   assign("sc", sc, envir=.GlobalEnv)
   cat("\n Welcome to SparkR!")
   cat("\n Spark context is available as sc\n")

sparkR-submit

Lines changed: 41 additions & 7 deletions
@@ -33,6 +33,26 @@ fi

 export R_PROFILE_USER="/tmp/sparkR.profile"

+# Build up arguments list manually to preserve quotes and backslashes.
+SUBMIT_USAGE_FUNCTION=usage
+gatherSparkSubmitOpts "$@"
+
+SPARKR_SUBMIT_ARGS=""
+whitespace="[[:space:]]"
+for i in "${SUBMISSION_OPTS[@]}"
+do
+  if [[ $i =~ \" ]]; then i=$(echo $i | sed 's/\"/\\\"/g'); fi
+  if [[ $i =~ $whitespace ]]; then i=\"$i\"; fi
+  SPARKR_SUBMIT_ARGS="$SPARKR_SUBMIT_ARGS $i"
+done
+export SPARKR_SUBMIT_ARGS
+export SPARKR_USE_SPARK_SUBMIT=1
+
+NUM_APPLICATION_OPTS=${#APPLICATION_OPTS[@]}
+
+# If a R file is provided, directly run spark-submit.
+if [[ $NUM_APPLICATION_OPTS -gt 0 && "${APPLICATION_OPTS[0]}" =~ \.R$ ]]; then
+
 cat > /tmp/sparkR.profile << EOF
 .First <- function() {
   projecHome <- Sys.getenv("PROJECT_HOME")
@@ -41,16 +61,30 @@ cat > /tmp/sparkR.profile << EOF
 }
 EOF

-# Build up arguments list manually to preserve quotes and backslashes.
-SUBMIT_USAGE_FUNCTION=usage
-gatherSparkSubmitOpts "$@"
-
-# If a R file is provided, directly run spark-submit.
-if [[ "${APPLICATION_OPTS[0]}" =~ \.R$ ]]; then
   primary="${APPLICATION_OPTS[0]}"
   shift
   # Set the main class to SparkRRunner and add the primary R file to --files to make sure its copied to the cluster
+  echo "Running $SPARK_HOME/bin/spark-submit --class edu.berkeley.cs.amplab.sparkr.SparkRRunner --files $primary ${SUBMISSION_OPTS[@]} $SPARKR_JAR_FILE $primary" "${APPLICATION_OPTS[@]:1}"
   exec "$SPARK_HOME"/bin/spark-submit --class edu.berkeley.cs.amplab.sparkr.SparkRRunner --files "$primary" "${SUBMISSION_OPTS[@]}" "$SPARKR_JAR_FILE" "$primary" "${APPLICATION_OPTS[@]:1}"
 else
-  echo "sparkR-submit can only be used to run R programs. Please use sparkR to launch a shell."
+
+  # If we don't have an R file to run, initialize context and run R
+  cat > /tmp/sparkR.profile << EOF
+.First <- function() {
+  projecHome <- Sys.getenv("PROJECT_HOME")
+  Sys.setenv(NOAWT=1)
+  .libPaths(c(paste(projecHome,"/lib", sep=""), .libPaths()))
+  require(SparkR)
+  sc <- sparkR.init(Sys.getenv("MASTER", unset = ""))
+  assign("sc", sc, envir=.GlobalEnv)
+  cat("\n Welcome to SparkR!")
+  cat("\n Spark context is available as sc\n")
+}
+EOF
+
+  # Add SPARKR_JAR, main class etc. to SPARKR_SUBMIT_ARGS
+  export SPARKR_SUBMIT_ARGS
+
+  R
+
 fi
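
The quoting loop matters because SPARKR_SUBMIT_ARGS is later flattened into a single spark-submit command line and re-split on whitespace. An illustrative round trip (option values made up):

./sparkR-submit --driver-java-options "-Dkey=a b"
# the loop escapes embedded quotes and re-wraps the whitespace-containing
# value, so SPARKR_SUBMIT_ARGS ends up as:
#   --driver-java-options "-Dkey=a b"
# and spark-submit still sees -Dkey=a b as one argument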
