Commit 835a79d

Sun Rui authored and shivaram committed
[SPARK-10500][SPARKR] sparkr.zip cannot be created if /R/lib is unwritable
The basic idea is as follows:

The archive of the SparkR package itself, sparkr.zip, is created during the build process and is included in the Spark binary distribution. It is not modified after the distribution is installed, because the directory it resides in ($SPARK_HOME/R/lib) may not be writable.

When R source code is contained in jars or in Spark packages specified with the "--jars" or "--packages" command line options, a temporary directory is created by calling Utils.createTempDir(), and the R packages built from that source code are installed into it. The temporary directory is writable, does not interfere with the directories of other concurrent SparkR sessions, and is deleted when the SparkR session ends. The R binary packages installed in the temporary directory are then packed into an archive named rpkg.zip.

Both sparkr.zip and rpkg.zip are distributed to the cluster in YARN modes. Distributing rpkg.zip in standalone modes is not supported in this PR and will be addressed in another PR.

Various R files are updated to accept multiple lib paths (one for the SparkR package, the other for additional R packages) so that these packages can be accessed from R.

Author: Sun Rui <rui.sun@intel.com>

Closes #9390 from sun-rui/SPARK-10500.
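
To make the flow above concrete, here is a minimal, self-contained Scala sketch of the driver-side logic. This is illustrative only, not the PR's code: `RPackageFlowSketch`, `ensureRPackageDir`, and `archivesForYarn` are hypothetical names, and `Files.createTempDirectory` stands in for Spark's internal `Utils.createTempDir`.

```scala
import java.io.File
import java.nio.file.Files

object RPackageFlowSketch {
  // Set once per SparkR session, when the first jar containing R source code is seen.
  var rPackages: Option[String] = None

  // A writable, per-session install target for R packages built from source;
  // the real code uses Utils.createTempDir(), which also cleans up when the session ends.
  def ensureRPackageDir(): String = {
    if (rPackages.isEmpty) {
      rPackages = Some(Files.createTempDirectory("rpkg").toFile.getAbsolutePath)
    }
    rPackages.get
  }

  // Archives shipped to YARN: sparkr.zip always; rpkg.zip only when R packages
  // were actually built. The "#name" suffix assigns the symlink name on executors.
  def archivesForYarn(sparkRLibDir: String): Seq[String] = {
    val sparkrZip = new File(sparkRLibDir, "sparkr.zip").getAbsolutePath + "#sparkr"
    val rpkgZip = rPackages.map(d => new File(d, "rpkg.zip").getAbsolutePath + "#rpkg")
    sparkrZip +: rpkgZip.toSeq
  }
}
```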
1 parent d7d9fa0 commit 835a79d

File tree

14 files changed: +121 -36 lines changed

R/install-dev.bat

Lines changed: 6 additions & 0 deletions

```diff
@@ -25,3 +25,9 @@ set SPARK_HOME=%~dp0..
 MKDIR %SPARK_HOME%\R\lib

 R.exe CMD INSTALL --library="%SPARK_HOME%\R\lib" %SPARK_HOME%\R\pkg\
+
+rem Zip the SparkR package so that it can be distributed to worker nodes on YARN
+pushd %SPARK_HOME%\R\lib
+%JAVA_HOME%\bin\jar.exe cfM "%SPARK_HOME%\R\lib\sparkr.zip" SparkR
+popd
+
```

R/install-dev.sh

Lines changed: 4 additions & 0 deletions

```diff
@@ -42,4 +42,8 @@ Rscript -e ' if("devtools" %in% rownames(installed.packages())) { library(devtoo
 # Install SparkR to $LIB_DIR
 R CMD INSTALL --library=$LIB_DIR $FWDIR/pkg/

+# Zip the SparkR package so that it can be distributed to worker nodes on YARN
+cd $LIB_DIR
+jar cfM "$LIB_DIR/sparkr.zip" SparkR
+
 popd > /dev/null
```

R/pkg/R/sparkR.R

Lines changed: 13 additions & 1 deletion

```diff
@@ -48,6 +48,12 @@ sparkR.stop <- function() {
     }
   }

+  # Remove the R package lib path from .libPaths()
+  if (exists(".libPath", envir = env)) {
+    libPath <- get(".libPath", envir = env)
+    .libPaths(.libPaths()[.libPaths() != libPath])
+  }
+
   if (exists(".backendLaunched", envir = env)) {
     callJStatic("SparkRHandler", "stopBackend")
   }
@@ -155,14 +161,20 @@ sparkR.init <- function(
     f <- file(path, open="rb")
     backendPort <- readInt(f)
     monitorPort <- readInt(f)
+    rLibPath <- readString(f)
     close(f)
     file.remove(path)
     if (length(backendPort) == 0 || backendPort == 0 ||
-        length(monitorPort) == 0 || monitorPort == 0) {
+        length(monitorPort) == 0 || monitorPort == 0 ||
+        length(rLibPath) != 1) {
       stop("JVM failed to launch")
     }
     assign(".monitorConn", socketConnection(port = monitorPort), envir = .sparkREnv)
     assign(".backendLaunched", 1, envir = .sparkREnv)
+    if (rLibPath != "") {
+      assign(".libPath", rLibPath, envir = .sparkREnv)
+      .libPaths(c(rLibPath, .libPaths()))
+    }
   }

   .sparkREnv$backendPort <- backendPort
```

R/pkg/inst/profile/general.R

Lines changed: 2 additions & 1 deletion

```diff
@@ -17,6 +17,7 @@

 .First <- function() {
   packageDir <- Sys.getenv("SPARKR_PACKAGE_DIR")
-  .libPaths(c(packageDir, .libPaths()))
+  dirs <- strsplit(packageDir, ",")[[1]]
+  .libPaths(c(dirs, .libPaths()))
   Sys.setenv(NOAWT=1)
 }
```

R/pkg/inst/worker/daemon.R

Lines changed: 3 additions & 2 deletions

```diff
@@ -18,10 +18,11 @@
 # Worker daemon

 rLibDir <- Sys.getenv("SPARKR_RLIBDIR")
-script <- paste(rLibDir, "SparkR/worker/worker.R", sep = "/")
+dirs <- strsplit(rLibDir, ",")[[1]]
+script <- file.path(dirs[[1]], "SparkR", "worker", "worker.R")

 # preload SparkR package, speedup worker
-.libPaths(c(rLibDir, .libPaths()))
+.libPaths(c(dirs, .libPaths()))
 suppressPackageStartupMessages(library(SparkR))

 port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
```

R/pkg/inst/worker/worker.R

Lines changed: 2 additions & 1 deletion

```diff
@@ -35,10 +35,11 @@ bootTime <- currentTimeSecs()
 bootElap <- elapsedSecs()

 rLibDir <- Sys.getenv("SPARKR_RLIBDIR")
+dirs <- strsplit(rLibDir, ",")[[1]]
 # Set libPaths to include SparkR package as loadNamespace needs this
 # TODO: Figure out if we can avoid this by not loading any objects that require
 # SparkR namespace
-.libPaths(c(rLibDir, .libPaths()))
+.libPaths(c(dirs, .libPaths()))
 suppressPackageStartupMessages(library(SparkR))

 port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
```

core/src/main/scala/org/apache/spark/api/r/RBackend.scala

Lines changed: 1 addition & 0 deletions

```diff
@@ -113,6 +113,7 @@ private[spark] object RBackend extends Logging {
       val dos = new DataOutputStream(new FileOutputStream(f))
       dos.writeInt(boundPort)
       dos.writeInt(listenPort)
+      SerDe.writeString(dos, RUtils.rPackages.getOrElse(""))
       dos.close()
       f.renameTo(new File(path))
```
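
The R side (sparkR.init, changed above) reads this handshake file in exactly the order the backend writes it. A hedged sketch of the field order follows; `HandshakeSketch` and the port values are made up, and the length-prefixed UTF-8 framing shown is an assumption — SerDe.writeString is authoritative and must match R's readString.

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}

object HandshakeSketch {
  def main(args: Array[String]): Unit = {
    val out = new ByteArrayOutputStream()
    val dos = new DataOutputStream(out)
    dos.writeInt(8101) // backendPort -> readInt(f) in sparkR.init
    dos.writeInt(8102) // monitorPort -> readInt(f)
    // New in this commit: the built-R-packages path, or "" when there are none.
    // Framing here is an assumption; see SerDe.scala for the real encoding.
    val rPackages = ""
    val bytes = rPackages.getBytes("UTF-8")
    dos.writeInt(bytes.length)
    dos.write(bytes)
    dos.close()
  }
}
```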

core/src/main/scala/org/apache/spark/api/r/RRDD.scala

Lines changed: 2 additions & 2 deletions

```diff
@@ -400,14 +400,14 @@ private[r] object RRDD {

     val rOptions = "--vanilla"
     val rLibDir = RUtils.sparkRPackagePath(isDriver = false)
-    val rExecScript = rLibDir + "/SparkR/worker/" + script
+    val rExecScript = rLibDir(0) + "/SparkR/worker/" + script
     val pb = new ProcessBuilder(Arrays.asList(rCommand, rOptions, rExecScript))
     // Unset the R_TESTS environment variable for workers.
     // This is set by R CMD check as startup.Rs
     // (http://svn.r-project.org/R/trunk/src/library/tools/R/testing.R)
     // and confuses worker script which tries to load a non-existent file
     pb.environment().put("R_TESTS", "")
-    pb.environment().put("SPARKR_RLIBDIR", rLibDir)
+    pb.environment().put("SPARKR_RLIBDIR", rLibDir.mkString(","))
     pb.environment().put("SPARKR_WORKER_PORT", port.toString)
     pb.redirectErrorStream(true) // redirect stderr into stdout
     val proc = pb.start()
```
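
A hedged usage sketch of how the multi-path result is consumed when launching an R worker. The `WorkerEnvSketch` name and the paths are illustrative; the real values come from RUtils.sparkRPackagePath.

```scala
object WorkerEnvSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for RUtils.sparkRPackagePath(isDriver = false): SparkR's own dir is
    // always first; the temp dir of extra built packages (if any) comes second.
    val rLibDir: Seq[String] = Seq("/opt/spark/R/lib", "/tmp/spark-rpkg-1234")

    val pb = new ProcessBuilder("Rscript", "--vanilla",
      rLibDir(0) + "/SparkR/worker/worker.R") // worker script lives under SparkR
    // daemon.R/worker.R split this on "," and prepend every entry to .libPaths()
    pb.environment().put("SPARKR_RLIBDIR", rLibDir.mkString(","))
    println(pb.environment().get("SPARKR_RLIBDIR"))
  }
}
```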

core/src/main/scala/org/apache/spark/api/r/RUtils.scala

Lines changed: 30 additions & 7 deletions

```diff
@@ -23,6 +23,10 @@ import java.util.Arrays
 import org.apache.spark.{SparkEnv, SparkException}

 private[spark] object RUtils {
+  // Local path where R binary packages built from R source code contained in the spark
+  // packages specified with "--packages" or "--jars" command line option reside.
+  var rPackages: Option[String] = None
+
   /**
    * Get the SparkR package path in the local spark distribution.
    */
@@ -34,11 +38,15 @@ private[spark] object RUtils {
   }

   /**
-   * Get the SparkR package path in various deployment modes.
+   * Get the list of paths for R packages in various deployment modes, of which the first
+   * path is for the SparkR package itself. The second path is for R packages built as
+   * part of Spark Packages, if any exist. Spark Packages can be provided through the
+   * "--packages" or "--jars" command line options.
+   *
    * This assumes that Spark properties `spark.master` and `spark.submit.deployMode`
    * and environment variable `SPARK_HOME` are set.
    */
-  def sparkRPackagePath(isDriver: Boolean): String = {
+  def sparkRPackagePath(isDriver: Boolean): Seq[String] = {
     val (master, deployMode) =
       if (isDriver) {
         (sys.props("spark.master"), sys.props("spark.submit.deployMode"))
@@ -51,15 +59,30 @@ private[spark] object RUtils {
     val isYarnClient = master != null && master.contains("yarn") && deployMode == "client"

     // In YARN mode, the SparkR package is distributed as an archive symbolically
-    // linked to the "sparkr" file in the current directory. Note that this does not apply
-    // to the driver in client mode because it is run outside of the cluster.
+    // linked to the "sparkr" file in the current directory and additional R packages
+    // are distributed as an archive symbolically linked to the "rpkg" file in the
+    // current directory.
+    //
+    // Note that this does not apply to the driver in client mode because it is run
+    // outside of the cluster.
     if (isYarnCluster || (isYarnClient && !isDriver)) {
-      new File("sparkr").getAbsolutePath
+      val sparkRPkgPath = new File("sparkr").getAbsolutePath
+      val rPkgPath = new File("rpkg")
+      if (rPkgPath.exists()) {
+        Seq(sparkRPkgPath, rPkgPath.getAbsolutePath)
+      } else {
+        Seq(sparkRPkgPath)
+      }
     } else {
       // Otherwise, assume the package is local
       // TODO: support this for Mesos
-      localSparkRPackagePath.getOrElse {
-        throw new SparkException("SPARK_HOME not set. Can't locate SparkR package.")
+      val sparkRPkgPath = localSparkRPackagePath.getOrElse {
+        throw new SparkException("SPARK_HOME not set. Can't locate SparkR package.")
+      }
+      if (!rPackages.isEmpty) {
+        Seq(sparkRPkgPath, rPackages.get)
+      } else {
+        Seq(sparkRPkgPath)
       }
     }
   }
```
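
The return shape can be summarized with a small sketch — a simplification under stated assumptions, not the Spark code: `PathShapeSketch` is a hypothetical name, and `onYarnExecutor` collapses the isYarnCluster/isYarnClient/isDriver checks above into one flag.

```scala
import java.io.File

object PathShapeSketch {
  def sparkRPackagePathSketch(
      onYarnExecutor: Boolean,
      rPackages: Option[String]): Seq[String] = {
    if (onYarnExecutor) {
      // "sparkr" and "rpkg" are the symlink names YARN localizes the archives under.
      val base = Seq(new File("sparkr").getAbsolutePath)
      if (new File("rpkg").exists()) base :+ new File("rpkg").getAbsolutePath else base
    } else {
      // Locally, SparkR comes from $SPARK_HOME/R/lib; extra packages from the temp dir.
      Seq(sys.env.getOrElse("SPARK_HOME", ".") + "/R/lib") ++ rPackages
    }
  }

  def main(args: Array[String]): Unit = {
    println(sparkRPackagePathSketch(onYarnExecutor = false, Some("/tmp/spark-rpkg-1234")))
  }
}
```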

core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala

Lines changed: 19 additions & 7 deletions

```diff
@@ -100,20 +100,29 @@ private[deploy] object RPackageUtils extends Logging {
    * Runs the standard R package installation code to build the R package from source.
    * Multiple runs don't cause problems.
    */
-  private def rPackageBuilder(dir: File, printStream: PrintStream, verbose: Boolean): Boolean = {
+  private def rPackageBuilder(
+      dir: File,
+      printStream: PrintStream,
+      verbose: Boolean,
+      libDir: String): Boolean = {
     // this code should be always running on the driver.
-    val pathToSparkR = RUtils.localSparkRPackagePath.getOrElse(
-      throw new SparkException("SPARK_HOME not set. Can't locate SparkR package."))
     val pathToPkg = Seq(dir, "R", "pkg").mkString(File.separator)
-    val installCmd = baseInstallCmd ++ Seq(pathToSparkR, pathToPkg)
+    val installCmd = baseInstallCmd ++ Seq(libDir, pathToPkg)
     if (verbose) {
       print(s"Building R package with the command: $installCmd", printStream)
     }
     try {
       val builder = new ProcessBuilder(installCmd.asJava)
       builder.redirectErrorStream(true)
+
+      // Put the SparkR package directory into R library search paths in case this R package
+      // may depend on SparkR.
       val env = builder.environment()
-      env.clear()
+      val rPackageDir = RUtils.sparkRPackagePath(isDriver = true)
+      env.put("SPARKR_PACKAGE_DIR", rPackageDir.mkString(","))
+      env.put("R_PROFILE_USER",
+        Seq(rPackageDir(0), "SparkR", "profile", "general.R").mkString(File.separator))
+
       val process = builder.start()
       new RedirectThread(process.getInputStream, printStream, "redirect R packaging").start()
       process.waitFor() == 0
@@ -170,8 +179,11 @@ private[deploy] object RPackageUtils extends Logging {
       if (checkManifestForR(jar)) {
         print(s"$file contains R source code. Now installing package.", printStream, Level.INFO)
         val rSource = extractRFolder(jar, printStream, verbose)
+        if (RUtils.rPackages.isEmpty) {
+          RUtils.rPackages = Some(Utils.createTempDir().getAbsolutePath)
+        }
         try {
-          if (!rPackageBuilder(rSource, printStream, verbose)) {
+          if (!rPackageBuilder(rSource, printStream, verbose, RUtils.rPackages.get)) {
             print(s"ERROR: Failed to build R package in $file.", printStream)
             print(RJarDoc, printStream)
           }
@@ -208,7 +220,7 @@ private[deploy] object RPackageUtils extends Logging {
     }
   }

-  /** Zips all the libraries found with SparkR in the R/lib directory for distribution with Yarn. */
+  /** Zips all the R libraries built for distribution to the cluster. */
   private[deploy] def zipRLibraries(dir: File, name: String): File = {
     val filesToBundle = listFilesRecursively(dir, Seq(".zip"))
     // create a zip file from scratch, do not append to existing file.
```
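
The env setup above leans on R_PROFILE_USER: R sources that file at startup, and SparkR's general.R (changed earlier in this commit) splits the comma-separated SPARKR_PACKAGE_DIR into .libPaths() entries, so a package being built can resolve a dependency on SparkR. A hedged sketch of the variables involved — `BuildEnvSketch` and all paths are illustrative:

```scala
import java.io.File

object BuildEnvSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for RUtils.sparkRPackagePath(isDriver = true)
    val rPackageDir = Seq("/opt/spark/R/lib", "/tmp/spark-rpkg-1234")
    val env = Map(
      // general.R's .First splits this on "," and prepends each dir to .libPaths()
      "SPARKR_PACKAGE_DIR" -> rPackageDir.mkString(","),
      // R sources this profile on startup, which is what wires the lib paths up
      "R_PROFILE_USER" ->
        Seq(rPackageDir.head, "SparkR", "profile", "general.R").mkString(File.separator))
    env.foreach { case (k, v) => println(s"$k=$v") }
  }
}
```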

core/src/main/scala/org/apache/spark/deploy/RRunner.scala

Lines changed: 3 additions & 2 deletions

```diff
@@ -82,9 +82,10 @@ object RRunner {
       val env = builder.environment()
       env.put("EXISTING_SPARKR_BACKEND_PORT", sparkRBackendPort.toString)
       val rPackageDir = RUtils.sparkRPackagePath(isDriver = true)
-      env.put("SPARKR_PACKAGE_DIR", rPackageDir)
+      // Put the R package directories into an env variable of comma-separated paths
+      env.put("SPARKR_PACKAGE_DIR", rPackageDir.mkString(","))
       env.put("R_PROFILE_USER",
-        Seq(rPackageDir, "SparkR", "profile", "general.R").mkString(File.separator))
+        Seq(rPackageDir(0), "SparkR", "profile", "general.R").mkString(File.separator))
       builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
       val process = builder.start()
```

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 34 additions & 9 deletions

```diff
@@ -83,6 +83,7 @@ object SparkSubmit {
   private val PYSPARK_SHELL = "pyspark-shell"
   private val SPARKR_SHELL = "sparkr-shell"
   private val SPARKR_PACKAGE_ARCHIVE = "sparkr.zip"
+  private val R_PACKAGE_ARCHIVE = "rpkg.zip"

   private val CLASS_NOT_FOUND_EXIT_STATUS = 101

@@ -362,22 +363,46 @@ object SparkSubmit {
       }
     }

-    // In YARN mode for an R app, add the SparkR package archive to archives
-    // that can be distributed with the job
+    // In YARN mode for an R app, add the SparkR package archive and the R package
+    // archive containing all of the built R libraries to archives so that they can
+    // be distributed with the job
     if (args.isR && clusterManager == YARN) {
-      val rPackagePath = RUtils.localSparkRPackagePath
-      if (rPackagePath.isEmpty) {
+      val sparkRPackagePath = RUtils.localSparkRPackagePath
+      if (sparkRPackagePath.isEmpty) {
         printErrorAndExit("SPARK_HOME does not exist for R application in YARN mode.")
       }
-      val rPackageFile =
-        RPackageUtils.zipRLibraries(new File(rPackagePath.get), SPARKR_PACKAGE_ARCHIVE)
-      if (!rPackageFile.exists()) {
+      val sparkRPackageFile = new File(sparkRPackagePath.get, SPARKR_PACKAGE_ARCHIVE)
+      if (!sparkRPackageFile.exists()) {
         printErrorAndExit(s"$SPARKR_PACKAGE_ARCHIVE does not exist for R application in YARN mode.")
       }
-      val localURI = Utils.resolveURI(rPackageFile.getAbsolutePath)
+      val sparkRPackageURI = Utils.resolveURI(sparkRPackageFile.getAbsolutePath).toString

+      // Distribute the SparkR package.
       // Assigns a symbol link name "sparkr" to the shipped package.
-      args.archives = mergeFileLists(args.archives, localURI.toString + "#sparkr")
+      args.archives = mergeFileLists(args.archives, sparkRPackageURI + "#sparkr")
+
+      // Distribute the R package archive containing all the built R packages.
+      if (!RUtils.rPackages.isEmpty) {
+        val rPackageFile =
+          RPackageUtils.zipRLibraries(new File(RUtils.rPackages.get), R_PACKAGE_ARCHIVE)
+        if (!rPackageFile.exists()) {
+          printErrorAndExit("Failed to zip all the built R packages.")
+        }
+
+        val rPackageURI = Utils.resolveURI(rPackageFile.getAbsolutePath).toString
+        // Assigns a symbol link name "rpkg" to the shipped package.
+        args.archives = mergeFileLists(args.archives, rPackageURI + "#rpkg")
+      }
+    }
+
+    // TODO: Support distributing R packages with standalone cluster
+    if (args.isR && clusterManager == STANDALONE && !RUtils.rPackages.isEmpty) {
+      printErrorAndExit("Distributing R packages with standalone cluster is not supported.")
+    }
+
+    // TODO: Support SparkR with mesos cluster
+    if (args.isR && clusterManager == MESOS) {
+      printErrorAndExit("SparkR is not supported for Mesos cluster.")
     }

     // If we're running a R app, set the main class to our specific R runner
```
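
For illustration, the resulting archives list for a YARN R app with extra built packages would look roughly as below. This is a sketch: the URIs are invented, and `mergeFileLists` here is a simplified stand-in for SparkSubmit's private helper.

```scala
object ArchivesSketch {
  // Simplified stand-in for SparkSubmit.mergeFileLists: comma-joins non-empty lists.
  def mergeFileLists(lists: String*): String =
    lists.filter(_.nonEmpty).mkString(",")

  def main(args: Array[String]): Unit = {
    var archives = ""
    archives = mergeFileLists(archives, "file:/opt/spark/R/lib/sparkr.zip#sparkr")
    archives = mergeFileLists(archives, "file:/tmp/spark-rpkg-1234/rpkg.zip#rpkg")
    println(archives)
    // -> file:/opt/spark/R/lib/sparkr.zip#sparkr,file:/tmp/spark-rpkg-1234/rpkg.zip#rpkg
  }
}
```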

core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala

Lines changed: 1 addition & 4 deletions

```diff
@@ -28,6 +28,7 @@ import org.scalatest.concurrent.Timeouts
 import org.scalatest.time.SpanSugar._

 import org.apache.spark._
+import org.apache.spark.api.r.RUtils
 import org.apache.spark.deploy.SparkSubmit._
 import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate
 import org.apache.spark.util.{ResetSystemProperties, Utils}
@@ -369,9 +370,6 @@ class SparkSubmitSuite
   }

   test("correctly builds R packages included in a jar with --packages") {
-    // TODO(SPARK-9603): Building a package to $SPARK_HOME/R/lib is unavailable on Jenkins.
-    // It's hard to write the test in SparkR (because we can't create the repository dynamically)
-    /*
     assume(RUtils.isRInstalled, "R isn't installed on this machine.")
     val main = MavenCoordinate("my.great.lib", "mylib", "0.1")
     val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
@@ -389,7 +387,6 @@ class SparkSubmitSuite
         rScriptDir)
       runSparkSubmit(args)
     }
-    */
   }

   test("resolves command line argument paths correctly") {
```

make-distribution.sh

Lines changed: 1 addition & 0 deletions

```diff
@@ -220,6 +220,7 @@ cp -r "$SPARK_HOME/ec2" "$DISTDIR"
 if [ -d "$SPARK_HOME"/R/lib/SparkR ]; then
   mkdir -p "$DISTDIR"/R/lib
   cp -r "$SPARK_HOME/R/lib/SparkR" "$DISTDIR"/R/lib
+  cp "$SPARK_HOME/R/lib/sparkr.zip" "$DISTDIR"/R/lib
 fi

 # Download and copy in tachyon, if requested
```
