
Commit 9a1db44

Merge pull request apache#22 from shivaram/master
Add support for passing Spark environment vars
2 parents 10228fb + e462448


3 files changed: 57 additions and 9 deletions


README.md

Lines changed: 15 additions & 0 deletions
@@ -42,6 +42,21 @@ Spark master you can launch R and then run
     library(SparkR)
     sc <- sparkR.init(master="local")
 
+To increase the memory used by the driver you can export the SPARK\_MEM
+environment variable. For example, to use 1g, you can run
+
+    SPARK_MEM=1g ./sparkR
+
+In a cluster setting, to set the amount of memory used by the executors you can
+pass the variable `spark.executor.memory` to the SparkContext constructor.
+
+    library(SparkR)
+    sc <- sparkR.init(master="spark://<master>:7077",
+                      sparkEnvir=list(spark.executor.memory="1g"))
+
+
+## Examples, Unit tests
+
 SparkR comes with several sample programs in the `examples` directory.
 To run one of them, use `./sparkR <filename> <args>`. For example:
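
The README addition documents the driver and executor settings separately; the code changes below wire them together. A combined usage sketch (the master URL and memory sizes are placeholder values):

    # Driver heap: sparkR.onLoad reads SPARK_MEM (default "512m") and passes it
    # to the JVM as -Xmx, so it must be set before R starts, e.g.
    #   SPARK_MEM=1g ./sparkR
    library(SparkR)

    # Executor settings: entries in sparkEnvir are copied into the SparkConf
    # by RRDD.createSparkContext on the JVM side.
    sc <- sparkR.init(master = "spark://<master>:7077",
                      sparkEnvir = list(spark.executor.memory = "1g"))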

pkg/R/sparkR.R

Lines changed: 23 additions & 8 deletions
@@ -5,9 +5,11 @@ assemblyJarName <- "sparkr-assembly-0.1.jar"
 sparkR.onLoad <- function(libname, pkgname) {
   assemblyJarPath <- paste(libname, "/SparkR/", assemblyJarName, sep="")
   packageStartupMessage("[SparkR] Initializing with classpath ", assemblyJarPath, "\n")
+
+  sparkMem <- Sys.getenv("SPARK_MEM", "512m")
   .sparkREnv$libname <- libname
   .sparkREnv$assemblyJarPath <- assemblyJarPath
-  .jinit(classpath=assemblyJarPath)
+  .jinit(classpath=assemblyJarPath, parameters=paste("-Xmx", sparkMem, sep=""))
 }
 
 #' Initialize a new Spark Context.
@@ -17,16 +19,20 @@ sparkR.onLoad <- function(libname, pkgname) {
 #' @param master The Spark master URL.
 #' @param appName Application name to register with cluster manager
 #' @param sparkHome Spark Home directory
+#' @param sparkEnvir Named list of environment variables to set on worker nodes.
 #' @export
 #' @examples
 #'\dontrun{
-#' sparkR.init("local[2]", "SparkR", "/home/spark")
+#' sc <- sparkR.init("local[2]", "SparkR", "/home/spark")
+#' sc <- sparkR.init("local[2]", "SparkR", "/home/spark",
+#'                   list(spark.executor.memory="1g"))
 #'}
 
 sparkR.init <- function(
   master = "local",
   appName = "SparkR",
-  sparkHome = Sys.getenv("SPARK_HOME")) {
+  sparkHome = Sys.getenv("SPARK_HOME"),
+  sparkEnvir = list() ) {
 
   if (exists(".sparkRjsc", envir=.sparkREnv)) {
     return(get(".sparkRjsc", envir=.sparkREnv))
@@ -36,13 +42,22 @@ sparkR.init <- function(
     sparkHome <- normalizePath(sparkHome)
   }
 
-  # TODO: support other constructors
+  hm <- .jnew("java/util/HashMap")
+  for ( varname in names(sparkEnvir)) {
+    hm$put(varname, sparkEnvir[[varname]])
+  }
+
   assign(
     ".sparkRjsc",
-    .jnew("org/apache/spark/api/java/JavaSparkContext", master, appName,
-          as.character(sparkHome),
-          as.character(.sparkREnv$assemblyJarPath)),
-    envir=.sparkREnv
+    J("edu.berkeley.cs.amplab.sparkr.RRDD",
+      "createSparkContext",
+      master,
+      appName,
+      as.character(sparkHome),
+      .jarray(as.character(.sparkREnv$assemblyJarPath),
+              "java/lang/String"),
+      hm),
+    envir=.sparkREnv
   )
 
   get(".sparkRjsc", envir=.sparkREnv)
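
The rJava interop pattern introduced in sparkR.init above can be tried on its own: a named R list is copied into a java.util.HashMap and handed to a static JVM method. A minimal sketch, assuming only that rJava is installed and a JVM can be started (the configuration names and values are illustrative):

    library(rJava)
    .jinit()  # start a JVM; in SparkR this is done by sparkR.onLoad with -Xmx from SPARK_MEM

    # Copy a named R list into a java.util.HashMap, as sparkR.init now does
    sparkEnvir <- list(spark.executor.memory = "1g", spark.cores.max = "4")
    hm <- .jnew("java/util/HashMap")
    for (varname in names(sparkEnvir)) {
      hm$put(varname, sparkEnvir[[varname]])
    }

    hm$size()  # returns 2; sparkR.init then passes the map to
               # J("edu.berkeley.cs.amplab.sparkr.RRDD", "createSparkContext", ..., hm)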

pkg/src/src/main/scala/edu/berkeley/cs/amplab/sparkr/RRDD.scala

Lines changed: 19 additions & 1 deletion
@@ -1,12 +1,13 @@
 package edu.berkeley.cs.amplab.sparkr
 
 import java.io._
+import java.util.{Map => JMap}
 
 import scala.collection.JavaConversions._
 import scala.io.Source
 import scala.reflect.ClassTag
 
-import org.apache.spark.{SparkEnv, Partition, SparkException, TaskContext}
+import org.apache.spark.{SparkEnv, Partition, SparkException, TaskContext, SparkConf}
 import org.apache.spark.api.java.{JavaSparkContext, JavaRDD, JavaPairRDD}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
@@ -156,6 +157,23 @@ class RRDD[T: ClassTag](
 
 object RRDD {
 
+  def createSparkContext(
+      master: String,
+      appName: String,
+      sparkHome: String,
+      jars: Array[String],
+      vars: JMap[Object, Object]): JavaSparkContext = {
+
+    val sparkConf = new SparkConf().setMaster(master)
+                                   .setAppName(appName)
+                                   .setSparkHome(sparkHome)
+                                   .setJars(jars)
+    for ( (name, value) <- vars) {
+      sparkConf.set(name.asInstanceOf[String], value.asInstanceOf[String])
+    }
+    new JavaSparkContext(sparkConf)
+  }
+
   /**
    * Create an RRDD given a sequence of byte arrays. Used to create RRDD when `parallelize` is
    * called from R.
